"""
:copyright: 2021 Inmanta
:contact: code@inmanta.com
:license: Inmanta EULA
"""
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Dict, List, Optional, Tuple, Type, TypeVar
import pydantic
from inmanta import plugins
from inmanta.util.dict_path import (
ComposedPath,
DictPath,
DictPathValue,
InDict,
KeyedList,
WildComposedPath,
WildDictPath,
WildKeyedList,
to_path,
)
from inmanta_plugins.lsm.allocation_v2.entity import AttributeSetName, ServiceInstance
T = TypeVar("T")
TC = TypeVar("TC", bound="ContextV2")
TCC = TypeVar("TCC", bound="ChildContextV2")
[docs]
class AllocatorV2(ABC):
"""
Base class for all V2 Allocators
"""
[docs]
def pre_allocate(self) -> None:
"""
Called before allocation, can be used e.g., to establish a connection to the external inventory
"""
[docs]
def post_allocate(self) -> None:
"""
Called after allocation, can be used e.g., to teardown a connection to the external inventory
"""
[docs]
@abstractmethod
def needs_allocation(self, context: "ContextV2") -> bool:
"""
Determine if this allocator has any work to do or if all values have already been allocated correctly.
"""
[docs]
@abstractmethod
def allocate(self, context: "ContextV2") -> None:
"""
Perform the actual allocation
Allocated values will be written back to the context using :func:`ContextV2.set_value`
"""
[docs]
class ContextV2:
"""
All interactions between allocators happen via this context.
The context is responsible for
1. exposing the current instance to the allocator
2. allow the allocator to set values
3. allow allocators to be used in the same way on embedded instances as on top-level instances
4. carry shared state between allocators (such as connections to the inventory)
Each Context exposes a :class:`ServiceInstance` via the :func:`get_instance` method.
It exposes this instance as if it is the service itself.
I.e. embedded entities are presented in the same way as top level services.
To allow the context to carry additional state (such as a connection to an inventory),
make a subclass of this class.
:ivar plugin_context: The plugin context attached to this context
:ivar instance: The instance this context is attached to
:ivar flush_set: A dict containing a trace of all the set_value call that where made on this context object.
The key is the :param path: that was passed to the method
The value is the :param value: that was passed to the method
"""
def __init__(
self,
instance: ServiceInstance,
plugin_context: plugins.Context,
):
"""
A context, that has no parent
:param instance: the instance this context is attached to
"""
self.instance = instance
self.plugin_context = plugin_context
self.flush_set: Dict[str, object] = {}
self._plugin_context: Optional[plugins.Context] = plugin_context
self._list_children: Dict[Tuple[str, str], List[ChildContextV2]] = {}
self._children: Dict[str, ChildContextV2] = {}
@property
def environment(self) -> str:
"""
Return the environment id
"""
return self.plugin_context.get_environment_id()
[docs]
@classmethod
def clone(
cls: Type[TC],
instance: ServiceInstance,
plugin_context: plugins.Context,
) -> TC:
"""
Produce a new instance of this context
(this method acts as a polymorphic constructor)
:param instance: the instance this new context should be attached to
"""
return cls(instance, plugin_context)
[docs]
def build_child(self, path: DictPath, wild_path: WildDictPath) -> "ChildContextV2":
"""
Produce a new child instance of this context
(this method acts as a polymorphic constructor)
:param path: the path for the new context, relative to the parent
:param wild_path: the path for the new context, or any context at the same level, relative to the parent
"""
return ChildContextV2(self, path, wild_path)
[docs]
def get_instance(self) -> ServiceInstance:
"""
:returns: Get the service instance bound to this context
"""
return self.instance
[docs]
def set_value(self, path: DictPath, value: object) -> None:
"""
Set the `value` at the given `path` and make sure this allocation is written back to the LSM database.
This call will update the :class:`ServiceInstance` and the LSM database.
The flushing to database can either be when allocation succeeds, when it fails or immediately.
This behavior is determined by the context.
:param path: the path at which to set the value
:param value: the value to set at the given path
"""
# write internal
path.set_element(self.instance.candidate_attributes, value)
# store for flush
self.flush_set[path.to_str()] = value
[docs]
def get_full_path(self) -> Optional[DictPath]:
"""
Get the full path up to the instance in this context.
The root context should return None as there is no path required to reach it.
"""
return None
[docs]
def get_wild_full_path(self) -> Optional[WildDictPath]:
"""
Get the full path up to the instance in this context, replacing all KeyedList by WildKeyedList.
The root context should return None as there is no path required to reach it.
"""
return None
[docs]
def get_list_children(
self, relation: str, key_attribute: str
) -> "List[ChildContextV2]":
"""
Not intended for external usage
Idempotent method
Get a context for every child that matches the path
`relation[key_attribute=*]` in the attributes set of the instance
"""
key = (relation, key_attribute)
if key in self._list_children:
return self._list_children[key]
else:
keyset = {
item[key_attribute]
for item in pydantic.parse_obj_as(
List[Dict], self.instance.get(InDict(relation))
)
}
self._list_children[key] = [
self.build_child(
KeyedList(
relation,
[(key_attribute, DictPathValue.from_object(key).escape())],
),
WildKeyedList(relation, [(key_attribute, WildDictPath.WILDCARD)]),
)
for key in keyset
]
return self._list_children[key]
[docs]
def get_child(self, relation: str) -> "ChildContextV2":
"""
Not intended for external usage
Idempotent method
Get a context for the child that matches the path `relation`
"""
if relation in self._children:
return self._children[relation]
else:
self._children[relation] = self.build_child(
ComposedPath(relation),
WildComposedPath(relation),
)
return self._children[relation]
class ChildContextV2(ContextV2):
"""
All interactions between allocators happen via this context.
The context is responsible for
1. exposing the current instance to the allocator
2. allow the allocator to set values
3. allow allocators to be used in the same way on embedded instances as on top-level instances
4. carry shared state between allocators (such as connections to the inventory)
Each Context exposes a :class:`ServiceInstance` via the :func:`get_instance` method.
It exposes this instance as if it is the service itself.
I.e. embedded entities are presented in the same way as top level services.
To construct the top-level context, use the :class:`ContextV2`
To allow the context to carry additional state (such as a connection to an inventory),
make a subclass of this class.
By overriding the :func:`clone_as_child` method, this new type will also be used to construct child contexts.
"""
def __init__(self, parent: ContextV2, path: DictPath, wild_path: WildDictPath):
"""
A context, that is a child of parent, distinguished by the given path
:param parent: the parent context
:param path: the path for the new context, relative to the parent
:param wild_path: the path for the new context, or any context at the same level, relative to the parent
"""
self.parent = parent
self.path = path
self.wild_path = wild_path
super().__init__(
instance=parent.get_instance().child_for(path),
plugin_context=parent.plugin_context,
)
@classmethod
def clone(
cls: Type[TC],
instance: ServiceInstance,
plugin_context: plugins.Context,
) -> TC:
raise NotImplementedError(
"Clone should never be used on ChildContextV2 instances"
)
@classmethod
def clone_as_child(
cls: Type[TCC], parent: "ContextV2", path: DictPath, wild_path: WildDictPath
) -> TCC:
"""
Produce a new instance of this context
(this method acts as a polymorphic constructor)
:param parent: the parent context
:param path: the path for the new context, relative to the parent
:param wild_path: the path for the new context, or any context at the same level, relative to the parent
"""
return cls(parent, path, wild_path)
def build_child(self, path: DictPath, wild_path: WildDictPath) -> "ChildContextV2":
# By default, a child context will have as child context an instance of the same class.
# This is done by calling directly the polymorphic constructor.
return self.clone_as_child(self, path, wild_path)
def set_value(self, path: DictPath, value: object) -> None:
"""
Set the `value` at the given `path` and make sure this allocation is written back to the LSM database.
This call will update the :class:`ServiceInstance` and the LSM database.
The flushing to database can either be when allocation succeeds, when it fails or immediately.
This behavior is determined by the context.
:param path: the path at which to set the value
:param value: the value to set at the given path
"""
self.parent.set_value(self.path + path, value)
def get_full_path(self) -> Optional[DictPath]:
"""
Get the full path up to the instance in this context.
The root context should return None as there is no path required to reach it.
"""
parent_path = self.parent.get_full_path()
if parent_path is None:
return self.path
return parent_path + self.path
def get_wild_full_path(self) -> Optional[WildDictPath]:
"""
Get the full path up to the instance in this context, replacing all KeyedList by WildKeyedList.
The root context should return None as there is not path required to reach it.
"""
parent_path = self.parent.get_wild_full_path()
if parent_path is None:
return self.wild_path
return parent_path + self.wild_path
[docs]
class ForEach(AllocatorV2):
"""
Allocator to apply a given set of allocators to each instance in a list of instances.
The instances are uniquely identified by an attribute
"""
[docs]
def __init__(
self, item: str, in_list: str, identified_by: str, apply: List[AllocatorV2]
) -> None:
"""
:param item: name of the loop variable (not used, to improve readability and error reporting)
:param in_list: the relation from which to get the items
:param identified_by: the identifying attribute of each item
:param apply: the allocators to apply to each item
e.g:
.. code-block:: inmanta
entity Service extends lsm::ServiceInstance:
end
entity SubService extends lsm::EmbeddedEntity:
string id
string to_allocate
lsm::attribute_modifier to_allocate__modifier="r"
end
Service.subservices [0:] -- SubService
.. code-block:: python
lsm.AllocationSpecV2(
"service_allocator_spec",
allocation.ContextV2Wrapper(
"allocated",
allocation.ForEach(
item="subservice",
in_list="subservices",
identified_by="id",
apply=[
ValueAllocator(
into="to_allocate",
),
],
),
),
)
"""
self.item = item
self.in_list = in_list
self.identified_by = identified_by
self.apply = apply
[docs]
def needs_allocation(self, context: "ContextV2") -> bool:
return any(
allocator.needs_allocation(ctx)
for ctx in context.get_list_children(self.in_list, self.identified_by)
for allocator in self.apply
)
[docs]
def allocate(self, context: "ContextV2") -> None:
for ctx in context.get_list_children(self.in_list, self.identified_by):
for allocator in self.apply:
allocator.allocate(ctx)
class SetSensitiveForEach(ForEach):
"""A for loop requiring allocation if the candidate is different from the active"""
def needs_allocation(self, context: "ContextV2") -> bool:
# check children
if super().needs_allocation(context):
return True
try:
# check if sets have changed
all_id_path = WildKeyedList(
self.in_list, [(self.identified_by, WildDictPath.WILDCARD)]
)
id_path = WildComposedPath(path=[all_id_path, InDict(self.identified_by)])
active_set = id_path.get_elements(context.get_instance().active_attributes)
candidate_set = id_path.get_elements(
context.get_instance().candidate_attributes
)
return set(active_set) != set(candidate_set)
except LookupError:
# something isn't there
return True
[docs]
class ContextV2Wrapper(AllocatorV2):
"""
This wrapper allows to use allocators for attributes of embedded entities. It performs the allocation
for all the provided allocators, and save/allocate all of their allocated values in a single key.
This wrapper can be used in pair with the context_v2_unwrapper plugin, which allows to place all those
allocated values back where they are supposed to go.
"""
[docs]
def __init__(
self,
fallback_attribute: str,
*allocators: AllocatorV2,
track_deletes: bool = False,
) -> None:
"""
This wrapper allows to use allocators for attributes of embedded entities. It will condense
all the changes in a dict:
- whose keys are dict_path to the attribute of the embedded entities
- whose values are the one to allocate
This dict is itself saved at the root of the changed attributes dict, saved in the fallback attributes
specified in the constructor.
:param fallback_attributes: The attribute in which all the allocated values should be saved
:param allocators: The allocators whose values should be saved
:param track_deletes: when this is set, when an embedded entity is deleted during an update,
it will also be dropped from the fallback_attribute.
This also implies that embedded entities which are created by allocation alone (all attributes of type 'r')
are not supported.
e.g:
.. code-block:: python
lsm.AllocationSpecV2(
"service_allocator_spec",
allocation.ContextV2Wrapper(
"allocated",
allocation.ForEach(
item="subservice",
in_list="subservices",
identified_by="id",
apply=[
ValueAllocator(
into="to_allocate",
),
],
),
),
)
This would generate as update attribute dict something like
.. code-block:: json
{
"allocated": {
"subservices[id=1].to_allocate": "a",
"subservices[id=2].to_allocate": "b",
}
}
"""
self.fallback_attribute = fallback_attribute
self.allocators = allocators
self.track_deletes = track_deletes
def _get_wrapped_context(self, parent_context: "ContextV2") -> "ContextV2":
"""
Idempotent method, only creates a new wrapped context if none has been created yet.
The wrapped context is a parent context replacement, that this allocator holds and
all its children will see as root parent. Once all the allocations have been done
on this context, this allocator will save all those allocations in the
fallback_attribute dict attribute, in its parent context.
The context replacement is a duplicate of the parent one because it acts on the same
instance. This also means that all changes applied to the context replacement
instance are automatically applied to the parent context instance. This is desired
as we want any change one the instance done by any allocator to be visible in any
other allocator executing on the same instance.
Returns the context to pass to children allocators.
"""
assert (
type(parent_context) is ContextV2
), "ContextV2Wrapper can only be used at the root of an allocation tree"
parent_reserved_attribute = f"reserved_{self.fallback_attribute}"
if hasattr(parent_context, parent_reserved_attribute):
context: ContextV2 = getattr(parent_context, parent_reserved_attribute)
assert type(context) is ContextV2
return context
fresh_context = ContextV2(
parent_context.instance,
plugin_context=parent_context._plugin_context,
)
# pre-populate flush set with old values
active_attributes = parent_context.instance.get_attribute_set(
AttributeSetName.attributes
)
allocated = active_attributes.get(self.fallback_attribute)
if allocated:
assert isinstance(allocated, dict)
# When tracking deletes
# Remove all things we had allocated for entities that don't exist anymore
def is_path_actual(path: str) -> bool:
path_head = to_path(path).get_path_sections()[:-1]
try:
ComposedPath(path=path_head).get_element(active_attributes)
except LookupError:
return False
return True
if self.track_deletes:
allocated_filtered = {
path: value
for path, value in allocated.items()
if is_path_actual(path)
}
else:
allocated_filtered = allocated
fresh_context.flush_set = deepcopy(allocated_filtered)
# Unwrap
for attributes_set in AttributeSetName:
attributes = parent_context.instance.get_attribute_set(attributes_set)
if attributes is None:
continue
unwrap_attributes(
attributes, self.fallback_attribute, track_deletes=self.track_deletes
)
# cache
setattr(parent_context, parent_reserved_attribute, fresh_context)
return fresh_context
[docs]
def needs_allocation(self, context: "ContextV2") -> bool:
"""
Needs allocation if any of the children allocators need allocation.
"""
wrapped_context = self._get_wrapped_context(context)
return any(
allocator.needs_allocation(wrapped_context) for allocator in self.allocators
)
[docs]
def allocate(self, context: "ContextV2") -> None:
"""
Run allocation for all the children, then aggregate all the allocated values into
a single value, in the parent context.
"""
wrapped_context = self._get_wrapped_context(context)
for allocator in self.allocators:
allocator.allocate(wrapped_context)
# For each allocation in the flush set, if the allocation is only one level deep,
# we add it to the parent context. Else it will be added to the allocated dict.
key: str
value: object
for key, value in wrapped_context.flush_set.items():
path = to_path(key)
if len(path.get_path_sections()) > 1:
# Skipping too deep allocation
continue
context.set_value(path, value)
# Set value for all allocated values
context.set_value(to_path(self.fallback_attribute), wrapped_context.flush_set)
def unwrap_attributes(
attributes: dict, fallback_attribute: str, track_deletes: bool = False
) -> dict:
allocated: Optional[dict] = attributes.get(fallback_attribute)
if not allocated:
return attributes
for path_str, value in allocated.items():
path = to_path(path_str)
# when constructing paths, we effectively ignore deletes by re-creating the deleted embedded entities
# when not constructing paths, we 'follow' the deletes.
try:
path.set_element(attributes, value, construct=not track_deletes)
except LookupError:
pass
return attributes