bi_etl.utility.run_sql_script module

Created on Sept 12 2016

@author: Derek Wood

class bi_etl.utility.run_sql_script.RunSQLScript(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]

Bases: ETLTask

DAGSTER_compute_kind = 'python'
static ExitStack()

Convenience method to build an ExitStack

PythonDep(etl_class_str: str) ETLTask
SQLDep(sql_path: str, script_path: str = None, database: DatabaseMetadata = None) RunSQLScript
__init__(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]

Constructor. It should do as little as possible.

auto_close(ctx_mgr: Any) Any
close(error: bool = False)

Cleanup the task. Close any registered objects, close any database connections.

classmethod dagster_asset_definition(scope_set: Set[AssetKey] | None = None, before_all_assets: Iterable[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] | None = None, *, debug: bool = False, **kwargs) AssetsDefinition

Build a dagster asset for this ETLTask class. Note: The load method can capture and return results to dagster using the

dagster_results member of the instance. Those will be passed to jobs for assets that depend on this asset.

  • scope_set – A set of other assets that are in the current scope. If this is provided, it will be used to filter the dependencies to assets in the set.

  • debug – True to print debug messages.

  • kwargs – Placeholder for other arguments

Return type:

AssetsDefinition built using the dagster_* class method results.

classmethod dagster_asset_key(**kwargs) AssetKey
classmethod dagster_auto_materialize_policy(*, debug: bool = False, **kwargs) AutoMaterializePolicy | None
auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize

this asset according to its FreshnessPolicy and when upstream dependencies change.

classmethod dagster_code_version(**kwargs) str | None
code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In

general, versions should be set only for code that deterministically produces the same output when given the same inputs.

property dagster_context: AssetExecutionContext | None
classmethod dagster_description(**kwargs) str | None
classmethod dagster_freshness_policy(*, debug: bool = False, **kwargs) FreshnessPolicy | None
classmethod dagster_get_config(dagster_config: Config = None, *, debug: bool = False, **kwargs) BI_ETL_Config_Base

Build a config (subclass of BI_ETL_Config) for use by dagster runs.

classmethod dagster_group_name(**kwargs) str | None

group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided try these defaults in this order:

  • The asset key prefix will be used

  • The name “default” is used.

classmethod dagster_input_etl_tasks(**kwargs) List[module | Type[ETLTask]] | None

List of ETLTask subclasses that are inputs to this task. Note: This needs to return the class objects and not instances of that class.

classmethod dagster_inputs_asset_id(**kwargs) Dict[str, AssetIn]

Starting dictionary of dagster asset inputs. Normally the inputs will come from dagster_input_etl_tasks.

classmethod dagster_job_name(**kwargs) str
classmethod dagster_key_prefix(**kwargs) Sequence[str]
classmethod dagster_op_tags(**kwargs) Mapping[str, Any] | None
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.

Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.

classmethod dagster_partitions_def(*, debug: bool = False, **kwargs) PartitionsDefinition | None
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that

compose the asset.

classmethod dagster_retry_policy(**kwargs) RetryPolicy | None

retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.

classmethod dagster_schedules(*, debug: bool = False, **kwargs) Sequence[ScheduleDefinition] | None

Return one or more schedules linked to this task. They don’t have to run only this task.

classmethod dagster_sensors(*, debug: bool = False, **kwargs) Sequence[SensorDefinition] | None

Return a list of one more sensors for this task

debug_sql(mode: bool | int = True)

Control the output of sqlalchemy engine


mode – Boolean (debug if True, Error if false) or int logging level.

dependency_full_set(parents: tuple = None) FrozenSet[ETLTask]
depends_on() Iterable[ETLTask][source]

Override to provide a static list of tasks that this task will wait on if they are running.

Each dependent task entry should consist of either 1) The module name (str) 2) A tuple of the module name (str) and class name (str)

property environment_name

Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init.

get_database(database_name: str) DatabaseMetadata
get_database_metadata(db_config: SQLAlchemyDatabase) DatabaseMetadata

Returns the database name (entry in config) to use for calls to get_database where no name is provided.


static get_etl_task_instance(input: module | Type[ETLTask]) Type[ETLTask]
static get_etl_task_list(input_list: List[module | Type[ETLTask]] | None) List[Type[ETLTask]]
get_notifier(channel_config: ConfigHierarchy) NotifierBase
get_notifiers(channel_list: List[DynamicallyReferenced], auto_include_log=True) List[NotifierBase]
get_parameter(param_name, default=Ellipsis)

Returns the value of the parameter with the name provided, or default if that is not None.

  • param_name (str) – The parameter to retrieve

  • default (any) – The default value. Default default = None


ParameterError: – If named parameter does not exist and no default is provided.

get_sql_script_runner(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata | None = None) RunSQLScript

preload initialization. Runs on the execution server. Override to add setup tasks.

Note: init method is useful in cases were you wish to define a common base class with a single load method. Each inheriting class can then do its own stuff in init With init you can have the flow of execution be:

  1. spec_class.init (if any code before super call)

  2. base_class.init

  3. spec_class.init (after super call, where your code should really go)

  4. base_class.load

Note 2: Sometimes the functionality above can be achieved with __init__. However, when the scheduler thread creates an ETLTask, it creates an instance and thus runs __init__. Therefore, you will want __init__ to be as simple as possible. On the other hand, init is run only by the task execution thread. So it can safely do more time-consuming work. Again though this method is for when class inheritance is used, and that logic can not go into the load method.

Why does the scheduler create an instance? It does that in case a task needs a full instance and possibly parameter values in order to answer some of the methods like depends_on or mutually_exclusive_execution.

internal_tasks() Iterable[ETLTask]

Override to provide a static list of tasks that this task will run internally. Can be used by the job orchestrator to build a complete dependency tree.

static is_etl_task(item: Any)

Placeholder for load. This is where the main body of the ETLTask’s work should be performed.


Load parameters for this task from the scheduler.

load_parameters_from_dict(parameters: dict)
property log

Get a logger using the task name.

make_statistics_entry(stats_id) Statistics
property name: str

Return value needs to be compatible with find_etl_class



notify(channel_list: List[DynamicallyReferenced], subject, message=None, sensitive_message: str = None, attachment=None, skip_channels: set = None)
notify_status(channel_list: List[DynamicallyReferenced], status_message: str, skip_channels: set = None)

Send a temporary status messages that gets overwritten with the next status message that is sent.

  • channel_list

  • status_message

  • skip_channels


Returns a list of parameter names


Returns a generator yielding tuples of parameter (name,value)

register_object(obj: ETLComponent | Statistics)

Register an ETLComponent or Statistics object with the task. This allows the task to 1) Get statistics from the component 2) Close the component when done

run(suppress_notifications=None, handle_exceptions=True, **kwargs)

Should not generally be overridden. This is called to run the task’s code in the init, load, and finish methods.

run_sql_script(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata)
property script_full_name: Path
set_parameter(param_name: str, param_value: object)

Add a single parameter to this task.

  • param_name (str) – The name of the parameter to add

  • param_value (object) – The value of the parameter


Add multiple parameters to this task. Parameters can be passed in as any combination of: * dict instance. Example set_parameters( {'param1':'example', 'param2':100} ) * list of lists. Example: set_parameters( [ ['param1','example'], ['param2',100] ] ) * list of tuples. Example: set_parameters( [ ('param1','example'), ('param2',100) ] ) * keyword arguments. Example: set_parameters(foo=1, bar='apple')


kwargs – keyword arguments send to parameters. See above.

property statistics

Return the execution statistics from the task and all of it’s registered components.

property target_database: DatabaseMetadata