bi_etl.utility.run_sql_script module
Created on Sept 12 2016
@author: Derek Wood
- class bi_etl.utility.run_sql_script.RunSQLScript(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]
Bases: ETLTask
- CLASS_VERSION = 1.0
- DAGSTER_compute_kind = 'python'
- static ExitStack()
Convenience method to build an ExitStack
- SQLDep(sql_path: str, script_path: str = None, database: DatabaseMetadata = None) RunSQLScript
- __init__(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]
Constructor. It should do as little as possible.
- Parameters:
parent_task_id – The task_id of the parent of this job
root_task_id – The task_id of the root ancestor of this job
config (bi_etl.config.bi_etl_config_base.BI_ETL_Config_Base) – The configuration to use (see config.ini).
- add_database(database_object)
- add_warning(warning_message)
- close(error: bool = False)
Cleanup the task. Close any registered objects, close any database connections.
- classmethod dagster_asset_definition(scope_set: Set[AssetKey] | None = None, before_all_assets: Iterable[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] | None = None, *, debug: bool = False, **kwargs) AssetsDefinition
Build a dagster asset for this ETLTask class. Note: The load method can capture and return results to dagster using the
dagster_results member of the instance. Those will be passed to jobs for assets that depend on this asset.
- Parameters:
- Return type:
AssetsDefinition built using the dagster_* class method results.
- classmethod dagster_asset_key(**kwargs) AssetKey
- classmethod dagster_auto_materialize_policy(*, debug: bool = False, **kwargs) AutoMaterializePolicy | None
- auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize
this asset according to its FreshnessPolicy and when upstream dependencies change.
- classmethod dagster_code_version(**kwargs) str | None
- code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In
general, versions should be set only for code that deterministically produces the same output when given the same inputs.
- classmethod dagster_get_config(dagster_config: Config = None, *, debug: bool = False, **kwargs) BI_ETL_Config_Base
Build a config (subclass of BI_ETL_Config) for use by dagster runs.
- classmethod dagster_group_name(**kwargs) str | None
group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided try these defaults in this order:
The asset key prefix will be used
The name “default” is used.
- classmethod dagster_input_etl_tasks(**kwargs) List[module | Type[ETLTask]] | None
List of ETLTask subclasses that are inputs to this task. Note: This needs to return the class objects and not instances of that class.
- classmethod dagster_inputs_asset_id(**kwargs) Dict[str, AssetIn]
Starting dictionary of dagster asset inputs. Normally the inputs will come from dagster_input_etl_tasks.
- classmethod dagster_op_tags(**kwargs) Mapping[str, Any] | None
- op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.
Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
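The round-trip criterion above can be checked before a tag value is handed to dagster. The following is a minimal sketch using only the standard library; the helper name is_json_round_trippable and the sample tag dictionaries are illustrative assumptions, not part of bi_etl or dagster.

```python
import json


def is_json_round_trippable(value):
    """Return True if json.loads(json.dumps(value)) == value.

    Hypothetical helper for illustration; not part of bi_etl or dagster.
    """
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # Raised when the value cannot be serialized to JSON at all
        return False


# Lists, dicts with string keys, numbers and strings survive unchanged
ok_tag = {"retries": 3, "queues": ["etl", "sql"]}

# A tuple is written as a JSON array and read back as a list
tuple_tag = {"queues": ("etl", "sql")}

# Non-string keys are written as strings and do not compare equal afterwards
int_key_tag = {1: "a"}

for candidate in (ok_tag, tuple_tag, int_key_tag):
    print(candidate, is_json_round_trippable(candidate))
```

Values that fail the check (tuples, sets, non-string dict keys) are usually easiest to fix by converting them to lists or strings before returning them from dagster_op_tags.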
- classmethod dagster_partitions_def(*, debug: bool = False, **kwargs) PartitionsDefinition | None
- partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that
compose the asset.
- classmethod dagster_retry_policy(**kwargs) RetryPolicy | None
retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.
- classmethod dagster_schedules(*, debug: bool = False, **kwargs) Sequence[ScheduleDefinition] | None
Return one or more schedules linked to this task. They don’t have to run only this task.
- classmethod dagster_sensors(*, debug: bool = False, **kwargs) Sequence[SensorDefinition] | None
Return a list of one or more sensors for this task.
- debug_sql(mode: bool | int = True)
Control the output of sqlalchemy engine
- Parameters:
mode – Boolean (DEBUG if True, ERROR if False) or an int logging level.
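One way to read the mode parameter is as a mapping onto a standard logging level for the "sqlalchemy.engine" logger, which is the logger SQLAlchemy uses for engine output. The sketch below assumes that reading; set_sql_logging is a hypothetical stand-in and may not match what debug_sql does internally.

```python
import logging


def set_sql_logging(mode=True):
    """Map a bool or int mode to a level on the SQLAlchemy engine logger.

    Hypothetical helper for illustration; not the bi_etl implementation.
    """
    # bool is a subclass of int, so it has to be tested first
    if isinstance(mode, bool):
        level = logging.DEBUG if mode else logging.ERROR
    else:
        level = int(mode)
    logging.getLogger("sqlalchemy.engine").setLevel(level)
    return level


set_sql_logging(True)           # verbose engine output
set_sql_logging(False)          # errors only
set_sql_logging(logging.INFO)   # explicit int level
```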
- depends_on() Iterable[ETLTask] [source]
Override to provide a static list of tasks that this task will wait on if they are running.
Each dependent task entry should consist of either 1) the module name (str) or 2) a tuple of the module name (str) and class name (str).
- property environment_name
- finish()
Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init.
- get_database(database_name: str) DatabaseMetadata
- get_database_metadata(db_config: SQLAlchemyDatabase) DatabaseMetadata
- get_database_name()
Returns the database name (entry in config) to use for calls to get_database where no name is provided.
- Returns:
- get_notifier(channel_config: ConfigHierarchy) NotifierBase
- get_notifiers(channel_list: List[DynamicallyReferenced], auto_include_log=True) List[NotifierBase]
- get_parameter(param_name, default=Ellipsis)
Returns the value of the parameter with the name provided, or default if that is not None.
- get_sql_script_runner(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata | None = None) RunSQLScript
- get_task_singleton()
- init()
Pre-load initialization. Runs on the execution server. Override to add setup tasks.
Note: The init method is useful in cases where you wish to define a common base class with a single load method. Each inheriting class can then do its own setup in init. With init you can have the flow of execution be:
spec_class.init (if any code before super call)
base_class.init
spec_class.init (after super call, where your code should really go)
base_class.load
Note 2: Sometimes the functionality above can be achieved with __init__. However, when the scheduler thread creates an ETLTask, it creates an instance and thus runs __init__. Therefore, you will want __init__ to be as simple as possible. On the other hand, init is run only by the task execution thread, so it can safely do more time-consuming work. Again, though, this method is for when class inheritance is used and that logic cannot go into the load method.
Why does the scheduler create an instance? It does that in case a task needs a full instance and possibly parameter values in order to answer some of the methods like depends_on or mutually_exclusive_execution.
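The flow of execution described above can be sketched with plain Python classes. SketchTask below is a hypothetical stand-in that only models the documented call order (run calls init, load, then finish); it is not ETLTask, and the class names BaseLoad and SpecLoad are made up for illustration.

```python
class SketchTask:
    """Hypothetical stand-in for ETLTask; models only the call order."""

    def __init__(self):
        # Kept trivial on purpose: a scheduler may construct instances
        # without ever running them.
        self.calls = []

    def init(self):
        pass

    def load(self):
        pass

    def finish(self):
        pass

    def run(self):
        # Documented order: init, then load, then finish
        self.init()
        self.load()
        self.finish()


class BaseLoad(SketchTask):
    """Common base class with the single shared load method."""

    def init(self):
        self.calls.append("base_class.init")

    def load(self):
        self.calls.append("base_class.load")


class SpecLoad(BaseLoad):
    """Specialized class that only customizes init."""

    def init(self):
        self.calls.append("spec_class.init (before super call)")
        super().init()
        self.calls.append("spec_class.init (after super call)")


task = SpecLoad()   # construction does no real work
task.run()
for step in task.calls:
    print(step)
```

Because all setup lives in init, creating a SpecLoad instance stays cheap, which matters if the scheduler instantiates tasks only to query them.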
- internal_tasks() Iterable[ETLTask]
Override to provide a static list of tasks that this task will run internally. Can be used by the job orchestrator to build a complete dependency tree.
- load()[source]
Placeholder for load. This is where the main body of the ETLTask’s work should be performed.
- load_parameters()
Load parameters for this task from the scheduler.
- property log
Get a logger using the task name.
- log_logging_level()
- make_statistics_entry(stats_id) Statistics
- notify(channel_list: List[DynamicallyReferenced], subject, message=None, sensitive_message: str = None, attachment=None, skip_channels: set = None)
- notify_status(channel_list: List[DynamicallyReferenced], status_message: str, skip_channels: set = None)
Send a temporary status message that gets overwritten by the next status message that is sent.
- parameter_names()
Returns a list of parameter names
- parameters()
Returns a generator yielding tuples of parameter (name,value)
- register_object(obj: ETLComponent | Statistics)
Register an ETLComponent or Statistics object with the task. This allows the task to 1) get statistics from the component and 2) close the component when done.
- run(suppress_notifications=None, handle_exceptions=True, **kwargs)
Should not generally be overridden. This is called to run the task’s code in the init, load, and finish methods.
- run_sql_script(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata)
- set_parameters(**kwargs)
Add multiple parameters to this task. Parameters can be passed in as any combination of:
* dict instance. Example: set_parameters( {'param1':'example', 'param2':100} )
* list of lists. Example: set_parameters( [ ['param1','example'], ['param2',100] ] )
* list of tuples. Example: set_parameters( [ ('param1','example'), ('param2',100) ] )
* keyword arguments. Example: set_parameters(foo=1, bar='apple')
- Parameters:
kwargs – keyword arguments sent to parameters. See above.
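The accepted input forms listed above can all be reduced to one name-to-value mapping. The sketch below shows one way that normalization could work; normalize_parameters is a hypothetical function, and the positional *args handling is an assumption based on the examples rather than on the documented signature.

```python
def normalize_parameters(*args, **kwargs):
    """Merge dicts, lists of pairs, and keyword arguments into one dict.

    Hypothetical helper for illustration; not the bi_etl implementation.
    """
    params = {}
    for arg in args:
        # dict() accepts a mapping or an iterable of (name, value) pairs,
        # which covers dicts, lists of lists, and lists of tuples
        params.update(dict(arg))
    # Keyword arguments are applied last and override earlier values
    params.update(kwargs)
    return params


from_dict = normalize_parameters({'param1': 'example', 'param2': 100})
from_lists = normalize_parameters([['param1', 'example'], ['param2', 100]])
from_tuples = normalize_parameters([('param1', 'example'), ('param2', 100)])
from_kwargs = normalize_parameters(foo=1, bar='apple')
mixed = normalize_parameters({'param1': 'example'}, [('param2', 100)], foo=1)
```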
- shutdown()
- property statistics
Return the execution statistics from the task and all of its registered components.
- property target_database: DatabaseMetadata