"""
Inmanta LSM
:copyright: 2020 Inmanta
:contact: code@inmanta.com
:license: Inmanta EULA
"""
import logging
from abc import ABC, abstractmethod
from random import randint
from typing import Any, Callable, Dict, Generic, List, Set, TypeVar
from uuid import UUID
from inmanta.execute.proxy import DynamicProxy
from inmanta.plugins import Context, PluginException
from inmanta.protocol.common import Result
from inmanta.util import dict_path
from inmanta_lsm.model import EditOperation, PatchCallEdit
from inmanta_plugins.lsm import global_cache
from inmanta_plugins.lsm.allocation_v2.entity import ServiceInstance
from inmanta_plugins.lsm.allocation_v2.framework import (
AllocatorV2,
ContextV2,
ContextV2Wrapper,
)
# Type of the value produced by an allocation strategy or by a single-attribute allocator
T = TypeVar("T")
# Type of the allocators held by an aggregator: constrained to the v1 `Allocator` or to `AllocatorV2`
A = TypeVar("A", "Allocator", AllocatorV2)
# Logger shared by everything in this module
LOGGER = logging.getLogger(__name__)
def send_update_allocated_attributes(
    env_id: UUID,
    service_entity: str,
    service_id: UUID,
    current_version: int,
    attributes: Dict[str, Any],
    strict_modifier_enforcement: bool,
) -> Result:
    """
    Push allocated attributes of a service instance to the server.

    With `strict_modifier_enforcement` enabled the V2 endpoint is called, with one
    `replace` edit per entry of `attributes`.  Otherwise the V1 endpoint is called with the
    attribute dict as is.

    :param env_id: The environment the service instance lives in
    :param service_entity: The name of the service entity
    :param service_id: The id of the service instance
    :param current_version: The version of the service instance the update applies to
    :param attributes: The allocated attributes, the keys are used as edit targets on the V2 endpoint
    :param strict_modifier_enforcement: Whether to use the V2 (true) or V1 (false) endpoint
    :return: The result of the API call
    """
    api_client = global_cache.get_client()

    if not strict_modifier_enforcement:
        # Legacy endpoint: takes the attribute dict directly
        return api_client.lsm_services_update_attributes(
            tid=env_id,
            service_entity=service_entity,
            service_id=service_id,
            current_version=current_version,
            attributes=attributes,
        )

    # New endpoint: every attribute becomes its own replace operation
    edits = [
        PatchCallEdit(
            edit_id=f"Set attribute: {path}",
            operation=EditOperation.replace,
            target=path,
            value=new_value,
        )
        for path, new_value in attributes.items()
    ]
    return api_client.lsm_services_update_attributes_v2(
        tid=env_id,
        service_entity=service_entity,
        service_id=service_id,
        current_version=current_version,
        patch_id=f"Update for: {service_id}",
        edit=edits,
    )
class AllocationContext:
    """
    Context object shared by all allocators, specific to one binding.

    It carries the plugin context, the environment id obtained from it, the name of the
    service entity and the `strict_modifier_enforcement` flag.
    """

    def __init__(
        self,
        context: Context,
        service_entity_name: str,
        strict_modifier_enforcement: bool,
    ) -> None:
        """
        :param context: The plugin context of the plugin performing the allocation
        :param service_entity_name: The name of the service entity allocation is done for
        :param strict_modifier_enforcement: Whether strict modifier enforcement is enabled
        """
        self.plugin_context = context
        self.env = context.get_environment_id()
        self.service_entity_name = service_entity_name
        self.strict_modifier_enforcement = strict_modifier_enforcement

    def get_all_values_for(self, attribute: str) -> Set:
        """
        Collect every value assigned to `attribute` in the active, candidate or rollback
        attribute set of any instance of this service that is not marked as deleted.

        :param attribute: The name of the attribute to look up
        :return: The set of values found
        """
        values: Set = set()
        for instance in global_cache.get_all_instances(
            self.env, self.service_entity_name
        ):
            if instance["deleted"]:
                continue
            attribute_sets = (
                instance["active_attributes"],
                instance["candidate_attributes"],
                instance["rollback_attributes"],
            )
            for attribute_set in attribute_sets:
                # An attribute set can be absent (None) or may simply not hold the attribute
                if attribute_set is None or attribute not in attribute_set:
                    continue
                values.add(attribute_set[attribute])
        return values
class AllocationStrategy(Generic[T]):
    """
    Base class for strategies that pick a new value, given the set of values already taken.
    """

    def select(self, ctx: AllocationContext, taken: Set[T]) -> T:
        """
        Pick a value that is not yet part of `taken`.

        :param ctx: The allocation context of the binding being processed
        :param taken: The values that are already in use
        :return: The selected value
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError
class AnyUniqueInt(AllocationStrategy[int]):
    """
    Allocation strategy that picks a free int from a closed range.

    While fewer than 80% of the range size is reported as taken, values are drawn at random
    until a free one is hit.  Beyond that, the range is scanned from the lower bound upwards.
    """

    def __init__(self, lower: int = 0, upper: int = 2**31) -> None:
        """
        :param lower: lower bound, inclusive
        :param upper: upper bound, inclusive
        """
        self.lower = lower
        self.upper = upper

    def select(self, ctx: AllocationContext, taken: Set[int]) -> int:
        """
        :param ctx: The allocation context (not used by this strategy)
        :param taken: The values that are already in use
        :return: A value in [lower, upper] that is not in `taken`
        :raises Exception: The scan found no free value in the range
        """
        range_size = self.upper + 1 - self.lower

        if len(taken) >= 0.8 * range_size:
            # Range is (nearly) full: random draws would collide too often, find the first free value
            for value in range(self.lower, self.upper + 1):
                if value not in taken:
                    return value
            raise Exception("No free value available")

        # Sparse range: draw at random until a free value comes up
        while True:
            pick = randint(self.lower, self.upper)
            if pick not in taken:
                return pick
class DenseInt(AllocationStrategy[int]):
    """
    Allocation strategy that always returns the lowest free int of a closed range.
    """

    def __init__(self, lower: int = 0, upper: int = 2**31) -> None:
        """
        :param lower: lower bound, inclusive
        :param upper: upper bound, inclusive
        """
        self.lower = lower
        self.upper = upper

    def select(self, ctx: AllocationContext, taken: Set[int]) -> int:
        """
        :param ctx: The allocation context (not used by this strategy)
        :param taken: The values that are already in use
        :return: The smallest value in [lower, upper] that is not in `taken`
        :raises Exception: Every value of the range is taken
        """
        nothing_free = object()
        free_values = (
            value for value in range(self.lower, self.upper + 1) if value not in taken
        )
        lowest_free = next(free_values, nothing_free)
        if lowest_free is nothing_free:
            raise Exception("No free value available")
        return lowest_free
class Allocator:
    """
    Base class for objects that can allocate one or more attributes.
    """

    def pre_allocate(self) -> None:
        """
        Hook called before allocation, e.g. to establish a connection to the external inventory.
        Does nothing by default.
        """

    def post_allocate(self) -> None:
        """
        Hook called after allocation, e.g. to teardown a connection to the external inventory.
        Does nothing by default.
        """

    def get_attributes(self) -> List[str]:
        """
        Return the names of all attributes allocated by this allocator.

        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Perform the allocation and return the allocated values.

        :param ctx: The allocation context of the binding being processed
        :param instance: the instance, as returned by lsm::all
        :return: The allocated attributes
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError

    def needs_allocation(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> bool:
        """
        Tell whether allocation is required, based on what is in the LSM database.

        This will be called after the pre_allocate() method.  If False is returned,
        allocate_for won't be called.  The default implementation requires allocation as
        soon as one of the attributes of this allocator is missing from, or None in, the
        `attributes` of the instance.

        :param ctx: The allocation context of the binding being processed
        :param instance: the instance, as returned by lsm::all
        """
        current = instance["attributes"]
        for name in self.get_attributes():
            if name not in current:
                return True
            if current[name] is None:
                return True
        return False
class SingleAllocator(Allocator, Generic[T]):
    """
    Allocator specialisation that handles exactly one attribute.

    Subclasses implement :py:meth:`do_allocate_for`; the multi-attribute interface of
    :py:class:`Allocator` is derived from it.
    """

    def __init__(self, attribute: str) -> None:
        """
        :param attribute: Name of the attribute to allocate
        """
        self.attribute = attribute

    def get_attribute(self) -> str:
        """
        Return the name of the attribute allocated by this allocator.
        """
        return self.attribute

    def get_attributes(self) -> List[str]:
        """
        Return a one-element list holding the name of the allocated attribute.
        """
        only_attribute = self.get_attribute()
        return [only_attribute]

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Perform allocation, like :py:meth:`allocate_for` but for a single attribute.

        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Allocate the attribute and return it as a single-entry dict, keyed by attribute name.
        """
        name = self.get_attribute()
        value = self.do_allocate_for(ctx, instance)
        return {name: value}
class LSM_Allocator(SingleAllocator[T]):
    """
    Allocator for attributes that are internal to LSM, i.e. where no external inventory
    is involved.

    The choice of a new value is delegated to an :py:class:`AllocationStrategy`, which is
    given all values currently known for the attribute.
    """

    def __init__(self, attribute: str, strategy: AllocationStrategy[T]) -> None:
        """
        :param attribute: Name of the attribute to allocate
        :param strategy: The strategy used to select a new value
        """
        super().__init__(attribute)
        self.strategy = strategy

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Select a value that is not yet used for this attribute by any instance known to `ctx`.
        """
        already_used = ctx.get_all_values_for(self.get_attribute())
        return self.strategy.select(ctx, already_used)
class ExternalAllocator(Allocator):
    """
    Base class for allocators backed by an external inventory.

    On top of the :py:class:`Allocator` interface, it declares the operations to check for
    and to release the allocation of a service instance.
    """

    def has_allocation_in_inventory(self, serviceid: UUID) -> bool:
        """
        Tell whether an allocation was already done for the given service instance.

        :param serviceid: The uuid of the service instance.
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError

    def de_allocate(self, serviceid: UUID) -> None:
        """
        Release the allocation of the given service instance.

        :param serviceid: The uuid of the service instance.
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError
class ExternalServiceIdAllocator(ExternalAllocator, SingleAllocator[T]):
    """
    Allocator for a single attribute taken from an external inventory, where the service
    instance ID identifies the allocation.
    """

    def __init__(self, attribute: str) -> None:
        """
        :param attribute: Name of the attribute to allocate
        """
        super().__init__(attribute)

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Allocate using the `id` of the instance as key in the external inventory.
        """
        service_id = instance["id"]
        return self.allocate_for_id(service_id)

    def allocate_for_id(self, serviceid: UUID) -> T:
        """
        Execute allocation in an external inventory, based on the internal service id.

        :param serviceid: UUID of the service instance
        :return: The allocated attribute value
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError
class ExternalAttributeAllocator(ExternalAllocator, SingleAllocator[T]):
    """
    Allocator for a single attribute taken from an external inventory, where the value of
    another attribute of the instance identifies the allocation.
    """

    def __init__(self, attribute: str, id_attribute: str) -> None:
        """
        :param attribute: The attribute that the allocation should be done for
        :param id_attribute: The attribute that should be used as an id for the allocation
        """
        super().__init__(attribute)
        self.id_attribute = id_attribute

    def do_allocate_for(self, ctx: AllocationContext, instance: Dict[str, Any]) -> T:
        """
        Allocate using the value of `id_attribute` in the instance attributes as key.
        """
        return self.allocate_for_attribute(instance["attributes"][self.id_attribute])

    def allocate_for_attribute(self, id_attribute_value: Any) -> T:
        """
        Execute allocation in an external inventory, based on an attribute of a service instance.

        :param id_attribute_value: Value of the attribute that will be used as id for
            the service instance in the external inventory
        :return: The allocated value
        :raises NotImplementedError: Always, subclasses have to provide the implementation
        """
        raise NotImplementedError
class AllocatorWrapper(Allocator):
    """
    This allocator takes other allocators as argument. Its role is to ensure
    that if any error occurred during any of the allocations, all the ones that
    already completed will see their changes applied to the service they have
    been called for. This allows not to lose allocated values (and potentially
    keys) when doing multiple ones on a single resource.

    This only works for errors that occurred during an allocation wrapped by this
    allocator.
    """

    def __init__(self, *allocators: Allocator) -> None:
        """
        :param *allocators: The list of allocators to provide a failsafe for
        """
        super().__init__()
        self.allocators: List[Allocator] = list(allocators)

    def pre_allocate(self) -> None:
        """
        Call pre_allocate() on every wrapped allocator, in order.

        If one of them fails, post_allocate() is called on the wrapped allocators that were
        already prepared, so that they are not left without teardown, and the original
        exception is re-raised.
        """
        prepared: List[Allocator] = []
        try:
            for allocator in self.allocators:
                allocator.pre_allocate()
                prepared.append(allocator)
        except Exception:
            # The wrapper as a whole failed its pre_allocate, so nobody will call
            # post_allocate on it: release what was already set up ourselves.
            for allocator in reversed(prepared):
                try:
                    allocator.post_allocate()
                except Exception:
                    LOGGER.exception(
                        "post_allocate() failed while cleaning up after a failed pre_allocate()"
                    )
            raise

    def post_allocate(self) -> None:
        """
        Call post_allocate() on every wrapped allocator, in order.

        A failing allocator does not prevent the remaining ones from being called.  The
        first exception encountered is raised once all allocators have been processed,
        any further exception is logged.
        """
        failures: List[Exception] = []
        for allocator in self.allocators:
            try:
                allocator.post_allocate()
            except Exception as e:
                failures.append(e)

        if failures:
            for extra in failures[1:]:
                LOGGER.warning("Additional exception during post_allocate(): %s", extra)
            raise failures[0]

    def needs_allocation(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> bool:
        """
        Allocation is needed as soon as one of the wrapped allocators needs it.
        """
        return any(
            allocator.needs_allocation(ctx, instance) for allocator in self.allocators
        )

    def get_attributes(self) -> List[str]:
        """
        Return the attributes of all wrapped allocators, in allocator order.
        """
        return [
            attribute
            for allocator in self.allocators
            for attribute in allocator.get_attributes()
        ]

    def allocate_for(
        self, ctx: AllocationContext, instance: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Run every wrapped allocator that needs allocation and merge their results.

        The `attributes` of the instance are updated after each successful allocation.  When
        an allocator fails, the values allocated so far are pushed to the server before the
        exception is re-raised.

        :param ctx: The allocation context of the binding being processed
        :param instance: the instance, as returned by lsm::all
        :return: The attributes allocated by all wrapped allocators
        """
        allocated_attributes: Dict[str, Any] = {}
        attributes: Dict[str, Any] = instance["attributes"]
        for allocator in self.allocators:
            if not allocator.needs_allocation(ctx, instance):
                continue

            try:
                allocation = allocator.allocate_for(ctx, instance)
                allocated_attributes.update(allocation)
                attributes.update(allocation)
            except Exception:
                # We force the update of the service, so that when it is deleted, the allocations
                # that did succeed can be de-allocated
                self._save_partial_allocation(ctx, instance, allocated_attributes)
                raise

        return allocated_attributes

    def _save_partial_allocation(
        self,
        ctx: AllocationContext,
        instance: Dict[str, Any],
        allocated_attributes: Dict[str, Any],
    ) -> None:
        """
        Best-effort write of the attributes allocated before a failure.  Never raises, so
        that the allocation error that triggered it stays the one reported to the caller.
        """
        if not allocated_attributes:
            # Nothing was allocated before the failure, there is nothing to save
            return

        try:
            result = send_update_allocated_attributes(
                env_id=UUID(ctx.env),
                service_entity=ctx.service_entity_name,
                service_id=instance["id"],
                current_version=instance["version"],
                attributes=allocated_attributes,
                strict_modifier_enforcement=ctx.strict_modifier_enforcement,
            )
        except Exception:
            LOGGER.exception(
                "Failed to save the partial allocation of instance %s", instance["id"]
            )
            return

        if result.code != 200:
            LOGGER.warning(
                "Saving the partial allocation of instance %s failed with status code %s",
                instance["id"],
                result.code,
            )
class AllocationSpecCollector:
    """
    Internal registry of all allocation specs that have been instantiated, indexed by name.

    Entry point for lsm::all to find an allocation spec.
    """

    def __init__(self) -> None:
        # Note: do not reset this in the inmanta_reset_state function
        # As the allocation specs will not be re-registered
        self.specs: Dict[str, "AllocationAggregator"] = {}

    def add_spec(self, spec: "AllocationAggregator") -> None:
        """
        Register a spec under its name.  A spec registered earlier under the same name is replaced.

        Called by the constructor of the allocation specs, you should never explicitly use it.
        """
        self.specs[spec.name] = spec

    def get_spec(self, name: str) -> "AllocationAggregator":
        """
        Look up the spec registered under the given name.

        :raises Exception: No spec with this name was registered
        """
        try:
            return self.specs[name]
        except KeyError:
            raise Exception(f"No AllocationSpec with name {name}") from None
# Created once at import time; AllocationAggregator.__init__ registers every new aggregator in it.
global_allocation_collector = AllocationSpecCollector()
"""
Singleton instance of :py:class:`AllocationSpecCollector`,
used internally to collect all :py:class:`AllocationSpec`s
"""
class AllocationAggregator(Generic[A], ABC):
    """
    Abstract superclass for AllocationSpec and AllocationSpecV2.

    An aggregator is a named collection of allocators.  It drives their pre_allocate() and
    post_allocate() hooks and writes the attributes they allocate to LSM.  The way the
    allocators are invoked is left to the subclasses, through :py:meth:`_do_allocate`.
    """

    def __init__(self, name: str, *allocators: A) -> None:
        """
        Create the allocation spec and register it in the global collector

        :param name: The name of the allocator, must be unique
        :param allocators: The allocators contained in this aggregator
        """
        self.name = name
        self._allocators: List[A] = list(allocators)
        global_allocation_collector.add_spec(self)

    @property
    def allocators(self) -> List[A]:
        """
        The allocators contained in this aggregator, in the order they were given.
        """
        return self._allocators

    def pre_allocate(
        self,
        save_exception_cb: Callable[[PluginException], None],
        save_successful_allocator: Callable[[A], None],
    ) -> None:
        """
        Run this step before doing any allocation, it will call pre_allocate on each object
        in self.allocators, and stops at the first one that fails.

        :param save_exception_cb: A callback function that must be called for each encountered
            PluginException we want to notify the user about
        :param save_successful_allocator: A callback function that is called with each allocator
            whose pre_allocate completed without raising any exception
        :raises PluginException: The pre_allocate of an allocator failed.  The exception has
            been passed to `save_exception_cb` before being raised.
        """
        for allocator in self.allocators:
            LOGGER.debug("Calling pre_allocate() before allocating attributes")
            try:
                allocator.pre_allocate()
                save_successful_allocator(allocator)
            except Exception as e:
                wrapped = wrap_plugin_exception(e)
                save_exception_cb(wrapped)
                raise wrapped

    @abstractmethod
    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Performs the allocation, this has to be implemented by the subclasses

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance, \
            where the `attributes` is the same instance as `candidate_attributes`
        :returns: The dict that should be sent for the attributes updates
        """
        pass

    def do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> None:
        """
        This method:
        1. allocates all attributes for the instance
        2. update the internal cache instances by modifying instance passed in
        3. writes the allocation to LSM.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance, \
            where the `attributes` is the same instance as `candidate_attributes`
        :raises PluginException: The server did not answer the attribute update with status code 200
        """
        if binding.strict_modifier_enforcement and any(
            isinstance(alloc, ContextV2Wrapper) for alloc in self._allocators
        ):
            LOGGER.warning(
                "AllocationSpec %s uses the deprecated `ContextV2Wrapper`. Read-only attributes on embedded entities can "
                "be updated using any AllocatorV2 now, without wrapping it into a ContextV2Wrapper or using the "
                "`lsm::context_v2_unwrapper()` plugin. See documentation for more information.",
                self.name,
            )

        service_entity_name: str = binding.service.service_entity_name
        env = context.get_environment_id()

        result: Result = send_update_allocated_attributes(
            env_id=UUID(env),
            service_entity=service_entity_name,
            service_id=instance["id"],
            current_version=instance["version"],
            attributes=self._do_allocate(context, binding, instance),
            strict_modifier_enforcement=binding.strict_modifier_enforcement,
        )
        if result.code != 200:
            # The response body may be missing or may lack a message: never let that hide
            # the actual failure behind a TypeError/KeyError.
            body = result.result if isinstance(result.result, dict) else {}
            message = body.get("message", f"<no message, status code {result.code}>")
            raise PluginException(
                f"Updating attributes of instance {instance['id']} "
                f"during allocation failed with message: {message}"
            )

    def post_allocate(
        self, allocators: List[A], save_exception_cb: Callable[[PluginException], None]
    ) -> None:
        """
        Run this step after having done any allocation, it will call post_allocate on each of
        the given allocators.  A failing allocator does not prevent the next ones from being
        called, and no exception is raised for it: it is only reported through the callback.

        :param allocators: The allocators whose post_allocate method should be called, one by one
        :param save_exception_cb: A callback function that must be called for each encountered
            PluginException we want to notify the user about
        """
        for allocator in allocators:
            LOGGER.debug("Calling post_allocate() after allocating attributes")
            try:
                allocator.post_allocate()
            except Exception as e:
                save_exception_cb(wrap_plugin_exception(e))
class AllocationSpec(AllocationAggregator[Allocator]):
    """
    A named collection of v1 allocators, that can be used by a ServiceBinding to perform allocation
    """

    def __init__(self, name: str, *allocators: Allocator):
        """
        :param name: The name of the allocation spec
        :param allocators: The allocators contained in this spec
        """
        super().__init__(name, *allocators)

    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Run, in order, every allocator that reports it needs allocation for the instance.

        Each allocation is merged into the `attributes` of the instance right away, so that
        the next allocators see it, and into the returned dict.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance, its `attributes` are updated by this method
        :return: All the attributes that have been allocated
        """
        collected: Dict[str, Any] = {}
        ctx = AllocationContext(
            context,
            binding.service.service_entity_name,
            binding.strict_modifier_enforcement,
        )
        live_attributes: Dict[str, Any] = instance["attributes"]

        # Dealing with allocation v1
        for allocator in self.allocators:
            if not allocator.needs_allocation(ctx, instance):
                continue
            freshly_allocated = allocator.allocate_for(ctx, instance)
            live_attributes.update(freshly_allocated)
            collected.update(freshly_allocated)

        return collected
class AllocationSpecV2(AllocationAggregator[AllocatorV2]):
    """
    A named collection of :py:class:`AllocatorV2` allocators, that can be used by a
    ServiceBinding to perform allocation
    """

    def __init__(self, name: str, *allocators: AllocatorV2) -> None:
        """
        :param name: The name of the allocation spec
        :param allocators: The allocators contained in this spec
        """
        super().__init__(name, *allocators)

    def _do_allocate(
        self,
        context: Context,
        binding: DynamicProxy,
        instance: Dict[str, Any],
    ) -> Dict[str, Any]:
        """
        Run, in order, every allocator that reports it needs allocation, all of them working
        on one shared :py:class:`ContextV2` built from the instance.

        :param context: The plugin context of the plugin calling this method
        :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
        :param instance: The instance to allocate for
        :return: The flush set of the shared context, i.e. the attribute updates to send
        :raises PluginException: `strict_modifier_enforcement` is disabled on the binding and
            the flush set contains a dict path with more than one section
        """
        service_entity_name: str = binding.service.service_entity_name
        service_instance = ServiceInstance(
            id=UUID(instance["id"]),
            environment=UUID(context.get_environment_id()),
            service_entity=service_entity_name,
            version=instance["version"],
            state=instance["state"],
            candidate_attributes=instance.get("candidate_attributes"),
            active_attributes=instance.get("active_attributes"),
            rollback_attributes=instance.get("rollback_attributes"),
            attributes=instance.get("attributes"),
        )

        ctx = ContextV2(instance=service_instance, plugin_context=context)

        # Dealing with allocation v2
        for allocator_v2 in self.allocators:
            if allocator_v2.needs_allocation(ctx):
                allocator_v2.allocate(ctx)

        # Updating nested attributes is only supported when strict_modifier_enforcement is enabled
        if not binding.strict_modifier_enforcement:
            for path_str in ctx.flush_set:
                path = dict_path.to_path(path_str)
                path_depth = len(path.get_path_sections())
                if path_depth > 1:
                    raise PluginException(
                        f"Updating attributes of instance {instance['id']} won't work, "
                        "the following update dictionary contains dict paths with length "
                        f"greater than one at its root: {ctx.flush_set}. "
                        f"len(dict_path.to_path('{path_str}').get_path_sections()) == {path_depth}. "
                        "Use ContextV2Wrapper to work around this."
                    )

        return ctx.flush_set
def do_allocate(
    context: Context,
    binding: DynamicProxy,
    allocationspec: AllocationAggregator[A],
    instance: Dict[str, Any],
) -> None:
    """
    Internal

    Main entry point for lsm::all to perform allocation for an instance given a specific
    allocationspec.  All the work is delegated to the `do_allocate` method of the spec, which:

    1. allocates all attributes for the instance
    2. updates the internal cache instances by modifying the instance passed in
    3. writes the allocation to LSM.

    :param context: The plugin context of the plugin calling this method
    :param binding: The :inmanta:entity:`lsm::ServiceEntityBinding`
    :param allocationspec: The allocation spec to use to perform allocation
    :param instance: The instance, will be updated by this method. It is assumed to be the live cache instance, \
        where the `attributes` is the same instance as `candidate_attributes`
    """
    allocationspec.do_allocate(context, binding, instance)
def wrap_plugin_exception(e: Exception) -> PluginException:
    """
    Make sure an exception is a :py:class:`PluginException`.

    :param e: The exception to convert
    :return: `e` itself when it already is a PluginException.  Otherwise a new
        PluginException mentioning the type and message of `e`, and carrying its traceback.
    """
    if isinstance(e, PluginException):
        return e

    wrapped = PluginException(
        f"Exception during allocation: {type(e).__name__}: {str(e)}"
    )
    return wrapped.with_traceback(e.__traceback__)
def do_allocate_instances(
    context: Context,
    binding: DynamicProxy,
    allocationspec: AllocationAggregator[A],
    instances: List[Dict[str, Any]],
) -> None:
    """
    Perform allocation for a list of instances: pre_allocate once, allocate each instance,
    then post_allocate for the allocators whose pre_allocate succeeded.

    A failure on one instance does not prevent the allocation of the next ones.  All the
    collected exceptions are reported at the very end.

    :param context: The plugin context of the plugin calling this method
    :param binding: The :inmanta:entity:`lsm::ServiceBindingEntity`
    :param allocationspec: The allocation spec to use to perform allocation
    :param instances: The instances, that will be updated by this method. They are assumed to be the live cache instances, \
        where the `attributes` is the same as `candidate_attributes`
    :raises PluginException: At least one exception was collected.  A single exception is
        raised as is, several ones are merged into one exception whose message holds one
        line per collected exception.
    """
    exception_list: List[PluginException] = []
    successful_pre_allocations: List[A] = []

    # The lists' own append methods serve as the callbacks handed to the spec
    remember_exception = exception_list.append
    remember_allocator = successful_pre_allocations.append

    try:
        allocationspec.pre_allocate(remember_exception, remember_allocator)

        for instance in instances:
            try:
                do_allocate(context, binding, allocationspec, instance)
            except Exception as e:
                exception_list.append(wrap_plugin_exception(e))
    finally:
        try:
            allocationspec.post_allocate(successful_pre_allocations, remember_exception)
        finally:
            if len(exception_list) == 1:
                raise exception_list[0]
            if exception_list:
                combined_message = "\n".join(exc.message for exc in exception_list)
                raise PluginException(message=combined_message).with_traceback(
                    exception_list[0].__traceback__
                )