"""
Created on Nov 4, 2014
@author: Derek Wood
"""
import traceback
import typing
import warnings
from datetime import datetime, timedelta
from enum import Enum
from typing import Union, Callable, List
from bi_etl.components.readonlytable import ReadOnlyTable
from bi_etl.components.row.column_difference import ColumnDifference
from bi_etl.components.row.row import Row
from bi_etl.components.row.row_status import RowStatus
from bi_etl.components.table import Table
from bi_etl.conversions import ensure_datetime
from bi_etl.database.database_metadata import DatabaseMetadata
from bi_etl.exceptions import AfterExisting, NoResultFound, BeforeAllExisting
from bi_etl.lookups.autodisk_range_lookup import AutoDiskRangeLookup
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
from bi_etl.timer import Timer
__all__ = ['HistoryTable']
# noinspection PyAbstractClass,SqlDialectInspection
class HistoryTable(Table):
"""
ETL target component for a table that stores history of updates. Also usable as a source.
Parameters
----------
task : ETLTask
The instance to register in (if not None)
database : bi_etl.scheduler.task.Database
The database to find the table/view in.
table_name : str
The name of the table/view.
exclude_columns : frozenset
Optional. A list of columns to exclude from the table/view.
These columns will not be included in SELECT, INSERT, or UPDATE statements.
Attributes
----------
begin_date_column: str
Name of the begin date column
end_date_column: str
Name of the end date column
inserts_use_default_begin_date: boolean
Should inserts use the default begin date instead of the class effective date?
This allows records to match up in joins with other history tables where the effective
date of the 'primary' table might be before the first version effective date.
Default = True
default_begin_date: date
Default begin date to assign for begin_date.
Used for new records if inserts_use_default_begin_date is True.
Also used for
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Default = 1900-1-1
default_end_date: date
Default end date to assign to end_date for active rows.
Also used for
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Default = 9999-1-1
auto_generate_key: boolean
Should the primary key be automatically generated by the insert/upsert process?
If True, the process will get the current maximum value and then increment it with each insert.
(inherited from Table)
batch_size: int
How many rows should be insert/update/deleted in a single batch.
(inherited from Table)
delete_flag : str
The name of the delete_flag column, if any.
Optional.
(inherited from ReadOnlyTable)
delete_flag_yes : str
The value of delete_flag for deleted rows.
Optional.
(inherited from ReadOnlyTable)
delete_flag_no : str
The value of delete_flag for *not* deleted rows.
Optional.
(inherited from ReadOnlyTable)
default_date_format: str
The date parsing format to use for str -> date conversions.
If more than one date format exists in the source, then explicit conversions will be required.
Default = '%m/%d/%Y'
(inherited from Table)
force_ascii: boolean
Should text values be forced into the ascii character set before passing to the database?
Default = False
(inherited from Table)
last_update_date: str
Name of the column which we should update when table updates are made.
Default = None
(inherited from Table)
log_first_row : boolean
Should we log progress on the first row read. *Only applies if used as a source.*
(inherited from ETLComponent)
max_rows : int
The maximum number of rows to read. *Only applies if Table is used as a source.*
Optional.
(inherited from ETLComponent)
primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=None.
If not passed in, will use the database value, if any.
(inherited from ETLComponent)
natural_key: list
The list of natural key columns (as Column objects).
The default is the list of non-begin/end date primary key columns.
The default is *not* appropriate for dimension tables with surrogate keys.
progress_frequency : int
How often (in seconds) to output progress messages.
Optional.
(inherited from ETLComponent)
progress_message : str
The progress message to print.
Optional. Default is ``"{logical_name} row # {row_number}"``. Note ``logical_name`` and ``row_number``
substitutions applied via :func:`format`.
(inherited from ETLComponent)
special_values_descriptive_columns: list
A list of columns that get longer descriptive text in
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Optional.
(inherited from ReadOnlyTable)
track_source_rows: boolean
Should the :meth:`upsert` method keep a set container of source row keys that it has processed?
That set would then be used by :meth:`update_not_processed`, :meth:`logically_delete_not_processed`,
and :meth:`delete_not_processed`.
(inherited from Table)
type_1_surrogate: str
The name of the type 1 surrogate key. The value is automatically generated as equal to the type 2 key on
inserts and equal to the existing value on updates.
Optional.
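Example
-------
Illustrative sketch only; the task/database objects, table name, and column
names below are assumptions for the example and are not part of this module::

    target = HistoryTable(task, database, 'd_customer')
    target.begin_date_column = 'eff_start_dt'
    target.end_date_column = 'eff_end_dt'
    target.type_1_surrogate = 'customer_t1_srgt_key'
    target.auto_generate_key = True
    target.natural_key = ['customer_id']
    target.fill_cache()
    for source_row in customer_source:  # any iterable of bi_etl Row objects
        target.upsert(source_row)
    target.commit()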
"""
class TimePrecision(Enum):
microsecond = 'µs'
millisecond = 'ms'
second = 's'
minute = 'm'
hour = 'h'
day = 'd'
def __init__(
self,
task: typing.Optional[ETLTask],
database: DatabaseMetadata,
table_name: str,
table_name_case_sensitive: bool = True,
schema: str = None,
exclude_columns: frozenset = None,
default_effective_date: datetime = None,
**kwargs
):
# Don't pass kwargs up to super. They should be set at the end of this method
super().__init__(
task=task,
database=database,
table_name=table_name,
table_name_case_sensitive=table_name_case_sensitive,
schema=schema,
exclude_columns=exclude_columns,
)
self.__default_effective_date = None
if default_effective_date is not None:
self.default_effective_date = default_effective_date
self._end_date_timedelta = None # Will be set in the assignment to begin_end_precision below
self.begin_end_precision = HistoryTable.TimePrecision.second
self.__begin_date_column = None
self.__end_date_column = None
self.__default_begin_date = None
self.default_begin_date = datetime(year=1900, month=1, day=1, hour=0, minute=0, second=0)
self.__default_end_date = None
self.default_end_date = datetime(year=9999, month=1, day=1, hour=0, minute=0, second=0)
self.inserts_use_default_begin_date = True
self.default_lookup_class = AutoDiskRangeLookup
# define_lookup will add the begin & end dates to the lookup args
self.type_1_surrogate = None
# Should be the last call of every init
self.set_kwattrs(**kwargs)
def __repr__(self):
return f"HistoryTable({self.table.name})"
@staticmethod
def kwattrs_order() -> typing.Dict[str, int]:
"""
Certain values need to be set before others in order to work correctly.
This method should return a dict mapping each such argument name to a sort
order value less than the default of 9999, which is used for any argument
not explicitly listed here.
"""
return {
'begin_date_column': 1,
'end_date_column': 1,
}
@property
def default_effective_date(self):
if self.__default_effective_date is None:
self.__default_effective_date = self.get_current_time()
return self.__default_effective_date
@default_effective_date.setter
def default_effective_date(self, value):
self.__default_effective_date = ensure_datetime(value)
@property
def default_begin_date(self):
return self.__default_begin_date
@default_begin_date.setter
def default_begin_date(self, value):
self.__default_begin_date = ensure_datetime(value)
@property
def default_end_date(self):
return self.__default_end_date
@default_end_date.setter
def default_end_date(self, value):
self.__default_end_date = ensure_datetime(value)
def is_date_key_column(self, column):
if self.begin_date_column is None or self.end_date_column is None:
raise AssertionError('begin_date or end_date not defined')
return (self.get_column_name(column) == self.begin_date_column
or self.get_column_name(column) == self.end_date_column
)
def build_nk(self):
if len(list(self.primary_key)) == 1:
#
# TODO: For scd type2 dimension tables the primary_key will be the surrogate key.
# So this won't generate the actual natural key.
# Need to fix that. We could scan for other unique indexes / constraints.
#
self.log.error("Cannot determine natural key. Please set it explicitly using the natural_key attribute")
return None
else:
#
# If we have multiple keys, we'll assume this is a history table where
# the key is the natural key plus an effective date
#
natural_key = list()
# noinspection PyTypeChecker
for col in self.primary_key:
if not self.is_date_key_column(col):
natural_key.append(col)
return natural_key
@property
def begin_end_precision(self) -> TimePrecision:
return self.__begin_end_precision
@begin_end_precision.setter
def begin_end_precision(self, value: TimePrecision):
if isinstance(value, HistoryTable.TimePrecision):
self.__begin_end_precision = value
if value == HistoryTable.TimePrecision.second:
self._end_date_timedelta = timedelta(seconds=-1)
elif value == HistoryTable.TimePrecision.millisecond:
self._end_date_timedelta = timedelta(milliseconds=-1)
elif value == HistoryTable.TimePrecision.microsecond:
self._end_date_timedelta = timedelta(microseconds=-1)
elif value == HistoryTable.TimePrecision.minute:
self._end_date_timedelta = timedelta(minutes=-1)
elif value == HistoryTable.TimePrecision.hour:
self._end_date_timedelta = timedelta(hours=-1)
elif value == HistoryTable.TimePrecision.day:
self._end_date_timedelta = timedelta(days=-1)
else:
raise ValueError(f"{value} is a TimePrecision instance that is not handled")
else:
raise ValueError(f"{value} is not a TimePrecision instance")
@property
def begin_date_column(self) -> str:
return self.__begin_date_column
@begin_date_column.setter
def begin_date_column(self, value: str):
try:
# Remove the old name from custom_special_values
if self.__begin_date_column in self.custom_special_values:
del self.custom_special_values[self.__begin_date_column]
except AttributeError:
# __begin_date doesn't exist yet
pass
self.__begin_date_column = value
if self.__begin_date_column is not None:
self.custom_special_values[self.__begin_date_column] = self.default_begin_date
@property
def end_date_column(self) -> str:
return self.__end_date_column
@end_date_column.setter
def end_date_column(self, value: str):
try:
# Remove the old name from custom_special_values
if self.__end_date_column in self.custom_special_values:
del self.custom_special_values[self.__end_date_column]
except AttributeError:
# __end_date doesn't exist yet
pass
self.__end_date_column = value
if self.__end_date_column is not None:
self.custom_special_values[self.end_date_column] = self.default_end_date
def define_lookup(
self,
lookup_name,
lookup_keys,
lookup_class=None,
lookup_class_kwargs=None,
):
"""
Define a new lookup.
Parameters
----------
lookup_name: str
Name for the lookup. Used to refer to it later.
lookup_keys: list
list of lookup key columns
lookup_class: Class
Optional python class to use for the lookup. Defaults to value of default_lookup_class attribute.
lookup_class_kwargs: dict
Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.
begin_date and end_date are added automatically.
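Example
-------
Illustrative sketch; the lookup name and column name are assumptions::

    table.define_lookup('by_source_code', ['source_code'])
    # begin_date / end_date are added to lookup_class_kwargs automatically,
    # so lookup_keys should contain only the non-date key columns.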
"""
if lookup_class_kwargs is None:
lookup_class_kwargs = self.default_lookup_class_kwargs
# Add default begin_date and end_date arguments to the lookup class
if 'begin_date' not in lookup_class_kwargs:
lookup_class_kwargs['begin_date'] = self.begin_date_column
if 'end_date' not in lookup_class_kwargs:
lookup_class_kwargs['end_date'] = self.end_date_column
# Remove both the begin_date and end_date from the lookup columns
# (they are used in the range part)
non_date_lookup_columns = list()
for c in lookup_keys:
if not self.is_date_key_column(c):
non_date_lookup_columns.append(c)
assert len(non_date_lookup_columns) > 0, (
f"define_lookup {lookup_name} lookup_columns had only begin / end dates or none at all. "
f"lookup_columns= {lookup_keys}"
)
super().define_lookup(
lookup_name=lookup_name,
lookup_keys=non_date_lookup_columns,
lookup_class=lookup_class,
lookup_class_kwargs=lookup_class_kwargs,
)
def fill_cache(
self,
progress_frequency: float = 10,
progress_message="{component} fill_cache current row # {row_number:,}",
criteria_list: list = None,
criteria_dict: dict = None,
column_list: list = None,
exclude_cols: frozenset = None,
order_by: list = None,
assume_lookup_complete: bool = None,
allow_duplicates_in_src: bool = False,
row_limit: int = None,
parent_stats: Statistics = None,
):
"""
Fill all lookup caches from the table.
Parameters
----------
column_list: list
Optional. Specific columns to include when filling the cache.
exclude_cols: frozenset
Optional. Columns to exclude from the cached rows.
progress_frequency : int
How often (in seconds) to output progress messages. Default 10. None for no progress messages.
Optional.
progress_message : str
The progress message to print.
Default is ``"{table} fill_cache current row # {row_number:,}"``. Note ``logical_name`` and ``row_number``
substitutions applied via :func:`format`.
Optional.
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
assume_lookup_complete: boolean
Should later lookup calls assume the cache is complete
(and thus raise an Exception if a key combination is not found)?
Defaults to False if filtering criteria were used, otherwise defaults to True.
allow_duplicates_in_src:
Should we quietly let the source provide multiple rows with the same key values? Default = False
row_limit: int
Limit on the number of rows to cache.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
order_by: list
Columns to order by when pulling data. Sometimes required to build the cache correctly.
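Example
-------
Illustrative sketch; the criteria column and value are assumptions::

    table.fill_cache(
        criteria_dict={'source_system': 'CRM'},
        order_by='-PK-',  # order by the primary key lookup keys with begin date last
    )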
"""
if order_by is None or order_by == '-PK-':
order_by = list(self.get_pk_lookup().lookup_keys)
# Make sure begin date is not already in the order_by
if self.begin_date_column in order_by:
order_by.remove(self.begin_date_column)
# Add begin date as the last part of the order by
order_by.append(self.begin_date_column)
super().fill_cache(
progress_frequency=progress_frequency,
progress_message=progress_message,
criteria_list=criteria_list,
criteria_dict=criteria_dict,
column_list=column_list,
exclude_cols=exclude_cols,
order_by=order_by,
assume_lookup_complete=assume_lookup_complete,
allow_duplicates_in_src=allow_duplicates_in_src,
row_limit=row_limit,
parent_stats=parent_stats,
)
def get_by_lookup_and_effective_date(
self,
lookup_name,
source_row,
effective_date,
stats_id='get_by_lookup_and_effective_date',
parent_stats=None,
fallback_to_db: bool = False,
):
"""
Get by an alternate key. Returns a row.
Parameters
----------
lookup_name: str
Name passed into :meth:`define_lookup`
source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
:class:`~bi_etl.components.row.row_case_insensitive.Row` to get lookup keys from
effective_date: date
Effective date to use for lookup
stats_id: str
Statistics name to use
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
fallback_to_db:
Should we check the DB if a record is not found in the cache?
Raises
------
NoResultFound:
If key doesn't exist.
BeforeAllExisting:
If the effective date provided is before all existing records.
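Example
-------
Illustrative sketch of handling the raised exceptions; the lookup name and
date are assumptions::

    try:
        existing = table.get_by_lookup_and_effective_date(
            'by_source_code',
            source_row,
            effective_date=datetime(2020, 1, 1),
        )
    except BeforeAllExisting as e:
        first_version = e.first_existing_row
    except NoResultFound:
        existing = None  # no version exists for that key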
"""
if lookup_name is None:
lookup = self.get_default_lookup(source_row.iteration_header)
else:
lookup = self.get_lookup(lookup_name)
stats = self.get_stats_entry(stats_id, parent_stats=parent_stats)
stats.timer.start()
fallback_to_db = fallback_to_db or self.always_fallback_to_db or not self.cache_clean
return lookup.find(
row=source_row,
fallback_to_db=fallback_to_db,
maintain_cache=self.maintain_cache_during_load,
effective_date=effective_date,
stats=stats,
)
def get_by_lookup(
self,
lookup_name: str,
source_row: Row,
stats_id: str = 'get_by_lookup',
parent_stats: typing.Optional[Statistics] = None,
fallback_to_db: bool = False,
) -> Row:
"""
Get by an alternate key. Returns a row.
Parameters
----------
lookup_name: str
Name passed into :meth:`define_lookup`
source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
Row to get lookup keys from (including effective date)
stats_id: str
Statistics name to use
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
fallback_to_db:
Should we check the DB if a record is not found in the cache?
Raises
------
NoResultFound:
If key doesn't exist.
BeforeAllExisting:
If the effective date provided is before all existing records.
"""
if self.begin_date_column in source_row:
effective_date = source_row[self.begin_date_column]
else:
effective_date = self.default_effective_date
return self.get_by_lookup_and_effective_date(
lookup_name=lookup_name,
source_row=source_row,
effective_date=effective_date,
stats_id=stats_id,
parent_stats=parent_stats,
fallback_to_db=fallback_to_db,
)
def sanity_check_source_mapping(
self,
source_definition,
source_name=None,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
ignore_source_not_in_target=None,
ignore_target_not_in_source=None,
raise_on_source_not_in_target=None,
raise_on_target_not_in_source=None,
):
if target_excludes is None:
target_excludes = set()
else:
target_excludes = set(target_excludes)
# Exclude begin/end dates (and the type 1 surrogate) since they are not mapped like normal columns
if self.begin_date_column not in target_excludes:
target_excludes.add(self.begin_date_column)
if self.end_date_column not in target_excludes:
target_excludes.add(self.end_date_column)
if self.type_1_surrogate and self.type_1_surrogate not in target_excludes:
target_excludes.add(self.type_1_surrogate)
target_excludes = frozenset(target_excludes)
super().sanity_check_source_mapping(
source_definition=source_definition,
source_name=source_name,
source_excludes=source_excludes,
target_excludes=target_excludes,
ignore_source_not_in_target=ignore_source_not_in_target,
ignore_target_not_in_source=ignore_target_not_in_source,
raise_on_source_not_in_target=raise_on_source_not_in_target,
raise_on_target_not_in_source=raise_on_target_not_in_source,
)
def insert_row(
self,
source_row: Row,
additional_insert_values: dict = None,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
stat_name: str = 'insert',
parent_stats: Statistics = None,
**kwargs
) -> Row:
"""
Inserts a row into the database (batching rows as batch_size)
Parameters
----------
source_row
The row with values to insert
additional_insert_values
Additional values to set on the row when inserting
source_excludes:
Set of source columns to exclude
target_excludes
Set of target columns to exclude
stat_name
Name of this step for the ETLTask statistics. Default = 'insert'
parent_stats
Optional Statistics object to nest this step's statistics in
**kwargs
Returns
-------
new_row
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.timer.start()
if self.begin_date_column is None:
raise ValueError("begin_date column name not set")
if self.end_date_column is None:
raise ValueError("end_date column name not set")
if source_row.get(self.begin_date_column, None) is None:
source_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
if source_row.get(self.end_date_column, None) is None:
source_row.set_keeping_parent(self.end_date_column, self.default_end_date)
new_row = self.build_row(
source_row=source_row,
source_excludes=source_excludes,
target_excludes=target_excludes,
parent_stats=stats,
)
if new_row[self.begin_date_column] > new_row[self.end_date_column]:
raise ValueError(
f"Begin date {new_row[self.begin_date_column]} "
f"greater than end date {new_row[self.end_date_column]} for row {new_row.str_formatted()}"
)
# Generate a new type 1 key value
if (self.type_1_surrogate is not None
and self.auto_generate_key
and new_row.get(self.type_1_surrogate, None) is None):
if new_row[self.begin_date_column] == self.default_begin_date:
self.autogenerate_sequence(new_row, seq_column=self.type_1_surrogate, force_override=False)
else:
msg = 'Insert cannot maintain type1 surrogate keys for anything except new values. Please use upsert.'
warnings.warn(msg)
new_row_from_super = super().insert_row(
new_row,
additional_insert_values=additional_insert_values,
source_excludes=source_excludes,
target_excludes=target_excludes,
parent_stats=parent_stats,
)
return new_row_from_super
def apply_updates(
self,
row,
changes_list: typing.MutableSequence[ColumnDifference] = None,
additional_update_values: Union[dict, Row] = None,
add_to_cache: bool = True,
allow_insert=True,
stat_name: str = 'update',
parent_stats: Statistics = None,
**kwargs
):
"""
This method should only be called with a row that has already been transformed into the correct datatypes
and column names.
The update values can be in any of the following parameters
- row (also used for PK)
- changes_list
- additional_update_values
Parameters
----------
row:
The row to update with (needs to have at least PK values)
changes_list:
A list of ColumnDifference objects to apply to the row
additional_update_values:
A Row or dict of additional values to apply to the row
add_to_cache:
Should this method update the cache (pass False if the caller will do that)
allow_insert: boolean
Allow this method to insert a new row into the cache
stat_name:
Name of this step for the ETLTask statistics. Default = 'update'
parent_stats:
Optional Statistics object to nest this step's statistics in.
kwargs:
effective_date: datetime
The effective date to use for the update
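Example
-------
Illustrative sketch; the column name, value, and effective date are assumptions::

    existing_row = table.get_by_lookup_and_effective_date(None, source_row, effective_date)
    table.apply_updates(
        existing_row,
        additional_update_values={'status_code': 'A'},
        effective_date=effective_date,
    )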
"""
end_date_coerce = self.get_coerce_method(self.end_date_column)
begin_date_coerce = self.get_coerce_method(self.begin_date_column)
effective_date = begin_date_coerce(kwargs.get('effective_date', self.default_effective_date))
row.set_keeping_parent(self.begin_date_column, begin_date_coerce(row[self.begin_date_column]))
row.set_keeping_parent(self.end_date_column, end_date_coerce(row[self.end_date_column]))
if row[self.begin_date_column] > row[self.end_date_column]:
raise ValueError(
f"Begin date {row[self.begin_date_column]} "
f"greater than end date {row[self.end_date_column]} for row {row.str_formatted()}"
)
# Check for special case that the target already has that exact begin date
if row[self.begin_date_column] == effective_date:
# In which case, we simply update it in place
if self.trace_data:
self.log.debug(
f"Begin date exact match. Type 1 update row with begin effective {row[self.begin_date_column]} "
f"with new end date of {row[self.end_date_column]}"
)
# Apply updates to the row (use parent class routine to finish the work)
super(HistoryTable, self).apply_updates(
row,
changes_list=changes_list,
additional_update_values=additional_update_values,
stat_name=stat_name,
parent_stats=parent_stats,
add_to_cache=add_to_cache,
allow_insert=allow_insert,
)
else:
# Create a clone of the existing row
new_row = row.clone()
new_row.status = RowStatus.insert
# Force the new row to get a new surrogate key (if enabled)
self.autogenerate_key(new_row, force_override=True)
# Apply updates to the new row (use parent class routine to finish the work)
# It won't send update since new_row.status = RowStatus.insert
# The row actually gets inserted at the end of this method
super().apply_updates(
new_row,
changes_list=changes_list,
additional_update_values=additional_update_values,
add_to_cache=False,
parent_stats=parent_stats,
allow_insert=allow_insert,
)
new_row.set_keeping_parent(self.begin_date_column, effective_date)
# check that we aren't inserting after all existing rows
if new_row[self.end_date_column] < effective_date:
self.warnings_issued += 1
if self.warnings_issued < self.warnings_limit:
self.log.warning(
f"The table had an existing key sequence that did not cover all dates.\n"
f" End = {new_row[self.end_date_column]} is less than eff date {effective_date}\n"
f" Natural Keys = {self.get_nk_lookup().get_list_of_lookup_column_values(row)}."
)
for version_row in self.get_nk_lookup().get_versions_collection(row).values():
    self.log.info(f" begin= {version_row[self.begin_date_column]}\tend= {version_row[self.end_date_column]}")
new_row.set_keeping_parent(self.end_date_column, self.default_end_date)
# Retire the existing row (after clone so that the existing end_date is passed on to the new row)
# Note: The parent class apply_updates will check to only send update statement if the row is in the
# database, otherwise it will update the record that's pending insert
super().apply_updates(
row,
stat_name=stat_name,
parent_stats=parent_stats,
changes_list=[ColumnDifference(
self.end_date_column,
old_value=row[self.end_date_column],
new_value=end_date_coerce(effective_date + self._end_date_timedelta)
)],
add_to_cache=add_to_cache,
allow_insert=allow_insert,
)
if self.trace_data:
self.log.debug(
f"retiring row with begin effective {row[self.begin_date_column]} "
f"with new end date of {row[self.end_date_column]}"
)
# Add the new row
self.insert_row(new_row, stat_name=stat_name, parent_stats=parent_stats)
def _target_excludes_for_updates(self, target_excludes: frozenset):
if target_excludes is None:
target_excludes = set()
# Don't map a source end date through to the target
if self.end_date_column not in target_excludes:
new_set = set(target_excludes)
new_set.add(self.end_date_column)
target_excludes = frozenset(new_set)
return target_excludes
def build_row(
self,
source_row: Row,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
stat_name: str = 'build rows',
parent_stats: Statistics = None,
) -> Row:
source_mapped_as_target_row = super().build_row(
source_row=source_row,
source_excludes=source_excludes,
target_excludes=target_excludes,
stat_name=stat_name,
parent_stats=parent_stats
)
if self.delete_flag is not None and (
self.delete_flag not in source_mapped_as_target_row
or source_mapped_as_target_row[self.delete_flag] is None):
source_mapped_as_target_row.set_keeping_parent(self.delete_flag, self.delete_flag_no)
return source_mapped_as_target_row
def upsert(
self,
source_row: Union[Row, List[Row]],
lookup_name: str = None,
skip_update_check_on: typing.Optional[frozenset] = None,
do_not_update: list = None,
additional_update_values: dict = None,
additional_insert_values: dict = None,
update_callback: Callable[[typing.MutableSequence, Row], None] = None,
insert_callback: Callable[[Row], None] = None,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
stat_name: str = 'upsert',
parent_stats: Statistics = None,
**kwargs
):
"""
Update (if changed) or Insert a row in the table.
Returns the row found/inserted, with the auto-generated key (if that feature is enabled)
Parameters
----------
source_row: :class:`~bi_etl.components.row.row_case_insensitive.Row`
Row to upsert
lookup_name: str
The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
skip_update_check_on: frozenset
Set of column names to not compare old vs new for updates.
do_not_update: list
List of columns to never update.
additional_update_values: dict
Additional updates to apply when updating
additional_insert_values: dict
Additional values to set on each row when inserting.
update_callback: func
Function to pass updated rows to. Function should not modify row.
insert_callback: func
Function to pass inserted rows to. Function should not modify row.
source_excludes: frozenset
Set of source columns to exclude when mapping to this Table.
target_excludes: frozenset
Set of Table columns to exclude when mapping from the source row.
stat_name: string
Name of this step for the ETLTask statistics. Default = 'upsert'
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
kwargs:
effective_date: datetime
The effective date to use for the update
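Example
-------
Illustrative sketch; the source component and its 'change_dt' column are assumptions::

    for source_row in customer_source:
        target.upsert(source_row, effective_date=source_row['change_dt'])
    target.commit()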
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.ensure_exists('upsert source row count')
stats.ensure_exists('apply_updates called')
stats.ensure_exists('insert new called')
stats.timer.start()
stats['upsert source row count'] += 1
self.begin()
self._set_upsert_mode()
effective_date = kwargs.get('effective_date')
begin_date_coerce = self.get_coerce_method(self.begin_date_column)
if effective_date is None:
if self.begin_date_column in source_row and source_row[self.begin_date_column] is not None:
effective_date = begin_date_coerce(source_row[self.begin_date_column])
else:
effective_date = begin_date_coerce(self.default_effective_date)
else:
effective_date = begin_date_coerce(effective_date)
skip_update_check_on = self._target_excludes_for_updates(skip_update_check_on)
source_mapped_as_target_row = self.build_row(
source_row=source_row,
source_excludes=source_excludes,
target_excludes=target_excludes,
parent_stats=stats,
)
if self.delete_flag is not None and (
self.delete_flag not in source_mapped_as_target_row
or source_mapped_as_target_row[self.delete_flag] is None):
source_mapped_as_target_row.set_keeping_parent(self.delete_flag, self.delete_flag_no)
if self.track_source_rows:
# Keep track of source records so we can check if target rows don't exist in source
self.source_keys_processed.add(self.get_natural_key_tuple(source_mapped_as_target_row))
# Don't check begin or end dates for differences
if skip_update_check_on is None:
skip_update_check_on = frozenset()
if do_not_update is None:
do_not_update = frozenset()
if self.begin_date_column not in skip_update_check_on:
skip_update_check_on = skip_update_check_on | {self.begin_date_column}
if self.end_date_column not in skip_update_check_on:
skip_update_check_on = skip_update_check_on | {self.end_date_column}
if self.last_update_date not in skip_update_check_on:
skip_update_check_on = skip_update_check_on | {self.last_update_date}
type_1_surrogate = self.type_1_surrogate
lookup_keys = None
# Check for existing row
try:
# We'll default to using the primary key or NK as provided in the row
if lookup_name is None:
lookup_object = self.get_default_lookup(source_row.iteration_header)
lookup_name = lookup_object.lookup_name
else:
lookup_object = self.get_lookup(lookup_name)
if not lookup_object.cache_enabled and self.maintain_cache_during_load and self.batch_size > 1:
raise AssertionError("Caching needs to be turned on if batch mode is on!")
if self.trace_data:
# lookup_keys is used only for data trace
lookup_keys = lookup_object.get_list_of_lookup_column_values(source_mapped_as_target_row)
lookup_keys.append(effective_date)
existing_row = self.get_by_lookup_and_effective_date(
lookup_name,
source_row=source_mapped_as_target_row,
effective_date=effective_date,
parent_stats=stats
)
changes_list = existing_row.compare_to(
source_mapped_as_target_row,
exclude=do_not_update | skip_update_check_on,
coerce_types=False,
)
delayed_changes = existing_row.compare_to(source_mapped_as_target_row, compare_only=skip_update_check_on)
if self.track_update_columns:
col_stats = self.get_stats_entry('updated columns', parent_stats=stats)
for chg in changes_list:
col_stats.add_to_stat(key=chg.column_name, increment=1)
if self.trace_data:
self.log.debug(
f"{lookup_keys} {chg.column_name} changed from {chg.old_value} to {chg.new_value}"
)
if len(changes_list) > 0:
for chg in delayed_changes:
col_stats.add_to_stat(key=chg.column_name, increment=1)
if self.trace_data:
self.log.debug(
f"{lookup_keys} NO UPDATE TRIGGERED BY {chg.column_name} "
f"changed from {chg.old_value} to {chg.new_value}"
)
if len(changes_list) > 0:
stats['apply_updates called'] += 1
self.apply_updates(
existing_row,
effective_date=effective_date,
changes_list=list(changes_list) + list(delayed_changes),
additional_update_values=additional_update_values,
parent_stats=stats
)
# For testing commit each update
# self.commit(parent_stats= stats);
if update_callback:
update_callback(changes_list, existing_row)
target_row = existing_row
except BeforeAllExisting as e:
# Logic to insert a new row before the existing row in e.first_existing_row
new_row = source_mapped_as_target_row
if additional_insert_values:
for colName, value in additional_insert_values.items():
new_row[colName] = value
if self.inserts_use_default_begin_date:
new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
else:
new_row.set_keeping_parent(self.begin_date_column, effective_date)
# Set end date to one begin_end_precision unit before the existing record's begin date
first_begin = e.first_existing_row[self.begin_date_column]
first_minus_1 = first_begin + self._end_date_timedelta
new_row.set_keeping_parent(self.end_date_column, first_minus_1)
# Get type 1 key value from existing row
if type_1_surrogate is not None:
new_row.set_keeping_parent(type_1_surrogate, e.first_existing_row[type_1_surrogate])
# Generate a new type 2 surrogate key value
self.autogenerate_key(new_row, force_override=True)
if self.last_update_date is not None:
self.set_last_update_date(new_row)
if self.trace_data:
self.log.debug(f"Inserting record effective {effective_date} before existing history")
stats['insert before existing called'] += 1
self.insert_row(new_row, parent_stats=stats)
if insert_callback:
insert_callback(new_row)
target_row = new_row
except NoResultFound:
new_row = source_mapped_as_target_row
if additional_insert_values:
for colName, value in additional_insert_values.items():
new_row[colName] = value
if self.inserts_use_default_begin_date:
new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
else:
new_row.set_keeping_parent(self.begin_date_column, effective_date)
new_row.set_keeping_parent(self.end_date_column, self.default_end_date)
# Generate a new type 1 key value
if type_1_surrogate is not None and self.auto_generate_key:
self.autogenerate_sequence(new_row, seq_column=type_1_surrogate, force_override=True)
# Generate a new type 2 surrogate key value
self.autogenerate_key(new_row, force_override=True)
if self.last_update_date is not None:
self.set_last_update_date(new_row)
if self.trace_data:
self.log.debug(f"{lookup_keys} not found, inserting as new")
stats['insert new called'] += 1
self.insert_row(new_row, parent_stats=stats)
if insert_callback:
insert_callback(new_row)
# debugging commit
# self.commit(parent_stats= stats);
target_row = new_row
stats.timer.stop()
return target_row
def delete(self, **kwargs):
"""
Not implemented for history table. Use :meth:`physically_delete_version` instead.
"""
raise NotImplementedError("Use physically_delete_version instead")
def physically_delete_version(
self,
row_to_be_deleted: Row,
remove_from_cache=True,
prior_row: Row = ...,
stat_name='delete_version',
parent_stats=None
):
"""
**Physically** delete a given version row. Corrects the preceding end date value.
Parameters
----------
row_to_be_deleted: :class:`~bi_etl.components.row.row_case_insensitive.Row`
Expected to be an entire existing row.
remove_from_cache: boolean
Optional. Remove the row from the cache?
Default = True
prior_row: bi_etl.components.row.row_case_insensitive.Row
Optional. The prior row, if already available.
If not provided, it will be obtained via :meth:`get_by_lookup_and_effective_date`.
Pass None to indicate that no prior row exists.
stat_name: str
Name of this step for the ETLTask statistics. Default = 'delete_version'
parent_stats: bi_etl.statistics.Statistics
Optional. Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
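Example
-------
Illustrative sketch; the lookup name and effective date are assumptions::

    bad_version = table.get_by_lookup_and_effective_date(
        'by_source_code', key_row, effective_date=datetime(2020, 6, 1),
    )
    table.physically_delete_version(bad_version)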
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.timer.start()
stats['rows sent for delete'] += 1
self.begin()
if row_to_be_deleted.status == RowStatus.unknown:
# Tag the callers row as deleted for that functions sake
row_to_be_deleted.status = RowStatus.deleted
# Get from the cache
row_to_be_deleted = self.get_by_key(row_to_be_deleted)
prior_effective_date = row_to_be_deleted[self.begin_date_column] + self._end_date_timedelta
if not self.in_bulk_mode:
# if the row is not pending insert, issue delete to database
if row_to_be_deleted.status != RowStatus.insert:
# Delete the row in the database
super().delete(
key_names=self.primary_key,
key_values=row_to_be_deleted,
maintain_cache=False,
parent_stats=stats,
)
row_to_be_deleted.status = RowStatus.deleted
# remove the row from the cache, if that is selected. Otherwise, the caller will need to do that.
if remove_from_cache:
self.uncache_row(row_to_be_deleted)
# If we were not given a prior row
first_existing_row = None
if prior_row is ...:
try:
prior_row = self.get_by_lookup_and_effective_date(
self._get_pk_lookup_name(),
row_to_be_deleted,
prior_effective_date,
parent_stats=stats,
)
except BeforeAllExisting as e:
prior_row = None
first_existing_row = e.first_existing_row
except AfterExisting as e:
# No following. Delete existing. Update prior to be high end date
prior_row = e.prior_row
except NoResultFound:
# No other rows. Nothing to do. Ellipsis will skip both blocks below.
prior_row = ...
if prior_row is None:
# No prior row.
# If inserts_use_default_begin_date then we need to update the following row
if self.inserts_use_default_begin_date:
if first_existing_row is not None:
first_existing_row.set_keeping_parent(
self.begin_date_column,
row_to_be_deleted[self.begin_date_column]
)
super().apply_updates(
row=first_existing_row,
parent_stats=stats,
allow_insert=False,
)
# If we have a real prior row
if prior_row is not None and prior_row is not ...:
# If we didn't end up with prior = this row
if prior_row[self.begin_date_column] != row_to_be_deleted[self.begin_date_column]:
prior_row.set_keeping_parent(self.end_date_column, row_to_be_deleted[self.end_date_column])
# Apply updates to the row (use parent class routine to finish the work)
super().apply_updates(
row=prior_row,
parent_stats=stats,
allow_insert=False,
)
stats.timer.stop()
def delete_not_in_set(
self,
set_of_key_tuples: set,
lookup_name: str = None,
criteria_list: list = None,
criteria_dict: dict = None,
use_cache_as_source: bool = True,
stat_name: str = 'delete_not_in_set',
progress_frequency: int = None,
parent_stats: Statistics = None,
**kwargs
):
"""
Overridden to call logical delete.
Deletes rows matching criteria that are not in the set_of_key_tuples passed in.
Parameters
----------
set_of_key_tuples
Set of tuples comprising the primary key values.
This set represents the rows that should *not* be deleted.
lookup_name: str
The name of the lookup to use to find key tuples.
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
use_cache_as_source: bool
Attempt to read existing rows from the cache?
stat_name: string
Name of this step for the ETLTask statistics. Default = 'delete_not_in_set'
progress_frequency: int
How often (in seconds) to output progress messages. Default 10. None for no progress messages.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
kwargs:
effective_date: datetime
The effective date to use for the update
"""
self.logically_delete_not_in_set(
set_of_key_tuples=set_of_key_tuples,
lookup_name=lookup_name,
criteria_list=criteria_list,
criteria_dict=criteria_dict,
use_cache_as_source=use_cache_as_source,
stat_name=stat_name,
parent_stats=parent_stats,
**kwargs
)
def delete_not_processed(
self,
criteria_list: list = None,
criteria_dict: dict = None,
use_cache_as_source: bool = True,
stat_name: str = 'delete_not_processed',
parent_stats: Statistics = None,
**kwargs
):
"""
Overridden to call logical delete.
Logically deletes rows matching criteria that are not in the Table memory of rows passed to :meth:`upsert`.
Parameters
----------
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
use_cache_as_source: bool
Attempt to read existing rows from the cache?
stat_name: string
Name of this step for the ETLTask statistics.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
"""
# Perform logical delete despite call to "delete"
self.logically_delete_not_processed(
criteria_list=criteria_list,
criteria_dict=criteria_dict,
parent_stats=parent_stats,
**kwargs
)
def update_not_in_set(
self,
updates_to_make: Union[dict, Row],
set_of_key_tuples: set,
lookup_name: str = None,
criteria_list: list = None,
criteria_dict: dict = None,
use_cache_as_source: bool = True,
progress_frequency: int = None,
stat_name: str = 'update_not_in_set',
parent_stats: Statistics = None,
**kwargs
):
"""
Applies updates to rows matching criteria that are not in the set_of_key_tuples passed in.
Parameters
----------
updates_to_make: :class:`Row`
Row or dict of updates to make
set_of_key_tuples: set
Set of tuples comprising the primary key values. This set represents the rows that should *not* be updated.
lookup_name: str
The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
use_cache_as_source: bool
Attempt to read existing rows from the cache?
stat_name: string
Name of this step for the ETLTask statistics. Default = 'update_not_in_set'
progress_frequency: int
How often (in seconds) to output progress messages. Default = 10. None for no progress messages.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
kwargs:
effective_date: datetime
The effective date to use for the update
"""
date_coerce = self.get_coerce_method(self.begin_date_column)
effective_date = date_coerce(kwargs.get('effective_date', self.default_effective_date))
if criteria_list is None:
criteria_list = []
criteria_list.extend(
[
self.get_column(self.delete_flag) == self.delete_flag_no,
self.get_column(self.begin_date_column) <= effective_date,
self.get_column(self.end_date_column) >= effective_date,
]
)
stats = self.get_unique_stats_entry(stat_name, parent_stats=parent_stats)
stats['rows read'] = 0
stats['updates count'] = 0
stats.timer.start()
self.begin()
if progress_frequency is None:
progress_frequency = self.progress_frequency
progress_timer = Timer()
# Turn off read progress reports
saved_progress_frequency = self.progress_frequency
self.progress_frequency = None
if lookup_name is None:
lookup_name = self.get_nk_lookup_name()
lookup = self.get_nk_lookup()
else:
lookup = self.get_lookup(lookup_name)
# Note, here we select only lookup columns from self
for row in self.where(
column_list=lookup.lookup_keys,
criteria_list=criteria_list,
criteria_dict=criteria_dict,
# We can't use the cache since we have a list of inequality filters
use_cache_as_source=False,
parent_stats=stats
):
if row.status == RowStatus.unknown:
pass
stats['rows read'] += 1
existing_key = lookup.get_hashable_combined_key(row)
if 0 < progress_frequency <= progress_timer.seconds_elapsed:
progress_timer.reset()
self.log.info(
f"update_not_in_set current current row#={stats['rows read']} "
f"row key={existing_key} updates done so far = {stats['updates count']}"
)
if existing_key not in set_of_key_tuples:
stats['updates count'] += 1
# First we need the entire existing row
target_row = self.get_by_lookup_and_effective_date(
lookup_name,
row,
effective_date=effective_date
)
# Then we can apply the updates to it
self.apply_updates(
target_row,
additional_update_values=updates_to_make,
parent_stats=stats,
allow_insert=False,
effective_date=effective_date,
)
stats.timer.stop()
# Restore saved progress_frequency
self.progress_frequency = saved_progress_frequency
def cleanup_versions(
self,
remove_spurious_deletes: bool = False,
remove_redundant_versions: bool = None, # Default depends on auto_generate_key value
lookup_name: str = None,
exclude_from_compare: frozenset = None,
criteria_list: list = None,
criteria_dict: dict = None,
use_cache_as_source: bool = True,
repeat_until_clean: bool = True,
max_begin_date_warnings: int = 100,
max_end_date_warnings: int = 100,
max_passes: int = 10,
parent_stats=None,
progress_message="{table} cleanup_versions "
"pass {pass_number} current row # {row_number:,}",
):
"""
This routine will look for and remove versions where no material difference exists
between it and the prior version. That can happen during loads if rows come in out of order.
The routine can also optionally remove spurious logical deletes (see ``remove_spurious_deletes`` below).
It also checks for version dates that are not set correctly.
Parameters
----------
remove_spurious_deletes: boolean
(defaults to False): Should the routine check for and remove versions that tag a record as being
deleted only to be un-deleted later.
remove_redundant_versions: boolean
(defaults to opposite of auto_generate_key value): Should the routine delete rows that are exactly
the same as the previous.
lookup_name: str
Name passed into :meth:`define_lookup`
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
exclude_from_compare: frozenset
Columns to exclude from comparisons. Defaults to begin date, end date, and last update date.
Any values passed in are added to that list.
use_cache_as_source: bool
Attempt to read existing rows from the cache?
repeat_until_clean:
repeat loop until all issues are cleaned. Multiple bad rows in a key set can require multiple passes.
max_begin_date_warnings:
How many warning messages to print about bad begin dates
max_end_date_warnings:
How many warning messages to print about bad end dates
max_passes:
How many times should we loop over the dataset if we keep finding fixes.
Note: Some situations do require multiple passes to fully correct.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
progress_message : str
The progress message to print.
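Example
-------
Illustrative sketch of a typical call::

    table.cleanup_versions(remove_spurious_deletes=True)
    table.commit()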
"""
stats = self.get_unique_stats_entry('cleanup_versions', parent_stats=parent_stats)
stats.timer.start()
stats.ensure_exists('rows sent for delete')
self.commit()
# Figure out default remove_redundant_versions value if not provided
if remove_redundant_versions is None:
if self.auto_generate_key:
# Assume that tables with autogenerated keys are dimensions where we don't
# want to delete a surrogate key that might be referenced
remove_redundant_versions = False
else:
remove_redundant_versions = True
if exclude_from_compare is None:
exclude_from_compare = frozenset()
if lookup_name is None:
if self.natural_key is not None:
lookup_object = self.get_nk_lookup()
exclude_from_compare = exclude_from_compare | frozenset(self.primary_key)
else:
lookup_object = self.get_pk_lookup()
else:
lookup_object = self.get_lookup(lookup_name)
# Don't check begin or end dates for differences
if self.begin_date_column not in exclude_from_compare:
exclude_from_compare = exclude_from_compare | {self.begin_date_column}
if self.end_date_column not in exclude_from_compare:
exclude_from_compare = exclude_from_compare | {self.end_date_column}
if self.last_update_date not in exclude_from_compare:
exclude_from_compare = exclude_from_compare | {self.last_update_date}
# Save and later restore the master trace_data setting.
# We're assuming caller doesn't want the read data traced, only the delete actions
trace_delete_data = self.trace_data
# Temporarily turn off read progress messages
saved_read_progress = self.progress_frequency
self.progress_frequency = None
begin_date_issues = 0
end_date_issues = 0
try:
self.trace_data = False
pass_number = 0
# Repeat until... see below
while True:
pass_number += 1
if pass_number > max_passes:
raise ValueError(f"cleanup_versions failed to get to clean state after {max_passes} passes.")
clean_pass = True
prior_row = None
prior_row_pk_values = None
second_prior_row = None
second_prior_row_pk_values = None
pending_cache_deletes = list()
if use_cache_as_source:
if criteria_list is not None:
self.log.warning("cleanup_versions had to use DB source to honor criteria_list")
use_cache_as_source = False
if not self.cache_filled:
self.log.warning(
f"cleanup_versions had to use DB source since cache_filled = {self.cache_filled}"
)
use_cache_as_source = False
if use_cache_as_source:
iterator = iter(lookup_object)
row_stat_name = 'rows read from cache'
else:
if not self.in_bulk_mode and pass_number == 1:
# Apply any pending updates and deletes
self.commit()
order_by = list(lookup_object.lookup_keys)
# Make sure begin date is not already in the order_by
if self.begin_date_column in order_by:
order_by.remove(self.begin_date_column)
# Add begin date as the last part of the order by
order_by.append(self.begin_date_column)
self.log.debug(f"ordering by {order_by}")
iterator = self.where(
criteria_list=criteria_list,
criteria_dict=criteria_dict,
order_by=order_by,
parent_stats=stats,
)
row_stat_name = 'rows read from table'
progress_timer = Timer()
rows_read = 0
# Note iterator needs to return the rows with the natural key
# grouped together (but not necessarily ordered) and in begin_date order.
for row in iterator:
rows_read += 1
if saved_read_progress is not None and 0 < saved_read_progress <= progress_timer.seconds_elapsed:
progress_timer.reset()
self.log.info(
progress_message.format(
pass_number=pass_number,
row_number=rows_read,
table=self,
)
)
stats[row_stat_name] += 1
if row.status == RowStatus.deleted:
raise RuntimeError(f"iterator returned deleted row!\n row={repr(row)}")
# ===========================================================
# Debugging
# if row['person_srgt_key'] in (4550, 986):
# print repr(row)
# ===========================================================
if prior_row is not None:
row_pk_values = lookup_object.get_hashable_combined_key(row)
if row_pk_values != prior_row_pk_values:
# Change in PK.
# Reset second_prior_row
second_prior_row = None
second_prior_row_pk_values = None
# Check first begin date
if self.default_begin_date is not None:
if row[self.begin_date_column] > self.default_begin_date:
clean_pass = False
begin_date_issues += 1
if begin_date_issues < max_begin_date_warnings:
self.log.warning(
f"Correcting first version begin date for {row_pk_values} "
f"from {row[self.begin_date_column]} to {self.default_begin_date}"
)
elif begin_date_issues == max_begin_date_warnings:
self.log.warning("Limit reached for 'Correcting begin date' messages.")
# If not correct, update this first version's begin date
super().apply_updates(
row,
additional_update_values={
self.begin_date_column: self.default_begin_date
},
parent_stats=stats,
allow_insert=False,
)
stats.add_to_stat('rows with bad date sequence', 1)
# Check final end date on prior row (last in series)
if prior_row[self.end_date_column] != self.default_end_date:
end_date_issues += 1
if end_date_issues < max_end_date_warnings:
self.log.warning(
f"Correcting final version end date for {prior_row_pk_values} "
f"from {prior_row[self.end_date_column]} to {self.default_end_date}"
)
elif end_date_issues == max_end_date_warnings:
self.log.warning("Limit reached for 'Correcting end date' messages.")
# If not correct update the prior row end date
super().apply_updates(
prior_row,
additional_update_values={
self.end_date_column: self.default_end_date
},
parent_stats=stats,
allow_insert=False,
)
stats.add_to_stat('rows with bad date sequence', 1)
###########################################################################################
# Checks for subsequent versions after the first.
# (if the last row and this have the same natural key)
else:
# Check that rows are coming in order
if row[self.begin_date_column] < prior_row[self.begin_date_column]:
raise RuntimeError(
f"rows not in begin date order!\nrow={repr(row)}\nprior_row={repr(prior_row)}"
)
# Check that end dates are correct
correct_prior_end_date = row[self.begin_date_column] + self._end_date_timedelta
if prior_row[self.end_date_column] != correct_prior_end_date:
clean_pass = False
# If not correct update the prior row end date
end_date_issues += 1
if end_date_issues < max_end_date_warnings:
self.log.warning(
f"Correcting end date for {prior_row_pk_values} "
f"from {prior_row[self.end_date_column]} to {correct_prior_end_date}"
)
elif end_date_issues == max_end_date_warnings:
self.log.warning(
f"Limit ({max_end_date_warnings}) reached "
f"for 'Correcting end date' messages."
)
super().apply_updates(
prior_row,
additional_update_values={
self.end_date_column: correct_prior_end_date
},
parent_stats=stats,
allow_insert=False,
)
stats.add_to_stat('rows with bad date sequence', 1)
# Delete the previous row if it's a delete, and this following one is not
if (
remove_spurious_deletes
and prior_row[self.delete_flag] == self.delete_flag_yes
and row[self.delete_flag] == self.delete_flag_no
):
if trace_delete_data:
self.log.debug(f"deleting spurious logical delete row {prior_row}")
# We can't risk a change to cache as we iterate, so we keep a list of
# deletes to make in the cache
pending_cache_deletes.append((prior_row, second_prior_row))
clean_pass = False
prior_row = second_prior_row
prior_row_pk_values = second_prior_row_pk_values
second_prior_row = None
second_prior_row_pk_values = None
# Compare this row to last row, and delete
# this if all values are the same (and this is not a delete)
elif (remove_redundant_versions
and prior_row is not None
and prior_row.status != RowStatus.deleted):
differences = row.compare_to(
prior_row,
exclude=exclude_from_compare,
coerce_types=False
)
if len(differences) == 0:
if trace_delete_data:
self.log.debug(f"deleting spurious unchanged row {row}")
# Note: We can't remove from the cache in case we are iterating on that
pending_cache_deletes.append((row, prior_row))
stats['rows sent for delete'] += 1
clean_pass = False
if row.status != RowStatus.deleted:
second_prior_row = prior_row
prior_row = row
prior_row_pk_values = lookup_object.get_hashable_combined_key(row)
# ensure we don't accidentally refer to prior_row or second_prior_row anymore
del prior_row
del second_prior_row
# We deferred the physically_delete_version calls while iterating
# (to avoid changing the cache mid-iteration), so perform them now
for row, prior_row in pending_cache_deletes:
# TODO: Need to store & get second_prior_row in the pending del list
self.physically_delete_version(
row,
prior_row=prior_row,
remove_from_cache=True,
parent_stats=stats,
)
if row.status != RowStatus.deleted:
raise RuntimeError(
f"row not actually tagged as deleted row!\nrow={repr(row)}"
)
self.commit()
# Break out of the repeat loop if the pass was clean, or we aren't supposed to loop
if not clean_pass and repeat_until_clean:
self.log.debug(
f"Repeating cleanup_versions check. Completed pass {pass_number}."
)
self.log.debug(f"{stats['rows sent for delete']} rows deleted so far.")
self.log.debug(f"{begin_date_issues} begin date issues so far.")
self.log.debug(f"{end_date_issues} end date issues so far.")
else:
break
except Exception as e:
self.log.error(traceback.format_exc())
self.log.exception(e)
raise RuntimeError("cleanup_versions failed")
finally:
self.trace_data = trace_delete_data
# Restore read progress messages
self.progress_frequency = saved_read_progress
stats.timer.stop()
def upsert_special_values_rows(
self,
stat_name: str = 'upsert_special_values_rows',
parent_stats: Statistics = None
):
"""
Send all special values rows to upsert to ensure they exist and are current.
Rows come from:
- :meth:`get_missing_row`
- :meth:`get_invalid_row`
- :meth:`get_not_applicable_row`
- :meth:`get_various_row`
Parameters
----------
stat_name:
Name of this step for the ETLTask statistics.
Default = 'upsert_special_values_rows'
parent_stats:
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
"""
# By using the default begin date here we should force a type 1 update since
# the existing record, if any, would have the same begin date.
# If the existing record does not have that begin date, this will fail since
# we turned off auto_generate_key.
self.custom_special_values[self.begin_date_column] = self.default_begin_date
self.custom_special_values[self.end_date_column] = self.default_end_date
super().upsert_special_values_rows(
stat_name=stat_name,
parent_stats=parent_stats,
)
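# Typical usage, as a minimal sketch (the table name 'd_customer' and the task /
# database objects are hypothetical, not part of this module):
#
#     d_customer = HistoryTable(task, database, 'd_customer')
#     ...  # normal upsert processing of the source rows
#     d_customer.upsert_special_values_rows()
#     d_customer.commit()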
def _sql_insert_new(
self,
source_table: str,
matching_columns: typing.Iterable,
effective_date_column: str = None,
connection_name: str = 'sql_upsert',
):
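"""
Build and execute an INSERT ... SELECT that adds rows for keys found in
``source_table`` but not yet in the target. New rows get the not-deleted flag,
the default end date, and either the default begin date or the source
effective date (depending on ``inserts_use_default_begin_date``).
When ``auto_generate_key`` is True the surrogate key is assigned as
max(existing key) + ROW_NUMBER(); otherwise the key columns come from the source.
"""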
now_str = self._sql_date_literal(self.get_current_time())
if self.inserts_use_default_begin_date or effective_date_column is None:
new_row_begin_date = self._sql_date_literal(self.default_begin_date)
else:
new_row_begin_date = effective_date_column
last_update_date_into_str = '-- No last update date'
last_update_date_select_str = '-- No last update date'
if self.last_update_date is not None:
last_update_date_into_str = f"{self.last_update_date}"
last_update_date_select_str = f"'{now_str}' as {self.last_update_date}"
if self.auto_generate_key:
type2_srgt_sql = 'max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1)'
# Type 1 srgt
if self.type_1_surrogate is None:
insert_list_type_1 = '--No type 1 srgt present'
select_list_type_1 = '--No type 1 srgt present'
else:
insert_list_type_1 = f"{self.type_1_surrogate}"
select_list_type_1 = f"{type2_srgt_sql} as {self.type_1_surrogate}"
insert_new_sql = f"""
INSERT INTO {self.qualified_table_name} (
{self._sql_primary_key_name()},
{insert_list_type_1},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str},
{Table._sql_indent_join(matching_columns, 16, suffix=',')}
)
WITH max_srgt AS (SELECT coalesce(max({self._sql_primary_key_name()}),0) as max_srgt
FROM {self.qualified_table_name}
)
SELECT
{type2_srgt_sql} as {self._sql_primary_key_name()},
{select_list_type_1},
'{self.delete_flag_no}' as {self.delete_flag},
{new_row_begin_date} as {self.begin_date_column},
{self._sql_date_literal(self.default_end_date)} as {self.end_date_column},
{last_update_date_select_str},
{Table._sql_indent_join(matching_columns, 16, suffix=',')}
FROM max_srgt
CROSS JOIN
{source_table} s
WHERE NOT EXISTS(
SELECT 1
FROM {self.qualified_table_name} e
WHERE {Table._sql_indent_join(
[f"e.{nk_col} = s.{nk_col}"
for nk_col in self.natural_key], 18, prefix='AND '
)}
)
"""
else:
insert_new_sql = f"""
INSERT INTO {self.qualified_table_name} (
{Table._sql_indent_join(self.primary_key, 20, suffix=',')},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str},
{Table._sql_indent_join(matching_columns, 20, suffix=',')}
)
SELECT
{Table._sql_indent_join(self.primary_key, 20, suffix=',')},
'{self.delete_flag_no}' as {self.delete_flag},
{new_row_begin_date} as {self.begin_date_column},
{self._sql_date_literal(self.default_end_date)} as {self.end_date_column},
{last_update_date_select_str},
{Table._sql_indent_join(matching_columns, 20, suffix=',')}
FROM {source_table} s
WHERE NOT EXISTS(
SELECT 1
FROM {self.qualified_table_name} e
WHERE {Table._sql_indent_join(
[f"e.{key_col} = s.{key_col}"
for key_col in self.primary_key], 18, prefix='AND '
)}
)
"""
self.log.debug('')
self.log.debug('=' * 80)
self.log.debug('insert_new_sql')
self.log.debug('=' * 80)
self.log.debug(insert_new_sql)
sql_timer = Timer()
results = self.execute(insert_new_sql, connection_name=connection_name)
self.log.info(f"Impacted rows = {results.rowcount} (meaningful count? {results.supports_sane_rowcount()})")
self.log.info(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
def _sql_find_updates(
self,
source_table: ReadOnlyTable,
source_effective_date_column: str,
key_columns_set: typing.Iterable,
non_nk_matching_columns: typing.Iterable,
target_only_columns: set,
updated_row_nk_table: str,
connection_name: str = 'sql_upsert',
):
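"""
Create a work table (``updated_row_nk_table``) listing the key columns,
source effective date, and target begin/end dates for each source row whose
effective date falls within an existing target version but whose compared
columns differ from it (or where that version is flagged as deleted).
A chg_in_<column> indicator column is included for each compared column.
"""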
self.database.drop_table_if_exists(updated_row_nk_table, connection_name=connection_name, auto_close=False, transaction=False)
find_updates_sql = f"""
CREATE TABLE {updated_row_nk_table} AS
SELECT
{Table._sql_indent_join([f"e.{nk_col}" for nk_col in key_columns_set], 16, suffix=',')},
s.{source_effective_date_column} as source_effective_date,
e.{self.begin_date_column} as target_begin_date,
e.{self.end_date_column} as target_end_date,
{Table._sql_indent_join([f"CASE WHEN {Table._sql_column_not_equals(m_col, 'e', 's')} THEN 1 ELSE 0 END as chg_in_{m_col}" for m_col in non_nk_matching_columns], 16, suffix=',')}
{Table._sql_indent_join([f'e.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
FROM {self.qualified_table_name} e
INNER JOIN
{source_table.qualified_table_name} s
ON {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in key_columns_set], 19, prefix='AND ')}
AND s.{source_effective_date_column} BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
WHERE (
{Table._sql_indent_join([Table._sql_column_not_equals(m_col, 'e', 's') for m_col in non_nk_matching_columns], 20, prefix='OR ')}
OR e.{self.delete_flag} != '{self.delete_flag_no}'
)
"""
self.log.debug('-' * 80)
self.log.debug('find_updates_sql')
self.log.debug('-' * 80)
sql_timer = Timer()
self.log.debug(find_updates_sql)
self.execute(find_updates_sql, connection_name=connection_name)
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
def _sql_insert_updates(
self,
source_table: ReadOnlyTable,
source_effective_date_column: str,
matching_columns: typing.Iterable,
non_nk_matching_columns: typing.Iterable,
target_only_columns: set,
updated_row_nk_table: str,
connection_name: str = 'sql_upsert',
):
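"""
Insert a new version row for each changed key captured in
``updated_row_nk_table``. Matching column values come from the source table,
target-only columns are carried forward via the work table, the begin date is
the source effective date, and (when ``auto_generate_key`` is True) a new
surrogate key is assigned as max(existing key) + ROW_NUMBER().
"""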
now_str = self._sql_date_literal(self.get_current_time())
last_update_date_into_str = '/* No last update date */'
last_update_date_select_str = '/* No last update date */'
if self.last_update_date is not None:
last_update_date_into_str = f"{self.last_update_date},"
last_update_date_select_str = f"'{now_str}' as {self.last_update_date},"
if self.auto_generate_key:
primary_key_name = self.primary_key[0]
insert_updates_sql = f"""
INSERT INTO {self.qualified_table_name} (
{primary_key_name},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str}
{Table._sql_indent_join(matching_columns, 16, suffix=',')}
{Table._sql_indent_join(target_only_columns, 16, suffix=',', prefix_if_not_empty=',')}
)
WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt
FROM {self.qualified_table_name}
)
SELECT
max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
'{self.delete_flag_no}' as {self.delete_flag},
u.source_effective_date as {self.begin_date_column},
u.target_end_date as {self.end_date_column},
{last_update_date_select_str}
{Table._sql_indent_join([f's.{col}' for col in matching_columns], 16, suffix=',')}
{Table._sql_indent_join([f'u.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
FROM max_srgt
CROSS JOIN
{source_table.qualified_table_name} s
INNER JOIN
{updated_row_nk_table} u
ON {Table._sql_indent_join([f"u.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
AND s.{source_effective_date_column} = u.source_effective_date
"""
else:
insert_updates_sql = f"""
INSERT INTO {self.qualified_table_name} (
{Table._sql_indent_join([key_col for key_col in self.primary_key], 16, suffix=',')},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str}
{Table._sql_indent_join(non_nk_matching_columns, 16, suffix=',')}
{Table._sql_indent_join(target_only_columns, 16, suffix=',', prefix_if_not_empty=',')}
)
SELECT
{Table._sql_indent_join([f"u.{key_col}" for key_col in self.primary_key], 16, suffix=',')},
'{self.delete_flag_no}' as {self.delete_flag},
u.source_effective_date as {self.begin_date_column},
u.target_end_date as {self.end_date_column},
{last_update_date_select_str}
{Table._sql_indent_join([f's.{col}' for col in non_nk_matching_columns], 16, suffix=',')}
{Table._sql_indent_join([f'u.{col}' for col in target_only_columns], 16, suffix=',', prefix_if_not_empty=',')}
FROM {source_table.qualified_table_name} s
INNER JOIN
{updated_row_nk_table} u
ON {Table._sql_indent_join([f"u.{key_col} = s.{key_col}" for key_col in self.primary_key], 18, prefix='AND')}
AND s.{source_effective_date_column} = u.source_effective_date
"""
self.log.debug('-' * 80)
self.log.debug('insert_updates_sql')
self.log.debug('-' * 80)
self.log.debug(insert_updates_sql)
sql_timer = Timer()
results = self.execute(insert_updates_sql, connection_name=connection_name)
self.log.debug(f"Impacted rows = {results.rowcount:,} (meaningful count? {results.supports_sane_rowcount()})")
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
def _sql_find_deletes(
self,
source_table: ReadOnlyTable,
key_columns_set: typing.Iterable,
deleted_row_nk_table: str,
connection_name: str = 'sql_upsert',
):
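"""
Create a work table (``deleted_row_nk_table``) listing the key columns and
current version begin/end dates for target rows that are active as of the
current time but have no matching row in the source table.
"""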
# self.execute(f"DROP TABLE IF EXISTS {updated_row_nk_table}")
now_str = self._sql_date_literal(self.get_current_time())
self.database.drop_table_if_exists(deleted_row_nk_table, connection_name=connection_name, auto_close=False, transaction=False)
find_deletes_sql = f"""
CREATE TABLE {deleted_row_nk_table} AS
SELECT
{Table._sql_indent_join([f"e.{nk_col}" for nk_col in key_columns_set], 16, suffix=',')},
{now_str} as source_effective_date,
e.{self.begin_date_column} as target_begin_date,
e.{self.end_date_column} as target_end_date
FROM {self.qualified_table_name} e
WHERE {now_str} BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
AND NOT EXISTS (
SELECT 1
FROM {source_table.qualified_table_name} s
WHERE {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in key_columns_set], 19, prefix='AND ')}
)
"""
self.log.debug('-' * 80)
self.log.debug('find_deletes_sql')
self.log.debug('-' * 80)
sql_timer = Timer()
self.log.debug(find_deletes_sql)
self.execute(find_deletes_sql, connection_name=connection_name)
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
def _sql_insert_deletes(
self,
target_column_set: typing.Iterable,
metadata_column_set: typing.Iterable,
deleted_row_nk_table: str,
connection_name: str = 'sql_upsert',
):
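"""
Insert a delete-flagged version row for each key captured in
``deleted_row_nk_table``, copying the non-metadata column values from the
existing version and using the work table's source effective date as the
new begin date.
"""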
target_non_metadata_column_set = set(target_column_set) - set(metadata_column_set)
now_str = self._sql_date_literal(self.get_current_time())
last_update_date_into_str = '/* No last update date */'
last_update_date_select_str = '/* No last update date */'
if self.last_update_date is not None:
last_update_date_into_str = f"{self.last_update_date},"
last_update_date_select_str = f"'{now_str}' as {self.last_update_date},"
if self.auto_generate_key:
primary_key_name = self.primary_key[0]
insert_delete_versions_sql = f"""
INSERT INTO {self.qualified_table_name} (
{primary_key_name},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str}
{Table._sql_indent_join(target_non_metadata_column_set, 20, suffix=',')}
)
WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt
FROM {self.qualified_table_name}
)
SELECT
max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
'{self.delete_flag_yes}' as {self.delete_flag},
u.source_effective_date as {self.begin_date_column},
u.target_end_date as {self.end_date_column},
{last_update_date_select_str}
{Table._sql_indent_join([f'e.{col}' for col in target_non_metadata_column_set], 20, suffix=',')}
FROM max_srgt
CROSS JOIN
{self.qualified_table_name} e
INNER JOIN
{deleted_row_nk_table} u
ON {Table._sql_indent_join([f"u.{nk_col} = e.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
AND u.target_begin_date = e.{self.begin_date_column}
"""
else:
insert_delete_versions_sql = f"""
INSERT INTO {self.qualified_table_name} (
{Table._sql_indent_join([key_col for key_col in self.primary_key], 16, suffix=',')},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{last_update_date_into_str}
{Table._sql_indent_join(target_non_metadata_column_set, 20, suffix=',')}
)
SELECT
{Table._sql_indent_join([f"u.{key_col}" for key_col in self.primary_key], 20, suffix=',')},
'{self.delete_flag_yes}' as {self.delete_flag},
u.source_effective_date as {self.begin_date_column},
u.target_end_date as {self.end_date_column},
{last_update_date_select_str}
{Table._sql_indent_join([f'e.{col}' for col in target_non_metadata_column_set], 16, suffix=',')}
FROM {self.qualified_table_name} e
INNER JOIN
{deleted_row_nk_table} u
ON {Table._sql_indent_join([f"u.{key_col} = e.{key_col}" for key_col in self.primary_key], 18, prefix='AND')}
AND u.target_begin_date = e.{self.begin_date_column}
"""
self.log.debug('-' * 80)
self.log.debug('insert_delete_versions_sql')
self.log.debug('-' * 80)
self.log.debug(insert_delete_versions_sql)
sql_timer = Timer()
results = self.execute(insert_delete_versions_sql, connection_name=connection_name)
self.log.debug(f"Impacted rows = {results.rowcount:,} (meaningful count? {results.supports_sane_rowcount()})")
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
def sql_upsert(
self,
source_table: ReadOnlyTable,
source_effective_date_column: str,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
skip_update_check_on: typing.Optional[frozenset] = None,
check_for_deletes: bool = None,
connection_name: str = 'sql_upsert',
temp_table_prefix: str = '',
# Only set commit_each_table to True for debugging purposes
commit_each_table: bool = False,
stat_name: str = 'upsert_db_exclusive',
parent_stats: typing.Optional[Statistics] = None,
):
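"""
Set-based upsert of this history table performed entirely in the database
from a source table (no rows are read into Python). New keys are inserted,
changed rows get new versions with the prior versions retired, and rows
missing from the source can optionally be closed out with a delete-flagged
version.
Parameters
----------
source_table:
The source table to upsert from.
source_effective_date_column:
Name of the source column holding each row's effective date.
source_excludes:
Optional set of source columns to exclude.
target_excludes:
Optional set of target columns to exclude.
skip_update_check_on:
Optional set of columns to skip when checking for updates
(not referenced by the current SQL implementation).
check_for_deletes:
Should target rows missing from the source be logically deleted?
Defaults to True when a delete flag is defined, otherwise False.
connection_name:
Name of the database connection to use.
temp_table_prefix:
Prefix (for example a schema) for the work tables created by this method.
commit_each_table:
Commit after each step. Only set to True for debugging purposes.
stat_name:
Name of this step for the ETLTask statistics.
Default = 'upsert_db_exclusive'
parent_stats:
Optional Statistics object to nest this step's statistics in.
Default is to place statistics in the ETLTask level statistics.
Examples
--------
A minimal sketch; the table names, effective date column name, and the
task / database objects are hypothetical::
source = ReadOnlyTable(task, database, 'stage_customer')
target = HistoryTable(task, database, 'd_customer')
target.sql_upsert(
source_table=source,
source_effective_date_column='source_effective_date',
)
target.commit()
"""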
upsert_db_excl_stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
upsert_db_excl_stats.print_start_stop_times = False
upsert_db_excl_stats.timer.start()
upsert_db_excl_stats['calls'] += 1
self.transaction(connection_name)
source_row_columns_set = source_table.column_names_set
if source_excludes is not None:
source_row_columns_set = source_row_columns_set - source_excludes
target_column_set = self.column_names_set
if target_excludes is not None:
target_column_set = target_column_set - target_excludes
version_date_column_set = {self.begin_date_column, self.end_date_column}
metadata_column_set = set(version_date_column_set)
if self.delete_flag is not None:
metadata_column_set.add(self.delete_flag)
if self.last_update_date is not None:
metadata_column_set.add(self.last_update_date)
metadata_column_set.update(self.primary_key)
matching_columns = {column_name for column_name in source_row_columns_set if
column_name in target_column_set and column_name not in metadata_column_set}
target_only_columns = {column_name for column_name in target_column_set if
column_name not in (source_row_columns_set | metadata_column_set)}
compare_columns = matching_columns
if self.natural_key is not None:
if len(self.natural_key) == 0:
raise ValueError(f"Empty natural key for {self}")
if not self.auto_generate_key:
raise ValueError(
f"Natural key set for {self} but auto_generate_key is False; primary key is {self.primary_key}"
)
key_columns_set = set(self.natural_key)
elif self.primary_key is not None and len(self.primary_key) > 0:
key_columns_set = set(self.primary_key)
else:
raise ValueError(f"No primary key or natural key for {self}")
non_nk_matching_columns = matching_columns - key_columns_set
if self.delete_flag is not None:
if check_for_deletes is None:
check_for_deletes = True
if self.delete_flag in non_nk_matching_columns:
non_nk_matching_columns.remove(self.delete_flag)
if self.delete_flag in target_only_columns:
target_only_columns.remove(self.delete_flag)
else:
if check_for_deletes is None:
check_for_deletes = False
if self.last_update_date is not None:
compare_columns -= {self.last_update_date}
if self.last_update_date in non_nk_matching_columns:
non_nk_matching_columns.remove(self.last_update_date)
if self.last_update_date in target_only_columns:
target_only_columns.remove(self.last_update_date)
if len(self.primary_key) != 1:
raise ValueError(f"{self}.sql_upsert requires a single primary_key column. Got {self.primary_key}")
# Insert rows for new Natural Keys
self._sql_insert_new(
source_table=source_table.qualified_table_name,
matching_columns=matching_columns,
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
updated_row_nk_table = f"{temp_table_prefix}{self.table_name}_updated_row_nk"
updated_row_nk_table = self._safe_temp_table_name(updated_row_nk_table)
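# Find the keys whose current target version differs from the source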
self._sql_find_updates(
source_table=source_table,
source_effective_date_column=source_effective_date_column,
key_columns_set=key_columns_set,
non_nk_matching_columns=non_nk_matching_columns,
target_only_columns=target_only_columns,
updated_row_nk_table=updated_row_nk_table,
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
self._sql_insert_updates(
source_table=source_table,
source_effective_date_column=source_effective_date_column,
matching_columns=matching_columns,
non_nk_matching_columns=non_nk_matching_columns,
target_only_columns=target_only_columns,
updated_row_nk_table=updated_row_nk_table,
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
joins = [(nk_col, ("u", nk_col)) for nk_col in key_columns_set]
joins.append((self.begin_date_column, ('u', 'target_begin_date')))
self._sql_update_from(
update_name='update_retire',
source_sql=f"{updated_row_nk_table} u",
list_of_joins=joins,
extra_where="u.source_effective_date != u.target_begin_date",
list_of_sets=[
(self.end_date_column,
self._sql_add_timedelta('u.source_effective_date', self._end_date_timedelta)
),
],
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
if check_for_deletes:
deleted_row_nk_table = f"{temp_table_prefix}{self.table_name}_del_row_nk"
deleted_row_nk_table = self._safe_temp_table_name(deleted_row_nk_table)
self._sql_find_deletes(
source_table=source_table,
key_columns_set=key_columns_set,
deleted_row_nk_table=deleted_row_nk_table,
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
# Retire versions
joins = [(nk_col, ("d", nk_col)) for nk_col in key_columns_set]
joins.append((self.begin_date_column, ('d', 'target_begin_date')))
self._sql_update_from(
update_name='delete_retire',
source_sql=f"{deleted_row_nk_table} d",
list_of_joins=joins,
extra_where="d.source_effective_date != d.target_begin_date",
list_of_sets=[
(self.end_date_column,
self._sql_add_timedelta('d.source_effective_date', self._end_date_timedelta)
),
],
connection_name=connection_name,
)
if commit_each_table:
self.commit()
self.transaction()
self.log.debug("Commit done")
self._sql_insert_deletes(
target_column_set=target_column_set,
metadata_column_set=metadata_column_set,
deleted_row_nk_table=deleted_row_nk_table,
connection_name=connection_name,
)
self.log.debug('=' * 80)
upsert_db_excl_stats.timer.stop()