Source code for bi_etl.components.w3c_reader

"""
Created on Sep 17, 2014

"""
import csv
import logging
import os
import typing

from sqlalchemy import Column

from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.components.row.row import Row
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics

__all__ = ['W3CReader']

# Only quote none is really needed here, QUOTE_MINIMAL is the default.
# The other quoting levels are only relevant to the Writer
QUOTE_NONE = csv.QUOTE_NONE
QUOTE_MINIMAL = csv.QUOTE_MINIMAL


[docs] class W3CReader(ETLComponent): """ W3CReader reads W3c based log files Args: task: The instance to register in (if not None) filedata: The file to parse as delimited. If str then it's assumed to be a filename. Otherwise it's assumed to be a file object. encoding: The encoding to use when opening the file, if it was a filename and not already opened. Default is None which becomes the Python default encoding errors: The error handling to use when opening the file (if it was a filename and not already opened) Default is 'strict' See above for valid errors values. logical_name: The logical name of this source. Used for log messages. Attributes: column_names: list The names to use for columns """
[docs] def __init__(self, task: ETLTask, filedata: typing.Union[typing.TextIO, str], encoding: str = None, errors: str = 'strict', logical_name: str = None, **kwargs ): self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}") self.__close_file = False # We have to check / open the file here to get the name for the logical name if isinstance(filedata, str): self.log.info(f"Opening file {filedata}") self.file = open(filedata, mode='rt', newline='', encoding=encoding, errors=errors ) self.__close_file = True else: self.log.info(f"Treating input as file object {filedata}") self.file = filedata if logical_name is None: try: logical_name = os.path.basename(self.file.name) except AttributeError: logical_name = str(self.file) self.read_all = True self._file_read = False self._line_num = 0 self._pending_lines = list() # Don't pass kwargs up. They should be set here at the end super().__init__( task=task, logical_name=logical_name, ) # Should be the last call of every init self.set_kwattrs(**kwargs)
def __repr__(self): return ( f"{self.__class__.__name__}(task={self.task},logical_name={self.logical_name}," f"filedata={self.file},primary_key={self.primary_key},column_names={self.column_names})" ) @property def line_num(self): """ The current line number in the source file. line_num differs from rows_read in that rows_read deals with rows that would be returned to the caller """ return self._line_num def _read_line(self) -> typing.Union[str, None]: if len(self._pending_lines) == 0: if self.read_all: self._pending_lines = self.file.readlines() if self._pending_lines: line = self._pending_lines.pop(0) self._line_num += 1 return line.strip() else: return None else: line = self.file.readline() if line: self._line_num += 1 return line.strip() else: return self._pending_lines.pop(0).strip() def _read_line_no_comments(self) -> typing.Union[str, None]: line = self._read_line() while line is not None and line[0] == '#': line = self._read_line() return line def _obtain_column_names(self): """ Get the column names from the file. ETLComponent only call this if self._column_names is None: """ header_found = False while not header_found: line = self._read_line() if line is None: raise ValueError("EOF found before column names") if line.startswith('#Fields:'): self.column_names = line.split(' ')[1:] header_found = True elif not line.startswith('#'): raise ValueError("Data lines started before column names") if self.trace_data: self.log.debug(f"Column names read: {self.column_names}")
[docs] def where( self, criteria_list: typing.Optional[list] = None, criteria_dict: typing.Optional[dict] = None, order_by: typing.Optional[list] = None, column_list: typing.List[typing.Union[Column, str]] = None, exclude_cols: typing.List[typing.Union[Column, str]] = None, use_cache_as_source: typing.Optional[bool] = None, progress_frequency: typing.Optional[int] = None, stats_id: typing.Optional[str] = None, parent_stats: typing.Optional[Statistics] = None, ) -> typing.Iterable[Row]: """ Parameters ---------- criteria_list: *Not Supported for this component.* criteria_dict: Dict keys should be column names, values are checked for equality with the column on each row. order_by: *Not Supported for this component.* column_list: List of columns names exclude_cols use_cache_as_source progress_frequency stats_id parent_stats Returns ------- rows """ return super().where( criteria_list=criteria_list, criteria_dict=criteria_dict, order_by=order_by, column_list=column_list, exclude_cols=exclude_cols, use_cache_as_source=use_cache_as_source, progress_frequency=progress_frequency, stats_id=stats_id, parent_stats=parent_stats, )
def _raw_rows(self): len_column_names = len(self.column_names) try: this_iteration_header = self.full_iteration_header # noinspection PyTypeChecker done = False while not done: line = self._read_line_no_comments() if line is None: done = True else: if line != '': # Convert empty strings to None to be consistent with DB reads row = line.split(' ') d = self.Row(data=row[:len_column_names], iteration_header=this_iteration_header) len_row = len(row) if len_column_names < len_row: raise ValueError( f'Row {self.line_num} has extra columns Row has {len_row} > Header has {len_column_names}: "{line}"') elif len_column_names > len_row: if self.warnings_issued < self.warnings_limit: self.warnings_issued += 1 self.log.warning( f'Row {self.line_num} is missing columns. Row has {len_row} < Header has {len_column_names}: "{line}"') for key in self.column_names[len_row:]: d[key] = None yield d finally: pass
[docs] def close(self, error: bool = False): """ Close the file """ if not self.is_closed: if self.__close_file: self.file.close() super().close(error=error)