Source code for bi_etl.bulk_loaders.sql_server_bcp

import os
import re
import tempfile
from typing import Iterable, Optional

from bi_etl.bulk_loaders.bulk_loader import BulkLoader
from bi_etl.components.table import Table
from bi_etl.scheduler.task import ETLTask
from bi_etl.utility.bcp_helpers import create_bcp_format_file, run_bcp, BCPError, BCP_Config


class SQLServerBCP(BulkLoader):
    def __init__(
            self,
            bcp_config: BCP_Config,
            bcp_encoding: str = 'utf-8',
    ):
        super().__init__()
        self.bcp_config = bcp_config
        self.delimiter = '\013'
        self._bcp_encoding = bcp_encoding
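    # '\013' is the vertical-tab character (0x0B); presumably chosen as the
    # field delimiter because it is unlikely to appear in real column data.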
    @staticmethod
    def multiple_replace_safe(string, rep_dict):
        if string == '':
            return '\000'
        # noinspection PyTypeChecker
        pattern = re.compile(
            "|".join([re.escape(k) for k in sorted(rep_dict, key=len, reverse=True)]),
            flags=re.DOTALL,
        )
        return pattern.sub(lambda x: rep_dict[x.group(0)], string)
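    # Illustrative doctest-style sketch (not part of the original source):
    # all replacements are applied in a single pass, longest keys first, so
    # overlapping patterns resolve predictably.
    #
    #     >>> SQLServerBCP.multiple_replace_safe('a\x0bb', {'\x0b': ' '})
    #     'a b'
    #     >>> SQLServerBCP.multiple_replace_safe('', {'\x0b': ' '})
    #     '\x00'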
    def load_from_files(
            self,
            local_files: list,  # First file in the list must be the format file
            table_object: Table,
            table_to_load: str = None,
            perform_rename: bool = False,
            file_compression: str = '',
            options: str = '',
            analyze_compression: str = None,
    ) -> int:
        rows_inserted = 0
        format_file_path = None
        for file_name in local_files:
            # First file should be the format file
            if format_file_path is None:
                format_file_path = file_name
            else:
                try:
                    rows_inserted = run_bcp(
                        config=self.bcp_config,
                        table_name=table_object.qualified_table_name,
                        database_bind=table_object.database.bind,
                        file_path=file_name,
                        format_file_path=format_file_path,
                        start_line=1,
                        delimiter=self.delimiter,
                    )
                    self.log.info(f"{rows_inserted} rows inserted from {file_name}")
                except BCPError:
                    self.log.error(f"BCP load of {table_object.qualified_table_name} from {file_name} failed")
                    raise
        if perform_rename:
            self.rename_table(table_to_load, table_object)
        return rows_inserted
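    # Illustrative sketch (not part of the original source): the first entry
    # must be the BCP format (.fmt) file; subsequent entries are data files.
    # `loader` and `target_table` are hypothetical placeholders.
    #
    #     loader.load_from_files(
    #         ['/tmp/my_table.fmt', '/tmp/my_table.data'],
    #         table_object=target_table,
    #     )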
    @staticmethod
    def value_for_bcp(column_value):
        if column_value is None:
            return ''
        else:
            column_value = str(column_value)
            # The default line delimiter is \r\n, so replace that with just \n
            column_value = column_value.replace('\r\n', '\n')
            if column_value == '':
                # ASCII NUL marks an empty string (an empty field means NULL)
                column_value = '\000'
            return column_value
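    # Illustrative doctest-style sketch (not part of the original source):
    #
    #     >>> SQLServerBCP.value_for_bcp(None)      # SQL NULL -> empty field
    #     ''
    #     >>> SQLServerBCP.value_for_bcp('')        # empty string -> ASCII NUL
    #     '\x00'
    #     >>> SQLServerBCP.value_for_bcp('a\r\nb')  # normalize row terminator
    #     'a\nb'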
    def load_from_iterable(
            self,
            iterable: Iterable,
            table_object: Table,
            table_to_load: str = None,
            perform_rename: bool = False,
            progress_frequency: int = 10,
            analyze_compression: str = None,
            parent_task: Optional[ETLTask] = None,
    ) -> int:
        with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
            format_file_path = os.path.join(temp_dir, f'data_{table_object.table_name}.fmt')
            data_file_path = os.path.join(temp_dir, f'data_{table_object.table_name}.data')
            create_bcp_format_file(
                table_object,
                format_file_path,
                delimiter=f'\\{ord(self.delimiter):03o}',
                row_terminator='\\r\\n',
            )
            with open(data_file_path, 'w+', encoding="utf-8") as file:
                for row in iterable:
                    line = self.delimiter.join(
                        [self.value_for_bcp(row[col_name]) for col_name in table_object.columns]
                    )
                    file.write(line + '\n')
            # Alternative implementation using CSVWriter, kept for reference:
            # with CSVWriter(
            #         parent_task,
            #         data_file_path,
            #         delimiter=self.delimiter,
            #         column_names=table_object.column_names,
            #         include_header=False,
            #         encoding='utf-8',
            #         escapechar='\\',
            #         doublequote=False,
            #         quotechar='\000',
            #         quoting=QUOTE_NONE,
            # ) as target_file:
            #     for row in iterable:
            #         row_count += 1
            #         target_file.insert_row(row)
            row_count = self.load_from_files(
                [format_file_path, data_file_path],
                table_object=table_object,
                table_to_load=table_to_load,
                perform_rename=perform_rename,
                analyze_compression=analyze_compression,
            )
        return row_count
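# Illustrative usage sketch (not part of the original module). It assumes a
# working BCP_Config and a bi_etl Table bound to a SQL Server database;
# `my_config`, `my_task`, `source_rows`, and `target_table` are hypothetical
# placeholders.
#
#     bulk_loader = SQLServerBCP(bcp_config=my_config)
#     rows = bulk_loader.load_from_iterable(
#         iterable=source_rows,  # any iterable of rows keyed by column
#         table_object=target_table,
#         parent_task=my_task,
#     )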