"""
Created on May 4, 2015
@author: Derek Wood
"""
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
from typing import Union
from bi_etl.informatica.exceptions import NoObjects
from bi_etl.informatica.pm_config import PMCMDConfig
from bi_etl.utility import dict_to_str, line_counter
[docs]
class PMREP(object):
SETTINGS_SECTION = 'INFORMATICA'
[docs]
def __init__(self, config: PMCMDConfig):
self.config = config
self.log = logging.getLogger(f"{self.__class__.__module__}.{self.__class__.__name__}")
self.f_dev_null = open(os.devnull, 'w')
self.control_file_name = "Control_import_No_folder_rep_change.xml"
self._folder = None
[docs]
def setup_inf_path(self):
user_dir = os.path.expanduser('~')
os.environ['PATH'] = ':'.join([os.path.join(user_dir, 'bin'),
self.informatica_bin_dir(),
'/usr/bin',
]
)
os.environ['LD_LIBRARY_PATH'] = self.informatica_bin_dir()
[docs]
def infa_home(self) -> Union[str, None]:
if 'INFA_HOME' in os.environ:
return os.environ['INFA_HOME']
else:
return self.config.INFA_HOME
[docs]
def usersecuritydomain(self):
return self.config.USER_SECURITY_DOMAIN
[docs]
def user_id(self):
return self.config.user_id
[docs]
def password(self):
return self.config.get_password()
[docs]
def set_password_in_env(self):
os.environ['INFA_PM_PASSWORD'] = self.password()
[docs]
def repository(self):
return self.config.REPOSITORY
[docs]
def service(self):
return self.config.SERVICE
[docs]
def domain(self):
return self.config.DOMAIN
[docs]
def folder(self):
if self._folder is None:
self._folder = self.config.DEFAULT_FOLDER
return self._folder
[docs]
def connect(self):
pmrep_cmd = [self.informatica_pmrep(),
'connect',
'-r', self.repository(),
'-d', self.domain(),
'-n', self.user_id(),
'-X', 'INFA_PM_PASSWORD'
]
self.set_password_in_env()
self.setup_inf_path()
self.log.info("pmrep Connecting to Informatica")
try:
if self.log.getEffectiveLevel() >= logging.DEBUG:
file_out = sys.stdout
else:
file_out = self.f_dev_null
subprocess.check_call(pmrep_cmd, stdout=file_out)
except subprocess.CalledProcessError as e:
self.log.error("Error code " + str(e.returncode))
self.log.error("From " + ' '.join(e.cmd))
self.log.error(e.output)
raise e
[docs]
def cleanup(self):
pmrep_cmd = [self.informatica_pmrep(), 'cleanup']
try:
messages = subprocess.check_output(pmrep_cmd, stderr=subprocess.STDOUT)
self.log.debug(messages)
except subprocess.CalledProcessError as e:
self.log.error("Error code " + str(e.returncode))
self.log.error("From " + ' '.join(e.cmd))
self.log.error(e.output)
finally:
self.f_dev_null.close()
[docs]
def get_objects(self, object_type, folder_name):
# print "pmrep ListObjects -o " + objectType + ' -f ' + folderName
object_list = list()
process = subprocess.Popen(
[
self.informatica_pmrep(),
'ListObjects',
'-o', object_type,
'-f', folder_name
],
stdout=subprocess.PIPE
)
count = 0
found_invoked = False
found_blank_line = False
for line in iter(process.stdout.readline, ''):
# End on line .ListObjects completed successfully.
if line.startswith(b'.ListObjects'):
break
if found_blank_line:
count += 1
# if count >= 15: break
parts = line.rstrip(b'\n').split(b' ')
subtype = parts[0]
if len(parts) == 2:
reusable = 'reusable'
name = parts[1]
else:
reusable = parts[1]
name = parts[2]
# print "parts = " + pformat(parts)
if reusable == 'reusable':
# print "subtype = " + subtype + " name = " + name
object_dict = {'objectType': object_type,
'subtype': subtype,
'name': name,
'folderName': folder_name
}
object_list.append(object_dict)
if line.startswith(b'Invoked'):
found_invoked = True
if found_invoked and line == '\n':
found_blank_line = True
return object_list
[docs]
def get_objects_from_query(self, query_name):
# pmrep executequery -q $INFA_QUERY_NAME -t shared -u ${OUTPUT_PATH}\${INFA_QUERY_NAME}_results.txt
tempDir = tempfile.mkdtemp()
temp_file = os.path.join(tempDir, 'query.out')
obj_list = list()
try:
if self.log.getEffectiveLevel() >= logging.DEBUG:
file_out = sys.stdout
else:
file_out = self.f_dev_null
subprocess.check_call([self.informatica_pmrep(),
'executequery',
'-q', query_name,
'-u', temp_file
],
stdout=file_out)
if os.path.exists(temp_file):
count = 0
with open(temp_file, 'r') as f:
for line in f:
count += 1
# if count >= 15: break
parts = line.rstrip('\n').split(',')
folder = parts[1]
name = parts[2]
object_type = parts[3]
subtype = parts[4]
# version = parts[5]
if len(parts) == 7:
reusable = parts[6]
else:
reusable = 'reusable'
if reusable == 'reusable':
# print("parts {} = 0-excluded {}" .format(len(parts),[parts[i] for i in range(1,7)]))
object_dict = {'objectType': object_type,
'subtype': subtype,
'name': name,
'folder': folder
}
obj_list.append(object_dict)
else: # query.out not created
pass
except subprocess.CalledProcessError:
raise RuntimeError("Error executing query {name}".format(name=query_name))
finally:
# Cleanup temp
shutil.rmtree(tempDir)
return obj_list
[docs]
def deleteObject(self, objectDict):
# pmrep DeleteObject -o <object_type> -f <folder_name> -n <object_name>
pmrep_cmd = [self.informatica_pmrep(),
'DeleteObject',
'-f', objectDict['folder'],
'-o', objectDict['type'],
'-n', objectDict['name']
]
# Include subtype if required
if objectDict['type'].lower() in ('task', 'transformation'):
pmrep_cmd.append('-t')
pmrep_cmd.append(objectDict['subtype'])
try:
if self.log.getEffectiveLevel() >= logging.DEBUG:
file_out = sys.stdout
else:
file_out = self.f_dev_null
subprocess.check_call(pmrep_cmd, stdout=file_out)
except subprocess.CalledProcessError as e:
self.log.error("Error code " + str(e.returncode))
self.log.error("From " + ' '.join(e.cmd))
self.log.error(e.output)
[docs]
def exportObject(self, objectDict, dependents, outputPath):
# pmrep objectexport -f $FOLDER -n "$NAME" -o "$TYPE" -t "$SUBTYPE" $DEPENDENTS_OPTIONS -u "${TYPE}s/${NAME}.xml"
pmrep_cmd = [
self.informatica_pmrep(),
'objectexport',
'-f', objectDict['folder'],
'-n', objectDict['name'],
'-o', objectDict['type']
]
# Include subtype if required
if objectDict['type'].lower() in ('task', 'transformation'):
pmrep_cmd.append('-t')
pmrep_cmd.append(objectDict['subtype'])
# include all dependents or only non-reusable dependents
if dependents:
pmrep_cmd.append('-m') # [-m (export pk-fk dependency)]
pmrep_cmd.append('-s') # [-s (export objects referred by shortcut)]
pmrep_cmd.append('-b') # [-b (export non-reusable dependents)]
pmrep_cmd.append('-r') # [-r (export reusable dependents)]
else:
pmrep_cmd.append('-b') # [-b (export non-reusable dependents)]
pmrep_cmd.append('-u')
pmrep_cmd.append(outputPath)
try:
messages = subprocess.check_output(pmrep_cmd, stderr=subprocess.STDOUT)
count_xml_lines = line_counter.bufcount(outputPath)
errors = re.findall('^.*<Warning>.*$|^.*<Error>.*$', messages, re.MULTILINE)
if len(errors) > 0:
self.log.error(errors)
# noinspection PyTypeChecker
return '\n'.join(errors)
elif count_xml_lines <= 3:
print("WARNING: No valid objects exported")
os.remove(outputPath)
raise NoObjects()
except subprocess.CalledProcessError as e:
messages = e.output
print("Error code " + str(e.returncode))
print("From " + ' '.join(e.cmd))
print(messages)
return messages
[docs]
def validateObject(self, objectDict):
#
# pmrep validate {{-n <object_name> -o <object_type (mapplet, mapping, session, worklet, workflow)>
# [-v <version_number>] [-f <folder_name>]} | -i <persistent_input_file>}
# [-s (save upon valid) [-k (check in upon valid) [-m <check_in_comments>]]]
# [-p <output_option_types (valid, saved, skipped, save_failed, invalid_before, invalid_after, or all)>
# [-u <persistent_output_file_name>] [-a (append)]
# [-c <column_separator>] [-r <end-of-record_separator>] [-l <end-of-listing_indicator>] [-b (verbose)]
#
pmrep_cmd = [self.informatica_pmrep(),
'validate',
'-f', objectDict['folder'],
'-n', objectDict['name'],
'-o', objectDict['type'],
'-s',
'-b'
]
try:
messages = subprocess.check_output(pmrep_cmd, stderr=subprocess.STDOUT)
errors = re.findall('^.*<Warning>.*$|^.*<Error>.*$', messages, re.MULTILINE)
if len(errors) > 0:
print(errors)
# noinspection PyTypeChecker
return '\n'.join(errors)
except subprocess.CalledProcessError as e:
messages = e.output
print("Error code " + str(e.returncode))
print("From " + ' '.join(e.cmd))
print(messages)
return messages
# Export only mappings with reusable dependents included
[docs]
def exportObjectAutoDependents(self, objectDict, outputPath):
if objectDict['type'].lower() == 'mapping':
dependents = True
else:
dependents = False
return self.exportObject(objectDict, dependents, outputPath)
[docs]
@staticmethod
def getFolderName(objectDict):
return objectDict['type'].capwords() + 's'
[docs]
@staticmethod
def getFileName(objectDict):
return objectDict['name'] + '.xml'
[docs]
@staticmethod
def attributesString(element):
s = element.tag
# print 'attributesString ' + xml.tostring(element)
if list(element.items()) is not None:
for attr in sorted(element.items()):
s += ' ' + ' '.join(attr)
# print 'end attributesString = ' + s
return s
[docs]
def exportObjectList(self, objectList):
messageList = list()
tempDir = tempfile.mkdtemp()
try:
newFilesDict = dict()
self.log.info("{cnt} objects to export".format(cnt=len(objectList)))
for objectDict in objectList:
self.log.debug(dict_to_str(objectDict))
fullTempDir = os.path.join(tempDir, self.getFolderName(objectDict))
os.makedirs(fullTempDir)
tempFilePath = os.path.join(fullTempDir, self.getFileName(objectDict))
self.log.info("Exporting {}/{}".format(self.getFolderName(objectDict),
self.getFileName(objectDict)
)
)
try:
messages = self.exportObject(objectDict, False, tempFilePath)
if messages is not None and len(messages) > 0:
messageList.append((self.getFileName(objectDict), messages))
targetDir = os.path.join(os.getcwd(), self.getFolderName(objectDict))
os.makedirs(targetDir)
targetFilePath = os.path.join(targetDir, self.getFileName(objectDict))
newFilesDict[targetFilePath] = 1
self.log.debug("Copying to {}".format(targetFilePath))
except NoObjects:
pass
finally:
# Cleanup temp
shutil.rmtree(tempDir)
return messageList
[docs]
def validateObjectList(self, objectList):
messageList = list()
self.log.info("{cnt} objects to validate".format(cnt=len(objectList)))
for objectDict in objectList:
self.log.debug(dict_to_str(objectDict))
self.log.info("Validating {}/{}".format(self.getFolderName(objectDict),
self.getFileName(objectDict)
)
)
try:
messages = self.validateObject(objectDict)
if messages is not None and len(messages) > 0:
messageList.append((self.getFileName(objectDict), messages))
except NoObjects:
pass
return messageList
[docs]
def importXMLFile(self, path, control_file):
# pmrep objectimport -c "${CONTROL_FILE}" -i "${FILE}" -p
pmrep_cmd = [
self.informatica_pmrep(),
'objectimport',
'-c', control_file,
'-i', path,
'-p',
]
try:
messages = subprocess.check_output(pmrep_cmd, stderr=subprocess.STDOUT)
errors = re.findall('^.*<Warning>.*$|^.*<Error>.*$', messages, re.MULTILINE)
if len(errors) > 0:
print(errors)
# noinspection PyTypeChecker
return '\n'.join(errors)
except subprocess.CalledProcessError as e:
messages = e.output
if e.returncode == 1 and messages.find('No objects to import into repository') != -1:
messages = "WARNING: No objects to import into repository"
print(messages)
return messages
else:
print("pmrep Error code " + str(e.returncode))
print("From " + ' '.join(e.cmd))
print(messages)
return messages
[docs]
def specifizeControlFile(self, controlFile, workingControlFile):
with open(controlFile, 'r') as sf:
with open(workingControlFile, 'w') as tf:
for line in sf:
# Replace generic repository name with our specific one
line = re.sub(r'impcntl.dtd',
os.path.join(self.informatica_bin_dir(), 'impcntl.dtd'),
line)
tf.write(line)
[docs]
def importFileObj(self, fileObj):
print("Importing {}".format(fileObj.name))
tempDir = tempfile.mkdtemp()
scriptDir = os.path.dirname(os.path.realpath(__file__))
try:
(_, fileName) = os.path.split(fileObj.name)
workingFile = os.path.join(tempDir, fileName)
controlFile = os.path.join(scriptDir, self.control_file_name)
workingControlFile = os.path.join(tempDir, self.control_file_name)
self.specifizeControlFile(controlFile, workingControlFile)
messages = self.importXMLFile(workingFile, workingControlFile)
except Exception as e:
messages = e
finally:
# Cleanup temp
shutil.rmtree(tempDir)
return messages
[docs]
def importFile(self, folderName, fileName):
path = os.path.join(folderName, fileName)
try:
with open(path, 'r') as sf:
messages = self.importFileObj(sf)
except Exception as e:
messages = e
return messages