import os
import warnings
from typing import Optional
from pydantic import PrivateAttr, model_validator, Field
from config_wrangler.config_templates.config_hierarchy import ConfigHierarchy
from config_wrangler.config_templates.keepass_config import KeepassConfig
from config_wrangler.config_templates.password_defaults import PasswordDefaults
from config_wrangler.config_templates.password_source import PasswordSource, PasswordSourceValidated
__all__ = ['Credentials', 'PasswordDefaults', 'PasswordSource']
from config_wrangler.validate_config_hierarchy import config_hierarchy_validator
[docs]
class Credentials(ConfigHierarchy):
user_id: Optional[str] = None
"""
The user ID to use
"""
password_source: Optional[PasswordSourceValidated] = None
"""
The source to use when getting a password for the user.
See :py:class:`PasswordSource` for valid values.
"""
raw_password: Optional[str] = Field(repr=False, default=None)
"""
This is only used for the extremely non-secure `CONFIG_FILE` password source.
The password is stored directly in the config file next to the user_id with
the setting name `raw_password`
"""
keyring_section: Optional[str] = None
"""
If the password_source is KEYRING, then which section (AKA system)
should this module look for the password in.
See https://pypi.org/project/keyring/
or https://github.com/jaraco/keyring
"""
keepass_config: Optional[str] = 'keepass'
"""
If the password_source is KEEPASS, then which root level config item contains
the settings for Keepass (must be an instance of
:py:class:`config_wrangler.config_templates.keepass_config.KeepassConfig`)
"""
keepass: Optional[KeepassConfig] = None
"""
If the password_source is KEEPASS, then load a sub-section with the
:py:class:`config_wrangler.config_templates.keepass_config.KeepassConfig`) settings
"""
keepass_group: Optional[str] = None
"""
If the password_source is KEEPASS, which group in the Keepass database should
be searched for an entry with a matching entry.
If is None, then the `KeepassConfig.default_group` value will be checked.
If that is also None, then a ValueError will be raised.
"""
keepass_title: Optional[str] = None
"""
If the password_source is KEEPASS, this is an optional filter on the title
of the keepass entries in the group.
"""
validate_password_on_load: bool = True
"""
Should config_wrangler query the password source for this password at
load (startup) time? If so, it will raise an error if the password is
None or an empty string. It **does not** actually connect or
authenticate the user_id & password combination.
"""
# Values to hide from config exports
_private_value_atts = PrivateAttr(default={'raw_password'})
def _get_password_keyring(self):
if self.keyring_section is None:
raise ValueError(f"{self.full_item_name()} keyring_section is None but password_source was keyring")
import keyring
password = keyring.get_password(self.keyring_section, self.user_id)
search_info = f"{self.keyring_section} {self.user_id}"
return password, search_info
def _get_password_config(self):
warnings.warn(
'Passwords stored directly in config or worse in code are not safe. Please make sure to fix this before deploying.',
stacklevel=3,
)
password = self.raw_password
search_info = f"{self.full_item_name()}.raw_password"
return password, search_info
@staticmethod
def _get_envt_name(search_name: str) -> Optional[str]:
if search_name in os.environ:
return search_name
else:
search_name_upper = search_name.upper()
for envt_var in os.environ:
envt_var_upper = envt_var.upper()
if envt_var_upper == search_name_upper:
return envt_var
return None
def _get_password_environment(self):
password = self.raw_password
if password is not None and password != '':
# Env loader might have already found it for us. At least we'll assume that's where it came from.
search_info = f"{self.full_item_name()}.raw_password"
return password, search_info
else:
name_in_envt = self.full_item_name(delimiter='_').replace('.', '_')
search_info_list = []
password = None
for envt_name_search in [
f"PASSWORD_{self.user_id}",
f"{self.user_id}_PASSWORD",
self.user_id,
f"{name_in_envt}_PASSWORD",
]:
search_info_list.append(envt_name_search)
envt_name_found = self._get_envt_name(envt_name_search)
if envt_name_found is not None:
if envt_name_found in os.environ:
password = os.environ[envt_name_found]
break
if password is None:
raise ValueError(f"{self.full_item_name()} password_source = ENVIRONMENT. Value not found in {search_info_list}")
else:
return password, ','.join(search_info_list)
def _get_keepass_config_str_ref(self) -> KeepassConfig:
# Try older keepass_config string reference
keepass_config = None
if self.keepass_config is not None:
try:
if self._root_config is None:
raise ValueError("get_password called on Credentials that are not part of a ConfigRoot hierarchy (and keepass_config used)")
keepass_config = getattr(self._root_config, self.keepass_config)
if not isinstance(keepass_config, KeepassConfig):
raise ValueError(f"{self.full_item_name()} keepass_config is {type(keepass_config)} = {keepass_config} not KeepassConfig instance")
except (KeyError, AttributeError):
pass
return keepass_config
def _get_keepass_config_sub_section(self) -> KeepassConfig:
if self.keepass is not None:
keepass_config = self.keepass
else:
# Try root passwords subsection definition
if self._root_config is None:
raise ValueError(
"get_password called on Credentials that are not part of a ConfigRoot hierarchy "
"and does not have a local keepass_config or keepass sub-section"
)
if self._root_config.passwords is None:
raise ValueError(
f"{self.full_item_name()} get_password called with PasswordSource = KEEPASS, "
f"but keepass_config/keepass is not in "
f"local section and yet the root passwords section is missing."
)
if self._root_config.passwords.keepass is None:
raise ValueError(
f"{self.full_item_name()} get_password called with PasswordSource = KEEPASS, but keepass_config/keepass is not in "
f"local section and yet the root passwords section is also missing keepass_config."
)
keepass_config = self._root_config.passwords.keepass
if not isinstance(keepass_config, KeepassConfig):
raise ValueError(f"{self.full_item_name()} keepass_config is {type(keepass_config)} = {keepass_config} not KeepassConfig instance")
return keepass_config
def _get_password_keepass(self):
keepass_config = self._get_keepass_config_str_ref()
if keepass_config is None:
keepass_config = self._get_keepass_config_sub_section()
try:
search_info = keepass_config.full_item_name()
get_password = keepass_config.get_password
try:
password = get_password(self.keepass_group, self.keepass_title, self.user_id)
except ValueError as e:
raise ValueError(f"{self.full_item_name()} -> keepass config {keepass_config.full_item_name()} error {e}")
except AttributeError as e:
raise ValueError(
f"{self.full_item_name()} -> keepass config {keepass_config.full_item_name()} does not appear to be valid. "
f"{e}"
)
return password, search_info
[docs]
def get_password(self) -> str:
"""
Get the password for this resource.
`password_source` controls where it looks for the password.
If that is None, then the root level `passwords` container is checked for `password_source` value.
"""
if self.password_source is None:
if self._root_config is None:
raise ValueError("get_password called on Credentials that are not part of a ConfigRoot hierarchy")
try:
passwords_defaults = getattr(self._root_config, 'passwords')
except AttributeError:
raise ValueError(
f"{self.full_item_name()} password_source not provided and 'passwords' section not found"
)
if passwords_defaults is None:
raise ValueError(
f"{self.full_item_name()} password_source not provided "
f"and 'passwords' section does not exist."
)
else:
try:
self.password_source = passwords_defaults.password_source
except AttributeError as e:
raise ValueError(
f"{self.full_item_name()} password_source not provided "
f"and 'passwords' section does not have 'password_source' {e} "
)
try:
if self.password_source == PasswordSource.KEYRING:
password, search_info = self._get_password_keyring()
elif self.password_source == PasswordSource.CONFIG_FILE:
password, search_info = self._get_password_config()
elif self.password_source == PasswordSource.ENVIRONMENT:
password, search_info = self._get_password_environment()
elif self.password_source == PasswordSource.KEEPASS:
password, search_info = self._get_password_keepass()
else:
raise ValueError(f"invalid password_source")
except Exception as e:
raise ValueError(f"{self.full_item_name()} with password_source={self.password_source} got error {e}")
if password is None or password == '':
raise ValueError(f"{self.full_item_name()} password is not set. Source is {self.password_source} location = {search_info}")
return password
[docs]
@model_validator(mode='after')
def check_model(self):
user_id = self.user_id
if user_id == '' or user_id is None:
raise ValueError("user_id not provided")
if self.password_source == PasswordSource.KEYRING:
keyring_section = self.keyring_section
if keyring_section is None:
raise ValueError(f"{self} keyring_section is None but password_source was keyring")
elif self.password_source == PasswordSource.CONFIG_FILE:
password = self.raw_password
if password is None or password == '':
raise ValueError(f"{self} password_source is Config but password is not set")
return self
@config_hierarchy_validator
def check_config_hierarchy(self):
if self._root_config is None:
raise ValueError("Credentials are not part of a ConfigRoot hierarchy")
else:
model_config = self._root_config.model_config
validate_credentials = getattr(model_config, 'validate_credentials', True)
if model_config.get('validate_default', True) and validate_credentials and self.validate_password_on_load:
root_config = self._root_config.model_config
root_validate_credentials = root_config.get('validate_credentials', True)
if root_validate_credentials:
_ = self.get_password()