Source code for bi_etl.components.hst_table

"""
Created on Nov 4, 2014

@author: Derek Wood
"""
import traceback
import typing
import warnings
from datetime import datetime, timedelta
from enum import Enum
from typing import Union, Callable, List

from bi_etl.components.readonlytable import ReadOnlyTable
from bi_etl.components.row.column_difference import ColumnDifference
from bi_etl.components.row.row import Row
from bi_etl.components.row.row_status import RowStatus
from bi_etl.components.table import Table
from bi_etl.conversions import ensure_datetime
from bi_etl.database.database_metadata import DatabaseMetadata
from bi_etl.exceptions import AfterExisting, NoResultFound, BeforeAllExisting
from bi_etl.lookups.autodisk_range_lookup import AutoDiskRangeLookup
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
from bi_etl.timer import Timer

__all__ = ['HistoryTable']


# noinspection PyAbstractClass,SqlDialectInspection
class HistoryTable(Table):
    """
    ETL target component for a table that stores history of updates. Also usable as a source.

    Parameters
    ----------
    task : ETLTask
        The instance to register in (if not None)
    database : bi_etl.scheduler.task.Database
        The database to find the table/view in.
    table_name : str
        The name of the table/view.
    exclude_columns : frozenset
        Optional. A list of columns to exclude from the table/view. These columns will not be included in
        SELECT, INSERT, or UPDATE statements.

    Attributes
    ----------
    auto_generate_key: boolean
        Should the primary key be automatically generated by the insert/upsert process?
        If True, the process will get the current maximum value and then increment it with each insert.
        (inherited from Table)
    begin_date: str
        Name of the begin date field
    end_date: str
        Name of the end date field
    inserts_use_default_begin_date: boolean
        Should inserts use the default begin date instead of the class effective date?
        This allows records to match up in joins with other history tables where the effective date
        of the 'primary' might be before the first version effective date.
        Default = True
    default_begin_date: date
        Default begin date to assign for begin_date. Used for new records if
        inserts_use_default_begin_date is True.
        Also used for :meth:`get_missing_row`, :meth:`get_invalid_row`,
        :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Default = 1900-1-1
    default_end_date: date
        Default end date to assign for end_date on active rows.
        Also used for :meth:`get_missing_row`, :meth:`get_invalid_row`,
        :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Default = 9999-1-1
    batch_size: int
        How many rows should be inserted/updated/deleted in a single batch.
        (inherited from Table)
    delete_flag : str
        The name of the delete_flag column, if any. Optional.
        (inherited from ReadOnlyTable)
    delete_flag_yes : str
        The value of delete_flag for deleted rows. Optional.
        (inherited from ReadOnlyTable)
    delete_flag_no : str
        The value of delete_flag for *not* deleted rows. Optional.
        (inherited from ReadOnlyTable)
    default_date_format: str
        The date parsing format to use for str -> date conversions.
        If more than one date format exists in the source, then explicit conversions will be required.
        Default = '%m/%d/%Y'
        (inherited from Table)
    force_ascii: boolean
        Should text values be forced into the ascii character set before passing to the database?
        Default = False
        (inherited from Table)
    last_update_date: str
        Name of the column which we should update when table updates are made.
        Default = None
        (inherited from Table)
    log_first_row : boolean
        Should we log progress on the first row read. *Only applies if used as a source.*
        (inherited from ETLComponent)
    max_rows : int
        The maximum number of rows to read. *Only applies if Table is used as a source.*
        Optional.
        (inherited from ETLComponent)
    primary_key: list
        The name of the primary key column(s). Only impacts trace messages. Default=None.
        If not passed in, will use the database value, if any.
        (inherited from ETLComponent)
    natural_key: list
        The list of natural key columns (as Column objects).
        The default is the list of non-begin/end date primary key columns.
        The default is *not* appropriate for dimension tables with surrogate keys.
    progress_frequency : int
        How often (in seconds) to output progress messages. Optional.
        (inherited from ETLComponent)
    progress_message : str
        The progress message to print. Optional. Default is ``"{logical_name} row # {row_number}"``.
        Note ``logical_name`` and ``row_number`` substitutions applied via :func:`format`.
        (inherited from ETLComponent)
    special_values_descriptive_columns: list
        A list of columns that get longer descriptive text in :meth:`get_missing_row`,
        :meth:`get_invalid_row`, :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Optional.
        (inherited from ReadOnlyTable)
    track_source_rows: boolean
        Should the :meth:`upsert` method keep a set container of source row keys that it has processed?
        That set would then be used by :meth:`update_not_processed`,
        :meth:`logically_delete_not_processed`, and :meth:`delete_not_processed`.
        (inherited from Table)
    type_1_surrogate: str
        The name of the type 1 surrogate key. The value is automatically generated as equal to the
        type 2 key on inserts and equal to the existing value on updates. Optional.
    """
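    # Illustrative usage sketch as an ETL target. The task, database, table, and column
    # names below (sample_task, warehouse_db, d_customer, eff_start_dt, eff_end_dt,
    # customer_srgt_key, customer_t1_srgt_key, source_component) are hypothetical
    # placeholders, not names defined by this module.
    #
    #     with HistoryTable(sample_task, warehouse_db, table_name='d_customer') as target:
    #         target.begin_date_column = 'eff_start_dt'
    #         target.end_date_column = 'eff_end_dt'
    #         target.natural_key = ['customer_id']
    #         target.auto_generate_key = True
    #         target.type_1_surrogate = 'customer_t1_srgt_key'
    #         target.fill_cache()
    #         for source_row in source_component:
    #             target.upsert(source_row)
    #         target.commit()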
    class TimePrecision(Enum):
        microsecond = 'µs'
        millisecond = 'ms'
        second = 's'
        minute = 'm'
        hour = 'h'
        day = 'd'
    def __init__(
            self,
            task: typing.Optional[ETLTask],
            database: DatabaseMetadata,
            table_name: str,
            table_name_case_sensitive: bool = True,
            schema: str = None,
            exclude_columns: frozenset = None,
            default_effective_date: datetime = None,
            **kwargs
    ):
        # Don't pass kwargs up to super. They should be set at the end of this method
        super().__init__(
            task=task,
            database=database,
            table_name=table_name,
            table_name_case_sensitive=table_name_case_sensitive,
            schema=schema,
            exclude_columns=exclude_columns,
        )
        self.__default_effective_date = None
        if default_effective_date is not None:
            self.default_effective_date = default_effective_date
        self._end_date_timedelta = None  # Will be set in the assignment to begin_end_precision below
        self.begin_end_precision = HistoryTable.TimePrecision.second
        self.__begin_date_column = None
        self.__end_date_column = None
        self.__default_begin_date = None
        self.default_begin_date = datetime(year=1900, month=1, day=1, hour=0, minute=0, second=0)
        self.__default_end_date = None
        self.default_end_date = datetime(year=9999, month=1, day=1, hour=0, minute=0, second=0)
        self.inserts_use_default_begin_date = True
        self.default_lookup_class = AutoDiskRangeLookup
        # define_lookup will add the begin & end dates to the lookup args
        self.type_1_surrogate = None

        # Should be the last call of every init
        self.set_kwattrs(**kwargs)

    def __repr__(self):
        return f"HistoryTable({self.table.name})"
    @staticmethod
    def kwattrs_order() -> typing.Dict[str, int]:
        """
        Certain values need to be set before others in order to work correctly.
        This method should return a dict mapping each such keyword argument name to a value
        less than the default of 9999, which is used for any argument not explicitly listed here.
        """
        return {
            'begin_date_column': 1,
            'end_date_column': 1,
        }
    @property
    def default_effective_date(self):
        if self.__default_effective_date is None:
            self.__default_effective_date = self.get_current_time()
        return self.__default_effective_date

    @default_effective_date.setter
    def default_effective_date(self, value):
        self.__default_effective_date = ensure_datetime(value)

    @property
    def default_begin_date(self):
        return self.__default_begin_date

    @default_begin_date.setter
    def default_begin_date(self, value):
        self.__default_begin_date = ensure_datetime(value)

    @property
    def default_end_date(self):
        return self.__default_end_date

    @default_end_date.setter
    def default_end_date(self, value):
        self.__default_end_date = ensure_datetime(value)
    def is_date_key_column(self, column):
        if self.begin_date_column is None or self.end_date_column is None:
            raise AssertionError('begin_date or end_date not defined')
        return (
            self.get_column_name(column) == self.begin_date_column
            or self.get_column_name(column) == self.end_date_column
        )
    def build_nk(self):
        if len(list(self.primary_key)) == 1:
            #
            # TODO: For scd type2 dimension tables the primary_key will be the surrogate key.
            #       So this won't generate the actual natural key.
            #       Need to fix that. We could scan for other unique indexes / constraints.
            #
            self.log.error("Cannot determine natural key. Please set it explicitly using the natural_key attribute")
            return None
        else:
            #
            # If we have multiple keys, we'll assume this is a history table where
            # the key is the natural key plus an effective date
            #
            natural_key = list()
            # noinspection PyTypeChecker
            for col in self.primary_key:
                if not self.is_date_key_column(col):
                    natural_key.append(col)
            return natural_key
    @property
    def begin_end_precision(self) -> TimePrecision:
        return self.__begin_end_precision

    @begin_end_precision.setter
    def begin_end_precision(self, value: TimePrecision):
        if isinstance(value, HistoryTable.TimePrecision):
            self.__begin_end_precision = value
            if value == HistoryTable.TimePrecision.second:
                self._end_date_timedelta = timedelta(seconds=-1)
            elif value == HistoryTable.TimePrecision.millisecond:
                self._end_date_timedelta = timedelta(milliseconds=-1)
            elif value == HistoryTable.TimePrecision.microsecond:
                self._end_date_timedelta = timedelta(microseconds=-1)
            elif value == HistoryTable.TimePrecision.minute:
                self._end_date_timedelta = timedelta(minutes=-1)
            elif value == HistoryTable.TimePrecision.hour:
                self._end_date_timedelta = timedelta(hours=-1)
            elif value == HistoryTable.TimePrecision.day:
                self._end_date_timedelta = timedelta(days=-1)
            else:
                raise ValueError(f"{value} is a TimePrecision instance that is not handled")
        else:
            raise ValueError(f"{value} is not a TimePrecision instance")

    @property
    def begin_date_column(self) -> str:
        return self.__begin_date_column

    @begin_date_column.setter
    def begin_date_column(self, value: str):
        try:
            # Remove the old name from custom_special_values
            if self.__begin_date_column in self.custom_special_values:
                del self.custom_special_values[self.__begin_date_column]
        except AttributeError:
            # __begin_date doesn't exist yet
            pass
        self.__begin_date_column = value
        if self.__begin_date_column is not None:
            self.custom_special_values[self.__begin_date_column] = self.default_begin_date

    @property
    def end_date_column(self) -> str:
        return self.__end_date_column

    @end_date_column.setter
    def end_date_column(self, value: str):
        try:
            # Remove the old name from custom_special_values
            if self.__end_date_column in self.custom_special_values:
                del self.custom_special_values[self.__end_date_column]
        except AttributeError:
            # __end_date doesn't exist yet
            pass
        self.__end_date_column = value
        if self.__end_date_column is not None:
            self.custom_special_values[self.end_date_column] = self.default_end_date
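    # Illustrative sketch: how begin_end_precision drives the retired-row end date.
    # With second precision (the default) a retired version ends 1 second before the next
    # version begins; with day precision it ends 1 day before. Dates are hypothetical.
    #
    #     target.begin_end_precision = HistoryTable.TimePrecision.day
    #     # A new version effective 2024-05-10 would retire the prior version with an
    #     # end date of 2024-05-09 (effective_date + timedelta(days=-1)).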
    def define_lookup(
            self,
            lookup_name,
            lookup_keys,
            lookup_class=None,
            lookup_class_kwargs=None,
    ):
        """
        Define a new lookup.

        Parameters
        ----------
        lookup_name: str
            Name for the lookup. Used to refer to it later.
        lookup_keys: list
            list of lookup key columns
        lookup_class: Class
            Optional python class to use for the lookup. Defaults to value of default_lookup_class attribute.
        lookup_class_kwargs: dict
            Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.
            begin_date and end_date are added automatically.
        """
        if lookup_class_kwargs is None:
            lookup_class_kwargs = self.default_lookup_class_kwargs
        # Add default begin_date and end_date arguments to the lookup class
        if 'begin_date' not in lookup_class_kwargs:
            lookup_class_kwargs['begin_date'] = self.begin_date_column
        if 'end_date' not in lookup_class_kwargs:
            lookup_class_kwargs['end_date'] = self.end_date_column
        # Remove both the begin_date and end_date from the lookup columns
        # (they are used in the range part)
        non_date_lookup_columns = list()
        for c in lookup_keys:
            if not self.is_date_key_column(c):
                non_date_lookup_columns.append(c)
        assert len(non_date_lookup_columns) > 0, (
            f"define_lookup {lookup_name} lookup_columns had only begin / end dates or none at all. "
            f"lookup_columns= {lookup_keys}"
        )
        super().define_lookup(
            lookup_name=lookup_name,
            lookup_keys=non_date_lookup_columns,
            lookup_class=lookup_class,
            lookup_class_kwargs=lookup_class_kwargs,
        )
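    # Illustrative sketch: defining an alternate lookup. Only the non-date key columns are
    # passed; the begin/end date columns are added to the lookup's range arguments
    # automatically. 'by_customer_nbr' and 'customer_nbr' are hypothetical names.
    #
    #     target.define_lookup('by_customer_nbr', ['customer_nbr'])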
    def fill_cache(
            self,
            progress_frequency: float = 10,
            progress_message="{component} fill_cache current row # {row_number:,}",
            criteria_list: list = None,
            criteria_dict: dict = None,
            column_list: list = None,
            exclude_cols: frozenset = None,
            order_by: list = None,
            assume_lookup_complete: bool = None,
            allow_duplicates_in_src: bool = False,
            row_limit: int = None,
            parent_stats: Statistics = None,
    ):
        """
        Fill all lookup caches from the table.

        Parameters
        ----------
        column_list: list
            Optional. Specific columns to include when filling the cache.
        exclude_cols: frozenset
            Optional. Columns to exclude from the cached rows.
        progress_frequency : int
            How often (in seconds) to output progress messages. Default 10. None for no progress messages.
            Optional.
        progress_message : str
            The progress message to print.
            Default is ``"{component} fill_cache current row # {row_number:,}"``.
            Note ``logical_name`` and ``row_number`` substitutions applied via :func:`format`.
            Optional.
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        assume_lookup_complete: boolean
            Should later lookup calls assume the cache is complete
            (and thus raise an Exception if a key combination is not found)?
            Default to False if filtering criteria was used, otherwise defaults to True.
        allow_duplicates_in_src:
            Should we quietly let the source provide multiple rows with the same key values? Default = False
        row_limit: int
            limit on number of rows to cache.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        order_by: list
            Columns to order by when pulling data. Sometimes required to build the cache correctly.
        """
        if order_by == '-PK-':
            order_by = list(self.get_pk_lookup().lookup_keys)
            # Make sure begin date is not already in the order_by
            if self.begin_date_column in order_by:
                order_by.remove(self.begin_date_column)
            # Add begin date as the last part of the order by
            order_by.append(self.begin_date_column)
        super().fill_cache(
            progress_frequency=progress_frequency,
            progress_message=progress_message,
            criteria_list=criteria_list,
            criteria_dict=criteria_dict,
            column_list=column_list,
            exclude_cols=exclude_cols,
            order_by=order_by,
            assume_lookup_complete=assume_lookup_complete,
            allow_duplicates_in_src=allow_duplicates_in_src,
            row_limit=row_limit,
            parent_stats=parent_stats,
        )
    def get_by_lookup_and_effective_date(
            self,
            lookup_name,
            source_row,
            effective_date,
            stats_id='get_by_lookup_and_effective_date',
            parent_stats=None,
            fallback_to_db: bool = False,
    ):
        """
        Get by an alternate key. Returns a row.

        Parameters
        ----------
        lookup_name: str
            Name passed into :meth:`define_lookup`
        source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            :class:`~bi_etl.components.row.row_case_insensitive.Row` to get lookup keys from
        effective_date: date
            Effective date to use for lookup
        stats_id: str
            Statistics name to use
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        fallback_to_db:
            Should we check the DB if a record is not found in the cache

        Raises
        ------
        NoResultFound:
            If key doesn't exist.
        BeforeAllExisting:
            If the effective date provided is before all existing records.
        """
        if lookup_name is None:
            lookup = self.get_default_lookup(source_row.iteration_header)
        else:
            lookup = self.get_lookup(lookup_name)
        stats = self.get_stats_entry(stats_id, parent_stats=parent_stats)
        stats.timer.start()
        fallback_to_db = fallback_to_db or self.always_fallback_to_db or not self.cache_clean
        return lookup.find(
            row=source_row,
            fallback_to_db=fallback_to_db,
            maintain_cache=self.maintain_cache_during_load,
            effective_date=effective_date,
            stats=stats,
        )
    def get_by_lookup(
            self,
            lookup_name: str,
            source_row: Row,
            stats_id: str = 'get_by_lookup',
            parent_stats: typing.Optional[Statistics] = None,
            fallback_to_db: bool = False,
    ) -> Row:
        """
        Get by an alternate key. Returns a row.

        Parameters
        ----------
        lookup_name: str
            Name passed into :meth:`define_lookup`
        source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            Row to get lookup keys from (including effective date)
        stats_id: str
            Statistics name to use
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        fallback_to_db:
            Should we check the DB if a record is not found in the cache

        Raises
        ------
        NoResultFound:
            If key doesn't exist.
        BeforeAllExisting:
            If the effective date provided is before all existing records.
        """
        if self.begin_date_column in source_row:
            effective_date = source_row[self.begin_date_column]
        else:
            effective_date = self.default_effective_date
        return self.get_by_lookup_and_effective_date(
            lookup_name=lookup_name,
            source_row=source_row,
            effective_date=effective_date,
            stats_id=stats_id,
            parent_stats=parent_stats,
            fallback_to_db=fallback_to_db,
        )
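    # Illustrative sketch: retrieving the version of a row that was active on a given
    # date. The lookup name, row variable, and date are hypothetical.
    #
    #     try:
    #         version = target.get_by_lookup_and_effective_date(
    #             'by_customer_nbr',
    #             source_row,
    #             effective_date=datetime(2024, 5, 10),
    #         )
    #     except NoResultFound:
    #         pass  # no version exists for that key
    #     except BeforeAllExisting:
    #         pass  # the date is earlier than the first stored version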
    def sanity_check_source_mapping(
            self,
            source_definition,
            source_name=None,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            ignore_source_not_in_target=None,
            ignore_target_not_in_source=None,
            raise_on_source_not_in_target=None,
            raise_on_target_not_in_source=None,
    ):
        if target_excludes is None:
            target_excludes = set()
        else:
            target_excludes = set(target_excludes)
        # Don't treat the begin or end dates like normal columns
        if self.begin_date_column not in target_excludes:
            target_excludes.add(self.begin_date_column)
        if self.end_date_column not in target_excludes:
            target_excludes.add(self.end_date_column)
        if self.type_1_surrogate and self.type_1_surrogate not in target_excludes:
            target_excludes.add(self.type_1_surrogate)
        target_excludes = frozenset(target_excludes)
        super().sanity_check_source_mapping(
            source_definition=source_definition,
            source_name=source_name,
            source_excludes=source_excludes,
            target_excludes=target_excludes,
            ignore_source_not_in_target=ignore_source_not_in_target,
            ignore_target_not_in_source=ignore_target_not_in_source,
            raise_on_source_not_in_target=raise_on_source_not_in_target,
            raise_on_target_not_in_source=raise_on_target_not_in_source,
        )
    def insert_row(
            self,
            source_row: Row,
            additional_insert_values: dict = None,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            stat_name: str = 'insert',
            parent_stats: Statistics = None,
            **kwargs
    ) -> Row:
        """
        Inserts a row into the database (batching rows as batch_size)

        Parameters
        ----------
        source_row
            The row with values to insert
        additional_insert_values
        source_excludes:
            set of source columns to exclude
        target_excludes
            set of target columns to exclude
        stat_name
        parent_stats
        **kwargs

        Returns
        -------
        new_row
        """
        stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        stats.timer.start()
        if self.begin_date_column is None:
            raise ValueError("begin_date column name not set")
        if self.end_date_column is None:
            raise ValueError("end_date column name not set")
        if source_row.get(self.begin_date_column, None) is None:
            source_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
        if source_row.get(self.end_date_column, None) is None:
            source_row.set_keeping_parent(self.end_date_column, self.default_end_date)
        new_row = self.build_row(
            source_row=source_row,
            source_excludes=source_excludes,
            target_excludes=target_excludes,
            parent_stats=stats,
        )
        if new_row[self.begin_date_column] > new_row[self.end_date_column]:
            raise ValueError(
                f"Begin date {new_row[self.begin_date_column]} "
                f"greater than end date {new_row[self.end_date_column]} for row {new_row.str_formatted()}"
            )
        # Generate a new type 1 key value
        if (self.type_1_surrogate is not None
                and self.auto_generate_key
                and new_row.get(self.type_1_surrogate, None) is None):
            if new_row[self.begin_date_column] == self.default_begin_date:
                self.autogenerate_sequence(new_row, seq_column=self.type_1_surrogate, force_override=False)
            else:
                msg = 'Insert cannot maintain type1 surrogate keys for anything except new values. Please use upsert.'
                warnings.warn(msg)
        new_row_from_super = super().insert_row(
            new_row,
            additional_insert_values=additional_insert_values,
            source_excludes=source_excludes,
            target_excludes=target_excludes,
            parent_stats=parent_stats,
        )
        return new_row_from_super
    def apply_updates(
            self,
            row,
            changes_list: typing.MutableSequence[ColumnDifference] = None,
            additional_update_values: Union[dict, Row] = None,
            add_to_cache: bool = True,
            allow_insert=True,
            stat_name: str = 'update',
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        This method should only be called with a row that has already been transformed into the correct
        datatypes and column names.

        The update values can be in any of the following parameters

        - row (also used for PK)
        - changes_list
        - additional_update_values

        Parameters
        ----------
        row:
            The row to update with (needs to have at least PK values)
        changes_list:
            A list of ColumnDifference objects to apply to the row
        additional_update_values:
            A Row or dict of additional values to apply to the row
        add_to_cache:
            Should this method update the cache (not if caller will)
        allow_insert: boolean
            Allow this method to insert a new row into the cache
        stat_name:
            Name of this step for the ETLTask statistics. Default = 'update'
        parent_stats:
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        end_date_coerce = self.get_coerce_method(self.end_date_column)
        begin_date_coerce = self.get_coerce_method(self.begin_date_column)
        effective_date = begin_date_coerce(kwargs.get('effective_date', self.default_effective_date))
        row.set_keeping_parent(self.begin_date_column, begin_date_coerce(row[self.begin_date_column]))
        row.set_keeping_parent(self.end_date_column, end_date_coerce(row[self.end_date_column]))
        if row[self.begin_date_column] > row[self.end_date_column]:
            raise ValueError(
                f"Begin date {row[self.begin_date_column]} "
                f"greater than end date {row[self.end_date_column]} for row {row.str_formatted()}"
            )
        # Check for special case that the target already has that exact begin date
        if row[self.begin_date_column] == effective_date:
            # In which case, we simply update it in place
            if self.trace_data:
                self.log.debug(
                    f"Begin date exact match. Type 1 update row with begin effective {row[self.begin_date_column]} "
                    f"with new end date of {row[self.end_date_column]}"
                )
            # Apply updates to the row (use parent class routine to finish the work)
            super(HistoryTable, self).apply_updates(
                row,
                changes_list=changes_list,
                additional_update_values=additional_update_values,
                stat_name=stat_name,
                parent_stats=parent_stats,
                add_to_cache=add_to_cache,
                allow_insert=allow_insert,
            )
        else:
            # Create a clone of the existing row
            new_row = row.clone()
            new_row.status = RowStatus.insert
            # Force the new row to get a new surrogate key (if enabled)
            self.autogenerate_key(new_row, force_override=True)
            # Apply updates to the new row (use parent class routine to finish the work)
            # It won't send update since new_row.status = RowStatus.insert
            # The row actually gets inserted at the end of this method
            super().apply_updates(
                new_row,
                changes_list=changes_list,
                additional_update_values=additional_update_values,
                add_to_cache=False,
                parent_stats=parent_stats,
                allow_insert=allow_insert,
            )
            new_row.set_keeping_parent(self.begin_date_column, effective_date)
            # Check that we aren't inserting after all existing rows
            if new_row[self.end_date_column] < effective_date:
                self.warnings_issued += 1
                if self.warnings_issued < self.warnings_limit:
                    self.log.warning(
                        f"The table had an existing key sequence that did not cover all dates.\n"
                        f" End = {new_row[self.end_date_column]} is less than eff date {effective_date}\n"
                        f" Natural Keys = {self.get_nk_lookup().get_list_of_lookup_column_values(row)}."
                    )
                    for row in self.get_nk_lookup().get_versions_collection(row).values():
                        self.log.info(f"  begin= {row[self.begin_date_column]}\tend= {row[self.end_date_column]}")
                new_row.set_keeping_parent(self.end_date_column, self.default_end_date)
            # Retire the existing row (after clone so that the existing end_date is passed on to the new row)
            # Note: The parent class apply_updates will check to only send update statement if the row is in the
            #       database, otherwise it will update the record that's pending insert
            super().apply_updates(
                row,
                stat_name=stat_name,
                parent_stats=parent_stats,
                changes_list=[ColumnDifference(
                    self.end_date_column,
                    old_value=row[self.end_date_column],
                    new_value=end_date_coerce(effective_date + self._end_date_timedelta)
                )],
                add_to_cache=add_to_cache,
                allow_insert=allow_insert,
            )
            if self.trace_data:
                self.log.debug(
                    f"retiring row with begin effective {row[self.begin_date_column]} "
                    f"with new end date of {row[self.end_date_column]}"
                )
            # Add the new row
            self.insert_row(new_row, stat_name=stat_name, parent_stats=parent_stats)
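    # Illustrative sketch of the versioning behavior above, using hypothetical dates.
    # Given an existing version (begin=2024-01-01, end=9999-01-01) and an update with
    # effective_date=2024-05-10 at second precision, apply_updates produces:
    #
    #     existing row:  begin=2024-01-01  end=2024-05-09 23:59:59   (retired)
    #     new row:       begin=2024-05-10  end=9999-01-01            (inserted)
    #
    # If the effective date exactly matches the existing begin date, the row is updated
    # in place instead (a type 1 style change for that version).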
    def _target_excludes_for_updates(self, target_excludes: frozenset):
        if target_excludes is None:
            target_excludes = set()
        # Don't map a source end date through to the target
        if self.end_date_column not in target_excludes:
            new_set = set(target_excludes)
            new_set.add(self.end_date_column)
            target_excludes = frozenset(new_set)
        return target_excludes
    def build_row(
            self,
            source_row: Row,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            stat_name: str = 'build rows',
            parent_stats: Statistics = None,
    ) -> Row:
        source_mapped_as_target_row = super().build_row(
            source_row=source_row,
            source_excludes=source_excludes,
            target_excludes=target_excludes,
            stat_name=stat_name,
            parent_stats=parent_stats
        )
        if self.delete_flag is not None and (
                self.delete_flag not in source_mapped_as_target_row
                or source_mapped_as_target_row[self.delete_flag] is None):
            source_mapped_as_target_row.set_keeping_parent(self.delete_flag, self.delete_flag_no)
        return source_mapped_as_target_row
    def upsert(
            self,
            source_row: Union[Row, List[Row]],
            lookup_name: str = None,
            skip_update_check_on: typing.Optional[frozenset] = None,
            do_not_update: list = None,
            additional_update_values: dict = None,
            additional_insert_values: dict = None,
            update_callback: Callable[[typing.MutableSequence, Row], None] = None,
            insert_callback: Callable[[Row], None] = None,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            stat_name: str = 'upsert',
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        Update (if changed) or Insert a row in the table.
        Returns the row found/inserted, with the auto-generated key (if that feature is enabled)

        Parameters
        ----------
        source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            Row to upsert
        lookup_name: str
            The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
        skip_update_check_on: list
            List of column names to not compare old vs new for updates.
        do_not_update: list
            List of columns to never update.
        additional_update_values: dict
            Additional updates to apply when updating
        additional_insert_values: dict
            Additional values to set on each row when inserting.
        update_callback: func
            Function to pass updated rows to. Function should not modify row.
        insert_callback: func
            Function to pass inserted rows to. Function should not modify row.
        source_excludes: frozenset
            list of source columns to exclude when mapping to this Table.
        target_excludes: frozenset
            list of Table columns to exclude when mapping from the source row
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'upsert'
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        stats.ensure_exists('upsert source row count')
        stats.ensure_exists('apply_updates called')
        stats.ensure_exists('insert new called')
        stats.timer.start()
        stats['upsert source row count'] += 1
        self.begin()
        self._set_upsert_mode()

        effective_date = kwargs.get('effective_date')
        begin_date_coerce = self.get_coerce_method(self.begin_date_column)
        if effective_date is None:
            if self.begin_date_column in source_row and source_row[self.begin_date_column] is not None:
                effective_date = begin_date_coerce(source_row[self.begin_date_column])
            else:
                effective_date = begin_date_coerce(self.default_effective_date)
        else:
            effective_date = begin_date_coerce(effective_date)

        skip_update_check_on = self._target_excludes_for_updates(skip_update_check_on)

        source_mapped_as_target_row = self.build_row(
            source_row=source_row,
            source_excludes=source_excludes,
            target_excludes=target_excludes,
            parent_stats=stats,
        )
        if self.delete_flag is not None and (
                self.delete_flag not in source_mapped_as_target_row
                or source_mapped_as_target_row[self.delete_flag] is None):
            source_mapped_as_target_row.set_keeping_parent(self.delete_flag, self.delete_flag_no)

        if self.track_source_rows:
            # Keep track of source records so we can check if target rows don't exist in source
            self.source_keys_processed.add(self.get_natural_key_tuple(source_mapped_as_target_row))

        # Don't check in begin or end dates for differences
        if skip_update_check_on is None:
            skip_update_check_on = frozenset()
        if do_not_update is None:
            do_not_update = frozenset()
        if self.begin_date_column not in skip_update_check_on:
            skip_update_check_on = skip_update_check_on | {self.begin_date_column}
        if self.end_date_column not in skip_update_check_on:
            skip_update_check_on = skip_update_check_on | {self.end_date_column}
        if self.last_update_date not in skip_update_check_on:
            skip_update_check_on = skip_update_check_on | {self.last_update_date}

        type_1_surrogate = self.type_1_surrogate

        lookup_keys = None
        # Check for existing row
        try:
            # We'll default to using the primary key or NK as provided in the row
            if lookup_name is None:
                lookup_object = self.get_default_lookup(source_row.iteration_header)
                lookup_name = lookup_object.lookup_name
            else:
                lookup_object = self.get_lookup(lookup_name)
            if not lookup_object.cache_enabled and self.maintain_cache_during_load and self.batch_size > 1:
                raise AssertionError("Caching needs to be turned on if batch mode is on!")

            if self.trace_data:
                # lookup_keys is used only for data trace
                lookup_keys = lookup_object.get_list_of_lookup_column_values(source_mapped_as_target_row)
                lookup_keys.append(effective_date)

            existing_row = self.get_by_lookup_and_effective_date(
                lookup_name,
                source_row=source_mapped_as_target_row,
                effective_date=effective_date,
                parent_stats=stats
            )
            changes_list = existing_row.compare_to(
                source_mapped_as_target_row,
                exclude=do_not_update | skip_update_check_on,
                coerce_types=False,
            )
            delayed_changes = existing_row.compare_to(source_mapped_as_target_row, compare_only=skip_update_check_on)

            if self.track_update_columns:
                col_stats = self.get_stats_entry('updated columns', parent_stats=stats)
                for chg in changes_list:
                    col_stats.add_to_stat(key=chg.column_name, increment=1)
                    if self.trace_data:
                        self.log.debug(
                            f"{lookup_keys} {chg.column_name} changed from {chg.old_value} to {chg.new_value}"
                        )
                if len(changes_list) > 0:
                    for chg in delayed_changes:
                        col_stats.add_to_stat(key=chg.column_name, increment=1)
                        if self.trace_data:
                            self.log.debug(
                                f"{lookup_keys} NO UPDATE TRIGGERED BY {chg.column_name} "
                                f"changed from {chg.old_value} to {chg.new_value}"
                            )

            if len(changes_list) > 0:
                stats['apply_updates called'] += 1
                self.apply_updates(
                    existing_row,
                    effective_date=effective_date,
                    changes_list=list(changes_list) + list(delayed_changes),
                    additional_update_values=additional_update_values,
                    parent_stats=stats
                )
                # For testing commit each update
                # self.commit(parent_stats=stats)
                if update_callback:
                    update_callback(changes_list, existing_row)
            target_row = existing_row
        except BeforeAllExisting as e:
            # Logic to insert a new row before the existing row in e.first_existing_row
            new_row = source_mapped_as_target_row
            if additional_insert_values:
                for colName, value in additional_insert_values.items():
                    new_row[colName] = value
            if self.inserts_use_default_begin_date:
                new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
            else:
                new_row.set_keeping_parent(self.begin_date_column, effective_date)
            # Set end date to 1 second before existing records begin date
            first_begin = e.first_existing_row[self.begin_date_column]
            first_minus_1 = first_begin + self._end_date_timedelta
            new_row.set_keeping_parent(self.end_date_column, first_minus_1)
            # Get type 1 key value from existing row
            if type_1_surrogate is not None:
                new_row.set_keeping_parent(type_1_surrogate, e.first_existing_row[type_1_surrogate])
            # Generate a new type 2 surrogate key value
            self.autogenerate_key(new_row, force_override=True)
            if self.last_update_date is not None:
                self.set_last_update_date(new_row)
            if self.trace_data:
                self.log.debug(f"Inserting record effective {effective_date} before existing history")
            stats['insert before existing called'] += 1
            self.insert_row(new_row, parent_stats=stats)
            if insert_callback:
                insert_callback(new_row)
            target_row = new_row
        except NoResultFound:
            new_row = source_mapped_as_target_row
            if additional_insert_values:
                for colName, value in additional_insert_values.items():
                    new_row[colName] = value
            if self.inserts_use_default_begin_date:
                new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
            else:
                new_row.set_keeping_parent(self.begin_date_column, effective_date)
            new_row.set_keeping_parent(self.end_date_column, self.default_end_date)
            # Generate a new type 1 key value
            if type_1_surrogate is not None and self.auto_generate_key:
                self.autogenerate_sequence(new_row, seq_column=type_1_surrogate, force_override=True)
            # Generate a new type 2 surrogate key value
            self.autogenerate_key(new_row, force_override=True)
            if self.last_update_date is not None:
                self.set_last_update_date(new_row)
            if self.trace_data:
                self.log.debug(f"{lookup_keys} not found, inserting as new")
            stats['insert new called'] += 1
            self.insert_row(new_row, parent_stats=stats)
            if insert_callback:
                insert_callback(new_row)
            # debugging commit
            # self.commit(parent_stats=stats)
            target_row = new_row
        stats.timer.stop()
        return target_row
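    # Illustrative sketch: driving upsert with an explicit effective date taken from a
    # hypothetical source column 'last_changed_dt' of a hypothetical source_component.
    #
    #     for source_row in source_component:
    #         target.upsert(
    #             source_row,
    #             effective_date=source_row['last_changed_dt'],
    #         )
    #     target.commit()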
    def delete(self, **kwargs):
        """
        Not implemented for history table. Instead use `physically_delete_version`.
        """
        raise NotImplementedError("Use physically_delete_version instead")
    def physically_delete_version(
            self,
            row_to_be_deleted: Row,
            remove_from_cache=True,
            prior_row: Row = ...,
            stat_name='delete_version',
            parent_stats=None
    ):
        """
        **Physically** delete a given version row. Corrects the preceding end date value.

        Parameters
        ----------
        row_to_be_deleted: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            Expected to be an entire existing row.
        remove_from_cache: boolean
            Optional. Remove the row from the cache? Default = True
        prior_row: bi_etl.components.row.row_case_insensitive.Row
            Optional. The prior row, if already available.
            If None, it will be obtained via :meth:`get_by_lookup_and_effective_date`
        stat_name: str
            Name of this step for the ETLTask statistics. Default = 'delete_version'
        parent_stats: bi_etl.statistics.Statistics
            Optional. Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        """
        stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        stats.timer.start()
        stats['rows sent for delete'] += 1
        self.begin()
        if row_to_be_deleted.status == RowStatus.unknown:
            # Tag the callers row as deleted for that functions sake
            row_to_be_deleted.status = RowStatus.deleted
            # Get from the cache
            row_to_be_deleted = self.get_by_key(row_to_be_deleted)
        prior_effective_date = row_to_be_deleted[self.begin_date_column] + self._end_date_timedelta
        if not self.in_bulk_mode:
            # if the row is not pending insert, issue delete to database
            if row_to_be_deleted.status != RowStatus.insert:
                # Delete the row in the database
                super().delete(
                    key_names=self.primary_key,
                    key_values=row_to_be_deleted,
                    maintain_cache=False,
                    parent_stats=stats,
                )
        row_to_be_deleted.status = RowStatus.deleted
        # Remove the row from the cache, if that is selected. Otherwise, the caller will need to do that.
        if remove_from_cache:
            self.uncache_row(row_to_be_deleted)
        # If we were not given a prior row
        first_existing_row = None
        if prior_row is ...:
            try:
                prior_row = self.get_by_lookup_and_effective_date(
                    self._get_pk_lookup_name(),
                    row_to_be_deleted,
                    prior_effective_date,
                    parent_stats=stats,
                )
            except BeforeAllExisting as e:
                prior_row = None
                first_existing_row = e.first_existing_row
            except AfterExisting as e:
                # No following. Delete existing. Update prior to be high end date
                prior_row = e.prior_row
            except NoResultFound:
                # No other rows. Nothing to do. Ellipsis will skip both blocks below.
                prior_row = ...
        if prior_row is None:
            # No prior row.
            # If inserts_use_default_begin_date then we need to update the following row
            if self.inserts_use_default_begin_date:
                if first_existing_row is not None:
                    first_existing_row.set_keeping_parent(
                        self.begin_date_column,
                        row_to_be_deleted[self.begin_date_column]
                    )
                    super().apply_updates(
                        row=first_existing_row,
                        parent_stats=stats,
                        allow_insert=False,
                    )
        # If we have a real prior row
        if prior_row is not None and prior_row is not ...:
            # If we didn't end up with prior = this row
            if prior_row[self.begin_date_column] != row_to_be_deleted[self.begin_date_column]:
                prior_row.set_keeping_parent(self.end_date_column, row_to_be_deleted[self.end_date_column])
                # Apply updates to the row (use parent class routine to finish the work)
                super().apply_updates(
                    row=prior_row,
                    parent_stats=stats,
                    allow_insert=False,
                )
        stats.timer.stop()
    def delete_not_in_set(
            self,
            set_of_key_tuples: set,
            lookup_name: str = None,
            criteria_list: list = None,
            criteria_dict: dict = None,
            use_cache_as_source: bool = True,
            stat_name: str = 'delete_not_in_set',
            progress_frequency: int = None,
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        Overridden to call logical delete.
        Logically deletes rows matching criteria that are not in the set_of_key_tuples passed in.

        Parameters
        ----------
        set_of_key_tuples
            Set of tuples comprising the primary key values.
            This set represents the rows that should *not* be deleted.
        lookup_name: str
            The name of the lookup to use to find key tuples.
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        use_cache_as_source: bool
            Attempt to read existing rows from the cache?
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'delete_not_in_set'
        progress_frequency: int
            How often (in seconds) to output progress messages. Default 10. None for no progress messages.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        self.logically_delete_not_in_set(
            set_of_key_tuples=set_of_key_tuples,
            lookup_name=lookup_name,
            criteria_list=criteria_list,
            criteria_dict=criteria_dict,
            use_cache_as_source=use_cache_as_source,
            stat_name=stat_name,
            parent_stats=parent_stats,
            **kwargs
        )
    def delete_not_processed(
            self,
            criteria_list: list = None,
            criteria_dict: dict = None,
            use_cache_as_source: bool = True,
            stat_name: str = 'delete_not_processed',
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        Overridden to call logical delete.
        Logically deletes rows matching criteria that are not in the Table memory of rows passed to :meth:`upsert`.

        Parameters
        ----------
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        use_cache_as_source: bool
            Attempt to read existing rows from the cache?
        stat_name: string
            Name of this step for the ETLTask statistics.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        """
        # Perform logical delete despite call to "delete"
        self.logically_delete_not_processed(
            criteria_list=criteria_list,
            criteria_dict=criteria_dict,
            parent_stats=parent_stats,
            **kwargs
        )
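    # Illustrative sketch: logically deleting target rows that the source no longer
    # provides. Requires track_source_rows = True so upsert records which natural keys
    # were seen. The source_component name is hypothetical.
    #
    #     target.track_source_rows = True
    #     for source_row in source_component:
    #         target.upsert(source_row)
    #     target.delete_not_processed()   # performs a logical delete of unseen keys
    #     target.commit()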
    def update_not_in_set(
            self,
            updates_to_make: Union[dict, Row],
            set_of_key_tuples: set,
            lookup_name: str = None,
            criteria_list: list = None,
            criteria_dict: dict = None,
            use_cache_as_source: bool = True,
            progress_frequency: int = None,
            stat_name: str = 'update_not_in_set',
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        Applies update to rows matching criteria that are not in the set_of_key_tuples passed in.

        Parameters
        ----------
        updates_to_make: :class:`Row`
            Row or dict of updates to make
        set_of_key_tuples: set
            Set of tuples comprising the primary key values.
            This set represents the rows that should *not* be updated.
        lookup_name: str
            The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        use_cache_as_source: bool
            Attempt to read existing rows from the cache?
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'update_not_in_set'
        progress_frequency: int
            How often (in seconds) to output progress messages. Default = 10. None for no progress messages.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        date_coerce = self.get_coerce_method(self.begin_date_column)
        effective_date = date_coerce(kwargs.get('effective_date', self.default_effective_date))
        if criteria_list is None:
            criteria_list = []
        criteria_list.extend(
            [
                self.get_column(self.delete_flag) == self.delete_flag_no,
                self.get_column(self.begin_date_column) <= effective_date,
                self.get_column(self.end_date_column) >= effective_date,
            ]
        )
        stats = self.get_unique_stats_entry(stat_name, parent_stats=parent_stats)
        stats['rows read'] = 0
        stats['updates count'] = 0
        stats.timer.start()
        self.begin()
        if progress_frequency is None:
            progress_frequency = self.progress_frequency
        progress_timer = Timer()
        # Turn off read progress reports
        saved_progress_frequency = self.progress_frequency
        self.progress_frequency = None
        if lookup_name is None:
            lookup_name = self.get_nk_lookup_name()
            lookup = self.get_nk_lookup()
        else:
            lookup = self.get_lookup(lookup_name)
        # Note, here we select only lookup columns from self
        for row in self.where(
                column_list=lookup.lookup_keys,
                criteria_list=criteria_list,
                criteria_dict=criteria_dict,
                # We can't use cache since we have a list of in-equality filters
                use_cache_as_source=False,
                parent_stats=stats
        ):
            if row.status == RowStatus.unknown:
                pass
            stats['rows read'] += 1
            existing_key = lookup.get_hashable_combined_key(row)
            if 0 < progress_frequency <= progress_timer.seconds_elapsed:
                progress_timer.reset()
                self.log.info(
                    f"update_not_in_set current row#={stats['rows read']} "
                    f"row key={existing_key} updates done so far = {stats['updates count']}"
                )
            if existing_key not in set_of_key_tuples:
                stats['updates count'] += 1
                # First we need the entire existing row
                target_row = self.get_by_lookup_and_effective_date(
                    lookup_name,
                    row,
                    effective_date=effective_date
                )
                # Then we can apply the updates to it
                self.apply_updates(
                    target_row,
                    additional_update_values=updates_to_make,
                    parent_stats=stats,
                    allow_insert=False,
                    effective_date=effective_date,
                )
        stats.timer.stop()
        # Restore saved progress_frequency
        self.progress_frequency = saved_progress_frequency
    def cleanup_versions(
            self,
            remove_spurious_deletes: bool = False,
            remove_redundant_versions: bool = None,  # Default depends on auto_generate_key value
            lookup_name: str = None,
            exclude_from_compare: frozenset = None,
            criteria_list: list = None,
            criteria_dict: dict = None,
            use_cache_as_source: bool = True,
            repeat_until_clean: bool = True,
            max_begin_date_warnings: int = 100,
            max_end_date_warnings: int = 100,
            max_passes: int = 10,
            parent_stats=None,
            progress_message="{table} cleanup_versions "
                             "pass {pass_number} current row # {row_number:,}",
    ):
        """
        This routine will look for and remove versions where no material difference exists
        between it and the prior version. That can happen during loads if rows come in out of order.
        The routine can also optionally look for and remove spurious deletes
        (see remove_spurious_deletes below).
        It also checks for version dates that are not set correctly.

        Parameters
        ----------
        remove_spurious_deletes: boolean (defaults to False):
            Should the routine check for and remove versions that tag a record as being deleted
            only to be un-deleted later.
        remove_redundant_versions: boolean (defaults to opposite of auto_generate_key value):
            Should the routine delete rows that are exactly the same as the previous.
        lookup_name: str
            Name passed into :meth:`define_lookup`
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        exclude_from_compare: frozenset
            Columns to exclude from comparisons. Defaults to begin date, end date, and last update date.
            Any values passed in are added to that list.
        use_cache_as_source: bool
            Attempt to read existing rows from the cache?
        repeat_until_clean:
            Repeat loop until all issues are cleaned. Multiple bad rows in a key set can require multiple passes.
        max_begin_date_warnings:
            How many warning messages to print about bad begin dates
        max_end_date_warnings:
            How many warning messages to print about bad end dates
        max_passes:
            How many times should we loop over the dataset if we keep finding fixes.
            Note: Some situations do require multiple passes to fully correct.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        progress_message : str
            The progress message to print.
        """
        stats = self.get_unique_stats_entry('cleanup_versions', parent_stats=parent_stats)
        stats.timer.start()
        stats.ensure_exists('rows sent for delete')
        self.commit()

        # Figure out default remove_redundant_versions value if not provided
        if remove_redundant_versions is None:
            if self.auto_generate_key:
                # Assume that tables with autogenerated keys are dimensions where we don't
                # want to delete a surrogate key that might be referenced
                remove_redundant_versions = False
            else:
                remove_redundant_versions = True

        if exclude_from_compare is None:
            exclude_from_compare = frozenset()

        if lookup_name is None:
            if self.natural_key is not None:
                lookup_object = self.get_nk_lookup()
                exclude_from_compare = exclude_from_compare | frozenset(self.primary_key)
            else:
                lookup_object = self.get_pk_lookup()
        else:
            lookup_object = self.get_lookup(lookup_name)

        # Don't check begin or end dates for differences
        if self.begin_date_column not in exclude_from_compare:
            exclude_from_compare = exclude_from_compare | {self.begin_date_column}
        if self.end_date_column not in exclude_from_compare:
            exclude_from_compare = exclude_from_compare | {self.end_date_column}
        if self.last_update_date not in exclude_from_compare:
            exclude_from_compare = exclude_from_compare | {self.last_update_date}

        # Save and later restore the master trace_data setting.
        # We're assuming caller doesn't want the read data traced, only the delete actions
        trace_delete_data = self.trace_data
        # Temporarily turn off read progress messages
        saved_read_progress = self.progress_frequency
        self.progress_frequency = None
        begin_date_issues = 0
        end_date_issues = 0
        try:
            self.trace_data = False
            pass_number = 0
            # Repeat until... see below
            while True:
                pass_number += 1
                if pass_number > max_passes:
                    raise ValueError(f"cleanup_versions failed to get to clean state after {max_passes} passes.")
                clean_pass = True
                prior_row = None
                prior_row_pk_values = None
                second_prior_row = None
                second_prior_row_pk_values = None
                pending_cache_deletes = list()

                if use_cache_as_source:
                    if criteria_list is not None:
                        self.log.warning("cleanup_versions had to use DB source to honor criteria_list")
                        use_cache_as_source = False
                    if not self.cache_filled:
                        self.log.warning(
                            f"cleanup_versions had to use DB source since cache_filled = {self.cache_filled}"
                        )
                        use_cache_as_source = False
                if use_cache_as_source:
                    iterator = iter(lookup_object)
                    row_stat_name = 'rows read from cache'
                else:
                    if not self.in_bulk_mode and pass_number == 1:
                        # Apply any pending updates and deletes
                        self.commit()
                    order_by = list(lookup_object.lookup_keys)
                    # Make sure begin date is not already in the order_by
                    if self.begin_date_column in order_by:
                        order_by.remove(self.begin_date_column)
                    # Add begin date as the last part of the order by
                    order_by.append(self.begin_date_column)
                    self.log.debug(f"ordering by {order_by}")
                    iterator = self.where(
                        criteria_list=criteria_list,
                        criteria_dict=criteria_dict,
                        order_by=order_by,
                        parent_stats=stats,
                    )
                    row_stat_name = 'rows read from table'

                progress_timer = Timer()
                rows_read = 0
                # Note iterator needs to return the rows with the natural key
                # grouped together (but not necessarily ordered) and in begin_date order.
                for row in iterator:
                    rows_read += 1
                    if 0 < saved_read_progress <= progress_timer.seconds_elapsed:
                        progress_timer.reset()
                        self.log.info(
                            progress_message.format(
                                pass_number=pass_number,
                                row_number=rows_read,
                                table=self,
                            )
                        )
                    stats[row_stat_name] += 1
                    if row.status == RowStatus.deleted:
                        raise RuntimeError(f"iterator returned deleted row!\n row={repr(row)}")
                    # ===========================================================
                    # Debugging
                    # if row['person_srgt_key'] in (4550, 986):
                    #     print(repr(row))
                    # ===========================================================
                    if prior_row is not None:
                        row_pk_values = lookup_object.get_hashable_combined_key(row)
                        if row_pk_values != prior_row_pk_values:
                            # Change in PK.
                            # Reset second_prior_row
                            second_prior_row = None
                            second_prior_row_pk_values = None
                            # Check first begin date
                            if self.default_begin_date is not None:
                                if row[self.begin_date_column] > self.default_begin_date:
                                    clean_pass = False
                                    begin_date_issues += 1
                                    if begin_date_issues < max_begin_date_warnings:
                                        self.log.warning(
                                            f"Correcting first version begin date for {row_pk_values} "
                                            f"from {row[self.begin_date_column]} to {self.default_begin_date}"
                                        )
                                    elif begin_date_issues == max_begin_date_warnings:
                                        self.log.warning("Limit reached for 'Correcting begin date' messages.")
                                    # If not correct, update the row begin date
                                    super().apply_updates(
                                        row,
                                        additional_update_values={
                                            self.begin_date_column: self.default_begin_date
                                        },
                                        parent_stats=stats,
                                        allow_insert=False,
                                    )
                                    stats.add_to_stat('rows with bad date sequence', 1)
                            # Check final end date on prior row (last in series)
                            if prior_row[self.end_date_column] != self.default_end_date:
                                end_date_issues += 1
                                if end_date_issues < max_end_date_warnings:
                                    self.log.warning(
                                        f"Correcting final version end date for {prior_row_pk_values} "
                                        f"from {prior_row[self.end_date_column]} to {self.default_end_date}"
                                    )
                                elif end_date_issues == max_end_date_warnings:
                                    self.log.warning("Limit reached for 'Correcting end date' messages.")
                                # If not correct, update the prior row end date
                                super().apply_updates(
                                    prior_row,
                                    additional_update_values={
                                        self.end_date_column: self.default_end_date
                                    },
                                    parent_stats=stats,
                                    allow_insert=False,
                                )
                                stats.add_to_stat('rows with bad date sequence', 1)
                        ###########################################################################################
                        # Checks for subsequent versions after the first.
                        # (if the last row and this have the same natural key)
                        else:
                            # Check that rows are coming in order
                            if row[self.begin_date_column] < prior_row[self.begin_date_column]:
                                raise RuntimeError(
                                    f"rows not in begin date order!\nrow={repr(row)}\nprior_row={repr(prior_row)}"
                                )
                            # Check that end dates are correct
                            correct_prior_end_date = row[self.begin_date_column] + self._end_date_timedelta
                            if prior_row[self.end_date_column] != correct_prior_end_date:
                                clean_pass = False
                                # If not correct, update the prior row end date
                                end_date_issues += 1
                                if end_date_issues < max_end_date_warnings:
                                    self.log.warning(
                                        f"Correcting end date for {prior_row_pk_values} "
                                        f"from {prior_row[self.end_date_column]} to {correct_prior_end_date}"
                                    )
                                elif end_date_issues == max_end_date_warnings:
                                    self.log.warning(
                                        f"Limit ({max_end_date_warnings}) reached "
                                        f"for 'Correcting end date' messages."
                                    )
                                super().apply_updates(
                                    prior_row,
                                    additional_update_values={
                                        self.end_date_column: correct_prior_end_date
                                    },
                                    parent_stats=stats,
                                    allow_insert=False,
                                )
                                stats.add_to_stat('rows with bad date sequence', 1)
                            # Delete the previous row if it's a delete, and this following one is not
                            if (
                                remove_spurious_deletes
                                and prior_row[self.delete_flag] == self.delete_flag_yes
                                and row[self.delete_flag] == self.delete_flag_no
                            ):
                                if trace_delete_data:
                                    self.log.debug(f"deleting spurious logical delete row {prior_row}")
                                # We can't risk a change to cache as we iterate, so we keep a list of
                                # deletes to make in the cache
                                pending_cache_deletes.append((prior_row, second_prior_row))
                                clean_pass = False
                                prior_row = second_prior_row
                                prior_row_pk_values = second_prior_row_pk_values
                                second_prior_row = None
                                second_prior_row_pk_values = None
                            # Compare this row to last row, and delete
                            # this if all values are the same (and this is not a delete)
                            elif (remove_redundant_versions
                                  and prior_row is not None
                                  and prior_row.status != RowStatus.deleted):
                                differences = row.compare_to(
                                    prior_row,
                                    exclude=exclude_from_compare,
                                    coerce_types=False
                                )
                                if len(differences) == 0:
                                    if trace_delete_data:
                                        self.log.debug(f"deleting spurious unchanged row {row}")
                                    # Note: We can't remove from the cache in case we are iterating on that
                                    pending_cache_deletes.append((row, prior_row))
                                    stats['rows sent for delete'] += 1
                                    clean_pass = False
                    if row.status != RowStatus.deleted:
                        second_prior_row = prior_row
                        prior_row = row
                        prior_row_pk_values = lookup_object.get_hashable_combined_key(row)
                # ensure we don't accidentally refer to prior_row or second_prior_row anymore
                del prior_row
                del second_prior_row

                # If using cache as source we didn't call physically_delete_version
                # to delete from the db or cache, we need to do it now
                for row, prior_row in pending_cache_deletes:
                    # TODO: Need to store & get second_prior_row in the pending del list
                    self.physically_delete_version(
                        row,
                        prior_row=prior_row,
                        remove_from_cache=True,
                        parent_stats=stats,
                    )
                    if row.status != RowStatus.deleted:
                        raise RuntimeError(
                            f"row not actually tagged as deleted row!\nrow={repr(row)}"
                        )
                self.commit()
                # Break out of the repeat loop if the pass was clean, or we aren't supposed to loop
                if not clean_pass and repeat_until_clean:
                    self.log.debug(
                        f"Repeating cleanup_versions check. Completed pass {pass_number}."
                    )
                    self.log.debug(f"{stats['rows sent for delete']} rows deleted so far.")
                    self.log.debug(f"{begin_date_issues} begin date issues so far.")
                    self.log.debug(f"{end_date_issues} end date issues so far.")
                else:
                    break
        except Exception as e:
            self.log.error(traceback.format_exc())
            self.log.exception(e)
            raise RuntimeError("cleanup_versions failed")
        finally:
            self.trace_data = trace_delete_data
            # Restore read progress messages
            self.progress_frequency = saved_read_progress
        stats.timer.stop()
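    # Illustrative sketch: a maintenance pass over an already-loaded history table,
    # using a hypothetical target component.
    #
    #     target.fill_cache()
    #     target.cleanup_versions(
    #         remove_spurious_deletes=True,
    #         repeat_until_clean=True,
    #     )
    #     target.commit()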
    def upsert_special_values_rows(
            self,
            stat_name: str = 'upsert_special_values_rows',
            parent_stats: Statistics = None
    ):
        """
        Send all special values rows to upsert to ensure they exist and are current.
        Rows come from:

        - :meth:`get_missing_row`
        - :meth:`get_invalid_row`
        - :meth:`get_not_applicable_row`
        - :meth:`get_various_row`

        Parameters
        ----------
        stat_name:
            Name of this step for the ETLTask statistics. Default = 'upsert_special_values_rows'
        parent_stats:
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        """
        # By using the default begin date here we should force a type 1 update since
        # the existing record, if any, would have the same begin date.
        # If the existing record does not have that begin date, this will fail since
        # we turned off auto_generate_key.
        self.custom_special_values[self.begin_date_column] = self.default_begin_date
        self.custom_special_values[self.end_date_column] = self.default_end_date
        super().upsert_special_values_rows(
            stat_name=stat_name,
            parent_stats=parent_stats,
        )
    def _sql_insert_new(
            self,
            source_table: str,
            matching_columns: typing.Iterable,
            effective_date_column: str = None,
            connection_name: str = 'sql_upsert',
    ):
        now_str = self._sql_date_literal(self.get_current_time())
        if self.inserts_use_default_begin_date or effective_date_column is None:
            new_row_begin_date = self._sql_date_literal(self.default_begin_date)
        else:
            new_row_begin_date = effective_date_column

        last_update_date_into_str = '-- No last update date'
        last_update_date_select_str = '-- No last update date'
        if self.last_update_date is not None:
            last_update_date_into_str = f"{self.last_update_date}"
            last_update_date_select_str = f"'{now_str}' as {self.last_update_date}"

        if self.auto_generate_key:
            type2_srgt_sql = 'max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1)'
            # Type 1 srgt
            if self.type_1_surrogate is None:
                insert_list_type_1 = '--No type 1 srgt present'
                select_list_type_1 = '--No type 1 srgt present'
            else:
                insert_list_type_1 = f"{self.type_1_surrogate}"
                select_list_type_1 = f"{type2_srgt_sql} as {self.type_1_surrogate}"
            insert_new_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {self._sql_primary_key_name()},
                {insert_list_type_1},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str},
                {Table._sql_indent_join(matching_columns, 16, suffix=',')}
                )
            WITH max_srgt AS (SELECT coalesce(max({self._sql_primary_key_name()}),0) as max_srgt
                              FROM {self.qualified_table_name}
                              )
            SELECT
                {type2_srgt_sql} as {self._sql_primary_key_name()},
                {select_list_type_1},
                '{self.delete_flag_no}' as {self.delete_flag},
                {new_row_begin_date} as {self.begin_date_column},
                {self._sql_date_literal(self.default_end_date)} as {self.end_date_column},
                {last_update_date_select_str},
                {Table._sql_indent_join(matching_columns, 16, suffix=',')}
            FROM max_srgt CROSS JOIN {source_table} s
            WHERE NOT EXISTS(
                SELECT 1
                FROM {self.qualified_table_name} e
                WHERE {Table._sql_indent_join(
                        [f"e.{nk_col} = s.{nk_col}" for nk_col in self.natural_key],
                        18,
                        prefix='AND '
                      )}
                )
            """
        else:
            insert_new_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {Table._sql_indent_join(self.primary_key, 20, suffix=',')},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str}
                {Table._sql_indent_join(matching_columns, 20, suffix=',')}
                )
            SELECT
                {Table._sql_indent_join(self.primary_key, 20, suffix=',')},
                '{self.delete_flag_no}' as {self.delete_flag},
                {new_row_begin_date} as {self.begin_date_column},
                {self._sql_date_literal(self.default_end_date)} as {self.end_date_column},
                {last_update_date_select_str}
                {Table._sql_indent_join(matching_columns, 20, suffix=',')}
            FROM {source_table} s
            WHERE NOT EXISTS(
                SELECT 1
                FROM {self.qualified_table_name} e
                WHERE {Table._sql_indent_join(
                        [f"e.{key_col} = s.{key_col}" for key_col in self.primary_key],
                        18,
                        prefix='AND '
                      )}
                )
            """
        self.log.debug('')
        self.log.debug('=' * 80)
        self.log.debug('insert_new_sql')
        self.log.debug('=' * 80)
        self.log.debug(insert_new_sql)
        sql_timer = Timer()
        results = self.execute(insert_new_sql, connection_name=connection_name)
        self.log.info(
            f"Impacted rows = {results.rowcount} "
            f"(meaningful count? {results.supports_sane_rowcount()})"
        )
        self.log.info(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

    def _sql_find_updates(
            self,
            source_table: ReadOnlyTable,
            source_effective_date_column: str,
            key_columns_set: typing.Iterable,
            non_nk_matching_columns: typing.Iterable,
            target_only_columns: set,
            updated_row_nk_table: str,
            connection_name: str = 'sql_upsert',
    ):
        self.database.drop_table_if_exists(
            updated_row_nk_table,
            connection_name=connection_name,
            auto_close=False,
            transaction=False
        )
        find_updates_sql = f"""
        CREATE TABLE {updated_row_nk_table} AS
        SELECT
            {Table._sql_indent_join([f"e.{nk_col}" for nk_col in key_columns_set], 16, suffix=',')},
            s.{source_effective_date_column} as source_effective_date,
            e.{self.begin_date_column} as target_begin_date,
            e.{self.end_date_column} as target_end_date,
            {Table._sql_indent_join(
                [f"CASE WHEN {Table._sql_column_not_equals(m_col, 'e', 's')} THEN 1 ELSE 0 END as chg_in_{m_col}"
                 for m_col in non_nk_matching_columns],
                16, suffix=',')}
            {Table._sql_indent_join([f'e.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
        FROM {self.qualified_table_name} e
        INNER JOIN {source_table.qualified_table_name} s
            ON {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in key_columns_set], 19, prefix='AND ')}
            AND s.{source_effective_date_column} BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
        WHERE (
            {Table._sql_indent_join(
                [Table._sql_column_not_equals(m_col, 'e', 's') for m_col in non_nk_matching_columns],
                20, prefix='OR ')}
            OR e.{self.delete_flag} != '{self.delete_flag_no}'
            )
        """
        self.log.debug('-' * 80)
        self.log.debug('find_updates_sql')
        self.log.debug('-' * 80)
        sql_timer = Timer()
        self.log.debug(find_updates_sql)
        self.execute(find_updates_sql, connection_name=connection_name)
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

    def _sql_insert_updates(
            self,
            source_table: ReadOnlyTable,
            source_effective_date_column: str,
            matching_columns: typing.Iterable,
            non_nk_matching_columns: typing.Iterable,
            target_only_columns: set,
            updated_row_nk_table: str,
            connection_name: str = 'sql_upsert',
    ):
        now_str = self._sql_date_literal(self.get_current_time())

        last_update_date_into_str = '/* No last update date */'
        last_update_date_select_str = '/* No last update date */'
        if self.last_update_date is not None:
            last_update_date_into_str = f"{self.last_update_date},"
            last_update_date_select_str = f"'{now_str}' as {self.last_update_date},"

        if self.auto_generate_key:
            primary_key_name = self.primary_key[0]
            insert_updates_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {primary_key_name},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str}
                {Table._sql_indent_join(matching_columns, 16, suffix=',')}
                {Table._sql_indent_join(target_only_columns, 16, suffix=',', prefix_if_not_empty=',')}
                )
            WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt
                              FROM {self.qualified_table_name}
                              )
            SELECT
                max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
                '{self.delete_flag_no}' as {self.delete_flag},
                u.source_effective_date as {self.begin_date_column},
                u.target_end_date as {self.end_date_column},
                {last_update_date_select_str}
                {Table._sql_indent_join([f's.{col}' for col in matching_columns], 16, suffix=',')}
                {Table._sql_indent_join([f'u.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
            FROM max_srgt CROSS JOIN {source_table.qualified_table_name} s
            INNER JOIN {updated_row_nk_table} u
                ON {Table._sql_indent_join([f"u.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
                AND s.{source_effective_date_column} = u.source_effective_date
            """
        else:
            insert_updates_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {Table._sql_indent_join([key_col for key_col in self.primary_key], 16, suffix=',')},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str}
                {Table._sql_indent_join(non_nk_matching_columns, 16, suffix=',')}
                {Table._sql_indent_join(target_only_columns, 16, suffix=',', prefix_if_not_empty=',')}
                )
            SELECT
                {Table._sql_indent_join([f"u.{key_col}" for key_col in self.primary_key], 16, suffix=',')},
                '{self.delete_flag_no}' as {self.delete_flag},
                u.source_effective_date as {self.begin_date_column},
                u.target_end_date as {self.end_date_column},
                {last_update_date_select_str}
                {Table._sql_indent_join([f's.{col}' for col in non_nk_matching_columns], 16, suffix=',')}
                {Table._sql_indent_join([f'u.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
            FROM {source_table.qualified_table_name} s
            INNER JOIN {updated_row_nk_table} u
                ON {Table._sql_indent_join([f"u.{key_col} = s.{key_col}" for key_col in self.primary_key], 18, prefix='AND')}
                AND s.{source_effective_date_column} = u.source_effective_date
            """
        self.log.debug('-' * 80)
        self.log.debug('insert_updates_sql')
        self.log.debug('-' * 80)
        self.log.debug(insert_updates_sql)
        sql_timer = Timer()
        results = self.execute(insert_updates_sql, connection_name=connection_name)
        self.log.debug(
            f"Impacted rows = {results.rowcount:,} "
            f"(meaningful count? {results.supports_sane_rowcount()})"
        )
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

    def _sql_find_deletes(
            self,
            source_table: ReadOnlyTable,
            key_columns_set: typing.Iterable,
            deleted_row_nk_table: str,
            connection_name: str = 'sql_upsert',
    ):
        # self.execute(f"DROP TABLE IF EXISTS {updated_row_nk_table}")
        now_str = self._sql_date_literal(self.get_current_time())
        self.database.drop_table_if_exists(
            deleted_row_nk_table,
            connection_name=connection_name,
            auto_close=False,
            transaction=False
        )
        find_deletes_sql = f"""
        CREATE TABLE {deleted_row_nk_table} AS
        SELECT
            {Table._sql_indent_join([f"e.{nk_col}" for nk_col in key_columns_set], 16, suffix=',')},
            {now_str} as source_effective_date,
            e.{self.begin_date_column} as target_begin_date,
            e.{self.end_date_column} as target_end_date
        FROM {self.qualified_table_name} e
        WHERE {now_str} BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
            AND NOT EXISTS (
                SELECT 1
                FROM {source_table.qualified_table_name} s
                WHERE {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in key_columns_set], 19, prefix='AND ')}
                )
        """
        self.log.debug('-' * 80)
        self.log.debug('find_deletes_sql')
        self.log.debug('-' * 80)
        sql_timer = Timer()
        self.log.debug(find_deletes_sql)
        self.execute(find_deletes_sql, connection_name=connection_name)
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

    def _sql_insert_deletes(
            self,
            target_column_set: typing.Iterable,
            metadata_column_set: typing.Iterable,
            deleted_row_nk_table: str,
            connection_name: str = 'sql_upsert',
    ):
        target_non_metadata_column_set = set(target_column_set) - set(metadata_column_set)
        now_str = self._sql_date_literal(self.get_current_time())

        last_update_date_into_str = '/* No last update date */'
        last_update_date_select_str = '/* No last update date */'
        if self.last_update_date is not None:
            last_update_date_into_str = f"{self.last_update_date},"
            last_update_date_select_str = f"'{now_str}' as {self.last_update_date},"

        if self.auto_generate_key:
            primary_key_name = self.primary_key[0]
            insert_delete_versions_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {primary_key_name},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str}
                {Table._sql_indent_join(target_non_metadata_column_set, 20, suffix=',')}
                )
            WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt
                              FROM {self.qualified_table_name}
                              )
            SELECT
                max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
                '{self.delete_flag_yes}' as {self.delete_flag},
                u.source_effective_date as {self.begin_date_column},
                u.target_end_date as {self.end_date_column},
                {last_update_date_select_str}
                {Table._sql_indent_join([f'e.{col}' for col in target_non_metadata_column_set], 20, suffix=',')}
            FROM max_srgt CROSS JOIN {self.qualified_table_name} e
            INNER JOIN {deleted_row_nk_table} u
                ON {Table._sql_indent_join([f"u.{nk_col} = e.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
                AND u.target_begin_date = e.{self.begin_date_column}
            """
        else:
            insert_delete_versions_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {Table._sql_indent_join([key_col for key_col in self.primary_key], 16, suffix=',')},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {last_update_date_into_str}
                {Table._sql_indent_join(target_non_metadata_column_set, 20, suffix=',')}
                )
            SELECT
                {Table._sql_indent_join([f"u.{key_col}" for key_col in self.primary_key], 20, suffix=',')},
                '{self.delete_flag_yes}' as {self.delete_flag},
                u.source_effective_date as {self.begin_date_column},
                u.target_end_date as {self.end_date_column},
                {last_update_date_select_str}
                {Table._sql_indent_join([f'e.{col}' for col in target_non_metadata_column_set], 16, suffix=',')}
            FROM {self.qualified_table_name} e
            INNER JOIN {deleted_row_nk_table} u
                ON {Table._sql_indent_join([f"u.{key_col} = e.{key_col}" for key_col in self.primary_key], 18, prefix='AND')}
                AND u.target_begin_date = e.{self.begin_date_column}
            """
        self.log.debug('-' * 80)
        self.log.debug('insert_delete_versions_sql')
        self.log.debug('-' * 80)
        self.log.debug(insert_delete_versions_sql)
        sql_timer = Timer()
        results = self.execute(insert_delete_versions_sql, connection_name=connection_name)
        self.log.debug(
            f"Impacted rows = {results.rowcount:,} "
            f"(meaningful count? {results.supports_sane_rowcount()})"
        )
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
[docs]    def sql_upsert(
            self,
            source_table: ReadOnlyTable,
            source_effective_date_column: str,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            skip_update_check_on: typing.Optional[frozenset] = None,
            check_for_deletes: bool = None,
            connection_name: str = 'sql_upsert',
            temp_table_prefix: str = '',
            # Only set commit_each_table to True for debugging purposes
            commit_each_table: bool = False,
            stat_name: str = 'upsert_db_exclusive',
            parent_stats: typing.Optional[Statistics] = None,
    ):
        upsert_db_excl_stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        upsert_db_excl_stats.print_start_stop_times = False
        upsert_db_excl_stats.timer.start()
        upsert_db_excl_stats['calls'] += 1

        self.transaction(connection_name)

        source_row_columns_set = source_table.column_names_set
        if source_excludes is not None:
            source_row_columns_set = source_row_columns_set - source_excludes
        target_column_set = self.column_names_set
        if target_excludes is not None:
            target_column_set = target_column_set - target_excludes

        # Columns maintained by this component rather than taken from the source
        version_date_column_set = {self.begin_date_column, self.end_date_column}
        metadata_column_set = set(version_date_column_set)
        if self.delete_flag is not None:
            metadata_column_set.add(self.delete_flag)
        if self.last_update_date is not None:
            metadata_column_set.add(self.last_update_date)
        metadata_column_set.update(self.primary_key)

        matching_columns = {
            column_name for column_name in source_row_columns_set
            if column_name in target_column_set
            and column_name not in metadata_column_set
        }
        target_only_columns = {
            column_name for column_name in target_column_set
            if column_name not in (source_row_columns_set | metadata_column_set)
        }
        compare_columns = matching_columns

        if self.natural_key is not None:
            if len(self.natural_key) == 0:
                raise ValueError(f"Empty natural key for {self}")
            if not self.auto_generate_key:
                raise ValueError(
                    f"Natural key for {self} when auto_generate_key = False "
                    f"and primary key is {self.primary_key}"
                )
            key_columns_set = set(self.natural_key)
        elif self.primary_key is not None and len(self.primary_key) > 0:
            key_columns_set = set(self.primary_key)
        else:
            raise ValueError(f"No primary key or natural key for {self}")

        non_nk_matching_columns = matching_columns - key_columns_set

        if self.delete_flag is not None:
            if check_for_deletes is None:
                check_for_deletes = True
            if self.delete_flag in non_nk_matching_columns:
                non_nk_matching_columns.remove(self.delete_flag)
            if self.delete_flag in target_only_columns:
                target_only_columns.remove(self.delete_flag)
        else:
            if check_for_deletes is None:
                check_for_deletes = False

        if self.last_update_date is not None:
            compare_columns -= {self.last_update_date}
            if self.last_update_date in non_nk_matching_columns:
                non_nk_matching_columns.remove(self.last_update_date)
            if self.last_update_date in target_only_columns:
                target_only_columns.remove(self.last_update_date)

        if len(self.primary_key) != 1:
            raise ValueError(
                f"{self}.upsert_db_exclusive requires a single primary_key column. "
                f"Got {self.primary_key}"
            )
        # Insert rows for new Natural Keys
        self._sql_insert_new(
            source_table=source_table.qualified_table_name,
            matching_columns=matching_columns,
            connection_name=connection_name,
        )
        if commit_each_table:
            self.commit()
            self.transaction()
            self.log.debug("Commit done")

        updated_row_nk_table = f"{temp_table_prefix}{self.table_name}_updated_row_nk"
        updated_row_nk_table = self._safe_temp_table_name(updated_row_nk_table)
        self._sql_find_updates(
            source_table=source_table,
            source_effective_date_column=source_effective_date_column,
            key_columns_set=key_columns_set,
            non_nk_matching_columns=non_nk_matching_columns,
            target_only_columns=target_only_columns,
            updated_row_nk_table=updated_row_nk_table,
            connection_name=connection_name,
        )
        if commit_each_table:
            self.commit()
            self.transaction()
            self.log.debug("Commit done")

        self._sql_insert_updates(
            source_table=source_table,
            source_effective_date_column=source_effective_date_column,
            matching_columns=matching_columns,
            non_nk_matching_columns=non_nk_matching_columns,
            target_only_columns=target_only_columns,
            updated_row_nk_table=updated_row_nk_table,
            connection_name=connection_name,
        )
        if commit_each_table:
            self.commit()
            self.transaction()
            self.log.debug("Commit done")

        # Retire (end-date) the prior versions that the inserted update versions supersede
        joins = [(nk_col, ("u", nk_col)) for nk_col in key_columns_set]
        joins.append((self.begin_date_column, ('u', 'target_begin_date')))
        self._sql_update_from(
            update_name='update_retire',
            source_sql=f"{updated_row_nk_table} u",
            list_of_joins=joins,
            extra_where="u.source_effective_date != u.target_begin_date",
            list_of_sets=[
                (self.end_date_column,
                 self._sql_add_timedelta('u.source_effective_date', self._end_date_timedelta)
                 ),
            ],
            connection_name=connection_name,
        )
        if commit_each_table:
            self.commit()
            self.transaction()
            self.log.debug("Commit done")

        if check_for_deletes:
            deleted_row_nk_table = f"{temp_table_prefix}{self.table_name}_del_row_nk"
            deleted_row_nk_table = self._safe_temp_table_name(deleted_row_nk_table)
            self._sql_find_deletes(
                source_table=source_table,
                key_columns_set=key_columns_set,
                deleted_row_nk_table=deleted_row_nk_table,
                connection_name=connection_name,
            )
            if commit_each_table:
                self.commit()
                self.transaction()
                self.log.debug("Commit done")

            # Retire versions
            joins = [(nk_col, ("d", nk_col)) for nk_col in key_columns_set]
            joins.append((self.begin_date_column, ('d', 'target_begin_date')))
            self._sql_update_from(
                update_name='delete_retire',
                source_sql=f"{deleted_row_nk_table} d",
                list_of_joins=joins,
                extra_where="d.source_effective_date != d.target_begin_date",
                list_of_sets=[
                    (self.end_date_column,
                     self._sql_add_timedelta('d.source_effective_date', self._end_date_timedelta)
                     ),
                ],
                connection_name=connection_name,
            )
            if commit_each_table:
                self.commit()
                self.transaction()
                self.log.debug("Commit done")

            self._sql_insert_deletes(
                target_column_set=target_column_set,
                metadata_column_set=metadata_column_set,
                deleted_row_nk_table=deleted_row_nk_table,
                connection_name=connection_name,
            )

        self.log.debug('=' * 80)
        upsert_db_excl_stats.timer.stop()
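    # Illustrative usage sketch (not part of the library source): a set-based
    # load of this history table from a staged source could look roughly like
    # the following, where `stg_customer` is a hypothetical ReadOnlyTable over
    # the staging data and `source_eff_dt` is its effective-date column:
    #
    #     stg_customer = ReadOnlyTable(task, stage_db, table_name='stg_customer')
    #     d_customer_hist.sql_upsert(
    #         source_table=stg_customer,
    #         source_effective_date_column='source_eff_dt',
    #         check_for_deletes=True,       # also end-date natural keys missing from the source
    #         temp_table_prefix='stg_',     # prefix for the *_updated_row_nk / *_del_row_nk work tables
    #     )
    #     d_customer_hist.commit()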