Source code for inmanta_plugins.lsm

"""
    Inmanta LSM

    :copyright: 2019 Inmanta
    :contact: code@inmanta.com
    :license: Inmanta EULA
"""
import builtins
import contextlib
import itertools
import logging
import os
import re
import subprocess
import typing
from collections import abc, defaultdict
from typing import Any, Mapping, Optional, Sequence, Set, Tuple, Union
from uuid import UUID

import inmanta_plugins.lsm.allocation_helpers
import inmanta_plugins.lsm.partial
import pydantic
from inmanta import ast, config, const, plugins, protocol, resources
from inmanta.agent import handler
from inmanta.agent.handler import SkipResource
from inmanta.ast import (
    DirectExecuteException,
    OptionalValueException,
    WrappingRuntimeException,
)
from inmanta.ast import type as ast_type
from inmanta.ast.attribute import Attribute
from inmanta.ast.entity import Entity
from inmanta.const import LOG_LEVEL_TRACE
from inmanta.data.model import ResourceIdStr
from inmanta.docstring_parser import parse_docstring
from inmanta.execute.proxy import DynamicProxy, UnsetException
from inmanta.execute.runtime import Instance
from inmanta.execute.util import NoneValue
from inmanta.export import (
    Exporter,
    ModelDict,
    ProxiedType,
    ResourceDict,
    dependency_manager,
    export,
)
from inmanta.plugins import PluginException
from inmanta.resources import IgnoreResourceException
from inmanta_lsm import const as lsm_const
from inmanta_lsm import methods  # noqa
from inmanta_lsm.model import (
    EmbeddedServiceEntity,
    InterServiceRelation,
    Lifecycle,
    LifecycleState,
    LifecycleTransfer,
    ModelState,
    ResourceState,
    ServiceAttribute,
    ServiceEntity,
)
from inmanta_plugins.lsm.allocation_v2 import entity, framework

LOGGER = logging.getLogger(__name__)


def get_optional_value(value: DynamicProxy, attribute: str) -> Optional[object]:
    """
    Return the value of the given attribute on the proxied instance, or None when
    the optional attribute is not set.
    """
    try:
        return getattr(value, attribute)
    except OptionalValueException:
        return None
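
# Illustrative sketch, not part of the upstream module: get_optional_value()
# exists because reading an unset optional attribute on a DynamicProxy raises
# OptionalValueException rather than returning None. A hypothetical caller that
# wants "None when unset" semantics could use it like this (the "owner" relation
# name mirrors its use further down in this file):
def _example_owner_entity_name(entity_binding: DynamicProxy) -> Optional[str]:
    owner = get_optional_value(entity_binding, "owner")
    return owner.service_entity_name if owner is not None else None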


@plugins.plugin
def context_v2_unwrapper(
    assignments: "dict[]",
    fallback_attribute: "string",
    track_deletes: "bool" = False,
) -> "dict[]":
    """
    Wrap the instances coming out of lsm::all and place all allocated values,
    found under :param fallback_attribute:, where they belong in the instance
    attributes. The returned value is the same as the input, except that the
    allocated values have been set at their proper location.

    :param track_deletes: Drop deleted embedded entities, even if they still
        exist in the fallback set. This should be used together with the
        ContextV2Wrapper allocator.

    Each assignment is an attribute dict containing a fallback attribute assigned
    with allocated values as produced by the ContextV2 (one level deep, keys are
    the string representation of the paths, values are allocated values). This
    plugin updates the dict, placing each value where its key-path would reach it.

    :param assignments: The list of service instance dictionaries as returned by
        lsm::all
    :param fallback_attribute: The attribute name at the root of the instance
        attributes that contains all the allocated values

    e.g.::

        context_v2_unwrapper(
            [
                {
                    "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
                    "id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
                    "service_entity": "carrierEthernetEvc",
                    "version": 4,
                    "config": {},
                    "state": "allocating",
                    "attributes": {
                        "allocated": {
                            "evcEndPoints[identifier=my-evc-ep-1].uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                            "evcEndPoints[identifier=my-evc-ep-2].uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                        "another_key": "any value",
                    },
                    "candidate_attributes": {
                        "allocated": {
                            "evcEndPoints[identifier=my-evc-ep-1].uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                            "evcEndPoints[identifier=my-evc-ep-2].uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                        "another_key": "any value",
                    },
                    "active_attributes": {},
                    "rollback_attributes": {},
                }
            ],
            "allocated",
        )

    will return::

        [
            {
                "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
                "id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
                "service_entity": "carrierEthernetEvc",
                "version": 4,
                "config": {},
                "state": "allocating",
                "attributes": {
                    "allocated": {
                        "evcEndPoints[identifier=my-evc-ep-1].uni": {
                            "uniref": "inmanta:456-852-789",
                            "test_value": "test_value",
                        },
                        "evcEndPoints[identifier=my-evc-ep-2].uni": {
                            "uniref": "inmanta:123-852-456",
                            "test_value": "test_value",
                        },
                    },
                    "evcEndPoints": [
                        {
                            "identifier": "my-evc-ep-1",
                            "uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                        },
                        {
                            "identifier": "my-evc-ep-2",
                            "uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                    ],
                    "another_key": "any value",
                },
                "candidate_attributes": {
                    "allocated": {
                        "evcEndPoints[identifier=my-evc-ep-1].uni": {
                            "uniref": "inmanta:456-852-789",
                            "test_value": "test_value",
                        },
                        "evcEndPoints[identifier=my-evc-ep-2].uni": {
                            "uniref": "inmanta:123-852-456",
                            "test_value": "test_value",
                        },
                    },
                    "evcEndPoints": [
                        {
                            "identifier": "my-evc-ep-1",
                            "uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                        },
                        {
                            "identifier": "my-evc-ep-2",
                            "uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                    ],
                    "another_key": "any value",
                },
                "active_attributes": {},
                "rollback_attributes": {},
            }
        ]
    """

    def for_all_attributes(assignment: dict) -> dict:
        for attributes_set in entity.AttributeSetName:
            attributes: Optional[dict] = assignment.get(attributes_set.value)
            if isinstance(attributes, NoneValue) or not attributes:
                continue
            assignment[attributes_set.value] = framework.unwrap_attributes(
                attributes, fallback_attribute, track_deletes=track_deletes
            )
        return assignment

    return [
        for_all_attributes(DynamicProxy.unwrap(assignment))
        for assignment in assignments
    ]
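
# Illustrative sketch, not part of the upstream module: the transformation that
# framework.unwrap_attributes() performs, as described in the docstring above.
# Keys such as "evcEndPoints[identifier=my-evc-ep-1].uni" are paths consisting
# of a relation name, an [attribute=value] selector that identifies one embedded
# entity, and a trailing attribute that receives the allocated value. A
# much-simplified, hypothetical version of that placement step, handling only
# the single-level "relation[key=value].attr" shape used in the example:
def _example_place_allocated_value(
    attributes: dict, path: str, allocated: dict
) -> None:
    match = re.fullmatch(r"(\w+)\[(\w+)=([^\]]+)\]\.(\w+)", path)
    if match is None:
        return
    relation, key, key_value, target_attr = match.groups()
    embedded_entities = attributes.setdefault(relation, [])
    for embedded in embedded_entities:
        if embedded.get(key) == key_value:
            # The embedded entity already exists: only set the allocated value.
            embedded[target_attr] = allocated
            return
    # Otherwise create the embedded entity identified by the selector.
    embedded_entities.append({key: key_value, target_attr: allocated})
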
def is_instance(ctx: plugins.Context, obj: "any", cls: "string") -> "bool": t = ctx.get_type(cls) try: t.validate(obj._get_instance()) except ast.RuntimeException: return False return True def getoptional(obj, *relations): try: for rel in relations: obj = getattr(obj, rel) return obj except OptionalValueException: return None def create_lifecycle(fsm_config) -> "dict": """ Generate a dict with the state metadata in to send to the server """ states = set() def state(st): if st is None: return None states.add(st) return st.name transfers = [ LifecycleTransfer( source=state(tr.source), target=state(tr.target), error=state(getoptional(tr, "error")), on_update=tr.on_update, on_delete=tr.on_delete, api_set_state=tr.api_set_state, resource_based=tr.resource_based, auto=tr.auto, validate=tr.validate, config_name=tr.config_name, target_operation=tr.target_operation, error_operation=tr.error_operation, description=tr.description, ) for tr in fsm_config.transfers ] states = [ LifecycleState( name=state.name, label=getoptional(state, "label"), export_resources=getoptional(state, "export_resources"), validate_self=getoptional(state, "validate_self"), validate_others=getoptional(state, "validate_others"), purge_resources=state.purge_resources, deleted=state.deleted, values={k: v for k, v in state.values.items()}, ) for state in states ] out = Lifecycle( name=fsm_config.name, states=states, transfers=transfers, initial_state=fsm_config.initial_state.name, ) out.validate_graph() return out def collect_states(fsm_config) -> "dict": """ Generate a dict with the state metadata in to send to the server """ states = set() def state(st): if st is None: return None states.add(st) for tr in fsm_config.transfers: state(tr.source) state(tr.target) state(getoptional(tr, "error")) return {state.name: state for state in states} def map_attr_type( attr: ast.attribute.Attribute, thetype: Optional[ast.type.Type] = None ) -> str: tp: ast.type.Type = ( attr.type if thetype is None else attr.type.with_base_type(thetype) ) result: Optional[str] = tp.type_string() if result is None: raise Exception("Invalid attribute type %s" % thetype) return result def unwrap(value: DynamicProxy) -> Any: """ This code is not quite the compiler domain, not is it purely the plugin domain. As such, the default unwrapping doesn't really work """ value = DynamicProxy.unwrap(value) if isinstance(value, NoneValue): return None return value class ServiceEntityBuilder: """ This class creates a ServiceEntity instance from a ServiceEntity in model. """ @classmethod def get_service_entity_from_model( cls, env: UUID, all_bindings: abc.Mapping[str, DynamicProxy], entity_binding: ProxiedType, entity_type: Entity, ) -> ServiceEntity: """ Convert the entity type object from the compiler to a ServiceEntity object. :param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype. 
""" try: return cls._get_service_entity_from_model( env, all_bindings, entity_binding, entity_type ) except DirectExecuteException as e: raise WrappingRuntimeException( None, "Only Literal statements are allowed as defaults in model definition", e, ) @classmethod def _get_service_entity_from_model( cls, env: UUID, all_bindings: abc.Mapping[str, DynamicProxy], entity_binding: ProxiedType, entity_type: Entity, ) -> ServiceEntity: docstring = parse_docstring(entity_type.comment) ( service_attributes, embedded_service_entities, inter_service_relations, ) = cls._get_attributes( entity_type, all_bindings=all_bindings, strict_enforcement=entity_binding.strict_modifier_enforcement, ) owner = get_optional_value(entity_binding, "owner") if owner: owner = owner.service_entity_name return ServiceEntity( environment=env, name=entity_binding.service_entity_name, description=docstring.get_description(), attributes=service_attributes, embedded_entities=embedded_service_entities, inter_service_relations=inter_service_relations, lifecycle=create_lifecycle(entity_binding.lifecycle), # The config field shouldn't be populated. It is populated indirectly via the config_name attribute of a # StateTransfer. The lsm_service_entity_config_set() API call should be used to update the config of a # ServiceEntity. config={}, service_identity=entity_binding.service_identity, service_identity_display_name=entity_binding.service_identity_display_name, strict_modifier_enforcement=entity_binding.strict_modifier_enforcement, owner=owner, relation_to_owner=entity_binding.relation_to_owner, entity_annotations=cls._get_annotations_for_entity(entity_type), ) @classmethod def _get_annotation_in_attribute( cls, attribute: ast.attribute.Attribute, entity_type: Entity ) -> dict[str, str]: """ Returns the default value of the given attribute that contains attribute annotations. This method verifies the constraints that should hold on an attribute that contains annotations. :param attribute: The attribute that contains the annotations in its default values. :param entity_type: The entity that attribute belongs to. """ try: default_value = entity_type.get_default(attribute.name).execute_direct({}) except AttributeError: raise Exception( f"Attribute {attribute.name} of entity {entity_type.get_full_name()} is missing a default value" f" ({attribute.location})" ) if not isinstance(default_value, dict): raise Exception( f"Default value on attribute {attribute.name} of entity {entity_type.get_full_name()} doesn't have" f" type dict ({attribute.location})." ) if not builtins.all(isinstance(d, str) for d in default_value.values()): raise Exception( f"The default value of the attribute {attribute.name} of entity {entity_type.get_full_name()} must" f" have the type dict[str, str], but some values have a non-string type. Got: {default_value}" f" ({attribute.location})" ) return default_value @classmethod def _get_annotations_for_entity(cls, entity_type: Entity) -> dict[str, str]: """ Returns the dictionary with the annotations for the given Entity. """ annotations_attr_name = "__annotations" if annotations_attr_name not in entity_type.attributes: return {} attr = entity_type.attributes[annotations_attr_name] return cls._get_annotation_in_attribute(attr, entity_type) @classmethod def _get_annotations_for_relation( cls, relation: ast.attribute.RelationAttribute ) -> dict[str, str]: """ Returns the dictionary with the annotations for the given relation. 
""" if relation.source_annotations: relation_annotations_instances = [ source_annotation.get_value() for source_annotation in relation.source_annotations if isinstance(source_annotation.get_value(), Instance) and source_annotation.get_value().type.get_full_name() == "lsm::RelationAnnotations" ] if len(relation_annotations_instances) > 1: raise Exception( f"More than one lsm::RelationAnnotations instance found on relationship {relation} ({relation.location})" ) if relation_annotations_instances: instance = relation_annotations_instances[0] attribute = instance.get_attribute("annotations") annotations = attribute.get_value() if not builtins.all(isinstance(d, str) for d in annotations.values()): raise Exception( f"Attribute annotations of entity {instance.type.get_full_name()} must have the type" f" dict[str, str], but some values have a non-string type. Got: {annotations} ({attribute.location})" ) return annotations return {} @classmethod def _has_parent_entity(cls, entity_type: Entity, parent_entity: str) -> bool: """ Returns true iff this entity type has parent_entity as a parent entity. """ return parent_entity in [ p.get_full_name() for p in entity_type.get_all_parent_entities() ] @classmethod def _ensure_parent_entity( cls, entity_type: Entity, required_parent_entity: str ) -> None: """ Raise an exception when the given entity doesn't have required_parent_entity as a parent entity. """ if not cls._has_parent_entity(entity_type, required_parent_entity): raise Exception( f"{entity_type.get_full_name()} should be a subclass of {required_parent_entity}" ) @classmethod def _get_attributes( cls, entity_type: Entity, *, all_bindings: abc.Mapping[str, DynamicProxy], strict_enforcement: bool = False, is_embedded_entity: bool = False, entity_chain_from_service_entity: Optional[Sequence[Entity]] = None, ) -> Tuple[ list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation] ]: """ :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True. :param is_embedded_entity: Indicates whether the given entity is an embedded entity. :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type. """ if entity_chain_from_service_entity is None: entity_chain_from_service_entity = [] if is_embedded_entity: cls._ensure_parent_entity(entity_type, "lsm::EmbeddedEntity") else: cls._ensure_parent_entity(entity_type, "lsm::ServiceEntity") attributes: dict[str, Attribute] attribute_description_map: dict[str, str] attributes, attribute_description_map = cls._get_all_attributes_compiler_type( entity_type ) service_attrs: list[ServiceAttribute] embedded_entities: list[EmbeddedServiceEntity] ( service_attrs, embedded_entities, inter_service_relations, ) = cls._get_all_attributes_model_type( entity_type, attributes, attribute_description_map, entity_chain_from_service_entity, all_bindings, strict_enforcement, ) cls._log_warning_for_missing_documentation( entity_type, attribute_description_map, service_attrs ) return service_attrs, embedded_entities, inter_service_relations @classmethod def _get_all_attributes_compiler_type( cls, entity_type: Entity ) -> Tuple[dict[str, Attribute], dict[str, str]]: """ Get all attributes of the given entity in types used by the compiler. :return: A tuple with as a first element the attributes. The second element contains the description of attributes present in the docstring. 
""" # collect all attributes, not on std::Entity, lsm::ServiceEntity or lsm::ServiceBase attributes: dict[str, Attribute] = {} attribute_description_map = {} for name, attr in entity_type.attributes.items(): attributes[name] = attr docstring = parse_docstring(entity_type.comment) attribute_description_map.update(docstring.get_attribute_description_map()) for current in entity_type.get_all_parent_entities(): name = current.get_full_name() if name in [ "lsm::ServiceEntity", "lsm::ServiceBase", "lsm::EmbeddedEntity", "std::Entity", ]: continue for name, attr in current.attributes.items(): attributes.setdefault(name, attr) docstring = parse_docstring(current.comment) attribute_description_map.update(docstring.get_attribute_description_map()) return attributes, attribute_description_map @classmethod def _check_relationship_constraints( cls, attr: ast.attribute.RelationAttribute ) -> None: """ This method checks the following constraints: * The upperbound of the arity of the relationship to the embedding entity should be 1. * The name of the attribute referencing the embedding entity should start with an underscore. """ # Bidirectional relationship if attr.end is not None: # Check arity relationship to embedding entity if attr.end.low != 1 or attr.end.high != 1: raise Exception( f"Attribute {attr.end.name} of relationship {attr.end.type}.{attr.name} -- {attr.type}." f"{attr.end.name} should have arity one. The entity structure of embedded entities should " f"form a tree." ) # Check name variable referencing embedding entity if not attr.end.name.startswith("_"): attribute_to_embedding_entity = ( f"{attr.end.entity.get_full_name()}.{attr.end.name}" ) raise Exception( f"Attribute {attribute_to_embedding_entity} should start with an underscore " "because it references the embedding entity." ) @classmethod def _get_plain_attributes(cls, entity: Entity) -> Set[str]: """ Return all attributes except relational and metadata attributes. """ attributes, _ = cls._get_all_attributes_compiler_type(entity) return set( name for name, attr in attributes.items() if not ( name.startswith("_") or "__" in name or isinstance(attr, ast.attribute.RelationAttribute) ) ) @classmethod def _get_modifier( cls, attr: ast.attribute.RelationAttribute, ) -> Optional[str]: """ get the modifier from the RelationshipMetadata if found in the source_annotations of attr. """ if attr.source_annotations: relationship_metadata_entities = [ source_annotation.get_value() for source_annotation in attr.source_annotations if isinstance(source_annotation.get_value(), Instance) and source_annotation.get_value().type.get_full_name() == "lsm::RelationshipMetadata" ] if len(relationship_metadata_entities) > 1: raise Exception( "More than one attribute modifier annotation (__rwplus__, __rw__ or __r__) found on relationship " f"{attr}" ) if relationship_metadata_entities: return ( relationship_metadata_entities[0] .get_attribute("modifier") .get_value() ) return None @classmethod def _get_all_attributes_model_type( cls, entity_type: Entity, attributes_from_compiler: Mapping[str, Attribute], attribute_description_map: Mapping[str, str], entity_chain_from_service_entity: Sequence[Entity], all_bindings: abc.Mapping[str, DynamicProxy], strict_enforcement: bool = False, ) -> Tuple[ list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation] ]: """ Get all attributes of the given entity in types used by the internal model. 
:param attribute_description_map: Maps the attribute name to the description present in the docstring of the entity. :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type. :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True. """ defaults = entity_type.get_default_values() service_attr_dct: dict[str, dict[str, Any]] = defaultdict(dict) embedded_service_entities: list[EmbeddedServiceEntity] = [] inter_service_relations: list[InterServiceRelation] = [] # Check if loop exists between embedded entities if entity_type.get_full_name() in [ t.get_full_name() for t in entity_chain_from_service_entity ]: chain = ( " -> ".join(t.get_full_name() for t in entity_chain_from_service_entity) + f" -> {entity_type.get_full_name()}" ) raise Exception( f"A cycle exists in the embedded entity structure: {chain}. Embedded entities should form a tree structure." ) for name, attr in attributes_from_compiler.items(): try: if name.startswith("_"): # Ignore attributes starting with _ pass elif isinstance(attr, ast.attribute.RelationAttribute): if cls._has_parent_entity(attr.type, "lsm::EmbeddedEntity"): cls._check_relationship_constraints(attr) docstring = parse_docstring(attr.type.comment) ( ref_attributes, ref_embedded_service_entities, ref_inter_service_relations, ) = cls._get_attributes( attr.type, all_bindings=all_bindings, is_embedded_entity=True, entity_chain_from_service_entity=[ *entity_chain_from_service_entity, entity_type, ], strict_enforcement=strict_enforcement, ) name_attr_to_embedder: Optional[str] = ( attr.end.name if attr.end is not None else None ) ese = EmbeddedServiceEntity( name=attr.name, lower_limit=attr.low, upper_limit=attr.high, description=docstring.get_description(), attributes=ref_attributes, embedded_entities=ref_embedded_service_entities, inter_service_relations=ref_inter_service_relations, key_attributes=[] if not strict_enforcement else cls._get_key_attributes( attr.type, name_attr_to_embedder, attr.high ), entity_annotations=cls._get_annotations_for_entity( attr.type ), attribute_annotations=cls._get_annotations_for_relation( attr ), ) modifier = cls._get_modifier(attr) if modifier: ese.modifier = modifier embedded_service_entities.append(ese) elif attr.source_annotations: inter_service_relations_entries = [ entity.get_value() for entity in attr.source_annotations if isinstance(entity.get_value(), Instance) and entity.get_value().type.get_full_name() == "lsm::InterServiceRelation" ] if len(inter_service_relations_entries) == 0: pass elif len(inter_service_relations_entries) > 1: raise Exception( "More than one `__service__` annotation found on relationship " f"{attr}" ) else: docstring = parse_docstring(attr.type.comment) isr = InterServiceRelation( name=attr.name, lower_limit=attr.low, upper_limit=attr.high, description=docstring.get_description(), # use service name rather than entity name as entity type entity_type=all_bindings[ str(attr.type) ].service_entity_name, attribute_annotations=cls._get_annotations_for_relation( attr ), ) modifier = cls._get_modifier(attr) if modifier: isr.modifier = modifier inter_service_relations.append(isr) else: # values and metadata obj = re.search("(.*)__(.*)", name) if obj: # metadata source_attr_name, opt = obj.groups() # Make sure that attribute documentation via the docstring takes precedence # over attribute documentation via the <attr_name>__description field. 
if ( opt == "description" and source_attr_name in attribute_description_map ): continue if opt == "annotations": annotations = cls._get_annotation_in_attribute( attr, entity_type ) service_attr_dct[source_attr_name][ "attribute_annotations" ] = annotations continue service_attr_dct[source_attr_name][opt] = defaults[ name ].execute_direct({}) else: service_attr_dct[name]["name"] = name service_attr_dct[name]["type"] = get_type_as_str(attr) service_attr_dct[name][ "description" ] = attribute_description_map.get(name, None) ( validation_type, validation_params, ) = get_validation_type_and_params(attr) service_attr_dct[name]["validation_type"] = validation_type service_attr_dct[name][ "validation_parameters" ] = validation_params service_attr_dct[name]["default_value_set"] = name in defaults if name in defaults and defaults[name] is not None: value = do_execute_direct( defaults[name], f"Invalid default value for attribute {name} " "of service entity {entity_binding.service_entity_name}", ) attr.validate(DynamicProxy.unwrap(value)) service_attr_dct[name]["default_value"] = unwrap(value) except Exception: LOGGER.exception("Failed to process attribute %s.%s", entity_type, name) raise service_attributes: list[ServiceAttribute] = [] for attribute, values in service_attr_dct.items(): try: if "name" not in values.keys(): if ( "attribute_annotations" in values and attribute in entity_type.attributes and isinstance( entity_type.attributes[attribute], ast.attribute.RelationAttribute, ) ): # The annotations for a relationship were specified using the syntax for an attribute annotation. cls.raise_exception_to_use_annotation_on_relationship( entity_type, attribute, values["attribute_annotations"] ) all_metadata_attributes = [ f'{attribute}__{value if value != "attribute_annotations" else "annotations"}' for value in values.keys() ] raise Exception( f"Metadata attribute(s): { ', '.join(all_metadata_attributes) } " f"of entity {entity_type.get_full_name()} specified without a non-relational attribute {attribute}" ) service_attributes.append(ServiceAttribute(**values)) except pydantic.ValidationError as e: LOGGER.exception( "Failed to parse attribute %s due to an validation exception", attribute, ) raise e return service_attributes, embedded_service_entities, inter_service_relations @classmethod def raise_exception_to_use_annotation_on_relationship( cls, entity_type: Entity, attribute_name: str, annotations: dict[str, str] ) -> None: """ This method raises an exception about the fact that annotations on the relational attribute `attribute_name` were defined using the attribute annotation syntax instead of the syntax for relational attributes. :param entity_type: The entity the relational attribute belongs to. :param attribute_name: The name of the relational attribute. :param annotations: The annotations that were defined by the attribute annotation. """ metadata_attribute_name = f"{attribute_name}__annotations" assert metadata_attribute_name in entity_type.attributes relationship = entity_type.attributes[attribute_name] assert isinstance(relationship, ast.attribute.RelationAttribute) def _format_limits(relationship: ast.attribute.RelationAttribute) -> str: """ Returns a textual representation of the upper and lower arity of the given relationship. 
""" if relationship.high is None: return f"[{relationship.low}:]" elif relationship.low == relationship.high: return f"[{relationship.low}]" else: return f"[{relationship.low}:{relationship.high}]" def _format_path_for_relationship( relationship_location: ast.Location, entity_type: Entity, name_relational_attribute: Optional[str] = None, ) -> str: """ Return the path to an entity or a relational attribute as part of the textual representation of a relationship. :param relationship_location: The location of the relationship definition. :param entity_type: The entity type referenced by the relationship. :param name_relational_attribute: The name of the relational attribute. Can be set to None for the side of a unidirectional relationship that doesn't have a relational attribute. """ if relationship_location.file == entity_type.location.file: # The relationship and the entity are defined in the same namespace. # We don't have to reference the entity with its fully-qualified name. result = entity_type.name else: result = entity_type.get_full_name() if name_relational_attribute: result = f"{result}.{name_relational_attribute}" return result def _format_relationship() -> str: """ Return a textual representation of the relationship. This method returns a relationship where the -- in the middle of the relationship definition is replaced with __annotations__. """ path_relational_attr_lhs = _format_path_for_relationship( relationship.location, entity_type, relationship.name ) formatted_lhs = f"{path_relational_attr_lhs} {_format_limits(relationship)}" if relationship.end: path_relational_attr_rhs = _format_path_for_relationship( relationship.location, relationship.type, relationship.end.name ) formatted_rhs = ( f"{path_relational_attr_rhs} {_format_limits(relationship.end)}" ) else: formatted_rhs = _format_path_for_relationship( relationship.location, relationship.type ) return f"{formatted_lhs} __annotations__ {formatted_rhs}" raise Exception( f""" Metadata attribute(s): {metadata_attribute_name} of entity {entity_type.get_full_name()} on the relation {attribute_name} is specified as an attribute annotation. Write: __annotations__ = lsm::RelationAnnotations( annotations={annotations} ) {_format_relationship()} instead and remove the metadata attribute at {entity_type.attributes[metadata_attribute_name].location} """.strip() ) @classmethod def _get_key_attributes( cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str], arity_upper_limit: Optional[int], ) -> list[str]: """ Return the list of attributes that uniquely identify the given embedded entity. :param arity_upper_limit: The upper limit of the arity in the relationship to the embedded entity. 
""" key_attributes: list[str] key_attributes_attr_name = "__lsm_key_attributes" try: key_attributes = embedded_entity.get_default( key_attributes_attr_name ).execute_direct({}) except AttributeError: raise Exception( f"A default value should be defined for the attribute {key_attributes_attr_name} " f"of entity {embedded_entity.get_full_name()}" ) potential_indices: list[ Set[str] ] = cls._get_potential_indices_for_object_identity( embedded_entity, name_attr_to_embedder ) if not isinstance(key_attributes, NoneValue): # Verify whether an index was defined for these key_attributes key_attribute_set = set(key_attributes) if key_attribute_set in potential_indices: pass elif ( name_attr_to_embedder is not None and key_attribute_set.union({name_attr_to_embedder}) in potential_indices ): pass else: raise Exception( f"Key attributes '{key_attributes}' defined with `__lsm_key_attributes` for entity " f"{embedded_entity.get_full_name()} is not an index." ) else: # Derive key attributes from defined indices if not potential_indices: if arity_upper_limit is not None and arity_upper_limit == 1: # If the upper arity is 1, there is no need to define an index to uniquely identity the embedded entity. return [] else: raise Exception( f"Failed to identify key attributes of embedded entity {embedded_entity.get_full_name()}. " f"Attribute `__lsm_key_attributes` is null and no identity could be derived from the indices of " f"the entity." ) if len(potential_indices) > 1: raise Exception( f"Failed to identify the key attributes of embedded entity {embedded_entity.get_full_name()}. Attribute " f"`__lsm_key_attributes` is null and more than one index was found that could be used to define " f"the identity of the entity. Use the `__lsm_key_attributes` attribute to define the identifying " f"attributes explicitly." ) key_attributes = potential_indices[0] # Filter out relation to embedding entity return sorted( [attr for attr in key_attributes if attr != name_attr_to_embedder] ) @classmethod def _get_potential_indices_for_object_identity( cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str] ) -> list[Set[str]]: """ Return a subset of all the indices defined on the given embedded entity, that can be used to define the identity of the embedded entity from the perspective of the north-bound API. :param name_attr_to_embedder: The name of the relational attribute, defined on the embedded entity, that references to the embedding entity. Set to None is no such relationship exists. """ def is_valid_index(attribute_names_in_index: Sequence[str]) -> bool: """ Return True iff the given index can be used to define the identity of an embedded entity. """ for attribute_name in attribute_names_in_index: if attribute_name == name_attr_to_embedder: continue if attribute_name.startswith("_"): LOGGER.debug( "Ignoring index %s on entity %s as potential key attributes for the embedded entity. " "The identifying index can only contain " "attributes exposed via the north-bound API", attribute_names_in_index, embedded_entity.get_full_name(), ) return False attribute: Optional[Attribute] = embedded_entity.get_attribute( attribute_name ) assert attribute is not None # Make mypy happy if isinstance(attribute, ast.attribute.RelationAttribute): LOGGER.debug( "Ignoring index %s on entity %s as potential key attributes for the embedded entity. 
" "The identifying index cannot contain " "relationships except for a relationship to the entity this entity is embedded in", attribute_names_in_index, embedded_entity.get_full_name(), ) return False attribute_type: ast_type.Type = attribute.get_type().get_base_type() if isinstance(attribute_type, ast_type.List): LOGGER.info( "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it" " contains the attribute %s of type list.", attribute_names_in_index, embedded_entity.get_full_name(), attribute.name, ) return False if isinstance(attribute_type, ast_type.Dict): LOGGER.info( "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it" " contains the attribute %s of type dict.", attribute_names_in_index, embedded_entity.get_full_name(), attribute.name, ) return False return True return [ set(index) for index in embedded_entity.get_indices() if is_valid_index(index) ] @classmethod def _log_warning_for_missing_documentation( cls, entity_type: Entity, attribute_description_map: dict[str, str], service_attributes: list[ServiceAttribute], ) -> None: """ Log a warning when the docstring contains documentation for an attribute that is not defined in the entity. """ attributes_in_docstring = set(attribute_description_map.keys()) attributes_in_service_definition = set([s.name for s in service_attributes]) undefined_attributes_in_docstring = ( attributes_in_docstring - attributes_in_service_definition ) for attr_name in undefined_attributes_in_docstring: if attr_name.startswith("_") or "__" in attr_name: # Documenting 'private' attributes or metadata attributes shouldn't emit a warning continue LOGGER.warning( "Attribute '%s' is defined in the docstring of service entity '%s', but attribute doesn't exist.", attr_name, entity_type.name, ) def get_type_as_str(attr: ast.attribute.Attribute) -> str: base_type: ast.type.Type = attr.type.get_base_type() def get_constraint_basetype(tp: ast.type.ConstraintType): if isinstance(tp.basetype, ast.type.ConstraintType): return get_constraint_basetype(tp.basetype) return tp.basetype if isinstance(base_type, ast.type.ConstraintType): return map_attr_type(attr, get_constraint_basetype(base_type)) else: return map_attr_type(attr) def get_validation_type_and_params( attr: ast.attribute.Attribute, ) -> Tuple[Optional[str], Optional[Mapping[str, object]]]: """ If the type of the given attribute was defined with a typedef statement and it uses a supported constraint, this method returns the inmanta_lsm validation type and validation parameters. """ import inmanta_plugins.lsm.typedef as typedef result: Optional[typedef.ValidationType] = typedef.get_validation_type(attr) return result if result is not None else (None, None) def do_execute_direct( statement: ast.statements.DynamicStatement, error_msg: str ) -> Any: """ Execute the given statement directly. 
""" try: return DynamicProxy.return_value(statement.execute_direct({})) except DirectExecuteException as e: raise WrappingRuntimeException(None, error_msg, e) @plugins.plugin def fsm_to_dot(config: "lsm::LifecycleStateMachine") -> "string": """ Generate a dot representation of the state machine """ return create_lifecycle(config).to_dot() @plugins.plugin def render_dot(fsm: "lsm::LifecycleStateMachine") -> None: """ Render a dot graph in the current directory """ dot = fsm_to_dot(fsm) with open("fsm.dot", "w+") as fd: fd.write(dot) subprocess.check_call(["dot", "-Tsvg", "-ofsm.svg", "fsm.dot"]) def do_export( env: str, conn: protocol.endpoints.SyncClient, se: ServiceEntity, allow_instance_updates: bool, ) -> None: get_entity_result = conn.lsm_service_catalog_get_entity( tid=env, service_entity=se.name ) if get_entity_result.code == 404: export_result = conn.lsm_service_catalog_create_entity( tid=env, service_entity_definition=se ) else: kwargs = {} if allow_instance_updates: # Only set parameter if required, to preserve backward compatibility kwargs["allow_instance_updates"] = True # Config settings cannot be updated via the update_entity API call se.config = get_entity_result.result["data"]["config"] if not se.strict_modifier_enforcement: LOGGER.warning( ( "The strict_modifier_enforcement flag is not enabled for service entity binding %s. This behaviour is being" " deprecated. Please refer to the documentation for more information on how to enable it." ), se.name, ) # try export export_result = conn.lsm_service_catalog_update_entity( tid=env, service_entity=se.name, service_entity_definition=se, **kwargs ) if ( allow_instance_updates and export_result == 400 and "allow_instance_updates" in export_result.result.get("message", "") ): # Backward compatibility! LOGGER.warning( "The version of inmanta-lsm used by the compiler and server are different, this may cause issues." ) LOGGER.warning( "The server version is too old to support type updates or updates on embedded instances." 
) # for backward compatibility with inmanta_lsm<2.0.1.dev # we do the call again without allow_instance_updates the argument export_result = conn.lsm_service_catalog_update_entity( tid=env, service_entity=se.name, service_entity_definition=se ) if export_result.code != 200: # Improve error reporting for # This update requires the `allow_type_updates` parameter to be set if "allow_instance_updates" in export_result.result.get("message", ""): raise Exception( "could not upload lifecycle, " "because it would require updating existing instances, " "use `--export-plugin service_entities_exporter`" ) raise Exception( f"could not upload lifecycle {export_result.result} (code: {export_result.code})" ) @export( "service_entities_exporter_strict", "lsm::ServiceEntityBinding", "lsm::ServiceEntityBindingV2", ) def export_service_entities( exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {} ) -> None: do_export_service_entities(exporter, types, False) @export( "service_entities_exporter", "lsm::ServiceEntityBinding", "lsm::ServiceEntityBindingV2", ) def service_entities_exporter_with_instance_updates( exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {} ) -> None: do_export_service_entities(exporter, types, True) def do_export_service_entities( exporter: Exporter, types: dict[str, Sequence[ProxiedType]], force: bool = False, ) -> None: conn = protocol.endpoints.SyncClient("compiler") env: Optional[UUID] = config.Config.get("config", "environment", None) if env is None: raise Exception( "The environment of the model should be configured in config>environment" ) all_bindings: dict[str, DynamicProxy] = { instance.service_entity: instance for instance in ( typing.cast(DynamicProxy, instance) for instance in itertools.chain( types["lsm::ServiceEntityBinding"], types["lsm::ServiceEntityBindingV2"] ) ) } for service_entity_binding in all_bindings.values(): try: entity_type = exporter.types[service_entity_binding.service_entity] except KeyError as e: raise PluginException( f"Entity {service_entity_binding.service_entity} in service entity binding for" f" {service_entity_binding.service_entity_name} does not exist." ) from e type_map = ServiceEntityBuilder.get_service_entity_from_model( env, all_bindings, service_entity_binding, entity_type ) do_export(env, conn, type_map, force) def get_service_entity(type_class): defaults = type_class.get_default_values() if "__service_entity_name" not in defaults: return None return defaults["__service_entity_name"].execute(None, None, None) class CacheManager: """ Entry point for all internal caches in LSM, accessed via :py:data:`global_cache`. Also caches a connection to the server. 
""" def __init__(self) -> None: self._instances_per_binding: dict[str, list[dict]] = {} self._current_state_cache: dict[str, Tuple[str, int]] = {} self._all_instance_cache: dict[str, dict] = {} self._client = protocol.endpoints.SyncClient("compiler") # Cache of all bindings, may be unresolvable and cause rescheduling self._binding_cache_pre: dict[str, "lsm::ServiceEntityBinding"] = {} # Cache of all lifecycles, resolved self._binding_cache_lifecycles: dict[str, dict] = {} # Cache of selectors self._selector_cache: dict[ str, "inmanta_plugins.lsm.partial.ParentSelector" ] = {} def reset(self) -> None: """Reset all state: drop all caches and renew the connection""" self._instances_per_binding = {} self._current_state_cache = {} self._client = protocol.endpoints.SyncClient("compiler") self._binding_cache_pre = {} self._binding_cache_lifecycles = {} self._all_instance_cache = {} self._selector_cache = {} def get_selector( self, root: "lsm:ServiceBinding", env: str, current_instance_id: str, validate: bool, ) -> "inmanta_plugins.lsm.partial.ParentSelector": root_name = root.service_entity_name if root_name not in self._selector_cache: self._selector_cache[ root_name ] = inmanta_plugins.lsm.partial.ParentSelector( root, env, current_instance_id, validate ) return self._selector_cache[root_name] def get_client(self) -> protocol.endpoints.SyncClient: return self._client def get_instance( self, env: str, service_entity_name: Optional[str], instance_id: Union[str, UUID], force: bool = False, include_terminated: bool = False, ) -> Optional[dict]: """ Return the service instance with the given environment, service_entity_name and instance_id or None if no such instance exists. :param force: when true, the cache is refreshed from the server :param include_terminated: when trying to pull a specific instance, and it is not in the cache, try to get it from the API directly, so that it is returned even when terminated. """ assert not ( force and not service_entity_name ), "when forcing refresh, service_entity_name must be passed in" if isinstance(instance_id, UUID): instance_id = str(instance_id) if service_entity_name: self.get_all_instances(env, service_entity_name, force=force) found = self._all_instance_cache.get(instance_id) if found is None: if include_terminated: # Try to pull it from the API directly in case it is terminated result = self._client.lsm_services_get_by_id( tid=env, service_id=instance_id ) if result.code == 404: # it is not there at all return None if result.code != 200: message = result.result.get("message") if result.result else None extra_info = f": {message}" if message else "." 
raise Exception( f"Unable to retrieve service instance from the API (code {result.code}){extra_info}" ) found = result.result["data"] if found is None: return None if service_entity_name and found["service_entity"] != service_entity_name: return None return found def convert_instance( self, instance: dict, validation: bool, instance_id: str ) -> Optional[dict]: """ Convert an instance from the API form to the return format of lsm::all :param instance: The instance dict as returned by the api :param validation: Whether this is a validation compile :param instance_id: The id of the instance being validated (if this is a validation compile) """ service_entity = instance["service_entity"] ls = self.get_lifecycle(service_entity) state = ls[instance["state"]] if not validation: if not state.export_resources: return None else: attributes = instance["active_attributes"] else: if instance_id == instance["id"]: take_set = state.validate_self if not take_set: return None else: attributes = instance[f"{take_set}_attributes"] else: take_set = state.validate_others if not take_set: return None else: attributes = instance[f"{take_set}_attributes"] if attributes is None: attributes = {} return { "id": instance["id"], "version": instance["version"], "state": instance["state"], "delete": instance["deleted"], "deleted": instance["deleted"], "service_entity": service_entity, "attributes": attributes, "candidate_attributes": instance["candidate_attributes"], "active_attributes": instance["active_attributes"], "rollback_attributes": instance["rollback_attributes"], } def get_all_instances( self, env: str, service_entity_name: str, force: bool = False ) -> list[dict]: """ Get all (non-terminal) instances from the server for a specific environment and service_entity_name. The result is cached and any subsequent call uses the cache. :param env: the environment to use. :param service_entity_name: the name of specific service entity for which to fetch the instances. :param force: when true, the cache is refreshed from the server :return: all instances, as dicts, in the format returned by the server. """ if force or service_entity_name not in self._instances_per_binding: # get ALL instances result = self._client.lsm_services_list( tid=env, service_entity=service_entity_name ) if result.code == 404: # the type probably needs to be registered first return [] if result.code != 200: message = result.result.get("message") if result.result else None extra_info = f": {message}" if message else "." raise Exception( f"Unable to retrieve service instance from the API (code {result.code}){extra_info}" ) out = result.result["data"] self._instances_per_binding[service_entity_name] = out for instance in out: self.cache_instance(instance) return self._instances_per_binding[service_entity_name] def cache_instance(self, instance: dict) -> None: self._all_instance_cache[instance["id"]] = instance self._current_state_cache[instance["id"]] = ( instance["state"], instance["version"], ) def get_instance_state(self, instance_id: str) -> Tuple[str, int]: """ Get the current state and version for a specific instance. 
Can only be called for instances retrieved via get_all_instances :param instance_id: the uuid for the service instance :return: current state and version for the specific instance """ if instance_id not in self._current_state_cache: # This should never happen, since the all() plugin is always called before the current_state() plugin raise Exception( "No service instances found, ensure the 'all' plugin has been called before" ) return self._current_state_cache[instance_id] def register_binding(self, binding) -> dict[str, object]: """Register a lifecycle binding and return its lifecycle""" name = binding.service_entity_name self._binding_cache_pre[name] = binding self.resolve_binding_cache() return self._binding_cache_lifecycles[name] def resolve_binding_cache(self) -> None: # Resolve all bindings at once, making multiple lsm::all behave more regular todo = set(self._binding_cache_pre.keys()) - set( self._binding_cache_lifecycles.keys() ) for to_resolve in todo: self._binding_cache_lifecycles[to_resolve] = collect_states( self._binding_cache_pre[to_resolve].lifecycle ) def get_lifecycle(self, entity_name: str) -> dict[str, dict[str, object]]: if entity_name not in self._binding_cache_lifecycles: raise Exception( f"No lifecycle found for {entity_name}, ensure register_binding has been called." ) return self._binding_cache_lifecycles[entity_name] global_cache = CacheManager() @plugins.plugin def all(context: plugins.Context, binding: "lsm::ServiceEntityBinding") -> "list": """ Returns a list of records for the given binding. :param binding: The entity binding :return: A list of dict with all the defined records. """ # make calls that can cause re-scheduling if binding.service_entity_name is None: raise Exception( "Only instances of entities that define its service type can be retrieved." 
) global_cache.register_binding(binding) partial_compile: bool = ( os.environ.get(lsm_const.ENV_PARTIAL_COMPILE, "false").lower() == "true" ) LOGGER.log( LOG_LEVEL_TRACE, "Resolution of service binding %s: start", binding.service_entity_name, ) # This may be re-executed a few times # but complex to cache, easy to execute, so we don't bother root = binding try: while True: parent = root.owner if not parent: break root = parent except UnsetException: LOGGER.warning( "Please set the owner relation on the service entity binding for the service entity %s (instantiated at %s) " "in the constructor, even if it is null", binding.service_entity, str(binding.__instance._location), ) # only reraise when strictly required so we don't force the compiler to freeze the relation when unset if partial_compile: raise except OptionalValueException: # found root pass LOGGER.log( LOG_LEVEL_TRACE, "Resolution of service binding %s: found root %s", binding.service_entity_name, root.service_entity_name, ) siblings = set() todo = [root] while todo: item = todo.pop() if item not in siblings: siblings.add(item) with ( contextlib.suppress(UnsetException) if not partial_compile else contextlib.nullcontext() ): todo.extend(item.owned) LOGGER.log( LOG_LEVEL_TRACE, "Resolution of service binding %s: found siblings [%s]", binding.service_entity_name, ", ".join([sibling.service_entity_name for sibling in siblings]), ) for sibling in siblings: global_cache.register_binding(sibling) LOGGER.log( LOG_LEVEL_TRACE, "Resolution of service binding %s: done", binding.service_entity_name, ) # at this point, we are probably good wrt to re-execution # collect environment context service_entity_name = binding.service_entity_name env = context.get_environment_id() model_state = os.environ.get(lsm_const.ENV_MODEL_STATE, ModelState.active) validation = model_state == ModelState.candidate instance_id = os.environ.get(lsm_const.ENV_INSTANCE_ID, None) raw_instances: list[dict] if partial_compile: if instance_id is None: raise Exception(f"Environment variable {lsm_const.ENV_INSTANCE_ID} not set") if len(siblings) == 1: instance: Optional[dict] = global_cache.get_instance( env, service_entity_name, instance_id ) # instance is None if the service instance with instance_id belongs # to a different service entity than service_entity_name. 
raw_instances = [instance] if instance is not None else [] else: selector: inmanta_plugins.lsm.partial.ParentSelector = ( global_cache.get_selector(root, env, instance_id, validation) ) raw_instances = selector.select(service_entity_name) else: raw_instances = global_cache.get_all_instances(env, service_entity_name) if binding.allocation_spec: import inmanta_plugins.lsm.allocation as allocation allocationspec = allocation.global_allocation_collector.get_spec( binding.allocation_spec ) else: allocationspec = None instances = [] to_allocate = [] for data in raw_instances: instance = global_cache.convert_instance(data, validation, instance_id) if not instance: continue instances.append(instance) if validation and instance_id == data["id"]: to_allocate.append(instance) if to_allocate and allocationspec: LOGGER.log( logging.DEBUG, "Performing allocation for %d instances of type %s: done", len(to_allocate), binding.service_entity_name, ) allocation.do_allocate_instances(context, binding, allocationspec, to_allocate) LOGGER.log( logging.DEBUG, "lsm::all returning %d instances for type %s", len(instances), binding.service_entity_name, ) return instances @plugins.plugin def current_state(ctx: plugins.Context, fsm: "lsm::ServiceEntity") -> "dict": """ Returns the current state from the lifecycle and the next version of the instance """ instance_id = fsm.instance_id state_name, current_version = global_cache.get_instance_state(instance_id) states = {} # collect all states in the graph for transfer in fsm.entity_binding.lifecycle.transfers: states[transfer.source.name] = transfer.source states[transfer.target.name] = transfer.target try: states[transfer.error.name] = transfer.error except OptionalValueException: pass if state_name not in states: raise Exception( "The state returned from the API does not exist in the lifecycle defined in the model." ) return { "state": states[state_name]._get_instance(), "next_version": current_version + 1, } @plugins.plugin def has_current_state( ctx: plugins.Context, service_instance: "lsm::ServiceEntity", state_name: "string" ) -> "bool": """ Check whether the given service instance is currently in the given state of its lifecycle. :param service_instance: The ServiceEntity object. :param state_name: The name of the lifecycle state """ instance_id = service_instance.instance_id current_state, _ = global_cache.get_instance_state(instance_id) return current_state == state_name @plugins.plugin def is_validating(instance_id: "string") -> "bool": """ Return true if the current compile is a validating compile and the instance being validated has the given id. :param instance_id: The id of the instance we want to check for validation. 
""" return ( inmanta_plugins.lsm.allocation_helpers.is_validation_compile() and inmanta_plugins.lsm.allocation_helpers.get_validated_instance_id() == instance_id ) @plugins.plugin def format(__string: "string", args: "list", kwargs: "dict") -> "string": """ Format a string using python string formatter, and accepting statements which native inmanta f-string doesn't support (such as accessing dict values) :param __string: The string to apply formatting to :param args: The positional arguments to feed into the `str.format` method :param kwargs: The keyword arguments to feed into the `str.format` method """ return __string.format(*args, **kwargs) @resources.resource("lsm::LifecycleTransfer", agent="agent", id_attribute="instance_id") class LifecycleTransferResource(resources.PurgeableResource): """ A resource that collects deploy events and send them to the lsm """ fields = ("service_entity", "instance_id", "next_version", "resources") @staticmethod def get_resources(exporter, transfer): resource_list = [] for res in transfer.resources: try: resource_list.append(resources.to_id(res)) except IgnoreResourceException: pass sorted_resources = sorted(resource_list) return sorted_resources # This class should ideally inherit from the more generic `HandlerAPI` instead of `CRUDHandler` due to its limited adherence # to the interface. However, since the `HandlerAPI` class is only available on iso7+ and we want to remain compatible with # iso6 this cannot yet be done. @handler.provider("lsm::LifecycleTransfer", name="local_state") class LifecycleTransferHandler(handler.CRUDHandler): """ A handler that collects all resource statuses from resources that are part of a service instance. The deploy() method is used to determine whether the resources of a service instance are deployed successfully or not. """ client: Optional[protocol.Client] def pre( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: self.client = protocol.Client("agent") def deploy( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource, requires: dict[ResourceIdStr, const.ResourceState], ) -> None: self.pre(ctx, resource) try: all_resources_are_deployed_successfully = self._send_current_state( ctx, resource, requires ) if all_resources_are_deployed_successfully: ctx.set_status(const.ResourceState.deployed) else: ctx.set_status(const.ResourceState.failed) finally: self.post(ctx, resource) def _send_current_state( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource, fine_grained_resource_states: dict[ResourceIdStr, const.ResourceState], ) -> bool: """ Report the resource states for the resources in this service instance to inmanta LSM. This method raises a SkipResource exception when one of the given resource states is a transient state (i.e. it doesn't match the failed or deployed state). :param resource: This LifecycleTransfer resource. :param fine_grained_resource_states: The resource state for each resource part of this service instance. :return: True iff all the given resource states are equal to deployed. 
""" # If a resource is not in events, it means that it was deployed before so we can mark it as success resource_states = {res: ResourceState.success for res in resource.resources} is_failed = False is_skip = False # Convert inmanta.const.ResourceState to inmanta_lsm.model.ResourceState for resource_id, state in fine_grained_resource_states.items(): if state == const.ResourceState.failed: resource_states[resource_id] = ResourceState.error is_failed = True elif state == const.ResourceState.deployed: # same as default list pass elif resource_id in resource_states: # some transient state that is not failed and not success, so lets skip is_skip = True # failure takes precedence over transient # transient takes precedence over success if is_skip and not is_failed: raise SkipResource() ctx.info( "Sending %(states_str)s", states_str=", ".join([f"{k}: {v}" for k, v in resource_states.items()]), states=resource_states, ) def call() -> typing.Awaitable[protocol.Result]: return self.client.lsm_services_resources_set_state( self._agent._env_id, resource.service_entity, resource.instance_id, resource.next_version - 1, resource_states, ) response = self.run_sync(call) if response.code == 409: # Conflict: Transfer already happened return not is_failed if response.code != 200: ctx.error( "Failed to set resource state on lsm.", code=response.code, result=response.result, ) # and raise exception to mark this resource as failed raise Exception("Failed to set resource state on lsm.") return not is_failed def _diff( self, current: LifecycleTransferResource, desired: LifecycleTransferResource ) -> dict[str, dict[str, Any]]: changes = super()._diff(current, desired) if "next_version" in changes: # no changes required when current > desired current = changes["next_version"]["current"] desired = changes["next_version"]["desired"] if current > desired: del changes["next_version"] return changes def read_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: """ This method is used by the dry_run code path. """ def call(): # get the state from lsm return self.client.lsm_services_get( self._agent._env_id, resource.service_entity, resource.instance_id ) response = self.run_sync(call) if response.code != 200: ctx.error( "Failed to retrieve service instance", message=response.result, status=response.code, ) raise Exception("Failed to retrieve service instance") ctx.set("instance", response.result["data"]) ctx.set("transferred", False) ctx.info( "Retrieved instance with version %(version)s", version=response.result["data"]["version"], ) resource.next_version = response.result["data"]["version"] def create_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." ) def delete_resource( self, ctx: handler.HandlerContext, resource: LifecycleTransferResource ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." ) def update_resource( self, ctx: handler.HandlerContext, changes: dict, resource: LifecycleTransferResource, ) -> None: raise Exception( "This method should not be called, because this handler overrides the deploy() method." 

    def _diff(
        self, current: LifecycleTransferResource, desired: LifecycleTransferResource
    ) -> dict[str, dict[str, Any]]:
        changes = super()._diff(current, desired)
        if "next_version" in changes:
            # no changes required when current > desired
            current = changes["next_version"]["current"]
            desired = changes["next_version"]["desired"]
            if current > desired:
                del changes["next_version"]
        return changes

    def read_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """
        This method is used by the dry_run code path.
        """

        def call():
            # get the state from lsm
            return self.client.lsm_services_get(
                self._agent._env_id, resource.service_entity, resource.instance_id
            )

        response = self.run_sync(call)
        if response.code != 200:
            ctx.error(
                "Failed to retrieve service instance",
                message=response.result,
                status=response.code,
            )
            raise Exception("Failed to retrieve service instance")

        ctx.set("instance", response.result["data"])
        ctx.set("transferred", False)
        ctx.info(
            "Retrieved instance with version %(version)s",
            version=response.result["data"]["version"],
        )
        resource.next_version = response.result["data"]["version"]

    def create_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def delete_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def update_resource(
        self,
        ctx: handler.HandlerContext,
        changes: dict,
        resource: LifecycleTransferResource,
    ) -> None:
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def post(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        self.client = None


@resources.resource("lsm::ServiceInstance", agent="agent", id_attribute="instance_id")
class ServiceInstanceResource(resources.PurgeableResource):
    fields = (
        "service_entity",
        "instance_id",
        "attributes",
        "skip_update_states",
        "rejected_states",
    )

    @staticmethod
    def get_attributes(exporter, instance):
        return instance.attributes

    @staticmethod
    def get_instance_id(_, instance):
        return instance.instance_id

    @staticmethod
    def get_service_entity(_, instance):
        return instance.service_entity


@handler.provider("lsm::ServiceInstance", name="service_instance")
class ServiceInstanceHandler(handler.CRUDHandler):
    client: protocol.SyncClient

    def pre(self, ctx, resource) -> None:
        self.client = protocol.Client("agent")

    def read_instance(self, service_instance: ServiceInstanceResource):
        def call():
            return self.client.lsm_services_get(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
            )

        return call

    def read_resource(
        self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
    ) -> None:
        # get the state from lsm
        def call():
            return self.client.lsm_services_get(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
            )

        response = self.run_sync(call)
        if response.code == 404:
            service_instance.purged = True
            return

        if response.code != 200:
            ctx.error(
                "Failed to retrieve service instance",
                message=response.result,
                status=response.code,
            )
            raise Exception("Failed to retrieve service instance")

        ctx.set("instance", response.result["data"])
        ctx.info(
            "Retrieved instance with version %(version)s",
            version=response.result["data"]["version"],
        )
        service_instance.attributes = (
            response.result["data"]["candidate_attributes"]
            if not response.result["data"]["active_attributes"]
            else response.result["data"]["active_attributes"]
        )
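
    # Illustrative note (not part of the handler API): read_resource() above
    # prefers the active attribute set and only falls back to the candidate set
    # while no active set exists yet. For a hypothetical instance::
    #
    #     {"candidate_attributes": {"vlan": 20}, "active_attributes": {}}
    #
    # the candidate attributes {"vlan": 20} are read; once active_attributes is
    # populated, it takes precedence instead.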
{str(response.result)}" ) ctx.set("instance", response.result["data"]) ctx.info( "Created instance with version %(version)s", version=response.result["data"]["version"], ) service_instance.instance_id = response.result["data"]["id"] ctx.set_created() def delete_resource( self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource ) -> None: instance_read_response = self.run_sync(self.read_instance(service_instance)) instance_version = instance_read_response.result["data"]["version"] instance_current_state = instance_read_response.result["data"]["state"] def call(): return self.client.lsm_services_delete( self._agent._env_id, service_instance.service_entity, service_instance.instance_id, instance_version, ) response = self.run_sync(call) if response.code == 404 and "cannot be updated" in response.result["message"]: raise SkipResource( f"Instance {instance_read_response.result['data']['id']} " f"cannot be deleted while in state {instance_current_state}" ) if response.code != 200: ctx.error( "Failed to delete service instance on lsm.", code=response.code, result=response.result, ) raise Exception("Failed to delete service instance on lsm.") ctx.set("instance", None) ctx.set_purged() def calculate_diff( self, ctx: handler.HandlerContext, current: ServiceInstanceResource, desired: ServiceInstanceResource, ) -> dict[str, dict[str, Any]]: changes = super()._diff(current, desired) if "attributes" not in changes: return changes # Desired might not contain the attributes that have default value for attribute_name, attribute_value in changes["attributes"]["current"].items(): if attribute_name not in changes["attributes"]["desired"].keys(): changes["attributes"]["desired"][attribute_name] = attribute_value if changes["attributes"]["current"] == changes["attributes"]["desired"]: del changes["attributes"] return changes def update_resource( self, ctx: handler.HandlerContext, changes: dict, service_instance: ServiceInstanceResource, ) -> None: read_response = self.run_sync(self.read_instance(service_instance)) state = read_response.result["data"]["state"] instance_version = read_response.result["data"]["version"] if state in service_instance.skip_update_states: raise SkipResource(f"Instance cannot be updated from state {state}") if state in service_instance.rejected_states: raise Exception(f"Instance is in a rejected state: {state}") updated = {} if "attributes" in changes: # Check the desired attributes for attribute_name, attribute_value in changes["attributes"][ "desired" ].items(): # If it's already in the candidates, but with a different value, update the value if ( read_response.result["data"]["candidate_attributes"] and read_response.result["data"]["candidate_attributes"][ attribute_name ] != attribute_value ): updated[attribute_name] = attribute_value # If it's different from the active attribute, and not already in the candidates, update it if ( read_response.result["data"]["active_attributes"] and read_response.result["data"]["active_attributes"][ attribute_name ] != attribute_value and attribute_name not in updated.keys() ): updated[attribute_name] = attribute_value del changes["attributes"] if changes: raise Exception( f"Changing {changes.keys()} of an instance is not supported" ) def call(): return self.client.lsm_services_update( self._agent._env_id, service_instance.service_entity, service_instance.instance_id, instance_version, updated, ) if updated: ctx.debug( f"Updating instance {service_instance.id} with attributes {str(updated)}" ) response = self.run_sync(call) if 

    def update_resource(
        self,
        ctx: handler.HandlerContext,
        changes: dict,
        service_instance: ServiceInstanceResource,
    ) -> None:
        read_response = self.run_sync(self.read_instance(service_instance))
        state = read_response.result["data"]["state"]
        instance_version = read_response.result["data"]["version"]

        if state in service_instance.skip_update_states:
            raise SkipResource(f"Instance cannot be updated from state {state}")
        if state in service_instance.rejected_states:
            raise Exception(f"Instance is in a rejected state: {state}")

        updated = {}
        if "attributes" in changes:
            # Check the desired attributes
            for attribute_name, attribute_value in changes["attributes"][
                "desired"
            ].items():
                # If it's already in the candidates, but with a different value, update the value
                if (
                    read_response.result["data"]["candidate_attributes"]
                    and read_response.result["data"]["candidate_attributes"][
                        attribute_name
                    ]
                    != attribute_value
                ):
                    updated[attribute_name] = attribute_value
                # If it's different from the active attribute, and not already in the candidates, update it
                if (
                    read_response.result["data"]["active_attributes"]
                    and read_response.result["data"]["active_attributes"][
                        attribute_name
                    ]
                    != attribute_value
                    and attribute_name not in updated.keys()
                ):
                    updated[attribute_name] = attribute_value
            del changes["attributes"]

        if changes:
            raise Exception(
                f"Changing {changes.keys()} of an instance is not supported"
            )

        def call():
            return self.client.lsm_services_update(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
                instance_version,
                updated,
            )

        if updated:
            ctx.debug(
                f"Updating instance {service_instance.id} with attributes {str(updated)}"
            )
            response = self.run_sync(call)
            if response.code != 200:
                ctx.error(
                    "Failed to update service instance on lsm.",
                    code=response.code,
                    result=response.result,
                )
                raise Exception(
                    f"Failed to update service instance on lsm. {str(response.result)}"
                )
            ctx.set_updated()


@dependency_manager
def lsm_dependency_manager(entities: ModelDict, resources: ResourceDict) -> None:
    for res in resources.values():
        if res.id.entity_type == "lsm::LifecycleTransfer":
            # Ensure that the resources list contains only resources that exist.
            # The requires set was already filtered from resources that don't exist.
            res.resources = sorted([r.resource_str() for r in res.requires])


def inmanta_reset_state() -> None:
    """
    Reset the cached state for this module.
    """
    global_cache.reset()
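
# Illustrative note (not part of the module): for a hypothetical
# lsm::LifecycleTransfer resource whose requires set holds two file resources,
# the dependency manager above would (assuming resource_str() yields unversioned
# resource ids) rewrite its ``resources`` field to the sorted id strings, e.g.::
#
#     res.resources = [
#         "std::File[vm1,path=/etc/a]",
#         "std::File[vm1,path=/etc/b]",
#     ]
#
# so the LifecycleTransferHandler only reports states for resources that were
# actually exported.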