bi_etl.components.sqlquery module
Created on Sep 17, 2014
@author: Derek Wood
- class bi_etl.components.sqlquery.SQLQuery(task: ETLTask | None, database: DatabaseMetadata, sql: str, logical_name: str | None = None, **kwargs)[source]
Bases:
ETLComponent
A class for reading an arbitrary SQL statement. Consider using sqlalchemy.sql.text to wrap the SQL. http://docs.sqlalchemy.org/en/latest/core/tutorial.html#using-text
- DEFAULT_PROGRESS_FREQUENCY = 10
Default for number of seconds between progress messages when reading from this component. See
ETLComponent.progress_frequency`
to override.
- DEFAULT_PROGRESS_MESSAGE = '{logical_name} current row # {row_number:,}'
Default progress message when reading from this component. See
ETLComponent.progress_message`
to override.
- FULL_ITERATION_HEADER = 'full'
Constant value passed into
ETLComponent.Row()
to request all columns in the row. Deprecated: Please useETLComponent.full_row_instance()
to get a row with all columns.
- class ParamType(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
IntEnum
Row status values
- __init__(*args, **kwds)
- as_integer_ratio()
Return a pair of integers, whose ratio is equal to the original int.
The ratio is in lowest terms and has a positive denominator.
>>> (10).as_integer_ratio() (10, 1) >>> (-10).as_integer_ratio() (-10, 1) >>> (0).as_integer_ratio() (0, 1)
- bind = 1
- bit_count()
Number of ones in the binary representation of the absolute value of self.
Also known as the population count.
>>> bin(13) '0b1101' >>> (13).bit_count() 3
- bit_length()
Number of bits necessary to represent self in binary.
>>> bin(37) '0b100101' >>> (37).bit_length() 6
- conjugate()
Returns self, the complex conjugate of any int.
- denominator
the denominator of a rational number in lowest terms
- format = 2
- from_bytes(byteorder='big', *, signed=False)
Return the integer represented by the given array of bytes.
- bytes
Holds the array of bytes to convert. The argument must either support the buffer protocol or be an iterable object producing bytes. Bytes and bytearray are examples of built-in objects that support the buffer protocol.
- byteorder
The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder’ as the byte order value. Default is to use ‘big’.
- signed
Indicates whether two’s complement is used to represent the integer.
- imag
the imaginary part of a complex number
- is_integer()
Returns True. Exists for duck type compatibility with float.is_integer.
- numerator
the numerator of a rational number in lowest terms
- real
the real part of a complex number
- to_bytes(length=1, byteorder='big', *, signed=False)
Return an array of bytes representing an integer.
- length
Length of bytes object to use. An OverflowError is raised if the integer is not representable with the given number of bytes. Default is length 1.
- byteorder
The byte order used to represent the integer. If byteorder is ‘big’, the most significant byte is at the beginning of the byte array. If byteorder is ‘little’, the most significant byte is at the end of the byte array. To request the native byte order of the host system, use `sys.byteorder’ as the byte order value. Default is to use ‘big’.
- signed
Determines whether two’s complement is used to represent the integer. If signed is False and a negative integer is given, an OverflowError is raised.
- Row(data: MutableMapping | Iterator | None = None, iteration_header: RowIterationHeader | str | None = None) Row
Make a new empty row with this components structure.
- __init__(task: ETLTask | None, database: DatabaseMetadata, sql: str, logical_name: str | None = None, **kwargs)[source]
- build_row(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_safe', parent_stats: Statistics | None = None) Row
Use a source row to build a row with correct data types for this table.
- build_row_dynamic_source(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_dynamic_source', parent_stats: Statistics | None = None) Row
Use a source row to build a row with correct data types for this table. This version expects dynamically changing source rows, so it sanity checks all rows.
- cache_commit()
- property check_row_limit
- clear_cache()
Clear all lookup caches. Sets to un-cached state (unknown state v.s. empty state which is what init_cache gives)
- clear_statistics()
- property column_names_set: set
A set containing the column names for this component. Usable to quickly check if the component contains a certain column.
- define_lookup(lookup_name: str, lookup_keys: list, lookup_class: Type[Lookup] = None, lookup_class_kwargs: dict | None = None)
Define a new lookup.
- Parameters:
lookup_name¶ – Name for the lookup. Used to refer to it later.
lookup_keys¶ – list of lookup key columns
lookup_class¶ – Optional python class to use for the lookup. Defaults to value of default_lookup_class attribute.
lookup_class_kwargs¶ – Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.
- property empty_iteration_header: RowIterationHeader
- fill_cache(progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)
Fill all lookup caches from the table.
- Parameters:
progress_frequency¶ – How often (in seconds) to output progress messages. Default 10. None for no progress messages.
progress_message¶ – The progress message to print. Default is
"{component} fill_cache current row # {row_number:,}"
. Notelogical_name
androw_number
substitutions applied viaformat()
.criteria_list¶ – Each string value will be passed to
sqlalchemy.sql.expression.Select.where()
. https://goo.gl/JlY9uscriteria_dict¶ – Dict keys should be columns, values are set using = or in
column_list¶ – List of columns to include
exclude_cols¶ (frozenset) – Optional. Columns to exclude when filling the cache
order_by¶ – list of columns to sort by when filling the cache (helps range caches)
assume_lookup_complete¶ – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Default to False if filtering criteria was used, otherwise defaults to True.
allow_duplicates_in_src¶ – Should we quietly let the source provide multiple rows with the same key values? Default = False
row_limit¶ – limit on number of rows to cache.
row_limit¶ – limit on number of rows to cache.
parent_stats¶ – Optional Statistics object to nest this steps statistics in. Default is to place statistics in the ETLTask level statistics.
- fill_cache_from_source(source: ETLComponent, progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)
Fill all lookup caches from the database table. Note that filtering criteria can be specified so that the resulting cache is not the entire current contents. See
assume_lookup_complete
for how the lookup will handle cache misses – note only database table backed components have the ability to fall back to querying the existing data on cache misses.- Parameters:
source¶ – Source component to get rows from.
progress_frequency¶ – How often (in seconds) to output progress messages. Default 10. None for no progress messages.
progress_message¶ – The progress message to print. Default is
"{component} fill_cache current row # {row_number:,}"
. Notelogical_name
androw_number
substitutions applied viaformat()
.criteria_list¶ – Each string value will be passed to
sqlalchemy.sql.expression.Select.where()
. https://goo.gl/JlY9uscriteria_dict¶ – Dict keys should be columns, values are set using = or in
column_list¶ – List of columns to include
exclude_cols¶ – Optional. Columns to exclude when filling the cache
order_by¶ – list of columns to sort by when filling the cache (helps range caches)
assume_lookup_complete¶ – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Default to False if filtering criteria was used, otherwise defaults to True.
allow_duplicates_in_src¶ – Should we quietly let the source provide multiple rows with the same key values? Default = False
row_limit¶ – limit on number of rows to cache.
parent_stats¶ – Optional Statistics object to nest this steps statistics in. Default is to place statistics in the ETLTask level statistics.
- property full_iteration_header: RowIterationHeader
- full_row_instance(data: MutableMapping | Iterator | None = None) Row
Build a full row (all columns) using the source data.
Note: If data is passed here, it uses
bi_etl.components.row.row.Row.update()
to map the data into the columns. That is nicely automatic, but slower since it has to try various ways to read the data container object.Consider using the appropriate one of the more specific update methods based on the source data container.
- generate_iteration_header(logical_name: str | None = None, columns_in_order: list | None = None, result_primary_key: list | None = None) RowIterationHeader
- get_by_lookup(lookup_name: str, source_row: Row, stats_id: str = 'get_by_lookup', parent_stats: Statistics | None = None, fallback_to_db: bool = False) Row
Get by an alternate key. Returns a
Row
- Throws:
NoResultFound
- get_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
- get_unique_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
- init_cache()
Initialize all lookup caches as empty.
- property is_closed
- iter_result(result_list: object, columns_in_order: list | None = None, criteria_dict: dict | None = None, logical_name: str | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
- Yields:
row (
Row
) – next row
- static kwattrs_order() Dict[str, int]
Certain values need to be set before others in order to work correctly. This method should return a dict mapping those key values = arg name to a value less than the default of 9999, which will be used for any arg not explicitly listed here.
- log_progress(row: Row, stats: Statistics)
- logging_level_reported = False
Has the logging level of this component been reported (logged) yet? Stored at class level so that it can be logged only once.
- property lookups
- property primary_key: list
The name of the primary key column(s). Only impacts trace messages. Default=Empty list.
- property primary_key_tuple: tuple
The name of the primary key column(s) in a tuple. Used when a hashable PK definition is needed.
- property progress_frequency: int
How often (in seconds) to output progress messages. None for no progress messages.
- sanity_check_example_row(example_source_row, source_excludes=None, target_excludes=None, ignore_source_not_in_target=None, ignore_target_not_in_source=None)
- sanity_check_source_mapping(source_definition: ETLComponent, source_name: str = None, source_excludes: frozenset = None, target_excludes: frozenset = None, ignore_source_not_in_target: bool = None, ignore_target_not_in_source: bool = None, raise_on_source_not_in_target: bool = None, raise_on_target_not_in_source: bool = None)
- set_kwattrs(**kwargs)
- property statistics
- property trace_data: bool
boolean Should a debug message be printed with the parsed contents (as columns) of each row.
- uncache_row(row)
- uncache_where(key_names, key_values_dict)
- where(criteria_list: list | None = None, criteria_dict: dict | None = None, order_by: list | None = None, column_list: List[Column | str] = None, exclude_cols: FrozenSet[Column | str] = None, use_cache_as_source: bool | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
- Parameters:
criteria_list¶ – Each string value will be passed to
sqlalchemy.sql.expression.Select.where()
. https://docs.sqlalchemy.org/en/14/core/selectable.html?highlight=where#sqlalchemy.sql.expression.Select.wherecriteria_dict¶ – Dict keys should be columns, values are set using = or in
order_by¶ – List of sort keys
column_list¶ – List of columns (str or Column)
exclude_cols¶ –
use_cache_as_source¶ –
progress_frequency¶ –
stats_id¶ –
parent_stats¶ –
- Return type:
rows