Source code for bi_etl.components.csvreader

"""
Created on Sep 17, 2014

"""
import codecs
import csv
import logging
import os
from typing import *
from typing import TextIO
from pathlib import Path

from itertools import islice

from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.scheduler.task import ETLTask
from bi_etl.timer import Timer

__all__ = ['CSVReader']

# Only QUOTE_NONE is really needed here; QUOTE_MINIMAL is the default.
# The other quoting levels are only relevant to the writer.
QUOTE_NONE = csv.QUOTE_NONE
QUOTE_MINIMAL = csv.QUOTE_MINIMAL


class CSVReader(ETLComponent):
    """
    CSVReader is similar to :class:`csv.DictReader`.
    However, instead of a dict, it uses our :class:`~bi_etl.components.row.row.Row`
    class as its return type. It uses :class:`csv.reader` (in :mod:`csv`) to read
    the file. Note the optional, but important, parameter ``delimiter``.

    **Valid values for the `errors` parameter:**

    +--------------------+-----------------------------------------------------------+
    | Value              | Meaning                                                   |
    +====================+===========================================================+
    | 'strict'           | raise a ValueError (or a subclass)                        |
    +--------------------+-----------------------------------------------------------+
    | 'ignore'           | ignore the character and continue with the next           |
    +--------------------+-----------------------------------------------------------+
    | 'replace'          | replace with a suitable replacement character;            |
    |                    | Python will use the official U+FFFD REPLACEMENT           |
    |                    | CHARACTER for the builtin Unicode codecs on               |
    |                    | decoding and '?' on encoding.                             |
    +--------------------+-----------------------------------------------------------+
    | 'surrogateescape'  | replace with private code points U+DCnn.                  |
    +--------------------+-----------------------------------------------------------+

    Args:
        task: The instance to register in (if not None)
        filedata:
            The file to parse as delimited. If str, it's assumed to be a filename.
            Otherwise, it's assumed to be a file object.
        encoding:
            The encoding to use when opening the file, if it was a filename and not
            already opened. Default is None, which becomes the Python default encoding.
        errors:
            The error handling to use when opening the file (if it was a filename and
            not already opened). Default is 'strict'.
            See above for valid `errors` values.
        logical_name:
            The logical name of this source. Used for log messages.

    Attributes:
        column_names: list
            The names to use for columns
        primary_key: str
            The name of the primary key column. Only impacts trace messages.
            Default=None.
        dialect: str or subclass of :class:`csv.Dialect`
            Default "excel". The dialect value to pass to :mod:`csv`.
        delimiter: str
            The delimiter used in the file. Default is ','.
        doublequote: boolean
            Controls how instances of quotechar appearing inside a column should
            themselves be quoted. When True, the character is doubled. When False,
            the escapechar is used as a prefix to the quotechar. It defaults to True.
        escapechar: str
            A one-character string used by the writer to escape the delimiter if
            quoting is set to QUOTE_NONE, and the quotechar if doublequote is False.
            On reading, the escapechar removes any special meaning from the following
            character. It defaults to None, which disables escaping.
        quotechar: str
            A one-character string used to quote columns containing special
            characters, such as the delimiter or quotechar, or which contain
            new-line characters. It defaults to '"'.
        quoting:
            Controls when quotes should be generated by the writer and recognised
            by the reader. Can be either of the constants defined in this module:

            * QUOTE_NONE
            * QUOTE_MINIMAL

            Defaults to QUOTE_MINIMAL.
            For more details see https://docs.python.org/3/library/csv.html#csv.QUOTE_ALL
        skipinitialspace: boolean
            When True, whitespace immediately following the delimiter is ignored.
            The default is False.
        strict: boolean
            When True, raise exception Error on bad CSV input. The default is False.
        header_row: int
            The row to parse for headers
        start_row: int
            The first row to parse for data
        log_first_row: boolean
            Should we log progress on the first row read.
            *Only applies if used as a source.*
            (inherited from ETLComponent)
        max_rows: int, optional
            The maximum number of rows to read.
            *Only applies if Table is used as a source.*
            (inherited from ETLComponent)
        primary_key: list
            The name of the primary key column(s). Only impacts trace messages.
            Default=None.
            (inherited from ETLComponent)
        progress_frequency: int
            How often (in seconds) to output progress messages. None for no
            progress messages.
            (inherited from ETLComponent)
        progress_message: str
            The progress message to print.
            Default is ``"{logical_name} row # {row_number}"``.
            Note ``logical_name`` and ``row_number`` substitutions.
            (inherited from ETLComponent)
        restkey: str
            Column name to catch long rows (extra values).
        restval: str
            The value to put in columns that are in column_names but not present
            in a given row (missing values).
        large_field_support: boolean
            Enable support for csv columns bigger than the 131,072 character default limit.
    """
    DELIMITER_SNIFF_LINES = 10
    def __init__(self,
                 task: Optional[ETLTask],
                 filedata: Union[TextIO, str, Path],
                 encoding: str = None,
                 errors: str = 'strict',
                 logical_name: str = None,
                 **kwargs
                 ):
        self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
        self.__close_file = False
        if isinstance(filedata, Path):
            filedata = str(filedata)
        # We have to check / open the file here to get the name for the logical name
        if isinstance(filedata, str):
            self.log.debug(f"Opening file {filedata}")
            if encoding is None or encoding in {'utf8', 'utf-8'}:
                with open(filedata, 'rb') as binary_file:
                    begin_of_file = binary_file.read(3)
                    if begin_of_file == codecs.BOM_UTF8:
                        encoding = 'utf-8-sig'
                        self.log.warning(f'BOM found for {filedata} using encoding {encoding}')
                    else:
                        self.log.debug(f'No BOM found for {filedata}')
            self.file = open(filedata,
                             mode='rt',
                             newline='',
                             encoding=encoding,
                             errors=errors
                             )
            self.__close_file = True
        else:
            self.log.info(f"Treating input as file object {filedata}")
            self.file = filedata

        if logical_name is None:
            try:
                logical_name = os.path.basename(self.file.name)
            except AttributeError:
                logical_name = str(self.file)

        # Don't pass kwargs up. They should be set here at the end
        super(CSVReader, self).__init__(task=task,
                                        logical_name=logical_name,
                                        )

        self.__reader = None
        # Begin csv module params
        self.dialect = "excel"
        self.delimiter = None
        self.doublequote = True
        self.escapechar = None
        self.quotechar = '"'
        self.quoting = csv.QUOTE_MINIMAL
        self.skipinitialspace = False
        self.strict = False
        self.large_field_support = False
        # End csv module params
        self.header_row = 1
        self.start_row = 1  # Will be incremented if the header row is read
        # Column to catch long rows (more values than columns)
        self.restkey = 'extra data past last delimiter'
        # Default value for short rows (value for missing keys)
        self.restval = None
        self.trace_data = False
        self.extra_part_msg_limit = 100
        self.extra_part_msg_cnt = 0

        # Should be the last call of every init
        self.set_kwattrs(**kwargs)
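    # A minimal construction sketch. The file name, encoding, and delimiter below
    # are illustrative assumptions, not part of this module:
    #
    #     src = CSVReader(
    #         task=None,
    #         filedata='input.txt',    # str/Path values are opened by __init__
    #         encoding='latin-1',      # omit to use the Python default encoding
    #         errors='replace',        # see the errors table in the class docstring
    #         delimiter='\t',          # extra kwargs are applied via set_kwattrs
    #     )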
    def __repr__(self):
        return (
            f"{self.__class__.__name__}("
            f"task={self.task},logical_name={self.logical_name},filedata={self.file},"
            f"primary_key={self.primary_key},column_names={self.column_names})"
        )

    @property
    def reader(self) -> csv.reader:
        """
        Build or get the csv.reader object.
        """
        if self.__reader is None:
            if self.delimiter is None:
                delimiters = ''.join([',', '|', '\t'])
                try:
                    # Note: The sniff routine builds a histogram of delimiter frequency,
                    # so we want to make sure to give it whole lines and not cut the
                    # last line off in the middle.
                    sample_data = ''.join(islice(self.file, self.DELIMITER_SNIFF_LINES))
                    # csv.Sniffer does a split on only \n,
                    # so if the file has \r as well, those get counted as potential delimiters
                    sample_data = sample_data.replace('\r\n', '\n')
                    dialect = csv.Sniffer().sniff(sample_data, delimiters=delimiters)
                except csv.Error as e:
                    try:
                        name = self.file.name
                    except AttributeError:
                        name = self.file
                    raise ValueError(f"{e} in file {name} delimiters={delimiters}")
                if dialect.delimiter not in delimiters:
                    msg = f"Invalid delimiter '{dialect.delimiter}' found in {self.file.name} csv file. " \
                          f"Expected one of {delimiters}"
                    self.log.warning(msg)
                    raise ValueError(msg)
                else:
                    self.log.info(f"Found delimiter '{dialect.delimiter}' in {self.file.name} csv file.")
                self.dialect = dialect
                self.file.seek(0)
                self.delimiter = dialect.delimiter
            # Initialize the reader
            self.__reader = csv.reader(
                self.file,
                dialect=self.dialect,
                delimiter=self.delimiter,
                doublequote=self.doublequote,
                escapechar=self.escapechar,
                quotechar=self.quotechar,
                quoting=self.quoting,
                skipinitialspace=self.skipinitialspace,
                strict=self.strict,
            )
            if self.large_field_support:
                csv.field_size_limit(2147483647)  # Largest value it will accept
        return self.__reader

    @property
    def line_num(self):
        """
        The current line number in the source file.
        line_num differs from rows_read in that rows_read counts only rows
        that would be returned to the caller.
        """
        return self.reader.line_num
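    # Standalone sketch of the sniffing step used by the reader property above
    # (the sample text is made up for illustration):
    #
    #     import csv
    #     sample = "a|b|c\n1|2|3\n"
    #     dialect = csv.Sniffer().sniff(sample, delimiters=',|\t')
    #     assert dialect.delimiter == '|'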
    def seek_row(self, target_row):
        saved_position = None
        saved_line_num = None
        if target_row is not None:
            current_row = self.line_num + 1
            if target_row < current_row:
                if self.file.seekable():
                    saved_position = self.file.tell()
                    saved_line_num = self.reader.line_num
                    self.file.seek(0)
                    try:
                        self.reader.line_num = 0
                    except AttributeError:
                        # csv.reader.line_num may be read-only
                        pass
                else:
                    raise ValueError(f"Un-seekable file, already read past the seek target row {target_row:,}")
            elif target_row > current_row:
                self.log.info(f"Seeking row {target_row:,}")
                stats = self.get_stats_entry('seek')
                stats.timer.start()
                progress_timer = Timer()
                for _ in range(target_row - current_row):
                    # noinspection PyTypeChecker
                    next(self.reader, None)
                    # Increment the stats although these rows aren't really processed,
                    # so we know where we were
                    stats['rows read'] += 1
                    stats['lines read'] = self.line_num
                    if self.progress_frequency is not None:
                        if progress_timer.seconds_elapsed >= self.progress_frequency:
                            self.log.info(f"Seek reached row {stats['rows read']:,}")
                            progress_timer.reset()
                self.log.info(f"Done seeking row {target_row:,}")
        return saved_position, saved_line_num
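    # The forward-seek loop above is, apart from the stats and progress
    # bookkeeping, the classic itertools "consume" recipe (sketch; n is the
    # number of rows to skip):
    #
    #     from itertools import islice
    #     next(islice(self.reader, n, n), None)  # advance the reader by n rows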
    def __read_header_row(self):
        """
        Read the header row.
        This is a function so it can be overridden with more complex header parsing.
        """
        (saved_position, saved_line_num) = self.seek_row(self.header_row)
        if self.start_row == self.header_row:
            self.start_row += 1
        # noinspection PyTypeChecker
        header_row = next(self.reader, None)
        if saved_position is not None and saved_position > 0:
            new_position = self.file.tell()
            if new_position < saved_position:
                self.file.seek(saved_position)
                self.reader.line_num = saved_line_num
        return header_row

    def _obtain_column_names(self):
        """
        Get the column names from the file.
        ETLComponent will only call this if self._column_names is None.
        """
        try:
            header_row = self.__read_header_row()
            assert header_row is not None, "Header row not found (or empty)"
            # Make sure to use the setter here! It deals with duplicate names.
            self.column_names = header_row
        except StopIteration:
            self.column_names = []
        if self.trace_data:
            self.log.debug(f"Column names read: {self.column_names}")

    def _raw_rows(self):
        len_column_names = len(self.column_names)
        try:
            self.seek_row(self.start_row)
            this_iteration_header = self.full_iteration_header
            # noinspection PyTypeChecker
            for row in self.reader:
                if len(row) != 0:
                    # Convert empty strings to None to be consistent with DB reads
                    row = [None if s == '' else s for s in row]
                    d = self.Row(iteration_header=this_iteration_header)
                    d.update_from_values(row[:len_column_names])
                    len_row = len(row)
                    if len_column_names < len_row:
                        # Note: Adding restkey to the row will create a new iteration header
                        # (shared by subsequent rows with extra values)
                        d[self.restkey] = row[len_column_names:]
                        if self.extra_part_msg_cnt < self.extra_part_msg_limit:
                            self.extra_part_msg_cnt += 1
                            self.log.debug(f"Extra part of row {self.rows_read:,} read={row[len_column_names:]}")
                            if self.extra_part_msg_cnt == self.extra_part_msg_limit:
                                self.log.debug("No more 'Extra part of row read' messages will be logged")
                    elif len_column_names > len_row:
                        # This could be done in a faster way, but it is hopefully
                        # rare, so not worth optimizing
                        for key in self.column_names[len_row:]:
                            d[key] = self.restval
                    yield d
        except csv.Error as e:
            if 'field larger than field limit' in str(e):
                raise ValueError(f'{e}. Please enable CSVReader.large_field_support.')
            else:
                raise e
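    # The long/short row handling above mirrors csv.DictReader's restkey/restval
    # semantics; a stdlib sketch of the same behavior:
    #
    #     import csv, io
    #     r = csv.DictReader(io.StringIO("1,2,3\n1\n"),
    #                        fieldnames=['a', 'b'], restkey='extra', restval=None)
    #     rows = list(r)
    #     # rows[0] == {'a': '1', 'b': '2', 'extra': ['3']}
    #     # rows[1] == {'a': '1', 'b': None}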
    def close(self, error: bool = False):
        """
        Close the file.
        """
        if not self.is_closed:
            if self.__close_file:
                self.file.close()
            super(CSVReader, self).close(error=error)
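# Usage sketch (the file name and column name below are illustrative
# assumptions; iteration over Row objects follows the usual ETLComponent
# iterator protocol):
#
#     source = CSVReader(task=None, filedata='example.csv', delimiter='|')
#     for row in source:
#         print(row['some_column'])
#     source.close()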