Source code for bi_etl.lookups.autodisk_lookup

# -*- coding: utf-8 -*-
"""
Created on Jan 5, 2016

@author: Derek Wood
"""
# https://www.python.org/dev/peps/pep-0563/
from __future__ import annotations

import gc
import typing
from datetime import datetime

import psutil

from bi_etl.components.row.row import Row
from bi_etl.config.bi_etl_config_base import BI_ETL_Config_Base
from bi_etl.exceptions import NoResultFound
from bi_etl.lookups.disk_lookup import DiskLookup
from bi_etl.lookups.lookup import Lookup
from bi_etl.timer import Timer

if typing.TYPE_CHECKING:
    from bi_etl.components.etlcomponent import ETLComponent

__all__ = ['AutoDiskLookup']


class AutoDiskLookup(Lookup):
    """
    Automatic memory / disk lookup cache.

    This version divides the cache into N chunks (default is 10). If RAM usage gets beyond limits,
    it starts moving chunks to disk. Once a chunk is on disk, it stays there.

    TODO: For use cases where the lookup will be used in a mostly sequential fashion, it would be
    useful to have a version that uses ranges instead of a hash function. Then when find_in_cache
    is called on a disk segment, we could swap a different segment out and bring that segment in.
    That's a lot more complicated. We'd also want to maintain a last used date for each segment so
    that if we add rows to the cache, we can choose the best segment to swap to disk.

    Also worth considering is that if we bring a segment in from disk, it would be best to keep the
    disk version. At that point any additions to that segment would need to go to both places.
    """
    def __init__(self,
                 lookup_name: str,
                 lookup_keys: list,
                 parent_component: ETLComponent,
                 config: BI_ETL_Config_Base = None,
                 use_value_cache: bool = True,
                 path=None,
                 max_percent_ram_used=None,
                 max_process_ram_usage_mb=None,
                 init_parent: bool = True,
                 **kwargs
                 ):
        if init_parent:
            super().__init__(
                lookup_name=lookup_name,
                lookup_keys=lookup_keys,
                parent_component=parent_component,
                use_value_cache=use_value_cache,
                config=config,
            )
        self._cache = None
        self.rows_cached = 0

        # First, try to use the passed value
        self.max_percent_ram_used = max_percent_ram_used
        if config is not None:
            self.config = config
        else:
            self.config = parent_component.task.config
        # If not passed, check the config
        if self.max_percent_ram_used is None:
            if self.config is not None:
                self.max_percent_ram_used = self.config.bi_etl.lookup_disk_swap_at_percent_ram_used
        # Finally, fall back to a default value
        if self.max_percent_ram_used is None:
            # Needs to be less than the default in bi_etl.components.table.Table.fill_cache
            self.max_percent_ram_used = 70

        self.max_process_ram_usage_mb = max_process_ram_usage_mb
        # If not passed, check the config
        if self.max_process_ram_usage_mb is None:
            if self.config is not None:
                self.max_process_ram_usage_mb = self.config.bi_etl.lookup_disk_swap_at_process_ram_usage_mb
            else:
                # Perhaps instead of re-specifying a no-config default here we should build a BI_ETL_Config_Section
                self.max_process_ram_usage_mb = 2.5 * 1024 ** 3

        self._set_path(path)
        self.ram_check_row_interval = 5000
        self.last_ram_check_at_row = 0
        self.disk_cache = None
        self.MemoryLookupClass = Lookup
        self.DiskLookupClass = DiskLookup
        if kwargs is None:
            kwargs = dict()
        kwargs['use_value_cache'] = use_value_cache
        self.lookup_class_args = kwargs
    def _set_path(self, path):
        if path is not None:
            self.path = path
        else:
            if self.config is not None:
                self.path = self.config.get('Cache', 'path', fallback=DiskLookup.DEFAULT_PATH)
            else:
                self.path = DiskLookup.DEFAULT_PATH

    def _init_mem_cache(self):
        self._cache = self.MemoryLookupClass(
            lookup_name=self.lookup_name,
            lookup_keys=self.lookup_keys,
            parent_component=self.parent_component,
            config=self.config,
            **self.lookup_class_args
        )
        self._cache.init_cache()
    def init_cache(self):
        if self.cache_enabled is None:
            self.cache_enabled = True
        if self.cache_enabled:
            self._init_mem_cache()
    def get_memory_size(self):
        ram_size = 0
        if self._cache is not None:
            ram_size += self._cache.get_memory_size()
        if self.disk_cache is not None:
            ram_size += self.disk_cache.get_memory_size()
        return ram_size
    def get_disk_size(self):
        disk_size = 0
        if self._cache is not None:
            disk_size += self._cache.get_disk_size()
        if self.disk_cache is not None:
            disk_size += self.disk_cache.get_disk_size()
        return disk_size
    def clear_cache(self):
        if self._cache is not None:
            self._cache.clear_cache()
            del self._cache
        if self.disk_cache is not None:
            self.disk_cache.clear_cache()
            del self.disk_cache
        self._cache = None
        self.disk_cache = None
    def __len__(self):
        total_len = 0
        if self._cache is not None:
            total_len += len(self._cache)
        if self.disk_cache is not None:
            total_len += len(self.disk_cache)
        return total_len
    def init_disk_cache(self):
        if self.disk_cache is None:
            self.disk_cache = self.DiskLookupClass(
                lookup_name=self.lookup_name,
                lookup_keys=self.lookup_keys,
                parent_component=self.parent_component,
                config=self.config,
                path=self.path,
                **self.lookup_class_args
            )
            self.disk_cache.init_cache()
            # Do not warn about protected access to _get_estimate_row_size
            # pylint: disable=protected-access
            self._cache.check_estimate_row_size(force_now=True)
            self.disk_cache._row_size = self._cache.estimated_row_size()
            self.disk_cache._done_get_estimate_row_size = self._cache.has_done_get_estimate_row_size()
    def flush_to_disk(self):
        if self._cache is not None and len(self._cache) > 0:
            rows_before = len(self)
            self.init_disk_cache()
            timer = Timer()
            self.log.info(f'Flushing {len(self._cache):,} rows to disk.')
            gc.collect()
            before_move_mb = self.our_process.memory_info().rss / (1024 ** 2)
            for row in self._cache:
                self.disk_cache.cache_row(row)
            self._cache.clear_cache()
            del self._cache
            self._cache = None
            self._init_mem_cache()
            if len(self) != rows_before:
                raise AssertionError(
                    f"Row count changed during flush to disk. "
                    f"Rows before flush = {rows_before}, rows after flush = {len(self)}"
                )
            self.log.info(f'Flushing rows took {timer.seconds_elapsed_formatted} seconds')
            gc.collect()
            after_move_mb = self.our_process.memory_info().rss / (1024 ** 2)
            self.log.info(
                f'Flushing rows freed {before_move_mb - after_move_mb:,.3f} MB from the process '
                f'(before {before_move_mb:,.3f}, after {after_move_mb:,.3f})'
            )
    def memory_limit_reached(self) -> bool:
        if self.max_percent_ram_used is not None:
            if psutil.virtual_memory().percent > self.max_percent_ram_used:
                self.log.warning(
                    f"{self.lookup_name} system memory limit reached "
                    f"{psutil.virtual_memory().percent} > {self.max_percent_ram_used}% "
                    f"with {self.rows_cached:,} rows of data"
                )
                return True
        process_mb = self.our_process.memory_info().rss / (1024 ** 2)
        if self.max_process_ram_usage_mb is not None:
            if process_mb > self.max_process_ram_usage_mb:
                self.log.warning(
                    f"{self.lookup_name} process memory limit reached."
                    f" {process_mb:,} > {self.max_process_ram_usage_mb:,} MB "
                    f"with {self.rows_cached:,} rows of data"
                )
                return True
        return False
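    # The two checks above can be reproduced interactively with psutil; the numbers shown are
    # purely illustrative (a rough sketch, not output captured from this module):
    #   >>> import psutil
    #   >>> psutil.virtual_memory().percent                     # system-wide RAM in use, percent
    #   42.1
    #   >>> psutil.Process().memory_info().rss / (1024 ** 2)    # this process's RSS, in MB
    #   512.3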
    def cache_row(
            self,
            row: Row,
            allow_update: bool = True,
            allow_insert: bool = True,
    ):
        """
        Adds the given row to the cache for this lookup.

        Parameters
        ----------
        row: Row
            The row to cache
        allow_update: boolean
            Allow this method to update an existing row in the cache.
        allow_insert: boolean
            Allow this method to insert a new row into the cache.

        Raises
        ------
        ValueError
            If allow_update is False and an already existing row (lookup key) is passed in.
        """
        if self.cache_enabled:
            if self._cache is None:
                self.init_cache()

            # Note: This will double count updated rows.
            # We fix it below at each ram_check_row_interval, but that calc is slower.
            self.rows_cached += 1

            # Note: The memory check needs to be here and not in Table.fill_cache since rows can be
            # added to the cache during the load and not just by fill_cache.
            # Python memory is hard to free... so read the first rows into RAM and then use disk for
            # all rows after that.
            # Every X rows, check the memory limits.
            if (
                self.disk_cache is None
                and (self.rows_cached - self.last_ram_check_at_row) >= self.ram_check_row_interval
            ):
                # Double check our cache size. Calls to cache_row might have overwritten existing rows.
                self.rows_cached = len(self)
                self.last_ram_check_at_row = self.rows_cached
                if self.memory_limit_reached():
                    self.init_disk_cache()

            # Now cache the row
            if self.disk_cache is None:
                self._cache.cache_row(row, allow_update=allow_update)
            else:
                # We need to make sure each row is in only one place
                lk_tuple = self.get_hashable_combined_key(row)
                if lk_tuple in self._cache:
                    # Move the existing key's date ranges to disk
                    versions_collection = self.get_versions_collection(row)
                    self._cache.uncache_set(lk_tuple)
                    # Change the collection type if needed
                    if not isinstance(versions_collection, DiskLookup.VERSION_COLLECTION_TYPE):
                        versions_collection = DiskLookup.VERSION_COLLECTION_TYPE(versions_collection)
                    disk_lk_tuple = self.disk_cache.get_hashable_combined_key(row)
                    self.disk_cache.cache_set(disk_lk_tuple, versions_collection)
                    # Ensure our new row is there
                    self.disk_cache.cache_row(row, allow_update=True)
                else:
                    # Cache the row to disk
                    self.disk_cache.cache_row(row, allow_update=allow_update)
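    # Hypothetical usage sketch (the `lookup` and `new_row` names are illustrative, not part of
    # this module). Callers normally let the owning ETLComponent drive the cache, but direct
    # calls would look like:
    #   lookup.cache_row(new_row)                         # insert or update (allow_update=True is the default)
    #   lookup.cache_row(new_row, allow_update=False)     # per the docstring, raises ValueError on a duplicate key
    # Once memory_limit_reached() has triggered init_disk_cache(), later rows land in the disk cache.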
    def uncache_row(self, row: Lookup.ROW_TYPES):
        if self._cache is not None:
            self._cache.uncache_row(row)
        if self.disk_cache is not None:
            self.disk_cache.uncache_row(row)
    def __iter__(self):
        """
        The rows will come out in any order.

        DO NOT MODIFY the cache during the loop.
        """
        if self._cache is not None:
            for row in self._cache:
                yield row
        if self.disk_cache is not None:
            for row in self.disk_cache:
                yield row
    def get_versions_collection(
            self,
            row: Lookup.ROW_TYPES
    ) -> typing.MutableMapping[datetime, Row]:
        """
        This method exists for compatibility with range caches.

        Parameters
        ----------
        row
            The row with keys to search for.

        Returns
        -------
        A MutableMapping of rows
        """
        if not self.cache_enabled:
            raise ValueError(f"Lookup {self.lookup_name} cache not enabled")
        if self._cache is None:
            self.init_cache()

        try:
            return self._cache.get_versions_collection(row)
        except NoResultFound:
            if self.disk_cache is not None:
                return self.disk_cache.get_versions_collection(row)
            else:
                raise NoResultFound()
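    # A rough sketch of how the returned collection might be consumed (illustrative names only):
    #   versions = lookup.get_versions_collection(key_row)    # mapping of effective datetime -> Row
    #   for effective_date, version_row in versions.items():
    #       ...
    # NoResultFound propagates when neither the RAM cache nor the disk cache holds the key.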
    def report_on_value_cache_effectiveness(self, lookup_name: str = None):
        if lookup_name is None:
            lookup_name = self.lookup_name
        if self._cache:
            self._cache.report_on_value_cache_effectiveness(f'{lookup_name} RAM')
        if self.disk_cache:
            self.disk_cache.report_on_value_cache_effectiveness(f'{lookup_name} DISK')
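
# A minimal end-to-end sketch, assuming an ETLComponent named `source_table` that exposes
# `task.config` and is iterable; the names below are illustrative and not defined in this module:
#
#   lookup = AutoDiskLookup(
#       lookup_name='natural_key',
#       lookup_keys=['customer_id'],
#       parent_component=source_table,
#   )
#   lookup.init_cache()
#   for row in source_table:
#       lookup.cache_row(row)          # spills to the disk cache automatically if RAM limits are hit
#   lookup.report_on_value_cache_effectiveness()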