"""
Created on Feb 26, 2015
@author: Derek Wood
"""
# https://www.python.org/dev/peps/pep-0563/
from __future__ import annotations
import logging
import math
import sys
import traceback
from typing import TYPE_CHECKING, Iterable, Iterator, Mapping, MutableMapping, Optional, Sequence, Union
import warnings
from datetime import datetime
import psutil
# noinspection PyUnresolvedReferences
from BTrees.OOBTree import OOBTree
from sqlalchemy.sql import Selectable
from sqlalchemy.sql.expression import bindparam
from bi_etl.components.row.row import Row
from bi_etl.components.row.row_iteration_header import RowIterationHeader
from bi_etl.components.row.row_status import RowStatus
from bi_etl.config.bi_etl_config_base import BI_ETL_Config_Base
from bi_etl.conversions import ensure_datetime
from bi_etl.exceptions import AfterExisting
from bi_etl.exceptions import BeforeAllExisting
from bi_etl.exceptions import NoResultFound
from bi_etl.memory_size import get_size_gc
from bi_etl.statistics import Statistics
if TYPE_CHECKING:
from bi_etl.components.etlcomponent import ETLComponent
__all__ = ['Lookup']
class Lookup(Iterable):
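    """
    In-memory cache of rows from a parent ETL component, indexed by the tuple
    of lookup key values, with optional fallback to a database query when a
    key is not cached.

    Example (illustrative sketch; ``source_table`` stands in for any
    ETLComponent, such as a ReadOnlyTable, that yields Row objects)::

        lookup = Lookup(
            lookup_name='customer_nk',
            lookup_keys=['customer_num'],
            parent_component=source_table,
        )
        for row in source_table:
            lookup.cache_row(row)
        existing = lookup.find_in_cache((42,))
    """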
COLLECTION_INDEX = datetime(year=1900, month=1, day=1, hour=0, minute=0, second=0)
    # OOBTree is used instead of SortedDict: in testing it ran in roughly 80%
    # of the time of the SortedDict equivalent.
# http://pythonhosted.org/BTrees/index.html
VERSION_COLLECTION_TYPE = OOBTree
ROW_TYPES = Union[Row, Sequence]
DB_LOOKUP_WARNING = 1000
def __init__(self,
lookup_name: str,
lookup_keys: list,
parent_component: ETLComponent,
                 config: Optional[BI_ETL_Config_Base] = None,
use_value_cache: bool = True,
**kwargs):
self.lookup_name = lookup_name
self.lookup_keys = lookup_keys
self._lookup_keys_set = None
if config is not None:
self.config = config
else:
self.config = parent_component.task.config
if self.config is None:
            raise ValueError(f'{self}: caller must provide a config or a parent_component that has one')
self._cache = None
self.parent_component = parent_component
self.stats = parent_component.get_unique_stats_entry(
stats_id=f'{self.__class__.__name__} {self.lookup_name}'
)
self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self._remote_lookup_stmt = None
self._row_size = None
self.our_process = psutil.Process()
self._first_row_process_size = None
self._row_count_to_get_estimate_row_size = 1000
self._done_get_estimate_row_size = False
self.cache_enabled = True
self.use_value_cache = use_value_cache
self._value_cache_str = dict()
self._value_cache_str_usage = dict()
self._value_cache_datetime = dict()
self._value_cache_datetime_usage = dict()
self.end_date = None
self._hashable_key_type = tuple
self._fallback_to_db_count = 0
def _get_version(
self,
versions_collection: Lookup.VERSION_COLLECTION_TYPE[datetime, Row],
effective_date: datetime
):
# SortedDict version
# first_effective_index = versions_collection.bisect_right(effective_date) - 1
# if first_effective_index <= -1:
# raise BeforeAllExisting(versions_collection[versions_collection.iloc[0]], effective_date)
try:
first_effective_index = versions_collection.maxKey(effective_date)
except ValueError:
raise BeforeAllExisting(versions_collection[versions_collection.minKey()], effective_date)
first_effective_row = versions_collection[first_effective_index]
# Check that record doesn't end before our date
if ensure_datetime(first_effective_row[self.end_date]) < effective_date:
raise AfterExisting(first_effective_row, effective_date)
return first_effective_row
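    # Illustrative sketch of the maxKey() behavior used above (hypothetical dates):
    #   versions = OOBTree({datetime(2020, 1, 1): row_a, datetime(2021, 1, 1): row_b})
    #   versions.maxKey(datetime(2020, 6, 1))  -> datetime(2020, 1, 1), so row_a is effective
    #   versions.maxKey(datetime(2019, 6, 1))  -> raises ValueError -> BeforeAllExisting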
def __repr__(self):
return f"{self.__class__.__name__}(lookup_name={self.lookup_name}, lookup_keys={self.lookup_keys})"
@property
def lookup_keys_set(self):
if self._lookup_keys_set is None:
self._lookup_keys_set = set(self.lookup_keys)
return self._lookup_keys_set
def _set_log_level(self, log_level):
self.log.setLevel(log_level)
def estimated_row_size(self):
return self._row_size
def has_done_get_estimate_row_size(self):
return self._done_get_estimate_row_size
def get_memory_size(self) -> int:
if self._row_size is not None:
return self._row_size * len(self)
else:
if len(self) > 0:
                msg = (
                    f'{self.lookup_name} row size estimate was not yet set on call to '
                    f'get_memory_size with a non-zero row count ({len(self):,})'
                )
self.log.warning(msg)
return 0
def get_disk_size(self) -> int:
return 0
def add_size_to_stats(self) -> None:
self.stats['Final Row Count'] = len(self)
self.stats['Memory Size'] = self.get_memory_size()
self.stats['Disk Size'] = self.get_disk_size()
def get_list_of_lookup_column_values(self, row: ROW_TYPES) -> list:
if isinstance(row, tuple):
return list(row)
elif isinstance(row, list):
return row
else:
lookup_values = [row[k] for k in self.lookup_keys]
return lookup_values
@staticmethod
def rstrip_key_value(val: object) -> object:
"""
        Since most, if not all, databases consider two strings that differ only
        in trailing blanks to be equal, rstrip any string key values so that the
        lookup behaves the same way.

        :param val: The key value to normalize.
        :return: ``val`` with trailing whitespace removed if it is a str,
            otherwise ``val`` unchanged.
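
        Example (illustrative doctest)::

            >>> Lookup.rstrip_key_value('AB  ')
            'AB'
            >>> Lookup.rstrip_key_value(42)
            42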
"""
if isinstance(val, str):
return val.rstrip()
else:
return val
def _hashable_key_stripped_non_str(self, row) -> Sequence:
return self._hashable_key_type(
tuple(
[Lookup.rstrip_key_value(val) for val in row]
)
)
def _hashable_key_stripped(self, row) -> Sequence:
if isinstance(row, str):
return row
else:
return self._hashable_key_stripped_non_str(row)
def get_hashable_combined_key(self, row: ROW_TYPES) -> Sequence:
if isinstance(row, self._hashable_key_type):
if not isinstance(row, str):
return self._hashable_key_stripped_non_str(row)
else:
return row
else:
return self._hashable_key_stripped_non_str(self.get_list_of_lookup_column_values(row))
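    # Example (illustrative): with lookup_keys ['id', 'code'] and a Row holding
    # {'id': 1, 'code': 'AB  '}, the hashable combined key is (1, 'AB') --
    # trailing blanks are stripped to match database string equality semantics.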
def clear_cache(self) -> None:
"""
Removes cache and resets to un-cached state
"""
if self._cache is not None:
del self._cache
self._cache = None
def __len__(self) -> int:
if self._cache is not None:
return len(self._cache)
else:
return 0
def __contains__(self, item):
return self.has_row(item)
def init_cache(self) -> None:
"""
Initializes the cache as empty.
"""
if self.cache_enabled is None:
self.cache_enabled = True
if self.cache_enabled:
self._cache = dict()
def _get_first_row_size(self, row) -> int:
        # get_size_gc is slow but shouldn't be too bad twice (once here and once below in check_estimate_row_size)
self._row_size = get_size_gc(row)
# self.log.debug(f'First row memory size {self._row_size:,} bytes (includes overhead)')
return self._row_size
def check_estimate_row_size(self, force_now=False):
if force_now or not self._done_get_estimate_row_size:
row_cnt = len(self)
if force_now or row_cnt >= self._row_count_to_get_estimate_row_size:
# Tag the estimate as done so we don't keep doing it
self._done_get_estimate_row_size = True
# get_size_gc is slow but shouldn't be too bad twice (once here and once above in _get_first_row_size)
total_cache_size = get_size_gc(self._cache)
if row_cnt > 0:
new_row_size = math.ceil(total_cache_size / row_cnt)
else:
new_row_size = total_cache_size
self.log.debug(
f'{self.lookup_name} Row memory size now estimated at {new_row_size:,} '
f'bytes per row using cache of {row_cnt:,} rows'
)
self._row_size = new_row_size
def cache_row(
self,
row: Row,
allow_update: bool = True,
allow_insert: bool = True,
):
"""
Adds the given row to the cache for this lookup.
Parameters
----------
row: Row
The row to cache
allow_update: boolean
Allow this method to update an existing row in the cache.
allow_insert: boolean
Allow this method to insert a new row into the cache
Raises
------
ValueError
If allow_update is False and an already existing row (lookup key) is passed in.
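
        Examples
        --------
        Illustrative sketch (``row`` is a Row produced by the parent component)::

            lookup.cache_row(row)                      # insert or update the cached version
            lookup.cache_row(row, allow_update=False)  # ValueError if the key is already cached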
"""
if self.cache_enabled:
assert isinstance(row, Row), f"cache_row requires Row and not {type(row)}"
if self.use_value_cache:
self._update_with_value_cache(row)
lk_tuple = self.get_hashable_combined_key(row)
if self._cache is None:
self.init_cache()
if not allow_update:
if lk_tuple in self._cache:
raise ValueError(
f'Key value {lk_tuple} already in cache and allow_update was False.'
f' Possible error with the keys defined for this lookup {self.lookup_name} {self.lookup_keys}.'
)
if not allow_insert:
if lk_tuple not in self._cache:
raise ValueError(
f'Key value {lk_tuple} not already in cache and allow_insert was False.'
f' Error with {self.lookup_name} keys {self.lookup_keys}.'
)
self._cache[lk_tuple] = row
# Capture memory usage snapshots
if self._row_size is None:
self._get_first_row_size(row)
else:
self.check_estimate_row_size()
def cache_set(
self,
lk_tuple: tuple,
version_collection: OOBTree[datetime, Row],
allow_update: bool = True
):
"""
Adds the given set of rows to the cache for this lookup.
Parameters
----------
lk_tuple:
The key tuple to store the rows under
version_collection:
The set of rows to cache
allow_update: boolean
Allow this method to update an existing row in the cache.
Raises
------
ValueError
If allow_update is False and an already existing row (lookup key) is passed in.
"""
if self.cache_enabled:
            assert isinstance(version_collection, Lookup.VERSION_COLLECTION_TYPE), \
                f"cache_set requires {Lookup.VERSION_COLLECTION_TYPE} and not {type(version_collection)}"
if self.use_value_cache:
for row in version_collection.values():
self._update_with_value_cache(row)
if self._cache is None:
self.init_cache()
if not allow_update:
if lk_tuple in self._cache:
raise ValueError(
f'Key value {lk_tuple} already in cache and allow_update was False. '
f'Possible error with the keys defined for this lookup {self.lookup_name} {self.lookup_keys}.'
)
self._cache[lk_tuple] = version_collection
# Capture memory usage snapshots
if self._row_size is None:
self._get_first_row_size(version_collection)
else:
self.check_estimate_row_size()
def commit(self):
"""
Placeholder for other implementations that might need it
"""
pass
def uncache_row(self, row: ROW_TYPES):
if self._cache is not None:
try:
lk_tuple = self.get_hashable_combined_key(row)
except KeyError as e:
# This lookup uses columns not in the row provided.
raise ValueError(f"uncache_row called on {self} with insufficient values {row}. {e}")
try:
del self._cache[lk_tuple]
except KeyError:
pass
def uncache_set(self, row: ROW_TYPES):
self.uncache_row(row)
def __iter__(self) -> Iterator[Row]:
"""
        Iterates over the rows in the lookup cache, yielding clones so that
        callers can safely modify the returned rows.
        Rows come out in no particular order.
"""
if self._cache is not None:
for row in list(self._cache.values()):
# Do not return the actual instance of the row since the caller might modify it to no longer be
# compatible with the parent table
yield row.clone()
def _iter_raw(self) -> Iterable[Row]:
"""
Iterates over rows in the lookup cache. Returns actual cached rows. Be careful! Direct modifications to
the cached rows could break the later use of the cache.
The rows will come out in any order.
        :return: An iterator over the cached Row instances.
"""
if self._cache is not None:
for row in self._cache.values():
yield row
def find_where(
self,
key_names: Sequence,
key_values_dict: Mapping,
            limit: Optional[int] = None
):
"""
        Scan all cached rows (expensive) to find the list of rows that match the given criteria.
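
        Example (illustrative; ``status`` is a hypothetical column)::

            active_rows = lookup.find_where(['status'], {'status': 'A'}, limit=10)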
"""
results = list()
for row in self:
matches = True
for key in key_names:
if row[key] != key_values_dict[key]:
matches = False
break
if matches:
results.append(row)
if limit is not None and len(results) >= limit:
break
return results
def uncache_where(
self,
key_names: Sequence,
key_values_dict: Mapping):
"""
Scan all cached rows (expensive) to find rows to remove.
"""
deletes = self.find_where(key_names, key_values_dict)
for row in deletes:
self.uncache_row(row)
def get_versions_collection(
self,
row: ROW_TYPES
) -> MutableMapping[datetime, Row]:
"""
This method exists for compatibility with range caches
Parameters
----------
row
            The row with the key values to search for
Returns
-------
A MutableMapping of rows
"""
if not self.cache_enabled:
raise ValueError(f"Lookup {self.lookup_name} cache not enabled")
if self._cache is None:
self.init_cache()
lk_tuple = self.get_hashable_combined_key(row)
try:
return Lookup.VERSION_COLLECTION_TYPE({Lookup.COLLECTION_INDEX: self._cache[lk_tuple]})
except KeyError as e:
raise NoResultFound(e)
def find_in_cache(self, row: ROW_TYPES, **kwargs) -> Row:
"""
Find a matching row in the lookup based on the lookup index (keys)
"""
assert len(kwargs) == 0, f"lookup.find_in_cache got unexpected args {kwargs}"
versions_collection = self.get_versions_collection(row)
return versions_collection[Lookup.COLLECTION_INDEX]
def find_matches_in_cache(self, row: ROW_TYPES, **kwargs) -> Sequence[Row]:
        return [self.find_in_cache(row, **kwargs)]
def has_row(self, row: ROW_TYPES) -> bool:
"""
Does the row exist in the cache (for any date if it's a date range cache)
        Parameters
        ----------
        row
            The row (or key value sequence) to check for.

        Returns
        -------
        bool
            True if the cache contains a matching row (any version).
        """
try:
self.get_versions_collection(row)
except (NoResultFound, BeforeAllExisting, AfterExisting):
return False
return True
def _add_remote_stmt_where_clause(self, stmt: Selectable) -> Selectable:
"""
Only works if parent_component is based on bi_etl.components.readonlytable
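
        For example, with lookup_keys ['dim_id', 'src_code'] the statement gains
        a WHERE clause equivalent to::

            WHERE dim_id = :k1 AND src_code = :k2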
"""
col_num = 1
for key_col in self.lookup_keys:
# This statement only works if parent_component is based on bi_etl.components.readonlytable
# noinspection PyUnresolvedReferences
stmt = stmt.where(self.parent_component.get_column(key_col) == bindparam(f'k{col_num}'))
col_num += 1
return stmt
def _get_remote_stmt_where_values(
self,
row: ROW_TYPES,
            effective_date: Optional[datetime] = None,
) -> dict:
values_dict = dict()
col_num = 1
values_list = self.get_list_of_lookup_column_values(row)
for key_val in values_list:
values_dict[f'k{col_num}'] = key_val
col_num += 1
return values_dict
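    # Example (illustrative): for lookup_keys ['dim_id', 'src_code'] and row
    # values (7, 'ABC'), this returns {'k1': 7, 'k2': 'ABC'}, matching the
    # bindparam names generated in _add_remote_stmt_where_clause above.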
def find_in_remote_table(
self,
row: ROW_TYPES,
**kwargs
) -> Row:
"""
Find a matching row in the lookup based on the lookup index (keys)
Only works if parent_component is based on bi_etl.components.readonlytable
Parameters
----------
row
            The row with the key values to search for
Returns
-------
A row
"""
        assert hasattr(self.parent_component, 'execute'), \
            f'find_in_remote_table requires a ReadOnlyTable or another class with an execute method; ' \
            f'{type(self.parent_component)} will not work'
self.stats.timer.start()
if self._remote_lookup_stmt is None:
# noinspection PyUnresolvedReferences
stmt = self.parent_component.select()
stmt = self._add_remote_stmt_where_clause(stmt)
self._remote_lookup_stmt = stmt.compile()
values_dict = self._get_remote_stmt_where_values(row)
# noinspection PyUnresolvedReferences
rows = list(self.parent_component.execute(self._remote_lookup_stmt, values_dict))
self.stats.timer.stop()
if len(rows) == 0:
raise NoResultFound()
elif len(rows) == 1:
# noinspection PyUnresolvedReferences
return self.parent_component.Row(data=rows[0])
else:
raise RuntimeError(
f"{self.lookup_name} find_in_remote_table {row} matched multiple records {rows}"
)
def find_versions_list_in_remote_table(self, row: ROW_TYPES) -> list:
return [self.find_in_remote_table(row)]
def find_versions_list(
self,
row: ROW_TYPES,
fallback_to_db: bool = True,
maintain_cache: bool = True,
stats: Statistics = None,
) -> list:
"""
Parameters
----------
row:
row or tuple to find
fallback_to_db:
Use db to search if not found in cached copy
maintain_cache:
Add DB lookup rows to the cached copy?
stats:
Statistics to maintain
Returns
-------
        A list of the row versions found
"""
if self.cache_enabled:
try:
rows = list(self.get_versions_collection(row).values())
if stats is not None:
stats['Found in cache'] += 1
stats.timer.stop()
return rows
except NoResultFound as e:
if stats is not None:
stats['Not in cache'] += 1
if not fallback_to_db:
# Don't pass onto SQL if the lookup cache has initialized but the value isn't there
if stats is not None:
stats.timer.stop()
raise e
# Otherwise we'll continue to the DB query below
except (KeyError, ValueError):
if stats is not None:
stats['Cache not setup'] += 1
# Lookup not cached. Allow database lookup to proceed, but give warning since we thought it was cached
warnings.warn(
f"WARNING: {self} caching is enabled, but find_in_cache returned error {traceback.format_exc()}"
)
            except AfterExisting as e:
                if stats is not None:
                    stats['After all in cache'] += 1
                    stats['Not in cache'] += 1
                if not fallback_to_db:
                    # Don't pass onto SQL if the lookup cache has initialized but the value isn't there
                    if stats is not None:
                        stats.timer.stop()
                    # Return the last existing row since that is what we'll update
                    return [e.prior_row]
                # Otherwise we'll continue to the DB query below
        # BeforeAllExisting exceptions propagate to the caller
# Do a lookup on the database
try:
if stats is not None:
stats['DB lookup performed'] += 1
rows = self.find_versions_list_in_remote_table(row)
for row in rows:
row.status = RowStatus.existing
if maintain_cache and self.cache_enabled:
self.cache_row(row, allow_update=False)
if stats is not None:
stats['DB lookup found row'] += 1
finally:
if stats is not None:
stats.timer.stop()
return rows
def find(self,
row: ROW_TYPES,
fallback_to_db: bool = True,
maintain_cache: bool = True,
stats: Statistics = None,
**kwargs
) -> Row:
if self.cache_enabled:
try:
row = self.find_in_cache(row, **kwargs)
if stats is not None:
stats['Found in cache'] += 1
stats.timer.stop()
return row
except NoResultFound as e:
if stats is not None:
stats['Not in cache'] += 1
if fallback_to_db:
self._fallback_to_db_count += 1
if (self._fallback_to_db_count % self.DB_LOOKUP_WARNING) == 0:
self.log.warning(
f"{self} has done {self._fallback_to_db_count} lookups on the DB due to fallback_to_db"
)
else:
# Don't pass onto SQL if the lookup cache has initialized but the value isn't there
if stats is not None:
stats.timer.stop()
raise e
# Otherwise we'll continue to the DB query below
except (KeyError, ValueError):
if stats is not None:
stats['Cache not setup'] += 1
# Lookup not cached. Allow database lookup to proceed, but give warning since we thought it was cached
warnings.warn(
f"WARNING: {self} caching is enabled, but find_in_cache returned error {traceback.format_exc()}"
)
            except AfterExisting as e:
                if stats is not None:
                    stats['After all in cache'] += 1
                    stats['Not in cache'] += 1
                if fallback_to_db:
                    self._fallback_to_db_count += 1
                    if (self._fallback_to_db_count % self.DB_LOOKUP_WARNING) == 0:
                        self.log.warning(
                            f"{self} has done {self._fallback_to_db_count} lookups on the DB due to fallback_to_db"
                        )
                else:
                    # Don't pass onto SQL if the lookup cache has initialized but the value isn't there
                    if stats is not None:
                        stats.timer.stop()
                    # Return the last existing row since that is what we'll update
                    return e.prior_row
                # Otherwise we'll continue to the DB query below
        # BeforeAllExisting exceptions propagate to the caller
# Do a lookup on the database
try:
if stats is not None:
stats['DB lookup performed'] += 1
row = self.find_in_remote_table(row, **kwargs)
row.status = RowStatus.existing
if maintain_cache and self.cache_enabled:
self.cache_row(row, allow_update=False)
if stats is not None:
stats['DB lookup found row'] += 1
finally:
if stats is not None:
stats.timer.stop()
return row
def _update_with_value_cache(self, row: Row):
"""
        Update a row in place, replacing its str and datetime values with shared
        references from the value caches (de-duplication to save memory).

        :param row: The Row to update in place.
"""
for column_number, column_value in enumerate(row.values()):
if isinstance(column_value, str):
if column_value in self._value_cache_str:
# Replace with value cache reference
row.set_by_zposition_unsafe(column_number, self._value_cache_str[column_value])
self._value_cache_str_usage[column_value] += 1
else:
# Add to value cache
self._value_cache_str[column_value] = column_value
self._value_cache_str_usage[column_value] = 0
elif isinstance(column_value, datetime):
if column_value in self._value_cache_datetime:
# Replace with value cache reference
row.set_by_zposition_unsafe(column_number, self._value_cache_datetime[column_value])
self._value_cache_datetime_usage[column_value] += 1
else:
                    # Don't add microsecond-level datetime values to the cache
                    # (other than common end-of-period sentinel values);
                    # it performs better when kept small.
if column_value.microsecond in {0, 999990, 999999}:
# Add to value cache
self._value_cache_datetime[column_value] = column_value
self._value_cache_datetime_usage[column_value] = 0
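    # Illustrative effect (sketch; 'status' is a hypothetical column): after two
    # cached rows both contain the str 'ACTIVE', they share a single str object:
    #   row_1['status'] is row_2['status']  -> True
    # These shared references are what make the bytes-saved report below meaningful.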
@staticmethod
def _str_report_on_value_cache_usage(lookup_name: str, type_name: str, value_cache: dict) -> str:
key_count = len(value_cache)
if key_count > 0:
max_usage = max(value_cache.values())
total_usage = sum(value_cache.values())
total_bytes_saved = sum([sys.getsizeof(key) * value for key, value in value_cache.items()])
            return (
                f'{lookup_name} {type_name} value cache stored {key_count:,} values. '
                f'Max key usage = {max_usage:,}. '
                f'Avg key usage = {total_usage / key_count:.2f}. '
                f'total_bytes_saved = {total_bytes_saved:,}'
            )
else:
return f'{lookup_name} {type_name} value cache stored no values in cache'
def report_on_value_cache_effectiveness(self, lookup_name: str = None):
if lookup_name is None:
lookup_name = self.lookup_name
self.log.info(self._str_report_on_value_cache_usage(lookup_name, 'str', self._value_cache_str_usage))
self.log.info(self._str_report_on_value_cache_usage(lookup_name, 'datetime', self._value_cache_datetime_usage))
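
# Typical lifecycle (illustrative): init_cache() -> cache_row() for each source
# row -> find() / find_versions_list() during processing (optionally with
# fallback_to_db) -> add_size_to_stats() and report_on_value_cache_effectiveness()
# at the end -> clear_cache() to release memory.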