Source code for bi_etl.components.get_next_key.shared_table_memory
from bi_etl.components.get_next_key.base_table_memory import BaseTableMemory
class SharedTableMemory(BaseTableMemory):
    """
    Key memory shared across processes via a multiprocessing manager.
    Keys are reserved from the shared store in chunks of ``chunk_size`` so that
    each worker process only needs the shared lock once per chunk.
    """
    def __init__(self, manager, chunk_size: int):
        # Table will need to be set on the local process before use
        super().__init__(table=None)
        self.chunk_size = chunk_size
        # Lock and dict come from the multiprocessing manager so they are shared across processes
        self.lock = manager.Lock()
        self.current_key_values = manager.dict()
        # These need to be None on init because init is done on the main process;
        # each worker process builds its own local dicts on first use.
        self._local_current_keys = None
        self._local_max_allocated_keys = None
        self.log.debug(f"SharedTableMemory chunk_size={chunk_size}")
    def get_key_from_shared(self, column: str) -> int:
        """
        Reserve a new chunk of keys for this process from the shared
        (cross-process) key store and return the first key of that chunk.
        """
        column_obj = self.table.get_column(column)
        column = column_obj.name
        with self.lock:
            current_max = self.current_key_values.get(column)
            if current_max is None:
                # No shared value yet for this column: seed from the database
                next_key = self.get_next_from_database(column_obj)
            else:
                next_key = current_max + 1
            # Reserve a chunk for this process and record the new shared high-water mark
            local_max_allocated = next_key + self.chunk_size
            self.current_key_values[column] = local_max_allocated
            self._local_max_allocated_keys[column] = local_max_allocated
            return next_key
    def get_next_key(self, column: str) -> int:
        """
        Return the next key for ``column``, drawing from this process's local
        chunk when possible and only touching the shared store (and its lock)
        when the local chunk is exhausted.
        """
        column_obj = self.table.get_column(column)
        column = column_obj.name
        if self._local_current_keys is None:
            # First use in this process: build the process-local caches
            self._local_current_keys = dict()
            self._local_max_allocated_keys = dict()
        current_max = self._local_current_keys.get(column)
        if current_max is None:
            # No local chunk yet for this column
            next_key = self.get_key_from_shared(column)
        else:
            local_max_allocated = self._local_max_allocated_keys.get(column)
            if current_max >= local_max_allocated:
                # Local chunk exhausted: defer to the shared store for a new chunk
                next_key = self.get_key_from_shared(column)
            else:
                next_key = current_max + 1
        self._local_current_keys[column] = next_key
        return next_key