Source code for bi_etl.utility.bcp_helpers

import logging
import os
import subprocess
import tempfile
import textwrap
from datetime import date, datetime
from pprint import pformat

from config_wrangler.config_templates.config_hierarchy import ConfigHierarchy
from config_wrangler.config_types.path_types import ExecutablePath
from sqlalchemy.dialects.mssql import BIT

from bi_etl.components.table import Table

log = logging.getLogger('etl.utils.bcp_helpers')


[docs] class BCPError(Exception): pass
[docs] class BCP_Config(ConfigHierarchy): path_to_bcp_exectable: ExecutablePath = 'bcp' batch_size: int = 10000
[docs] def create_bcp_format_file(table: Table, bcp_format_path, encoding=None, delimiter=None, row_terminator=None): with open(bcp_format_path, "w", encoding="utf-8") as bcp_fmt: field_list = list() column_list = list() max_col = len(table.columns) if delimiter is None: if encoding == 'utf_16_le': delimiter = '|\\0' else: delimiter = '|' for col_num, column in enumerate(table.columns): size = None scale = None c_type = column.type try: p_type = c_type.python_type except NotImplementedError: if isinstance(c_type, BIT): p_type = bool else: raise ValueError(f"Unexpected data type {c_type} for column") # Types # https://docs.microsoft.com/en-us/sql/relational-databases/import-export/xml-format-files-sql-server?view=sql-server-2017 # https://msdn.microsoft.com/en-us/library/ff718877(v=sql.105).aspx if p_type == int: size = 24 elif p_type == bool: size = 1 elif p_type == datetime: size = 48 elif p_type == date: size = 22 elif p_type == str: if c_type.length is not None: if encoding == 'utf_16_le': size = c_type.length * 2 else: size = c_type.length if size > 2 ** 15: size = None if col_num + 1 == max_col: if row_terminator is None: if encoding == 'utf_16_le': delimiter = "\\r\\0\\n\\0" else: delimiter = "\\r\\n" else: delimiter = row_terminator if encoding == 'utf_16_le': field_type = 'NCharTerm' else: field_type = 'CharTerm' field_spec = f'<FIELD ID="{col_num + 1}" xsi:type="{field_type}" COLLATION="" TERMINATOR="{delimiter}"' if size is not None: field_spec += f' MAX_LENGTH="{size}"' if scale is not None: field_spec += f' SCALE="{scale}"' field_spec += "/>" field_list.append(field_spec) column_list.append( f'<COLUMN SOURCE="{col_num + 1}" NAME="{column.name}" />' ) bcp_fmt.write(textwrap.dedent("""\ <?xml version="1.0"?> <BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <RECORD> """)) bcp_fmt.write('\n'.join(field_list)) bcp_fmt.write(textwrap.dedent("""\n</RECORD>\n<ROW>""")) bcp_fmt.write('\n'.join(column_list)) bcp_fmt.write(textwrap.dedent("""\n</ROW>\n</BCPFORMAT>"""))
[docs] def run_bcp( config: BCP_Config, table_name: str, file_path: str, database_bind, format_file_path=None, direction='in', delimiter='^K', temp_dir=None, start_line=1, encoding=None, ): if temp_dir is None: cleanup_temp = True temp_dir_obj = tempfile.TemporaryDirectory(ignore_cleanup_errors=True) temp_dir = temp_dir_obj.name else: temp_dir_obj = None cleanup_temp = False bcp_errors = os.path.join(temp_dir, "bcp.errors") if encoding == 'utf_16_le': char_encoding = 'UTF-16' else: # char_encoding = 'UTF-8' char_encoding = '65001' cmd = [config.path_to_bcp_exectable, table_name, # in / out direction, file_path, '-S', database_bind.url.host, '-d', database_bind.url.database, '-U', database_bind.url.username, '-P', database_bind.url.password, # Max errors # '-m', '1000', # '-o', bcp_output, '-e', bcp_errors, # Batch size '-b', str(config.batch_size), # encoding eg UTF-8 '-C', char_encoding, # hints # '-h', 'CHECK_CONSTRAINTS,TABLOCK', # '-h', 'CHECK_CONSTRAINTS', # packet size = Max # '-a', '65535' ] # Specify start line defaults to 1 in params if start_line > 1: cmd.extend(['-F', str(start_line)]) if format_file_path: cmd.extend(['-f', format_file_path]) else: # UTF Mode if encoding == 'utf_16_le': cmd.extend(['-w']) else: cmd.extend(['-c']) cmd.extend(['-t', delimiter]) log.debug(" ".join(cmd).replace(database_bind.url.password, '****')) messages = list() try: p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, stderr=subprocess.STDOUT) # this did not work # try: # outs, _ = p.communicate(input=database_bind.url.password, timeout=1) # for line in outs.spltlines: # if line != b'': # line = line.strip() # messages.append(line) # log.info(line) # except subprocess.TimeoutExpired: # pass while p.poll() is None: line = p.stdout.readline() if line != b'': line = line.strip() messages.append(line) log.info(line) for line in p.stdout.readlines(): if line != b'': line = line.strip() messages.append(line) log.info(line) p.stdout.close() rc = p.returncode if rc != 0: log.error(f'bcp returned code {rc}') with open(bcp_errors, 'r', encoding='utf-8', errors='skip') as bcp_error_file: error_messages = bcp_error_file.read() if len(error_messages) > 0: if rc == 0: rc = 1 log.error("Errors with rc = 0. Setting rc = 1") log.error('-' * 80) log.error('BCP error detail:') log.error(error_messages) log.error('-' * 80) if rc == 0: log.debug('BCP output parsing:') rows = 0 for line in messages: line = line.strip() if line.endswith(b' rows copied.'): rows = int(line[:-13]) log.debug(f"Rows message found rows = {rows}") break return rows else: raise BCPError('bcp error') except IOError as e: raise e except subprocess.CalledProcessError as e: log.error("Error code " + str(e.returncode)) log.error('BCP messages:') log.error(pformat(messages)) log.error('BCP output:') log.error(e.output) log.error('-' * 80) with open(bcp_errors, 'r') as bcp_error_file: error_messages = bcp_error_file.read() log.error('BCP raw errors:') log.error(error_messages) log.error('-' * 80) raise BCPError("BCP Error code " + str(e.returncode)) finally: if cleanup_temp: temp_dir_obj.cleanup()
[docs] def format_value_for_bcp(value): if isinstance(value, datetime): # noinspection PyTypeChecker,PyTypeChecker return ( f'{value.year:4d}-{value.month:02d}-{value.day:02d} ' f'{value.hour:02d}:{value.minute:02d}:{value.second + value.microsecond / 1000000:06.3f}' ) elif value is None: return '' else: return str(value).replace('|', '/')