bi_etl.components.xlsx_reader module

Created on Apr 2, 2015

class bi_etl.components.xlsx_reader.XLSXReader(task: ETLTask | None, file_name: str | Path, logical_name: str | None = None, **kwargs)[source]

Bases: ETLComponent

XLSXReader will read rows from a Microsoft Excel XLSX formatted workbook.

Parameters:
  • task (ETLTask) – The instance to register in (if not None)

  • file_name (str) – The file_name to parse as xlsx.

  • logical_name (str) – The logical name of this source. Used for log messages.

column_names

The names to use for columns

Type:

list

header_row

The sheet row to read headers from. Default = 1.

Type:

int

start_row

The first row to parse for data. Default = header_row + 1

Type:

int
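The relationship between header_row and start_row can be illustrated without the library. The following is a minimal sketch, assuming 1-based sheet row numbers and plain lists standing in for worksheet rows; iter_sheet_rows and the sample sheet are hypothetical names used only for illustration, not part of bi_etl.

```python
from typing import Any, Dict, Iterator, Optional, Sequence


def iter_sheet_rows(
    sheet_rows: Sequence[Sequence[Any]],
    header_row: int = 1,
    start_row: Optional[int] = None,
) -> Iterator[Dict[str, Any]]:
    """Yield dicts keyed by header names; row numbers are 1-based like a sheet."""
    if start_row is None:
        # Documented default: data begins on the row after the header.
        start_row = header_row + 1
    column_names = [str(value) for value in sheet_rows[header_row - 1]]
    for values in sheet_rows[start_row - 1:]:
        yield dict(zip(column_names, values))


# Hypothetical sheet: a title line, headers on row 2, then two data rows.
sheet = [
    ["Quarterly export", None],
    ["id", "name"],
    [1, "alpha"],
    [2, "beta"],
]
rows = list(iter_sheet_rows(sheet, header_row=2))
print(rows)
```

Setting start_row explicitly (for example start_row=4 here) would skip the first data row while still taking the column names from header_row.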

workbook

The workbook that was opened.

Type:

openpyxl.workbook.workbook.Workbook

log_first_row

Whether to log progress on the first row read. Only applies if used as a source. (inherited from ETLComponent)

Type:

boolean

max_rows

The maximum number of rows to read. Only applies if Table is used as a source. (inherited from ETLComponent)

Type:

int, optional

primary_key

The name of the primary key column(s). Only impacts trace messages. Default=None. (inherited from ETLComponent)

Type:

list

progress_frequency

How often (in seconds) to output progress messages. None for no progress messages. (inherited from ETLComponent)

Type:

int

progress_message

The progress message to print. Default is "{logical_name} row # {row_number}". Note the logical_name and row_number substitutions. (inherited from ETLComponent)

Type:

str

restkey

Column name to catch extra long rows (more columns than we have column names) when reading values (extra values).

Type:

str

restval

The value to put in columns that are in the column_names but not present in a given row when reading (missing values).

Type:

str
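The restkey and restval descriptions match the behaviour of the same-named arguments of the standard library's csv.DictReader. The sketch below shows that behaviour in isolation; it assumes XLSXReader follows the same convention (extra values collected in a list under restkey, missing columns filled with restval). map_values_to_columns is a hypothetical helper, not a bi_etl function.

```python
from typing import Any, Dict, Optional, Sequence


def map_values_to_columns(
    column_names: Sequence[str],
    values: Sequence[Any],
    restkey: Optional[str] = None,
    restval: Any = None,
) -> Dict[Optional[str], Any]:
    """Pair values with column names, handling rows that are too long or too short."""
    row: Dict[Optional[str], Any] = dict(zip(column_names, values))
    # Extra values (more values than column names) go under restkey as a list.
    extra = list(values[len(column_names):])
    if extra:
        row[restkey] = extra
    # Missing values (fewer values than column names) are filled with restval.
    for name in column_names[len(values):]:
        row[name] = restval
    return row


long_row = map_values_to_columns(["id", "name"], [1, "alpha", "x", "y"], restkey="_extra")
short_row = map_values_to_columns(["id", "name"], [2], restval="")
print(long_row)
print(short_row)
```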

DEFAULT_PROGRESS_FREQUENCY = 10

Default for number of seconds between progress messages when reading from this component. See ETLComponent.progress_frequency to override.

DEFAULT_PROGRESS_MESSAGE = '{logical_name} current row # {row_number:,}'

Default progress message when reading from this component. See ETLComponent.progress_message to override.
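As an illustration of the placeholders in the default message, the snippet below applies str.format() to the template shown above. It assumes the component fills the template with format(); the logical name and row number used here are made-up sample values.

```python
# Template copied from DEFAULT_PROGRESS_MESSAGE above.
DEFAULT_PROGRESS_MESSAGE = '{logical_name} current row # {row_number:,}'

# Sample values (hypothetical); the ":," format spec adds thousands separators.
message = DEFAULT_PROGRESS_MESSAGE.format(logical_name='sales.xlsx', row_number=1234567)
print(message)  # sales.xlsx current row # 1,234,567
```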

FULL_ITERATION_HEADER = 'full'

Constant value passed into ETLComponent.Row() to request all columns in the row. Deprecated: Please use ETLComponent.full_row_instance() to get a row with all columns.

Row(data: MutableMapping | Iterator | None = None, iteration_header: RowIterationHeader | str | None = None) Row

Make a new empty row with this component's structure.

__init__(task: ETLTask | None, file_name: str | Path, logical_name: str | None = None, **kwargs)[source]
property active_worksheet
property active_worksheet_name: str
build_row(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_safe', parent_stats: Statistics | None = None) Row

Use a source row to build a row with correct data types for this table.

Parameters:
  • source_row

  • source_excludes

  • target_excludes

  • stat_name – Name of this step for the ETLTask statistics. Default = 'build_row_safe'

  • parent_stats

Return type:

Row

build_row_dynamic_source(source_row: Row, source_excludes: frozenset | None = None, target_excludes: frozenset | None = None, stat_name: str = 'build_row_dynamic_source', parent_stats: Statistics | None = None) Row

Use a source row to build a row with correct data types for this table. This version expects dynamically changing source rows, so it sanity checks all rows.

Parameters:
  • source_row

  • source_excludes

  • target_excludes

  • stat_name – Name of this step for the ETLTask statistics. Default = 'build_row_dynamic_source'

  • parent_stats

Return type:

Row

cache_commit()
cache_row(row: Row, allow_update: bool = False, allow_insert: bool = True)
property check_row_limit
clear_cache()

Clear all lookup caches. Sets them to an un-cached state (an unknown state, as opposed to the empty state that init_cache gives).

clear_statistics()
close(error: bool = False)[source]
property column_names: List[str]

The list of column names for this component.

property column_names_set: set

A set containing the column names for this component. Usable to quickly check if the component contains a certain column.

debug_log(state: bool = True)
define_lookup(lookup_name: str, lookup_keys: list, lookup_class: Type[Lookup] = None, lookup_class_kwargs: dict | None = None)

Define a new lookup.

Parameters:
  • lookup_name – Name for the lookup. Used to refer to it later.

  • lookup_keys – list of lookup key columns

  • lookup_class – Optional python class to use for the lookup. Defaults to value of default_lookup_class attribute.

  • lookup_class_kwargs – Optional dict of additional parameters to pass to lookup constructor. Defaults to empty dict.

property empty_iteration_header: RowIterationHeader
fill_cache(progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)

Fill all lookup caches from the table.

Parameters:
  • progress_frequency – How often (in seconds) to output progress messages. Default 10. None for no progress messages.

  • progress_message – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note logical_name and row_number substitutions applied via format().

  • criteria_list – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us

  • criteria_dict – Dict keys should be columns, values are set using = or in

  • column_list – List of columns to include

  • exclude_cols (frozenset) – Optional. Columns to exclude when filling the cache

  • order_by – list of columns to sort by when filling the cache (helps range caches)

  • assume_lookup_complete – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Defaults to False if filtering criteria were used, otherwise defaults to True.

  • allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False

  • row_limit – limit on number of rows to cache.

  • parent_stats – Optional Statistics object to nest this step's statistics in. Default is to place statistics in the ETLTask level statistics.
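The effect of assume_lookup_complete on a cache miss can be sketched with a plain dictionary cache. This is only an illustration of the rule described above, under the assumption that a miss either raises or is handed to some fallback; SketchLookup, its methods, and the fallback callable are hypothetical and do not reflect the bi_etl Lookup classes.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Tuple

Row = Dict[str, Any]


class SketchLookup:
    """Hypothetical in-memory lookup keyed by a tuple of column values."""

    def __init__(self, key_columns: Sequence[str], assume_lookup_complete: bool):
        self.key_columns = tuple(key_columns)
        self.assume_lookup_complete = assume_lookup_complete
        self._cache: Dict[Tuple[Any, ...], Row] = {}

    def key_for(self, row: Row) -> Tuple[Any, ...]:
        return tuple(row[column] for column in self.key_columns)

    def cache_row(self, row: Row) -> None:
        self._cache[self.key_for(row)] = row

    def find(self, row: Row, fallback: Optional[Callable[[Tuple[Any, ...]], Row]] = None) -> Row:
        key = self.key_for(row)
        if key in self._cache:
            return self._cache[key]
        # A complete cache means a miss is a genuine "not found".
        if self.assume_lookup_complete or fallback is None:
            raise LookupError(f"No row cached for key {key!r}")
        # Otherwise defer to the fallback (stand-in for querying the backing data).
        return fallback(key)


complete = SketchLookup(["code"], assume_lookup_complete=True)
complete.cache_row({"code": "A", "label": "Alpha"})

partial = SketchLookup(["code"], assume_lookup_complete=False)
found = partial.find({"code": "Z"}, fallback=lambda key: {"code": key[0], "label": "from fallback"})
print(found)
```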

fill_cache_from_source(source: ETLComponent, progress_frequency: float = 10, progress_message='{component} fill_cache current row # {row_number:,}', criteria_list: list = None, criteria_dict: dict = None, column_list: list = None, exclude_cols: frozenset = None, order_by: list = None, assume_lookup_complete: bool = None, allow_duplicates_in_src: bool = False, row_limit: int = None, parent_stats: Statistics = None)

Fill all lookup caches from the database table. Note that filtering criteria can be specified so that the resulting cache is not the entire current contents. See assume_lookup_complete for how the lookup will handle cache misses – note only database table backed components have the ability to fall back to querying the existing data on cache misses.

Parameters:
  • source – Source component to get rows from.

  • progress_frequency – How often (in seconds) to output progress messages. Default 10. None for no progress messages.

  • progress_message – The progress message to print. Default is "{component} fill_cache current row # {row_number:,}". Note logical_name and row_number substitutions applied via format().

  • criteria_list – Each string value will be passed to sqlalchemy.sql.expression.Select.where(). https://goo.gl/JlY9us

  • criteria_dict – Dict keys should be columns, values are set using = or in

  • column_list – List of columns to include

  • exclude_cols – Optional. Columns to exclude when filling the cache

  • order_by – list of columns to sort by when filling the cache (helps range caches)

  • assume_lookup_complete – Should later lookup calls assume the cache is complete? If so, lookups will raise an Exception if a key combination is not found. Defaults to False if filtering criteria were used, otherwise defaults to True.

  • allow_duplicates_in_src – Should we quietly let the source provide multiple rows with the same key values? Default = False

  • row_limit – limit on number of rows to cache.

  • parent_stats – Optional Statistics object to nest this step's statistics in. Default is to place statistics in the ETLTask level statistics.
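The criteria_dict description ("values are set using = or in") can be read as: a scalar value must equal the column value, while a collection value means the column value must be one of its members. The sketch below applies that reading to plain dicts, together with a row_limit. It is an assumption about the intended semantics, and row_matches and filter_rows are hypothetical helpers rather than bi_etl functions.

```python
from typing import Any, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def row_matches(row: Row, criteria_dict: Dict[str, Any]) -> bool:
    """True if every criterion holds: '=' for scalars, 'in' for collections."""
    for column, wanted in criteria_dict.items():
        value = row.get(column)
        if isinstance(wanted, (list, tuple, set, frozenset)):
            if value not in wanted:
                return False
        elif value != wanted:
            return False
    return True


def filter_rows(rows: Iterable[Row], criteria_dict: Dict[str, Any], row_limit: Optional[int] = None) -> List[Row]:
    """Collect matching rows, stopping once row_limit rows have been kept."""
    kept: List[Row] = []
    for row in rows:
        if row_limit is not None and len(kept) >= row_limit:
            break
        if row_matches(row, criteria_dict):
            kept.append(row)
    return kept


source_rows = [
    {"region": "EU", "status": "open"},
    {"region": "US", "status": "open"},
    {"region": "APAC", "status": "closed"},
    {"region": "EU", "status": "closed"},
]
selected = filter_rows(source_rows, {"region": ["EU", "US"], "status": "open"})
print(selected)
```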

property full_iteration_header: RowIterationHeader
full_row_instance(data: MutableMapping | Iterator | None = None) Row

Build a full row (all columns) using the source data.

Note: If data is passed here, it uses bi_etl.components.row.row.Row.update() to map the data into the columns. That is nicely automatic, but slower since it has to try various ways to read the data container object.

Consider using the appropriate one of the more specific update methods based on the source data container.

generate_iteration_header(logical_name: str | None = None, columns_in_order: list | None = None, result_primary_key: list | None = None) RowIterationHeader
get_by_lookup(lookup_name: str, source_row: Row, stats_id: str = 'get_by_lookup', parent_stats: Statistics | None = None, fallback_to_db: bool = False) Row

Get a row by an alternate key. Returns a Row.

Throws:

NoResultFound

get_column_name(column: str)
get_lookup(lookup_name: str) Lookup
get_lookup_keys(lookup_name: str) list
get_lookup_tuple(lookup_name: str, row: Row) tuple
get_qualified_lookup_name(base_lookup_name: str) str
get_sheet_by_name(name)[source]

Returns a worksheet by its name.

Parameters:

name (str) – The name of the worksheet to look for

Returns:

Worksheet object, or None if no worksheet has the name specified.

Return type:

openpyxl.worksheet.worksheet.Worksheet
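Because get_sheet_by_name returns None for an unknown name, it can be used to probe for a sheet before switching to it. The sketch below relies only on the method names documented on this page (get_sheet_names, get_sheet_by_name, set_active_worksheet_by_name). StubReader is a hypothetical stand-in so the example runs without a workbook; with the real class, an XLSXReader instance would be passed instead.

```python
from typing import Any, Dict, Iterable, List, Optional


class StubReader:
    """Hypothetical stand-in exposing the documented sheet-related method names."""

    def __init__(self, sheets: Dict[str, Any]):
        self._sheets = dict(sheets)
        self.active_worksheet_name = next(iter(self._sheets))

    def get_sheet_names(self) -> List[str]:
        return list(self._sheets)

    def get_sheet_by_name(self, name: str) -> Optional[Any]:
        # Mirrors the documented contract: None when no sheet has this name.
        return self._sheets.get(name)

    def set_active_worksheet_by_name(self, sheet_name: str) -> None:
        self.active_worksheet_name = sheet_name


def activate_first_available(reader: Any, candidates: Iterable[str]) -> Optional[str]:
    """Switch to the first candidate sheet that exists; return its name or None."""
    for name in candidates:
        if reader.get_sheet_by_name(name) is not None:
            reader.set_active_worksheet_by_name(name)
            return name
    return None


stub = StubReader({"Summary": object(), "Data": object()})
chosen = activate_first_available(stub, ["Raw", "Data"])
print(chosen, stub.get_sheet_names())
```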

get_sheet_names()[source]
get_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
get_unique_stats_entry(stats_id: str, parent_stats: Statistics | None = None, print_start_stop_times: bool | None = None)
has_workbook_init() bool[source]
property header_row: int

The sheet row to read headers from. Default = 1.

init_cache()

Initialize all lookup caches as empty.

property is_closed
iter_result(result_list: object, columns_in_order: list | None = None, criteria_dict: dict | None = None, logical_name: str | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
Yields:

row (Row) – next row

static kwattrs_order() Dict[str, int]

Certain values need to be set before others in order to work correctly. This method should return a dict mapping each such argument name to a value less than the default of 9999, which is used for any argument not explicitly listed here.
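One way such an ordering dict could be applied is to sort keyword arguments by their order value before setting them, using 9999 for anything unlisted. The sketch below is an assumption about how the mapping might be consumed. The order values shown are hypothetical and are not the actual values returned by XLSXReader.kwattrs_order().

```python
from typing import Any, Dict, List, Tuple

DEFAULT_ORDER = 9999  # documented default for args not listed in the mapping


def ordered_kwargs(kwargs: Dict[str, Any], order: Dict[str, int]) -> List[Tuple[str, Any]]:
    """Return (name, value) pairs sorted so that lower order values come first."""
    # sorted() is stable, so unlisted args keep their original relative order.
    return sorted(kwargs.items(), key=lambda item: order.get(item[0], DEFAULT_ORDER))


# Hypothetical ordering: header_row must be applied before start_row.
example_order = {"header_row": 1, "start_row": 2}
pairs = ordered_kwargs({"max_rows": 100, "start_row": 5, "header_row": 3}, example_order)
print(pairs)
```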

property line_num

The current line number in the source file. line_num differs from rows_read in that rows_read counts only the rows that would be returned to the caller.
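The difference between the two counters can be sketched with a small iterator. It assumes line_num advances for every sheet row, including skipped header rows, while rows_read advances only for rows handed back. CountingReader is a hypothetical illustration, not the XLSXReader implementation.

```python
from typing import Any, Iterator, List, Sequence


class CountingReader:
    """Hypothetical reader tracking both the sheet position and the rows returned."""

    def __init__(self, sheet_rows: Sequence[Sequence[Any]], header_row: int = 1):
        self._rows = sheet_rows
        self.header_row = header_row
        self.line_num = 0   # position in the sheet (1-based once reading starts)
        self.rows_read = 0  # rows actually yielded to the caller

    def __iter__(self) -> Iterator[Sequence[Any]]:
        for line_num, values in enumerate(self._rows, start=1):
            self.line_num = line_num
            if line_num <= self.header_row:
                continue  # title and header lines are consumed but not returned
            self.rows_read += 1
            yield values


reader = CountingReader(
    [["Quarterly export"], ["id", "name"], [1, "alpha"], [2, "beta"]],
    header_row=2,
)
data: List[Sequence[Any]] = list(reader)
print(reader.line_num, reader.rows_read)
```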

log_progress(row: Row, stats: Statistics)
logging_level_reported = False

Has the logging level of this component been reported (logged) yet? Stored at class level so that it can be logged only once.

property lookups
property primary_key: list

The name of the primary key column(s). Only impacts trace messages. Default=Empty list.

property primary_key_tuple: tuple

The name of the primary key column(s) in a tuple. Used when a hashable PK definition is needed.
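A short sketch of why a tuple form is useful: a list cannot be hashed, so it cannot serve as a dict key or set member, whereas the equivalent tuple can. The column names below are made-up sample values, and is_hashable is a hypothetical helper.

```python
from typing import Any


def is_hashable(value: Any) -> bool:
    """True if value can be used as a dict key or set member."""
    try:
        hash(value)
    except TypeError:
        return False
    return True


primary_key = ["order_id", "line_no"]   # list form, as in primary_key
primary_key_tuple = tuple(primary_key)  # tuple form, as in primary_key_tuple

# The tuple can key a dict; the list cannot.
definitions = {primary_key_tuple: "order line"}
print(is_hashable(primary_key), is_hashable(primary_key_tuple))
```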

property progress_frequency: int

How often (in seconds) to output progress messages. None for no progress messages.

read_header_row()[source]
property row_name: str
property rows_read: int

The number of rows read and returned.

sanity_check_example_row(example_source_row, source_excludes=None, target_excludes=None, ignore_source_not_in_target=None, ignore_target_not_in_source=None)
sanity_check_source_mapping(source_definition: ETLComponent, source_name: str = None, source_excludes: frozenset = None, target_excludes: frozenset = None, ignore_source_not_in_target: bool = None, ignore_target_not_in_source: bool = None, raise_on_source_not_in_target: bool = None, raise_on_target_not_in_source: bool = None)
set_active_worksheet_by_name(sheet_name: str)[source]
set_active_worksheet_by_number(sheet_number: int)[source]

Change to an existing worksheet based on the sheet number.

set_kwattrs(**kwargs)
property start_row: int

The sheet row to start reading data from. Default = header_row + 1

property statistics
property trace_data: bool

Whether a debug message should be printed with the parsed contents (as columns) of each row.

uncache_row(row)
uncache_where(key_names, key_values_dict)
where(criteria_list: list | None = None, criteria_dict: dict | None = None, order_by: list | None = None, column_list: List[Column | str] = None, exclude_cols: FrozenSet[Column | str] = None, use_cache_as_source: bool | None = None, progress_frequency: int | None = None, stats_id: str | None = None, parent_stats: Statistics | None = None) Iterable[Row]
Return type:

rows

property workbook