Source code for inmanta_plugins.lsm

"""
    Inmanta LSM

    :copyright: 2019 Inmanta
    :contact: code@inmanta.com
    :license: Inmanta EULA
"""

import builtins
import copy
import itertools
import logging
import os
import re
import subprocess
import typing
import uuid
from collections import abc, defaultdict
from typing import Any, Mapping, Optional, Sequence, Set, Tuple, Union
from uuid import UUID

import inmanta.config
import inmanta.const
import inmanta.plugins
import inmanta.protocol
import inmanta_plugins.lsm.allocation_helpers
import inmanta_plugins.lsm.embedded_entities_tracker
import inmanta_plugins.lsm.partial
import pydantic
from inmanta import ast, config, const, plugins, protocol, resources
from inmanta.agent import handler
from inmanta.agent.handler import SkipResource
from inmanta.ast import (
    DirectExecuteException,
    OptionalValueException,
    WrappingRuntimeException,
)
from inmanta.ast import type as ast_type
from inmanta.ast.attribute import Attribute
from inmanta.ast.entity import Entity
from inmanta.data.model import ResourceIdStr
from inmanta.docstring_parser import parse_docstring
from inmanta.execute.proxy import DynamicProxy
from inmanta.execute.runtime import Instance
from inmanta.execute.util import NoneValue
from inmanta.export import (
    Exporter,
    ModelDict,
    ProxiedType,
    ResourceDict,
    dependency_manager,
    export,
)
from inmanta.plugins import PluginException
from inmanta.resources import IgnoreResourceException
from inmanta.util import dict_path
from inmanta_lsm import const as lsm_const
from inmanta_lsm import methods  # noqa
from inmanta_lsm.model import (
    EmbeddedServiceEntity,
    InterServiceRelation,
    Lifecycle,
    LifecycleState,
    LifecycleTransfer,
    ModelState,
    ResourceState,
    ServiceAttribute,
    ServiceEntity,
    ServiceEntityVersions,
)
from inmanta_plugins.lsm.allocation_v2 import entity, framework

try:
    from inmanta_lsm.const import ENV_NO_INSTANCES
except ImportError:
    # Ensure backwards compatibility with older versions of the inmanta-lsm extensions.
    ENV_NO_INSTANCES = "lsm_no_instances"

LOGGER = logging.getLogger(__name__)


def get_optional_value(value: DynamicProxy, attribute: str) -> Optional[object]:
    """
    Returns the value of an attribute that is known to exist on the entity.
    Returns None iff the attribute/relation is set to null in the DSL.
    Suppresses the OptionalValueException which is typically raised when reading an optional relation set to null.
    """
    try:
        return getattr(value, attribute)
    except OptionalValueException:
        return None


def get_environment() -> str:
    """
    Figure out the id of the environment the compiler is running in.
    """
    env = config.Config.get("config", "environment", None)
    if env is None:
        raise RuntimeError(
            "The environment is not set! Are we being called in a compile?"
        )
    return env


[docs] @plugins.plugin def context_v2_unwrapper( assignments: "dict[]", fallback_attribute: "string", track_deletes: "bool" = False, ) -> "dict[]": """ This plugin can be used to wrap the instances coming out of lsm::all and place all allocated values in :param fallback_attribute: where they should go. The returned value is what has been given as input, except for the allocated values being set where they should. :param track_deletes: drop deleted embedded entities, even if they still exist in the fallback set. This should be used together with ContextV2Wrapper allocator. Each assignment is an attribute dict containing a fallback attribute assigned with allocated values as produced by the ContextV2 (one-level deep, keys are string representation of the paths, values are allocated values) and update the dict placing values where their key-path would reach them. :param assignments: The list of service instance dictionaries as returned by lsm::all :param fallback_attributes: The attribute name at the root of the instance attributes that contains all the allocated values e.g.:: context_v2_unwrapper( [ { "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999", "id": "f93acfad-7894-4a12-9770-b27cbdd85c74", "service_entity": "carrierEthernetEvc", "version": 4, "config": {}, "state": "allocating", "attributes": { "allocated": { "evcEndPoints[identifier=my-evc-ep-1].uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, "evcEndPoints[identifier=my-evc-ep-2].uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, "another_key": "any value", }, "candidate_attributes": { "allocated": { "evcEndPoints[identifier=my-evc-ep-1].uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, "evcEndPoints[identifier=my-evc-ep-2].uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, "another_key": "any value", }, "active_attributes": {}, "rollback_attributes": {}, } ], "allocated", ) will return:: [ { "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999", "id": "f93acfad-7894-4a12-9770-b27cbdd85c74", "service_entity": "carrierEthernetEvc", "version": 4, "config": {}, "state": "allocating", "attributes": { "allocated": { "evcEndPoints[identifier=my-evc-ep-1].uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, "evcEndPoints[identifier=my-evc-ep-2].uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, "evcEndPoints": [ { "identifier": "my-evc-ep-1", "uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, }, { "identifier": "my-evc-ep-2", "uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, ], "another_key": "any value", }, "candidate_attributes": { "allocated": { "evcEndPoints[identifier=my-evc-ep-1].uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, "evcEndPoints[identifier=my-evc-ep-2].uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, "evcEndPoints": [ { "identifier": "my-evc-ep-1", "uni": { "uniref": "inmanta:456-852-789", "test_value": "test_value", }, }, { "identifier": "my-evc-ep-2", "uni": { "uniref": "inmanta:123-852-456", "test_value": "test_value", }, }, ], "another_key": "any value", }, "active_attributes": {}, "rollback_attributes": {}, } ] """ def for_all_attributes(assignment: dict) -> dict: for attributes_set in entity.AttributeSetName: attributes: Optional[dict] = assignment.get(attributes_set.value) if isinstance(attributes, NoneValue) or not attributes: continue assignment[attributes_set.value] = framework.unwrap_attributes( attributes, fallback_attribute, track_deletes=track_deletes ) return assignment return [ for_all_attributes(DynamicProxy.unwrap(assignment)) for assignment in assignments ]
def is_instance(ctx: plugins.Context, obj: "any", cls: "string") -> "bool": t = ctx.get_type(cls) try: t.validate(obj._get_instance()) except ast.RuntimeException: return False return True def getoptional(obj, *relations): try: for rel in relations: obj = getattr(obj, rel) return obj except OptionalValueException: return None def create_lifecycle(fsm_config) -> "dict": """ Generate a dict with the state metadata in to send to the server """ states = set() def state(st): if st is None: return None states.add(st) return st.name transfers = [ LifecycleTransfer( source=state(tr.source), target=state(tr.target), error=state(getoptional(tr, "error")), on_update=tr.on_update, on_delete=tr.on_delete, api_set_state=tr.api_set_state, resource_based=tr.resource_based, auto=tr.auto, validate=tr.validate, config_name=tr.config_name, target_operation=tr.target_operation, error_operation=tr.error_operation, target_same_desired_state=tr.target_same_desired_state, error_same_desired_state=tr.error_same_desired_state, description=tr.description, ) for tr in fsm_config.transfers ] states = [ LifecycleState( name=state.name, label=getoptional(state, "label"), export_resources=getoptional(state, "export_resources"), validate_self=getoptional(state, "validate_self"), validate_others=getoptional(state, "validate_others"), purge_resources=state.purge_resources, deleted=state.deleted, values={k: v for k, v in state.values.items()}, ) for state in states ] out = Lifecycle( name=fsm_config.name, states=states, transfers=transfers, initial_state=fsm_config.initial_state.name, ) out.validate_graph() return out def collect_states(fsm_config) -> "dict": """ Generate a dict with the state metadata in to send to the server """ states = set() def state(st): if st is None: return None states.add(st) for tr in fsm_config.transfers: state(tr.source) state(tr.target) state(getoptional(tr, "error")) return {state.name: state for state in states} def map_attr_type( attr: ast.attribute.Attribute, thetype: Optional[ast.type.Type] = None ) -> str: tp: ast.type.Type = ( attr.type if thetype is None else attr.type.with_base_type(thetype) ) result: Optional[str] = tp.type_string() if result is None: raise Exception("Invalid attribute type %s" % thetype) return result def unwrap(value: DynamicProxy) -> Any: """ This code is not quite the compiler domain, not is it purely the plugin domain. As such, the default unwrapping doesn't really work """ value = DynamicProxy.unwrap(value) if isinstance(value, NoneValue): return None return value class ServiceEntityBuilder: """ This class creates a ServiceEntity instance from a ServiceEntity in model. """ @classmethod def get_service_entity_from_model( cls, env: UUID, all_bindings: abc.Mapping[str, DynamicProxy], entity_binding: ProxiedType, entity_type: Entity, ) -> ServiceEntity: """ Convert the entity type object from the compiler to a ServiceEntity object. :param env: the environment to use. :param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype. :param entity_binding: The entity binding defined in the model. :param entity_type: An Entity object representing a defined entity in the domain model. """ try: return cls._get_service_entity_from_model( env, all_bindings, entity_binding, entity_type ) except DirectExecuteException as e: raise WrappingRuntimeException( None, "Only Literal statements are allowed as defaults in model definition", e, ) @classmethod def _get_service_entity_from_model( cls, env: UUID, all_bindings: abc.Mapping[str, DynamicProxy], entity_binding: ProxiedType, entity_type: Entity, ) -> ServiceEntity: """ Convert the entity type object from the compiler to a ServiceEntity object. :param env: the environment to use. :param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype. :param entity_binding: The entity binding defined in the model. :param entity_type: An Entity object representing a defined entity in the domain model. """ docstring = parse_docstring(entity_type.comment) ( service_attributes, embedded_service_entities, inter_service_relations, ) = cls._get_attributes( entity_type, all_bindings=all_bindings, strict_enforcement=entity_binding.strict_modifier_enforcement, ) owner = get_optional_value(entity_binding, "owner") if owner: owner = owner.service.service_entity_name return ServiceEntity( environment=env, name=entity_binding.service.service_entity_name, description=docstring.get_description(), attributes=service_attributes, embedded_entities=embedded_service_entities, inter_service_relations=inter_service_relations, lifecycle=create_lifecycle(entity_binding.lifecycle), # The config field shouldn't be populated. It is populated indirectly via the config_name attribute of a # StateTransfer. The lsm_service_entity_config_set() API call should be used to update the config of a # ServiceEntity. config={}, service_identity=entity_binding.service_identity, service_identity_display_name=entity_binding.service_identity_display_name, strict_modifier_enforcement=entity_binding.strict_modifier_enforcement, owner=owner, relation_to_owner=entity_binding.relation_to_owner, entity_annotations=cls._get_annotations_for_entity(entity_type), version=entity_binding.version, ) @classmethod def _get_annotation_in_attribute( cls, attribute: ast.attribute.Attribute, entity_type: Entity ) -> dict[str, str]: """ Returns the default value of the given attribute that contains attribute annotations. This method verifies the constraints that should hold on an attribute that contains annotations. :param attribute: The attribute that contains the annotations in its default values. :param entity_type: The entity that attribute belongs to. """ try: default_value = entity_type.get_default(attribute.name).execute_direct({}) except AttributeError: raise Exception( f"Attribute {attribute.name} of entity {entity_type.get_full_name()} is missing a default value" f" ({attribute.location})" ) if not isinstance(default_value, dict): raise Exception( f"Default value on attribute {attribute.name} of entity {entity_type.get_full_name()} doesn't have" f" type dict ({attribute.location})." ) return default_value @classmethod def _get_annotations_for_entity(cls, entity_type: Entity) -> dict[str, str]: """ Returns the dictionary with the annotations for the given Entity. """ annotations_attr_name = "__annotations" if annotations_attr_name not in entity_type.attributes: return {} attr = entity_type.attributes[annotations_attr_name] return cls._get_annotation_in_attribute(attr, entity_type) @classmethod def _get_annotations_for_relation( cls, relation: ast.attribute.RelationAttribute ) -> dict[str, str]: """ Returns the dictionary with the annotations for the given relation. """ if relation.source_annotations: relation_annotations_instances = [ source_annotation.get_value() for source_annotation in relation.source_annotations if isinstance(source_annotation.get_value(), Instance) and source_annotation.get_value().type.get_full_name() == "lsm::RelationAnnotations" ] if len(relation_annotations_instances) > 1: raise Exception( f"More than one lsm::RelationAnnotations instance found on relationship {relation} ({relation.location})" ) if relation_annotations_instances: instance = relation_annotations_instances[0] attribute = instance.get_attribute("annotations") annotations = attribute.get_value() if not builtins.all(isinstance(d, str) for d in annotations.values()): raise Exception( f"Attribute annotations of entity {instance.type.get_full_name()} must have the type" f" dict[str, str], but some values have a non-string type. Got: {annotations} ({attribute.location})" ) return annotations return {} @classmethod def _has_parent_entity(cls, entity_type: Entity, parent_entity: str) -> bool: """ Returns true iff this entity type has parent_entity as a parent entity. """ return parent_entity in [ p.get_full_name() for p in entity_type.get_all_parent_entities() ] @classmethod def _ensure_parent_entity( cls, entity_type: Entity, required_parent_entity: str ) -> None: """ Raise an exception when the given entity doesn't have required_parent_entity as a parent entity. """ if not cls._has_parent_entity(entity_type, required_parent_entity): raise Exception( f"{entity_type.get_full_name()} should be a subclass of {required_parent_entity}" ) @classmethod def _get_attributes( cls, entity_type: Entity, *, all_bindings: abc.Mapping[str, DynamicProxy], strict_enforcement: bool = False, is_embedded_entity: bool = False, entity_chain_from_service_entity: Optional[Sequence[Entity]] = None, ) -> Tuple[ list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation] ]: """ :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True. :param is_embedded_entity: Indicates whether the given entity is an embedded entity. :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type. """ if entity_chain_from_service_entity is None: entity_chain_from_service_entity = [] if is_embedded_entity: cls._ensure_parent_entity(entity_type, "lsm::EmbeddedEntity") else: cls._ensure_parent_entity(entity_type, "lsm::ServiceEntity") attributes: dict[str, Attribute] attribute_description_map: dict[str, str] attributes, attribute_description_map = cls._get_all_attributes_compiler_type( entity_type ) service_attrs: list[ServiceAttribute] embedded_entities: list[EmbeddedServiceEntity] ( service_attrs, embedded_entities, inter_service_relations, ) = cls._get_all_attributes_model_type( entity_type, attributes, attribute_description_map, entity_chain_from_service_entity, all_bindings, strict_enforcement, ) cls._log_warning_for_missing_documentation( entity_type, attribute_description_map, service_attrs ) return service_attrs, embedded_entities, inter_service_relations @classmethod def _get_all_attributes_compiler_type( cls, entity_type: Entity ) -> Tuple[dict[str, Attribute], dict[str, str]]: """ Get all attributes of the given entity in types used by the compiler. :return: A tuple with as a first element the attributes. The second element contains the description of attributes present in the docstring. """ # collect all attributes, not on std::Entity, lsm::ServiceEntity or lsm::ServiceBase attributes: dict[str, Attribute] = {} attribute_description_map = {} for name, attr in entity_type.attributes.items(): attributes[name] = attr docstring = parse_docstring(entity_type.comment) attribute_description_map.update(docstring.get_attribute_description_map()) for current in entity_type.get_all_parent_entities(): name = current.get_full_name() if name in [ "lsm::ServiceEntity", "lsm::ServiceBase", "lsm::EmbeddedEntity", "std::Entity", ]: continue for name, attr in current.attributes.items(): attributes.setdefault(name, attr) docstring = parse_docstring(current.comment) attribute_description_map.update(docstring.get_attribute_description_map()) return attributes, attribute_description_map @classmethod def _check_relationship_constraints( cls, attr: ast.attribute.RelationAttribute ) -> None: """ This method checks the following constraints: * The upperbound of the arity of the relationship to the embedding entity should be 1. * The name of the attribute referencing the embedding entity should start with an underscore. """ # Bidirectional relationship if attr.end is not None: # Check arity relationship to embedding entity if attr.end.low not in [0, 1] or attr.end.high != 1: raise Exception( f"Attribute {attr.end.name} of relationship {attr.end.type}.{attr.name} -- " f"{attr.type}.{attr.end.name} should have a lower bound of 0 or 1 and an upper " "bound of 1 for its arity. The entity structure of embedded entities should " "form a tree." ) # Check name variable referencing embedding entity if not attr.end.name.startswith("_"): attribute_to_embedding_entity = ( f"{attr.end.entity.get_full_name()}.{attr.end.name}" ) raise Exception( f"Attribute {attribute_to_embedding_entity} should start with an underscore " "because it references the embedding entity." ) @classmethod def _get_plain_attributes(cls, entity: Entity) -> Set[str]: """ Return all attributes except relational and metadata attributes. """ attributes, _ = cls._get_all_attributes_compiler_type(entity) return set( name for name, attr in attributes.items() if not ( name.startswith("_") or "__" in name or isinstance(attr, ast.attribute.RelationAttribute) ) ) @classmethod def _get_modifier( cls, attr: ast.attribute.RelationAttribute, ) -> Optional[str]: """ get the modifier from the RelationshipMetadata if found in the source_annotations of attr. """ if attr.source_annotations: relationship_metadata_entities = [ source_annotation.get_value() for source_annotation in attr.source_annotations if isinstance(source_annotation.get_value(), Instance) and source_annotation.get_value().type.get_full_name() == "lsm::RelationshipMetadata" ] if len(relationship_metadata_entities) > 1: raise Exception( "More than one attribute modifier annotation (__rwplus__, __rw__ or __r__) found on relationship " f"{attr}" ) if relationship_metadata_entities: return ( relationship_metadata_entities[0] .get_attribute("modifier") .get_value() ) return None @classmethod def _get_all_attributes_model_type( cls, entity_type: Entity, attributes_from_compiler: Mapping[str, Attribute], attribute_description_map: Mapping[str, str], entity_chain_from_service_entity: Sequence[Entity], all_bindings: abc.Mapping[str, DynamicProxy], strict_enforcement: bool = False, ) -> Tuple[ list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation] ]: """ Get all attributes of the given entity in types used by the internal model. :param attribute_description_map: Maps the attribute name to the description present in the docstring of the entity. :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type. :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True. """ defaults = entity_type.get_default_values() service_attr_dct: dict[str, dict[str, Any]] = defaultdict(dict) embedded_service_entities: list[EmbeddedServiceEntity] = [] inter_service_relations: list[InterServiceRelation] = [] # Check if loop exists between embedded entities if entity_type.get_full_name() in [ t.get_full_name() for t in entity_chain_from_service_entity ]: chain = ( " -> ".join(t.get_full_name() for t in entity_chain_from_service_entity) + f" -> {entity_type.get_full_name()}" ) raise Exception( f"A cycle exists in the embedded entity structure: {chain}. Embedded entities should form a tree structure." ) for name, attr in attributes_from_compiler.items(): try: if name.startswith("_"): # Ignore attributes starting with _ pass elif isinstance(attr, ast.attribute.RelationAttribute): if cls._has_parent_entity(attr.type, "lsm::EmbeddedEntity"): cls._check_relationship_constraints(attr) docstring = parse_docstring(attr.type.comment) ( ref_attributes, ref_embedded_service_entities, ref_inter_service_relations, ) = cls._get_attributes( attr.type, all_bindings=all_bindings, is_embedded_entity=True, entity_chain_from_service_entity=[ *entity_chain_from_service_entity, entity_type, ], strict_enforcement=strict_enforcement, ) name_attr_to_embedder: Optional[str] = ( attr.end.name if attr.end is not None else None ) ese = EmbeddedServiceEntity( name=attr.name, lower_limit=attr.low, upper_limit=attr.high, description=docstring.get_description(), attributes=ref_attributes, embedded_entities=ref_embedded_service_entities, inter_service_relations=ref_inter_service_relations, key_attributes=( [] if not strict_enforcement else cls._get_key_attributes( attr.type, name_attr_to_embedder, attr.high ) ), entity_annotations=cls._get_annotations_for_entity( attr.type ), attribute_annotations=cls._get_annotations_for_relation( attr ), ) modifier = cls._get_modifier(attr) if modifier: ese.modifier = modifier embedded_service_entities.append(ese) elif attr.source_annotations: inter_service_relations_entries = [ entity.get_value() for entity in attr.source_annotations if isinstance(entity.get_value(), Instance) and entity.get_value().type.get_full_name() == "lsm::InterServiceRelation" ] if len(inter_service_relations_entries) == 0: pass elif len(inter_service_relations_entries) > 1: raise Exception( "More than one `__service__` annotation found on relationship " f"{attr}" ) else: docstring = parse_docstring(attr.type.comment) isr = InterServiceRelation( name=attr.name, lower_limit=attr.low, upper_limit=attr.high, description=docstring.get_description(), # use service name rather than entity name as entity type entity_type=cls._get_entity_binding_for_target_inter_service_rel( attr, all_bindings ).service.service_entity_name, attribute_annotations=cls._get_annotations_for_relation( attr ), ) modifier = cls._get_modifier(attr) if modifier: isr.modifier = modifier inter_service_relations.append(isr) else: # values and metadata obj = re.search("(.*)__(.*)", name) if obj: # metadata source_attr_name, opt = obj.groups() # Make sure that attribute documentation via the docstring takes precedence # over attribute documentation via the <attr_name>__description field. if ( opt == "description" and source_attr_name in attribute_description_map ): continue if opt == "annotations": annotations = cls._get_annotation_in_attribute( attr, entity_type ) service_attr_dct[source_attr_name][ "attribute_annotations" ] = annotations continue service_attr_dct[source_attr_name][opt] = defaults[ name ].execute_direct({}) else: service_attr_dct[name]["name"] = name service_attr_dct[name]["type"] = get_type_as_str(attr) service_attr_dct[name]["description"] = ( attribute_description_map.get(name, None) ) ( validation_type, validation_params, ) = get_validation_type_and_params(attr) service_attr_dct[name]["validation_type"] = validation_type service_attr_dct[name][ "validation_parameters" ] = validation_params service_attr_dct[name]["default_value_set"] = name in defaults if name in defaults and defaults[name] is not None: value = do_execute_direct( defaults[name], f"Invalid default value for attribute {name} " "of service entity {entity_binding.service.service_entity_name}", ) attr.validate(DynamicProxy.unwrap(value)) service_attr_dct[name]["default_value"] = unwrap(value) except Exception: LOGGER.exception("Failed to process attribute %s.%s", entity_type, name) raise service_attributes: list[ServiceAttribute] = [] for attribute, values in service_attr_dct.items(): try: if "name" not in values.keys(): if ( "attribute_annotations" in values and attribute in entity_type.attributes and isinstance( entity_type.attributes[attribute], ast.attribute.RelationAttribute, ) ): # The annotations for a relationship were specified using the syntax for an attribute annotation. cls.raise_exception_to_use_annotation_on_relationship( entity_type, attribute, values["attribute_annotations"] ) all_metadata_attributes = [ f'{attribute}__{value if value != "attribute_annotations" else "annotations"}' for value in values.keys() ] raise Exception( f"Metadata attribute(s): { ', '.join(all_metadata_attributes) } " f"of entity {entity_type.get_full_name()} specified without a non-relational attribute {attribute}" ) service_attributes.append(ServiceAttribute(**values)) except pydantic.ValidationError as e: LOGGER.exception( "Failed to parse attribute %s due to an validation exception", attribute, ) raise e return service_attributes, embedded_service_entities, inter_service_relations @classmethod def _get_entity_binding_for_target_inter_service_rel( cls, attribute: ast.attribute.RelationAttribute, all_bindings: abc.Mapping[str, DynamicProxy], ) -> DynamicProxy: """ :attr attribute: The RelationAttribute for the inter-service relationship. :attr all_bindings: Mapping from the fully-qualified name of a service entity to its ServiceEntityBinding. """ if str(attribute.type) in all_bindings: return all_bindings[str(attribute.type)] # No service entity binding exists for the type of the given attribute. Maybe polymorphism was used # in the model. Search for bindings associated with child entities. bindings_for_children = [ all_bindings[str(child.get_full_name())] for child in attribute.type.get_all_child_entities() if str(child.get_full_name()) in all_bindings ] if len(bindings_for_children) == 0: raise Exception( f"No service entity binding found for target of inter-service relationship {attribute}" ) elif len(bindings_for_children) > 1: service_entity_names = ", ".join( sorted( binding.service.service_entity_name for binding in bindings_for_children ) ) raise Exception( f"Could not infer the correct type for inter-service relationship {attribute}:" f" more than one service entity binding exists for the target type: {service_entity_names}" ) return bindings_for_children[0] @classmethod def raise_exception_to_use_annotation_on_relationship( cls, entity_type: Entity, attribute_name: str, annotations: dict[str, str] ) -> None: """ This method raises an exception about the fact that annotations on the relational attribute `attribute_name` were defined using the attribute annotation syntax instead of the syntax for relational attributes. :param entity_type: The entity the relational attribute belongs to. :param attribute_name: The name of the relational attribute. :param annotations: The annotations that were defined by the attribute annotation. """ metadata_attribute_name = f"{attribute_name}__annotations" assert metadata_attribute_name in entity_type.attributes relationship = entity_type.attributes[attribute_name] assert isinstance(relationship, ast.attribute.RelationAttribute) def _format_limits(relationship: ast.attribute.RelationAttribute) -> str: """ Returns a textual representation of the upper and lower arity of the given relationship. """ if relationship.high is None: return f"[{relationship.low}:]" elif relationship.low == relationship.high: return f"[{relationship.low}]" else: return f"[{relationship.low}:{relationship.high}]" def _format_path_for_relationship( relationship_location: ast.Location, entity_type: Entity, name_relational_attribute: Optional[str] = None, ) -> str: """ Return the path to an entity or a relational attribute as part of the textual representation of a relationship. :param relationship_location: The location of the relationship definition. :param entity_type: The entity type referenced by the relationship. :param name_relational_attribute: The name of the relational attribute. Can be set to None for the side of a unidirectional relationship that doesn't have a relational attribute. """ if relationship_location.file == entity_type.location.file: # The relationship and the entity are defined in the same namespace. # We don't have to reference the entity with its fully-qualified name. result = entity_type.name else: result = entity_type.get_full_name() if name_relational_attribute: result = f"{result}.{name_relational_attribute}" return result def _format_relationship() -> str: """ Return a textual representation of the relationship. This method returns a relationship where the -- in the middle of the relationship definition is replaced with __annotations__. """ path_relational_attr_lhs = _format_path_for_relationship( relationship.location, entity_type, relationship.name ) formatted_lhs = f"{path_relational_attr_lhs} {_format_limits(relationship)}" if relationship.end: path_relational_attr_rhs = _format_path_for_relationship( relationship.location, relationship.type, relationship.end.name ) formatted_rhs = ( f"{path_relational_attr_rhs} {_format_limits(relationship.end)}" ) else: formatted_rhs = _format_path_for_relationship( relationship.location, relationship.type ) return f"{formatted_lhs} __annotations__ {formatted_rhs}" raise Exception( f""" Metadata attribute(s): {metadata_attribute_name} of entity {entity_type.get_full_name()} on the relation {attribute_name} is specified as an attribute annotation. Write: __annotations__ = lsm::RelationAnnotations( annotations={annotations} ) {_format_relationship()} instead and remove the metadata attribute at {entity_type.attributes[metadata_attribute_name].location} """.strip() ) @classmethod def _get_key_attributes( cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str], arity_upper_limit: Optional[int], ) -> list[str]: """ Return the list of attributes that uniquely identify the given embedded entity. :param arity_upper_limit: The upper limit of the arity in the relationship to the embedded entity. """ key_attributes: list[str] key_attributes_attr_name = "__lsm_key_attributes" try: key_attributes = embedded_entity.get_default( key_attributes_attr_name ).execute_direct({}) except AttributeError: raise Exception( f"A default value should be defined for the attribute {key_attributes_attr_name} " f"of entity {embedded_entity.get_full_name()}" ) potential_indices: list[Set[str]] = ( cls._get_potential_indices_for_object_identity( embedded_entity, name_attr_to_embedder ) ) if not isinstance(key_attributes, NoneValue): # Verify whether an index was defined for these key_attributes key_attribute_set = set(key_attributes) if key_attribute_set in potential_indices: pass elif ( name_attr_to_embedder is not None and key_attribute_set.union({name_attr_to_embedder}) in potential_indices ): pass else: raise Exception( f"Key attributes '{key_attributes}' defined with `__lsm_key_attributes` for entity " f"{embedded_entity.get_full_name()} is not an index." ) else: # Derive key attributes from defined indices if not potential_indices: if arity_upper_limit is not None and arity_upper_limit == 1: # If the upper arity is 1, there is no need to define an index to uniquely identity the embedded entity. return [] else: raise Exception( f"Failed to identify key attributes of embedded entity {embedded_entity.get_full_name()}. " f"Attribute `__lsm_key_attributes` is null and no identity could be derived from the indices of " f"the entity." ) if len(potential_indices) > 1: raise Exception( f"Failed to identify the key attributes of embedded entity {embedded_entity.get_full_name()}. Attribute " f"`__lsm_key_attributes` is null and more than one index was found that could be used to define " f"the identity of the entity. Use the `__lsm_key_attributes` attribute to define the identifying " f"attributes explicitly." ) key_attributes = potential_indices[0] # Filter out relation to embedding entity return sorted( [attr for attr in key_attributes if attr != name_attr_to_embedder] ) @classmethod def _get_potential_indices_for_object_identity( cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str] ) -> list[Set[str]]: """ Return a subset of all the indices defined on the given embedded entity, that can be used to define the identity of the embedded entity from the perspective of the north-bound API. :param name_attr_to_embedder: The name of the relational attribute, defined on the embedded entity, that references to the embedding entity. Set to None is no such relationship exists. """ def is_valid_index(attribute_names_in_index: Sequence[str]) -> bool: """ Return True iff the given index can be used to define the identity of an embedded entity. """ for attribute_name in attribute_names_in_index: if attribute_name == name_attr_to_embedder: continue if attribute_name.startswith("_"): LOGGER.debug( "Ignoring index %s on entity %s as potential key attributes for the embedded entity. " "The identifying index can only contain " "attributes exposed via the north-bound API", attribute_names_in_index, embedded_entity.get_full_name(), ) return False attribute: Optional[Attribute] = embedded_entity.get_attribute( attribute_name ) assert attribute is not None # Make mypy happy if isinstance(attribute, ast.attribute.RelationAttribute): LOGGER.debug( "Ignoring index %s on entity %s as potential key attributes for the embedded entity. " "The identifying index cannot contain " "relationships except for a relationship to the entity this entity is embedded in", attribute_names_in_index, embedded_entity.get_full_name(), ) return False attribute_type: ast_type.Type = attribute.get_type().get_base_type() if isinstance(attribute_type, ast_type.List): LOGGER.info( "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it" " contains the attribute %s of type list.", attribute_names_in_index, embedded_entity.get_full_name(), attribute.name, ) return False if isinstance(attribute_type, ast_type.Dict): LOGGER.info( "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it" " contains the attribute %s of type dict.", attribute_names_in_index, embedded_entity.get_full_name(), attribute.name, ) return False return True return [ set(index) for index in embedded_entity.get_indices() if is_valid_index(index) ] @classmethod def _log_warning_for_missing_documentation( cls, entity_type: Entity, attribute_description_map: dict[str, str], service_attributes: list[ServiceAttribute], ) -> None: """ Log a warning when the docstring contains documentation for an attribute that is not defined in the entity. """ attributes_in_docstring = set(attribute_description_map.keys()) attributes_in_service_definition = set([s.name for s in service_attributes]) undefined_attributes_in_docstring = ( attributes_in_docstring - attributes_in_service_definition ) for attr_name in undefined_attributes_in_docstring: if attr_name.startswith("_") or "__" in attr_name: # Documenting 'private' attributes or metadata attributes shouldn't emit a warning continue LOGGER.warning( "Attribute '%s' is defined in the docstring of service entity '%s', but attribute doesn't exist.", attr_name, entity_type.name, ) def get_type_as_str(attr: ast.attribute.Attribute) -> str: base_type: ast.type.Type = attr.type.get_base_type() def get_constraint_basetype(tp: ast.type.ConstraintType): if isinstance(tp.basetype, ast.type.ConstraintType): return get_constraint_basetype(tp.basetype) return tp.basetype if isinstance(base_type, ast.type.ConstraintType): return map_attr_type(attr, get_constraint_basetype(base_type)) else: return map_attr_type(attr) def get_validation_type_and_params( attr: ast.attribute.Attribute, ) -> Tuple[Optional[str], Optional[Mapping[str, object]]]: """ If the type of the given attribute was defined with a typedef statement and it uses a supported constraint, this method returns the inmanta_lsm validation type and validation parameters. """ import inmanta_plugins.lsm.typedef as typedef result: Optional[typedef.ValidationType] = typedef.get_validation_type(attr) return result if result is not None else (None, None) def do_execute_direct( statement: ast.statements.DynamicStatement, error_msg: str ) -> Any: """ Execute the given statement directly. """ try: return DynamicProxy.return_value(statement.execute_direct({})) except DirectExecuteException as e: raise WrappingRuntimeException(None, error_msg, e) @plugins.plugin def fsm_to_dot(config: "lsm::LifecycleStateMachine") -> "string": """ Generate a dot representation of the state machine """ return create_lifecycle(config).to_dot() @plugins.plugin def render_dot(fsm: "lsm::LifecycleStateMachine") -> None: """ Render a dot graph in the current directory """ dot = fsm_to_dot(fsm) with open("fsm.dot", "w+") as fd: fd.write(dot) subprocess.check_call(["dot", "-Tsvg", "-ofsm.svg", "fsm.dot"]) def do_multi_version_export( env: UUID, conn: protocol.endpoints.SyncClient, service_entity: str, service_entity_versions: ServiceEntityVersions, ) -> None: """ Does the export for multi versioned service entities. :param env: The environment to use. :param conn: The connection to use. :param service_entity: The name of the service :param service_entity_versions: A ServiceEntityVersions object containing the ServiceEntityDefinitions for each of the versions of the service and the default version """ export_result = conn.lsm_service_catalog_update_entity_versions( tid=env, service_entity=service_entity, service_entity_definitions=service_entity_versions.versions, default_version=service_entity_versions.default_version, ) if export_result.code != 200: if export_result.code == 404: # lsm endpoint is not found raise Exception( "A version of the LSM extension that supports this operation is not loaded on the server. " "Make sure to install a compatible version of the lsm extension (version 4.3.0 or above)." ) raise Exception( f"could not upload lifecycle {export_result.result} (code: {export_result.code})" ) def do_export( env: UUID, conn: protocol.endpoints.SyncClient, service_entity: ServiceEntity, allow_instance_updates: bool, ) -> None: """ Does the export for unversioned service entities. :param env: The environment to use. :param conn: The connection to use. :param service_entity: The definition of the service entity :param allow_instance_updates: Allow rewriting existing instances when executing an update. """ get_entity_result = conn.lsm_service_catalog_get_entity( tid=env, service_entity=service_entity.name ) if get_entity_result.code == 404: export_result = conn.lsm_service_catalog_create_entity( tid=env, service_entity_definition=service_entity ) else: kwargs = {} if allow_instance_updates: # Only set parameter if required, to preserve backward compatibility kwargs["allow_instance_updates"] = True # Config settings cannot be updated via the update_entity API call service_entity.config = get_entity_result.result["data"]["config"] if not service_entity.strict_modifier_enforcement: LOGGER.warning( ( "The strict_modifier_enforcement flag is not enabled for service entity binding %s. This behaviour is being" " deprecated. Please refer to the documentation for more information on how to enable it." ), service_entity.name, ) # try export export_result = conn.lsm_service_catalog_update_entity( tid=env, service_entity=service_entity.name, service_entity_definition=service_entity, **kwargs, ) if ( allow_instance_updates and export_result == 400 and "allow_instance_updates" in export_result.result.get("message", "") ): # Backward compatibility! LOGGER.warning( "The version of inmanta-lsm used by the compiler and server are different, this may cause issues." ) LOGGER.warning( "The server version is too old to support type updates or updates on embedded instances." ) # for backward compatibility with inmanta_lsm<2.0.1.dev # we do the call again without allow_instance_updates the argument export_result = conn.lsm_service_catalog_update_entity( tid=env, service_entity=service_entity.name, service_entity_definition=service_entity, ) if export_result.code != 200: if export_result.code == 404: # lsm endpoint is not found raise Exception( "The LSM extension is not loaded on the server. Make sure to install the lsm extension, enable it through the" " server.enabled_extensions option and restart the server." ) # This update requires the `allow_type_updates` parameter to be set if "allow_instance_updates" in export_result.result.get("message", ""): raise Exception( "could not upload lifecycle, " "because it would require updating existing instances, " "use `--export-plugin service_entities_exporter`" ) raise Exception( f"could not upload lifecycle {export_result.result} (code: {export_result.code})" ) @export( "service_entities_exporter_strict", "lsm::ServiceEntityBinding", "lsm::ServiceBinding", ) def export_service_entities( exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {} ) -> None: do_export_service_entities(exporter, types, False) @export( "service_entities_exporter", "lsm::ServiceEntityBinding", "lsm::ServiceBinding", ) def service_entities_exporter_with_instance_updates( exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {} ) -> None: do_export_service_entities(exporter, types, True) def do_export_service_entities( exporter: Exporter, types: dict[str, Sequence[ProxiedType]], force: bool = False, ) -> None: def fetch_entity_type(service_entity: str, service_entity_name: str) -> Entity: """ Used to translate an entity path in the model to an Entity object. :param service_entity: The fully qualified path of the entity in the model :param service_entity_name: The human-readable name of the service entity """ try: return exporter.types[service_entity] except KeyError as e: raise PluginException( f"Entity {service_entity} in service entity binding for" f" {service_entity_name} does not exist." ) from e conn = protocol.endpoints.SyncClient("compiler") env: Optional[UUID] = config.Config.get("config", "environment", None) if env is None: raise Exception( "The environment of the model should be configured in config>environment" ) services: list[DynamicProxy] = [ service for service in ( typing.cast(DynamicProxy, service) for service in itertools.chain(types["lsm::ServiceBinding"]) ) ] all_bindings: dict[str, DynamicProxy] = {} # The same service entity in the model can be associated with multiple versions # So we unpack them like this instead of building the dictionary directly # We don't care which version ends up here since they all have the same service_entity_name for service in services: for version in service.versions: all_bindings[version.service_entity] = version for service in services: versions_obj: list[ServiceEntity] = [] for service_version in service.versions: entity_type = fetch_entity_type( service_version.service_entity, service.service_entity_name ) versions_obj.append( ServiceEntityBuilder.get_service_entity_from_model( env, all_bindings, service_version, entity_type ) ) if len(versions_obj) == 1 and versions_obj[0].version == 0: # We are dealing with an unversioned service, so we use the old export do_export(env, conn, versions_obj[0], force) else: versioned_entity = ServiceEntityVersions( service_entity=service.service_entity_name, versions=versions_obj, default_version=service.default_version, ) do_multi_version_export( env, conn, service.service_entity_name, versioned_entity ) def get_service_entity(type_class): defaults = type_class.get_default_values() if "__service_entity_name" not in defaults: return None return defaults["__service_entity_name"].execute(None, None, None) def is_partial() -> bool: return os.environ.get(lsm_const.ENV_PARTIAL_COMPILE, "false").lower() == "true"
[docs] class CacheManager: """ Entry point for all internal caches in LSM, accessed via :py:data:`global_cache`. Also caches a connection to the server. """ def __init__(self) -> None: self._instances_per_binding: dict[str, list[dict]] = {} self._current_state_cache: dict[str, Tuple[str, int, Optional[int]]] = {} self._all_instance_cache: dict[str, dict] = {} self._client = protocol.endpoints.SyncClient("compiler") # Cache of all bindings, may be unresolvable and cause rescheduling self._binding_cache_pre: dict[ "inmanta_plugins.lsm.partial.VersionedServiceEntity", DynamicProxy ] = {} # Cache of all lifecycles, resolved self._binding_cache_lifecycles: dict[ "inmanta_plugins.lsm.partial.VersionedServiceEntity", dict ] = {} # Cache of all service entity definitions we pulled from the api self._service_entities_cache: dict[tuple[str, int | None], ServiceEntity] = {} self._selector_factory: Optional[ typing.Callable[[str], "inmanta_plugins.lsm.partial.SelectorApi"] ] = None self._selector: Optional["inmanta_plugins.lsm.partial.SelectorApi"] = None
[docs] def reset(self) -> None: """Reset all state: drop all caches and renew the connection""" self._instances_per_binding = {} self._current_state_cache = {} self._client = protocol.endpoints.SyncClient("compiler") self._binding_cache_pre = {} self._binding_cache_lifecycles = {} self._service_entities_cache = {} self._all_instance_cache = {} self._selector = None
[docs] def get_all_versioned_bindings( self, ) -> dict["inmanta_plugins.lsm.partial.VersionedServiceEntity", DynamicProxy]: """Return all bindings that have been registered""" return self._binding_cache_pre
[docs] def get_all_bindings(self) -> dict[str, DynamicProxy]: """Return all unversioned service entity bindings that have been registered""" bindings = self.get_all_versioned_bindings() unversioned_bindings = { name: binding for (name, version), binding in bindings.items() if version == 0 } if len(bindings.keys()) != len(unversioned_bindings.keys()): raise Exception( "This method was deprecated with the introduction of versioned service entities. " "Use get_all_versioned_bindings instead." ) return unversioned_bindings
[docs] def set_selector_factory( self, selector_factory: typing.Optional[ typing.Callable[[str], "inmanta_plugins.lsm.partial.SelectorApi"] ], ) -> None: """Set the selector factory that will produce a selector for partial compile For testing: the factory is not reset when the module is reloaded. To reset, set it to None. """ self._selector_factory = selector_factory
def selector(self, environment: str) -> "inmanta_plugins.lsm.partial.SelectorApi": if self._selector: return self._selector if not is_partial(): self._selector = inmanta_plugins.lsm.partial.AllSelector(environment) elif self._selector_factory: self._selector = self._selector_factory(environment) else: self._selector = inmanta_plugins.lsm.partial.TreeSelector(environment) return self._selector def get_client(self) -> protocol.endpoints.SyncClient: return self._client
[docs] def get_instance( self, env: str, service_entity_name: Optional[str], instance_id: Union[str, UUID], force: bool = False, include_terminated: bool = False, ) -> Optional[dict]: """ Return the service instance with the given environment, service_entity_name and instance_id or None if no such instance exists. :param force: when true, the cache is refreshed from the server :param include_terminated: when trying to pull a specific instance, and it is not in the cache, try to get it from the API directly, so that it is returned even when terminated. """ assert not ( force and not service_entity_name ), "when forcing refresh, service_entity_name must be passed in" if isinstance(instance_id, UUID): instance_id = str(instance_id) if service_entity_name: self.get_all_instances(env, service_entity_name, force=force) found = self._all_instance_cache.get(instance_id) if found is None: if include_terminated: # Try to pull it from the API directly in case it is terminated result = self._client.lsm_services_get_by_id( tid=env, service_id=instance_id ) if result.code == 404: # it is not there at all return None if result.code != 200: message = result.result.get("message") if result.result else None extra_info = f": {message}" if message else "." raise Exception( f"Unable to retrieve service instance from the API (code {result.code}){extra_info}" ) found = result.result["data"] if found is None: return None if service_entity_name and found["service_entity"] != service_entity_name: return None return found
[docs] def convert_instance( self, instance: dict, validation: bool, instance_id: Optional[str] ) -> Optional[dict]: """ Convert an instance from the API form to the return format of lsm::all :param instance: The instance dict as returned by the api :param validation: Whether this is a validation compile :param instance_id: The id of the instance being validated (if this is a validation compile) """ service_entity = instance["service_entity"] service_entity_version = instance.get("service_entity_version", 0) ls = self.get_lifecycle(service_entity, service_entity_version) state = ls[instance["state"]] if not validation: if not state.export_resources: return None else: attributes = instance["active_attributes"] else: if instance_id == instance["id"]: take_set = state.validate_self if not take_set: return None else: attributes = instance[f"{take_set}_attributes"] else: take_set = state.validate_others if not take_set: return None else: attributes = instance[f"{take_set}_attributes"] if attributes is None: attributes = {} return { "id": instance["id"], "version": instance["version"], "desired_state_version": instance.get("desired_state_version", None), "state": instance["state"], "delete": instance["deleted"], "deleted": instance["deleted"], "service_entity": service_entity, "service_entity_version": instance.get("service_entity_version", 0), "attributes": attributes, "candidate_attributes": instance["candidate_attributes"], "active_attributes": instance["active_attributes"], "rollback_attributes": instance["rollback_attributes"], }
[docs] def get_all_instances( self, env: str, service_entity_name: str, force: bool = False ) -> list[dict]: """ Get all (non-terminal) instances from the server for a specific environment and service_entity_name. The result is cached and any subsequent call uses the cache. :param env: the environment to use. :param service_entity_name: the name of specific service entity for which to fetch the instances. :param force: when true, the cache is refreshed from the server :return: all instances, as dicts, in the format returned by the server. """ if force or service_entity_name not in self._instances_per_binding: # get ALL instances result = self._client.lsm_services_list( tid=env, service_entity=service_entity_name ) if result.code == 404: # the type probably needs to be registered first return [] if result.code != 200: message = result.result.get("message") if result.result else None extra_info = f": {message}" if message else "." raise Exception( f"Unable to retrieve service instance from the API (code {result.code}){extra_info}" ) out = result.result["data"] self._instances_per_binding[service_entity_name] = out for instance in out: self.cache_instance(instance) return self._instances_per_binding[service_entity_name]
def cache_instance(self, instance: dict) -> None: LOGGER.log( inmanta.const.LogLevel.TRACE.to_int, "Caching instance %s", instance["id"] ) self._all_instance_cache[instance["id"]] = instance self._current_state_cache[instance["id"]] = ( instance["state"], instance["version"], instance.get("desired_state_version", None), )
[docs] def get_instance_state( self, instance_id: str ) -> "inmanta_plugins.lsm.partial.VersionedServiceEntity": """ Get the current state and version for a specific instance. Can only be called for instances retrieved via get_all_instances :param instance_id: the uuid for the service instance :return: current state and version for the specific instance """ state_name, current_version, _ = ( self.get_instance_state_with_desired_state_version(instance_id) ) return state_name, current_version
[docs] def get_instance_state_with_desired_state_version( self, instance_id: str ) -> tuple[str, int, Optional[int]]: """ Get the current state and version for a specific instance. Can only be called for instances retrieved via get_all_instances :param instance_id: the uuid for the service instance :return: current state, version and desired state version for the specific instance """ if instance_id not in self._current_state_cache: # This should never happen, since the all() plugin is always called before the current_state() plugin raise Exception( f"No service instance {instance_id} found, ensure the 'all' plugin has been called before" ) return self._current_state_cache[instance_id]
[docs] def register_binding(self, binding) -> dict[str, object]: """Register a lifecycle binding and return its lifecycle""" name = binding.service.service_entity_name version = binding.version if (name, version) not in self._binding_cache_pre: LOGGER.debug("First registered binding %s version %s", name, version) self._binding_cache_pre[(name, version)] = binding self.resolve_binding_cache() return self._binding_cache_lifecycles[(name, version)]
def resolve_binding_cache(self) -> None: # Resolve all bindings at once, making multiple lsm::all behave more regular todo = set(self._binding_cache_pre.keys()) - set( self._binding_cache_lifecycles.keys() ) for name, version in todo: binding = self.get_binding(name, version) self._binding_cache_lifecycles[(name, version)] = collect_states( binding.lifecycle )
[docs] def get_lifecycle( self, entity_name: str, entity_version: Optional[int] = None ) -> dict[str, dict[str, object]]: """ Get the lifecycle for a specific version of service entity. :param entity_name: The name of the service entity. :param entity_version: The version of the service entity. Optional for backwards compatibility. If it is not provided, will only work if there is one registered version. :return: The lifecycle of the provided service entity version. """ if entity_version is None: matching_versions = [ version for name, version in self._binding_cache_lifecycles.keys() if name == entity_name ] if len(matching_versions) > 1: raise Exception( f"Service entity {entity_name} has multiple versions registered {matching_versions}," f" please specify a version while calling this method " ) elif len(matching_versions) == 0: raise Exception( f"No lifecycle found for any version of {entity_name}, ensure register_binding has been called." ) entity_version = matching_versions[0] if (entity_name, entity_version) not in self._binding_cache_lifecycles: raise Exception( f"No lifecycle found for {entity_name} and version {entity_version}, ensure register_binding has been called." ) return self._binding_cache_lifecycles[(entity_name, entity_version)]
[docs] def get_binding( self, entity_name: str, entity_version: Optional[int] = None ) -> DynamicProxy: """ Takes an entity name and version and returns the appropriate binding. If entity_version is not provided we will assume it is 0 and that we are dealing with unversioned entities. If no specific version is requested and multiple versions exist, raises an exception. The binding returned may not be fully resolved and can raise UnsetExceptions when accessed. """ if entity_version is None: entity_version = 0 entity_versions = [ version for (name, version) in self._binding_cache_pre.keys() if name == entity_name ] if len(entity_versions) > 1: raise LookupError( f"No specific version was requested for binding with name {entity_name}. " f"Found multiple cached versions: {entity_versions} " ) key = (entity_name, entity_version) if key not in self._binding_cache_pre: raise KeyError( f"No binding found for {entity_name} and version {entity_version}." ) return self._binding_cache_pre[key]
[docs] def get_service_entity( self, service_entity_name: str, service_entity_version: int | None = None, ) -> ServiceEntity: """ Get the service entity definition with the given name from the api. :param service_entity_name: The name of the service whose definition we want to query. :param service_entity_version: The version of the service entity (if multi-version is supported by this inmanta server) """ cache_key = (service_entity_name, service_entity_version) if cache_key in self._service_entities_cache: return self._service_entities_cache[cache_key] # Get the service definition, if a version is specified, we use the newer # api, otherwise keep the old one, to ensure backward compatibility with older # versions of the server service_entity_result: inmanta.protocol.common.Result if service_entity_version is not None: # We have an entity version, we try the new version aware api service_entity_result = self._client.lsm_service_catalog_get_entity_version( tid=get_environment(), service_entity=service_entity_name, version=service_entity_version, ) if service_entity_version is None or service_entity_result.code >= 500: # No entity version or unavailable api, we try the legacy api service_entity_result = self._client.lsm_service_catalog_get_entity( tid=get_environment(), service_entity=service_entity_name, ) if service_entity_result.code != 200: message = ( service_entity_result.result.get("message") if service_entity_result.result else None ) extra_info = f": {message}" if message else "." raise Exception( f"Unable to retrieve service entity from the API (code {service_entity_result.code}){extra_info}" ) # Build the service entity object, for easier data access service_entity = ServiceEntity(**service_entity_result.result["data"]) # Cache the constructed entity for later access self._service_entities_cache[cache_key] = service_entity return service_entity
global_cache = CacheManager() @plugins.plugin def get_service_binding_version( context: plugins.Context, service: "lsm::ServiceBinding", version: "int" ) -> "lsm::ServiceBindingVersion": """ Filters the specific binding version from a service :param service: The ServiceBinding that groups the different ServiceBindingVersions :param version: The version that we want to fetch :return: The ServiceBindingVersion that matches the version that we provided """ version = [ service_binding_version for service_binding_version in service.versions if service_binding_version.version == version ] if len(version) == 0: raise Exception( f"ServiceBindingVersion {version} not found for service {service.service_entity_name}" ) if len(version) > 1: raise Exception( f"Multiple ServiceBindingVersions were found on service {service.service_entity_name} with version {version}" ) return version[0] @plugins.plugin def all( context: plugins.Context, service: "lsm::ServiceBinding", min_version: "int" = None, max_version: "int" = None, include_embedded_entities_id: "bool" = False, include_purged_embedded_entities: "bool" = False, added_attribute: "string" = "_added", # type: ignore removed_attribute: "string" = "_removed", # type: ignore ) -> "list": """ Returns a list of records for the given binding. :param service: The service that we are unrolling :param include_embedded_entities_id: Set to True to make sure all embedded entities which are part of each instance `attributes` have an identity baked in, to allow the usage of the lsm::get_previous_value plugin. :param include_purged_embedded_entities: Set to True to enable tracking embedded entities added or removed during an update. At each step of the update, relevant embedded entities will be flagged as added or removed by the lsm::all plugin. In practise, this means the `_added` or the `_removed` boolean attributes will be set on the relevant embedded entities. To use different flagging attributes, use the `added_attribute` and the `removed_attribute` parameters. :param min_version: The minimum service_entity_version of the selected instances (including this version). If left empty, there is no minimum boundary. :param max_version: The maximum service_entity_version of the selected instances (including this version). If left empty, there is no maximum boundary. :param added_attribute: Use in conjunction with `include_purged_embedded_entities=True` to set the name of the attribute flagging embedded entities that were added during an update. :param removed_attribute: Use in conjunction with `include_purged_embedded_entities=True` to set the name of the attribute flagging embedded entities that were removed during an update. :return: A list of dict with all the defined records. """ # make calls that can cause re-scheduling if service.service_entity_name is None: raise Exception( "Only instances of entities that define its service type can be retrieved." ) is_service_catalog_update: bool = ( os.environ.get(ENV_NO_INSTANCES, "false").lower() == "true" ) if is_service_catalog_update: LOGGER.log( logging.DEBUG, "lsm::all returning 0 instances for type %s and version range (min version %s and max version %s) " "during update of the service catalog.", service.service_entity_name, str(min_version), str(max_version), ) return [] LOGGER.log( inmanta.const.LogLevel.TRACE.to_int, "Entering lsm:all for %s", service.service_entity_name, ) for version in service.versions: # this is a failsafe if the selector doesn't do it properly global_cache.register_binding(version) # collect environment context service_entity_name = service.service_entity_name model_state = os.environ.get(lsm_const.ENV_MODEL_STATE, ModelState.active) validation = model_state == ModelState.candidate env = context.get_environment_id() # Get the selector selector = global_cache.selector(environment=env) selector.reload_bindings() # at this point, we are probably good wrt to re-execution # Parse instances ids instance_id_env = os.environ.get(lsm_const.ENV_INSTANCE_ID, None) if not instance_id_env: instance_ids: list[str] = [] else: instance_ids = instance_id_env.split(" ") # Validate for instance_id in instance_ids: try: uuid.UUID(hex=instance_id) except ValueError: raise PluginException( f"Instance Id set via {lsm_const.ENV_INSTANCE_ID} is not a valid uuid: {instance_id}" ) if validation: if len(instance_ids) == 0: raise Exception( "Validation compile without instance set! This is not allowed" ) if len(instance_ids) > 1: raise Exception( "Validation compile with multiple instance set! This is not allowed. instances: %s", instance_ids, ) instance_to_validate: Optional[str] = instance_ids[0] else: instance_to_validate = None selector.register_instances(instance_ids) # Select all instances regardless of version # We do the version filtering below raw_instances: list[dict] = selector.select(service_entity_name) allocation_specs: dict[int, Optional[str]] = { version.version: get_optional_value(version, "allocation_spec") for version in service.versions } instances = [] to_allocate: dict[int, list[dict]] = defaultdict(list) for data in raw_instances: service_entity_version = data.get("service_entity_version", 0) if min_version is not None and min_version > service_entity_version: continue if max_version is not None and max_version < service_entity_version: continue instance = global_cache.convert_instance(data, validation, instance_to_validate) if not instance: continue instances.append(instance) if ( validation and instance_to_validate == data["id"] and allocation_specs[service_entity_version] is not None ): to_allocate[service_entity_version].append(instance) if to_allocate: import inmanta_plugins.lsm.allocation as allocation for version in to_allocate.keys(): to_allocate_version = to_allocate[version] binding = [ binding for binding in service.versions if binding.version == version ][0] allocate_spec = allocation.global_allocation_collector.get_spec( binding.allocation_spec ) LOGGER.log( logging.DEBUG, "Performing allocation for %d instances of type %s and version %s: done", len(to_allocate_version), service.service_entity_name, version, ) allocation.do_allocate_instances( context, binding, allocate_spec, to_allocate_version ) LOGGER.log( logging.DEBUG, "lsm::all returning %d instances for type %s", len(instances), service_entity_name, ) if include_purged_embedded_entities: instances = insert_purged_embedded_entities( instances=instances, added_attribute=added_attribute, removed_attribute=removed_attribute, ) if include_embedded_entities_id: instances = insert_embedded_entities_id(instances) return instances @plugins.plugin def current_state(ctx: plugins.Context, fsm: "lsm::ServiceEntity") -> "dict": """ Returns the current state from the lifecycle and the next version of the instance """ instance_id = fsm.instance_id state_name, current_version, current_desired_state_version = ( global_cache.get_instance_state_with_desired_state_version(instance_id) ) states = {} # collect all states in the graph for transfer in fsm.entity_binding.lifecycle.transfers: states[transfer.source.name] = transfer.source states[transfer.target.name] = transfer.target try: states[transfer.error.name] = transfer.error except OptionalValueException: pass if state_name not in states: raise Exception( "The state returned from the API does not exist in the lifecycle defined in the model." ) return { "state": states[state_name]._get_instance(), "next_version": current_version + 1, "next_desired_state_version": ( current_desired_state_version + 1 if current_desired_state_version is not None else None ), } @plugins.plugin def has_current_state( ctx: plugins.Context, service_instance: "lsm::ServiceEntity", state_name: "string" ) -> "bool": """ Check whether the given service instance is currently in the given state of its lifecycle. :param service_instance: The ServiceEntity object. :param state_name: The name of the lifecycle state """ instance_id = service_instance.instance_id current_state, _ = global_cache.get_instance_state(instance_id) return current_state == state_name @plugins.plugin def is_validating(instance_id: "string") -> "bool": """ Return true if the current compile is a validating compile and the instance being validated has the given id. :param instance_id: The id of the instance we want to check for validation. """ return ( inmanta_plugins.lsm.allocation_helpers.is_validation_compile() and inmanta_plugins.lsm.allocation_helpers.get_validated_instance_id() == instance_id ) @plugins.plugin def format(__string: "string", args: "list", kwargs: "dict") -> "string": """ Format a string using python string formatter, and accepting statements which native inmanta f-string doesn't support (such as accessing dict values) :param __string: The string to apply formatting to :param args: The positional arguments to feed into the `str.format` method :param kwargs: The keyword arguments to feed into the `str.format` method """ return __string.format(*args, **kwargs) @plugins.plugin() def update_read_only_attribute( service: "lsm::ServiceEntity", # type: ignore attribute_path: "string", # type: ignore *, value: "any", # type: ignore ) -> "any": """ Update the value of a read-only (candidate) attribute in the service, in any compile, at any time. The value will first be compared to the previous set value, and only be sent to the server if it is different. :param service: The service on which we want to set the value :param attribute_path: The path towards the service attribute :param value: The new value that we want to make sure is currently written in the service. """ # Get the instance to check the current value instance = inmanta_plugins.lsm.allocation_helpers.get_service_instance( service.entity_binding.service.service_entity_name, service.instance_id, ) # The current value is the one in the candidate attributes if they exist, in the # active attributes otherwise attributes = instance["candidate_attributes"] or instance["active_attributes"] try: current_value = dict_path.to_path(attribute_path).get_element(attributes) except LookupError: current_value = None # Only write the new value if it is different from the previous one if current_value != value: inmanta_plugins.lsm.allocation_helpers.set_allocated_value( service_entity=service, attribute_path=dict_path.to_path(attribute_path), value=value, ) return value @plugins.plugin() def validate_service_index( binding: "lsm::ServiceBindingVersion", # type: ignore attributes: "dict", ignored_states: "string[]" = [], # type: ignore ) -> "bool": """ Validate that amongst all the services of the given binding that should be taken into account in this compile, only one of them has the provided set of attributes names and values. :param binding: The binding for which we want to check the services. :param attributes: The attributes that we are looking for, the keys should be the attributes names, and the values the expected values. """ def match(service: dict) -> bool: """ Check if the given service has for each attribute we care about, the value we care about. """ # First check if the service should be taken into account in this compile if not service: return False # The service is in a state that we shouldn't consider if service["state"] in ignored_states: return False # Then check for all the relevant attributes for k, v in attributes.items(): if service["attributes"][k] != v: return False return True # Get all the services and check if any service also has the same set # of attributes with the same values. If it is the case, raise an explicit # exception all_services = [ global_cache.convert_instance( service, inmanta_plugins.lsm.allocation_helpers.is_validation_compile(), inmanta_plugins.lsm.allocation_helpers.get_validated_instance_id() or "", ) for service in global_cache.get_all_instances( get_environment(), binding.service.service_entity_name, ) ] matching_services = [service["id"] for service in all_services if match(service)] if len(matching_services) <= 1: # No issue there return True raise inmanta.plugins.PluginException( f"The attributes {list(attributes.keys())} should form a unique combination across all " f"services, but multiple services use the same values: {matching_services} ({attributes})" ) @resources.resource("lsm::LifecycleTransfer", agent="agent", id_attribute="instance_id") class LifecycleTransferResource(resources.PurgeableResource): """ A resource that collects deploy events and send them to the lsm """ service_entity: str instance_id: uuid.UUID next_version: int next_desired_state_version: Optional[int] fields = ( "service_entity", "instance_id", "next_version", "next_desired_state_version", "resources", ) @staticmethod def get_resources(exporter, transfer): resource_list = [] unregistered_resources = set() for res in transfer.resources: try: current_resource_id = resources.to_id(res) if current_resource_id is None: unregistered_resources.add(str(DynamicProxy.unwrap(res))) else: resource_list.append(current_resource_id) except IgnoreResourceException: pass if len(unregistered_resources) > 0: raise RuntimeError( f"Unregistered resources are present in this export: {unregistered_resources}" ) sorted_resources = sorted(resource_list) return sorted_resources # This class should ideally inherit from the more generic `HandlerAPI` instead of `CRUDHandler` due to its limited adherence # to the interface. However, since the `HandlerAPI` class is only available on iso7+ and we want to remain compatible with # iso6 this cannot yet be done. @handler.provider("lsm::LifecycleTransfer", name="local_state") class LifecycleTransferHandler(handler.CRUDHandler): """ A handler that collects all resource statuses from resources that are part of a service instance. The deploy() method is used to determine whether the resources of a service instance are deployed successfully or not. """ client: Optional[protocol.Client] def pre( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: self.client = protocol.Client("agent") def deploy( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource, requires: dict[ResourceIdStr, const.ResourceState], ) -> None: self.pre(ctx, resource) try: all_resources_are_deployed_successfully = self._send_current_state( ctx, resource, requires ) if all_resources_are_deployed_successfully: ctx.set_status(const.ResourceState.deployed) else: ctx.set_status(const.ResourceState.failed) finally: self.post(ctx, resource) def _send_current_state( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource, fine_grained_resource_states: dict[ResourceIdStr, const.ResourceState], ) -> bool: """ Report the resource states for the resources in this service instance to inmanta LSM. This method raises a SkipResource exception when one of the given resource states is a transient state (i.e. it doesn't match the failed or deployed state). :param resource: This LifecycleTransfer resource. :param fine_grained_resource_states: The resource state for each resource part of this service instance. :return: True iff all the given resource states are equal to deployed. """ # If a resource is not in events, it means that it was deployed before so we can mark it as success resource_states = {res: ResourceState.success for res in resource.resources} is_failed = False skipped_resources = [] # Convert inmanta.const.ResourceState to inmanta_lsm.model.ResourceState for resource_id, state in fine_grained_resource_states.items(): if state == const.ResourceState.failed: resource_states[resource_id] = ResourceState.error is_failed = True elif state == const.ResourceState.deployed: # same as default list pass elif resource_id in resource_states: # some transient state that is not failed and not success, so lets skip skipped_resources.append( f"skipped because the `{resource_id}` is `{state.value}`" ) # failure takes precedence over transient # transient takes precedence over success if len(skipped_resources) > 0 and not is_failed: raise SkipResource("\n".join(skipped_resources)) ctx.info( "Sending %(states_str)s", states_str=", ".join([f"{k}: {v}" for k, v in resource_states.items()]), states=resource_states, ) set_state_args = { "tid": self._agent.environment, "service_entity": resource.service_entity, "service_id": resource.instance_id, "resource_states": resource_states, } if resource.next_desired_state_version is not None: # Makes mypy happy assert resource.next_desired_state_version is not None set_state_args["current_desired_state_version"] = ( resource.next_desired_state_version - 1 ) set_state_args["current_version"] = None else: set_state_args["current_version"] = resource.next_version - 1 def call() -> typing.Awaitable[protocol.Result]: return self.client.lsm_services_resources_set_state(**set_state_args) response = self.run_sync(call) if response.code == 409: # Conflict: Transfer already happened return not is_failed if response.code != 200: ctx.error( "Failed to set resource state on lsm.", code=response.code, result=response.result, ) # and raise exception to mark this resource as failed raise Exception("Failed to set resource state on lsm.") return not is_failed def _diff( self, current: LifecycleTransferResource, desired: LifecycleTransferResource ) -> dict[str, dict[str, Any]]: changes = super()._diff(current, desired) if "next_version" in changes: # no changes required when current > desired current = changes["next_version"]["current"] desired = changes["next_version"]["desired"] if current > desired: del changes["next_version"] return changes def read_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: """ This method is used by the dry_run code path. """ def call(): # get the state from lsm return self.client.lsm_services_get( self._agent.environment, resource.service_entity, resource.instance_id ) response = self.run_sync(call) if response.code != 200: ctx.error( "Failed to retrieve service instance", message=response.result, status=response.code, ) raise Exception("Failed to retrieve service instance") ctx.set("instance", response.result["data"]) ctx.set("transferred", False) ctx.info( "Retrieved instance with version %(version)s", version=response.result["data"]["version"], ) resource.next_version = response.result["data"]["version"] resource.next_desired_state_version = response.result["data"].get( "next_desired_state_version", None ) def create_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." ) def delete_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." ) def update_resource( self, ctx: handler.HandlerContext, changes: dict, resource: LifecycleTransferResource, ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." ) def post( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: self.client = None @resources.resource("lsm::ServiceInstance", agent="agent", id_attribute="instance_id") class ServiceInstanceResource(resources.PurgeableResource): fields = ( "service_entity", "instance_id", "attributes", "skip_update_states", "rejected_states", ) @staticmethod def get_attributes(exporter, instance): return instance.attributes @staticmethod def get_instance_id(_, instance): return instance.instance_id @staticmethod def get_service_entity(_, instance): return instance.service_entity @handler.provider("lsm::ServiceInstance", name="service_instance") class ServiceInstanceHandler(handler.CRUDHandler): client: protocol.SyncClient def pre(self, ctx, resource) -> None: self.client = protocol.Client("agent") def read_instance(self, service_instance: ServiceInstanceResource): def call(): return self.client.lsm_services_get( self._agent.environment, service_instance.service_entity, service_instance.instance_id, ) return call def read_resource( self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource ) -> None: # get the state from lsm def call(): return self.client.lsm_services_get( self._agent.environment, service_instance.service_entity, service_instance.instance_id, ) response = self.run_sync(call) if response.code == 404: service_instance.purged = True return if response.code != 200: ctx.error( "Failed to retrieve service instance", message=response.result, status=response.code, ) raise Exception("Failed to retrieve service instance") ctx.set("instance", response.result["data"]) ctx.info( "Retrieved instance with version %(version)s", version=response.result["data"]["version"], ) service_instance.attributes = ( response.result["data"]["candidate_attributes"] if not response.result["data"]["active_attributes"] else response.result["data"]["active_attributes"] ) def create_resource( self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource ) -> None: def call(): return self.client.lsm_services_create( self._agent.environment, service_instance.service_entity, service_instance.attributes, service_instance.instance_id, ) response = self.run_sync(call) if response.code != 200: ctx.error( "Failed to create service instance on lsm.", code=response.code, result=response.result, ) raise Exception( f"Failed to set resource state on lsm. {str(response.result)}" ) ctx.set("instance", response.result["data"]) ctx.info( "Created instance with version %(version)s", version=response.result["data"]["version"], ) service_instance.instance_id = response.result["data"]["id"] ctx.set_created() def delete_resource( self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource ) -> None: instance_read_response = self.run_sync(self.read_instance(service_instance)) instance_version = instance_read_response.result["data"]["version"] instance_current_state = instance_read_response.result["data"]["state"] def call(): return self.client.lsm_services_delete( self._agent.environment, service_instance.service_entity, service_instance.instance_id, instance_version, ) response = self.run_sync(call) if response.code == 404 and "cannot be updated" in response.result["message"]: raise SkipResource( f"Instance {instance_read_response.result['data']['id']} " f"cannot be deleted while in state {instance_current_state}" ) if response.code != 200: ctx.error( "Failed to delete service instance on lsm.", code=response.code, result=response.result, ) raise Exception("Failed to delete service instance on lsm.") ctx.set("instance", None) ctx.set_purged() def calculate_diff( self, ctx: handler.HandlerContext, current: ServiceInstanceResource, desired: ServiceInstanceResource, ) -> dict[str, dict[str, Any]]: changes = super()._diff(current, desired) if "attributes" not in changes: return changes # Desired might not contain the attributes that have default value for attribute_name, attribute_value in changes["attributes"]["current"].items(): if attribute_name not in changes["attributes"]["desired"].keys(): changes["attributes"]["desired"][attribute_name] = attribute_value if changes["attributes"]["current"] == changes["attributes"]["desired"]: del changes["attributes"] return changes def update_resource( self, ctx: handler.HandlerContext, changes: dict, service_instance: ServiceInstanceResource, ) -> None: read_response = self.run_sync(self.read_instance(service_instance)) state = read_response.result["data"]["state"] instance_version = read_response.result["data"]["version"] if state in service_instance.skip_update_states: raise SkipResource(f"Instance cannot be updated from state {state}") if state in service_instance.rejected_states: raise Exception(f"Instance is in a rejected state: {state}") updated = {} if "attributes" in changes: # Check the desired attributes for attribute_name, attribute_value in changes["attributes"][ "desired" ].items(): # If it's already in the candidates, but with a different value, update the value if ( read_response.result["data"]["candidate_attributes"] and read_response.result["data"]["candidate_attributes"][ attribute_name ] != attribute_value ): updated[attribute_name] = attribute_value # If it's different from the active attribute, and not already in the candidates, update it if ( read_response.result["data"]["active_attributes"] and read_response.result["data"]["active_attributes"][ attribute_name ] != attribute_value and attribute_name not in updated.keys() ): updated[attribute_name] = attribute_value del changes["attributes"] if changes: raise Exception( f"Changing {changes.keys()} of an instance is not supported" ) def call(): return self.client.lsm_services_update( self._agent.environment, service_instance.service_entity, service_instance.instance_id, instance_version, updated, ) if updated: ctx.debug( f"Updating instance {service_instance.id} with attributes {str(updated)}" ) response = self.run_sync(call) if response.code != 200: ctx.error( "Failed to update service instance on lsm.", code=response.code, result=response.result, ) raise Exception( f"Failed to update service instance on lsm. {str(response.result)}" ) ctx.set_updated() @dependency_manager def lsm_dependency_manager(entities: ModelDict, resources: ResourceDict) -> None: for res in resources.values(): if res.id.entity_type == "lsm::LifecycleTransfer": # Ensure that the resources list contains only resources that exist. # The requires set was already filtered from resources that don't exist. res.resources = sorted([r.resource_str() for r in res.requires]) def inmanta_reset_state() -> None: """ Reset the cached state for this module. """ global_cache.reset() @inmanta.plugins.plugin() def insert_purged_embedded_entities( instances: "dict[]", # type: ignore *, removed_attribute: "string" = "_removed", # type: ignore added_attribute: "string" = "_added", # type: ignore ) -> "dict[]": # type: ignore """ Update the instances in the provided list to include in the current attribute set all the embedded entities that have been removed in the current update. If no update is happening for an instance, its attributes are left untouched. :param instances: The list of instance to update, the original dicts are not modified, the returned values are a modified copy. Instances can be of different types. :param removed_attribute: The attribute that should be inserted in the embedded entities which are inserted back into the current state, to mark the entity as purged. :param added_attribute: The attribute that should be inserted in the embedded entities which are added into the current state, to mark the entity as "new". """ cache: dict[tuple[str, int | None], list[dict_path.WildDictPath]] = {} # Go over each service and transform its attributes to include any deleted embedded # entity updated_instances: list[dict] = [] for instance in instances: # Bring the attributes into the python domain so that dict path can handle it # and we can modify its values # This method makes a deepcopy of the original object so we can safely modify it instance = inmanta_plugins.lsm.unwrap(instance) previous_attributes = get_previous_attributes(instance) if previous_attributes is None: # We don't have a previous attributes set, there can't be any removed embedded # entities to insert back, we can continue updated_instances.append(instance) continue # The selection of the current attributes is done automatically by lsm::all, based on # the validate_self and validate_others attributes of the state we are in. current_attributes = instance["attributes"] # Get all the embedded entities that were added/removed removed_embedded_entities: set[str] = set() added_embedded_entities: set[str] = set() for ( path ) in inmanta_plugins.lsm.embedded_entities_tracker.get_embedded_entities_paths_or_cache( instance["service_entity"], instance.get("service_entity_version", None), cache=cache, ): # We treat any null value as a removed object, for this we need to filter them # out of the attributes first previous_paths = { str(p) for p in path.resolve_wild_cards( inmanta_plugins.lsm.embedded_entities_tracker.remove_none_values( previous_attributes ) ) } current_paths = { str(p) for p in path.resolve_wild_cards( inmanta_plugins.lsm.embedded_entities_tracker.remove_none_values( current_attributes ) ) } removed_embedded_entities |= previous_paths - current_paths added_embedded_entities |= current_paths - previous_paths for removed in sorted(removed_embedded_entities): removed_path = dict_path.to_path(removed) embedded_entity = copy.copy(removed_path.get_element(previous_attributes)) embedded_entity[removed_attribute] = True removed_path.set_element( current_attributes, embedded_entity, construct=True ) for added in sorted(added_embedded_entities): added_path = dict_path.to_path(added) embedded_entity = added_path.get_element(current_attributes) embedded_entity[added_attribute] = True updated_instances.append(instance) return updated_instances @inmanta.plugins.plugin() def insert_embedded_entities_id(instances: "dict[]") -> "dict[]": # type: ignore """ Extend the instance attributes with information about the identity of each embedded entity that is part of it. This includes the id of the service it is part of (in the `_instance_id` attribute) and the dict path that leads to this specific entity, from the root of the instance attributes (in the `_path` attribute). :param instances: The instances, as returned by lsm::all, which should get augmented with the embedded entities identities. All injected attributes (`_instance_id` and `_path`) are only added to the `attributes` attribute of the instance. """ # Go over each service and make sure to convert any dynamic proxy object into its # mainstream python counterpart updated_instances: list[dict] = [inmanta_plugins.lsm.unwrap(i) for i in instances] def add_embedded_entity_id(instance: dict, path: str, attributes: dict) -> None: """ Update the attributes to make sure it contains its identity (as an embedded entity of a service). :param instance: The instance the attributes is a part of :param path: The path to this embedded entity :param attributes: The attributes of the embedded entity that we will modify in place """ attributes["_instance_id"] = instance["id"] attributes["_path"] = path # Register the identity in each embedded entity of each instance inmanta_plugins.lsm.embedded_entities_tracker.for_each_embedded_entity( updated_instances, add_embedded_entity_id, ) return updated_instances @inmanta.plugins.plugin() def get_previous_attributes(instance: "dict") -> "dict?": # type: ignore """ Get the previous attributes set for a given instance if it is in an updating state. The selected attribute set depends on whether the instance is being validated or not :param instance: Map representation of the instance """ validating = is_validating(instance["id"]) service_entity = str(instance.get("service_entity")) service_entity_version = int(instance.get("service_entity_version", 0)) lifecycle = global_cache.get_lifecycle(service_entity, service_entity_version) state = lifecycle[instance["state"]] previous_attributes_set = ( state.previous_attr_set_on_validate if validating else state.previous_attr_set_on_export ) if previous_attributes_set is None: # There is no previous attributes to merge into the current ones return None return instance[str(previous_attributes_set) + "_attributes"] @inmanta.plugins.plugin() def get_previous_value( instance_element: "dict", attribute: "string", *, default: "any" = None, ) -> "any": """ Get the value that a specific attribute had previously. This plugin can be called in update states, when unrolling an lsm service, to figure out if a specific attribute has been modified during the update. The plugin can be called at the root of the instance dict, or on any embedded entity that is part of the current attributes of the instance. Each embedded entity is expected to have the following keys set, which is automatically added when using `lsm::all` with `include_embedded_entities_id=true`. :param instance_element: A dict that is either an instance coming out of lsm::all, or a part of the `attributes` dict of an instance coming out of lsm::all. :param attribute: The name of an attribute that is supposed to exist on the current instance_element, in the previous version of the attributes. :param default: A value to return in case the attribute doesn't exist in the previous version of the attributes. Either because it never existed, or because the embedded entity it is a part of didn't exist, or because we are not in an updating state (meaning there is no notion of previous attributes in that case). """ # First we need to figure out whether the argument we received is an # instance or an embedded entity if "_path" in instance_element and "_instance_id" in instance_element: # This is an embedded entity, we can resolve the full instance and # the embedded entity that is being targeted instance = global_cache.get_instance( get_environment(), None, instance_element["_instance_id"] ) path = instance_element["_path"] elif "id" in instance_element and "service_entity" in instance_element: # This is the full instance instance = instance_element path = "." else: raise ValueError( "Unexpected instance element format. " "Expected a dict coming out of lsm::all, which is either the instance itself, or " f"a part of its attributes. But it doesn't look like any of this: {instance_element}" ) # Get the previous attributes for our instance previous = get_previous_attributes(instance) if previous is None: # No previous attributes, not an updated state return default # Get the attribute in the previous attributes try: return dict_path.to_path(path).get_element(previous)[attribute] except LookupError: # Either the embedded entity or the attribute doesn't exist in # the previous attributes, return the default instead return default