Source code for bi_etl.bulk_loaders.bulk_loader_exception
class BulkLoaderException(Exception):
    """Exception wrapping a failure raised while bulk loading.

    The text of the wrapped exception (optionally followed by an extra
    message) is kept in ``message``.  A password can be masked out of that
    text, and additional distinct error strings can be collected with
    :meth:`add_error`; they are appended as a summary by ``str()``.
    """

    def __init__(self, base_exception, message=None, password=None):
        """Build the exception from an underlying exception.

        Parameters
        ----------
        base_exception:
            The original exception; ``str(base_exception)`` starts the message.
        message:
            Optional text appended directly (no separator is inserted) after
            the text of ``base_exception``.
        password:
            Optional password to mask out of the combined message.
        """
        # Explicitly set ``args`` to the wrapped exception only.  Without this
        # call ``BaseException.__new__`` keeps every positional argument in
        # ``args``, which would retain a positionally-passed password in clear
        # text.  NOTE: ``base_exception`` itself is not altered, so its own
        # text is not masked.
        super().__init__(base_exception)
        self.base_exception = base_exception
        # Distinct error strings collected via add_error().
        self.errors_set = set()
        self.message = str(base_exception)
        if message is not None:
            self.message += message
        if password is not None:
            self.remove_password(password)

    def add_error(self, error) -> None:
        """Record ``str(error)`` in the error summary (duplicates collapse)."""
        self.errors_set.add(str(error))

    def remove_password(self, password) -> None:
        """Replace every occurrence of ``password`` in the message with ``********``.

        An empty or ``None`` password is ignored: replacing the empty string
        would insert the mask between every character of the message.
        """
        if password and password in self.message:
            self.message = self.message.replace(password, '*' * 8)

    def __repr__(self) -> str:
        return f"BulkLoaderException({type(self.base_exception)},message=({self}))"

    def __str__(self) -> str:
        """Return the message, followed by the error summary when errors exist."""
        if not self.errors_set:
            return self.message
        # Sorted so the summary is stable from run to run; plain set iteration
        # order for strings depends on hash randomization.
        return '\n'.join(
            [
                self.message,
                'BULK LOAD ERROR SUMMARY:',
                '------------------------',
                *sorted(self.errors_set),
            ]
        )