Source code for bi_etl.informatica.pmcmd

"""
Created on May 5, 2015

@author: Derek Wood
"""

import errno
import logging
import os
import subprocess
import sys

from bi_etl.informatica.pm_config import PMCMDConfig


[docs] class PMCMD(object): """ classdocs """ CONFIG_INFORMATICA_COMMANDS = 'INFORMATICA_COMMANDS'
[docs] def __init__(self, config: PMCMDConfig, folder=None, parmfile=None, localparamfile=None, osprofile=None, ): self.config = config self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}") self.f_dev_null = open(os.devnull, 'w') self.control_file_name = "Control_import_No_folder_rep_change.xml" self.setup_inf_path() self._folder = folder self._parmfile = parmfile self._localparamfile = localparamfile self._osprofile = osprofile
[docs] def infa_home(self): if 'INFA_HOME' in os.environ: return os.environ['INFA_HOME'] else: return self.config.INFA_HOME
[docs] def setup_inf_path(self): userDir = os.path.expanduser('~') if sys.platform == 'posix': os.environ['PATH'] = os.path.join(userDir, 'bin') + ':' + os.path.join(self.infa_home(), 'server', 'bin') + ':/usr/bin' os.environ['LD_LIBRARY_PATH'] = os.path.join(self.infa_home(), 'server', 'bin') if 'INFA_DOMAINS_FILE' not in os.environ or os.environ['INFA_DOMAINS_FILE'] is None: if self.config.INFA_DOMAINS_FILE is None: raise ValueError(f"INFA_DOMAINS_FILE required in environemnt varialble or config file.") else: # noinspection PyTypeChecker os.environ['INFA_DOMAINS_FILE'] = self.config.INFA_DOMAINS_FILE
[docs] def informatica_bin_dir(self): return os.path.join(self.infa_home(), 'server', 'bin')
[docs] def informatica_pmcmd(self): return os.path.join(self.informatica_bin_dir(), 'pmcmd')
[docs] def usersecuritydomain(self): return self.config.USER_SECURITY_DOMAIN
[docs] def user_id(self): return self.config.user_id
[docs] def password(self): return self.config.get_password()
[docs] def set_password_in_env(self): os.environ['INFA_PM_PASSWORD'] = self.password()
[docs] def repository(self): return self.config.REPOSITORY
[docs] def service(self): return self.config.SERVICE
[docs] def domain(self): return self.config.DOMAIN
[docs] def folder(self): if self._folder is None: self._folder = self.config.DEFAULT_FOLDER return self._folder
[docs] def parmfile(self): if self._parmfile is None: self._parmfile = self.config.DEFAULT_PARMFILE return self._parmfile
[docs] def localparamfile(self): if self._localparamfile is None: self._localparamfile = self.config.DEFAULT_LOCALPARAMFILE return self._localparamfile
[docs] def osprofile(self): if self._osprofile is None: self._osprofile = self.config.OSPROFILE return self._osprofile
[docs] def run_via_cmd(self): return self.config.RUN_VIA_CMD
[docs] def startworkflow(self, workflow, runinsname=None, ): """ runinsname is an optional instance name for this run """ cmd = [] if sys.platform == 'win32' and self.run_via_cmd(): cmd.append('cmd') cmd.append('/C') cmd.append(self.informatica_pmcmd()) cmd.append('startworkflow') cmd.append('-service') cmd.append(self.service()) cmd.append('-d') cmd.append(self.domain()) usersecuritydomain = self.usersecuritydomain() if usersecuritydomain: cmd.append('-usersecuritydomain') cmd.append(usersecuritydomain) cmd.append('-u') cmd.append(self.user_id()) cmd.append('-passwordvar') self.set_password_in_env() cmd.append('INFA_PM_PASSWORD') # Informatica will read environment variable cmd.append('-f') cmd.append(self.folder()) parmfile = self.parmfile() if parmfile: cmd.append('-paramfile') cmd.append(parmfile) localparamfile = self.localparamfile() if localparamfile: cmd.append('-localparamfile') cmd.append(localparamfile) osprofile = self.osprofile() if osprofile: cmd.append('-osprofile') cmd.append(osprofile) if runinsname: cmd.append('-runinsname') cmd.append(runinsname) cmd.append('-wait') cmd.append(workflow) self.log.debug(" ".join(cmd)) self.log.info("Executing pmcmd startworkflow") while True: try: messages = subprocess.check_output(cmd, stderr=subprocess.STDOUT) self.log.info(messages) break # exit while True loop # TODO: Parse the run id from the line below that comes in stdout and return it. # Workflow wf_TEST_Derek with run instance name [] and run id [113759] started successfully. # TODO: python 3.3+ catch InterruptedError # See https://www.python.org/dev/peps/pep-0475/ except IOError as e: if e.errno != errno.EINTR: raise e # Else loop except subprocess.CalledProcessError as e: self.log.error("Error code " + str(e.returncode)) self.log.error("From " + ' '.join(e.cmd)) self.log.error(e.output) raise e
[docs] def getworkflowdetails(self, workflow, workflow_run_id=None, runinsname=None, ): cmd = [] if sys.platform == 'win32' and self.run_via_cmd(): cmd.append('cmd') cmd.append('/C') cmd.append(self.informatica_pmcmd()) cmd.append('getworkflowdetails') cmd.append('-service') cmd.append(self.service()) cmd.append('-d') cmd.append(self.domain()) usersecuritydomain = self.usersecuritydomain() if usersecuritydomain: cmd.append('-usersecuritydomain') cmd.append(usersecuritydomain) cmd.append('-u') cmd.append(self.user_id()) cmd.append('-passwordvar') self.set_password_in_env() cmd.append('INFA_PM_PASSWORD') # Informatica will read environment variable cmd.append('-f') cmd.append(self.folder()) if runinsname: cmd.append('-runinsname') cmd.append(runinsname) if workflow_run_id: cmd.append('-wfrunid') cmd.append(workflow_run_id) cmd.append(workflow) self.log.debug(" ".join(cmd)) self.log.info("Executing pmcmd getworkflowdetails") try: messages = subprocess.check_output(cmd, stderr=subprocess.STDOUT) self.log.info(messages) except subprocess.CalledProcessError as e: self.log.error("Error code " + str(e.returncode)) self.log.error("From " + ' '.join(e.cmd)) self.log.error(e.output) raise e
if __name__ == '__main__': test_config = PMCMDConfig() pcmd = PMCMD(folder='MASTER', config=test_config) test_workflow = 'Invalid_Name' pcmd.startworkflow(test_workflow) pcmd.getworkflowdetails(test_workflow)