Source code for bi_etl.bulk_loaders.postgresql_copy

# https://www.python.org/dev/peps/pep-0563/
from __future__ import annotations

import os.path
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, List, Optional, Union

from bi_etl.bulk_loaders.bulk_loader import BulkLoader
from bi_etl.bulk_loaders.postgresql_bulk_load_config import PostgreSQLBulkLoaderConfig
from bi_etl.components.csv_writer import CSVWriter, QUOTE_MINIMAL
from bi_etl.utility.postgresql.psycopg2_helpers import psycopg2_import_using_cursor

if TYPE_CHECKING:
    from bi_etl.scheduler.task import ETLTask
    from bi_etl.components.table import Table


class PostgreSQLCopy(BulkLoader):
    def __init__(
            self,
            config: PostgreSQLBulkLoaderConfig,
    ):
        super().__init__()
        self.config = config

    def load_from_files(
            self,
            local_files: List[Union[str, Path]],
            table_object: Table,
            table_to_load: Optional[str] = None,
            perform_rename: bool = False,
            file_compression: str = '',
            options: str = '',
            analyze_compression: Optional[str] = None,
    ) -> int:
        rows_inserted = 0
        conn = None
        try:
            conn = table_object.table.bind.raw_connection()
            conn.set_client_encoding(self.config.encoding)
            cursor = conn.cursor()
            for file_name in local_files:
                file_rows = psycopg2_import_using_cursor(
                    cursor=cursor,
                    table_spec=table_object.qualified_table_name,
                    input_file_path=file_name,
                    delimiter=self.config.delimiter,
                    csv_mode=True,
                    header=self.config.header,
                    null=self.config.null,
                    encoding=self.config.encoding,
                )
                # Keep a running total across all files loaded
                rows_inserted += file_rows
                self.log.info(f"{file_rows} rows inserted from {file_name}")
            # Note: We need to commit here otherwise the changes are lost
            # when we close the raw connection.
            conn.commit()
        finally:
            if conn is not None:
                conn.close()
        if perform_rename:
            self.rename_table(table_to_load, table_object)
        return rows_inserted

    def load_from_iterable(
            self,
            iterable: Iterable,
            table_object: Table,
            table_to_load: Optional[str] = None,
            perform_rename: bool = False,
            progress_frequency: int = 10,
            analyze_compression: Optional[str] = None,
            parent_task: Optional[ETLTask] = None,
    ) -> int:
        with tempfile.TemporaryDirectory(
                dir=self.config.temp_file_path,
                ignore_cleanup_errors=True,
        ) as temp_dir:
            data_file_path = os.path.join(temp_dir, f'data_{table_object.table_name}.data')
            # Save to a file first, so we can call load_from_files
            with open(data_file_path, 'w+', encoding="utf-8") as file:
                with CSVWriter(
                        parent_task,
                        file,
                        delimiter=self.config.delimiter,
                        column_names=table_object.column_names,
                        include_header=self.config.header,
                        encoding='utf-8',
                        escapechar='\\',
                        quoting=QUOTE_MINIMAL,
                        null=self.config.null,
                ) as writer:
                    for row in iterable:
                        writer.insert_row(row)
            # Load while the temporary directory (and file) still exists
            row_count = self.load_from_files(
                [data_file_path],
                table_object=table_object,
                table_to_load=table_to_load,
                perform_rename=perform_rename,
                analyze_compression=analyze_compression,
            )
            return row_count
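# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): loading a
# pre-built CSV file with load_from_files. This is a minimal sketch, assuming
# PostgreSQLBulkLoaderConfig accepts as keyword arguments the attributes the
# class reads above (delimiter, header, null, encoding) and that
# `target_table` is a bi_etl Table component bound to a PostgreSQL engine.
# The field values and file path below are hypothetical.

def example_load_from_files(target_table: Table) -> int:
    config = PostgreSQLBulkLoaderConfig(
        delimiter=',',     # assumed field; read as self.config.delimiter above
        header=True,       # assumed field; read as self.config.header above
        null='',           # assumed field; read as self.config.null above
        encoding='utf-8',  # assumed field; read as self.config.encoding above
    )
    loader = PostgreSQLCopy(config=config)
    # COPY the file into the target table over the raw psycopg2 connection
    return loader.load_from_files(
        local_files=['/tmp/extract.csv'],  # hypothetical pre-built CSV file
        table_object=target_table,
    )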
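# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): streaming rows
# through load_from_iterable, which spools them to a temporary CSV and then
# reuses load_from_files. A sketch only, assuming an already-configured
# PostgreSQLCopy and that CSVWriter.insert_row accepts rows keyed by column
# name; the row data and table component are hypothetical.

def example_load_from_iterable(loader: PostgreSQLCopy, target_table: Table) -> int:
    rows = (
        {'id': i, 'name': f'name_{i}'}  # hypothetical rows keyed by column name
        for i in range(1000)
    )
    return loader.load_from_iterable(
        iterable=rows,
        table_object=target_table,
    )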