Source code for bi_etl.components.csv_writer

"""
Created on Sep 17, 2014

"""
import csv
import io
import logging
import os
from typing import *
from typing import TextIO
from pathlib import Path

from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.components.row.row import Row
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics

__all__ = ['CSVWriter', 'QUOTE_NONE', 'QUOTE_MINIMAL']

# Only QUOTE_NONE really needs to be re-exported here; QUOTE_MINIMAL is the default.
# The other csv quoting levels can still be set on the quoting attribute if needed.
QUOTE_NONE = csv.QUOTE_NONE
QUOTE_MINIMAL = csv.QUOTE_MINIMAL
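
# Usage sketch for these re-exported constants (hypothetical `target` CSVWriter
# instance; see the class below):
#
#     target.quoting = QUOTE_NONE  # disable quoting entirely
#     target.escapechar = '\\'     # an escapechar is then needed for embedded delimiters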


class CSVWriter(ETLComponent):
    """
    CSVWriter is similar to :class:`csv.DictWriter`. However, instead of a dict
    it uses our :class:`~bi_etl.components.row.row.Row` class as its row type.
    It uses :class:`csv.DictWriter` (in :mod:`csv`) to write the file.
    Note the optional, but important, parameter ``delimiter``.

    **Valid values for ``errors`` parameter:**

    +-----------------------+-----------------------------------------------------------+
    | Value                 | Meaning                                                   |
    +=======================+===========================================================+
    | ``'strict'``          | raise a ValueError (or a subclass)                        |
    +-----------------------+-----------------------------------------------------------+
    | ``'ignore'``          | ignore the character and continue with the next           |
    +-----------------------+-----------------------------------------------------------+
    | ``'replace'``         | replace with a suitable replacement character;            |
    |                       | Python will use the official U+FFFD REPLACEMENT           |
    |                       | CHARACTER for the builtin Unicode codecs on               |
    |                       | decoding and '?' on encoding.                             |
    +-----------------------+-----------------------------------------------------------+
    | ``'surrogateescape'`` | replace with private code points U+DCnn.                  |
    +-----------------------+-----------------------------------------------------------+

    Args:
        task: The ETLTask instance to register in (if not None)
        file_data: The file to write as delimited. If str, it's assumed to be a
            filename. Otherwise, it's assumed to be a file object.
        column_names: The names to use for columns
        include_header: Should the first line of the output file be a header
            with the column names?
        append: Should the file be appended to instead of overwritten
            (if it already exists)?
        encoding: The encoding to use when opening the file, if it was a
            filename and not already opened. Default is None, which becomes
            the Python default encoding.
        errors: The error handling to use when opening the file (if it was a
            filename and not already opened). Default is ``'strict'``.
            See the table above for valid ``errors`` values.
        logical_name: The logical name of this source. Used for log messages.
        kwargs: Optional keyword arguments. See below.

    Attributes:
        dialect: str or subclass of :class:`csv.Dialect`
            Default "excel". The dialect value to pass to :mod:`csv`.
        delimiter: str
            The delimiter used in the file. Default is comma ``','``.
        doublequote: boolean
            Controls how instances of quotechar appearing inside a column
            should themselves be quoted. When True, the character is doubled.
            When False, the escapechar is used as a prefix to the quotechar.
            It defaults to True.
        escapechar: str
            A one-character string used by the writer to escape the delimiter
            if quoting is set to QUOTE_NONE, and the quotechar if doublequote
            is False. On reading, the escapechar removes any special meaning
            from the following character. It defaults to None, which disables
            escaping.
        quotechar: str
            A one-character string used to quote columns containing special
            characters, such as the delimiter or quotechar, or which contain
            new-line characters. It defaults to '"'.
        quoting:
            Controls when quotes should be generated by the writer and
            recognised by the reader. Can be either of the constants defined
            in this module:

            * QUOTE_NONE
            * QUOTE_MINIMAL

            Defaults to QUOTE_MINIMAL.
            For more details see https://docs.python.org/3/library/csv.html#csv.QUOTE_ALL
        skipinitialspace: boolean
            When True, whitespace immediately following the delimiter is
            ignored. The default is False.
        strict: boolean
            When True, raise exception Error on bad CSV input.
            The default is False.
        header_row: int
            The row to parse for headers.
        start_row: int
            The first row to parse for data.
        log_first_row: boolean
            Should we log progress on the first row read.
            *Only applies if used as a source.*
            (inherited from ETLComponent)
        max_rows: int, optional
            The maximum number of rows to read.
            *Only applies if Table is used as a source.*
            (inherited from ETLComponent)
        primary_key: list
            The name of the primary key column(s). Only impacts trace messages.
            Default=None.
            (inherited from ETLComponent)
        progress_frequency: int
            How often (in seconds) to output progress messages.
            None for no progress messages.
            (inherited from ETLComponent)
        progress_message: str
            The progress message to print.
            Default is ``"{logical_name} row # {row_number}"``.
            Note ``logical_name`` and ``row_number`` substitutions.
            (inherited from ETLComponent)
        restkey: str
            Column name to catch long rows (extra values).
        restval: str
            The value to put in columns that are in the column_names but not
            present in a given row (missing values).
        large_field_support: boolean
            Enable support for csv columns bigger than the 131,072 character
            default limit.
    """
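
    # Usage sketch, assuming `example_task` is an ETLTask and `source` is another
    # ETLComponent yielding Row objects (both names are hypothetical):
    #
    #     target = CSVWriter(example_task, 'export.csv', column_names=['id', 'name'])
    #     for row in source:
    #         target.insert(row)
    #     target.close()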
    def __init__(self,
                 task: Optional[ETLTask],
                 file_data: Union[TextIO, str, Path],
                 column_names: List[str],
                 include_header: Optional[bool] = True,
                 append: bool = False,
                 encoding: Optional[str] = None,
                 errors: str = 'strict',
                 logical_name: Optional[str] = None,
                 **kwargs
                 ):
        self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self.__close_file = False
        self.include_header = include_header
        self._header_written = False
        self.column_names = column_names
        self._save_append = append
        self._save_encoding = encoding
        self._save_errors = errors
        if isinstance(file_data, Path):
            file_data = str(file_data)
        # We have to check / open the file here to get the name for the logical name
        if isinstance(file_data, str):
            self.log.info(f"Opening file {file_data} for writing")
            if append:
                file_mode = 'at'
                if os.path.exists(file_data):
                    self._header_written = True
            else:
                file_mode = 'wt'
            self.file = open(
                file_data,
                mode=file_mode,
                newline='',
                encoding=encoding,
                errors=errors
            )
            self.__close_file = True
        else:
            if not isinstance(file_data, io.IOBase):
                self.log.info(f"Treating input as file object {file_data}")
            self.file = file_data
        if logical_name is None:
            try:
                logical_name = os.path.basename(self.file.name)
            except AttributeError:
                logical_name = str(self.file)
        # Don't pass kwargs up. They should be set here at the end
        super().__init__(
            task=task,
            logical_name=logical_name,
        )
        self.__writer = None
        # Begin csv module params
        self.dialect = "excel"
        self.delimiter = ','
        self.doublequote = True
        self.escapechar = None
        self.quotechar = '"'
        self.quoting = csv.QUOTE_MINIMAL
        self.skipinitialspace = False
        self.strict = False
        self.large_field_support = False
        self.lineterminator = '\n'
        self.null = None
        # End csv module params
        self.fix_blank_strings = True
        self.trace_data = False
        # Should be the last call of every init
        self.set_kwattrs(**kwargs)
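
    # Note on append mode: when append=True and the target file already exists,
    # _header_written is pre-set so a header row is not written again.
    # Sketch (hypothetical names):
    #
    #     target = CSVWriter(example_task, 'export.csv', column_names=['id'], append=True)
    #     target.insert(row)  # header is written only if 'export.csv' did not already exist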
    def __reduce_ex__(self, protocol):
        return (
            # A callable object that will be called to create the initial version of the object.
            self.__class__,

            # A tuple of arguments for the callable object.
            # An empty tuple must be given if the callable does not accept any argument.
            (
                self.task,
                self.file.name,
                self.column_names,
                self.include_header,
                self._save_append,
                self._save_encoding,
                self._save_errors,
                self.logical_name,
            ),

            # Optionally, the object's state, which will be passed to the object's
            # __setstate__() method as previously described.
            # If the object has no such method, the value must be a dictionary
            # and it will be added to the object's __dict__ attribute.
            None,

            # Optionally, an iterator (and not a sequence) yielding successive items.
            # These items will be appended to the object either using obj.append(item)
            # or, in batch, using obj.extend(list_of_items).

            # Optionally, an iterator (not a sequence) yielding successive key-value pairs.
            # These items will be stored to the object using obj[key] = value

            # PROTOCOL 5+ only
            # Optionally, a callable with a (obj, state) signature.
            # This callable allows the user to programmatically control the
            # state-updating behavior of a specific object, instead of using
            # obj's static __setstate__() method.
            # If not None, this callable will have priority over obj's __setstate__().
        )

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(task={self.task}, "
            f"logical_name={self.logical_name}, "
            f"filedata={self.file}, "
            f"primary_key={self.primary_key}, "
            f"column_names={self.column_names})"
        )
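
    # Why __reduce_ex__ matters: pickling re-creates the writer by calling the
    # class with the saved constructor arguments (reopening the file by name)
    # instead of trying to pickle the open file handle. Sketch (hypothetical
    # `target` built from a filename):
    #
    #     import pickle
    #     clone = pickle.loads(pickle.dumps(target))  # re-opens target.file.name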
    def get_writer(self) -> csv.DictWriter:
        """
        Build or get the csv.DictWriter object.
        """
        if self.__writer is None:
            # Initialize the writer
            self.__writer = csv.DictWriter(
                self.file,
                fieldnames=self.column_names,
                dialect=self.dialect,
                delimiter=self.delimiter,
                doublequote=self.doublequote,
                escapechar=self.escapechar,
                quotechar=self.quotechar,
                quoting=self.quoting,
                skipinitialspace=self.skipinitialspace,
                strict=self.strict,
                lineterminator=self.lineterminator,
            )
            if self.large_field_support:
                csv.field_size_limit(2147483647)  # largest value it will accept
        return self.__writer
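
    # The csv.DictWriter is built lazily on first use, so dialect-related
    # attributes must be set before the first write. Sketch (hypothetical names):
    #
    #     target.delimiter = '|'
    #     target.lineterminator = '\r\n'
    #     target.insert_row(row)  # first write builds the DictWriter with these settings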
    def write_header(self):
        """
        Write the header row.
        """
        if self.include_header:
            self.get_writer().writeheader()
        self._header_written = True
    def close(self, error: bool = False):
        """
        Close the file.
        """
        if not self.is_closed:
            if self.__close_file:
                self.file.close()
            super().close(error=error)
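
    # Only files opened by this component (file_data given as a path) are closed
    # here; caller-supplied file objects are left open. Sketch (hypothetical names):
    #
    #     buffer = io.StringIO()
    #     target = CSVWriter(example_task, buffer, column_names=['id'])
    #     target.close()  # buffer stays open for the caller
    #     csv_text = buffer.getvalue()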
    def insert_row(
            self,
            source_row: Row,  # Must be a single row
            additional_insert_values: dict = None,
            stat_name: str = 'insert',
            parent_stats: Statistics = None,
    ) -> Row:
        """
        Inserts a row into the target file.

        Parameters
        ----------
        source_row:
            The row with values to insert
        additional_insert_values:
            Values to add / override in the row before inserting.
        stat_name:
            Name of this step for the ETLTask statistics.
        parent_stats:
            Optional Statistics object to nest this step's statistics in.
            Default is to place statistics in the ETLTask level statistics.

        Returns
        -------
        new_row
        """
        stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        stats.timer.start()
        if not self._header_written:
            self.write_header()
        assert len(source_row) > 0, "Empty row passed into csv insert"
        new_row = source_row.subset(keep_only=self.column_names)
        assert len(new_row) > 0, "Empty row generated using columns passed into csv insert"
        if additional_insert_values:
            for colName, value in additional_insert_values.items():
                new_row[colName] = value
        if self.trace_data:
            self.log.debug(f"{self} Raw row being inserted:\n{new_row.str_formatted()}")
        # Replace None values with the configured null representation.
        # Set blank strings to a single space so that they differ from None values.
        for colName, value in new_row.items():
            if value is None:
                new_row[colName] = self.null
            else:
                if self.fix_blank_strings:
                    if value == '':
                        new_row[colName] = ' '
        stats.add_to_stat('inserts', 1)
        self.get_writer().writerow(new_row)
        stats.timer.stop()
        return new_row
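
    # Sketch of additional_insert_values (hypothetical names): the extra values
    # are applied after the subset, so they can add or override columns:
    #
    #     target.insert_row(row, additional_insert_values={'load_date': '2024-01-01'})
    #
    # With the defaults, None values are written via self.null (None, which csv
    # renders as an empty field) and blank strings become a single space.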
    def insert(self,
               source: Union[Row, list],
               additional_insert_values: dict = None,
               parent_stats: Statistics = None,
               **kwargs
               ):
        """
        Insert a row or list of rows into the target file.

        Parameters
        ----------
        source: :class:`Row` or list thereof
            Row(s) to insert
        additional_insert_values: dict
            Additional values to set on each row.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this step's statistics in.
            Default is to place statistics in the ETLTask level statistics.
        """
        if isinstance(source, list):
            for row in source:
                self.insert_row(
                    row,
                    additional_insert_values=additional_insert_values,
                    parent_stats=parent_stats,
                    **kwargs
                )
            return None
        else:
            return self.insert_row(
                source,
                additional_insert_values=additional_insert_values,
                parent_stats=parent_stats,
                **kwargs
            )
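
    # Sketch (hypothetical names): insert() dispatches on the source type:
    #
    #     target.insert(row)            # single Row -> returns the written Row
    #     target.insert([row_a, row_b]) # list -> writes each Row, returns None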
    def commit(self):
        # Files have no transaction to commit; provided for interface compatibility.
        pass