"""
Created on Sep 17, 2014
"""
import codecs
import csv
import logging
import os
from typing import Optional, TextIO, Union
from pathlib import Path
from itertools import islice
from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.scheduler.task import ETLTask
from bi_etl.timer import Timer
__all__ = ['CSVReader']
# Only QUOTE_NONE really needs to be re-exported here; QUOTE_MINIMAL is the default.
# The other quoting levels are only relevant to the writer.
QUOTE_NONE = csv.QUOTE_NONE
QUOTE_MINIMAL = csv.QUOTE_MINIMAL
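# Illustrative sketch of how these re-exported constants might be used;
# 'extract.txt' is a hypothetical filename, not part of this module's API:
#     reader = CSVReader(task, 'extract.txt', delimiter='|')
#     reader.quoting = QUOTE_NONE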
class CSVReader(ETLComponent):
"""
CSVReader is similar to csv.DictReader
    However, instead of a dict it uses our :class:`~bi_etl.components.row.row.Row` class as its return type.
It uses :class:`csv.reader` (in :mod:`csv`) to read the file.
    Note the optional, but important, parameter ``delimiter``.
**Valid values for `errors` parameter:**
+--------------------+-----------------------------------------------------------+
| Value | Meaning |
+====================+===========================================================+
    | 'strict'           | raise a ValueError (or a subclass)                        |
+--------------------+-----------------------------------------------------------+
| 'ignore' | ignore the character and continue with the next |
+--------------------+-----------------------------------------------------------+
| 'replace' | replace with a suitable replacement character; |
| | Python will use the official U+FFFD REPLACEMENT |
| | CHARACTER for the builtin Unicode codecs on |
| | decoding and '?' on encoding. |
+--------------------+-----------------------------------------------------------+
| 'surrogateescape' | replace with private code points U+DCnn. |
+--------------------+-----------------------------------------------------------+
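    For example (an illustrative sketch; the filename is hypothetical), stray
    bytes in an otherwise UTF-8 file could be tolerated with::

        reader = CSVReader(task=None, filedata='suspect_extract.csv', errors='replace')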
Args:
        task: The ETLTask instance to register this component in (if not None)
        filedata:
            The file to parse as delimited. If a str or Path is passed, it is treated
            as a filename to open. Otherwise, it is assumed to be an open file object.
encoding:
The encoding to use when opening the file,
if it was a filename and not already opened.
            Default is None, which uses the Python default encoding.
errors:
The error handling to use when opening the file
(if it was a filename and not already opened)
            Default is 'strict'.
            See the table above for valid values.
logical_name:
The logical name of this source. Used for log messages.
Attributes:
column_names: list
The names to use for columns
dialect: str or subclass of :class:`csv.Dialect`
Default "excel". The dialect value to pass to :mod:`csv`
delimiter: str
The delimiter used in the file. Default is ','.
doublequote: boolean
Controls how instances of quotechar appearing inside a column should themselves be quoted.
When True, the character is doubled.
When False, the escapechar is used as a prefix to the quotechar.
It defaults to True.
escapechar: str
A one-character string used by the writer to escape the delimiter if quoting is set to
QUOTE_NONE and the quotechar if doublequote is False. On reading, the escapechar removes
any special meaning from the following character. It defaults to None, which disables escaping.
quotechar: str
A one-character string used to quote columns containing special characters, such as the delimiter
or quotechar, or which contain new-line characters.
It defaults to '"'.
quoting:
Controls when quotes should be generated by the writer and recognised by the reader.
Can be either of the constants defined in this module.
* QUOTE_NONE
* QUOTE_MINIMAL
Defaults to QUOTE_MINIMAL.
For more details see https://docs.python.org/3/library/csv.html#csv.QUOTE_ALL
skipinitialspace: boolean
When True, whitespace immediately following the delimiter is ignored. The default is False.
strict: boolean
When True, raise exception Error on bad CSV input. The default is False.
header_row: int
The row to parse for headers
start_row: int
The first row to parse for data
log_first_row : boolean
Should we log progress on the first row read. *Only applies if used as a source.*
(inherited from ETLComponent)
max_rows : int, optional
            The maximum number of rows to read. *Only applies if used as a source.*
(inherited from ETLComponent)
primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=None.
(inherited from ETLComponent)
progress_frequency: int
How often (in seconds) to output progress messages. None for no progress messages.
(inherited from ETLComponent)
progress_message: str
The progress message to print. Default is ``"{logical_name} row # {row_number}"``.
            Note the ``logical_name`` and ``row_number`` substitutions.
(inherited from ETLComponent)
restkey: str
Column name to catch long rows (extra values).
restval: str
The value to put in columns that are in the column_names but
not present in a given row (missing values).
large_field_support: boolean
            Enable support for csv fields larger than the default 131,072 character limit.
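
    Example:
        A minimal usage sketch. The filename and column name are illustrative
        assumptions, and iteration / ``with`` support is assumed to come from
        :class:`~bi_etl.components.etlcomponent.ETLComponent`::

            with CSVReader(task=None, filedata='example.csv') as source:
                for row in source:
                    print(row['customer_id'])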
"""
DELIMITER_SNIFF_LINES = 10
def __init__(self,
task: Optional[ETLTask],
filedata: Union[TextIO, str, Path],
                 encoding: Optional[str] = None,
                 errors: str = 'strict',
                 logical_name: Optional[str] = None,
**kwargs
):
self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self.__close_file = False
if isinstance(filedata, Path):
filedata = str(filedata)
# We have to check / open the file here to get the name for the logical name
if isinstance(filedata, str):
self.log.debug(f"Opening file {filedata}")
if encoding is None or encoding in {'utf8', 'utf-8'}:
with open(filedata, 'rb') as binary_file:
begin_of_file = binary_file.read(3)
if begin_of_file == codecs.BOM_UTF8:
encoding = 'utf-8-sig'
                        self.log.warning(f'BOM found for {filedata}; using encoding {encoding}')
                    else:
                        self.log.debug(f'No BOM found for {filedata}')
self.file = open(filedata,
mode='rt',
newline='',
encoding=encoding,
errors=errors
)
self.__close_file = True
else:
self.log.info(f"Treating input as file object {filedata}")
self.file = filedata
if logical_name is None:
try:
logical_name = os.path.basename(self.file.name)
except AttributeError:
logical_name = str(self.file)
# Don't pass kwargs up. They should be set here at the end
super(CSVReader, self).__init__(task=task,
logical_name=logical_name,
)
self.__reader = None
# Begin csv module params
self.dialect = "excel"
self.delimiter = None
self.doublequote = True
self.escapechar = None
self.quotechar = '"'
self.quoting = csv.QUOTE_MINIMAL
self.skipinitialspace = False
self.strict = False
self.large_field_support = False
# End csv module params
self.header_row = 1
self.start_row = 1 # Will be incremented if the header row is read
# column to catch long rows (more values than columns)
self.restkey = 'extra data past last delimiter'
# default value for short rows (value for missing keys)
self.restval = None
self.trace_data = False
self.extra_part_msg_limit = 100
self.extra_part_msg_cnt = 0
# Should be the last call of every init
self.set_kwattrs(**kwargs)
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"task={self.task},logical_name={self.logical_name},filedata={self.file},"
f"primary_key={self.primary_key},column_names={self.column_names})"
)
@property
def reader(self) -> csv.reader:
"""
Build or get the csv.reader object.
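        If ``delimiter`` is None, it is sniffed from the first
        ``DELIMITER_SNIFF_LINES`` lines of the file (candidates: comma, pipe,
        and tab) and the file is rewound before the reader is built.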
"""
if self.__reader is None:
if self.delimiter is None:
                delimiters = ',|\t'
try:
# Note: The sniff routine builds a histogram for delimiter frequency
# So we want to make sure to give it whole lines and not cut the
# last line off in the middle.
sample_data = ''.join(islice(self.file, self.DELIMITER_SNIFF_LINES))
# CSV Sniffer does a split on only \n
# so if the file has \r as well those get counted as potential delimiters
sample_data = sample_data.replace('\r\n', '\n')
dialect = csv.Sniffer().sniff(sample_data, delimiters=delimiters)
except csv.Error as e:
try:
name = self.file.name
except AttributeError:
name = self.file
raise ValueError(f"{e} in file {name} delimiters={delimiters}")
if dialect.delimiter not in delimiters:
msg = f'Invalid delimiter \'{dialect.delimiter}\' found in {self.file.name} csv file. ' \
f'Expected one of {delimiters}'
self.log.warning(msg)
raise ValueError(msg)
else:
self.log.info(f'Found delimiter \'{dialect.delimiter}\' in {self.file.name} csv file.')
self.dialect = dialect
self.file.seek(0)
self.delimiter = dialect.delimiter
# Initialize the reader
self.__reader = csv.reader(
self.file,
dialect=self.dialect,
delimiter=self.delimiter,
doublequote=self.doublequote,
escapechar=self.escapechar,
quotechar=self.quotechar,
quoting=self.quoting,
skipinitialspace=self.skipinitialspace,
strict=self.strict,
)
if self.large_field_support:
csv.field_size_limit(2147483647) # largest value it will accept
return self.__reader
@property
def line_num(self):
"""
The current line number in the source file.
        ``line_num`` differs from ``rows_read`` in that ``rows_read`` counts only
        rows returned to the caller, while ``line_num`` counts lines read from the
        source file.
"""
return self.reader.line_num
def seek_row(self, target_row):
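        """
        Position the reader so that the next row read is ``target_row``.
        Backward seeks rewind the file (when seekable); forward seeks read and
        discard rows.

        Returns:
            Tuple ``(saved_position, saved_line_num)`` with the file position
            and line number before a backward seek, or ``(None, None)`` if no
            rewind was performed.
        """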
saved_position = None
saved_line_num = None
if target_row is not None:
current_row = self.line_num + 1
if target_row < current_row:
if self.file.seekable():
saved_position = self.file.tell()
saved_line_num = self.reader.line_num
self.file.seek(0)
try:
self.reader.line_num = 0
except AttributeError:
pass
else:
raise ValueError(f"Un-seekable file, already read past the seek target row {target_row:,}")
elif target_row > current_row:
self.log.info(f"Seeking row {target_row:,}")
stats = self.get_stats_entry('seek')
stats.timer.start()
progress_timer = Timer()
for _ in range(target_row - current_row):
# noinspection PyTypeChecker
next(self.reader, None)
                    # Track skipped rows in the stats, even though they aren't really
                    # processed, so we know how far the seek progressed
stats['rows read'] += 1
stats['lines read'] = self.line_num
if self.progress_frequency is not None:
if progress_timer.seconds_elapsed >= self.progress_frequency:
self.log.info(f"Seek reached row {stats['rows read']:,}")
progress_timer.reset()
self.log.info(f"Done seeking row {self.start_row:,}")
return saved_position, saved_line_num
def __read_header_row(self):
"""
Read the header row. This is a function so it can be overridden with more
complex header parsing.
"""
(saved_position, saved_line_num) = self.seek_row(self.header_row)
if self.start_row == self.header_row:
self.start_row += 1
# noinspection PyTypeChecker
header_row = next(self.reader, None)
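        # If we rewound to read the header, restore the prior file position unless
        # reading the header already carried us past it.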
if saved_position is not None and saved_position > 0:
new_position = self.file.tell()
if new_position < saved_position:
self.file.seek(saved_position)
self.reader.line_num = saved_line_num
return header_row
def _obtain_column_names(self):
"""
        Get the column names from the file. ETLComponent only calls this if
        ``self._column_names`` is None.
"""
try:
header_row = self.__read_header_row()
assert header_row is not None, "Header row not found (or empty)"
# Make sure to use setter here! It deals with duplicate names
self.column_names = header_row
except StopIteration:
self.column_names = []
if self.trace_data:
self.log.debug(f"Column names read: {self.column_names}")
def _raw_rows(self):
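        """
        Yield one :class:`~bi_etl.components.row.row.Row` per non-empty csv row,
        converting empty strings to None, padding short rows with ``restval``,
        and collecting extra values under ``restkey``.
        """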
len_column_names = len(self.column_names)
try:
self.seek_row(self.start_row)
this_iteration_header = self.full_iteration_header
# noinspection PyTypeChecker
for row in self.reader:
if len(row) != 0:
# Convert empty strings to None to be consistent with DB reads
row = [None if s == '' else s for s in row]
d = self.Row(iteration_header=this_iteration_header)
d.update_from_values(row[:len_column_names])
len_row = len(row)
if len_column_names < len_row:
# Note: Adding restkey to the row will create a new iteration header
# (shared by subsequent rows with extra values)
d[self.restkey] = row[len_column_names:]
if self.extra_part_msg_cnt < self.extra_part_msg_limit:
self.extra_part_msg_cnt += 1
self.log.debug(f"Extra part of row {self.rows_read:,} read={row[len_column_names:]}")
if self.extra_part_msg_cnt == self.extra_part_msg_limit:
self.log.debug("No more Extra part of row read messages will be logged")
elif len_column_names > len_row:
                        # This could be done faster, but short rows should be rare,
                        # so it's not worth optimizing
for key in self.column_names[len_row:]:
d[key] = self.restval
yield d
except csv.Error as e:
if 'field larger than field limit' in str(e):
                raise ValueError(f'{e}. Please enable CSVReader.large_field_support.') from e
else:
raise e
def close(self, error: bool = False):
"""
Close the file
"""
if not self.is_closed:
if self.__close_file:
self.file.close()
super(CSVReader, self).close(error=error)