bi_etl.utility.run_sql_script module
Created on Sept 12 2016
@author: Derek Wood
- class bi_etl.utility.run_sql_script.RunSQLScript(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]
- Bases: ETLTask
 - CLASS_VERSION = 1.0
 - DAGSTER_compute_kind = 'python'
 - static ExitStack()
- Convenience method to build an ExitStack 
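- A hedged sketch of how it might be used inside a task's load method, assuming the returned object is a standard contextlib.ExitStack:

```python
def load(self):
    # Assumes ExitStack() returns a standard contextlib.ExitStack;
    # every context manager registered here is closed when the
    # with-block exits, even on error.
    with self.ExitStack() as stack:
        source_file = stack.enter_context(open('data.csv', encoding='utf-8'))
        for line in source_file:
            ...
```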
 - SQLDep(sql_path: str, script_path: str = None, database: DatabaseMetadata = None) RunSQLScript
 - __init__(config: BI_ETL_Config_Base, database_entry: str | DatabaseMetadata, script_name: str | Path, script_path: str | Path = None, sql_replacements: Dict[str, str] = None, task_id=None, parent_task_id=None, root_task_id=None, scheduler=None, task_rec=None)[source]
- Constructor. It should do as little as possible.
- Parameters:
- parent_task_id – The task_id of the parent of this job
- root_task_id – The task_id of the root ancestor of this job
- config (bi_etl.config.bi_etl_config_base.BI_ETL_Config_Base) – The configuration to use (See config.ini).
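- A minimal usage sketch for the constructor and run method above (the config object, database entry, and script location are illustrative assumptions, not values shipped with bi_etl):

```python
from pathlib import Path

from bi_etl.utility.run_sql_script import RunSQLScript

# config is assumed to be a BI_ETL_Config_Base instance loaded elsewhere;
# 'warehouse_db' and the script paths are placeholders for this sketch.
script_task = RunSQLScript(
    config=config,
    database_entry='warehouse_db',
    script_name='rebuild_summary.sql',
    script_path=Path('sql_scripts'),
    sql_replacements={'{{schema}}': 'dw'},
)
script_task.run()
```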
 
 
 - add_database(database_object)
 - add_warning(warning_message)
 - close(error: bool = False)
- Cleanup the task. Close any registered objects, close any database connections. 
 - classmethod dagster_asset_definition(scope_set: Set[AssetKey] | None = None, before_all_assets: Iterable[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] | None = None, *, debug: bool = False, **kwargs) AssetsDefinition
- Build a dagster asset for this ETLTask class. Note: The load method can capture and return results to dagster using the dagster_results member of the instance. Those will be passed to jobs for assets that depend on this asset.
- Return type:
- AssetsDefinition built using the dagster_* class method results.
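- A hedged wiring sketch (assumes a recent Dagster release with Definitions and a hypothetical ETLTask subclass named MyLoadTask):

```python
from dagster import Definitions

from my_project.tasks import MyLoadTask  # hypothetical ETLTask subclass

# dagster_asset_definition() assembles an AssetsDefinition from the
# dagster_* class method results documented on this page.
defs = Definitions(
    assets=[MyLoadTask.dagster_asset_definition()],
)
```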
 
 - classmethod dagster_asset_key(**kwargs) AssetKey
 - classmethod dagster_auto_materialize_policy(*, debug: bool = False, **kwargs) AutoMaterializePolicy | None
- auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize this asset according to its FreshnessPolicy and when upstream dependencies change.
 
 - classmethod dagster_code_version(**kwargs) str | None
- code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In general, versions should be set only for code that deterministically produces the same output when given the same inputs.
 
 - classmethod dagster_get_config(dagster_config: Config = None, *, debug: bool = False, **kwargs) BI_ETL_Config_Base
- Build a config (subclass of BI_ETL_Config) for use by dagster runs. 
 - classmethod dagster_group_name(**kwargs) str | None
- group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided, these defaults are tried in order:
- The asset key prefix is used.
- The name “default” is used.
 
 - classmethod dagster_input_etl_tasks(**kwargs) List[module | Type[ETLTask]] | None
- List of ETLTask subclasses that are inputs to this task. Note: This needs to return the class objects and not instances of that class. 
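- For example, a subclass might declare its upstream tasks like this (StageCustomers and StageOrders are hypothetical ETLTask subclasses from your own project):

```python
from bi_etl.utility.run_sql_script import RunSQLScript

from my_project.staging import StageCustomers, StageOrders  # hypothetical

class LoadCustomerFacts(RunSQLScript):
    @classmethod
    def dagster_input_etl_tasks(cls, **kwargs):
        # Return the class objects themselves, not instances.
        return [StageCustomers, StageOrders]
```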
 - classmethod dagster_inputs_asset_id(**kwargs) Dict[str, AssetIn]
- Starting dictionary of dagster asset inputs. Normally the inputs will come from dagster_input_etl_tasks. 
 - classmethod dagster_op_tags(**kwargs) Mapping[str, Any] | None
- op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
 
 - classmethod dagster_partitions_def(*, debug: bool = False, **kwargs) PartitionsDefinition | None
- partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset.
 
 - classmethod dagster_retry_policy(**kwargs) RetryPolicy | None
- retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. 
 - classmethod dagster_schedules(*, debug: bool = False, **kwargs) Sequence[ScheduleDefinition] | None
- Return one or more schedules linked to this task. They don’t have to run only this task. 
 - classmethod dagster_sensors(*, debug: bool = False, **kwargs) Sequence[SensorDefinition] | None
- Return a list of one or more sensors for this task.
 - debug_sql(mode: bool | int = True)
- Control the output of the SQLAlchemy engine.
- Parameters:
- mode – Boolean (DEBUG if True, ERROR if False) or an int logging level.
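- For example (task is any instance of this class; both call forms follow the signature above):

```python
import logging

task.debug_sql()              # SQLAlchemy engine output at DEBUG
task.debug_sql(False)         # back to errors only
task.debug_sql(logging.INFO)  # or pass an explicit int logging level
```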
 
 - depends_on() Iterable[ETLTask][source]
- Override to provide a static list of tasks that this task will wait on if they are running. Each dependent task entry should consist of either:
- The module name (str)
- A tuple of the module name (str) and class name (str)
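- A sketch of an override using the two entry formats above (module and class names are hypothetical):

```python
def depends_on(self):
    return [
        'etl_jobs.stage_customers',                # module name (str)
        ('etl_jobs.stage_orders', 'StageOrders'),  # (module name, class name)
    ]
```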
 - property environment_name
 - finish()
- Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init.
 - get_database(database_name: str) DatabaseMetadata
 - get_database_metadata(db_config: SQLAlchemyDatabase) DatabaseMetadata
 - get_database_name()
- Returns the database name (entry in config) to use for calls to get_database where no name is provided.
 
 - get_notifier(channel_config: ConfigHierarchy) NotifierBase
 - get_notifiers(channel_list: List[DynamicallyReferenced], auto_include_log=True) List[NotifierBase]
 - get_parameter(param_name, default=Ellipsis)
- Returns the value of the parameter with the name provided, or default if the parameter is not set.
 - get_sql_script_runner(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata | None = None) RunSQLScript
 - get_task_singleton()
 - init()
- Preload initialization. Runs on the execution server. Override to add setup tasks.
- Note: The init method is useful in cases where you wish to define a common base class with a single load method. Each inheriting class can then do its own setup in init. With init, the flow of execution can be (see the sketch below):
- spec_class.init (if any code comes before the super call)
- base_class.init
- spec_class.init (after the super call, where your code should really go)
- base_class.load
- Note 2: Sometimes the functionality above can be achieved with __init__. However, when the scheduler thread creates an ETLTask, it creates an instance and thus runs __init__. Therefore, you will want __init__ to be as simple as possible. On the other hand, init is run only by the task execution thread, so it can safely do more time-consuming work. Again, this method is for when class inheritance is used and that logic cannot go into the load method.
- Why does the scheduler create an instance? It does that in case a task needs a full instance, and possibly parameter values, in order to answer some of the methods like depends_on or mutually_exclusive_execution.
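- A sketch of that inheritance pattern (class and database entry names are hypothetical, and the ETLTask import is elided since its module path is not shown on this page):

```python
class BaseDimensionLoad(ETLTask):  # ETLTask import elided; see note above
    def init(self):
        # Shared, potentially slow setup runs here on the execution
        # server rather than in __init__.
        self.target_db = self.get_database('warehouse_db')

    def load(self):
        # Single shared load body that uses whatever init prepared.
        ...


class LoadCustomerDim(BaseDimensionLoad):
    def init(self):
        self.source_name = 'customers'        # spec_class.init before the super call
        super().init()                        # base_class.init
        self.log.info('Loading %s', self.source_name)  # code after the super call
```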
 - internal_tasks() Iterable[ETLTask]
- Override to provide a static list of tasks that this task will run internally. Can be used by the job orchestrator to build a complete dependency tree. 
 - load()[source]
- Placeholder for load. This is where the main body of the ETLTask’s work should be performed. 
 - load_parameters()
- Load parameters for this task from the scheduler. 
 - property log
- Get a logger using the task name. 
 - log_logging_level()
 - make_statistics_entry(stats_id) Statistics
 - notify(channel_list: List[DynamicallyReferenced], subject, message=None, sensitive_message: str = None, attachment=None, skip_channels: set = None)
 - notify_status(channel_list: List[DynamicallyReferenced], status_message: str, skip_channels: set = None)
- Send a temporary status message that gets overwritten by the next status message that is sent.
 - parameter_names()
- Returns a list of parameter names 
 - parameters()
- Returns a generator yielding tuples of parameter (name,value) 
 - register_object(obj: ETLComponent | Statistics)
- Register an ETLComponent or Statistics object with the task. This allows the task to:
- Get statistics from the component
- Close the component when done
 - run(suppress_notifications=None, handle_exceptions=True, **kwargs)
- Should not generally be overridden. This is called to run the task’s code in the init, load, and finish methods. 
 - run_sql_script(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata)
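- For example, another task's load method might call it directly (script and database entry names are illustrative):

```python
def load(self):
    # Runs the named script against the given database entry using the
    # run_sql_script helper documented above.
    self.run_sql_script(
        script_name='truncate_staging.sql',
        script_path='sql_scripts',
        database_entry='warehouse_db',
    )
```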
 - set_parameters(**kwargs)
- Add multiple parameters to this task. Parameters can be passed in as any combination of:
- dict instance. Example: set_parameters({'param1': 'example', 'param2': 100})
- list of lists. Example: set_parameters([['param1', 'example'], ['param2', 100]])
- list of tuples. Example: set_parameters([('param1', 'example'), ('param2', 100)])
- keyword arguments. Example: set_parameters(foo=1, bar='apple')
- Parameters:
- kwargs – keyword arguments sent to parameters. See above.
 
 - shutdown()
 - property statistics
- Return the execution statistics from the task and all of its registered components.
 - property target_database: DatabaseMetadata