"""
Created on May 15, 2015
@author: Derek Wood
"""
# https://www.python.org/dev/peps/pep-0563/
from __future__ import annotations
import dbm
import math
import os
import pickle
import shelve
import string
import sys
import tempfile
import typing
import weakref
import semidbm
from bi_etl.config.bi_etl_config_base import BI_ETL_Config_Base
from bi_etl.lookups.lookup import Lookup
from bi_etl.memory_size import get_dir_size
from bi_etl.memory_size import get_size_gc
if typing.TYPE_CHECKING:
from bi_etl.components.etlcomponent import ETLComponent
__all__ = ['DiskLookup']
[docs]
class DiskLookup(Lookup):
DEFAULT_PATH = None
[docs]
def __init__(self,
lookup_name: str,
lookup_keys: list,
parent_component: ETLComponent,
config: BI_ETL_Config_Base = None,
use_value_cache: bool = True,
path=None,
init_parent: bool = True,
**kwargs):
"""
Optional parameter path where the lookup files should be persisted to disk
"""
if init_parent:
super().__init__(
lookup_name=lookup_name,
lookup_keys=lookup_keys,
parent_component=parent_component,
use_value_cache=use_value_cache,
config=config,
**kwargs
)
self._set_path(path)
self.dbm = None
self._cache_dir_mgr = None
self._cache_file_path = None
self.use_value_cache = False
self._finalizer = None
# Shelf requires str keys
self._hashable_key_type = str
def _set_path(self, path: str):
if path is not None:
self.path = path
else:
if self.config is not None:
self.path = self.config.get('Cache', 'path', fallback=DiskLookup.DEFAULT_PATH)
else:
self.path = DiskLookup.DEFAULT_PATH
[docs]
def init_cache(self):
if self.cache_enabled is None:
self.cache_enabled = True
if self.cache_enabled:
file_prefix = ''.join([c for c in self.lookup_name if c in string.ascii_letters])
self._cache_dir_mgr = tempfile.TemporaryDirectory(
dir=self.path,
prefix=file_prefix,
ignore_cleanup_errors=True,
)
self._cache_file_path = self._cache_dir_mgr.name
self.log.info(f"Creating cache in {self._cache_file_path}")
if sys.platform.startswith('win'):
self.dbm = semidbm.open(self._cache_file_path, 'n')
else:
file = os.path.join(self._cache_file_path, 'data')
self.dbm = dbm.open(file, 'n')
self._cache = shelve.BsdDbShelf(
self.dbm,
protocol=pickle.HIGHEST_PROTOCOL,
writeback=False,
)
self._finalizer = weakref.finalize(self, self._cleanup)
def __len__(self):
if self._cache is not None:
return len(self.dbm.keys())
else:
return 0
def _get_first_row_size(self, row: Lookup.ROW_TYPES):
if self.dbm:
# Slow but shouldn't be too bad twice
self._row_size = get_size_gc(self.dbm)
[docs]
def check_estimate_row_size(self, force_now: bool = False):
if force_now or not self._done_get_estimate_row_size:
row_cnt = min(len(self), 1000)
total_row_sizes = 0
row_num = 0
for row in self:
total_row_sizes += get_size_gc(self.get_hashable_combined_key(row))
row_num += 1
if row_num >= row_cnt:
break
self._row_size = math.ceil(total_row_sizes / row_cnt)
msg = (f'{self.lookup_name} row key size (in memory) '
f'now estimated at {self._row_size:,} bytes per row')
self.log.debug(msg)
self._done_get_estimate_row_size = True
[docs]
def get_disk_size(self):
if self._cache_file_path:
return get_dir_size(self._cache_file_path)
else:
return 0
def _cleanup(self):
if self._cache is not None:
self.log.debug(f"Cleanup cache file {self._cache_dir_mgr} {self._cache_dir_mgr.name}")
self.clear_cache()
[docs]
def clear_cache(self):
if self._cache is not None:
self._cache.close()
self._cache_dir_mgr.cleanup()
self._cache = None