"""
Copyright 2023 Inmanta
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Contact: code@inmanta.com
"""
import abc
import itertools
import logging
import re
from collections.abc import Sequence
from typing import Optional, TypeGuard, TypeVar, Union, overload
from inmanta.stable_api import stable_api
LOGGER = logging.getLogger(__name__)
TWDP = TypeVar("TWDP", bound="WildDictPath")
TWID = TypeVar("TWID", bound="WildInDict")
TWKL = TypeVar("TWKL", bound="WildKeyedList")
TWCP = TypeVar("TWCP", bound="WildComposedPath")
@stable_api
class InvalidPathException(Exception):
"""
The path could not be parsed correctly.
"""
@stable_api
class ContainerStructureException(LookupError):
"""
The requested item could not be found,
because the container passed to this path is not of the expected type.
"""
@stable_api
class DictPathValue(abc.ABC):
"""
Represents a data value part of a WildDictPath.
"""
@abc.abstractmethod
def escape(self) -> str:
"""
Return this value with all special characters escaped.
"""
raise NotImplementedError()
@abc.abstractmethod
def matches(self, value: Optional[object]) -> bool:
"""
Return true iff the given value matches this value.
"""
raise NotImplementedError()
@property
@abc.abstractmethod
def value(self) -> Optional[str]:
"""
Return the unescaped data value.
"""
raise NotImplementedError()
@classmethod
def create(cls, value: str) -> "DictPathValue":
"""
Create a DictPathValue from the given value. Argument `value` must have all
special characters escaped when their special meaning is not desired.
"""
if value == WildCardValue.WILDCARD_CHARACTER:
return WildCardValue()
elif value == NullValue.NULL_VALUE_CHARACTER:
return NullValue()
else:
unescaped_value: str = WildDictPath.PATTERN_ESCAPED_SPECIAL_CHARACTER.sub(r"\1", value)
return NormalValue(unescaped_value)
@classmethod
def from_object(cls, value: object) -> "DictPathValue":
"""
Create a DictPathValue from the given object.
:param value: The object to construct a DictPathValue for. It is interpreted as a literal value: if it is a string,
special characters will not be interpreted and must not be escaped. If it is not `None`, it must implement `str()`
to be an unambiguous representation of the object.
"""
if value is None:
return NullValue()
return NormalValue(str(value))
def __eq__(self, other: object) -> bool:
return isinstance(other, self.__class__)
@stable_api
class NormalValue(DictPathValue):
"""
A normal dict path value. This is a data value that is matched literally
against another value.
"""
def __init__(self, value: str) -> None:
super(DictPathValue, self).__init__()
self._value: str = value
self._numeric_value: Optional[float] = self._try_parse_numeric(value)
def escape(self) -> str:
return WildDictPath.PATTERN_SPECIAL_CHARACTER.sub(r"\\\1", self._value)
def matches(self, value: Optional[object]) -> bool:
if value is None:
return False
# Perform a numeric comparison only if the value is an int/float and
# The key in the dictpath can be interpreted as an int/float
if self._numeric_value is not None and isinstance(value, (int, float)):
return self._numeric_value == value
# Fallback to string comparison for other types
return self._value == str(value)
@staticmethod
def _try_parse_numeric(value: str) -> Optional[float]:
try:
return float(value)
except (ValueError, TypeError):
return None
@property
def value(self) -> str:
return self._value
def __eq__(self, other: object) -> bool:
if not isinstance(other, NormalValue):
return False
return self.value == other.value
@stable_api
class WildCardValue(DictPathValue):
"""
Represents a wildcard value. This is a data value that matches any other value.
"""
WILDCARD_CHARACTER: str = "*"
def escape(self) -> str:
return self.WILDCARD_CHARACTER
def matches(self, value: Optional[object]) -> bool:
return True
@property
def value(self) -> str:
raise Exception("A WildCardValue doesn't have an actual data value.")
@stable_api
class NullValue(DictPathValue):
"""
Represents the data value None. Matches against any other None value.
"""
NULL_VALUE_CHARACTER: str = r"\0"
def escape(self) -> str:
return self.NULL_VALUE_CHARACTER
def matches(self, value: Optional[object]) -> bool:
return value is None
@property
def value(self) -> None:
return None
@stable_api
class WildDictPath(abc.ABC):
"""
A base class for all dict paths segments. It supports the usage of wildcards, allowing to reach
multiple elements in the same Dict.
The wildcard is the character "*", its usage for the different segment type is documented later.
In the internal representation of this object, wildcards are represented using the value None.
The special characters mentioned in `WildDictPath.SPECIAL_CHARACTERS` should be escaped using a
backslash when its used as a data character (e.g. the value of a key in a dictionary) instead
of a control character (a character used to guide the dictpath library). Other characters must
never be escaped.
"""
# Special characters should be escaped in data elements of the dict path
# to prevent incorrect interpretation of the dict path.
SPECIAL_CHARACTERS: list[str] = ["\\", "[", "]", ".", "*", "="]
REGEX_SPECIAL_CHARACTER = rf"([{re.escape(''.join(SPECIAL_CHARACTERS))}])"
PATTERN_SPECIAL_CHARACTER = re.compile(REGEX_SPECIAL_CHARACTER)
REGEX_ESCAPED_SPECIAL_CHARACTER = rf"\\([{re.escape(''.join(SPECIAL_CHARACTERS))}])"
PATTERN_ESCAPED_SPECIAL_CHARACTER = re.compile(REGEX_ESCAPED_SPECIAL_CHARACTER)
REGEX_NORMAL_CHARACTER = rf"[^{re.escape(''.join(SPECIAL_CHARACTERS))}]"
# Add the WILDCARD variable here for backwards compatibility
WILDCARD = WildCardValue.WILDCARD_CHARACTER
@abc.abstractmethod
def get_elements(self, container: object) -> list[object]:
"""
Get the elements identified by this Path from the given collection.
If no element is matched, an empty list is returned.
:param container: the container to search in
"""
def resolve_wild_cards(self: TWDP, container: object) -> Sequence[TWDP]:
"""
Resolve all the wild cards contained in this wild dict path, based on the
given container. For each possible set of values that can replace the wild
cards and still resolve a subset of the elements matched by this expression,
return a wild dict path expression without the use of any wild card.
The set of all elements that can be resolved by all returned wild dict paths
for the given container, should be equal to the set of all elements matched by
this wild dict path expression for the given container.
This method may return a NotImplementedError if the path upon which it is called
isn't supported by the method.
:param container: the container to search in
"""
raise NotImplementedError(
"There is no default behavior for resolve_wild_cards on the base class WildDictCard, "
"it should be implemented in the sub classes."
)
@abc.abstractmethod
def to_str(self) -> str:
"""
Returns the dict path expression represented by this instance.
"""
def __str__(self) -> str:
return self.to_str()
def __add__(self, other: object) -> "WildDictPath":
if not isinstance(other, WildDictPath):
return NotImplemented
return WildComposedPath(path=list(self.get_path_sections()) + list(other.get_path_sections()))
@abc.abstractmethod
def get_path_sections(self) -> Sequence["WildDictPath"]:
"""
A DictPath can be a combination of multiple DictPaths, this returns all the DictPaths
that compose this one, or itself if it is not a composition of multiple DictPaths.
"""
@classmethod
@abc.abstractmethod
def parse(cls: type[TWDP], inp: str) -> Optional[TWDP]:
pass
def _validate_container(self, container: object) -> TypeGuard[dict[object, object]]:
return isinstance(container, dict)
@stable_api
class WildInDict(WildDictPath):
"""
This is the path that, if you call get_element on a dict, it returns the value stored in
that key in that dict. This class accepts only top level keys as its expression.
The string representation of the following path element is `a`
.. code_block:: python
assert WildInDict("a").get_elements(
{
"a":"b",
"c":"d",
}) == ["b"]
A wild card can be used to get all values from the dict. The wildcard only works as a single character.
.. code_block:: python
assert WildInDict("*").get_elements(
{
"a":"b",
"c":"d",
}
) == ["b", "d"]
The following code raises a KeyError.
.. code_block:: python
WildInDict("a*").get_elements(
{
"a":"b",
"c":"d",
}
)
"""
IN_DICT_PATTERN = re.compile(
rf"(?P<key>({WildDictPath.REGEX_NORMAL_CHARACTER}|{WildDictPath.REGEX_ESCAPED_SPECIAL_CHARACTER})+|\*)"
)
def __init__(self, key: str) -> None:
key_value = DictPathValue.create(key)
if not isinstance(key_value, NormalValue) and not isinstance(key_value, WildCardValue):
raise InvalidPathException(f"Invalid dictionary key {key}")
self.key: Union[NormalValue, WildCardValue] = key_value
def get_elements(self, container: object) -> list[object]:
if self._validate_container(container):
try:
return [value for key, value in container.items() if self.key.matches(key)]
except KeyError:
return []
else:
raise ContainerStructureException(f"{container} is not a Dict")
def resolve_wild_cards(self, container: object) -> Sequence["WildInDict"]:
"""
Get one WildInDict by key matching the wild key of this object.
..code-block:: python
assert WildInDict("a").resolve_wild_cards(
{
"a": "b",
"aa": "bb",
"c": "d",
}
) == [
WildInDict("a"),
]
assert WildInDict("*").resolve_wild_cards(
{
"a": "b",
"aa": "bb",
"c": "d",
}
) == [
WildInDict("a"),
WildInDict("aa"),
WildInDict("c"),
]
"""
if self._validate_container(container):
try:
return [WildInDict(str(key)) for key in container if self.key.matches(key)]
except KeyError:
return []
else:
raise ContainerStructureException(f"{container} is not a Dict")
def to_str(self) -> str:
return self.key.escape()
def get_path_sections(self) -> Sequence[WildDictPath]:
return [self]
@classmethod
def parse(cls: type[TWID], inp: str) -> Optional[TWID]:
match = cls.IN_DICT_PATTERN.fullmatch(inp)
if match:
return cls(inp)
return None
@stable_api
class WildKeyedList(WildDictPath):
"""
Find a specific item in a list, based on a key-value pair.
The list is in a dictionary itself.
The string representation of the following path element is `relation[key_attribute=key_value]`
A wild card can be used to get all values from the list having the key_attribute.
e.g.::
WildKeyedList("relation","key_attribute","key_value").get_elements(
{
"relation":[
{
"key_attribute":"key_value",
"other_attribute":"other_value"
},
{
"key_attribute":"other_value"
}
[
})
will return::
[
{
"key_attribute":"key_value",
"other_attribute":"other_value"
}
]
e.g.::
WildKeyedList("relation","key_attribute","*").get_elements(
{
"relation":[
{
"key_attribute":"key_value",
"other_attribute":"other_value",
},
{
"key_attribute":"other_value",
},
{
"other_key_attribute":"other_value",
},
[
})
will return::
[
{
"key_attribute":"key_value",
"other_attribute":"other_value",
},
{
"key_attribute":"other_value",
}
]
"""
REGEX_RELATION = rf"(?P<relation>({WildDictPath.REGEX_NORMAL_CHARACTER}|{WildDictPath.REGEX_ESCAPED_SPECIAL_CHARACTER})+)"
REGEX_KEY_ATTRIBUTE = (
rf"(?P<key_attribute>({WildDictPath.REGEX_NORMAL_CHARACTER}|{WildDictPath.REGEX_ESCAPED_SPECIAL_CHARACTER})+|\*)"
)
REGEX_KEY_VALUE = (
rf"(?P<key_value>({WildDictPath.REGEX_NORMAL_CHARACTER}|{WildDictPath.REGEX_ESCAPED_SPECIAL_CHARACTER})*|\*|\\0)"
)
KEY_VALUE_PAIR = rf"\[{REGEX_KEY_ATTRIBUTE}={REGEX_KEY_VALUE}]"
KEYED_LIST_PATTERN = re.compile(rf"^{REGEX_RELATION}(?P<selectors>({KEY_VALUE_PAIR})+)$")
KEY_VALUE_PAIRS_PATTERN = re.compile(KEY_VALUE_PAIR)
@overload
def __init__(self, relation: str, key_value_pairs: Sequence[tuple[str, str]]) -> None:
"""
:param relation: The relation on the object that is the keyed list
:param key_value_pairs: The key-value pairs to look for in each item of the keyed list.
The key is compared using string comparison. As such 5=="5" and False=="False"
"""
...
@overload
def __init__(self, relation: str, key_attribute: str, key_value: str, /) -> None:
"""
Deprecated constructor, kept for backwards compatibility reasons.
:param relation: The relation on the object that is the keyed list
:param key_attribute: The attribute to look for in each item of the keyed list
:param key_value: The attribute value to look for in each item of the keyed list.
"""
def __init__(
self, relation: str, key_value_pairs: Union[str, Sequence[tuple[str, str]]], key_value: Optional[str] = None
) -> None:
if isinstance(key_value_pairs, str):
LOGGER.warning(
"The %s(relation: str, key_attribute: str, key_value: str, /) constructor is deprecated and will be removed"
" in a future version. Please use %s(relation: str, key_value_pairs: Sequence[Tuple[str, str]]) instead",
self.__class__.__name__,
self.__class__.__name__,
)
assert key_value is not None
key_value_pairs = [(key_value_pairs, key_value)]
relation_value = DictPathValue.create(relation)
if not isinstance(relation_value, NormalValue):
raise InvalidPathException(f"Invalid relation name: {relation}")
self.relation: NormalValue = relation_value
if not key_value_pairs:
raise ValueError("A keyed list path requires at least one key-value pair.")
if len({pair[0] for pair in key_value_pairs}) != len(key_value_pairs):
raise ValueError("No duplicate keys allowed in keyed list path")
self.key_value_pairs: Sequence[tuple[Union[NormalValue, WildCardValue], DictPathValue]] = [
(self._parse_key(pair[0]), self._parse_value(pair[1])) for pair in key_value_pairs
]
@classmethod
def _parse_key(cls, key: str) -> Union[NormalValue, WildCardValue]:
"""
Parse a key string into the corresponding dict path object.
"""
result: DictPathValue = DictPathValue.create(key)
if not isinstance(result, NormalValue) and not isinstance(result, WildCardValue):
raise InvalidPathException(f"Invalid dictionary key name: {key}")
return result
@classmethod
def _parse_value(cls, value: str) -> DictPathValue:
"""
Parse a value string into the corresponding dict path object.
"""
return DictPathValue.create(value)
def _validate_outer_container(self, container: object) -> dict[object, object]:
if not isinstance(container, dict):
raise ContainerStructureException(f"{container} is not a Dict")
return container
def _validate_inner_container(self, container: object) -> list[object]:
if not isinstance(container, list):
raise ContainerStructureException(f"{container} is not a List or Set")
return container
def get_elements(self, container: object) -> list[object]:
outer = self._validate_outer_container(container)
try:
inner = outer[self.relation.value]
except KeyError:
return []
the_list = self._validate_inner_container(inner)
return [
dct
for dct in the_list
if isinstance(dct, dict)
and all(any(key.matches(k) and value.matches(v) for k, v in dct.items()) for key, value in self.key_value_pairs)
]
def resolve_wild_cards(self, container: object) -> Sequence["WildKeyedList"]:
"""
Get one WildKeyedList by set of matching key value pairs that can be found
in the given container at the relation specified in this object.
As of now, this method can not be called on dict path expressions using a wild card
in the key of any key-value pair. This is because the behavior for such type of path
is undefined. It might get defined later on, once we get a use case for them.
If the method is called on such a path, a NotImplementedError is raised.
..code-block:: python
assert WildKeyedList("relation","key_attribute","*").resolve_wild_cards(
{
"relation":[
{
"key_attribute":"key_value",
"other_attribute":"other_value",
},
{
"key_attribute":"other_value",
},
{
"other_key_attribute":"other_value",
},
[
}
) == [
WildKeyedList("relation", "key_attribute", "key_value"),
WildKeyedList("relation", "key_attribute", "other_value"),
]
assert WildKeyedList("relation",[("key_attribute","*"), ("other_attribute", "*")]).resolve_wild_cards(
{
"relation":[
{
"key_attribute":"key_value",
"other_attribute":"other_value",
},
{
"key_attribute":"other_value",
},
{
"other_key_attribute":"other_value",
},
[
}
) == [
WildKeyedList("relation", [("key_attribute","key_value"), ("other_attribute", "other_value")]),
]
"""
# Check if we have a wild card for any of the keys, if we do, raise a NotImplementedError
# as we didn't define an expected behavior for it yet
for key, _ in self.key_value_pairs:
if key == WildCardValue():
raise NotImplementedError(
"Can not call resolve_wild_cards on this path because it uses "
f"wild cards (`*`) for some of its keys: `{self}`. The desired "
"behavior of resolve_wild_cards for this type of path is currently undefined."
)
outer = self._validate_outer_container(container)
try:
inner = outer[self.relation.value]
except KeyError:
return []
# Save here all the KeyedList that can be constructed
matches: list[WildKeyedList] = []
# Check each of the inner values, and emit as many paths for it
# as can be created to match it (within the constraints of this path
# filters)
for dct in self._validate_inner_container(inner):
if not isinstance(dct, dict):
# Can't be a match, it is not even a dict
continue
# Save here all the key-value pairs that will compose our
# KeyedList dict path
pairs = [
[(str(k), str(v)) for k, v in dct.items() if key.matches(k) and value.matches(v)]
for key, value in self.key_value_pairs
]
# Emit one KeyedList for each combination of matched key-value pairs
matches.extend(
WildKeyedList(
self.relation.value,
path,
)
for path in itertools.product(*pairs)
)
return matches
def to_str(self) -> str:
escaped_relation: str = self.relation.escape()
escaped_key_value_pairs: str = "][".join(key.escape() + "=" + value.escape() for key, value in self.key_value_pairs)
return f"{escaped_relation}[{escaped_key_value_pairs}]"
def get_path_sections(self) -> Sequence[WildDictPath]:
return [self]
@classmethod
def parse(cls: type[TWKL], inp: str) -> Optional[TWKL]:
match = cls.KEYED_LIST_PATTERN.fullmatch(inp)
if match:
group_dct = match.groupdict()
pairs: list[tuple[str, str]] = [
(pair.group("key_attribute"), pair.group("key_value"))
for pair in cls.KEY_VALUE_PAIRS_PATTERN.finditer(group_dct["selectors"])
]
return cls(group_dct["relation"], pairs)
return None
def get_key_value_pairs(self) -> Sequence[tuple[str, Optional[str]]]:
"""
Return a list of tuples, where each element in the list is a literal (unescaped) key-value pair for this WildKeyedList.
"""
return [(key.value, value.value) for key, value in self.key_value_pairs]
def __eq__(self, other: object) -> bool:
if other.__class__ != self.__class__:
return False
assert isinstance(other, WildKeyedList) # Make mypy happy
return self.relation == other.relation and self.key_value_pairs == other.key_value_pairs
@stable_api
class WildComposedPath(WildDictPath):
"""
A path composed of multiple elements, separated by "."
"""
element_types: Sequence[type[WildDictPath]] = [WildInDict, WildKeyedList]
COMPOSED_DICT_PATH_PATTERN = re.compile(r"(?:[^.\\]|\\.)+")
def __init__(self, path_str: Optional[str] = None, path: Optional[Sequence[WildDictPath]] = None) -> None:
if (path_str is None) == (path is None):
raise ValueError("Either path or path_str should be set")
self.path: str
self.expanded_path: Sequence[WildDictPath]
if path_str is not None:
self.path = path_str
self.expanded_path = self.do_parse(path_str)
else:
assert path is not None
self.expanded_path = path
self.path = self.un_parse()
def un_parse(self) -> str:
return ".".join(element.to_str() for element in self.expanded_path)
@classmethod
def split_on_dots(cls, path_str: str) -> list[str]:
"""
Split the given `path_str` on dot characters if they are not escaped with a backslash.
"""
match = cls.COMPOSED_DICT_PATH_PATTERN.findall(path_str)
if not match:
raise InvalidPathException(f"Could not parse path {path_str}")
return match
@classmethod
def do_parse(cls, path_str: str) -> Sequence[WildDictPath]:
splitted_path_str: list[str] = cls.split_on_dots(path_str)
def parse_element(inp: str) -> WildDictPath:
for subtype in cls.element_types:
parsed = subtype.parse(inp)
if parsed is not None:
return parsed
raise InvalidPathException(f"Could not parse path segment {inp}")
return [parse_element(e) for e in splitted_path_str]
@classmethod
def parse(cls: type[TWCP], inp: str) -> Optional[TWCP]:
try:
path = cls.do_parse(path_str=inp)
return cls(path=path)
except InvalidPathException:
return None
def get_elements(self, container: object) -> list[object]:
if container is None:
raise IndexError("Can not get anything from None")
containers = [container]
for item in self.expanded_path:
next_containers = []
for container in containers:
next_containers.extend(item.get_elements(container))
containers = next_containers
return containers
def resolve_wild_cards(self, container: object) -> Sequence["WildComposedPath"]:
if container is None:
raise IndexError("Can not get anything from None")
if len(self.expanded_path) == 0:
# No need to dig further
return []
if len(self.expanded_path) == 1:
# The path is equivalent to its unique element, but we have to wrap
# each of the possible path into a ComposedPath object to be consistent
return [WildComposedPath(path=[path]) for path in self.expanded_path[0].resolve_wild_cards(container)]
return [
WildComposedPath(path=[path, *next_part.get_path_sections()])
for path in self.expanded_path[0].resolve_wild_cards(container)
for elem in path.get_elements(container)
for next_part in WildComposedPath(path=self.expanded_path[1:]).resolve_wild_cards(elem)
]
def to_str(self) -> str:
return self.path
def get_path_sections(self) -> Sequence[WildDictPath]:
return self.expanded_path
def __eq__(self, other: object) -> bool:
if other.__class__ != self.__class__:
return False
assert isinstance(other, type(self)), f"{type(other)} != {type(self)}" # Make mypy happy
return self.expanded_path == other.expanded_path
@stable_api
class WildNullPath(WildDictPath):
"""
A DictPath with no length
(i.e. return the container itself, wrapped in a list)
"""
def get_elements(self, container: object) -> list[object]:
if self._validate_container(container):
return [container]
else:
raise ContainerStructureException(f"{container} is not a Dict")
def resolve_wild_cards(self, container: object) -> Sequence["WildNullPath"]:
if self._validate_container(container):
return [WildNullPath()]
else:
raise ContainerStructureException(f"{container} is not a Dict")
def get_path_sections(self) -> Sequence["DictPath"]:
return []
def to_str(self) -> str:
return "."
@classmethod
def parse(cls, inp: str) -> None:
raise NotImplementedError("NullPath is not intended to be parseable, it should only be used programmatically.")
[docs]
@stable_api
class DictPath(WildDictPath):
"""
A base class for all non-wild dict paths segments. The key difference between WildDictPath and DictPath subclasses are:
1. WildDictPath can only get a list of elements, with get_elements. If no element is found, an empty list is returned,
no error is raised.
2. DictPath can not use get_elements as it is always expected to have exactly one match.
3. DictPath can use get_element, which will return the matching element, or raise an exception if more or less than one
is found.
4. DictPath can set values, using set_element, and can build the dict structure expected by the path by using the
construct flag in the get_element method.
"""
[docs]
@abc.abstractmethod
def get_element(self, container: object, construct: bool = False) -> object:
"""
Get the element identified by this Path from the given collection
:param container: the container to search in
:param construct: construct a dict on the location identified by this path in the container
if the element doesn't exist. Return this new dict.
:raises KeyError: if the element is not found or if more than one occurrence was found.
"""
[docs]
def get_elements(self, container: object) -> list[object]:
try:
return [self.get_element(container, False)]
except LookupError:
return []
[docs]
@abc.abstractmethod
def set_element(self, container: object, value: object, construct: bool = True) -> None:
"""
Set the element identified by this Path from the given collection.
If construct is True, all containers on the path towards the value are constructed if absent.
:raises LookupError: if the path leading to the element is not found or if more than one occurrence was found.
"""
def __add__(self, other: object) -> "DictPath":
if not isinstance(other, DictPath):
return NotImplemented
return ComposedPath(path=list(self.get_path_sections()) + list(other.get_path_sections()))
[docs]
def get_path_sections(self) -> Sequence["DictPath"]:
"""Get the individual parts of this path"""
return []
[docs]
@abc.abstractmethod
def get_key(self) -> str:
"""
Return the dictionary key referenced by this element in the dict path.
"""
[docs]
@abc.abstractmethod
def remove(self, container: object) -> None:
"""
Remove an element if it exists:
* On an InDict or a WildInDict: Remove the referenced key from the dictionary.
* On a KeyedList or a WildKeyedList: Remove the referenced element from the list.
* On a NullPath: This operation is not supported on a NullPath.
"""
raise NotImplementedError()
@stable_api
class InDict(DictPath, WildInDict):
"""
This is the path that, if you call get_element on a dict, it returns the value stored in that key in that dict.
The string representation of the following path element is: "a"
.. code_block:: python
assert InDict("a").get_element(
{
"a":"b",
"c":"d",
}) == "b"
"""
def __init__(self, key: str) -> None:
WildInDict.__init__(self, key)
if isinstance(self.key, WildCardValue):
raise ValueError(f"The Wildcard ('{WildCardValue.WILDCARD_CHARACTER}') can not be used in DictPath's")
# Override type annotation from super class
self.key: NormalValue
def get_element(self, container: object, construct: bool = False) -> object:
elements = WildInDict.get_elements(self, container)
if not elements and construct:
if self._validate_container(container):
container[self.key.value] = {}
return container[self.key.value]
else:
raise ContainerStructureException(f"{container} is not a Dict")
if len(elements) != 1:
raise KeyError(f"Found no or multiple items matching {self.to_str()} in {container}: {elements}")
return elements[0]
def set_element(self, container: object, value: object, construct: bool = True) -> None:
if self._validate_container(container):
container[self.key.value] = value
else:
raise ContainerStructureException(f"{container} is not a Dict")
def get_path_sections(self) -> Sequence[DictPath]:
return [self]
def get_key(self) -> str:
return self.key.value
def remove(self, container: object) -> None:
if self._validate_container(container):
for key in list(container.keys()):
if self.key.matches(key):
del container[key]
else:
raise ContainerStructureException(f"{container} is not a Dict")
@stable_api
class KeyedList(DictPath, WildKeyedList):
"""
Find a specific item in a list, based on a key-value pair.
The list is in a dictionary itself.
The string representation of the following path element is `relation[key_attribute=key_value]`
e.g.::
KeyedList("relation","key_attribute","key_value").get_element(
{
"relation":[
{
"key_attribute":"key_value",
"other_attribute":"other_value"
},
{
"key_attribute":"other_value"
}
[
})
will return::
{
"key_attribute":"key_value",
"other_attribute":"other_value"
}
"""
def __init__(
self, relation: str, key_value_pairs: Union[str, Sequence[tuple[str, str]]], key_value: Optional[str] = None
) -> None:
if isinstance(key_value_pairs, str):
assert key_value is not None
WildKeyedList.__init__(self, relation, key_value_pairs, key_value)
else:
assert key_value is None
WildKeyedList.__init__(self, relation, key_value_pairs)
# Override type annotation from super class
self.key_value_pairs: Sequence[tuple[NormalValue, Union[NormalValue, NullValue]]]
@classmethod
def _parse_key(cls, key: str) -> NormalValue:
result: Union[NormalValue, WildCardValue] = super()._parse_key(key)
if isinstance(result, WildCardValue):
raise ValueError(f"The Wildcard ('{WildCardValue.WILDCARD_CHARACTER}') can not be used in DictPath's")
return result
@classmethod
def _parse_value(cls, value: str) -> DictPathValue:
"""
Parse a value string into the corresponding dict path object.
"""
result: DictPathValue = super()._parse_value(value)
if isinstance(result, WildCardValue):
raise ValueError(f"The Wildcard ('{WildCardValue.WILDCARD_CHARACTER}') can not be used in DictPath's")
return result
def get_element(self, container: object, construct: bool = False) -> object:
found = WildKeyedList.get_elements(self, container)
if not found and construct:
outer = self._validate_outer_container(container)
if self.relation.value not in outer:
outer[self.relation.value] = []
the_list = self._validate_inner_container(outer[self.relation.value])
new_dict: dict[Optional[str], Optional[str]] = {key.value: value.value for key, value in self.key_value_pairs}
the_list.append(new_dict)
return new_dict
if len(found) != 1:
raise KeyError(f"Found no or multiple items matching {self.to_str()} in {container}: {found}")
return found[0]
def set_element(self, container: object, value: object, construct: bool = True) -> None:
outer: dict[object, object] = self._validate_outer_container(container)
try:
inner = outer[self.relation.value]
except KeyError:
inner = []
outer[self.relation.value] = inner
the_list: list[object] = self._validate_inner_container(inner)
try:
element_to_be_replaced: object = self.get_element(container, construct=False)
except KeyError:
the_list.append(value)
else:
index = the_list.index(element_to_be_replaced)
the_list[index] = value
def get_path_sections(self) -> Sequence[DictPath]:
return [self]
def get_key(self) -> str:
return self.relation.value
def remove(self, container: object) -> None:
outer = self._validate_outer_container(container)
try:
inner = outer[self.relation.value]
except KeyError:
return
the_list = self._validate_inner_container(inner)
outer[self.relation.value] = [
dct
for dct in the_list
if not isinstance(dct, dict)
or not all(any(key.matches(k) and value.matches(v) for k, v in dct.items()) for key, value in self.key_value_pairs)
]
@stable_api
class ComposedPath(DictPath, WildComposedPath):
"""
A path composed of multiple elements, separated by "."
"""
element_types: Sequence[type[DictPath]] = [InDict, KeyedList]
def __init__(self, path_str: Optional[str] = None, path: Optional[Sequence[DictPath]] = None) -> None:
WildComposedPath.__init__(self, path_str, path)
self.expanded_path: Sequence[DictPath]
def get_element(self, container: object, construct: bool = False) -> object:
elements = WildComposedPath.get_elements(self, container)
if not elements and construct:
element = container
for item in self.get_path_sections():
element = item.get_element(element, True)
return element
if len(elements) != 1:
raise KeyError(f"Found no or multiple items matching {self.to_str()} in {container}: {elements}")
return elements[0]
def set_element(self, container: object, value: object, construct: bool = True) -> None:
for item in self.get_path_sections()[:-1]:
container = item.get_element(container, construct=construct)
self.get_path_sections()[-1].set_element(container, value)
def get_path_sections(self) -> Sequence[DictPath]:
return self.expanded_path
def get_key(self) -> str:
raise NotImplementedError("Method get_key() not supported on a ComposedPath")
def remove(self, container: object) -> None:
for item in self.get_path_sections()[:-1]:
try:
container = item.get_element(container, construct=False)
except KeyError:
return
self.get_path_sections()[-1].remove(container)
@stable_api
class NullPath(DictPath, WildNullPath):
"""
A DictPath with no length
(i.e. return the container itself)
"""
def get_element(self, container: object, construct: bool = False) -> dict[object, object]:
if self._validate_container(container):
return container
else:
raise ContainerStructureException(f"{container} is not a Dict")
def set_element(self, container: object, value: object, construct: bool = True) -> None:
if not self._validate_container(container):
raise ContainerStructureException(f"Argument container is not a Dict: {container}")
if not self._validate_container(value):
raise ContainerStructureException(f"Argument value is not a Dict: {container}")
assert isinstance(container, dict)
assert isinstance(value, dict)
container.clear()
for key, value in value.items():
container[key] = value
@classmethod
def parse(cls, inp: str) -> None:
raise NotImplementedError("NullPath is not intended to be parseable, it should only be used programmatically.")
def get_key(self) -> str:
raise NotImplementedError("Method get_key() is not supported on a NullPath")
def remove(self, container: object) -> None:
raise NotImplementedError("Method remove() is not supported on a NullPath")
[docs]
@stable_api
def to_wild_path(inp: str) -> WildDictPath:
"""
Convert a string to a WildDictPath
:raises InvalidPathException: the path is not valid
"""
if inp == ".":
return WildNullPath()
if inp.startswith("."):
# A leading dot represents the entire container
inp = inp[1:]
try:
return WildComposedPath(path_str=inp)
except ValueError as e:
raise InvalidPathException(str(e))
[docs]
@stable_api
def to_path(inp: str) -> DictPath:
"""
Convert a string to a DictPath
:raises InvalidPathException: the path is not valid
"""
if inp == ".":
return NullPath()
if inp.startswith("."):
# A leading dot represents the entire container
inp = inp[1:]
try:
return ComposedPath(path_str=inp)
except ValueError as e:
raise InvalidPathException(str(e))