bi_etl.components.readonlytable module
Created on Sep 17, 2014
@author: Derek Wood
- class bi_etl.components.readonlytable.ReadOnlyTable(task: ETLTask | None, database: DatabaseMetadata, table_name: str, table_name_case_sensitive: bool = True, schema: str = None, exclude_columns: set = None, include_only_columns: set = None, **kwargs)[source]
Bases:
ETLComponent
Reads all columns from a database table or view. Rows can be filtered using the where() method.
- Parameters:
database (bi_etl.scheduler.task.Database) – The database to find the table/view in.
table_name_case_sensitive (bool) –
Should the table name be treated in a case-sensitive manner?
If False, the table name is converted to lower case, which indicates to SQLAlchemy that it should not be treated as case-sensitive in the Oracle dialect.
- NOTE from SQLAlchemy:
In Oracle, the data dictionary represents all case-insensitive identifier names using UPPERCASE text. SQLAlchemy on the other hand considers an all-lower case identifier name to be case insensitive. The Oracle dialect converts all case insensitive identifiers to and from those two formats during schema level communication, such as reflection of tables and indexes. Using an UPPERCASE name on the SQLAlchemy side indicates a case sensitive identifier, and SQLAlchemy will quote the name - this will cause mismatches against data dictionary data received from Oracle, so unless identifier names have been truly created as case sensitive (i.e. using quoted names), all lowercase names should be used on the SQLAlchemy side. https://docs.sqlalchemy.org/en/20/dialects/oracle.html#identifier-casing
- include_only_columns : list, optional
Optional. A list of specific columns to include when reading the table/view. All other columns are excluded.
- exclude_columns :
Optional. A list of columns to exclude when reading the table/view.
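The table_name_case_sensitive rule described above amounts to a small normalization step. The following is a minimal sketch of that rule only; the helper name `effective_table_name` is hypothetical and is not part of bi_etl.

```python
def effective_table_name(table_name: str, case_sensitive: bool = True) -> str:
    """Return the name as it would be handed to SQLAlchemy (illustrative only)."""
    # Hypothetical helper: mirrors the documented rule, not bi_etl's actual code.
    if case_sensitive:
        return table_name
    # An all-lowercase name tells SQLAlchemy the identifier is case-insensitive.
    return table_name.lower()


print(effective_table_name("ORDERS", case_sensitive=False))  # orders
print(effective_table_name("Orders", case_sensitive=True))   # Orders
```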
- special_values_descriptive_columns
A list of columns that should get longer descriptive text (e.g. ‘Missing’ instead of ‘?’) in get_missing_row(), get_invalid_row(), get_not_applicable_row(), get_various_row().
- Type:
list, optional
- log_first_row
Should we log progress on the first row read. Only applies if used as a source. (inherited from ETLComponent)
- Type:
boolean
- max_rows
The maximum number of rows to read. Only applies if Table is used as a source. (inherited from ETLComponent)
- Type:
int, optional
- maintain_cache_during_load
Default = True. Should we maintain the lookup caches as we load records. Can safely be set to False for sources that will never use a key combination twice during a single load. Setting it to False should improve performance.
- Type:
boolean
- primary_key
The name of the primary key column(s). Only impacts trace messages. Default=None. If not passed in, will use the database value, if any. (inherited from ETLComponent)
- Type:
- progress_frequency
How often (in seconds) to output progress messages. None for no progress messages. (inherited from ETLComponent)
- Type:
- progress_message
The progress message to print. Default is "{logical_name} row # {row_number}". Note the logical_name and row_number substitutions. (inherited from ETLComponent)
- Type:
- DEFAULT_PROGRESS_FREQUENCY = 10
Default for number of seconds between progress messages when reading from this component. See ETLComponent.progress_frequency to override.
- DEFAULT_PROGRESS_MESSAGE = '{logical_name} current row # {row_number:,}'
Default progress message when reading from this component. See ETLComponent.progress_message to override.
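As a quick illustration of how the default progress message template expands, the snippet below applies str.format() to the DEFAULT_PROGRESS_MESSAGE string. The component name and row number are made-up values.

```python
# Template copied from DEFAULT_PROGRESS_MESSAGE; the values are made up.
template = '{logical_name} current row # {row_number:,}'
message = template.format(logical_name='orders', row_number=1234567)
print(message)  # orders current row # 1,234,567
```

The `:,` format specifier adds thousands separators, which keeps large row counts readable in log output.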
- FULL_ITERATION_HEADER = 'full'
Constant value passed into ETLComponent.Row() to request all columns in the row. Deprecated: Please use ETLComponent.full_row_instance() to get a row with all columns.
- NK_LOOKUP = 'NK'
- PK_LOOKUP = 'PK'
- Row(data: MutableMapping | Iterator | None = None, iteration_header: RowIterationHeader | str | None = None) Row
Make a new empty row with this component's structure.
- __init__(task: ETLTask | None, database: DatabaseMetadata, table_name: str, table_name_case_sensitive: bool = True, schema: str = None, exclude_columns: set = None, include_only_columns: set = None, **kwargs)[source]
- build_row(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_safe', parent_stats: Statistics | None = None) Row
Use a source row to build a row with correct data types for this table.
- build_row_dynamic_source(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_dynamic_source', parent_stats: Statistics | None = None) Row
Use a source row to build a row with correct data types for this table. This version expects dynamically changing source rows, so it sanity checks all rows.
- cache_commit()
- property check_row_limit
- clear_cache()
Clear all lookup caches. Sets them to the un-cached state (an unknown state, as opposed to the empty state that init_cache gives).
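The difference between the un-cached state and the empty state can be pictured with a toy class. This is only a sketch: `TinyCache` and its attributes are hypothetical, and how bi_etl represents these states internally is not stated here.

```python
from typing import Optional


class TinyCache:
    """Hypothetical stand-in used only to illustrate the two states."""

    def __init__(self) -> None:
        # None means un-cached: we do not know what the table contains.
        self.rows: Optional[dict] = None

    def init_cache(self) -> None:
        # An empty dict means cached and known to hold no rows yet.
        self.rows = {}

    def clear_cache(self) -> None:
        # Back to the unknown state.
        self.rows = None

    @property
    def cache_is_known(self) -> bool:
        return self.rows is not None


cache = TinyCache()
cache.init_cache()
print(cache.cache_is_known)  # True
cache.clear_cache()
print(cache.cache_is_known)  # False
```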
- clear_statistics()
- property column_names_set: set
A set containing the column names for this component. Usable to quickly check if the component contains a certain column.
- property columns: List[Column]
A name-based collection of sqlalchemy.sql.expression.ColumnElement objects in this table/view.
- connection(connection_name: str | None = None, open_if_not_exist: bool = True, open_if_closed: bool = True) Connection [source]
- count(column: str = None, where=None) int [source]
Query the table/view to get the count of a given column.
- Parameters:
column (str or sqlalchemy.sql.expression.ColumnElement) – The column to get the count of
where (string or list of strings) – Each string value will be passed to sqlalchemy.sql.expression.Select.where() http://docs.sqlalchemy.org/en/rel_1_0/core/selectable.html?highlight=where#sqlalchemy.sql.expression.Select.where
- Returns:
count
- Return type:
- define_lookup(lookup_name: str, lookup_keys: list, lookup_class: Type[Lookup] = None, lookup_class_kwargs: dict | None = None)
Define a new lookup.
- Parameters:
lookup_name – Name for the lookup. Used to refer to it later.
lookup_keys – List of lookup key columns.
lookup_class – Optional Python class to use for the lookup. Defaults to value of default_lookup_class attribute.
lookup_class_kwargs – Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.
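Conceptually, a lookup maps the values of its key columns to a cached row. The sketch below shows that idea with a plain dict. `SimpleLookup`, its methods, and the column names are hypothetical, and bi_etl's own Lookup classes are not shown here.

```python
class SimpleLookup:
    """Hypothetical dict-backed lookup, for illustration only."""

    def __init__(self, lookup_name, lookup_keys):
        self.lookup_name = lookup_name
        self.lookup_keys = list(lookup_keys)
        self._cache = {}

    def _key(self, row):
        # Build a hashable key from the configured key columns.
        return tuple(row[col] for col in self.lookup_keys)

    def cache_row(self, row):
        self._cache[self._key(row)] = row

    def find(self, source_row):
        # Raises KeyError if the key combination was never cached.
        return self._cache[self._key(source_row)]


lookup = SimpleLookup("NK", ["region", "customer_code"])
lookup.cache_row({"customer_id": 7, "region": "EU", "customer_code": "A12"})
found = lookup.find({"region": "EU", "customer_code": "A12"})
print(found["customer_id"])  # 7
```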
- property delete_flag
- property empty_iteration_header: RowIterationHeader
- exclude_columns(columns_to_exclude: set)[source]
Exclude columns from the table. Removes them from all SQL statements.
- columns_to_exclude :
A list of columns to exclude when reading the table/view.
- execute(statement, *list_params, connection_name: str = None, **params) LegacyCursorResult [source]
- fill_cache(progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)
Fill all lookup caches from the table.
- Parameters:
progress_frequency – How often (in seconds) to output progress messages. Default 10. None for no progress messages.
progress_message – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note the logical_name and row_number substitutions applied via format().
criteria_list – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us
criteria_dict – Dict keys should be columns, values are set using = or in
column_list – List of columns to include
exclude_cols (frozenset) – Optional. Columns to exclude when filling the cache
order_by – List of columns to sort by when filling the cache (helps range caches)
assume_lookup_complete – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Defaults to False if filtering criteria were used, otherwise defaults to True.
allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False
row_limit – Limit on number of rows to cache.
parent_stats – Optional Statistics object to nest this step's statistics in. Default is to place statistics in the ETLTask level statistics.
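The default for assume_lookup_complete described above depends on whether any filtering criteria were supplied. A minimal sketch of that rule follows. The helper name is hypothetical, and the assumption that an explicitly passed value always takes precedence is mine rather than something stated in this reference.

```python
from typing import Optional


def resolve_assume_lookup_complete(
    assume_lookup_complete: Optional[bool],
    criteria_list: Optional[list] = None,
    criteria_dict: Optional[dict] = None,
) -> bool:
    """Hypothetical helper mirroring the documented default."""
    if assume_lookup_complete is not None:
        # Assumption: an explicit value always wins.
        return assume_lookup_complete
    # A filtered fill cannot be assumed to hold every row of the table.
    filtered = bool(criteria_list) or bool(criteria_dict)
    return not filtered


print(resolve_assume_lookup_complete(None))                                    # True
print(resolve_assume_lookup_complete(None, criteria_dict={"status": "open"}))  # False
```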
- fill_cache_from_source(source: ETLComponent, progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)[source]
Fill all lookup caches from the given source component.
- Parameters:
source – Source component to get rows from.
progress_frequency (int, optional) – How often (in seconds) to output progress messages. Default 10. None for no progress messages.
progress_message (str, optional) – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note the logical_name and row_number substitutions applied via format().
criteria_list (string or list of strings) – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us
criteria_dict (dict) – Dict keys should be columns, values are set using = or in
column_list – List of columns to include
exclude_cols – Optional. Columns to exclude when filling the cache
order_by (list) – List of columns to sort by when filling the cache (helps range caches)
assume_lookup_complete (boolean) – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Defaults to False if filtering criteria were used, otherwise defaults to True.
allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False
parent_stats (bi_etl.statistics.Statistics) – Optional Statistics object to nest this step's statistics in. Default is to place statistics in the ETLTask level statistics.
- property full_iteration_header: RowIterationHeader
- full_row_instance(data: MutableMapping | Iterator | None = None) Row
Build a full row (all columns) using the source data.
Note: If data is passed here, it uses bi_etl.components.row.row.Row.update() to map the data into the columns. That is nicely automatic, but slower since it has to try various ways to read the data container object. Consider using the appropriate one of the more specific update methods based on the source data container.
- generate_iteration_header(logical_name: str | None = None, columns_in_order: list | None = None, result_primary_key: list | None = None) RowIterationHeader
- get_by_key(source_row: Row, stats_id: str = 'get_by_key', parent_stats: Statistics = None) Row [source]
Get by the primary key.
- get_by_lookup(lookup_name: str, source_row: Row, stats_id: str = 'get_by_lookup', parent_stats: Statistics | None = None, fallback_to_db: bool = False) Row [source]
Get by an alternate key. Returns a Row.
- Throws:
NoResultFound
- get_column(column: str | Column) Column [source]
Get the sqlalchemy.sql.expression.ColumnElement object for a given column name.
- get_column_name(column)[source]
Get the column name given a possible sqlalchemy.sql.expression.ColumnElement object.
- get_column_special_value(column: Column, short_char: str, long_char: str, int_value: int, date_value: datetime, use_custom_special_values: bool = 'Y') object [source]
- get_default_lookup(row_iteration_header: RowIterationHeader) Lookup [source]
- get_invalid_row()[source]
Get a Row with the Invalid special values filled in for all columns.
Type        Value
Integer     -8888
Short Text  ‘!’
Long Text   ‘Invalid’
Date        9999-8-1
- get_missing_row()[source]
Get a Row with the Missing special values filled in for all columns.
Type        Value
Integer     -9999
Short Text  ‘?’
Long Text   ‘Missing’
Date        9999-9-1
- get_none_selected_row()[source]
Get a Row with the None Selected special values filled in for all columns.
Type        Value
Integer     -5555
Short Text  ‘#’
Long Text   ‘None Selected’
Date        9999-5-1
- get_not_applicable_row()[source]
Get a Row with the Not Applicable special values filled in for all columns.
Type        Value
Integer     -7777
Short Text  ‘~’
Long Text   ‘Not Available’
Date        9999-7-1
- get_one(statement=None)[source]
Executes and gets one row from the statement.
- Parameters:
statement – The SQL statement to execute
- Returns:
row – The row returned
- Return type:
Row
- Raises:
NoResultFound – No rows returned.
MultipleResultsFound – More than one row was returned.
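The exactly-one-row contract of get_one() can be sketched with the standard library's sqlite3 module. This is not bi_etl's implementation: the exception classes below are local stand-ins sharing the documented names, and the table and data are made up.

```python
import sqlite3


class NoResultFound(Exception):
    """Local stand-in for the exception named in the documentation."""


class MultipleResultsFound(Exception):
    """Local stand-in for the exception named in the documentation."""


def get_one(connection: sqlite3.Connection, statement: str, params=()):
    # Fetch at most two rows: enough to tell "none", "one" and "many" apart.
    rows = connection.execute(statement, params).fetchmany(2)
    if not rows:
        raise NoResultFound(statement)
    if len(rows) > 1:
        raise MultipleResultsFound(statement)
    return rows[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, "a"), (2, "b")])
print(get_one(conn, "SELECT id, name FROM t WHERE id = ?", (1,)))  # (1, 'a')
```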
- get_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
- get_unique_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
- get_various_row()[source]
Get a Row with the Various special values filled in for all columns.
Type        Value
Integer     -6666
Short Text  ‘*’
Long Text   ‘Various’
Date        9999-6-1
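The special values listed for get_invalid_row(), get_missing_row(), get_none_selected_row(), get_not_applicable_row() and get_various_row() can be gathered into one lookup structure for reference. The values below are copied from those tables. The dictionary layout and key names are hypothetical and are not part of bi_etl.

```python
from datetime import datetime

# Values copied from the get_*_row() tables; the structure itself is illustrative.
SPECIAL_VALUES = {
    "invalid": {"int": -8888, "short": "!", "long": "Invalid", "date": datetime(9999, 8, 1)},
    "missing": {"int": -9999, "short": "?", "long": "Missing", "date": datetime(9999, 9, 1)},
    "none_selected": {"int": -5555, "short": "#", "long": "None Selected", "date": datetime(9999, 5, 1)},
    "not_applicable": {"int": -7777, "short": "~", "long": "Not Available", "date": datetime(9999, 7, 1)},
    "various": {"int": -6666, "short": "*", "long": "Various", "date": datetime(9999, 6, 1)},
}

for kind, values in SPECIAL_VALUES.items():
    print(kind, values["int"], values["short"], values["long"], values["date"].date())
```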
- include_only_columns(columns_to_include: set)[source]
Include only specified columns in the table definition. Columns that are not included are removed from all SQL statements.
- columns_to_include : list
A list of columns to include when reading the table/view.
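The effect of include_only_columns() and exclude_columns() on the column list can be sketched as a simple filter. The helper and the column names below are hypothetical. The sketch only shows which columns would remain, not how bi_etl rewrites its SQL statements.

```python
def apply_column_filters(all_columns, include_only=None, exclude=None):
    """Hypothetical helper: return the columns that would remain, in order."""
    remaining = list(all_columns)
    if include_only is not None:
        # Keep only the listed columns; everything else is dropped.
        remaining = [col for col in remaining if col in include_only]
    if exclude:
        # Drop the listed columns; everything else is kept.
        remaining = [col for col in remaining if col not in exclude]
    return remaining


columns = ["id", "name", "email", "created_at"]
print(apply_column_filters(columns, include_only={"id", "name"}))  # ['id', 'name']
print(apply_column_filters(columns, exclude={"email"}))            # ['id', 'name', 'created_at']
```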
- init_cache()
Initialize all lookup caches as empty.
- property is_closed
- iter_result(result_list: object, columns_in_order: list | None = None, criteria_dict: dict | None = None, logical_name: str | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
- Yields:
row (Row) – next row
- static kwattrs_order() Dict[str, int]
Certain values need to be set before others in order to work correctly. This method should return a dict mapping those argument names to a value less than the default of 9999, which will be used for any argument not explicitly listed here.
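One way to read the ordering rule above: keyword arguments are sorted by their priority, with unlisted names falling back to 9999. The sketch below shows that sort in isolation. The function name and the example priorities are hypothetical, and how set_kwattrs() actually consumes the mapping is not shown in this reference.

```python
DEFAULT_ORDER = 9999  # default priority stated in the documentation


def order_kwargs(kwargs: dict, kwattrs_order: dict) -> list:
    """Return (name, value) pairs sorted so prioritized names come first."""
    # sorted() is stable, so names sharing a priority keep their original order.
    return sorted(
        kwargs.items(),
        key=lambda item: kwattrs_order.get(item[0], DEFAULT_ORDER),
    )


ordered = order_kwargs(
    {"max_rows": 100, "primary_key": ["id"], "trace_data": False},
    {"primary_key": 1},  # hypothetical priority, chosen for the example
)
print([name for name, _ in ordered])  # ['primary_key', 'max_rows', 'trace_data']
```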
- log_progress(row: Row, stats: Statistics)
- logging_level_reported = False
Has the logging level of this component been reported (logged) yet? Stored at class level so that it can be logged only once.
- property lookups
- max(column, where=None, connection_name: str = 'max')[source]
Query the table/view to get the maximum value of a given column.
- Parameters:
column (str or sqlalchemy.sql.expression.ColumnElement) – The column to get the max value of
where (string or list of strings) – Each string value will be passed to sqlalchemy.sql.expression.Select.where() http://docs.sqlalchemy.org/en/rel_1_0/core/selectable.html?highlight=where#sqlalchemy.sql.expression.Select.where
connection_name – Name of the pooled connection to use. Defaults to ‘max’.
- Returns:
max
- Return type:
depends on column datatype
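The query that max() describes, an aggregate over one column with optional where strings, can be approximated using sqlite3 from the standard library. This is a sketch under stated assumptions: `table_max` is a hypothetical function, the table and data are invented, and the multiple where strings are combined with AND, which is how successive Select.where() calls behave in SQLAlchemy.

```python
import sqlite3


def table_max(conn, table, column, where=None):
    """Illustrative only: identifiers and where strings are trusted input here."""
    sql = f"SELECT max({column}) FROM {table}"
    if where:
        # Accept a single string or a list of strings, as the parameter docs describe.
        clauses = [where] if isinstance(where, str) else list(where)
        sql += " WHERE " + " AND ".join(f"({clause})" for clause in clauses)
    return conn.execute(sql).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (region TEXT, amount INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [("EU", 10), ("US", 30), ("EU", 20)])
print(table_max(conn, "sales", "amount"))                                          # 30
print(table_max(conn, "sales", "amount", where="region = 'EU'"))                   # 20
print(table_max(conn, "sales", "amount", where=["region = 'EU'", "amount < 20"]))  # 10
```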
- order_by(order_by: list, stats_id: str = None, parent_stats: Statistics = None) Iterable[Row] [source]
Iterate over all rows in order provided.
- Parameters:
order_by (string or list of strings) – Each value should represent a column to order by.
stats_id (string) – Name of this step for the ETLTask statistics.
parent_stats (bi_etl.statistics.Statistics) – Optional Statistics object to nest this step's statistics in. Default is to place statistics in the ETLTask level statistics.
- Yields:
row (Row) – Row object with contents of a table/view row
- property primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=Empty list.
- property primary_key_tuple: tuple
The name of the primary key column(s) in a tuple. Used when a hashable PK definition is needed.
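The reason a tuple form is useful: Python lists cannot be used as dict keys or set members, while tuples can. A short illustration with made-up column names:

```python
primary_key = ["order_id", "line_number"]  # made-up column names
primary_key_tuple = tuple(primary_key)

# A tuple is hashable, so it can key a dict (or be stored in a set).
registry = {primary_key_tuple: "lookup keyed by PK definition"}
print(registry[("order_id", "line_number")])

try:
    {primary_key: "fails"}  # a list is unhashable
except TypeError as exc:
    print(f"list cannot be a key: {exc}")
```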
- property progress_frequency: int
How often (in seconds) to output progress messages. None for no progress messages.
- property qualified_table_name
The table name
- property quoted_qualified_table_name
The table name
- property row_name
- sanity_check_example_row(example_source_row, source_excludes=None, target_excludes=None, ignore_source_not_in_target=None, ignore_target_not_in_source=None)
- sanity_check_source_mapping(source_definition: ETLComponent, source_name: str = None, source_excludes: frozenset = None, target_excludes: frozenset = None, ignore_source_not_in_target: bool = None, ignore_target_not_in_source: bool = None, raise_on_source_not_in_target: bool = None, raise_on_target_not_in_source: bool = None)
- select(column_list: list | None = None, exclude_cols: frozenset | None = None) GenerativeSelect [source]
Builds a select statement for this table.
- Return type:
statement
- set_kwattrs(**kwargs)
- property statistics
- property table
- property table_name
The table name
- property trace_data: bool
Should a debug message be printed with the parsed contents (as columns) of each row? (boolean)
- uncache_row(row)
- uncache_where(key_names, key_values_dict)
- where(criteria_list: list = None, criteria_dict: dict = None, order_by: list = None, column_list: List[Column | str] = None, exclude_cols: FrozenSet[Column | str] = None, use_cache_as_source: bool = None, connection_name: str = 'select', progress_frequency: int = None, stats_id: str = None, parent_stats: Statistics = None) Iterable[Row] [source]
- Parameters:
criteria_list – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). http://docs.sqlalchemy.org/en/rel_1_0/core/selectable.html?highlight=where#sqlalchemy.sql.expression.Select.where
criteria_dict – Dict keys should be columns, values are set using = or in
order_by – List of sort keys
column_list – List of columns (str or Column)
exclude_cols –
use_cache_as_source –
connection_name – Name of the pooled connection to use
progress_frequency –
stats_id –
parent_stats –
- Return type:
rows
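The criteria_dict description ("values are set using = or in") can be illustrated with a small stand-alone translation into SQL text. This is a sketch and not bi_etl's implementation, which builds SQLAlchemy expressions. The helper name is hypothetical, and treating list-like values as IN and everything else as = is an assumption made for the example.

```python
import sqlite3


def criteria_dict_to_sql(criteria_dict):
    """Hypothetical helper: render a criteria dict as a WHERE fragment plus parameters."""
    clauses, params = [], []
    for column, value in criteria_dict.items():
        if isinstance(value, (list, tuple, set, frozenset)):
            # Assumption: collections of values become an IN (...) test.
            values = list(value)
            placeholders = ", ".join("?" for _ in values)
            clauses.append(f"{column} IN ({placeholders})")
            params.extend(values)
        else:
            # Single values become an equality test.
            clauses.append(f"{column} = ?")
            params.append(value)
    return " AND ".join(clauses), params


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, region TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "EU", "open"), (2, "US", "open"), (3, "EU", "closed"), (4, "APAC", "open")],
)
where_sql, params = criteria_dict_to_sql({"status": "open", "region": ["EU", "US"]})
rows = conn.execute(f"SELECT id FROM orders WHERE {where_sql} ORDER BY id", params).fetchall()
print(where_sql)  # status = ? AND region IN (?, ?)
print(rows)       # [(1,), (2,)]
```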