"""
Created on Sep 17, 2014
"""
import csv
import io
import logging
import os
from typing import List, Optional, Union, TextIO
from pathlib import Path
from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.components.row.row import Row
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
__all__ = ['CSVWriter', 'QUOTE_NONE', 'QUOTE_MINIMAL']
# Only QUOTE_NONE really needs to be re-exported here; QUOTE_MINIMAL is the default.
# The other quoting levels (csv.QUOTE_ALL, csv.QUOTE_NONNUMERIC) can be passed
# directly from the csv module via the quoting attribute.
QUOTE_NONE = csv.QUOTE_NONE
QUOTE_MINIMAL = csv.QUOTE_MINIMAL
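# Example (sketch): unquoted output needs an escapechar to protect embedded
# delimiters, e.g.
#     writer = CSVWriter(None, 'out.csv', ['a', 'b'], quoting=QUOTE_NONE, escapechar='\\')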
class CSVWriter(ETLComponent):
"""
CSVWriter is similar to :class:`csv.DictWriter`.
However, instead of a dict it uses our :class:`~bi_etl.components.row.row.Row` class for its input rows.
It uses :class:`csv.DictWriter` (in :mod:`csv`) to write the file.
Note the optional, but important, parameter ``delimiter``.
**Valid values for ``errors`` parameter:**
+-----------------------+-----------------------------------------------------------+
| Value | Meaning |
+=======================+===========================================================+
| ``'strict'`` | raise a ValueError error (or a subclass) |
+-----------------------+-----------------------------------------------------------+
| ``'ignore'`` | ignore the character and continue with the next |
+-----------------------+-----------------------------------------------------------+
| ``'replace'`` | replace with a suitable replacement character; |
| | Python will use the official U+FFFD REPLACEMENT |
| | CHARACTER for the builtin Unicode codecs on |
| | decoding and '?' on encoding. |
+-----------------------+-----------------------------------------------------------+
| ``'surrogateescape'`` | replace with private code points U+DCnn. |
+-----------------------+-----------------------------------------------------------+
Args:
task: The ETLTask instance to register in (if not None)
file_data:
The file to write to. If str or Path, it's treated as a filename.
Otherwise, it's assumed to be an open file object.
column_names:
The names to use for columns
include_header:
Should the first line of the output file be a header with the column names?
append:
Should the file be appended to instead of overwritten (if it already exists)?
encoding:
The encoding to use when opening the file,
if it was a filename and not already opened.
Default is None which becomes the Python default encoding
errors:
The error handling to use when opening the file
(if it was a filename and not already opened)
Default is ``'strict'``
See the table above for valid ``errors`` values.
logical_name:
The logical name of this source. Used for log messages.
kwargs:
Optional Key word arguments. See below.
Attributes:
dialect: str or subclass of :class:`csv.Dialect`
Default "excel". The dialect value to pass to :mod:`csv`
delimiter: str
The delimiter used in the file. Default is comma ``','``.
doublequote: boolean
Controls how instances of quotechar appearing inside a column should themselves be quoted.
When True, the character is doubled.
When False, the escapechar is used as a prefix to the quotechar.
It defaults to True.
escapechar: str
A one-character string used by the writer to escape the delimiter if quoting is set to
QUOTE_NONE and the quotechar if doublequote is False. On reading, the escapechar removes
any special meaning from the following character. It defaults to None, which disables escaping.
quotechar: str
A one-character string used to quote columns containing special characters, such as the delimiter
or quotechar, or which contain new-line characters.
It defaults to '"'.
quoting:
Controls when quotes should be generated by the writer and recognised by the reader.
Can be either of the constants defined in this module.
* QUOTE_NONE
* QUOTE_MINIMAL
Defaults to QUOTE_MINIMAL.
For more details see https://docs.python.org/3/library/csv.html#csv.QUOTE_ALL
skipinitialspace: boolean
When True, whitespace immediately following the delimiter is ignored. The default is False.
strict: boolean
When True, raise exception Error on bad CSV input. The default is False.
header_row: int
The row to parse for headers
start_row: int
The first row to parse for data
log_first_row : boolean
Should we log progress on the first row read. *Only applies if used as a source.*
(inherited from ETLComponent)
max_rows : int, optional
The maximum number of rows to read. *Only applies if used as a source.*
(inherited from ETLComponent)
primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=None.
(inherited from ETLComponent)
progress_frequency: int
How often (in seconds) to output progress messages. None for no progress messages.
(inherited from ETLComponent)
progress_message: str
The progress message to print. Default is ``"{logical_name} row # {row_number}"``.
Note the ``logical_name`` and ``row_number`` substitutions.
(inherited from ETLComponent)
restkey: str
Column name to catch long rows (extra values).
restval: str
The value to put in columns that are in the column_names but
not present in a given row (missing values).
large_field_support: boolean
Enable support for csv columns bigger than the default limit of 131,072 characters.
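Example:
    A minimal usage sketch; ``example_task``, the file name, the column names,
    and ``source_rows`` are hypothetical::

        target = CSVWriter(
            task=example_task,
            file_data='customers.csv',
            column_names=['id', 'name'],
            delimiter='|',
        )
        for row in source_rows:
            target.insert(row)
        target.close()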
"""
def __init__(self,
task: Optional[ETLTask],
file_data: Union[TextIO, str, Path],
column_names: List[str],
include_header: Optional[bool] = True,
append: bool = False,
encoding: Optional[str] = None,
errors: str = 'strict',
logical_name: Optional[str] = None,
**kwargs
):
self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self.__close_file = False
self.include_header = include_header
self._header_written = False
self.column_names = column_names
self._save_append = append
self._save_encoding = encoding
self._save_errors = errors
if isinstance(file_data, Path):
file_data = str(file_data)
# We have to check / open the file here to get the name for the logical name
if isinstance(file_data, str):
self.log.info(f"Opening file {file_data} for writing")
if append:
file_mode = 'at'
if os.path.exists(file_data):
self._header_written = True
else:
file_mode = 'wt'
self.file = open(
file_data,
mode=file_mode,
newline='',
encoding=encoding,
errors=errors
)
self.__close_file = True
else:
if not isinstance(file_data, io.IOBase):
self.log.info(f"Treating input as file object {file_data}")
self.file = file_data
if logical_name is None:
try:
logical_name = os.path.basename(self.file.name)
except AttributeError:
logical_name = str(self.file)
# Don't pass kwargs up. They should be set here at the end
super().__init__(
task=task,
logical_name=logical_name,
)
self.__writer = None
# Begin csv module params
self.dialect = "excel"
self.delimiter = ','
self.doublequote = True
self.escapechar = None
self.quotechar = '"'
self.quoting = csv.QUOTE_MINIMAL
self.skipinitialspace = False
self.strict = False
self.large_field_support = False
self.lineterminator = '\n'
self.null = None
# End csv module params
self.fix_blank_strings = True
self.trace_data = False
# Should be the last call of every init
self.set_kwattrs(**kwargs)
def __reduce_ex__(self, protocol):
return (
# A callable object that will be called to create the initial version of the object.
self.__class__,
# A tuple of arguments for the callable object. An empty tuple must be given if the callable does not accept any argument
(
self.task,
self.file.name,
self.column_names,
self.include_header,
self._save_append,
self._save_encoding,
self._save_errors,
self.logical_name,
),
# Optionally, the object's state, which will be passed to the object's __setstate__() method as previously described.
# If the object has no such method, then the value must be a dictionary and it will be added to the object's __dict__ attribute.
None,
# Optionally, an iterator (and not a sequence) yielding successive items.
# These items will be appended to the object either using obj.append(item) or, in batch, using obj.extend(list_of_items).
# Optionally, an iterator (not a sequence) yielding successive key-value pairs.
# These items will be stored to the object using obj[key] = value
# PROTOCOL 5+ only
# Optionally, a callable with a (obj, state) signature.
# This callable allows the user to programmatically control the state-updating behavior of a specific object,
# instead of using obj’s static __setstate__() method.
# If not None, this callable will have priority over obj’s __setstate__().
)
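# Example (sketch): pickling a writer (e.g. for multiprocessing) re-runs
# __init__ with the saved arguments above, reopening the output file:
#     import pickle
#     clone = pickle.loads(pickle.dumps(writer_instance))  # writer_instance is hypothetical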
def __repr__(self):
return (
f"{self.__class__.__name__}(task={self.task},"
f"logical_name={self.logical_name}, "
f"filedata={self.file}, "
f"primary_key={self.primary_key}, "
f"column_names={self.column_names})"
)
def get_writer(self) -> csv.DictWriter:
"""
Build or get the csv.DictWriter object.
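Set any csv formatting attributes before the first call, since the
underlying :class:`csv.DictWriter` is built once and cached.
Example (sketch; ``target`` and the delimiter value are illustrative)::

    target.delimiter = '|'        # must be set before the first write
    writer = target.get_writer()  # built once, then reused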
"""
if self.__writer is None:
# Initialize the writer
self.__writer = csv.DictWriter(
self.file,
fieldnames=self.column_names,
dialect=self.dialect,
delimiter=self.delimiter,
doublequote=self.doublequote,
escapechar=self.escapechar,
quotechar=self.quotechar,
quoting=self.quoting,
skipinitialspace=self.skipinitialspace,
strict=self.strict,
lineterminator=self.lineterminator,
)
if self.large_field_support:
csv.field_size_limit(2147483647) # largest value it will accept
return self.__writer
def close(self, error: bool = False):
"""
Close the file
"""
if not self.is_closed:
if self.__close_file:
self.file.close()
super().close(error=error)
def insert_row(
self,
source_row: Row, # Must be a single row
additional_insert_values: dict = None,
stat_name: str = 'insert',
parent_stats: Statistics = None,
) -> Row:
"""
Inserts a row into the target file.
Parameters
----------
source_row:
The row with values to insert
additional_insert_values:
Values to add / override in the row before inserting.
stat_name:
Name of this step for the ETLTask statistics.
parent_stats:
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
Returns
-------
new_row
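Examples
--------
A sketch; ``target`` and ``row`` are hypothetical::

    new_row = target.insert_row(
        row,
        additional_insert_values={'batch_id': 42},
    )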
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.timer.start()
if not self._header_written:
self.write_header()
assert len(source_row) > 0, "Empty row passed into csv insert"
new_row = source_row.subset(keep_only=self.column_names)
assert len(new_row) > 0, "Empty row generated using columns passed into csv insert"
if additional_insert_values:
for colName, value in additional_insert_values.items():
new_row[colName] = value
if self.trace_data:
self.log.debug(f"{self} Raw row being inserted:\n{new_row.str_formatted()}")
# Replace None values with the configured null representation, and set blank
# strings to a single space so that they differ from None values
for colName, value in new_row.items():
if value is None:
new_row[colName] = self.null
else:
if self.fix_blank_strings:
if value == '':
new_row[colName] = ' '
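# e.g. (illustrative) a row {'id': 1, 'name': None, 'note': ''}
# is written as {'id': 1, 'name': self.null, 'note': ' '}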
stats.add_to_stat('inserts', 1)
self.get_writer().writerow(new_row)
stats.timer.stop()
return new_row
def insert(self,
source: Union[Row, list],
additional_insert_values: dict = None,
parent_stats: Statistics = None,
**kwargs
):
"""
Insert a row or list of rows into the target file.
Parameters
----------
source: :class:`Row` or list thereof
Row(s) to insert
additional_insert_values: dict
Additional values to set on each row.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
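Examples
--------
A sketch; ``target`` and ``rows`` (a list of Row objects) are hypothetical::

    target.insert(rows, additional_insert_values={'source_system': 'demo'})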
"""
if isinstance(source, list):
for row in source:
self.insert_row(
row,
additional_insert_values=additional_insert_values,
parent_stats=parent_stats,
**kwargs
)
return None
else:
return self.insert_row(
source,
additional_insert_values=additional_insert_values,
parent_stats=parent_stats,
**kwargs
)