Source code for bi_etl.informatica.pmcmd_task
"""
Created on May 5, 2015
@author: Derek Wood
"""
from bi_etl.informatica.pmcmd import PMCMD
from bi_etl.scheduler.exceptions import ParameterError
from bi_etl.scheduler.task import ETLTask
[docs]
class PMCMD_Task(ETLTask):
"""
Runs Informatica Workflows
"""
[docs]
def init(self):
"""
pre-load initialization.
"""
try:
folder = self.get_parameter('folder')
except ParameterError:
## For testing purposes - Shouldn't get here
self.log.error("PMCMD_Task didn't get folder parameter. Assuming test run")
folder = 'MASTER'
self.set_parameter('folder', folder)
self.set_parameter('workflow', 'wf_TEST_Derek')
self.cmd = PMCMD(config=self.config, folder= folder)
[docs]
def load(self):
workflow = self.get_parameter('workflow')
self.cmd.startworkflow(workflow)
try:
self.cmd.getworkflowdetails(workflow)
except Exception:
pass
if __name__ == '__main__':
task = PMCMD_Task()
task.set_parameter('folder', 'MASTER')
task.set_parameter('workflow', 'wf_TEST_Derek')
task.run(suppress_notifications= True)