Source code for bi_etl.bulk_loaders.redshift_s3_json_loader

# https://www.python.org/dev/peps/pep-0563/
from __future__ import annotations

import gzip
import io
import json
import os.path
from typing import Iterable, Optional, TYPE_CHECKING
from datetime import datetime
from tempfile import TemporaryDirectory

from bi_etl.bulk_loaders.redshift_s3_base import RedShiftS3Base
from bi_etl.bulk_loaders.s3_bulk_load_config import S3_Bulk_Loader_Config
from bi_etl.timer import Timer

if TYPE_CHECKING:
    from bi_etl.components.table import Table
    from bi_etl.scheduler.task import ETLTask


class RedShiftS3JSONBulk(RedShiftS3Base):
    def __init__(
            self,
            config: S3_Bulk_Loader_Config,
    ):
        super().__init__(
            config=config,
        )
    @property
    def needs_all_columns(self):
        return False
    def get_copy_sql(
            self,
            s3_source_path: str,
            table_to_load: str,
            file_compression: str = '',
            analyze_compression: str = None,
            options: str = '',
    ):
        analyze_compression = analyze_compression or self.analyze_compression
        if analyze_compression:
            # Use the resolved setting (the argument value, falling back to
            # the instance default)
            options += f' COMPUPDATE {analyze_compression} '
        return f"""\
            COPY {table_to_load}
            FROM 's3://{self.s3_bucket_name}/{s3_source_path}'
            credentials 'aws_access_key_id={self.s3_user_id};aws_secret_access_key={self.s3_password}'
            JSON 'auto' {file_compression}
            {options}
            """
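    # For illustration only: with hypothetical values (bucket 'my-bucket',
    # key prefix 'stage/orders', GZIP compression, COMPUPDATE ON, all
    # invented for this sketch), the rendered statement would look like:
    #
    #   COPY analytics.orders
    #   FROM 's3://my-bucket/stage/orders'
    #   credentials 'aws_access_key_id=...;aws_secret_access_key=...'
    #   JSON 'auto' GZIP
    #    COMPUPDATE ON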
    @staticmethod
    def json_serializer(value):
        if isinstance(value, datetime):
            return str(value)
        else:
            # raise ValueError(f'No json_serializer support for {repr(value)}')
            return str(value)
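    # Example of the fallback in practice (the timestamp is invented for this
    # sketch): json.dumps() calls the function passed as ``default`` for any
    # value it cannot encode natively, so a datetime becomes its str() form:
    #
    #   json.dumps({'loaded_at': datetime(2024, 1, 2, 3, 4, 5)},
    #              default=RedShiftS3JSONBulk.json_serializer)
    #   # -> '{"loaded_at": "2024-01-02 03:04:05"}'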
    def load_from_iterable(
            self,
            iterable: Iterable,
            table_object: Table,
            table_to_load: str = None,
            perform_rename: bool = False,
            progress_frequency: int = 10,
            analyze_compression: str = None,
            parent_task: Optional[ETLTask] = None,
    ) -> int:
        row_count = 0
        with TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir:
            # Build a pool of gzip-compressed JSON writers, one per S3 file
            # to generate
            local_files = []
            zip_pool = []
            text_wrapper_pool = []
            writer_pool_size = self.s3_files_to_generate
            for file_number in range(writer_pool_size):
                filepath = os.path.join(temp_dir, f'data_{file_number}.json.gz')
                local_files.append(filepath)
                zip_file = gzip.open(filepath, 'wb')
                # noinspection PyTypeChecker
                text_wrapper = io.TextIOWrapper(zip_file, encoding='utf-8')
                text_wrapper_pool.append(text_wrapper)
                zip_pool.append(zip_file)

            progress_timer = Timer()
            for row_number, row in enumerate(iterable):
                row_count += 1
                # Distribute rows round-robin across the writer pool so the
                # generated files are roughly equal in size
                text_wrapper = text_wrapper_pool[row_number % writer_pool_size]
                text_wrapper.write(json.dumps(row.as_dict, default=self.json_serializer))
                text_wrapper.write("\n")
                if progress_frequency is not None:
                    # noinspection PyTypeChecker
                    if 0 < progress_frequency < progress_timer.seconds_elapsed:
                        self.log.info(f"Wrote row {row_number:,}")
                        progress_timer.reset()

            for text_wrapper in text_wrapper_pool:
                text_wrapper.close()
            for zip_file in zip_pool:
                zip_file.close()

            if row_count > 0:
                self.load_from_files(
                    local_files,
                    file_compression='GZIP',
                    table_object=table_object,
                    table_to_load=table_to_load,
                    perform_rename=perform_rename,
                    analyze_compression=analyze_compression,
                )
            else:
                self.log.info(f"{self} had nothing to do with 0 rows found")
        return row_count
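
# A minimal usage sketch. Everything below is hypothetical: the config
# values, row source, and target names are invented for illustration, and
# real setup goes through the bi_etl configuration and component framework.
#
#   config = S3_Bulk_Loader_Config(...)
#   loader = RedShiftS3JSONBulk(config=config)
#   row_count = loader.load_from_iterable(
#       iterable=source_rows,           # an iterable of bi_etl Row objects
#       table_object=target_table,      # a bi_etl.components.table.Table
#       table_to_load='analytics.orders',
#   )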