Source code for inmanta_plugins.lsm.allocation_v2.framework

"""
    :copyright: 2021 Inmanta
    :contact: code@inmanta.com
    :license: Inmanta EULA
"""

from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Dict, List, Optional, Tuple, Type, TypeVar

import pydantic
from inmanta import plugins
from inmanta.util.dict_path import (
    ComposedPath,
    DictPath,
    DictPathValue,
    InDict,
    KeyedList,
    WildComposedPath,
    WildDictPath,
    WildKeyedList,
    to_path,
)
from inmanta_plugins.lsm.allocation_v2.entity import AttributeSetName, ServiceInstance

T = TypeVar("T")
TC = TypeVar("TC", bound="ContextV2")
TCC = TypeVar("TCC", bound="ChildContextV2")


[docs] class AllocatorV2(ABC): """ Base class for all V2 Allocators """
[docs] def pre_allocate(self) -> None: """ Called before allocation, can be used e.g., to establish a connection to the external inventory """
[docs] def post_allocate(self) -> None: """ Called after allocation, can be used e.g., to teardown a connection to the external inventory """
[docs] @abstractmethod def needs_allocation(self, context: "ContextV2") -> bool: """ Determine if this allocator has any work to do or if all values have already been allocated correctly. """
[docs] @abstractmethod def allocate(self, context: "ContextV2") -> None: """ Perform the actual allocation Allocated values will be written back to the context using :func:`ContextV2.set_value` """
[docs] class ContextV2: """ All interactions between allocators happen via this context. The context is responsible for 1. exposing the current instance to the allocator 2. allow the allocator to set values 3. allow allocators to be used in the same way on embedded instances as on top-level instances 4. carry shared state between allocators (such as connections to the inventory) Each Context exposes a :class:`ServiceInstance` via the :func:`get_instance` method. It exposes this instance as if it is the service itself. I.e. embedded entities are presented in the same way as top level services. To allow the context to carry additional state (such as a connection to an inventory), make a subclass of this class. :ivar plugin_context: The plugin context attached to this context :ivar instance: The instance this context is attached to :ivar flush_set: A dict containing a trace of all the set_value call that where made on this context object. The key is the :param path: that was passed to the method The value is the :param value: that was passed to the method """ def __init__( self, instance: ServiceInstance, plugin_context: plugins.Context, ): """ A context, that has no parent :param instance: the instance this context is attached to """ self.instance = instance self.plugin_context = plugin_context self.flush_set: Dict[str, object] = {} self._plugin_context: Optional[plugins.Context] = plugin_context self._list_children: Dict[Tuple[str, str], List[ChildContextV2]] = {} self._children: Dict[str, ChildContextV2] = {} @property def environment(self) -> str: """ Return the environment id """ return self.plugin_context.get_environment_id()
[docs] @classmethod def clone( cls: Type[TC], instance: ServiceInstance, plugin_context: plugins.Context, ) -> TC: """ Produce a new instance of this context (this method acts as a polymorphic constructor) :param instance: the instance this new context should be attached to """ return cls(instance, plugin_context)
[docs] def build_child(self, path: DictPath, wild_path: WildDictPath) -> "ChildContextV2": """ Produce a new child instance of this context (this method acts as a polymorphic constructor) :param path: the path for the new context, relative to the parent :param wild_path: the path for the new context, or any context at the same level, relative to the parent """ return ChildContextV2(self, path, wild_path)
[docs] def get_instance(self) -> ServiceInstance: """ :returns: Get the service instance bound to this context """ return self.instance
[docs] def set_value(self, path: DictPath, value: object) -> None: """ Set the `value` at the given `path` and make sure this allocation is written back to the LSM database. This call will update the :class:`ServiceInstance` and the LSM database. The flushing to database can either be when allocation succeeds, when it fails or immediately. This behavior is determined by the context. :param path: the path at which to set the value :param value: the value to set at the given path """ # write internal path.set_element(self.instance.candidate_attributes, value) # store for flush self.flush_set[path.to_str()] = value
[docs] def get_full_path(self) -> Optional[DictPath]: """ Get the full path up to the instance in this context. The root context should return None as there is no path required to reach it. """ return None
[docs] def get_wild_full_path(self) -> Optional[WildDictPath]: """ Get the full path up to the instance in this context, replacing all KeyedList by WildKeyedList. The root context should return None as there is no path required to reach it. """ return None
[docs] def get_list_children( self, relation: str, key_attribute: str ) -> "List[ChildContextV2]": """ Not intended for external usage Idempotent method Get a context for every child that matches the path `relation[key_attribute=*]` in the attributes set of the instance """ key = (relation, key_attribute) if key in self._list_children: return self._list_children[key] else: keyset = { item[key_attribute] for item in pydantic.parse_obj_as( List[Dict], self.instance.get(InDict(relation)) ) } self._list_children[key] = [ self.build_child( KeyedList( relation, [(key_attribute, DictPathValue.from_object(key).escape())], ), WildKeyedList(relation, [(key_attribute, WildDictPath.WILDCARD)]), ) for key in keyset ] return self._list_children[key]
[docs] def get_child(self, relation: str) -> "ChildContextV2": """ Not intended for external usage Idempotent method Get a context for the child that matches the path `relation` """ if relation in self._children: return self._children[relation] else: self._children[relation] = self.build_child( ComposedPath(relation), WildComposedPath(relation), ) return self._children[relation]
class ChildContextV2(ContextV2): """ All interactions between allocators happen via this context. The context is responsible for 1. exposing the current instance to the allocator 2. allow the allocator to set values 3. allow allocators to be used in the same way on embedded instances as on top-level instances 4. carry shared state between allocators (such as connections to the inventory) Each Context exposes a :class:`ServiceInstance` via the :func:`get_instance` method. It exposes this instance as if it is the service itself. I.e. embedded entities are presented in the same way as top level services. To construct the top-level context, use the :class:`ContextV2` To allow the context to carry additional state (such as a connection to an inventory), make a subclass of this class. By overriding the :func:`clone_as_child` method, this new type will also be used to construct child contexts. """ def __init__(self, parent: ContextV2, path: DictPath, wild_path: WildDictPath): """ A context, that is a child of parent, distinguished by the given path :param parent: the parent context :param path: the path for the new context, relative to the parent :param wild_path: the path for the new context, or any context at the same level, relative to the parent """ self.parent = parent self.path = path self.wild_path = wild_path super().__init__( instance=parent.get_instance().child_for(path), plugin_context=parent.plugin_context, ) @classmethod def clone( cls: Type[TC], instance: ServiceInstance, plugin_context: plugins.Context, ) -> TC: raise NotImplementedError( "Clone should never be used on ChildContextV2 instances" ) @classmethod def clone_as_child( cls: Type[TCC], parent: "ContextV2", path: DictPath, wild_path: WildDictPath ) -> TCC: """ Produce a new instance of this context (this method acts as a polymorphic constructor) :param parent: the parent context :param path: the path for the new context, relative to the parent :param wild_path: the path for the new context, or any context at the same level, relative to the parent """ return cls(parent, path, wild_path) def build_child(self, path: DictPath, wild_path: WildDictPath) -> "ChildContextV2": # By default, a child context will have as child context an instance of the same class. # This is done by calling directly the polymorphic constructor. return self.clone_as_child(self, path, wild_path) def set_value(self, path: DictPath, value: object) -> None: """ Set the `value` at the given `path` and make sure this allocation is written back to the LSM database. This call will update the :class:`ServiceInstance` and the LSM database. The flushing to database can either be when allocation succeeds, when it fails or immediately. This behavior is determined by the context. :param path: the path at which to set the value :param value: the value to set at the given path """ self.parent.set_value(self.path + path, value) def get_full_path(self) -> Optional[DictPath]: """ Get the full path up to the instance in this context. The root context should return None as there is no path required to reach it. """ parent_path = self.parent.get_full_path() if parent_path is None: return self.path return parent_path + self.path def get_wild_full_path(self) -> Optional[WildDictPath]: """ Get the full path up to the instance in this context, replacing all KeyedList by WildKeyedList. The root context should return None as there is not path required to reach it. """ parent_path = self.parent.get_wild_full_path() if parent_path is None: return self.wild_path return parent_path + self.wild_path
[docs] class ForEach(AllocatorV2): """ Allocator to apply a given set of allocators to each instance in a list of instances. The instances are uniquely identified by an attribute """
[docs] def __init__( self, item: str, in_list: str, identified_by: str, apply: List[AllocatorV2] ) -> None: """ :param item: name of the loop variable (not used, to improve readability and error reporting) :param in_list: the relation from which to get the items :param identified_by: the identifying attribute of each item :param apply: the allocators to apply to each item e.g: .. code-block:: inmanta entity Service extends lsm::ServiceInstance: end entity SubService extends lsm::EmbeddedEntity: string id string to_allocate lsm::attribute_modifier to_allocate__modifier="r" end Service.subservices [0:] -- SubService .. code-block:: python lsm.AllocationSpecV2( "service_allocator_spec", allocation.ContextV2Wrapper( "allocated", allocation.ForEach( item="subservice", in_list="subservices", identified_by="id", apply=[ ValueAllocator( into="to_allocate", ), ], ), ), ) """ self.item = item self.in_list = in_list self.identified_by = identified_by self.apply = apply
[docs] def needs_allocation(self, context: "ContextV2") -> bool: return any( allocator.needs_allocation(ctx) for ctx in context.get_list_children(self.in_list, self.identified_by) for allocator in self.apply )
[docs] def allocate(self, context: "ContextV2") -> None: for ctx in context.get_list_children(self.in_list, self.identified_by): for allocator in self.apply: allocator.allocate(ctx)
class SetSensitiveForEach(ForEach): """A for loop requiring allocation if the candidate is different from the active""" def needs_allocation(self, context: "ContextV2") -> bool: # check children if super().needs_allocation(context): return True try: # check if sets have changed all_id_path = WildKeyedList( self.in_list, [(self.identified_by, WildDictPath.WILDCARD)] ) id_path = WildComposedPath(path=[all_id_path, InDict(self.identified_by)]) active_set = id_path.get_elements(context.get_instance().active_attributes) candidate_set = id_path.get_elements( context.get_instance().candidate_attributes ) return set(active_set) != set(candidate_set) except LookupError: # something isn't there return True
[docs] class ContextV2Wrapper(AllocatorV2): """ This wrapper allows to use allocators for attributes of embedded entities. It performs the allocation for all the provided allocators, and save/allocate all of their allocated values in a single key. This wrapper can be used in pair with the context_v2_unwrapper plugin, which allows to place all those allocated values back where they are supposed to go. """
[docs] def __init__( self, fallback_attribute: str, *allocators: AllocatorV2, track_deletes: bool = False, ) -> None: """ This wrapper allows to use allocators for attributes of embedded entities. It will condense all the changes in a dict: - whose keys are dict_path to the attribute of the embedded entities - whose values are the one to allocate This dict is itself saved at the root of the changed attributes dict, saved in the fallback attributes specified in the constructor. :param fallback_attributes: The attribute in which all the allocated values should be saved :param allocators: The allocators whose values should be saved :param track_deletes: when this is set, when an embedded entity is deleted during an update, it will also be dropped from the fallback_attribute. This also implies that embedded entities which are created by allocation alone (all attributes of type 'r') are not supported. e.g: .. code-block:: python lsm.AllocationSpecV2( "service_allocator_spec", allocation.ContextV2Wrapper( "allocated", allocation.ForEach( item="subservice", in_list="subservices", identified_by="id", apply=[ ValueAllocator( into="to_allocate", ), ], ), ), ) This would generate as update attribute dict something like .. code-block:: json { "allocated": { "subservices[id=1].to_allocate": "a", "subservices[id=2].to_allocate": "b", } } """ self.fallback_attribute = fallback_attribute self.allocators = allocators self.track_deletes = track_deletes
def _get_wrapped_context(self, parent_context: "ContextV2") -> "ContextV2": """ Idempotent method, only creates a new wrapped context if none has been created yet. The wrapped context is a parent context replacement, that this allocator holds and all its children will see as root parent. Once all the allocations have been done on this context, this allocator will save all those allocations in the fallback_attribute dict attribute, in its parent context. The context replacement is a duplicate of the parent one because it acts on the same instance. This also means that all changes applied to the context replacement instance are automatically applied to the parent context instance. This is desired as we want any change one the instance done by any allocator to be visible in any other allocator executing on the same instance. Returns the context to pass to children allocators. """ assert ( type(parent_context) is ContextV2 ), "ContextV2Wrapper can only be used at the root of an allocation tree" parent_reserved_attribute = f"reserved_{self.fallback_attribute}" if hasattr(parent_context, parent_reserved_attribute): context: ContextV2 = getattr(parent_context, parent_reserved_attribute) assert type(context) is ContextV2 return context fresh_context = ContextV2( parent_context.instance, plugin_context=parent_context._plugin_context, ) # pre-populate flush set with old values active_attributes = parent_context.instance.get_attribute_set( AttributeSetName.attributes ) allocated = active_attributes.get(self.fallback_attribute) if allocated: assert isinstance(allocated, dict) # When tracking deletes # Remove all things we had allocated for entities that don't exist anymore def is_path_actual(path: str) -> bool: path_head = to_path(path).get_path_sections()[:-1] try: ComposedPath(path=path_head).get_element(active_attributes) except LookupError: return False return True if self.track_deletes: allocated_filtered = { path: value for path, value in allocated.items() if is_path_actual(path) } else: allocated_filtered = allocated fresh_context.flush_set = deepcopy(allocated_filtered) # Unwrap for attributes_set in AttributeSetName: attributes = parent_context.instance.get_attribute_set(attributes_set) if attributes is None: continue unwrap_attributes( attributes, self.fallback_attribute, track_deletes=self.track_deletes ) # cache setattr(parent_context, parent_reserved_attribute, fresh_context) return fresh_context
[docs] def needs_allocation(self, context: "ContextV2") -> bool: """ Needs allocation if any of the children allocators need allocation. """ wrapped_context = self._get_wrapped_context(context) return any( allocator.needs_allocation(wrapped_context) for allocator in self.allocators )
[docs] def allocate(self, context: "ContextV2") -> None: """ Run allocation for all the children, then aggregate all the allocated values into a single value, in the parent context. """ wrapped_context = self._get_wrapped_context(context) for allocator in self.allocators: allocator.allocate(wrapped_context) # For each allocation in the flush set, if the allocation is only one level deep, # we add it to the parent context. Else it will be added to the allocated dict. key: str value: object for key, value in wrapped_context.flush_set.items(): path = to_path(key) if len(path.get_path_sections()) > 1: # Skipping too deep allocation continue context.set_value(path, value) # Set value for all allocated values context.set_value(to_path(self.fallback_attribute), wrapped_context.flush_set)
def unwrap_attributes( attributes: dict, fallback_attribute: str, track_deletes: bool = False ) -> dict: allocated: Optional[dict] = attributes.get(fallback_attribute) if not allocated: return attributes for path_str, value in allocated.items(): path = to_path(path_str) # when constructing paths, we effectively ignore deletes by re-creating the deleted embedded entities # when not constructing paths, we 'follow' the deletes. try: path.set_element(attributes, value, construct=not track_deletes) except LookupError: pass return attributes