Source code for bi_etl.boto3_helper.session

"""
Created on March 28, 2022

@author: 
"""
import logging
import warnings

import boto3
import keyring


[docs] def get_user_id( **kwargs ): raise DeprecationWarning("Replaced with self.config.my_config_section.user_id. " "Most likely not needed since the config S3_Bucket will connect seamlessly")
[docs] def get_boto3_session_from_config( config, user_id: str = None, password: str = None, keyring_system: str = 's3' ): warnings.warn("Replace with self.s3_helper_from_config", DeprecationWarning, stacklevel=2) log = logging.getLogger(__name__) if user_id is None: user_id = get_user_id(config) if password is None: password = keyring.get_password(keyring_system, user_id) if password is None: raise ValueError(f"Password not supplied in keyring for {keyring_system} {user_id}") # Detect short passwords from bad copy/paste elif len(password) < 3: raise ValueError(f"Password too short ({password}) in keyring for {keyring_system} {user_id}") log.info(f'Connecting to S3 with ID {user_id}') session = boto3.session.Session( aws_access_key_id=user_id, aws_secret_access_key=password ) return session
[docs] class Boto3_Base(object): SERVICE = None
[docs] def __init__(self, session, region_name: str = None): self.session = session self.region_name = region_name self._client = None self._resource = None
@property def resource(self, ): if self._resource is None: self._resource = self.session.resource(self.SERVICE, region_name=self.region_name) return self._resource @property def client(self, ): if self._client is None: self._client = self.session.client(self.SERVICE, region_name=self.region_name) return self._client