# Source code for inmanta_plugins.lsm.allocation

"""
    Inmanta LSM

    :copyright: 2020 Inmanta
    :contact: code@inmanta.com
    :license: Inmanta EULA
"""
import logging
from abc import ABC, abstractmethod
from random import randint
from typing import Any, Callable, Dict, Generic, List, Set, TypeVar
from uuid import UUID

from inmanta.execute.proxy import DynamicProxy
from inmanta.plugins import Context, PluginException
from inmanta.protocol.common import Result
from inmanta.util import dict_path
from inmanta_lsm.model import EditOperation, PatchCallEdit
from inmanta_plugins.lsm import global_cache
from inmanta_plugins.lsm.allocation_v2.entity import ServiceInstance
from inmanta_plugins.lsm.allocation_v2.framework import (
    AllocatorV2,
    ContextV2,
    ContextV2Wrapper,
)

# Type of the value produced by an AllocationStrategy or a SingleAllocator.
T = TypeVar("T")
# Type of the allocators held by an AllocationAggregator: constrained to either
# the V1 `Allocator` (forward reference, defined later in this module) or `AllocatorV2`.
A = TypeVar("A", "Allocator", AllocatorV2)
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


def send_update_allocated_attributes(
    env_id: UUID,
    service_entity: str,
    service_id: UUID,
    current_version: int,
    attributes: Dict[str, Any],
    strict_modifier_enforcement: bool,
) -> Result:
    """
    Push a set of allocated attributes for one service instance to the server.

    The endpoint depends on `strict_modifier_enforcement`: when it is true the V2
    endpoint is called with one `replace` edit per attribute, otherwise the V1
    endpoint is called with the attribute dict as is.

    :param env_id: The environment the service instance lives in
    :param service_entity: The name of the service entity
    :param service_id: The id of the service instance to update
    :param current_version: The version of the service instance the update applies to
    :param attributes: The attributes to write, keyed by attribute name (or target path)
    :param strict_modifier_enforcement: Select the V2 (True) or V1 (False) endpoint
    :return: The result of the API call
    """
    client = global_cache.get_client()

    if not strict_modifier_enforcement:
        # V1 endpoint: the attributes are sent as a plain dict
        return client.lsm_services_update_attributes(
            tid=env_id,
            service_entity=service_entity,
            service_id=service_id,
            current_version=current_version,
            attributes=attributes,
        )

    # V2 endpoint: every attribute becomes its own replace operation
    edits = [
        PatchCallEdit(
            edit_id=f"Set attribute: {name}",
            operation=EditOperation.replace,
            target=name,
            value=new_value,
        )
        for name, new_value in attributes.items()
    ]
    return client.lsm_services_update_attributes_v2(
        tid=env_id,
        service_entity=service_entity,
        service_id=service_id,
        current_version=current_version,
        patch_id=f"Update for: {service_id}",
        edit=edits,
    )


class AllocationContext:
    """
    The shared context for all allocators.

    Specific to a binding
    """

    def __init__(
        self,
        context: Context,
        service_entity_name: str,
        strict_modifier_enforcement: bool,
    ) -> None:
        """
        :param context: The plugin context, also used to resolve the environment id
        :param service_entity_name: The name of the service entity allocation is done for
        :param strict_modifier_enforcement: Whether strict modifier enforcement is enabled
        """
        self.plugin_context = context
        self.env = context.get_environment_id()
        self.service_entity_name = service_entity_name
        self.strict_modifier_enforcement = strict_modifier_enforcement

    def get_all_values_for(self, attribute: str) -> Set:
        """
        Collect every value assigned to the given attribute, in any of the attribute
        sets (active, candidate, rollback), for every instance of this service that
        is not marked as deleted.

        :param attribute: The name of the attribute to collect the values of
        :return: The set of all values found
        """
        values = set()
        all_instances = global_cache.get_all_instances(
            self.env, self.service_entity_name
        )
        for instance in all_instances:
            if instance["deleted"]:
                continue
            attribute_sets = (
                instance["active_attributes"],
                instance["candidate_attributes"],
                instance["rollback_attributes"],
            )
            for attribute_set in attribute_sets:
                # An attribute set can be absent (None) or simply not contain the attribute
                if attribute_set is not None and attribute in attribute_set:
                    values.add(attribute_set[attribute])
        return values


class AllocationStrategy(Generic[T]):
    """
    Picks a new value for an attribute, given the set of values that are already in use.
    """

    def select(self, ctx: AllocationContext, taken: Set[T]) -> T:
        """
        Select a value that is not part of `taken`. To be implemented by subclasses.

        :param ctx: The allocation context
        :param taken: All values that are already taken
        :return: The selected value
        """
        raise NotImplementedError()


class AnyUniqueInt(AllocationStrategy[int]):
    """
    Allocation strategy that picks a free int within a range: at random while the
    range is sparsely used, the lowest free one otherwise.
    """

    def __init__(self, lower: int = 0, upper: int = 2**31) -> None:
        """
        :param lower: lower bound, inclusive
        :param upper: upper bound, inclusive
        """
        self.lower = lower
        self.upper = upper

    def select(self, ctx: AllocationContext, taken: Set[int]) -> int:
        """
        :param ctx: The allocation context (not used by this strategy)
        :param taken: All values that are already taken
        :return: A value in [lower, upper] that is not in `taken`
        :raises Exception: No free value is left in the range
        """
        range_size = self.upper + 1 - self.lower
        if len(taken) < 0.8 * range_size:
            # Less than 80% full: draw at random until a free value comes up
            while True:
                candidate = randint(self.lower, self.upper)
                if candidate not in taken:
                    return candidate

        # Otherwise: scan for the first free value
        first_free = next(
            (
                candidate
                for candidate in range(self.lower, self.upper + 1)
                if candidate not in taken
            ),
            None,
        )
        if first_free is None:
            raise Exception("No free value available")
        return first_free


class DenseInt(AllocationStrategy[int]):
    """
    Allocation strategy that always hands out the lowest free int in a range
    """

    def __init__(self, lower: int = 0, upper: int = 2**31) -> None:
        """
        :param lower: lower bound, inclusive
        :param upper: upper bound, inclusive
        """
        self.lower = lower
        self.upper = upper

    def select(self, ctx: AllocationContext, taken: Set[int]) -> int:
        """
        :param ctx: The allocation context (not used by this strategy)
        :param taken: All values that are already taken
        :return: The lowest value in [lower, upper] that is not in `taken`
        :raises Exception: No free value is left in the range
        """
        lowest_free = next(
            (
                candidate
                for candidate in range(self.lower, self.upper + 1)
                if candidate not in taken
            ),
            None,
        )
        if lowest_free is None:
            raise Exception("No free value available")
        return lowest_free


class Allocator:
    """
    Base class for objects that allocate one or more attributes
    """

    def pre_allocate(self) -> None:
        """
        Hook called before allocation. Does nothing by default; can be overridden
        e.g. to establish a connection to the external inventory
        """

    def post_allocate(self) -> None:
        """
        Hook called after allocation. Does nothing by default; can be overridden
        e.g. to teardown a connection to the external inventory
        """

    def get_attributes(self) -> List[str]:
        """
        Returns the names of all attributes allocated by this allocator.
        To be implemented by subclasses.
        """
        raise NotImplementedError()

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Perform allocation and return values. To be implemented by subclasses.

        :param ctx: The allocation context
        :param instance: the instance, as returned by lsm::all
        :return: The allocated attributes
        """
        raise NotImplementedError()

    def needs_allocation(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> bool:
        """
        Check if allocation is required, based on what is in LSM database.
        This will be called after the pre_allocate() method.  If False is returned,
        allocate_for won't be called.

        The default implementation requires allocation as soon as one of the
        attributes of this allocator is missing from the instance or set to None.

        :param ctx: The allocation context
        :param instance: the instance, as returned by lsm::all
        """
        current = instance["attributes"]
        for name in self.get_attributes():
            if name not in current or current[name] is None:
                return True
        return False


class SingleAllocator(Allocator, Generic[T]):
    """
    An Allocator responsible for exactly one attribute
    """

    def __init__(self, attribute: str) -> None:
        """
        :param attribute: Name of the attribute to allocate
        """
        self.attribute = attribute

    def get_attribute(self) -> str:
        """
        Get the name of the attribute allocated by this allocator.
        """
        return self.attribute

    def get_attributes(self) -> List[str]:
        """
        Returns a single-element list holding the attribute of this allocator.
        """
        attribute_name = self.get_attribute()
        return [attribute_name]

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Perform allocation, like :py:meth:`allocate_for` but for a single attribute.
        To be implemented by subclasses.

        :return: The value allocated for the attribute
        """
        raise NotImplementedError()

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Allocate the single attribute and return it as a one-entry dict.
        """
        attribute_name = self.get_attribute()
        allocated_value = self.do_allocate_for(ctx, instance)
        return {attribute_name: allocated_value}


class LSM_Allocator(SingleAllocator[T]):
    """
    The Allocator for attributes that are internal to LSM,
    i.e. where no external inventory is involved.

    The actual choice of a new value is delegated to an AllocationStrategy.
    """

    def __init__(self, attribute: str, strategy: AllocationStrategy[T]) -> None:
        """
        :param attribute: Name of the attribute to allocate
        :param strategy: The strategy used to select a new value
        """
        super().__init__(attribute)
        self.strategy = strategy

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Let the strategy select a value that no other instance of the service uses yet.
        """
        return self.strategy.select(
            ctx, ctx.get_all_values_for(self.get_attribute())
        )


class ExternalAllocator(Allocator):
    """
    Base class for allocators whose allocations can be looked up and released
    per service instance (presumably in an external inventory).
    """

    def has_allocation_in_inventory(self, serviceid: UUID) -> bool:
        """
        Check whether an allocation was already done for the given service instance.
        To be implemented by subclasses.

        :param serviceid: The uuid of the service instance.
        """
        raise NotImplementedError()

    def de_allocate(self, serviceid: UUID) -> None:
        """
        De-allocate the allocation of the given service instance.
        To be implemented by subclasses.

        :param serviceid: The uuid of the service instance.
        """
        raise NotImplementedError()


class ExternalServiceIdAllocator(ExternalAllocator, SingleAllocator[T]):
    """
    Allocate a single attribute from an external inventory, keyed on the service instance ID.
    """

    def __init__(self, attribute: str) -> None:
        """
        :param attribute: The attribute that the allocation should be done for
        """
        super().__init__(attribute)

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Allocate using the `id` of the given instance.
        """
        service_id = instance["id"]
        return self.allocate_for_id(service_id)

    def allocate_for_id(self, serviceid: UUID) -> T:
        """
        Execute allocation in an external inventory, based on the internal service id.
        To be implemented by subclasses.

        :param serviceid: UUID of the service instance
        :return: The allocated attribute value
        """
        raise NotImplementedError()


class ExternalAttributeAllocator(ExternalAllocator, SingleAllocator[T]):
    """
    Allocate a single attribute from an external inventory, keyed on the value of
    another attribute of the instance.
    """

    def __init__(self, attribute: str, id_attribute: str) -> None:
        """
        :param attribute: The attribute that the allocation should be done for
        :param id_attribute: The attribute that should be used as an id for the allocation
        """
        super().__init__(attribute)
        self.id_attribute = id_attribute

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Allocate using the current value of `id_attribute` in the instance attributes.
        """
        return self.allocate_for_attribute(instance["attributes"][self.id_attribute])

    def allocate_for_attribute(self, id_attribute_value: Any) -> T:
        """
        Execute allocation in an external inventory, based on an attribute of a service instance.
        To be implemented by subclasses.

        :param id_attribute_value: Value of the attribute that will be used as id for
            the service instance in the external inventory
        :return: The allocated value
        """
        raise NotImplementedError()


class AllocatorWrapper(Allocator):
    """
    An allocator that wraps a series of other allocators.  Its role is to make sure
    that, when one of the wrapped allocations fails, the values produced by the
    allocations that already completed are still written to the service they were
    made for.  This way allocated values (and potentially keys) are not lost when
    several allocations are done on a single resource.

    This only covers errors raised during an allocation performed through this wrapper.
    """

    def __init__(self, *allocators: Allocator) -> None:
        """
        :param *allocators: The list of allocators to provide a failsafe for
        """
        super().__init__()
        self.allocators: List[Allocator] = list(allocators)

    def pre_allocate(self) -> None:
        """
        Call pre_allocate() on every wrapped allocator, in order.
        """
        for wrapped in self.allocators:
            wrapped.pre_allocate()

    def post_allocate(self) -> None:
        """
        Call post_allocate() on every wrapped allocator, in order.
        """
        for wrapped in self.allocators:
            wrapped.post_allocate()

    def needs_allocation(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> bool:
        """
        Allocation is needed as soon as one of the wrapped allocators needs it.
        """
        return any(
            wrapped.needs_allocation(ctx, instance) for wrapped in self.allocators
        )

    def get_attributes(self) -> List[str]:
        """
        Returns the attributes of all wrapped allocators, concatenated in order.
        """
        names: List[str] = []
        for wrapped in self.allocators:
            names.extend(wrapped.get_attributes())
        return names

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Run every wrapped allocator that needs allocation and merge the results.

        The attributes of the instance are updated in place after each successful
        allocation.  If an allocator fails, everything allocated so far is sent to
        the server before the exception is raised again.

        :param ctx: The allocation context
        :param instance: the instance, as returned by lsm::all
        :return: The attributes allocated by all wrapped allocators
        """
        collected: Dict[str, Any] = {}
        live_attributes: Dict[str, Any] = instance["attributes"]
        for wrapped in self.allocators:
            if wrapped.needs_allocation(ctx, instance):
                try:
                    newly_allocated = wrapped.allocate_for(ctx, instance)
                    collected.update(newly_allocated)
                    live_attributes.update(newly_allocated)
                except Exception as e:
                    # Force the update of the service, so that the allocations that
                    # did succeed can be de-allocated when the service is deleted
                    send_update_allocated_attributes(
                        env_id=UUID(ctx.env),
                        service_entity=ctx.service_entity_name,
                        service_id=instance["id"],
                        current_version=instance["version"],
                        attributes=collected,
                        strict_modifier_enforcement=ctx.strict_modifier_enforcement,
                    )
                    raise e

        return collected


class AllocationSpecCollector:
    """
    Internal registry of all allocation specs that have been instantiated, by name.

    Entry point for lsm::all to find an allocation spec
    """

    def __init__(self) -> None:
        # Note: do not reset this in the inmanta_reset_state function
        # As the allocation specs will not be re-registered
        self.specs: Dict[str, "AllocationAggregator"] = {}
        # When True, registering a spec under an existing name replaces the old one
        self.allow_override = False

    def add_spec(self, spec: "AllocationAggregator") -> None:
        """
        Register a spec in this :py:class:`AllocationSpecCollector`

        Called by the constructor of the allocation spec, you should never explicitly use it.

        :param spec: The spec to register, under its `name`
        :raises Exception: a spec with this name was already registered
            (and overriding is not allowed)
        """
        if self.allow_override or spec.name not in self.specs:
            self.specs[spec.name] = spec
            return
        raise Exception(f"Duplicate AllocationSpec with name {spec.name}")

    def get_spec(self, name: str) -> "AllocationAggregator":
        """
        Look up the spec registered under the given name

        :param name: The name of the spec
        :raises Exception: No spec with this name was registered
        """
        if name in self.specs:
            return self.specs[name]
        raise Exception(f"No AllocationSpec with name {name}")


# Module-level collector instance. AllocationAggregator.__init__ registers every
# newly created spec in it through add_spec().
global_allocation_collector = AllocationSpecCollector()
"""
    Singleton instance of :py:class:`AllocationSpecCollector`,
    used internally to collect all :py:class:`AllocationSpec`s
"""


class AllocationAggregator(Generic[A], ABC):
    """
    Abstract superclass for AllocationSpec and AllocationSpecV2

    It holds a named list of allocators and drives the three allocation phases:
    pre_allocate, do_allocate and post_allocate.
    """

    def __init__(self, name: str, *allocators: A) -> None:
        """
        Create the allocation spec and register it in the global collector

        :param name: The name of the allocator, must be unique
        :param allocators: The allocators contained in this aggregator
        """
        self.name = name
        self._allocators: List[A] = list(allocators)
        global_allocation_collector.add_spec(self)

    @property
    def allocators(self) -> List[A]:
        """
        The allocators contained in this aggregator
        """
        return self._allocators

    def pre_allocate(
        self,
        save_exception_cb: Callable[[PluginException], None],
        save_successful_allocator: Callable[[A], None],
    ) -> None:
        """
        Run this step before doing any allocation, it will call pre_allocate on each object
        in self.allocators, in order, and stops at the first failure.

        Nothing is returned: results are reported through the callbacks.

        :param save_exception_cb: A callback function that is called with the encountered
                                  PluginException we want to notify the user about
        :param save_successful_allocator: A callback function that is called with each allocator
                                          whose pre_allocate() completed without raising
        :raises PluginException: The (wrapped) exception raised by the first failing allocator
        """
        for allocator in self.allocators:
            LOGGER.debug("Calling pre_allocate() before allocating attributes")
            try:
                allocator.pre_allocate()
                save_successful_allocator(allocator)
            except Exception as e:
                wrapped = wrap_plugin_exception(e)
                save_exception_cb(wrapped)
                raise wrapped

    @abstractmethod
    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Performs the allocation, this has to be implemented by the subclasses

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance,
            where the `attributes` is the same instance as `candidate_attributes`

        :returns: The dict that should be sent for the attributes updates
        """

    def do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> None:
        """
        This method:
            1. allocates all attributes for the instance
            2. update the internal cache instances by modifying instance passed in
            3. writes the allocation to LSM.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance,
            where the `attributes` is the same instance as `candidate_attributes`
        :raises PluginException: The server did not answer the attribute update with a 200 status code
        """
        if binding.strict_modifier_enforcement and any(
            isinstance(alloc, ContextV2Wrapper) for alloc in self._allocators
        ):
            LOGGER.warning(
                "AllocationSpec %s uses the deprecated `ContextV2Wrapper`. Read-only attributes on embedded entities can "
                "be updated using any AllocatorV2 now, without wrapping it into a ContextV2Wrapper or using the "
                "`lsm::context_v2_unwrapper()` plugin. See documentation for more information.",
                self.name,
            )

        service_entity_name: str = binding.service_entity_name
        env = context.get_environment_id()
        # The allocation itself (self._do_allocate) runs while the arguments are evaluated
        result: Result = send_update_allocated_attributes(
            env_id=UUID(env),
            service_entity=service_entity_name,
            service_id=instance["id"],
            current_version=instance["version"],
            attributes=self._do_allocate(context, binding, instance),
            strict_modifier_enforcement=binding.strict_modifier_enforcement,
        )
        if result.code != 200:
            # A failed call may come without a body or without a message: don't let a
            # TypeError/KeyError on the lookup hide the actual failure
            message = (result.result or {}).get("message")
            raise PluginException(
                f"Updating attributes of instance {instance['id']} "
                f"during allocation failed with message: {message}"
            )

    def post_allocate(
        self, allocators: List[A], save_exception_cb: Callable[[PluginException], None]
    ) -> None:
        """
        Run this step after having done any allocation, it will call post_allocate on each
        of the given allocators.  A failure of one allocator is reported through the
        callback and does not prevent the next ones from being called.

        :param allocators: The allocators post_allocate() should be called on, one by one
        :param save_exception_cb: A callback function that is called for each encountered
                                  PluginException we want to notify the user about
        """
        for allocator in allocators:
            LOGGER.debug("Calling post_allocate() after allocating attributes")
            try:
                allocator.post_allocate()
            except Exception as e:
                save_exception_cb(wrap_plugin_exception(e))


class AllocationSpec(AllocationAggregator[Allocator]):
    """
    A named collection of (V1) allocators, that can be used by a ServiceBinding to perform allocation
    """

    def __init__(self, name: str, *allocators: Allocator):
        """
        :param name: The name of the allocation spec, must be unique
        :param allocators: The allocators contained in this spec
        """
        super().__init__(name, *allocators)

    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Run every allocator that needs allocation for the instance.

        Each allocation is applied to the `attributes` of the instance right away,
        so that the next allocators see it, and is added to the returned dict.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, its `attributes` are updated in place
        :returns: All attributes allocated for the instance
        """
        ctx = AllocationContext(
            context, binding.service_entity_name, binding.strict_modifier_enforcement
        )
        live_attributes: Dict[str, Any] = instance["attributes"]
        pending: Dict[str, Any] = {}

        # Dealing with allocation v1
        for allocator in self.allocators:
            if not allocator.needs_allocation(ctx, instance):
                continue
            allocated = allocator.allocate_for(ctx, instance)
            live_attributes.update(allocated)
            pending.update(allocated)

        return pending


class AllocationSpecV2(AllocationAggregator[AllocatorV2]):
    """
    A named collection of V2 allocators, that can be used to perform allocation
    """

    def __init__(self, name: str, *allocators: AllocatorV2) -> None:
        """
        :param name: The name of the allocation spec, must be unique
        :param allocators: The V2 allocators contained in this spec
        """
        super().__init__(name, *allocators)

    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Run every V2 allocator that needs allocation and return the resulting flush set.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance to allocate for
        :returns: The flush set of the allocation context, i.e. the updates to send
        :raises PluginException: strict_modifier_enforcement is disabled on the binding and
            the flush set contains a dict path with more than one section
        """
        service_entity_name: str = binding.service_entity_name
        service_instance = ServiceInstance(
            id=UUID(instance["id"]),
            environment=UUID(context.get_environment_id()),
            service_entity=service_entity_name,
            version=instance["version"],
            state=instance["state"],
            candidate_attributes=instance.get("candidate_attributes"),
            active_attributes=instance.get("active_attributes"),
            rollback_attributes=instance.get("rollback_attributes"),
            attributes=instance.get("attributes"),
        )
        ctx = ContextV2(instance=service_instance, plugin_context=context)

        # Dealing with allocation v2
        for allocator_v2 in self.allocators:
            needs_allocation = allocator_v2.needs_allocation(ctx)
            if needs_allocation:
                allocator_v2.allocate(ctx)

        # Updating nested attributes is only supported when strict_modifier_enforcement is enabled
        if not binding.strict_modifier_enforcement:
            for path_str in ctx.flush_set.keys():
                path = dict_path.to_path(path_str)
                path_depth = len(path.get_path_sections())
                if path_depth > 1:
                    raise PluginException(
                        f"Updating attributes of instance {instance['id']} won't work, "
                        "the following update dictionary contains dict paths with length "
                        f"greater than one at its root: {ctx.flush_set}. "
                        f"len(dict_path.to_path('{path_str}').get_path_sections()) == {path_depth}. "
                        "Use ContextV2Wrapper to work around this."
                    )

        return ctx.flush_set
def do_allocate(
    context: Context,
    binding: DynamicProxy,
    allocationspec: AllocationAggregator[A],
    instance: Dict[str, Any],
) -> None:
    """
    Internal

    Main entry point for lsm::all to perform allocation for an instance given a specific allocationspec

    This method:
        1. allocates all attributes for the instance
        2. update the internal cache instances by modifying instance passed in
        3. writes the allocation to LSM.

    :param context: The plugin context of the plugin calling this method
    :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
    :param allocationspec: The allocation spec to use to perform allocation
    :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance,
        where the `attributes` is the same instance as `candidate_attributes`
    """
    allocationspec.do_allocate(context, binding, instance)


def wrap_plugin_exception(e: Exception) -> PluginException:
    """
    Make sure the given exception is a PluginException.

    :param e: The exception to wrap
    :return: `e` itself if it already is a PluginException, otherwise a new PluginException
        carrying the type name and message of `e`, and its traceback
    """
    if not isinstance(e, PluginException):
        e = PluginException(
            f"Exception during allocation: {type(e).__name__}: {str(e)}"
        ).with_traceback(e.__traceback__)
    return e


def do_allocate_instances(
    context: Context,
    binding: DynamicProxy,
    allocationspec: AllocationAggregator[A],
    instances: List[Dict[str, Any]],
) -> None:
    """
    Perform allocation for a list of instances: pre_allocate, then allocation for each
    instance, then post_allocate for the allocators whose pre_allocate succeeded.

    A failure for one instance does not prevent allocation for the next ones.  All
    exceptions are collected and raised at the very end.

    :param context: The plugin context of the plugin calling this method
    :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
    :param allocationspec: The allocation spec to use to perform allocation
    :param instances: The instances, that will be updated by this method.
        They are assumed to be the live cache instances,
        where the `attributes` is the same as `candidate_attributes`
    :raises PluginException: The single collected exception, or, if several exceptions were
        collected, a new one combining all their messages
    """
    exception_list: List[PluginException] = []
    successful_pre_allocations: List[A] = []

    def append_exception(e: PluginException) -> None:
        exception_list.append(e)

    def append_allocator(e: A) -> None:
        successful_pre_allocations.append(e)

    try:
        allocationspec.pre_allocate(append_exception, append_allocator)
        for instance in instances:
            try:
                do_allocate(context, binding, allocationspec, instance)
            except Exception as e:
                e = wrap_plugin_exception(e)
                exception_list.append(e)
    finally:
        # post_allocate has to run for every allocator that was successfully pre-allocated,
        # even when pre_allocate or an allocation failed
        try:
            allocationspec.post_allocate(successful_pre_allocations, append_exception)
        finally:
            # Raising from this finally block replaces any exception that is still
            # propagating at this point (e.g. the one re-raised by pre_allocate)
            if len(exception_list) > 1:
                raise PluginException(
                    message="\n".join(exc.message for exc in exception_list)
                ).with_traceback(exception_list[0].__traceback__)
            elif exception_list:
                raise exception_list[0]


def inmanta_reset_state() -> None:
    """
    Don't reset the allocator cache when testing, because when we don't reload, we won't re-register

    Instead, allow overrides.

    This means that 'shadowing' (replacing an allocator spec with one with the same name)
    will no longer raise an exception in the test framework

    But when using a remote orchestrator to test on, it will still raise an exception
    """
    global_allocation_collector.allow_override = True