Source code for bi_etl.components.row.row_iteration_header_case_insensitive
import functools
from typing import *
from sqlalchemy import Column
from bi_etl.components.row.row_iteration_header import RowIterationHeader
if TYPE_CHECKING:
from bi_etl.components.row.row import Row
from bi_etl.components.etlcomponent import ETLComponent
[docs]
class RowIterationHeaderCaseInsensitive(RowIterationHeader):
# Override since the case-sensitive versions should not share a name map
__shared_name_map_db = dict()
[docs]
def __init__(
self,
logical_name: Optional[str] = None,
primary_key: Optional[Iterable] = None,
parent: Optional['ETLComponent'] = None,
columns_in_order: Optional[Iterable] = None,
owner_pid: int = None,
):
if columns_in_order is not None:
columns_in_order = [c.lower() for c in columns_in_order]
super().__init__(
logical_name=logical_name,
primary_key=primary_key,
parent=parent,
columns_in_order=columns_in_order,
owner_pid=owner_pid,
)
[docs]
@classmethod
def from_other_header(cls, other_header: RowIterationHeader):
columns_in_order = [c.lower() for c in other_header.columns_in_order]
new_inst = cls(
logical_name=other_header.logical_name,
primary_key=other_header.primary_key,
parent=other_header.parent,
columns_in_order=columns_in_order,
owner_pid=other_header.owner_pid,
)
return new_inst
[docs]
def get_column_name(self, input_name: Union[str, Column]) -> str:
name_str = super().get_column_name(input_name)
lower_name = name_str.lower()
self._name_map_db[input_name] = lower_name
return lower_name
[docs]
def has_column(self, column_name) -> bool:
return self.get_column_name(column_name) in self._columns_positions
[docs]
@functools.lru_cache(maxsize=1000)
def get_column_position(
self,
column_name: str,
allow_create: bool = False,
) -> int:
column_name = self.get_column_name(column_name)
return super().get_column_position(
column_name=column_name,
allow_create=allow_create,
)
[docs]
def row_set_item(
self,
column_name: str,
value,
row: 'Row',
) -> RowIterationHeader:
column_name_fixed = self.get_column_name(column_name)
return super().row_set_item(
column_name=column_name_fixed,
value=value,
row=row,
)
[docs]
def rename_column(
self,
old_name: str,
new_name: str,
ignore_missing: bool = False,
no_new_header: bool = False,
) -> RowIterationHeader:
"""
Rename a column
Parameters:
old_name: str
The name of the column to find and rename.
new_name: str
The new name to give the column.
ignore_missing: boolean
Ignore (don't raise error) if we don't have a column with the name in old_name.
Defaults to False
no_new_header:
Skip creating a new row header, modify in place.
** BE CAREFUL USING THIS! **
All new rows created with this header will immediately get the new name,
in which case you won't want to call this method again.
"""
old_name = self.get_column_name(old_name)
new_name = self.get_column_name(new_name)
return super().rename_column(
old_name=old_name,
new_name=new_name,
ignore_missing=ignore_missing,
no_new_header=no_new_header,
)
[docs]
def row_remove_column(
self,
column_name: str,
row: 'Row',
ignore_missing: bool = False,
) -> RowIterationHeader:
column_name = self.get_column_name(column_name)
return super().row_remove_column(
column_name=column_name,
row=row,
ignore_missing=ignore_missing,
)