bi_etl.informatica.pmcmd_task module

Created on May 5, 2015

@author: Derek Wood

class bi_etl.informatica.pmcmd_task.PMCMD_Task(config: BI_ETL_Config_Base, parent_task_id=None, root_task_id=None, **kwargs)

Bases: ETLTask

Runs Informatica Workflows

DAGSTER_compute_kind = 'python'
static ExitStack()

Convenience method to build an ExitStack
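As a rough illustration of why an ExitStack is handy in a task, the sketch below uses the standard library's contextlib.ExitStack directly. It assumes the static method above returns an ordinary contextlib.ExitStack; the helper name read_all is hypothetical and not part of bi_etl.

```python
from contextlib import ExitStack
import io


def read_all(texts):
    """Open a variable number of context managers and close them all together."""
    with ExitStack() as stack:
        # enter_context registers each buffer so it is closed when the stack exits
        buffers = [stack.enter_context(io.StringIO(text)) for text in texts]
        contents = [buffer.read() for buffer in buffers]
    # After the with-block every registered buffer has been closed
    return contents, [buffer.closed for buffer in buffers]


contents, closed_flags = read_all(["first", "second"])
```

The same pattern applies to any object that supports the context manager protocol, which is what makes it useful when the number of resources is not known in advance.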

PythonDep(etl_class_str: str) ETLTask
SQLDep(sql_path: str, script_path: str = None, database: DatabaseMetadata = None) RunSQLScript
__init__(config: BI_ETL_Config_Base, parent_task_id=None, root_task_id=None, **kwargs)

Constructor. It should do as little as possible.

auto_close(ctx_mgr: Any) Any
close(error: bool = False)

Cleanup the task. Close any registered objects, close any database connections.

classmethod dagster_asset_definition(scope_set: Set[AssetKey] | None = None, before_all_assets: Iterable[AssetsDefinition | SourceAsset | CacheableAssetsDefinition] | None = None, *, debug: bool = False, **kwargs) AssetsDefinition

Build a dagster asset for this ETLTask class. Note: The load method can capture and return results to dagster using the dagster_results member of the instance. Those will be passed to jobs for assets that depend on this asset.

  • scope_set – A set of other assets that are in the current scope. If this is provided, it will be used to filter the dependencies to assets in the set.

  • debug – True to print debug messages.

  • kwargs – Placeholder for other arguments

Return type:

AssetsDefinition built using the dagster_* class method results.

classmethod dagster_asset_key(**kwargs) AssetKey
classmethod dagster_auto_materialize_policy(*, debug: bool = False, **kwargs) AutoMaterializePolicy | None
auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize this asset according to its FreshnessPolicy and when upstream dependencies change.

classmethod dagster_code_version(**kwargs) str | None
code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In general, versions should be set only for code that deterministically produces the same output when given the same inputs.

property dagster_context: AssetExecutionContext | None
classmethod dagster_description(**kwargs) str | None
classmethod dagster_freshness_policy(*, debug: bool = False, **kwargs) FreshnessPolicy | None
classmethod dagster_get_config(dagster_config: Config = None, *, debug: bool = False, **kwargs) BI_ETL_Config_Base

Build a config (subclass of BI_ETL_Config) for use by dagster runs.

classmethod dagster_group_name(**kwargs) str | None

group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided try these defaults in this order:

  • The asset key prefix will be used

  • The name “default” is used.

classmethod dagster_input_etl_tasks(**kwargs) List[module | Type[ETLTask]] | None

List of ETLTask subclasses that are inputs to this task. Note: This needs to return the class objects and not instances of that class.

classmethod dagster_inputs_asset_id(**kwargs) Dict[str, AssetIn]

Starting dictionary of dagster asset inputs. Normally the inputs will come from dagster_input_etl_tasks.

classmethod dagster_job_name(**kwargs) str
classmethod dagster_key_prefix(**kwargs) Sequence[str]
classmethod dagster_op_tags(**kwargs) Mapping[str, Any] | None
op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset.

Frameworks may expect and require certain metadata to be attached to an op. Values that are not strings will be json encoded and must meet the criteria that json.loads(json.dumps(value)) == value.
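The round-trip criterion above can be checked before tags are handed to dagster. The following is a minimal sketch using only the standard json module; the helper names and the example tag names are hypothetical and are not part of bi_etl or dagster.

```python
import json
from typing import Any, Mapping


def is_json_round_trippable(value: Any) -> bool:
    """Return True if value is unchanged by json.dumps followed by json.loads."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set
        return False


def invalid_op_tags(tags: Mapping[str, Any]) -> list:
    """List the tag names whose non-string values fail the round-trip check."""
    return [
        name
        for name, value in tags.items()
        if not isinstance(value, str) and not is_json_round_trippable(value)
    ]


# Hypothetical tag names, for illustration only.
example_tags = {
    "team": "data-eng",      # strings are accepted as-is
    "limits": {"cpu": 2},    # a dict with string keys round-trips
    "ports": (8080, 8081),   # a tuple comes back as a list, so it fails
    "by_id": {1: "a"},       # int keys come back as strings, so it fails
}
bad_tags = invalid_op_tags(example_tags)
```

Tuples and non-string dictionary keys are the usual surprises: both serialize without error but do not compare equal after decoding.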

classmethod dagster_partitions_def(*, debug: bool = False, **kwargs) PartitionsDefinition | None
partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset.

classmethod dagster_retry_policy(**kwargs) RetryPolicy | None

retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset.

classmethod dagster_schedules(*, debug: bool = False, **kwargs) Sequence[ScheduleDefinition] | None

Return one or more schedules linked to this task. They don’t have to run only this task.

classmethod dagster_sensors(*, debug: bool = False, **kwargs) Sequence[SensorDefinition] | None

Return a list of one or more sensors for this task.

debug_sql(mode: bool | int = True)

Control the logging output of the SQLAlchemy engine.


mode – Boolean (debug if True, Error if false) or int logging level.
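One way to read the mode argument is as a mapping onto standard logging levels. The sketch below is an assumption about that mapping rather than the library's code: the function names are hypothetical, and "sqlalchemy.engine" is the logger name SQLAlchemy uses for engine output, which may or may not be the logger bi_etl adjusts.

```python
import logging
from typing import Union


def resolve_sql_log_level(mode: Union[bool, int] = True) -> int:
    """Translate a bool or int mode into a logging level."""
    # bool must be tested before int because bool is a subclass of int
    if isinstance(mode, bool):
        return logging.DEBUG if mode else logging.ERROR
    return int(mode)


def apply_sql_log_level(mode: Union[bool, int] = True,
                        logger_name: str = "sqlalchemy.engine") -> int:
    """Set the named logger to the resolved level and return that level."""
    level = resolve_sql_log_level(mode)
    logging.getLogger(logger_name).setLevel(level)
    return level
```

Passing an explicit level such as logging.INFO gives finer control than the two boolean settings.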

dependency_full_set(parents: tuple = None) FrozenSet[ETLTask]
depends_on() Iterable[ETLTask]

Override to provide a static list of tasks that this task will wait on if they are running.

Each dependent task entry should consist of either:

  • The module name (str)

  • A tuple of the module name (str) and class name (str)
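The two entry shapes described above can be handled uniformly by a small helper. This is only a sketch that follows the prose description; split_dependency and the example module and class names are hypothetical, and how bi_etl itself resolves these entries is not shown here.

```python
from typing import Optional, Tuple, Union

# An entry is either a module name or a (module name, class name) tuple
DependencyEntry = Union[str, Tuple[str, str]]


def split_dependency(entry: DependencyEntry) -> Tuple[str, Optional[str]]:
    """Return (module_name, class_name), with class_name None if not given."""
    if isinstance(entry, str):
        return entry, None
    module_name, class_name = entry
    return module_name, class_name


# Hypothetical entries, for illustration only.
entries = [
    "etl_jobs.load_customers",
    ("etl_jobs.load_orders", "LoadOrders"),
]
resolved = [split_dependency(entry) for entry in entries]
```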

property environment_name

Placeholder for post-load cleanup. This might be useful for cleaning up what was done in init.

get_database(database_name: str) DatabaseMetadata
get_database_metadata(db_config: SQLAlchemyDatabase) DatabaseMetadata

Returns the database name (entry in config) to use for calls to get_database where no name is provided.


static get_etl_task_instance(input: module | Type[ETLTask]) Type[ETLTask]
static get_etl_task_list(input_list: List[module | Type[ETLTask]] | None) List[Type[ETLTask]]
get_notifier(channel_config: ConfigHierarchy) NotifierBase
get_notifiers(channel_list: List[DynamicallyReferenced], auto_include_log=True) List[NotifierBase]
get_parameter(param_name, default=Ellipsis)

Returns the value of the parameter with the given name, or default if the parameter does not exist and a default was provided.

  • param_name (str) – The parameter to retrieve

  • default (any) – The default value to return if the parameter does not exist.


ParameterError – If the named parameter does not exist and no default is provided.
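The signature's default=Ellipsis suggests a sentinel that distinguishes "no default supplied" from an explicit default of None. The sketch below shows that pattern under that assumption; ParameterStore and its internals are hypothetical stand-ins, and ParameterError is redefined locally so the example is self-contained.

```python
class ParameterError(Exception):
    """Local stand-in for the ParameterError named above."""


class ParameterStore:
    """Hypothetical minimal parameter holder, not the bi_etl implementation."""

    def __init__(self):
        self._parameters = {}

    def set_parameter(self, param_name, param_value):
        self._parameters[param_name] = param_value

    def get_parameter(self, param_name, default=...):
        try:
            return self._parameters[param_name]
        except KeyError:
            # Ellipsis means the caller supplied no default at all
            if default is ...:
                raise ParameterError(
                    f"Parameter {param_name!r} is not set and no default was given"
                ) from None
            return default


store = ParameterStore()
store.set_parameter("workflow", "wf_daily_load")
```

With this pattern, store.get_parameter("missing", default=None) returns None, while store.get_parameter("missing") raises ParameterError.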

get_sql_script_runner(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata | None = None) bi_etl.utility.run_sql_script.RunSQLScript

Pre-load initialization.

internal_tasks() Iterable[ETLTask]

Override to provide a static list of tasks that this task will run internally. Can be used by the job orchestrator to build a complete dependency tree.

static is_etl_task(item: Any)

Placeholder for load. This is where the main body of the ETLTask’s work should be performed.


Load parameters for this task from the scheduler.

load_parameters_from_dict(parameters: dict)
property log

Get a logger using the task name.

make_statistics_entry(stats_id) Statistics
property name

Return value needs to be compatible with find_etl_class



notify(channel_list: List[DynamicallyReferenced], subject, message=None, sensitive_message: str = None, attachment=None, skip_channels: set = None)
notify_status(channel_list: List[DynamicallyReferenced], status_message: str, skip_channels: set = None)

Send a temporary status message that gets overwritten by the next status message that is sent.

  • channel_list

  • status_message

  • skip_channels


Returns a list of parameter names


Returns a generator yielding tuples of parameter (name,value)

register_object(obj: ETLComponent | Statistics)

Register an ETLComponent or Statistics object with the task. This allows the task to 1) get statistics from the component and 2) close the component when done.

run(suppress_notifications=None, handle_exceptions=True, **kwargs)

Should not generally be overridden. This is called to run the task’s code in the init, load, and finish methods.
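The division of labour between run and the init, load, and finish methods resembles a template method. The sketch below illustrates that shape only: SketchTask is a hypothetical class, and the point at which close is called and the exact effect of handle_exceptions are assumptions, not a description of ETLTask.run.

```python
class SketchTask:
    """Hypothetical task showing a fixed run sequence with overridable steps."""

    def __init__(self):
        self.calls = []

    def init(self):
        self.calls.append("init")

    def load(self):
        self.calls.append("load")

    def finish(self):
        self.calls.append("finish")

    def close(self, error=False):
        self.calls.append(f"close(error={error})")

    def run(self, handle_exceptions=True):
        """Call the steps in order; subclasses override the steps, not run."""
        try:
            self.init()
            self.load()
            self.finish()
        except Exception:
            self.close(error=True)
            if not handle_exceptions:
                raise
            return False
        self.close(error=False)
        return True


class FailingLoadTask(SketchTask):
    """Subclass whose load step fails, to show the error path."""

    def load(self):
        raise RuntimeError("load failed")


ok_task = SketchTask()
ok_result = ok_task.run()
bad_task = FailingLoadTask()
bad_result = bad_task.run()
```

Keeping run fixed and overriding only the steps means every task gets the same cleanup behaviour without repeating it.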

run_sql_script(script_name: str | Path, script_path: str | Path, database_entry: str | DatabaseMetadata)
set_parameter(param_name: str, param_value: object)

Add a single parameter to this task.

  • param_name (str) – The name of the parameter to add

  • param_value (object) – The value of the parameter


Add multiple parameters to this task. Parameters can be passed in as any combination of:

  • dict instance. Example: set_parameters( {'param1':'example', 'param2':100} )

  • list of lists. Example: set_parameters( [ ['param1','example'], ['param2',100] ] )

  • list of tuples. Example: set_parameters( [ ('param1','example'), ('param2',100) ] )

  • keyword arguments. Example: set_parameters(foo=1, bar='apple')


kwargs – Keyword arguments sent as parameters. See above.
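The accepted input shapes can all be reduced to a single dict, because the built-in dict constructor takes either a mapping or an iterable of two-item sequences. The helper below is a hypothetical sketch of that normalization, not the library's implementation of set_parameters.

```python
def normalize_parameters(*args, **kwargs) -> dict:
    """Merge dicts, lists of pairs, and keyword arguments into one dict."""
    result = {}
    for arg in args:
        # dict() accepts a mapping or an iterable of (name, value) pairs
        result.update(dict(arg))
    # Keyword arguments are applied last, so they win on name clashes
    result.update(kwargs)
    return result


from_dict = normalize_parameters({"param1": "example", "param2": 100})
from_lists = normalize_parameters([["param1", "example"], ["param2", 100]])
from_tuples = normalize_parameters([("param1", "example"), ("param2", 100)])
from_kwargs = normalize_parameters(foo=1, bar="apple")
```

In this sketch later sources override earlier ones when the same name appears twice; whether bi_etl applies the same precedence is not stated in the text above.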

property statistics

Return the execution statistics from the task and all of its registered components.

property target_database: DatabaseMetadata