import ast
import json
import logging
import re
import types
from datetime import timezone, datetime
from typing import *
from pydantic import BaseModel, ValidationError
from pydantic.fields import FieldInfo
from pydicti import dicti, Dicti
from config_wrangler.config_exception import ConfigError
from config_wrangler.config_types.delimited_field import DelimitedListFieldInfo
from config_wrangler.config_types.dynamically_referenced import DynamicallyReferenced, DynamicFieldInfo
# Moved here because Pydantic V2 deprecated it
def lenient_issubclass(
        cls: Any,
        class_or_tuple: Union[Type[Any], Tuple[Type[Any], ...], Set[Type[Any]], None]
) -> bool:
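    """
    A tolerant version of ``issubclass`` that also accepts typing constructs
    (``Optional``, ``Union``, parameterized generics) without raising ``TypeError``.

    Illustrative examples (not exhaustive)::

        lenient_issubclass(str, (str, int))          # -> True
        lenient_issubclass(Optional[int], int)       # -> True (checks each Union member)
        lenient_issubclass(List[str], dict)          # -> False (checks the origin, list)
    """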
    try:
        if isinstance(class_or_tuple, set):
            class_or_tuple = tuple(class_or_tuple)
        if isinstance(cls, type):
            return issubclass(cls, class_or_tuple)
        else:
            try:
                origin = cls.__origin__
            except AttributeError:
                origin = get_origin(cls)
            if origin is not None:
                if origin == Union:
                    for union_cls in get_args(cls):
                        if lenient_issubclass(union_cls, class_or_tuple):
                            return True
                    return False
                else:
                    return issubclass(origin, class_or_tuple)
    except TypeError:
        if isinstance(cls, (types.GenericAlias, types.UnionType)):
            return False
        raise
def get_inner_type(cls: Any):
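    """
    Return the type arguments of a generic annotation, looking through
    ``Optional``/``Union`` by recursing into the first Union member.

    Illustrative examples::

        get_inner_type(List[int])            # -> (int,)
        get_inner_type(Optional[List[int]])  # -> (int,)
        get_inner_type(Dict[str, int])       # -> (str, int)
    """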
    try:
        origin = cls.__origin__
    except AttributeError:
        origin = get_origin(cls)
    if origin == Union:
        # For Optional/Union annotations, descend into the first Union member
        # (e.g. Optional[List[str]] yields the args of List[str])
        for inner_cls in get_args(cls):
            return get_inner_type(inner_cls)
    else:
        return get_args(cls)
def has_sub_fields(inner_type: Type):
    return hasattr(inner_type, 'model_fields')
def merge_configs(child: MutableMapping, parent: MutableMapping) -> None:
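    """
    Recursively fill ``child`` with any sections/keys it is missing from ``parent``.
    Values already present in ``child`` win; nested mappings are merged key by key.

    Illustrative example::

        child = {'db': {'host': 'prod'}}
        parent = {'db': {'host': 'default', 'port': 5432}, 'log_level': 'INFO'}
        merge_configs(child, parent)
        # child -> {'db': {'host': 'prod', 'port': 5432}, 'log_level': 'INFO'}
    """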
    for section in parent:
        if section not in child:
            child[section] = parent[section]
        elif isinstance(child[section], MutableMapping):
            merge_configs(child[section], parent[section])
        # else: the child's existing value takes precedence over the parent's
def resolve_variable(root_config_data: MutableMapping, variable_name: str, part_delimiter=':') -> Any:
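    """
    Walk ``root_config_data`` following the parts of ``variable_name`` (matched
    case-insensitively) and return the value found.  Raises ``ValueError`` if any
    part is missing.

    Illustrative example::

        data = {'Paths': {'Data_Dir': '/var/data'}}
        resolve_variable(data, 'paths:data_dir')  # -> '/var/data'
    """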
    variable_name_parts = variable_name.split(part_delimiter)
    result = root_config_data
    for part in variable_name_parts:
        # Change to case-insensitive dict
        if not isinstance(result, dicti):
            result = Dicti(result)
        if part in result:
            result = result[part]
        else:
            raise ValueError(f"<<{part} NOT FOUND when resolving variable with parts: {variable_name_parts}>>")
    return result
_interpolation_re = re.compile(r"\${([^}]+)}")
def interpolate_values(container: MutableMapping, root_config_data: MutableMapping) -> List[Tuple[str, str]]:
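    """
    Replace ``${variable}`` references in string values, in place, recursing into
    nested mappings.  ``${section:key}`` (or ``${section.key}``) is resolved from
    ``root_config_data``; a plain ``${key}`` is resolved from the current
    ``container``.  Returns a list of ``(section, error_message)`` tuples for
    references that could not be resolved.

    Illustrative example::

        root = {'server': {'host': 'example.com'},
                'api': {'base_path': '/v1',
                        'url': 'https://${server:host}${base_path}'}}
        interpolate_values(root, root)
        # root['api']['url'] -> 'https://example.com/v1'
    """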
    errors = []
    for section in container:
        value = container[section]
        if isinstance(value, MutableMapping):
            sub_errors = interpolate_values(value, root_config_data=root_config_data)
            errors.extend(sub_errors)
        elif isinstance(value, str):
            if '$' in value:
                depth = 0
                done = False
                new_value = value
                while not done:
                    done = True
                    depth += 1
                    variables_cnt = 0
                    result_values = []
                    next_start = 0
                    if isinstance(new_value, str):
                        for variable_found in _interpolation_re.finditer(new_value):
                            variables_cnt += 1
                            variable_name = variable_found.groups()[0]
                            var_start, var_end = variable_found.span()
                            part_delimiter = None
                            if ':' in variable_name:
                                part_delimiter = ':'
                            elif '.' in variable_name:
                                part_delimiter = '.'
                            variable_replacement = 'ERROR'
                            if part_delimiter is not None:
                                try:
                                    variable_replacement = resolve_variable(
                                        root_config_data,
                                        variable_name,
                                        part_delimiter=part_delimiter,
                                    )
                                except ValueError as e:
                                    errors.append((section, str(e),))
                            else:
                                try:
                                    # Change to case-insensitive dict
                                    search_container = Dicti(container)
                                    variable_replacement = search_container[variable_name]
                                except KeyError:
                                    errors.append((section, f"<<{variable_name} NOT FOUND>>",))
                            result_values.append(new_value[next_start:var_start])
                            result_values.append(variable_replacement)
                            next_start = var_end
                        if variables_cnt > 0:
                            if next_start < len(new_value):
                                result_values.append(new_value[next_start:])
                            result_values = [part for part in result_values if part != '']
                            if len(result_values) == 1:
                                # Possibly not a string -- maybe a dict
                                new_value = result_values[0]
                                if not isinstance(new_value, str):
                                    done = True
                            else:
                                new_value = ''.join([str(v) for v in result_values])
                            if depth < 50:
                                done = False
                            else:
                                raise ValueError(
                                    f"Interpolation recursion depth limit reached on value {value} "
                                    f"ended processing with {new_value}"
                                )
                container[section] = new_value
    return errors
def parse_as_literal_or_json(value: str) -> Any:
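    """
    Parse a string first as a Python literal (``ast.literal_eval``), then as JSON,
    raising ``ConfigError`` if both fail.

    Illustrative examples::

        parse_as_literal_or_json("[1, 2, 3]")    # -> [1, 2, 3]
        parse_as_literal_or_json('{"a": true}')  # -> {'a': True} (parsed as JSON)
    """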
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError) as el:
        try:
            return json.loads(value)
        except ValueError as ej:
            raise ConfigError(
                f"Value {value} could not be parsed as python literal '{el}' or json '{ej}'"
            )
def parse_delimited_list(
        field_name: str,
        field_info: FieldInfo,
        field_value: str
) -> Sequence:
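    """
    Parse a delimited string into a list.  The delimiter is taken from a
    ``DelimitedListFieldInfo`` if one is used; otherwise newline, comma, or pipe is
    auto-detected.  Bracketed values ('[...]' / '{...}') fall back to
    ``parse_as_literal_or_json``.

    Illustrative example (``Example`` is a hypothetical model)::

        class Example(BaseModel):
            names: List[str]

        parse_delimited_list('names', Example.model_fields['names'], 'a, b, c')
        # -> ['a', 'b', 'c']
    """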
    value = field_value.strip()
    if len(value) == 0:
        return []
    if isinstance(field_info, DelimitedListFieldInfo):
        delimiter = field_info.delimiter
    else:
        delimiter = None
    if delimiter is None and value[0] not in {'[', '{'}:
        # Try to automatically recognize the delimiter
        for try_delimiter in ['\n', ',', '|']:
            if try_delimiter in value:
                delimiter = try_delimiter
                break
    if delimiter is not None:
        if value[0] == delimiter:
            value = value[1:]
        result = [v.strip() for v in value.split(delimiter)]
        if lenient_issubclass(field_info.annotation, int):
            result = [int(v) for v in result]
        elif lenient_issubclass(field_info.annotation, float):
            result = [float(v) for v in result]
    else:
        try:
            result = parse_as_literal_or_json(value)
        except ValueError:
            # Single value list
            result = [value]
    return result
def full_name(parents: List[str], field_name: str):
    return '.'.join(parents + [field_name])
def inherit_fill(parent_config, child_config):
    for inherit_key, inherit_value in parent_config.items():
        if inherit_key not in child_config:
            child_config[inherit_key] = inherit_value
def find_referenced_section(
        field_name: str,
        field_info: FieldInfo,
        parents: List[str],
        section_name: Union[str, MutableMapping],
        current_dict: MutableMapping,
        root_dict: MutableMapping
) -> MutableMapping:
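    """
    Resolve a section reference (by name or already-parsed mapping) to its config
    data.  Dotted names are looked up from most to least specific in ``root_dict``
    and then ``current_dict``, with the less specific sections filling in missing
    keys; if the field declares ``inherit``, remaining gaps are filled from
    ``current_dict`` itself.

    Illustrative example (``field_info`` being any plain ``FieldInfo``)::

        root = {'databases': {'port': 5432}, 'databases.prod': {'host': 'db1'}}
        find_referenced_section('target_db', field_info, [], 'databases.prod', {}, root)
        # -> {'host': 'db1', 'port': 5432}
    """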
    try:
        # noinspection PyUnresolvedReferences
        inherit = field_info.inherit
    except AttributeError:
        inherit = False
    if isinstance(section_name, str):
        section_value = dict()
        section_name_parts = section_name.split('.')
        parts_used = []
        for parts_to_use in range(len(section_name_parts), 0, -1):
            section_name_2 = '.'.join(section_name_parts[:parts_to_use])
            if section_name_2 in root_dict:
                parts_used.append((section_name_2,))
                inherit_fill(root_dict[section_name_2], section_value)
            elif section_name_2 in current_dict:
                parts_used.append((*parents, section_name_2,))
                inherit_fill(current_dict[section_name_2], section_value)
        if inherit:
            inherit_fill(current_dict, section_value)
        if len(parts_used) == 0:
            raise ConfigError(
                f"{full_name(parents, field_name)} refers to section {section_name} which does not exist."
            )
        return section_value
    else:
        return section_name
def build_referenced_objects(
        field_name: str,
        field_info: FieldInfo,
        parents: List[str],
        parent_container: MutableMapping,
        root_config_data: MutableMapping,
        list_of_sections: Sequence[str],
        inner_type: type[BaseModel],
) -> Dict[str, BaseModel]:
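    """
    Build one ``inner_type`` instance per referenced section name, keyed by that
    name.  For ``DynamicallyReferenced`` fields only the reference itself is stored
    (after validating it); other models are constructed from the referenced
    section's contents.
    """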
    section_name = None
    try:
        section_contents = dict()
        for section_name in list_of_sections:
            # For DynamicallyReferenced we don't use the full value returned
            # here, but this helps us by validating the reference
            ref_section_value = find_referenced_section(
                field_name=field_name,
                field_info=field_info,
                parents=parents,
                section_name=section_name,
                current_dict=parent_container,
                root_dict=root_config_data,
            )
            if issubclass(inner_type, DynamicallyReferenced):
                # In the case of DynamicallyReferenced it refers to an existing static instance,
                # so we don't build a new instance, we instead just store the pointer.
                inner_type_instance = DynamicallyReferenced(ref=section_name)
            else:
                inner_type_instance = inner_type(**ref_section_value)
            section_contents[section_name] = inner_type_instance
        return section_contents
    except ValidationError as e:
        raise ValueError(f"Field {full_name(parents, field_name)} error on section {section_name} = {repr(e)}")
def match_config_data_to_field(
        field_name: str,
        field_info: FieldInfo,
        field_value: object,
        parent_container: MutableMapping,
        root_config_data: MutableMapping,
        parents: List[str],
):
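    """
    Convert a raw config value (usually a string) into the shape pydantic expects
    for the field's annotation: delimited strings become lists/tuples/sets, literal
    or JSON text becomes dicts, and section-name references are replaced with built
    sub-models.  Values for simple types (str/int/float) and anything unrecognized
    are returned unchanged for pydantic to validate.
    """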
    if lenient_issubclass(field_info.annotation, (str, int, float)):
        pass
    elif lenient_issubclass(field_info.annotation, list):
        if isinstance(field_value, str):
            field_value = parse_delimited_list(field_name, field_info, field_value)
        inner_type_args = get_inner_type(field_info.annotation)
        if len(inner_type_args) > 1:
            raise SyntaxError(
                f"{full_name(parents, field_name)} has type {field_info.annotation} "
                f"with more than one inner type {inner_type_args}"
            )
        elif len(inner_type_args) == 1:
            inner_type = inner_type_args[0]
            if has_sub_fields(inner_type):
                ref_object_dict = build_referenced_objects(
                    field_name=field_name,
                    field_info=field_info,
                    parents=parents,
                    parent_container=parent_container,
                    root_config_data=root_config_data,
                    list_of_sections=field_value,
                    inner_type=inner_type,
                )
                field_value = list(ref_object_dict.values())
    elif lenient_issubclass(field_info.annotation, tuple):
        if isinstance(field_value, str):
            field_value = parse_delimited_list(field_name, field_info, field_value)
        # See docs for Tuple annotations
        # https://docs.python.org/3/library/typing.html#annotating-tuples
        inner_type_args = get_inner_type(field_info.annotation)
        if len(inner_type_args) == 0:
            # Nothing to do for untyped tuple container
            pass
        elif len(inner_type_args) == 2 and inner_type_args[1] == Ellipsis:
            inner_type = inner_type_args[0]
            if has_sub_fields(inner_type):
                ref_object_dict = build_referenced_objects(
                    field_name=field_name,
                    field_info=field_info,
                    parents=parents,
                    parent_container=parent_container,
                    root_config_data=root_config_data,
                    list_of_sections=field_value,
                    inner_type=inner_type,
                )
                field_value = list(ref_object_dict.values())
        elif len(inner_type_args) != len(field_value):
            raise ValueError(
                f"{full_name(parents, field_name)} has type {field_info.annotation} "
                f"which expects {len(inner_type_args)} values but got {len(field_value)} values."
            )
        else:
            new_field_values = list()
            for inner_type, value in zip(inner_type_args, field_value):
                if has_sub_fields(inner_type):
                    ref_object_dict = build_referenced_objects(
                        field_name=field_name,
                        field_info=field_info,
                        parents=parents,
                        parent_container=parent_container,
                        root_config_data=root_config_data,
                        list_of_sections=[value],
                        inner_type=inner_type,
                    )
                    new_field_values.append(ref_object_dict[value])
                else:
                    new_field_values.append(value)
            # Hand the per-position values back for pydantic to validate as a tuple
            field_value = new_field_values
    elif lenient_issubclass(field_info.annotation, dict):
        if isinstance(field_value, str):
            try:
                field_value = parse_as_literal_or_json(field_value)
            except ValueError as e:
                try:
                    list_of_sections = parse_delimited_list(field_name, field_info, field_value)
                    inner_type_args = get_inner_type(field_info.annotation)
                    if len(inner_type_args) != 2:
                        raise SyntaxError(
                            f"{full_name(parents, field_name)} has type {field_info.annotation} "
                            f"without exactly 2 inner types {inner_type_args}"
                        )
                    else:
                        inner_type1 = inner_type_args[0]
                        if inner_type1 != str:
                            raise SyntaxError(
                                f"{full_name(parents, field_name)} has type {field_info.annotation} "
                                f"with the first inner type not being str (it is {inner_type1})"
                            )
                        inner_type2 = inner_type_args[1]
                        if has_sub_fields(inner_type2):
                            ref_object_dict = build_referenced_objects(
                                field_name=field_name,
                                field_info=field_info,
                                parents=parents,
                                parent_container=parent_container,
                                root_config_data=root_config_data,
                                list_of_sections=list_of_sections,
                                inner_type=inner_type2,
                            )
                            field_value = ref_object_dict
                except ValueError as e2:
                    raise ConfigError(
                        f"Field {full_name(parents, field_name)}: "
                        f"tried as list of section references and got error {e2}.\n"
                        f"Also tried as literal and got {e}"
                    )
    elif lenient_issubclass(field_info.annotation, set):
        if isinstance(field_value, str):
            field_value = set(
                parse_delimited_list(
                    field_name=field_name,
                    field_info=field_info,
                    field_value=field_value
                )
            )
    elif lenient_issubclass(field_info.annotation, frozenset):
        if isinstance(field_value, str):
            field_value = frozenset(
                parse_delimited_list(
                    field_name=field_name,
                    field_info=field_info,
                    field_value=field_value
                )
            )
    # In all cases not explicitly matched above, we'll let pydantic parse it as is
    return field_value
def match_config_data_to_model(
        model: BaseModel,
        config_data: MutableMapping,
        root_config_data: MutableMapping = None,
        parents=None
):
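    """
    Walk the model's fields and rearrange ``config_data`` in place (case-insensitive
    key matching, dotted section names, delimited lists, section references) so that
    it can be passed to the pydantic model for validation.

    Illustrative sketch (hypothetical models)::

        class Inner(BaseModel):
            host: str

        class Settings(BaseModel):
            inner: Inner

        config = {'INNER': {'HOST': 'example.com'}}
        match_config_data_to_model(Settings, config)
        # config -> {'inner': {'host': 'example.com'}}, ready for Settings(**config)
    """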
    if parents is None:
        parents = []
    if root_config_data is None:
        root_config_data = config_data
    # Make mappings from lower case names to actual config field names
    config_name_map = {key.lower(): key for key in config_data}
    root_config_name_map = {key.lower(): key for key in root_config_data}
    # Scan all model fields to look for values in the MutableMapping
    for field_name_outer, field_info in model.model_fields.items():
        # Check for either a direct name match or a case-insensitive match
        field_name = field_info.alias or field_name_outer
        field_lower = field_name.lower()
        if field_lower in config_name_map:
            found = True
            source_field_name = config_name_map[field_lower]
            if source_field_name != field_name:
                config_data[field_name] = config_data[source_field_name]
                del config_data[source_field_name]
        else:
            found = False
        # Check for nested objects set using top level dotted names (e.g. [parent.child])
        # (either a direct name match or a case-insensitive match)
        if not found and len(parents) > 0:
            section_name = '.'.join(parents + [field_name])
            section_name_lower = section_name.lower()
            if section_name in root_config_data:
                found = True
                # Copy data into place where pydantic will expect it
                config_data[field_name] = root_config_data[section_name]
            elif section_name_lower in root_config_name_map:
                found = True
                section_name = root_config_name_map[section_name_lower]
                # Copy data into place where pydantic will expect it
                config_data[field_name] = root_config_data[section_name]
        if found:
            updated_value = match_config_data_to_field_or_submodel(
                field_name=field_name,
                field_info=field_info,
                parent_container=config_data,
                root_config_data=root_config_data,
                parents=parents + [field_name]
            )
            config_data[field_name] = updated_value
    return config_data
def match_config_data_to_field_or_submodel(
        field_name: str,
        field_info: FieldInfo,
        parent_container: MutableMapping,
        root_config_data: MutableMapping = None,
        parents=None
):
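    """
    Dispatch a single field: nested models are processed with
    ``match_config_data_to_model`` (or built from referenced sections when the
    field is dynamically referenced), while plain fields go through
    ``match_config_data_to_field``.
    """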
    if has_sub_fields(field_info.annotation):
        if field_name not in parent_container:
            raise ValueError(f"Field {full_name(parents, field_name)} not found.")
        if isinstance(field_info, DynamicFieldInfo):
            ref_object_dict = build_referenced_objects(
                field_name=field_name,
                field_info=field_info,
                parents=parents,
                parent_container=parent_container,
                root_config_data=root_config_data,
                list_of_sections=[parent_container[field_name]],
                inner_type=field_info.annotation,
            )
            if lenient_issubclass(field_info.annotation, BaseModel):
                updated_value = list(ref_object_dict.values())[0]
            elif lenient_issubclass(field_info.annotation, list):
                updated_value = list(ref_object_dict.values())
            elif lenient_issubclass(field_info.annotation, set):
                updated_value = set(ref_object_dict.values())
            elif lenient_issubclass(field_info.annotation, frozenset):
                updated_value = frozenset(ref_object_dict.values())
            elif lenient_issubclass(field_info.annotation, dict):
                updated_value = ref_object_dict
            else:
                raise ValueError(
                    f"Field {full_name(parents, field_name)} "
                    f"DynamicFieldInfo type {field_info.annotation} not expected"
                )
        else:
            # noinspection PyTypeChecker
            updated_value = match_config_data_to_model(
                model=field_info.annotation,
                config_data=parent_container[field_name],
                root_config_data=root_config_data,
                parents=parents
            )
    else:
        updated_value = match_config_data_to_field(
            field_name=field_name,
            field_info=field_info,
            field_value=parent_container[field_name],
            parent_container=parent_container,
            root_config_data=root_config_data,
            parents=parents
        )
    return updated_value
def walk_model(
        model: BaseModel,
        parents=None
):
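    """
    Yield ``(field_name, field_info, parents)`` for every leaf field of the model,
    recursing into nested models.

    Illustrative sketch (hypothetical models)::

        class Inner(BaseModel):
            host: str

        class Settings(BaseModel):
            inner: Inner
            debug: bool

        list(walk_model(Settings))
        # -> [('host', FieldInfo(...), ['inner']), ('debug', FieldInfo(...), [])]
    """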
    if parents is None:
        parents = []
    # Scan all model fields, recursing into nested models
    for field_name, field_info in model.model_fields.items():
        if has_sub_fields(field_info.annotation):
            # noinspection PyTypeChecker
            yield from walk_model(
                model=field_info.annotation,
                parents=parents + [field_name]
            )
        else:
            yield field_name, field_info, parents