bi_etl.components.readonlytable module

Created on Sep 17, 2014

@author: Derek Wood

class bi_etl.components.readonlytable.ReadOnlyTable(task: ETLTask | None, database: DatabaseMetadata, table_name: str, table_name_case_sensitive: bool = True, schema: str = None, exclude_columns: set = None, include_only_columns: set = None, **kwargs)[source]

Bases: ETLComponent

Reads all columns from a database table or view. Rows can be filtered using the where() method.

Parameters:
  • task (ETLTask) – The instance to register in (if not None)

  • database (bi_etl.scheduler.task.Database) – The database to find the table/view in.

  • table_name (str) – The name of the table/view.

  • table_name_case_sensitive (bool) –

    Should the table name be treated in a case-sensitive manner?

    If false, it will convert the table name to lower case which indicates to SQLAlchemy that it should be not case-sensitive in the Oracle dialect.

    NOTE from SQLAlchemy:

    In Oracle, the data dictionary represents all case-insensitive identifier names using UPPERCASE text. SQLAlchemy on the other hand considers an all-lower case identifier name to be case insensitive. The Oracle dialect converts all case insensitive identifiers to and from those two formats during schema level communication, such as reflection of tables and indexes. Using an UPPERCASE name on the SQLAlchemy side indicates a case sensitive identifier, and SQLAlchemy will quote the name - this will cause mismatches against data dictionary data received from Oracle, so unless identifier names have been truly created as case sensitive (i.e. using quoted names), all lowercase names should be used on the SQLAlchemy side. https://docs.sqlalchemy.org/en/20/dialects/oracle.html#identifier-casing

include_only_columnslist, optional

Optional. A list of specific columns to include when reading the table/view. All other columns are excluded.

exclude_columns :

Optional. A list of columns to exclude when reading the table/view.

delete_flag

The name of the delete_flag column, if any.

Type:

str, optional

delete_flag_yes

The value of delete_flag for deleted rows.

Type:

str, optional

delete_flag_no

The value of delete_flag for not deleted rows.

Type:

str, optional

special_values_descriptive_columns

A list of columns that should get longer descriptive text (e.g. ‘Missing’ instead of ‘?’) in get_missing_row(), get_invalid_row(), get_not_applicable_row(), get_various_row()

Type:

list, optional

log_first_row

Should we log progress on the first row read. Only applies if used as a source. (inherited from ETLComponent)

Type:

boolean

max_rows

The maximum number of rows to read. Only applies if Table is used as a source. (inherited from ETLComponent)

Type:

int, optional

maintain_cache_during_load

Default = True. Should we maintain the lookup caches as we load records. Can safely be set to False for sources that will never use a key combination twice during a single load. Setting it to False should improve performance.

Type:

boolean

primary_key

The name of the primary key column(s). Only impacts trace messages. Default=None. If not passed in, will use the database value, if any. (inherited from ETLComponent)

Type:

list

natural_key

The list of natural key columns (as Column objects). Default is None

Type:

list

progress_frequency

How often (in seconds) to output progress messages. None for no progress messages. (inherited from ETLComponent)

Type:

int

progress_message

The progress message to print. Default is "{logical_name} row # {row_number}". Note logical_name and row_number subs. (inherited from ETLComponent)

Type:

str

DEFAULT_PROGRESS_FREQUENCY = 10

Default for number of seconds between progress messages when reading from this component. See ETLComponent.progress_frequency` to override.

DEFAULT_PROGRESS_MESSAGE = '{logical_name} current row # {row_number:,}'

Default progress message when reading from this component. See ETLComponent.progress_message` to override.

FULL_ITERATION_HEADER = 'full'

Constant value passed into ETLComponent.Row() to request all columns in the row. Deprecated: Please use ETLComponent.full_row_instance() to get a row with all columns.

NK_LOOKUP = 'NK'
PK_LOOKUP = 'PK'
Row(data: MutableMapping | Iterator | None = None, iteration_header: RowIterationHeader | str | None = None) Row

Make a new empty row with this components structure.

__init__(task: ETLTask | None, database: DatabaseMetadata, table_name: str, table_name_case_sensitive: bool = True, schema: str = None, exclude_columns: set = None, include_only_columns: set = None, **kwargs)[source]
build_row(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_safe', parent_stats: Statistics | None = None) Row

Use a source row to build a row with correct data types for this table.

Parameters:
  • source_row

  • source_excludes

  • target_excludes

  • stat_name – Name of this step for the ETLTask statistics. Default = ‘build rows’

  • parent_stats

Return type:

Row

build_row_dynamic_source(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_dynamic_source', parent_stats: Statistics | None = None) Row

Use a source row to build a row with correct data types for this table. This version expects dynamically changing source rows, so it sanity checks all rows.

Parameters:
  • source_row

  • source_excludes

  • target_excludes

  • stat_name – Name of this step for the ETLTask statistics. Default = ‘build rows’

  • parent_stats

Return type:

Row

cache_commit()
cache_iterable()[source]
cache_row(row: Row, allow_update: bool = False, allow_insert: bool = True)
property check_row_limit
clear_cache()

Clear all lookup caches. Sets to un-cached state (unknown state v.s. empty state which is what init_cache gives)

clear_statistics()
close(error: bool = False)[source]
close_connection(connection_name: str = None)[source]
close_connections(exceptions: set | None = None)[source]
property column_names: List[str]

The list of column names for this component.

property column_names_set: set

A set containing the column names for this component. Usable to quickly check if the component contains a certain column.

property columns: List[Column]

A named-based collection of sqlalchemy.sql.expression.ColumnElement objects in this table/view.

connection(connection_name: str | None = None, open_if_not_exist: bool = True, open_if_closed: bool = True) Connection[source]
count(column: str = None, where=None) int[source]

Query the table/view to get the count of a given column.

Parameters:
Returns:

count

Return type:

int

debug_log(state: bool = True)
define_lookup(lookup_name: str, lookup_keys: list, lookup_class: Type[Lookup] = None, lookup_class_kwargs: dict | None = None)

Define a new lookup.

Parameters:
  • lookup_name – Name for the lookup. Used to refer to it later.

  • lookup_keys – list of lookup key columns

  • lookup_class – Optional python class to use for the lookup. Defaults to value of default_lookup_class attribute.

  • lookup_class_kwargs – Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.

property delete_flag
property empty_iteration_header: RowIterationHeader
ensure_nk_lookup()[source]
exclude_columns(columns_to_exclude: set)[source]

Exclude columns from the table. Removes them from all SQL statements.

columns_to_exclude :

A list of columns to exclude when reading the table/view.

execute(statement, *list_params, connection_name: str = None, **params) LegacyCursorResult[source]
Parameters:
  • statement – The SQL statement to execute. Note: caller must handle the transaction begin/end.

  • connection_name – Name of the pooled connection to use Defaults to DEFAULT_CONNECTION_NAME

Return type:

sqlalchemy.engine.ResultProxy with results

fill_cache(progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)

Fill all lookup caches from the table.

Parameters:
  • progress_frequency – How often (in seconds) to output progress messages. Default 10. None for no progress messages.

  • progress_message – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note logical_name and row_number substitutions applied via format().

  • criteria_list – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us

  • criteria_dict – Dict keys should be columns, values are set using = or in

  • column_list – List of columns to include

  • exclude_cols (frozenset) – Optional. Columns to exclude when filling the cache

  • order_by – list of columns to sort by when filling the cache (helps range caches)

  • assume_lookup_complete – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Default to False if filtering criteria was used, otherwise defaults to True.

  • allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False

  • row_limit – limit on number of rows to cache.

  • row_limit – limit on number of rows to cache.

  • parent_stats – Optional Statistics object to nest this steps statistics in. Default is to place statistics in the ETLTask level statistics.

fill_cache_from_source(source: ETLComponent, progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)[source]

Fill all lookup caches from the table.

Parameters:
  • source – Source compontent to get rows from.

  • progress_frequency (int, optional) – How often (in seconds) to output progress messages. Default 10. None for no progress messages.

  • progress_message (str, optional) – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note logical_name and row_number substitutions applied via format().

  • criteria_list (string or list of strings) – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us

  • criteria_dict (dict) – Dict keys should be columns, values are set using = or in

  • column_list – List of columns to include

  • exclude_cols – Optional. Columns to exclude when filling the cache

  • order_by (list) – list of columns to sort by when filling the cache (helps range caches)

  • assume_lookup_complete (boolean) – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Default to False if filtering criteria was used, otherwise defaults to True.

  • allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False

  • row_limit (int) – limit on number of rows to cache.

  • parent_stats (bi_etl.statistics.Statistics) – Optional Statistics object to nest this steps statistics in. Default is to place statistics in the ETLTask level statistics.

property full_iteration_header: RowIterationHeader
full_row_instance(data: MutableMapping | Iterator | None = None) Row

Build a full row (all columns) using the source data.

Note: If data is passed here, it uses bi_etl.components.row.row.Row.update() to map the data into the columns. That is nicely automatic, but slower since it has to try various ways to read the data container object.

Consider using the appropriate one of the more specific update methods based on the source data container.

generate_iteration_header(logical_name: str | None = None, columns_in_order: list | None = None, result_primary_key: list | None = None) RowIterationHeader
get_by_key(source_row: Row, stats_id: str = 'get_by_key', parent_stats: Statistics = None) Row[source]

Get by the primary key.

get_by_lookup(lookup_name: str, source_row: Row, stats_id: str = 'get_by_lookup', parent_stats: Statistics | None = None, fallback_to_db: bool = False) Row[source]

Get by an alternate key. Returns a Row

Throws:

NoResultFound

get_column(column: str | Column) Column[source]

Get the sqlalchemy.sql.expression.ColumnElement object for a given column name.

get_column_name(column)[source]

Get the column name given a possible sqlalchemy.sql.expression.ColumnElement object.

get_column_special_value(column: Column, short_char: str, long_char: str, int_value: int, date_value: datetime, use_custom_special_values: bool = 'Y') object[source]
get_default_lookup(row_iteration_header: RowIterationHeader) Lookup[source]
get_invalid_row()[source]

Get a Row with the Invalid special values filled in for all columns.

Type

Value

Integer

-8888

Short Text

‘!’

Long Text

‘Invalid’

Date

9999-8-1

get_lookup(lookup_name: str | None) Lookup[source]
get_lookup_keys(lookup_name: str) list
get_lookup_tuple(lookup_name: str, row: Row) tuple
get_missing_row()[source]

Get a Row with the Missing special values filled in for all columns.

Type

Value

Integer

-9999

Short Text

‘?’

Long Text

‘Missing’

Date

9999-9-1

get_natural_key_tuple(row) tuple[source]
get_natural_key_value_list(row: Row) list[source]
get_nk_lookup() Lookup[source]
get_nk_lookup_name()[source]
get_none_selected_row()[source]

Get a Row with the None Selected special values filled in for all columns.

Type

Value

Integer

-5555

Short Text

‘#’

Long Text

‘None Selected’

Date

9999-5-1

get_not_applicable_row()[source]

Get a Row with the Not Applicable special values filled in for all columns.

Type

Value

Integer

-7777

Short Text

‘~’

Long Text

‘Not Available’

Date

9999-7-1

get_one(statement=None)[source]

Executes and gets one row from the statement.

Parameters:

statement – The SQL statement to execute

Returns:

row – The row returned

Return type:

Row

Raises:
get_pk_lookup() Lookup[source]
get_primary_key_value_list(row) list[source]
get_primary_key_value_tuple(row) tuple[source]
get_qualified_lookup_name(base_lookup_name: str) str
get_special_row(short_char: str, long_char: str, int_value: int, date_value: datetime)[source]
get_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
get_unique_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
get_various_row()[source]

Get a Row with the Various special values filled in for all columns.

Type

Value

Integer

-6666

Short Text

‘*’

Long Text

‘Various’

Date

9999-6-1

include_only_columns(columns_to_include: set)[source]

Include only specified columns in the table definition. Columns that are non included are removed them from all SQL statements.

columns_to_includelist

A list of columns to include when reading the table/view.

init_cache()

Initialize all lookup caches as empty.

property is_closed
is_connected(connection_name: str | None = None) bool[source]
iter_result(result_list: object, columns_in_order: list | None = None, criteria_dict: dict | None = None, logical_name: str | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
Yields:

row (Row) – next row

static kwattrs_order() Dict[str, int]

Certain values need to be set before others in order to work correctly. This method should return a dict mapping those key values = arg name to a value less than the default of 9999, which will be used for any arg not explicitly listed here.

log_progress(row: Row, stats: Statistics)
logging_level_reported = False

Has the logging level of this component been reported (logged) yet? Stored at class level so that it can be logged only once.

property lookups
property maintain_cache_during_load: bool
max(column, where=None, connection_name: str = 'max')[source]

Query the table/view to get the maximum value of a given column.

Parameters:
Returns:

max

Return type:

depends on column datatype

property natural_key: list

Get this tables natural key

order_by(order_by: list, stats_id: str = None, parent_stats: Statistics = None) Iterable[Row][source]

Iterate over all rows in order provided.

Parameters:
  • order_by (string or list of strings) – Each value should represent a column to order by.

  • stats_id (string) – Name of this step for the ETLTask statistics.

  • parent_stats (bi_etl.statistics.Statistics) – Optional Statistics object to nest this steps statistics in. Default is to place statistics in the ETLTask level statistics.

Yields:

row (Row) – Row object with contents of a table/view row

property primary_key: list

The name of the primary key column(s). Only impacts trace messages. Default=Empty list.

property primary_key_tuple: tuple

The name of the primary key column(s) in a tuple. Used when a hashable PK definition is needed.

property progress_frequency: int

How often (in seconds) to output progress messages. None for no progress messages.

property qualified_table_name

The table name

property quoted_qualified_table_name

The table name

property row_name
property rows_read: int

int The number of rows read and returned.

sanity_check_example_row(example_source_row, source_excludes=None, target_excludes=None, ignore_source_not_in_target=None, ignore_target_not_in_source=None)
sanity_check_source_mapping(source_definition: ETLComponent, source_name: str = None, source_excludes: frozenset = None, target_excludes: frozenset = None, ignore_source_not_in_target: bool = None, ignore_target_not_in_source: bool = None, raise_on_source_not_in_target: bool = None, raise_on_target_not_in_source: bool = None)
select(column_list: list | None = None, exclude_cols: frozenset | None = None) GenerativeSelect[source]

Builds a select statement for this table.

Return type:

statement

set_columns(columns)[source]
set_kwattrs(**kwargs)
property statistics
property table
property table_name

The table name

property trace_data: bool

boolean Should a debug message be printed with the parsed contents (as columns) of each row.

uncache_row(row)
uncache_where(key_names, key_values_dict)
where(criteria_list: list = None, criteria_dict: dict = None, order_by: list = None, column_list: List[Column | str] = None, exclude_cols: FrozenSet[Column | str] = None, use_cache_as_source: bool = None, connection_name: str = 'select', progress_frequency: int = None, stats_id: str = None, parent_stats: Statistics = None) Iterable[Row][source]
Parameters:
Return type:

rows