import textwrap
import typing
from bi_etl.components.hst_table import HistoryTable
from bi_etl.components.readonlytable import ReadOnlyTable
from bi_etl.components.row.row import Row
from bi_etl.components.row.row_status import RowStatus
from bi_etl.components.table import Table
from bi_etl.database import DatabaseMetadata
from bi_etl.exceptions import NoResultFound
from bi_etl.scheduler.task import ETLTask
from bi_etl.statistics import Statistics
from bi_etl.timer import Timer
__all__ = ['HistoryTableSourceBased']
# noinspection PyAbstractClass
class HistoryTableSourceBased(HistoryTable):
"""
ETL target component for a table that stores history of updates where the source provides the dates to use.
Also usable as a source.
Example:
Target table currently has:
2016 - A
2017 - B
2018 - C
The source then deletes the 2017 version. A normal HistoryTable load would not touch the existing 2017 version
since it was not provided in the source.
The desired output is:
2016 - A
2017 - A
2018 - C
This component handles that update by stepping through the provided source versions and the existing target
versions, updating each target version as needed so that it matches the closest source version at or before it in time.
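A minimal usage sketch (the task, database, table name, and source loop below are
illustrative assumptions, not part of this module)::

    target = HistoryTableSourceBased(
        task=my_task,
        database=my_database,
        table_name='customer_history',
    )
    for source_row in source_component:
        # Each source row carries the begin date value to use
        target.upsert(source_row)
    target.commit()
    target.close()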
Parameters
----------
task : ETLTask
The instance to register in (if not None)
database : bi_etl.database.DatabaseMetadata
The database to find the table/view in.
table_name : str
The name of the table/view.
exclude_columns : frozenset
Optional. A set of columns to exclude from the table/view. These columns will not be included in SELECT, INSERT, or UPDATE statements.
Attributes
----------
auto_generate_key: boolean
Should the primary key be automatically generated by the insert/upsert process?
If True, the process will get the current maximum value and then increment it with each insert.
begin_date: str
Name of the begin date field
end_date: str
Name of the end date field
inserts_use_default_begin_date: boolean
Should inserts use the default begin date instead of the class effective date?
This allows records to match up in joins with other history tables where the effective
date of the 'primary' might be before the first version effective date.
Default = True
default_begin_date: date
Default begin date to assign for begin_date.
Used for new records if inserts_use_default_begin_date is True.
Also used for
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Default = 1900-1-1
default_end_date: date
Default end date to assign for end_date on active rows.
Also used for
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Default = 9999-1-1
batch_size: int
How many rows should be insert/update/deleted in a single batch.
(inherited from Table)
delete_flag : str
The name of the delete_flag column, if any.
Optional.
(inherited from ReadOnlyTable)
delete_flag_yes : str
The value of delete_flag for deleted rows.
Optional.
(inherited from ReadOnlyTable)
delete_flag_no : str
The value of delete_flag for *not* deleted rows.
Optional.
(inherited from ReadOnlyTable)
default_date_format: str
The date parsing format to use for str -> date conversions.
If more than one date format exists in the source, then explicit conversions will be required.
Default = '%m/%d/%Y'
(inherited from Table)
force_ascii: boolean
Should text values be forced into the ascii character set before passing to the database?
Default = False
(inherited from Table)
last_update_date: str
Name of the column which we should update when table updates are made.
Default = None
(inherited from Table)
log_first_row : boolean
Should we log progress on the first row read. *Only applies if used as a source.*
(inherited from ETLComponent)
max_rows : int
The maximum number of rows to read. *Only applies if Table is used as a source.*
Optional.
(inherited from ETLComponent)
primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=None.
If not passed in, will use the database value, if any.
(inherited from ETLComponent)
natural_key: list
The list of natural key columns (as Column objects).
The default is the list of non-begin/end date primary key columns.
The default is *not* appropriate for dimension tables with surrogate keys.
progress_frequency : int
How often (in seconds) to output progress messages.
Optional.
(inherited from ETLComponent)
progress_message : str
The progress message to print.
Optional. Default is ``"{logical_name} row # {row_number}"``. Note ``logical_name`` and ``row_number``
substitutions applied via :func:`format`.
(inherited from ETLComponent)
special_values_descriptive_columns: list
A list of columns that get longer descriptive text in
:meth:`get_missing_row`,
:meth:`get_invalid_row`,
:meth:`get_not_applicable_row`,
:meth:`get_various_row`
Optional.
(inherited from ReadOnlyTable)
track_source_rows: boolean
Should the :meth:`upsert` method keep a set container of source row keys that it has processed?
That set would then be used by :meth:`update_not_processed`, :meth:`logically_delete_not_processed`,
and :meth:`delete_not_processed`.
(inherited from Table)
type_1_surrogate: str
The name of the type 1 surrogate key. The value is automatically generated as equal to the type 2 key on
inserts and equal to the existing value on updates.
Optional.
"""
def __init__(self,
task: typing.Optional[ETLTask],
database: DatabaseMetadata,
table_name: str,
table_name_case_sensitive: bool = None,
schema: str = None,
exclude_columns: frozenset = None,
**kwargs
):
# Don't pass kwargs up to super. They should be set at the end of this method
super().__init__(task=task,
database=database,
table_name=table_name,
schema=schema,
table_name_case_sensitive=table_name_case_sensitive,
exclude_columns=exclude_columns,
)
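# State for the natural-key batch currently in progress: _upsert_version_set
# accumulates the existing target versions for one natural key here, and
# finish_pending_existing_rows() flushes them when the key changes or at commit.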
self.prior_upsert_key = None
self.prior_upsert_lookup_name = None
self.prior_upsert_existing_rows_list = list()
self.prior_upsert_existing_row_index = 0
self.prior_upsert_row = None
self.prior_upsert_update_callback = None
self.prior_upsert_insert_callback = None
self.prior_upsert_stat_name = None
self.prior_upsert_parent_stats = None
# Should be the last call of every init
self.set_kwattrs(**kwargs)
def _upsert_version_set(
self,
source_mapped_as_target_row: Row,
lookup_name: str = None,
skip_update_check_on: list = None,
do_not_update: list = None,
additional_update_values: dict = None,
additional_insert_values: dict = None,
update_callback: typing.Callable[[list, Row], None] = None,
insert_callback: typing.Callable[[Row], None] = None,
source_excludes: frozenset = None,
target_excludes: frozenset = None,
stat_name: str = 'upsert',
parent_stats: Statistics = None,
):
"""
Update (if changed) or Insert a row in the table.
Returns the row found/inserted, with the auto-generated key (if that feature is enabled)
Parameters
----------
source_mapped_as_target_row: :class:`~bi_etl.components.row.row.Row`
Row to upsert
lookup_name: str
The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
skip_update_check_on: list
List of column names to not compare old vs new for updates.
do_not_update: list
List of columns to never update.
additional_update_values: dict
Additional updates to apply when updating
additional_insert_values: dict
Additional values to set on each row when inserting.
update_callback: func
Function to pass updated rows to. Function should not modify row.
insert_callback: func
Function to pass inserted rows to. Function should not modify row.
source_excludes:
list of source columns to exclude when mapping to this Table.
target_excludes:
list of Table columns to exclude when mapping from the source row
stat_name: string
Name of this step for the ETLTask statistics. Default = 'upsert'
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this steps statistics in.
Default is to place statistics in the ETLTask level statistics.
"""
# This should be the (or a) natural key lookup
lookup = self.get_lookup(lookup_name)
nk_tuple = lookup.get_hashable_combined_key(source_mapped_as_target_row)
if self.prior_upsert_key != nk_tuple or self.prior_upsert_lookup_name != lookup_name:
# If this is not the first row processed, the key change means a batch just ended
if self.prior_upsert_key is not None:
# This is the end of a batch
self.finish_pending_existing_rows()
# Reset for the next list
self.prior_upsert_row = None
self.prior_upsert_key = nk_tuple
self.prior_upsert_lookup_name = lookup_name
fallback_to_db = self.always_fallback_to_db or not self.cache_clean
try:
existing_rows = lookup.find_versions_list(self.prior_upsert_key, fallback_to_db=fallback_to_db)
except NoResultFound:
# No existing rows were found, insert
new_row = source_mapped_as_target_row
if self.inserts_use_default_begin_date:
new_row.set_keeping_parent(self.begin_date_column, self.default_begin_date)
if additional_insert_values:
for colName, value in additional_insert_values.items():
new_row[colName] = value
new_row = self.insert_row(new_row, parent_stats=parent_stats)
if insert_callback:
insert_callback(new_row)
return new_row
# We only reach here if a result was found from the lookup
self.prior_upsert_existing_rows_list = existing_rows
self.prior_upsert_existing_row_index = 0
self.prior_upsert_update_callback = update_callback
self.prior_upsert_insert_callback = insert_callback
self.prior_upsert_stat_name = stat_name
self.prior_upsert_parent_stats = parent_stats
if self.inserts_use_default_begin_date:
# Force first row to use the default begin date (year 1900)
row_begin_date = self.default_begin_date
source_mapped_as_target_row.set_keeping_parent(self.begin_date_column, row_begin_date)
else:
row_begin_date = source_mapped_as_target_row[self.begin_date_column]
else:
row_begin_date = source_mapped_as_target_row[self.begin_date_column]
# Check that rows are coming in order
if self.prior_upsert_row is not None:
if row_begin_date < self.prior_upsert_row[self.begin_date_column]:
raise RuntimeError(
textwrap.dedent(f"""\
rows not in begin date order!
nk_tuple = {nk_tuple}
row = {repr(source_mapped_as_target_row)}
prior_upsert_row = {repr(self.prior_upsert_row)}
""")
)
# Nested join over existing rows and pending upsert rows
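# Worked example (dates shortened to years, values to letters):
# existing target versions are 2016-A, 2017-B, 2018-C and the source now
# provides only 2016-A and 2018-C. Upserting 2016-A matches the existing
# 2016 version. Upserting 2018-C first replays the prior source row (2016-A)
# onto the skipped existing 2017 version, then matches the existing 2018
# version, giving 2016-A, 2017-A, 2018-C as in the class docstring example.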
first_new_row = None
get_next_new_row = False
upsert_row = source_mapped_as_target_row
while not get_next_new_row:
needs_upsert = True
if self.prior_upsert_existing_row_index >= len(self.prior_upsert_existing_rows_list):
# There is no next existing row, use a max date value
existing_begin_date = self.default_end_date
existing_row = None
else:
existing_row = self.prior_upsert_existing_rows_list[self.prior_upsert_existing_row_index]
existing_begin_date = existing_row[self.begin_date_column]
if row_begin_date < existing_begin_date:
# New date is before all remaining existing rows
upsert_row = source_mapped_as_target_row
get_next_new_row = True
elif row_begin_date == existing_begin_date:
# New date equal to existing date
upsert_row = source_mapped_as_target_row
get_next_new_row = True
self.prior_upsert_existing_row_index += 1
else:
# New row begin > Existing date
if self.prior_upsert_row is not None:
excludes = set(self.primary_key)
excludes.add(self.end_date_column)
if source_excludes is not None:
excludes.update(source_excludes)
upsert_row = self.prior_upsert_row.subset(exclude=excludes)
upsert_row.set_keeping_parent(self.begin_date_column, existing_begin_date)
self.prior_upsert_existing_row_index += 1
else:
# No prior row, keep existing data
upsert_row = existing_row
self.prior_upsert_existing_row_index += 1
# As the logic stands now we could skip the upsert entirely in this scenario
needs_upsert = False
if needs_upsert:
new_row = super().upsert(
source_row=upsert_row,
lookup_name=lookup_name,
skip_update_check_on=skip_update_check_on,
do_not_update=do_not_update,
additional_update_values=additional_update_values,
additional_insert_values=additional_insert_values,
update_callback=update_callback,
insert_callback=insert_callback,
source_excludes=source_excludes,
target_excludes=target_excludes,
stat_name=stat_name,
parent_stats=parent_stats,
)
if first_new_row is None:
first_new_row = new_row
self.prior_upsert_row = upsert_row
return first_new_row
def finish_pending_existing_rows(self):
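"""
Apply the most recent source row for the current natural key to any remaining
existing target versions with later begin dates. Called by the upsert logic
when the natural key changes and by :meth:`commit` to flush the final batch.
"""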
if self.prior_upsert_row is not None:
for existing_row in self.prior_upsert_existing_rows_list[self.prior_upsert_existing_row_index:]:
existing_begin_date = existing_row[self.begin_date_column]
excludes = set(self.primary_key)
excludes.add(self.end_date_column)
upsert_row = self.prior_upsert_row.subset(exclude=excludes)
upsert_row.set_keeping_parent(self.begin_date_column, existing_begin_date)
super().upsert(
source_row=upsert_row,
lookup_name=self.prior_upsert_lookup_name,
update_callback=self.prior_upsert_update_callback,
insert_callback=self.prior_upsert_insert_callback,
stat_name=self.prior_upsert_stat_name,
parent_stats=self.prior_upsert_parent_stats,
)
def upsert(self,
source_row: typing.Union[Row, typing.List[Row]],
lookup_name: str = None,
skip_update_check_on: list = None,
do_not_update: list = None,
additional_update_values: dict = None,
additional_insert_values: dict = None,
update_callback: typing.Callable[[list, Row], None] = None,
insert_callback: typing.Callable[[Row], None] = None,
source_excludes: frozenset = None,
target_excludes: frozenset = None,
stat_name: str = 'upsert',
parent_stats: Statistics = None,
**kwargs
):
"""
Update (if changed) or Insert a row in the table.
Returns the row found/inserted, with the auto-generated key (if that feature is enabled)
Parameters
----------
source_row: :class:`~bi_etl.components.row.row.Row`
Row to upsert
lookup_name: str
The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
skip_update_check_on: list
List of column names to not compare old vs new for updates.
do_not_update: list
List of columns to never update.
additional_update_values: dict
Additional updates to apply when updating
additional_insert_values: dict
Additional values to set on each row when inserting.
update_callback: func
Function to pass updated rows to. Function should not modify row.
insert_callback: func
Function to pass inserted rows to. Function should not modify row.
source_excludes:
list of source columns to exclude when mapping to this Table.
target_excludes:
list of Table columns to exclude when mapping from the source row
stat_name: string
Name of this step for the ETLTask statistics. Default = 'upsert'
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this steps statistics in.
Default is to place statistics in the ETLTask level statistics.
kwargs:
effective_date: datetime
The effective date to use for the update
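Example (a sketch; ``target``, the column names, and the date are illustrative,
and ``target.Row()`` is assumed to be the component's Row factory)::

    from datetime import datetime

    row = target.Row()  # assumed Row factory on the component
    row['customer_id'] = 42
    row['status'] = 'active'
    target.upsert(row, effective_date=datetime(2018, 1, 1))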
"""
stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
stats.timer.start()
stats['upsert source row count'] += 1
self._set_upsert_mode()
self.begin()
effective_date = kwargs.get('effective_date')
if effective_date is None:
if self.begin_date_column in source_row and source_row[self.begin_date_column] is not None:
pass
else:
source_row.set_keeping_parent(self.begin_date_column, self.default_effective_date)
else:
date_coerce = self.get_coerce_method(self.begin_date_column)
source_row.set_keeping_parent(self.begin_date_column, date_coerce(effective_date))
target_excludes = self._target_excludes_for_updates(target_excludes)
source_mapped_as_target_row = self.build_row(source_row=source_row,
source_excludes=source_excludes,
target_excludes=target_excludes,
parent_stats=stats,
)
if self.track_source_rows:
# Keep track of source records so we can check if target rows don't exist in source
self.source_keys_processed.add(self.get_natural_key_tuple(source_mapped_as_target_row))
if lookup_name is None:
lookup = self.get_default_lookup(source_row.iteration_header)
lookup_name = lookup.lookup_name
else:
lookup = self.get_lookup(lookup_name)
if len(lookup.lookup_keys) == 1: # Primary key, it can't be anything else unless not valid.
# PK lookup doesn't need to deal with list of dates, so call the parent upsert routine directly
return super().upsert(
source_row=source_mapped_as_target_row,
lookup_name=lookup_name,
skip_update_check_on=skip_update_check_on,
do_not_update=do_not_update,
additional_update_values=additional_update_values,
additional_insert_values=additional_insert_values,
update_callback=update_callback,
insert_callback=insert_callback,
source_excludes=source_excludes,
target_excludes=target_excludes,
stat_name=stat_name,
parent_stats=parent_stats,
)
else:
return self._upsert_version_set(
source_mapped_as_target_row=source_mapped_as_target_row,
lookup_name=lookup_name,
skip_update_check_on=skip_update_check_on,
do_not_update=do_not_update,
additional_update_values=additional_update_values,
additional_insert_values=additional_insert_values,
update_callback=update_callback,
insert_callback=insert_callback,
source_excludes=source_excludes,
target_excludes=target_excludes,
stat_name=stat_name,
parent_stats=parent_stats,
)
def update_not_in_set(
self,
updates_to_make: typing.Union[dict, Row],
set_of_key_tuples: set,
lookup_name: str = None,
criteria_list: list = None,
criteria_dict: dict = None,
use_cache_as_source: bool = True,
progress_frequency: int = None,
stat_name: str = 'update_not_in_set',
parent_stats: Statistics = None,
**kwargs
):
"""
Applies updates to rows matching the criteria that are not in the set_of_key_tuples passed in.
Parameters
----------
updates_to_make: :class:`Row`
Row or dict of updates to make
set_of_key_tuples: set
Set of tuples comprising the lookup key values. This set represents the rows that should *not* be updated.
lookup_name: str
The name of the lookup (see :meth:`define_lookup`) to use when searching for an existing row.
criteria_list : string or list of strings
Each string value will be passed to :meth:`sqlalchemy.sql.expression.Select.where`.
https://goo.gl/JlY9us
criteria_dict : dict
Dict keys should be columns, values are set using = or in
use_cache_as_source: bool
Attempt to read existing rows from the cache?
stat_name: string
Name of this step for the ETLTask statistics. Default = 'update_not_in_set'
progress_frequency: int
How often (in seconds) to output progress messages. Default = 10. None for no progress messages.
parent_stats: bi_etl.statistics.Statistics
Optional Statistics object to nest this steps statistics in.
Default is to place statistics in the ETLTask level statistics.
kwargs:
effective_date: datetime
The effective date to use for the update
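Example (a sketch; assumes ``track_source_rows`` was enabled so that
``source_keys_processed`` holds the keys seen this run)::

    # Logically delete rows whose key was not provided by the source
    target.update_not_in_set(
        updates_to_make={target.delete_flag: target.delete_flag_yes},
        set_of_key_tuples=target.source_keys_processed,
    )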
"""
if criteria_dict is None:
criteria_dict = {}
criteria_dict[self.delete_flag] = self.delete_flag_no
stats = self.get_unique_stats_entry(stat_name, parent_stats=parent_stats)
stats['rows read'] = 0
stats['updates count'] = 0
stats.timer.start()
self.begin()
if progress_frequency is None:
progress_frequency = self.progress_frequency
progress_timer = Timer()
# Turn off read progress reports
saved_progress_frequency = self.progress_frequency
self.progress_frequency = None
if lookup_name is None:
lookup = self.get_nk_lookup()
else:
lookup = self.get_lookup(lookup_name)
# Ensure that PK columns are returned in addition to whatever lookup is used
keys = list(lookup.lookup_keys)
for pk_col in self.primary_key:
if pk_col not in keys:
keys.append(pk_col)
# Note, here we select only lookup + PK columns from self
row_iter = self.where(column_list=keys,
criteria_list=criteria_list,
criteria_dict=criteria_dict,
use_cache_as_source=use_cache_as_source,
parent_stats=stats)
# Save the iterator as a list since we'll be modifying the cache as we run through it
rows_list = list(row_iter)
for row in rows_list:
pk_val = row[self.primary_key[0]]
if not isinstance(pk_val, int):
raise ValueError(f"Row has non-integer primary key value {pk_val} for {self.primary_key[0]} in {row.as_dict}")
elif pk_val < 0:
# Skip special rows
continue
if row.status == RowStatus.unknown:
pass
stats['rows read'] += 1
existing_key = lookup.get_hashable_combined_key(row)
if 0 < progress_frequency <= progress_timer.seconds_elapsed:
progress_timer.reset()
self.log.info(f"update_not_in_set current current row#={stats['rows read']:,} row key={existing_key} updates done so far = {stats['updates count']:,}")
if existing_key not in set_of_key_tuples:
stats['updates count'] += 1
# Direct update method. This will be slow due to new statement per update
# and also update statements per row
# self.update(updates_to_make, key_names=lookup.lookup_keys, key_values=existing_key, connection_name='update')
try:
# First we need the entire existing row (all versions)
target_row_list = lookup.find_versions_list(row)
except NoResultFound:
raise RuntimeError(
f"keys {row.as_key_value_list} found in database or cache but not found now by get_by_lookup"
)
for target_row in target_row_list:
# Then we can apply the updates to it using Table non-versioned updates
Table.apply_updates(
self=self,
row=target_row,
additional_update_values=updates_to_make,
allow_insert=False,
parent_stats=stats,
)
stats.timer.stop()
# Restore saved progress_frequency
self.progress_frequency = saved_progress_frequency
def commit(self,
stat_name='commit',
parent_stats=None,
print_to_log=True,
connection_name: str = None,
begin_new: bool = True,
):
if len(self.prior_upsert_existing_rows_list) > 0:
# End the final batch
self.finish_pending_existing_rows()
super().commit(stat_name=stat_name,
parent_stats=parent_stats,
print_to_log=print_to_log,
connection_name=connection_name,
begin_new=begin_new,
)
def sql_upsert(
self,
source_table: ReadOnlyTable,
source_excludes: typing.Optional[frozenset] = None,
target_excludes: typing.Optional[frozenset] = None,
skip_update_check_on: typing.Optional[frozenset] = None,
connection_name: str = 'sql_upsert',
stat_name: str = 'upsert_db_exclusive',
parent_stats: typing.Optional[Statistics] = None,
**kwargs
):
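"""
Set-based variant of :meth:`upsert` that performs the whole source-to-target
merge in SQL rather than row by row. As implemented below it:

1. Builds a deduplicated copy of the source, dropping consecutive versions
   whose non-key values did not change.
2. Inserts rows for natural keys that are new to the target.
3. Builds a unified timeline of source and target begin dates.
4. Finds target versions whose values differ from the source at each date.
5. Inserts the changed versions, retires the versions they supersede, and
   updates rows in place where the begin dates match exactly.

``kwargs`` must include ``source_begin_date_column`` and
``source_end_date_column`` naming the begin/end date columns of ``source_table``.
"""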
upsert_db_excl_stats = self.get_stats_entry(stat_name, parent_stats=parent_stats)
upsert_db_excl_stats.print_start_stop_times = False
upsert_db_excl_stats.timer.start()
upsert_db_excl_stats['calls'] += 1
source_row_columns_set = source_table.column_names_set
if source_excludes is not None:
source_row_columns_set = source_row_columns_set - source_excludes
target_column_set = self.column_names_set
if target_excludes is not None:
target_column_set = target_column_set - target_excludes
matching_columns = {column_name for column_name in source_row_columns_set if column_name in target_column_set}
compare_columns = matching_columns - {self.begin_date_column, self.end_date_column}
non_nk_columns = matching_columns - set(self.natural_key)
if self.last_update_date is not None:
compare_columns -= {self.last_update_date}
if len(self.primary_key) != 1:
raise ValueError(f"{self}.upsert_db_exclusive requires single primary_key column. Got {self.primary_key}")
primary_key_name = self.primary_key[0]
now_str = self.get_current_time().isoformat()
# TODO: Need to "deduplicate" source versions if two have identical values except for the effective_date_column
deduplicated_source_table = 'public.deduplicated_source'
self.database.drop_table_if_exists(deduplicated_source_table, connection_name=connection_name, auto_close=False, transaction=False)
deduplicate_source_sql = f"""
CREATE TABLE {deduplicated_source_table} AS
WITH source_next_prev AS (
SELECT
{Table._sql_indent_join([col for col in matching_columns], 23, suffix=',')},
{kwargs['source_begin_date_column']},
LAG({kwargs['source_begin_date_column']}) OVER (
PARTITION BY
{Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
ORDER BY {kwargs['source_begin_date_column']}
) as prev_effective_date
/*
LEAD({kwargs['source_begin_date_column']}) OVER (
PARTITION BY
{Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
ORDER BY {kwargs['source_begin_date_column']}
) as next_effective_date
*/
FROM {source_table.qualified_table_name} s
)
, changes_only AS (
SELECT r.*
FROM source_next_prev r
LEFT OUTER JOIN
source_next_prev prev
ON {Table._sql_indent_join([f"prev.{nk_col} = r.{nk_col}" for nk_col in self.natural_key], 32, prefix='AND ')}
AND prev.{kwargs['source_begin_date_column']} = r.prev_effective_date
WHERE
{Table._sql_indent_join([Table._sql_column_not_equals(m_col, 'prev', 'r') for m_col in non_nk_columns], 20, prefix='OR ')}
)
SELECT
changes_only.*,
/* Note we can't use the next_effective_date since we might have skipped one with no changes */
COALESCE(
LEAD({kwargs['source_begin_date_column']}) OVER (
PARTITION BY
{Table._sql_indent_join([nk_col for nk_col in self.natural_key], 32, suffix=',')}
ORDER BY {kwargs['source_begin_date_column']}
),
{self._sql_date_literal(self.default_end_date)}
) as end_effective_date
FROM changes_only
"""
self.log.debug('-' * 80)
self.log.debug('deduplicate_source_sql')
self.log.debug('-' * 80)
sql_timer = Timer()
self.log.debug(deduplicate_source_sql)
self.execute(deduplicate_source_sql, connection_name=connection_name)
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
# Insert rows for new Natural Keys
self._sql_insert_new(
source_table=deduplicated_source_table,
matching_columns=matching_columns,
effective_date_column=kwargs['source_begin_date_column'],
connection_name=connection_name,
)
# unified_dates_table = '#unified_dates_table'
unified_dates_table = 'public.derek_unified_dates_table'
self.database.drop_table_if_exists(unified_dates_table, connection_name=connection_name, auto_close=False, transaction=False)
unified_dates_sql = f"""
CREATE TABLE {unified_dates_table} AS
WITH pass1 as (
SELECT DISTINCT
{Table._sql_indent_join([f"s.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
s.{kwargs['source_begin_date_column']} as effective_date
FROM {deduplicated_source_table} s
UNION
SELECT
{Table._sql_indent_join([f"e.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
e.{self.begin_date_column} as effective_date
FROM {self.qualified_table_name} e
)
SELECT
pass1.*,
COALESCE(
LEAD({self._sql_add_timedelta('pass1.effective_date', self._end_date_timedelta)}) OVER (
PARTITION BY {Table._sql_indent_join([nk_col for nk_col in self.natural_key], 16, suffix=',')}
ORDER BY effective_date
),
{self._sql_date_literal(self.default_end_date)}
) as new_end_effective_date
FROM pass1
"""
self.log.debug('-' * 80)
self.log.debug('unified_dates_sql')
self.log.debug('-' * 80)
sql_timer = Timer()
self.log.debug(unified_dates_sql)
self.execute(unified_dates_sql, connection_name=connection_name)
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
# TODO: Need DB specific CREATE TABLE AS or SELECT ... INTO
# derek_updated_row_nk_table = '#updated_row_nk'
updated_row_nk_table = 'public.derek_updated_row_nk'
self.database.drop_table_if_exists(updated_row_nk_table, connection_name=connection_name, auto_close=False, transaction=False)
find_updates_sql = f"""
CREATE TABLE {updated_row_nk_table} AS
SELECT
{Table._sql_indent_join([f"e.{nk_col}" for nk_col in self.natural_key], 16, suffix=',')},
s.{kwargs['source_begin_date_column']} as source_effective_date,
e.{self.begin_date_column} as target_begin_date,
e.{self.end_date_column} as target_end_date,
d.new_end_effective_date,
{Table._sql_indent_join([f"CASE WHEN {Table._sql_column_not_equals(m_col, 'e', 's')} THEN 1 ELSE 0 END as chg_in_{m_col}" for m_col in non_nk_columns], 16, suffix=',')}
FROM {unified_dates_table} d
INNER JOIN
{self.qualified_table_name} e
ON {Table._sql_indent_join([f"d.{nk_col} = e.{nk_col}" for nk_col in self.natural_key], 19, prefix='AND ')}
AND d.effective_date BETWEEN e.{self.begin_date_column} AND e.{self.end_date_column}
INNER JOIN
{deduplicated_source_table} s
ON {Table._sql_indent_join([f"e.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 19, prefix='AND ')}
AND d.effective_date BETWEEN s.{kwargs['source_begin_date_column']} AND s.{kwargs['source_end_date_column']}
WHERE (
{Table._sql_indent_join([Table._sql_column_not_equals(m_col, 'e', 's') for m_col in non_nk_columns], 20, prefix='OR ')}
OR e.{self.delete_flag} != '{self.delete_flag_no}'
)
"""
self.log.debug('-' * 80)
self.log.debug('find_updates_sql')
self.log.debug('-' * 80)
sql_timer = Timer()
self.log.debug(find_updates_sql)
results = self.execute(find_updates_sql, connection_name=connection_name)
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
# Note: This SQL will only work if generated AFTER the insert_new_sql has been run on the DB
# otherwise the primary_key values will conflict.
# TODO type 1 srgt
insert_updates_sql = f"""
INSERT INTO {self.qualified_table_name} (
{primary_key_name},
{self.delete_flag},
{self.begin_date_column},
{self.end_date_column},
{self.last_update_date},
{Table._sql_indent_join(matching_columns, 16, suffix=',')}
)
WITH max_srgt AS (SELECT coalesce(max({primary_key_name}),0) as max_srgt FROM {self.qualified_table_name})
SELECT
max_srgt.max_srgt + ROW_NUMBER() OVER (order by 1) as {primary_key_name},
'{self.delete_flag_no}' as {self.delete_flag},
u.source_effective_date as {self.begin_date_column},
u.new_end_effective_date as {self.end_date_column},
'{now_str}' as {self.last_update_date},
{Table._sql_indent_join([f's.{col}' for col in matching_columns], 16, suffix=',')}
FROM max_srgt
CROSS JOIN
{deduplicated_source_table} s
INNER JOIN
{updated_row_nk_table} u
ON {Table._sql_indent_join([f"u.{nk_col} = s.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND')}
AND s.{kwargs['source_begin_date_column']} = u.source_effective_date
/* Do not insert entries for when dates match exactly, those must be updated in place */
WHERE u.source_effective_date != u.target_begin_date
"""
self.log.debug('-' * 80)
self.log.debug('insert_updates_sql')
self.log.debug('-' * 80)
self.log.debug(insert_updates_sql)
sql_timer = Timer()
results = self.execute(insert_updates_sql, connection_name=connection_name)
self.log.debug(f"Impacted rows = {results.rowcount:,} (meaningful count? {results.supports_sane_rowcount()})")
self.log.debug(f"Execution time ={sql_timer.seconds_elapsed_formatted}")
joins = [(nk_col, ("u", nk_col)) for nk_col in self.natural_key]
joins.append((self.begin_date_column, ('u', 'target_begin_date')))
self._sql_update_from(
update_name='update_retire',
source_sql=f"{updated_row_nk_table} u",
list_of_joins=joins,
# Do not insert entries for when dates match exactly, those must be updated in place.
extra_where="u.source_effective_date != u.target_begin_date",
target_alias_in_source='e',
list_of_sets=[
(
self.end_date_column,
f"u.new_end_effective_date"
),
]
)
self._sql_update_from(
update_name='update_in_place',
source_sql=f"""
{updated_row_nk_table} u
INNER JOIN
{deduplicated_source_table} s
ON {Table._sql_indent_join([f"s.{nk_col} = u.{nk_col}" for nk_col in self.natural_key], 18, prefix='AND ')}
AND s.{kwargs['source_begin_date_column']} = u.source_effective_date
""",
list_of_joins=joins,
target_alias_in_source='e',
list_of_sets=[
(m_col,
f"s.{m_col}")
for m_col in non_nk_columns
]
)
self.log.debug('=' * 80)
self.commit()
self.close_connection(connection_name=connection_name)
upsert_db_excl_stats.timer.stop()