Source code for bi_etl.components.hst_table_source_based

import textwrap
import typing

from bi_etl.components.hst_table import HistoryTable
from bi_etl.components.readonlytable import ReadOnlyTable
from bi_etl.components.row.row import Row
from bi_etl.components.row.row_status import RowStatus
from bi_etl.components.table import Table
from bi_etl.database import DatabaseMetadata
from bi_etl.exceptions import NoResultFound
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
from bi_etl.timer import Timer

__all__ = ['HistoryTableSourceBased']


# noinspection PyAbstractClass
class HistoryTableSourceBased(HistoryTable):
    """
    ETL target component for a table that stores history of updates where the source provides
    the dates to use. Also usable as a source.

    Example:
        The target table currently has:

            2016 - A
            2017 - B
            2018 - C

        The source deletes the 2017 version. A normal HistoryTable load would not touch the
        2017 version since it was not provided in the source. The desired output is:

            2016 - A
            2017 - A
            2018 - C

        This component handles that update by stepping through the provided source versions
        and the existing target versions, updating each target version as required so that it
        matches the closest source version earlier or equal in time.

    Parameters
    ----------
    task : ETLTask
        The instance to register in (if not None)
    database : bi_etl.scheduler.task.Database
        The database to find the table/view in.
    table_name : str
        The name of the table/view.
    exclude_columns : frozenset
        Optional. A list of columns to exclude from the table/view.
        These columns will not be included in SELECT, INSERT, or UPDATE statements.

    Attributes
    ----------
    auto_generate_key: boolean
        Should the primary key be automatically generated by the insert/upsert process?
        If True, the process will get the current maximum value and then increment it
        with each insert. (inherited from Table)
    begin_date: str
        Name of the begin date field
    end_date: str
        Name of the end date field
    inserts_use_default_begin_date: boolean
        Should inserts use the default begin date instead of the class effective date?
        This allows records to match up in joins with other history tables where the
        effective date of the 'primary' might be before the first version effective date.
        Default = True
    default_begin_date: date
        Default begin date to assign for begin_date.
        Used for new records if inserts_use_default_begin_date is True.
        Also used for :meth:`get_missing_row`, :meth:`get_invalid_row`,
        :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Default = 1900-1-1
    default_end_date: date
        Default end date to assign for end_date on active rows.
        Also used for :meth:`get_missing_row`, :meth:`get_invalid_row`,
        :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Default = 9999-1-1
    batch_size: int
        How many rows should be inserted/updated/deleted in a single batch.
        (inherited from Table)
    delete_flag : str
        The name of the delete_flag column, if any. Optional.
        (inherited from ReadOnlyTable)
    delete_flag_yes : str
        The value of delete_flag for deleted rows. Optional.
        (inherited from ReadOnlyTable)
    delete_flag_no : str
        The value of delete_flag for *not* deleted rows. Optional.
        (inherited from ReadOnlyTable)
    default_date_format: str
        The date parsing format to use for str -> date conversions.
        If more than one date format exists in the source, then explicit conversions
        will be required.
        Default = '%m/%d/%Y'
        (inherited from Table)
    force_ascii: boolean
        Should text values be forced into the ascii character set before passing to the database?
        Default = False
        (inherited from Table)
    last_update_date: str
        Name of the column to update when table updates are made.
        Default = None
        (inherited from Table)
    log_first_row : boolean
        Should we log progress on the first row read. *Only applies if used as a source.*
        (inherited from ETLComponent)
    max_rows : int
        The maximum number of rows to read. *Only applies if Table is used as a source.*
        Optional.
        (inherited from ETLComponent)
    primary_key: list
        The name of the primary key column(s). Only impacts trace messages.
        Default = None. If not passed in, will use the database value, if any.
        (inherited from ETLComponent)
    natural_key: list
        The list of natural key columns (as Column objects).
        The default is the list of non-begin/end date primary key columns.
        The default is *not* appropriate for dimension tables with surrogate keys.
    progress_frequency : int
        How often (in seconds) to output progress messages. Optional.
        (inherited from ETLComponent)
    progress_message : str
        The progress message to print. Optional.
        Default is ``"{logical_name} row # {row_number}"``.
        Note ``logical_name`` and ``row_number`` substitutions are applied via :func:`format`.
        (inherited from ETLComponent)
    special_values_descriptive_columns: list
        A list of columns that get longer descriptive text in :meth:`get_missing_row`,
        :meth:`get_invalid_row`, :meth:`get_not_applicable_row`, :meth:`get_various_row`
        Optional.
        (inherited from ReadOnlyTable)
    track_source_rows: boolean
        Should the :meth:`upsert` method keep a set container of source row keys that it has
        processed? That set would then be used by :meth:`update_not_processed`,
        :meth:`logically_delete_not_processed`, and :meth:`delete_not_processed`.
        (inherited from Table)
    type_1_surrogate: str
        The name of the type 1 surrogate key. The value is automatically generated as equal
        to the type 2 key on inserts and equal to the existing value on updates. Optional.
    """
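
    # Hypothetical trace of the docstring example above (added for illustration; the column
    # names product_id and begin_date are assumptions, not part of the original source):
    #
    #   Existing target versions:  2016-01-01 -> A, 2017-01-01 -> B, 2018-01-01 -> C
    #   Source rows upserted:      2016-01-01 -> A, 2018-01-01 -> C  (the 2017 row was deleted)
    #
    #   The 2016 source row matches the existing 2016 version, so nothing changes.
    #   When the 2018 source row arrives, _upsert_version_set first steps over the existing
    #   2017 version and re-upserts it with the prior (2016) source values, so 2017 becomes A;
    #   it then reaches the existing 2018 version, which already matches C and is left alone.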

    def __init__(self,
                 task: typing.Optional[ETLTask],
                 database: DatabaseMetadata,
                 table_name: str,
                 table_name_case_sensitive: bool = None,
                 schema: str = None,
                 exclude_columns: frozenset = None,
                 **kwargs
                 ):
        # Don't pass kwargs up to super. They should be set at the end of this method
        super().__init__(task=task,
                         database=database,
                         table_name=table_name,
                         schema=schema,
                         table_name_case_sensitive=table_name_case_sensitive,
                         exclude_columns=exclude_columns,
                         )

        self.prior_upsert_key = None
        self.prior_upsert_lookup_name = None
        self.prior_upsert_existing_rows_list = list()
        self.prior_upsert_existing_row_index = 0
        self.prior_upsert_row = None
        self.prior_upsert_update_callback = None
        self.prior_upsert_insert_callback = None
        self.prior_upsert_stat_name = None
        self.prior_upsert_parent_stats = None

        # Should be the last call of every init
        self.set_kwattrs(**kwargs)

    def _upsert_version_set(
            self,
            source_mapped_as_target_row: Row,
            lookup_name: str = None,
            skip_update_check_on: list = None,
            do_not_update: list = None,
            additional_update_values: dict = None,
            additional_insert_values: dict = None,
            update_callback: typing.Callable[[list, Row], None] = None,
            insert_callback: typing.Callable[[Row], None] = None,
            source_excludes: frozenset = None,
            target_excludes: frozenset = None,
            stat_name: str = 'upsert',
            parent_stats: Statistics = None,
    ):
        """
        Update (if changed) or Insert a row in the table.
        Returns the row found/inserted, with the auto-generated key (if that feature is enabled)

        Parameters
        ----------
        source_mapped_as_target_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            Row to upsert
        lookup_name: str
            The name of the lookup (see :meth:`define_lookup`) to use when searching for an
            existing row.
        skip_update_check_on: list
            List of column names to not compare old vs new for updates.
        do_not_update: list
            List of columns to never update.
        additional_update_values: dict
            Additional updates to apply when updating
        additional_insert_values: dict
            Additional values to set on each row when inserting.
        update_callback: func
            Function to pass updated rows to. Function should not modify row.
        insert_callback: func
            Function to pass inserted rows to. Function should not modify row.
        source_excludes:
            list of source columns to exclude when mapping to this Table.
        target_excludes:
            list of Table columns to exclude when mapping from the source row
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'upsert'
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        """
        # This should be the (or a) natural key lookup
        lookup = self.get_lookup(lookup_name)
        nk_tuple = lookup.get_hashable_combined_key(source_mapped_as_target_row)

        if self.prior_upsert_key != nk_tuple or self.prior_upsert_lookup_name != lookup_name:
            # Check if this is the first row and not the end of a batch
            if self.prior_upsert_key is not None:
                # This is the end of a batch
                self.finish_pending_existing_rows()
                # Reset for the next list
                self.prior_upsert_row = None
            self.prior_upsert_key = nk_tuple
            self.prior_upsert_lookup_name = lookup_name
            fallback_to_db = self.always_fallback_to_db or not self.cache_clean
            try:
                existing_rows = lookup.find_versions_list(self.prior_upsert_key, fallback_to_db=fallback_to_db)
            except NoResultFound:
                # No existing rows were found, insert
                new_row = source_mapped_as_target_row
                if self.inserts_use_default_begin_date:
                    new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
                if additional_insert_values:
                    for colName, value in additional_insert_values.items():
                        new_row[colName] = value
                new_row = self.insert_row(new_row, parent_stats=parent_stats)
                if insert_callback:
                    insert_callback(new_row)
                return new_row

            # We only reach here if a result was found from the lookup
            self.prior_upsert_existing_rows_list = existing_rows
            self.prior_upsert_existing_row_index = 0
            self.prior_upsert_update_callback = update_callback
            self.prior_upsert_insert_callback = insert_callback
            self.prior_upsert_stat_name = stat_name
            self.prior_upsert_parent_stats = parent_stats

            if self.inserts_use_default_begin_date:
                # Force first row to use the default begin date (year 1900)
                row_begin_date = self.default_begin_date
                source_mapped_as_target_row.set_keeping_parent(self.begin_date_column, row_begin_date)
            else:
                row_begin_date = source_mapped_as_target_row[self.begin_date_column]
        else:
            row_begin_date = source_mapped_as_target_row[self.begin_date_column]
            # Check that rows are coming in order
            if self.prior_upsert_row is not None:
                if row_begin_date < self.prior_upsert_row[self.begin_date_column]:
                    raise RuntimeError(
                        textwrap.dedent(f"""\
                            rows not in begin date order!
                            nk_tuple = {nk_tuple}
                            row = {repr(source_mapped_as_target_row)}
                            prior_upsert_row = {repr(self.prior_upsert_row)}
                            """)
                    )

        # Nested join over existing rows and pending upsert rows
        first_new_row = None
        get_next_new_row = False
        upsert_row = source_mapped_as_target_row
        while not get_next_new_row:
            needs_upsert = True
            if self.prior_upsert_existing_row_index >= len(self.prior_upsert_existing_rows_list):
                # There is no next existing row, use a max date value
                existing_begin_date = self.default_end_date
                existing_row = None
            else:
                existing_row = self.prior_upsert_existing_rows_list[self.prior_upsert_existing_row_index]
                existing_begin_date = existing_row[self.begin_date_column]

            if row_begin_date < existing_begin_date:
                # New date is before all remaining existing rows
                upsert_row = source_mapped_as_target_row
                get_next_new_row = True
            elif row_begin_date == existing_begin_date:
                # New date equal to existing date
                upsert_row = source_mapped_as_target_row
                get_next_new_row = True
                self.prior_upsert_existing_row_index += 1
            else:
                # New row begin > Existing date
                if self.prior_upsert_row is not None:
                    excludes = set(self.primary_key)
                    excludes.add(self.end_date_column)
                    if source_excludes is not None:
                        excludes.update(source_excludes)
                    upsert_row = self.prior_upsert_row.subset(exclude=excludes)
                    upsert_row.set_keeping_parent(self.begin_date_column, existing_begin_date)
                    self.prior_upsert_existing_row_index += 1
                else:
                    # No prior row, keep existing data
                    upsert_row = existing_row
                    self.prior_upsert_existing_row_index += 1
                    # As the logic stands now we could skip the upsert entirely in this scenario
                    needs_upsert = False

            if needs_upsert:
                new_row = super().upsert(
                    source_row=upsert_row,
                    lookup_name=lookup_name,
                    skip_update_check_on=skip_update_check_on,
                    do_not_update=do_not_update,
                    additional_update_values=additional_update_values,
                    additional_insert_values=additional_insert_values,
                    update_callback=update_callback,
                    insert_callback=insert_callback,
                    source_excludes=source_excludes,
                    target_excludes=target_excludes,
                    stat_name=stat_name,
                    parent_stats=parent_stats,
                )
                if first_new_row is None:
                    first_new_row = new_row

        self.prior_upsert_row = upsert_row
        return first_new_row

    def finish_pending_existing_rows(self):
        if self.prior_upsert_row is not None:
            for existing_row in self.prior_upsert_existing_rows_list[self.prior_upsert_existing_row_index:]:
                existing_begin_date = existing_row[self.begin_date_column]
                excludes = set(self.primary_key)
                excludes.add(self.end_date_column)
                upsert_row = self.prior_upsert_row.subset(exclude=excludes)
                upsert_row.set_keeping_parent(self.begin_date_column, existing_begin_date)
                super().upsert(
                    source_row=upsert_row,
                    lookup_name=self.prior_upsert_lookup_name,
                    update_callback=self.prior_upsert_update_callback,
                    insert_callback=self.prior_upsert_insert_callback,
                    stat_name=self.prior_upsert_stat_name,
                    parent_stats=self.prior_upsert_parent_stats,
                )

    def upsert(self,
               source_row: typing.Union[Row, typing.List[Row]],
               lookup_name: str = None,
               skip_update_check_on: list = None,
               do_not_update: list = None,
               additional_update_values: dict = None,
               additional_insert_values: dict = None,
               update_callback: typing.Callable[[list, Row], None] = None,
               insert_callback: typing.Callable[[Row], None] = None,
               source_excludes: frozenset = None,
               target_excludes: frozenset = None,
               stat_name: str = 'upsert',
               parent_stats: Statistics = None,
               **kwargs
               ):
        """
        Update (if changed) or Insert a row in the table.
        Returns the row found/inserted, with the auto-generated key (if that feature is enabled)

        Parameters
        ----------
        source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
            Row to upsert
        lookup_name: str
            The name of the lookup (see :meth:`define_lookup`) to use when searching for an
            existing row.
        skip_update_check_on: list
            List of column names to not compare old vs new for updates.
        do_not_update: list
            List of columns to never update.
        additional_update_values: dict
            Additional updates to apply when updating
        additional_insert_values: dict
            Additional values to set on each row when inserting.
        update_callback: func
            Function to pass updated rows to. Function should not modify row.
        insert_callback: func
            Function to pass inserted rows to. Function should not modify row.
        source_excludes:
            list of source columns to exclude when mapping to this Table.
        target_excludes:
            list of Table columns to exclude when mapping from the source row
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'upsert'
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        stats.timer.start()
        stats['upsert source row count'] += 1
        self._set_upsert_mode()
        self.begin()

        effective_date = kwargs.get('effective_date')
        if effective_date is None:
            if self.begin_date_column in source_row and source_row[self.begin_date_column] is not None:
                pass
            else:
                source_row.set_keeping_parent(self.begin_date_column, self.default_effective_date)
        else:
            date_coerce = self.get_coerce_method(self.begin_date_column)
            source_row.set_keeping_parent(self.begin_date_column, date_coerce(effective_date))

        target_excludes = self._target_excludes_for_updates(target_excludes)

        source_mapped_as_target_row = self.build_row(source_row=source_row,
                                                     source_excludes=source_excludes,
                                                     target_excludes=target_excludes,
                                                     parent_stats=stats,
                                                     )
        if self.track_source_rows:
            # Keep track of source records so we can check if target rows don't exist in source
            self.source_keys_processed.add(self.get_natural_key_tuple(source_mapped_as_target_row))

        if lookup_name is None:
            lookup = self.get_default_lookup(source_row.iteration_header)
            lookup_name = lookup.lookup_name
        else:
            lookup = self.get_lookup(lookup_name)

        if len(lookup.lookup_keys) == 1:
            # Primary key, it can't be anything else unless not valid.
            # PK lookup doesn't need to deal with list of dates, so call the parent upsert routine directly
            return super().upsert(
                source_row=source_mapped_as_target_row,
                lookup_name=lookup_name,
                skip_update_check_on=skip_update_check_on,
                do_not_update=do_not_update,
                additional_update_values=additional_update_values,
                additional_insert_values=additional_insert_values,
                update_callback=update_callback,
                insert_callback=insert_callback,
                source_excludes=source_excludes,
                target_excludes=target_excludes,
                stat_name=stat_name,
                parent_stats=parent_stats,
            )
        else:
            return self._upsert_version_set(
                source_mapped_as_target_row=source_mapped_as_target_row,
                lookup_name=lookup_name,
                skip_update_check_on=skip_update_check_on,
                do_not_update=do_not_update,
                additional_update_values=additional_update_values,
                additional_insert_values=additional_insert_values,
                update_callback=update_callback,
                insert_callback=insert_callback,
                source_excludes=source_excludes,
                target_excludes=target_excludes,
                stat_name=stat_name,
                parent_stats=parent_stats,
            )

    def update_not_in_set(
            self,
            updates_to_make: typing.Union[dict, Row],
            set_of_key_tuples: set,
            lookup_name: str = None,
            criteria_list: list = None,
            criteria_dict: dict = None,
            use_cache_as_source: bool = True,
            progress_frequency: int = None,
            stat_name: str = 'update_not_in_set',
            parent_stats: Statistics = None,
            **kwargs
    ):
        """
        Applies updates to rows matching criteria that are *not* in the set_of_key_tuples passed in.

        Parameters
        ----------
        updates_to_make: :class:`Row`
            Row or dict of updates to make
        set_of_key_tuples: set
            Set of tuples comprising the primary key values.
            This set represents the rows that should *not* be updated.
        lookup_name: str
            The name of the lookup (see :meth:`define_lookup`) to use when searching for an
            existing row.
        criteria_list : string or list of strings
            Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
            https://goo.gl/JlY9us
        criteria_dict : dict
            Dict keys should be columns, values are set using = or in
        use_cache_as_source: bool
            Attempt to read existing rows from the cache?
        stat_name: string
            Name of this step for the ETLTask statistics. Default = 'update_not_in_set'
        progress_frequency: int
            How often (in seconds) to output progress messages. Default = 10.
            None for no progress messages.
        parent_stats: bi_etl.statistics.Statistics
            Optional Statistics object to nest this steps statistics in.
            Default is to place statistics in the ETLTask level statistics.
        kwargs:
            effective_date: datetime
                The effective date to use for the update
        """
        if criteria_dict is None:
            criteria_dict = {}
        criteria_dict[self.delete_flag] = self.delete_flag_no

        stats = self.get_unique_stats_entry(stat_name, parent_stats=parent_stats)
        stats['rows read'] = 0
        stats['updates count'] = 0
        stats.timer.start()
        self.begin()

        if progress_frequency is None:
            progress_frequency = self.progress_frequency
        progress_timer = Timer()

        # Turn off read progress reports
        saved_progress_frequency = self.progress_frequency
        self.progress_frequency = None

        if lookup_name is None:
            lookup = self.get_nk_lookup()
        else:
            lookup = self.get_lookup(lookup_name)

        # Ensure that PK columns are returned in addition to whatever lookup is used
        keys = list(lookup.lookup_keys)
        for pk_col in self.primary_key:
            if pk_col not in keys:
                keys.append(pk_col)

        # Note, here we select only lookup + PK columns from self
        row_iter = self.where(column_list=keys,
                              criteria_list=criteria_list,
                              criteria_dict=criteria_dict,
                              use_cache_as_source=use_cache_as_source,
                              parent_stats=stats)
        # Save the iterator as a list since we'll be modifying the cache as we run through it
        rows_list = list(row_iter)
        for row in rows_list:
            pk_val = row[self.primary_key[0]]
            if not isinstance(pk_val, int):
                raise ValueError(f"Row has {pk_val} (not int) primary key value for {self.primary_key[0]} in {row.as_dict}")
            elif pk_val < 0:
                # Skip special rows
                continue
            if row.status == RowStatus.unknown:
                pass
            stats['rows read'] += 1
            existing_key = lookup.get_hashable_combined_key(row)

            if 0 < progress_frequency <= progress_timer.seconds_elapsed:
                progress_timer.reset()
                self.log.info(f"update_not_in_set current row#={stats['rows read']:,} row key={existing_key} updates done so far = {stats['updates count']:,}")

            if existing_key not in set_of_key_tuples:
                stats['updates count'] += 1

                # Direct update method. This will be slow due to a new statement per update
                # and also update statements per row
                # self.update(updates_to_make, key_names=lookup.lookup_keys, key_values=existing_key, connection_name='update')

                try:
                    # First we need the entire existing row (all versions)
                    target_row_list = lookup.find_versions_list(row)
                except NoResultFound:
                    raise RuntimeError(
                        f"keys {row.as_key_value_list} found in database or cache but not found now by get_by_lookup"
                    )
                for target_row in target_row_list:
                    # Then we can apply the updates to it using Table non-versioned updates
                    Table.apply_updates(
                        self=self,
                        row=target_row,
                        additional_update_values=updates_to_make,
                        allow_insert=False,
                        parent_stats=stats,
                    )
        stats.timer.stop()
        # Restore saved progress_frequency
        self.progress_frequency = saved_progress_frequency

    def commit(self,
               stat_name='commit',
               parent_stats=None,
               print_to_log=True,
               connection_name: str = None,
               begin_new: bool = True,
               ):
        if len(self.prior_upsert_existing_rows_list) > 0:
            # End the final batch
            self.finish_pending_existing_rows()
        super().commit(stat_name=stat_name,
                       parent_stats=parent_stats,
                       print_to_log=print_to_log,
                       connection_name=connection_name,
                       begin_new=begin_new,
                       )

    def sql_upsert(
            self,
            source_table: ReadOnlyTable,
            source_excludes: typing.Optional[frozenset] = None,
            target_excludes: typing.Optional[frozenset] = None,
            skip_update_check_on: typing.Optional[frozenset] = None,
            connection_name: str = 'sql_upsert',
            stat_name: str = 'upsert_db_exclusive',
            parent_stats: typing.Optional[Statistics] = None,
            **kwargs
    ):
        upsert_db_excl_stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
        upsert_db_excl_stats.print_start_stop_times = False
        upsert_db_excl_stats.timer.start()
        upsert_db_excl_stats['calls'] += 1

        source_row_columns_set = source_table.column_names_set
        if source_excludes is not None:
            source_row_columns_set = source_row_columns_set - source_excludes
        target_column_set = self.column_names_set
        if target_excludes is not None:
            target_column_set = target_column_set - target_excludes
        matching_columns = {column_name for column_name in source_row_columns_set if column_name in target_column_set}
        compare_columns = matching_columns - {self.begin_date_column, self.end_date_column}
        non_nk_columns = matching_columns - set(self.natural_key)
        if self.last_update_date is not None:
            compare_columns -= {self.last_update_date}
        if len(self.primary_key) != 1:
            raise ValueError(f"{self}.upsert_db_exclusive requires single primary_key column. Got {self.primary_key}")
        primary_key_name = self.primary_key[0]
        now_str = self.get_current_time().isoformat()

        # TODO: Need to "deduplicate" source versions if two have identical values except for the effective_date_column
        deduplicated_source_table = 'public.deduplicated_source'
        self.database.drop_table_if_exists(deduplicated_source_table, connection_name=connection_name, auto_close=False, transaction=False)
        deduplicate_source_sql = f"""
            CREATE TABLE {deduplicated_source_table} AS
            WITH source_next_prev AS (
                SELECT {Table._sql_indent_join([col for col in matching_columns], 23, suffix=',')},
                       {kwargs['source_begin_date_column']},
                       LAG({kwargs['source_begin_date_column']})
                           OVER (
                               PARTITION BY {Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
                               ORDER BY {kwargs['source_begin_date_column']}
                           ) as prev_effective_date
                       /*
                       LEAD({kwargs['source_begin_date_column']})
                           OVER (
                               PARTITION BY {Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
                               ORDER BY {kwargs['source_begin_date_column']}
                           ) as next_effective_date
                       */
                FROM {source_table.qualified_table_name} s
            )
            , changes_only AS (
                SELECT r.*
                FROM source_next_prev r
                LEFT OUTER JOIN source_next_prev prev
                    ON {Table._sql_indent_join([f"prev.{nk_col} = r.{nk_col}" for nk_col in self.natural_key], 32, prefix='AND ')}
                    AND prev.{kwargs['source_begin_date_column']} = r.prev_effective_date
                WHERE {Table._sql_indent_join([Table._sql_column_not_equals(m_col, 'prev', 'r') for m_col in non_nk_columns], 20, prefix='OR ')}
            )
            SELECT changes_only.*,
                   /* Note we can't use the next_effective_date since we might have skipped one with no changes */
                   COALESCE(
                       LEAD({kwargs['source_begin_date_column']})
                           OVER (
                               PARTITION BY {Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
                               ORDER BY {kwargs['source_begin_date_column']}
                           ),
                       {self._sql_date_literal(self.default_end_date)}
                   ) as end_effective_date
            FROM changes_only
        """
        self.log.debug('-' * 80)
        self.log.debug('deduplicate_source_sql')
        self.log.debug('-' * 80)
        sql_timer = Timer()
        self.log.debug(deduplicate_source_sql)
        self.execute(deduplicate_source_sql, connection_name=connection_name)
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

        # Insert rows for new Natural Keys
        self._sql_insert_new(
            source_table=deduplicated_source_table,
            matching_columns=matching_columns,
            effective_date_column=kwargs['source_begin_date_column'],
            connection_name=connection_name,
        )

        # unified_dates_table = '#unified_dates_table'
        unified_dates_table = 'public.derek_unified_dates_table'
        self.database.drop_table_if_exists(unified_dates_table, connection_name=connection_name, auto_close=False, transaction=False)
        unified_dates_sql = f"""
            CREATE TABLE {unified_dates_table} AS
            WITH pass1 as (
                SELECT DISTINCT
                    {Table._sql_indent_join([f"s.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
                    s.{kwargs['source_begin_date_column']} as effective_date
                FROM {deduplicated_source_table} s
                UNION
                SELECT
                    {Table._sql_indent_join([f"e.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
                    e.{self.begin_date_column} as effective_date
                FROM {self.qualified_table_name} e
            )
            SELECT pass1.*,
                COALESCE(
                    LEAD({self._sql_add_timedelta('pass1.effective_date', self._end_date_timedelta)})
                        OVER (
                            PARTITION BY {Table._sql_indent_join([nk_col for nk_col in self.natural_key], 16, suffix=',')}
                            ORDER BY effective_date
                        ),
                    {self._sql_date_literal(self.default_end_date)}
                ) as new_end_effective_date
            FROM pass1
        """
        self.log.debug('-' * 80)
        self.log.debug('unified_dates_sql')
        self.log.debug('-' * 80)
        sql_timer = Timer()
        self.log.debug(unified_dates_sql)
        self.execute(unified_dates_sql, connection_name=connection_name)
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

        # TODO: Need DB specific CREATE TABLE AS or SELECT ... INTO
        # derek_updated_row_nk_table = '#updated_row_nk'
        updated_row_nk_table = 'public.derek_updated_row_nk'
        self.database.drop_table_if_exists(updated_row_nk_table, connection_name=connection_name, auto_close=False, transaction=False)
        find_updates_sql = f"""
            CREATE TABLE {updated_row_nk_table} AS
            SELECT
                {Table._sql_indent_join([f"e.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
                s.{kwargs['source_begin_date_column']} as source_effective_date,
                e.{self.begin_date_column} as target_begin_date,
                e.{self.end_date_column} as target_end_date,
                d.new_end_effective_date,
                {Table._sql_indent_join([f"CASE WHEN {Table._sql_column_not_equals(m_col, 'e', 's')} THEN 1 ELSE 0 END as chg_in_{m_col}" for m_col in non_nk_columns], 16, suffix=',')}
            FROM {unified_dates_table} d
            INNER JOIN {self.qualified_table_name} e
                ON {Table._sql_indent_join([f"d.{nk_col} = e.{nk_col}" for nk_col in self.natural_key], 19, prefix='AND ')}
                AND d.effective_date BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
            INNER JOIN {deduplicated_source_table} s
                ON {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 19, prefix='AND ')}
                AND d.effective_date BETWEEN s.{kwargs['source_begin_date_column']} AND s.{kwargs['source_end_date_column']}
            WHERE (
                {Table._sql_indent_join([Table._sql_column_not_equals(m_col, 'e', 's') for m_col in non_nk_columns], 20, prefix='OR ')}
                OR e.{self.delete_flag} != '{self.delete_flag_no}'
            )
        """
        self.log.debug('-' * 80)
        self.log.debug('find_updates_sql')
        self.log.debug('-' * 80)
        sql_timer = Timer()
        self.log.debug(find_updates_sql)
        results = self.execute(find_updates_sql, connection_name=connection_name)
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

        # Note: This SQL will only work if generated AFTER the insert_new_sql has been run on the DB
        #       otherwise the primary_key values will conflict.
        # TODO type 1 srgt
        insert_updates_sql = f"""
            INSERT INTO {self.qualified_table_name} (
                {primary_key_name},
                {self.delete_flag},
                {self.begin_date_column},
                {self.end_date_column},
                {self.last_update_date},
                {Table._sql_indent_join(matching_columns, 16, suffix=',')}
            )
            WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt FROM {self.qualified_table_name})
            SELECT
                max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
                '{self.delete_flag_no}' as {self.delete_flag},
                u.source_effective_date as {self.begin_date_column},
                u.new_end_effective_date as {self.end_date_column},
                '{now_str}' as {self.last_update_date},
                {Table._sql_indent_join([f's.{col}' for col in matching_columns], 16, suffix=',')}
            FROM max_srgt
                CROSS JOIN {deduplicated_source_table} s
                INNER JOIN {updated_row_nk_table} u
                    ON {Table._sql_indent_join([f"u.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
                    AND s.{kwargs['source_begin_date_column']} = u.source_effective_date
            /* Do not insert entries for when dates match exactly, those must be updated in place */
            WHERE u.source_effective_date != u.target_begin_date
        """
        self.log.debug('-' * 80)
        self.log.debug('insert_updates_sql')
        self.log.debug('-' * 80)
        self.log.debug(insert_updates_sql)
        sql_timer = Timer()
        results = self.execute(insert_updates_sql, connection_name=connection_name)
        self.log.debug(f"Impacted rows = {results.rowcount:,} (meaningful count? {results.supports_sane_rowcount()})")
        self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")

        joins = [(nk_col, ("u", nk_col)) for nk_col in self.natural_key]
        joins.append((self.begin_date_column, ('u', 'target_begin_date')))

        self._sql_update_from(
            update_name='update_retire',
            source_sql=f"{updated_row_nk_table} u",
            list_of_joins=joins,
            # Do not insert entries for when dates match exactly, those must be updated in place.
            extra_where="u.source_effective_date != u.target_begin_date",
            target_alias_in_source='e',
            list_of_sets=[
                (
                    self.end_date_column,
                    f"u.new_end_effective_date"
                ),
            ]
        )

        self._sql_update_from(
            update_name='update_in_place',
            source_sql=f"""
                {updated_row_nk_table} u
                INNER JOIN {deduplicated_source_table} s
                    ON {Table._sql_indent_join([f"s.{nk_col} = u.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND ')}
                    AND s.{kwargs['source_begin_date_column']} = u.source_effective_date
            """,
            list_of_joins=joins,
            target_alias_in_source='e',
            list_of_sets=[
                (m_col, f"s.{m_col}") for m_col in non_nk_columns
            ]
        )
        self.log.debug('=' * 80)
        self.commit()
        self.close_connection(connection_name=connection_name)
        upsert_db_excl_stats.timer.stop()
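

# A minimal usage sketch, not part of the original module. It assumes a hypothetical target
# table name 'd_product' and a hypothetical `source` iterable of Row objects that contain the
# natural key columns plus this table's begin date column; only upsert(), commit(), and
# close() below are calls defined by this class.
def _example_load(task: ETLTask, warehouse_db: DatabaseMetadata, source) -> None:
    target = HistoryTableSourceBased(task, warehouse_db, 'd_product')
    try:
        for row in source:
            # Rows for the same natural key must arrive in begin-date order
            # (see _upsert_version_set above).
            target.upsert(row)
        # commit() also flushes the final pending version batch
        # via finish_pending_existing_rows().
        target.commit()
    finally:
        target.close()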