
"""
Created on Oct 9, 2015

@author: Derek Wood
"""
import io
from collections import defaultdict
from decimal import Context, ROUND_HALF_EVEN
from operator import itemgetter
from sys import stdout
from typing import Optional, Union

from bi_etl.components.etlcomponent import ETLComponent
from bi_etl.conversions import str2decimal, str2date
from bi_etl.scheduler.task import ETLTask
from bi_etl.utility import get_integer_places


# noinspection PyBroadException
class DataAnalyzer(ETLComponent):
    """
    Class that analyzes the data rows passed to it.

    * Tracks distinct columns passed in
    * Tracks datatype of each column
    * Tracks valid values of each column

    Parameters
    ----------
    task: ETLTask
        The instance to register in (if not None)
    logical_name: str
        The logical name of this source. Used for log messages.
    """
    COLUMN_HEADERS_DICT = {
        'col': 'Column Name',
        'type': 'Data Type',
        'non_null_rows': 'Non-Null Rows',
        'cardinality': 'Cardinality',
        'msg': 'Message',
        'present': 'Rows with this column',
        'not_present_on_rows': 'Rows without this column',
        'most_common_value': 'Most Common Value',
    }
    COLUMN_HEADERS_FORMAT = "{col:45} {type:20} {non_null_rows:>15} {cardinality:>15} {msg}"
    COLUMN_HEADERS = COLUMN_HEADERS_FORMAT.format(**COLUMN_HEADERS_DICT)
    DEFAULT_FORMAT = "{col:45} {type:20} {non_null_rows:15,} {cardinality:15,} {msg}"
    EQUALS_FORMAT = "{col:45} type = {type:20} non_null_rows={non_null_rows:15,} cardinality={cardinality:15,} {msg}"
    PIPE_FORMAT = "{col}|{type}|{present}|{not_present_on_rows}|{non_null_rows}|{cardinality}|{most_common_value}|{msg}"
    PIPE_HEADERS = PIPE_FORMAT.format(**COLUMN_HEADERS_DICT)
    class DataType(object):
        def __init__(self, name, length=None, precision=None, fmt=None):
            self.name = name
            self.length = length
            self.precision = precision
            self.format = fmt

        def __repr__(self):
            return f"{self.name}({self.length},{self.precision},fmt={self.format})"

        def __str__(self):
            if self.length is None and self.format is None:
                return self.name
            if self.format is not None:
                return f"{self.name}({self.format})"
            elif self.precision is None:
                return f"{self.name}({self.length})"
            else:
                return f"{self.name}({self.length},{self.precision})"
    def __init__(self,
                 task: Optional[ETLTask] = None,
                 logical_name: str = 'DataAnalyzer',
                 **kwargs
                 ):
        # Don't pass kwargs up. They should be set here at the end.
        super(DataAnalyzer, self).__init__(task=task, logical_name=logical_name)
        self.float_as_decimal = False
        self.rows_processed = 0
        self.column_names = list()
        self.column_valid_values = dict()
        self.column_data_types = dict()
        self.column_data_types_counts = dict()
        self.column_names_consistent = True
        self.new_columns_after_first_row = False
        self.column_present_count = dict()
        self.column_not_null = dict()
        self.duplicate_column_names = dict()
        # Row level storage
        self.row_column_name_set = set()
        # Should be the last call of every init
        self.set_kwattrs(**kwargs)
    def _reset_storage(self):
        self.rows_processed = 0
        self.column_names = list()
        self.column_valid_values = dict()
        self.column_data_types = dict()
        self.column_data_types_counts = dict()
        self.column_names_consistent = True
        self.new_columns_after_first_row = False
        self.column_present_count = dict()
        self.column_not_null = dict()
        self.duplicate_column_names = dict()
        # Row level storage
        self.row_column_name_set = set()

    def __iter__(self):
        # This component is a data sink: it has no rows of its own to yield.
        return None
    def close(self, error: bool = False):
        if not self.is_closed:
            super(DataAnalyzer, self).close(error=error)
            self._reset_storage()
    def _type_from_value(self, value):
        if isinstance(value, str):
            # Look for numbers in text
            try:
                dec = str2decimal(value)
                # If the value has no fractional digits, return Integer.
                # Note: We could use _isinteger(), however that calls 1.0 an integer,
                #       whereas a file with 1.0 values indicates possible fractional values.
                #       Decimal('1.0').as_tuple().exponent returns -1
                #       or
                #       Decimal('1.0')._exp returns -1
                (_, digits, exponent) = dec.as_tuple()
                if exponent >= 0:
                    return DataAnalyzer.DataType(
                        name='Integer',
                        length=len(digits) + exponent,
                    )
                else:
                    return DataAnalyzer.DataType(
                        name='Decimal',
                        length=max(len(digits), abs(exponent)),
                        precision=abs(exponent),
                    )
            except Exception:
                pass
            # Look for a date in the text
            if '-' in value:
                if ':' in value:
                    for dt_format in [
                        "%Y-%m-%d %H:%M", "%m-%d-%Y %H:%M", "%d-%m-%Y %H:%M",
                        "%Y-%m-%d %H:%M:%S", "%m-%d-%Y %H:%M:%S", "%d-%m-%Y %H:%M:%S",
                        "%Y-%m-%d %H:%M:%S.%f", "%m-%d-%Y %H:%M:%S.%f", "%d-%m-%Y %H:%M:%S.%f",
                        "%Y-%m-%d %I:%M %p", "%m-%d-%Y %I:%M %p", "%d-%m-%Y %I:%M %p",
                        "%Y-%m-%d %I:%M:%S %p", "%m-%d-%Y %I:%M:%S %p", "%d-%m-%Y %I:%M:%S %p",
                    ]:
                        # noinspection PyBroadException
                        try:
                            _ = str2date(value, dt_format=dt_format)
                            dt_type = DataAnalyzer.DataType(name="Date")
                            dt_type.format = dt_format
                            dt_type.length = len(value)
                            return dt_type
                        except Exception:
                            pass
                else:
                    # No time in value
                    for dt_format in ["%Y-%m-%d", "%m-%d-%Y", "%d-%m-%Y"]:
                        try:
                            _ = str2date(value, dt_format=dt_format)
                            dt_type = DataAnalyzer.DataType(name="Date")
                            dt_type.format = dt_format
                            dt_type.length = len(value)
                            return dt_type
                        except Exception:
                            pass
            elif '/' in value:
                if ':' in value:
                    for dt_format in [
                        "%Y/%m/%d %H:%M", "%m/%d/%Y %H:%M", "%d/%m/%Y %H:%M",
                        "%Y/%m/%d %H:%M:%S", "%m/%d/%Y %H:%M:%S", "%d/%m/%Y %H:%M:%S",
                        "%Y/%m/%d %H:%M:%S.%f", "%m/%d/%Y %H:%M:%S.%f", "%d/%m/%Y %H:%M:%S.%f",
                        "%Y/%m/%d %I:%M %p", "%m/%d/%Y %I:%M %p", "%d/%m/%Y %I:%M %p",
                        "%Y/%m/%d %I:%M:%S %p", "%m/%d/%Y %I:%M:%S %p", "%d/%m/%Y %I:%M:%S %p",
                    ]:
                        try:
                            _ = str2date(value, dt_format=dt_format)
                            dt_type = DataAnalyzer.DataType(name="Date")
                            dt_type.format = dt_format
                            dt_type.length = len(value)
                            return dt_type
                        except Exception:
                            pass
                else:
                    # No time in value
                    for dt_format in ["%Y/%m/%d", "%m/%d/%Y", "%d/%m/%Y"]:
                        try:
                            _ = str2date(value, dt_format=dt_format)
                            dt_type = DataAnalyzer.DataType(name="Date")
                            dt_type.format = dt_format
                            dt_type.length = len(value)
                            return dt_type
                        except Exception:
                            pass
            # Else it's an actual string
            return DataAnalyzer.DataType(name=type(value).__name__, length=len(value))
        elif isinstance(value, int):
            return DataAnalyzer.DataType(name='Integer', length=get_integer_places(value))
        elif isinstance(value, float):
            if self.float_as_decimal:
                dec = Context(prec=16, rounding=ROUND_HALF_EVEN).create_decimal_from_float(value).normalize()
                (_, digits, exponent) = dec.as_tuple()
                if exponent >= 0:
                    return DataAnalyzer.DataType(
                        name='Integer',
                        length=len(digits) + exponent,
                    )
                else:
                    return DataAnalyzer.DataType(
                        name='Decimal',
                        length=max(len(digits), abs(exponent)),
                        precision=abs(exponent),
                    )
        return DataAnalyzer.DataType(name=type(value).__name__)
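    # For reference, the Integer/Decimal split above relies on standard-library
    # Decimal.as_tuple() behavior (a worked example of the existing logic, not new logic):
    #   Decimal('123').as_tuple()  -> digits=(1, 2, 3), exponent=0   => Integer, length 3
    #   Decimal('1.05').as_tuple() -> digits=(1, 0, 5), exponent=-2  => Decimal, length 3, precision 2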
    def next_row(self):
        self.row_column_name_set = set()
        self.rows_processed += 1
    @staticmethod
    def null_safe_max(a: Union[int, None], b: Union[int, None]) -> Union[int, None]:
        if a is None:
            return b
        elif b is None:
            return a
        else:
            return max(a, b)
    def analyze_column(self, column_name, column_value, column_number=None):
        self.column_present_count[column_name] = self.column_present_count.get(column_name, 0) + 1

        # Process column names
        if column_number is not None:
            if len(self.column_names) < column_number:
                if column_name not in self.column_names:
                    self.column_names.append(column_name)
                else:
                    self.column_names_consistent = False
            else:
                if self.column_names[column_number - 1] != column_name:
                    self.column_names_consistent = False
                    if column_name not in self.column_names:
                        self.column_names.append(column_name)
        else:
            if column_name not in self.column_names:
                self.column_names.append(column_name)
        if column_name not in self.row_column_name_set:
            self.row_column_name_set.add(column_name)
        else:
            # setdefault keeps the new set in the dict so the position is actually recorded
            self.duplicate_column_names.setdefault(column_name, set()).add(column_number)

        # Process column_valid_values
        if column_name not in self.column_valid_values:
            self.column_valid_values[column_name] = dict()
            if self.rows_processed > 0:
                self.new_columns_after_first_row = True
        value = column_value
        try:
            hash(column_value)
        except TypeError:
            # un-hashable type
            value = str(column_value)
        self.column_valid_values[column_name][value] = self.column_valid_values[column_name].get(value, 0) + 1

        # Process column_data_types
        if column_value is not None:
            self.column_not_null[column_name] = self.column_not_null.get(column_name, 0) + 1
            existing_type = self.column_data_types.get(column_name)
            row_type = self._type_from_value(column_value)
            if column_name not in self.column_data_types_counts:
                self.column_data_types_counts[column_name] = defaultdict(int)
            self.column_data_types_counts[column_name][row_type.name] += 1
            new_type = existing_type
            if existing_type is None:
                new_type = row_type
            else:
                if existing_type.name in ['str', 'unicode', 'bytes']:
                    new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                elif existing_type.name == 'Date':
                    if row_type.name == 'Date':
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                        if isinstance(existing_type.format, dict):
                            # Add one to the counter for this format
                            new_type.format[row_type.format] = new_type.format.get(row_type.format, 0) + 1
                        elif row_type.format != existing_type.format:
                            fmts = dict()
                            fmts[existing_type.format] = self.rows_processed - 1
                            fmts[row_type.format] = 1
                            new_type.format = fmts
                    elif row_type.name in ['str', 'unicode', 'bytes']:
                        new_type = row_type
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                elif existing_type.name == 'Integer':
                    if row_type.name == 'Integer':
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                    elif row_type.name == 'Decimal':
                        new_type = row_type
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                    else:
                        new_type = DataAnalyzer.DataType(name='str')
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
                else:
                    if row_type.name != existing_type.name:
                        new_type = DataAnalyzer.DataType(name='str')
                        new_type.length = self.null_safe_max(row_type.length, existing_type.length)
            self.column_data_types[column_name] = new_type
    def analyze_row(self, row):
        """
        Analyze the data row passed in. Call this for each row that should be analyzed.
        """
        stats = self.get_stats_entry(stats_id='analyze_row')
        stats.timer.start()
        stats['rows processed'] = self.rows_processed
        column_number = 0
        for column_name in row.columns_in_order:
            column_number += 1
            column_value = row[column_name]
            self.analyze_column(
                column_name=column_name,
                column_value=column_value,
                column_number=column_number,
            )
        self.next_row()
        stats.timer.stop()
    def print_analysis(self,
                       out: io.TextIOBase = None,
                       valid_value_limit: int = 10,
                       columns_header: str = None,
                       columns_out_fmt: str = None
                       ):
        """
        Print the data analysis results.

        Parameters
        ----------
        out:
            The file to write the results to. Default = ``stdout``
        valid_value_limit: int
            How many valid values should be printed per column.
        columns_header:
            The table header for the columns list.
        columns_out_fmt:
            The format to use for the column lines.
        """
        if out is None:
            out = stdout
        if columns_out_fmt is None:
            columns_out_fmt = self.DEFAULT_FORMAT
        if columns_header is None:
            columns_header = self.COLUMN_HEADERS
        print(f"\nRows processed = {self.rows_processed:,}", file=out)
        if not self.column_names_consistent:
            print("**** COLUMN NAMES NOT CONSISTENT IN ALL ROWS", file=out)
        if self.new_columns_after_first_row:
            print("**** NEW COLUMN NAME APPEARED AFTER FIRST ROW", file=out)
        print("Columns:", file=out)
        print(columns_header, file=out)
        column_dict = dict()
        col_cnt = 0
        for col in self.column_names:
            col_cnt += 1
            if col not in column_dict:
                column_dict[col] = list()
            column_dict[col].append(col_cnt)
            msg = ""
            not_present_on_rows = self.rows_processed - self.column_present_count.get(col, 0)
            if not_present_on_rows > 0:
                msg += f" [Not present on {not_present_on_rows:,} rows]"
            most_common_value = None
            vv = self.column_valid_values.get(col)
            if vv is not None:
                vv_list = sorted(list(vv.items()), key=itemgetter(1), reverse=True)
                if len(vv_list) >= 1:
                    most_common_value = vv_list[0]
            print(
                columns_out_fmt.format(
                    col=col,
                    type=str(self.column_data_types.get(col)),
                    present=self.column_present_count.get(col, 0),
                    not_present_on_rows=not_present_on_rows,
                    non_null_rows=self.column_not_null.get(col, 0),
                    cardinality=len(self.column_valid_values.get(col, list())),
                    most_common_value=most_common_value,
                    msg=msg,
                ),
                file=out
            )
        print("", file=out)
        if len(self.duplicate_column_names) > 0:
            print("Duplicate column names:", file=out)
            for col_name, col_positions in self.duplicate_column_names.items():
                print(f"Column {col_name} appears in positions {col_positions}", file=out)
            print("", file=out)
        print("Columns Valid Values:", file=out)
        col_cnt = 0
        for col in self.column_names:
            col_cnt += 1
            if col_cnt > 1:
                print("", file=out)
            vv = self.column_valid_values.get(col)
            if vv is not None:
                vv_list = sorted(list(vv.items()), key=itemgetter(1), reverse=True)
                print(
                    f"{col} (col {col_cnt}) Cardinality {len(vv):,} "
                    f"Apparent type {self.column_data_types.get(col)} "
                    f"Source Type {type(vv_list[0][0]).__name__}:",
                    file=out
                )
                for (v, freq) in vv_list[:valid_value_limit]:
                    try:
                        v_str = str(v)
                        if len(v_str) <= 60:
                            print(f"\t{v_str:60}\tFreq = {freq:,}", file=out)
                        else:
                            print(f"\t{v_str[:60]}\tFreq = {freq:,} (Value truncated; actual width = {len(v_str)} chars)", file=out)
                    except Exception as e:
                        print(e, file=out)
                        print(f"Freq = {freq:,}", file=out)
                if len(vv) > valid_value_limit:
                    print("\t--More values not printed--", file=out)
            else:
                print(f"{col} (col {col_cnt}) had no data values", file=out)
            print('', file=out)
            if col in self.column_data_types_counts:
                type_counts = self.column_data_types_counts[col]
                print("\tApparent Data Types:", file=out)
                for row_type, freq in sorted(list(type_counts.items()), key=itemgetter(1), reverse=True):
                    print(f'\t\t{freq:,} rows appear to be {row_type}', file=out)
    def get_analysis_str(self) -> str:
        analysis_block = io.StringIO()
        self.print_analysis(out=analysis_block)
        return analysis_block.getvalue()
    def log_analysis(self):
        self.log.info(self.get_analysis_str())
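

# Minimal usage sketch (illustrative only; the column names and values below are
# hypothetical). Assumes ETLComponent accepts task=None, as the class docstring
# suggests. analyze_row() expects a bi_etl Row (it reads row.columns_in_order),
# so this demo feeds plain dicts through analyze_column() and next_row() instead.
if __name__ == '__main__':
    analyzer = DataAnalyzer()
    sample_rows = [
        {'id': '1', 'amount': '10.50', 'created': '2015-10-09 14:30'},
        {'id': '2', 'amount': '7.25', 'created': '2015-10-10 09:15'},
        {'id': '3', 'amount': None, 'created': '2015-10-11 17:45'},
    ]
    for sample_row in sample_rows:
        for col_num, (name, value) in enumerate(sample_row.items(), start=1):
            analyzer.analyze_column(column_name=name, column_value=value, column_number=col_num)
        analyzer.next_row()
    analyzer.print_analysis()  # writes the report to stdout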