"""
Inmanta LSM
:copyright: 2019 Inmanta
:contact: code@inmanta.com
:license: Inmanta EULA
"""
import builtins
import copy
import itertools
import logging
import os
import re
import subprocess
import typing
import uuid
from collections import abc, defaultdict
from typing import Any, Mapping, Optional, Sequence, Set, Tuple, Union
from uuid import UUID
import inmanta.config
import inmanta.const
import inmanta.plugins
import inmanta.protocol
import inmanta_plugins.lsm.allocation_helpers
import inmanta_plugins.lsm.embedded_entities_tracker
import inmanta_plugins.lsm.partial
import pydantic
from inmanta import ast, config, const, plugins, protocol, resources
from inmanta.agent import handler
from inmanta.agent.handler import SkipResource
from inmanta.ast import (
DirectExecuteException,
OptionalValueException,
WrappingRuntimeException,
)
from inmanta.ast import type as ast_type
from inmanta.ast.attribute import Attribute
from inmanta.ast.entity import Entity
from inmanta.data.model import ResourceIdStr
from inmanta.docstring_parser import parse_docstring
from inmanta.execute.proxy import DynamicProxy
from inmanta.execute.runtime import Instance
from inmanta.execute.util import NoneValue
from inmanta.export import (
Exporter,
ModelDict,
ProxiedType,
ResourceDict,
dependency_manager,
export,
)
from inmanta.plugins import PluginException
from inmanta.resources import IgnoreResourceException
from inmanta.util import dict_path
from inmanta_lsm import const as lsm_const
from inmanta_lsm import methods # noqa
from inmanta_lsm.model import (
EmbeddedServiceEntity,
InterServiceRelation,
Lifecycle,
LifecycleState,
LifecycleTransfer,
ModelState,
ResourceState,
ServiceAttribute,
ServiceEntity,
ServiceEntityVersions,
)
from inmanta_plugins.lsm.allocation_v2 import entity, framework
try:
from inmanta_lsm.const import ENV_NO_INSTANCES
except ImportError:
# Ensure backwards compatibility with older versions of the inmanta-lsm extensions.
ENV_NO_INSTANCES = "lsm_no_instances"
LOGGER = logging.getLogger(__name__)
def get_optional_value(value: DynamicProxy, attribute: str) -> Optional[object]:
"""
Returns the value of an attribute that is known to exist on the entity.
Returns None iff the attribute/relation is set to null in the DSL.
Suppresses the OptionalValueException which is typically raised when reading an optional relation set to null.
"""
try:
return getattr(value, attribute)
except OptionalValueException:
return None
def get_environment() -> str:
"""
Figure out the id of the environment the compiler is running in.
"""
env = config.Config.get("config", "environment", None)
if env is None:
raise RuntimeError(
"The environment is not set! Are we being called in a compile?"
)
return env
[docs]
@plugins.plugin
def context_v2_unwrapper(
assignments: "dict[]",
fallback_attribute: "string",
track_deletes: "bool" = False,
) -> "dict[]":
"""
This plugin can be used to wrap the instances coming out of lsm::all and place all allocated values in
:param fallback_attribute: where they should go. The returned value is what has been given as input,
except for the allocated values being set where they should.
:param track_deletes: drop deleted embedded entities, even if they still exist in the fallback set.
This should be used together with ContextV2Wrapper allocator.
Each assignment is an attribute dict containing a fallback attribute assigned with allocated values as
produced by the ContextV2 (one-level deep, keys are string representation of the paths, values are
allocated values) and update the dict placing values where their key-path would reach them.
:param assignments: The list of service instance dictionaries as returned by lsm::all
:param fallback_attributes: The attribute name at the root of the instance attributes that contains
all the allocated values
e.g.::
context_v2_unwrapper(
[
{
"environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
"id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
"service_entity": "carrierEthernetEvc",
"version": 4,
"config": {},
"state": "allocating",
"attributes": {
"allocated": {
"evcEndPoints[identifier=my-evc-ep-1].uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
"evcEndPoints[identifier=my-evc-ep-2].uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
"another_key": "any value",
},
"candidate_attributes": {
"allocated": {
"evcEndPoints[identifier=my-evc-ep-1].uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
"evcEndPoints[identifier=my-evc-ep-2].uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
"another_key": "any value",
},
"active_attributes": {},
"rollback_attributes": {},
}
],
"allocated",
)
will return::
[
{
"environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
"id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
"service_entity": "carrierEthernetEvc",
"version": 4,
"config": {},
"state": "allocating",
"attributes": {
"allocated": {
"evcEndPoints[identifier=my-evc-ep-1].uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
"evcEndPoints[identifier=my-evc-ep-2].uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
"evcEndPoints": [
{
"identifier": "my-evc-ep-1",
"uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
},
{
"identifier": "my-evc-ep-2",
"uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
],
"another_key": "any value",
},
"candidate_attributes": {
"allocated": {
"evcEndPoints[identifier=my-evc-ep-1].uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
"evcEndPoints[identifier=my-evc-ep-2].uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
"evcEndPoints": [
{
"identifier": "my-evc-ep-1",
"uni": {
"uniref": "inmanta:456-852-789",
"test_value": "test_value",
},
},
{
"identifier": "my-evc-ep-2",
"uni": {
"uniref": "inmanta:123-852-456",
"test_value": "test_value",
},
},
],
"another_key": "any value",
},
"active_attributes": {},
"rollback_attributes": {},
}
]
"""
def for_all_attributes(assignment: dict) -> dict:
for attributes_set in entity.AttributeSetName:
attributes: Optional[dict] = assignment.get(attributes_set.value)
if isinstance(attributes, NoneValue) or not attributes:
continue
assignment[attributes_set.value] = framework.unwrap_attributes(
attributes, fallback_attribute, track_deletes=track_deletes
)
return assignment
return [
for_all_attributes(DynamicProxy.unwrap(assignment))
for assignment in assignments
]
def is_instance(ctx: plugins.Context, obj: "any", cls: "string") -> "bool":
t = ctx.get_type(cls)
try:
t.validate(obj._get_instance())
except ast.RuntimeException:
return False
return True
def getoptional(obj, *relations):
try:
for rel in relations:
obj = getattr(obj, rel)
return obj
except OptionalValueException:
return None
def create_lifecycle(fsm_config) -> "dict":
"""
Generate a dict with the state metadata in to send to the server
"""
states = set()
def state(st):
if st is None:
return None
states.add(st)
return st.name
transfers = [
LifecycleTransfer(
source=state(tr.source),
target=state(tr.target),
error=state(getoptional(tr, "error")),
on_update=tr.on_update,
on_delete=tr.on_delete,
api_set_state=tr.api_set_state,
resource_based=tr.resource_based,
auto=tr.auto,
validate=tr.validate,
config_name=tr.config_name,
target_operation=tr.target_operation,
error_operation=tr.error_operation,
target_same_desired_state=tr.target_same_desired_state,
error_same_desired_state=tr.error_same_desired_state,
description=tr.description,
)
for tr in fsm_config.transfers
]
states = [
LifecycleState(
name=state.name,
label=getoptional(state, "label"),
export_resources=getoptional(state, "export_resources"),
validate_self=getoptional(state, "validate_self"),
validate_others=getoptional(state, "validate_others"),
purge_resources=state.purge_resources,
deleted=state.deleted,
values={k: v for k, v in state.values.items()},
)
for state in states
]
out = Lifecycle(
name=fsm_config.name,
states=states,
transfers=transfers,
initial_state=fsm_config.initial_state.name,
)
out.validate_graph()
return out
def collect_states(fsm_config) -> "dict":
"""
Generate a dict with the state metadata in to send to the server
"""
states = set()
def state(st):
if st is None:
return None
states.add(st)
for tr in fsm_config.transfers:
state(tr.source)
state(tr.target)
state(getoptional(tr, "error"))
return {state.name: state for state in states}
def map_attr_type(
attr: ast.attribute.Attribute, thetype: Optional[ast.type.Type] = None
) -> str:
tp: ast.type.Type = (
attr.type if thetype is None else attr.type.with_base_type(thetype)
)
result: Optional[str] = tp.type_string()
if result is None:
raise Exception("Invalid attribute type %s" % thetype)
return result
def unwrap(value: DynamicProxy) -> Any:
"""
This code is not quite the compiler domain, not is it purely the plugin domain.
As such, the default unwrapping doesn't really work
"""
value = DynamicProxy.unwrap(value)
if isinstance(value, NoneValue):
return None
return value
class ServiceEntityBuilder:
"""
This class creates a ServiceEntity instance from a ServiceEntity in model.
"""
@classmethod
def get_service_entity_from_model(
cls,
env: UUID,
all_bindings: abc.Mapping[str, DynamicProxy],
entity_binding: ProxiedType,
entity_type: Entity,
) -> ServiceEntity:
"""
Convert the entity type object from the compiler to a ServiceEntity object.
:param env: the environment to use.
:param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype.
:param entity_binding: The entity binding defined in the model.
:param entity_type: An Entity object representing a defined entity in the domain model.
"""
try:
return cls._get_service_entity_from_model(
env, all_bindings, entity_binding, entity_type
)
except DirectExecuteException as e:
raise WrappingRuntimeException(
None,
"Only Literal statements are allowed as defaults in model definition",
e,
)
@classmethod
def _get_service_entity_from_model(
cls,
env: UUID,
all_bindings: abc.Mapping[str, DynamicProxy],
entity_binding: ProxiedType,
entity_type: Entity,
) -> ServiceEntity:
"""
Convert the entity type object from the compiler to a ServiceEntity object.
:param env: the environment to use.
:param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype.
:param entity_binding: The entity binding defined in the model.
:param entity_type: An Entity object representing a defined entity in the domain model.
"""
docstring = parse_docstring(entity_type.comment)
(
service_attributes,
embedded_service_entities,
inter_service_relations,
) = cls._get_attributes(
entity_type,
all_bindings=all_bindings,
strict_enforcement=entity_binding.strict_modifier_enforcement,
)
owner = get_optional_value(entity_binding, "owner")
if owner:
owner = owner.service.service_entity_name
return ServiceEntity(
environment=env,
name=entity_binding.service.service_entity_name,
description=docstring.get_description(),
attributes=service_attributes,
embedded_entities=embedded_service_entities,
inter_service_relations=inter_service_relations,
lifecycle=create_lifecycle(entity_binding.lifecycle),
# The config field shouldn't be populated. It is populated indirectly via the config_name attribute of a
# StateTransfer. The lsm_service_entity_config_set() API call should be used to update the config of a
# ServiceEntity.
config={},
service_identity=entity_binding.service_identity,
service_identity_display_name=entity_binding.service_identity_display_name,
strict_modifier_enforcement=entity_binding.strict_modifier_enforcement,
owner=owner,
relation_to_owner=entity_binding.relation_to_owner,
entity_annotations=cls._get_annotations_for_entity(entity_type),
version=entity_binding.version,
)
@classmethod
def _get_annotation_in_attribute(
cls, attribute: ast.attribute.Attribute, entity_type: Entity
) -> dict[str, str]:
"""
Returns the default value of the given attribute that contains attribute annotations. This method verifies the
constraints that should hold on an attribute that contains annotations.
:param attribute: The attribute that contains the annotations in its default values.
:param entity_type: The entity that attribute belongs to.
"""
try:
default_value = entity_type.get_default(attribute.name).execute_direct({})
except AttributeError:
raise Exception(
f"Attribute {attribute.name} of entity {entity_type.get_full_name()} is missing a default value"
f" ({attribute.location})"
)
if not isinstance(default_value, dict):
raise Exception(
f"Default value on attribute {attribute.name} of entity {entity_type.get_full_name()} doesn't have"
f" type dict ({attribute.location})."
)
return default_value
@classmethod
def _get_annotations_for_entity(cls, entity_type: Entity) -> dict[str, str]:
"""
Returns the dictionary with the annotations for the given Entity.
"""
annotations_attr_name = "__annotations"
if annotations_attr_name not in entity_type.attributes:
return {}
attr = entity_type.attributes[annotations_attr_name]
return cls._get_annotation_in_attribute(attr, entity_type)
@classmethod
def _get_annotations_for_relation(
cls, relation: ast.attribute.RelationAttribute
) -> dict[str, str]:
"""
Returns the dictionary with the annotations for the given relation.
"""
if relation.source_annotations:
relation_annotations_instances = [
source_annotation.get_value()
for source_annotation in relation.source_annotations
if isinstance(source_annotation.get_value(), Instance)
and source_annotation.get_value().type.get_full_name()
== "lsm::RelationAnnotations"
]
if len(relation_annotations_instances) > 1:
raise Exception(
f"More than one lsm::RelationAnnotations instance found on relationship {relation} ({relation.location})"
)
if relation_annotations_instances:
instance = relation_annotations_instances[0]
attribute = instance.get_attribute("annotations")
annotations = attribute.get_value()
if not builtins.all(isinstance(d, str) for d in annotations.values()):
raise Exception(
f"Attribute annotations of entity {instance.type.get_full_name()} must have the type"
f" dict[str, str], but some values have a non-string type. Got: {annotations} ({attribute.location})"
)
return annotations
return {}
@classmethod
def _has_parent_entity(cls, entity_type: Entity, parent_entity: str) -> bool:
"""
Returns true iff this entity type has parent_entity as a parent entity.
"""
return parent_entity in [
p.get_full_name() for p in entity_type.get_all_parent_entities()
]
@classmethod
def _ensure_parent_entity(
cls, entity_type: Entity, required_parent_entity: str
) -> None:
"""
Raise an exception when the given entity doesn't have required_parent_entity as a parent entity.
"""
if not cls._has_parent_entity(entity_type, required_parent_entity):
raise Exception(
f"{entity_type.get_full_name()} should be a subclass of {required_parent_entity}"
)
@classmethod
def _get_attributes(
cls,
entity_type: Entity,
*,
all_bindings: abc.Mapping[str, DynamicProxy],
strict_enforcement: bool = False,
is_embedded_entity: bool = False,
entity_chain_from_service_entity: Optional[Sequence[Entity]] = None,
) -> Tuple[
list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation]
]:
"""
:param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True.
:param is_embedded_entity: Indicates whether the given entity is an embedded entity.
:param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type.
"""
if entity_chain_from_service_entity is None:
entity_chain_from_service_entity = []
if is_embedded_entity:
cls._ensure_parent_entity(entity_type, "lsm::EmbeddedEntity")
else:
cls._ensure_parent_entity(entity_type, "lsm::ServiceEntity")
attributes: dict[str, Attribute]
attribute_description_map: dict[str, str]
attributes, attribute_description_map = cls._get_all_attributes_compiler_type(
entity_type
)
service_attrs: list[ServiceAttribute]
embedded_entities: list[EmbeddedServiceEntity]
(
service_attrs,
embedded_entities,
inter_service_relations,
) = cls._get_all_attributes_model_type(
entity_type,
attributes,
attribute_description_map,
entity_chain_from_service_entity,
all_bindings,
strict_enforcement,
)
cls._log_warning_for_missing_documentation(
entity_type, attribute_description_map, service_attrs
)
return service_attrs, embedded_entities, inter_service_relations
@classmethod
def _get_all_attributes_compiler_type(
cls, entity_type: Entity
) -> Tuple[dict[str, Attribute], dict[str, str]]:
"""
Get all attributes of the given entity in types used by the compiler.
:return: A tuple with as a first element the attributes. The second element
contains the description of attributes present in the docstring.
"""
# collect all attributes, not on std::Entity, lsm::ServiceEntity or lsm::ServiceBase
attributes: dict[str, Attribute] = {}
attribute_description_map = {}
for name, attr in entity_type.attributes.items():
attributes[name] = attr
docstring = parse_docstring(entity_type.comment)
attribute_description_map.update(docstring.get_attribute_description_map())
for current in entity_type.get_all_parent_entities():
name = current.get_full_name()
if name in [
"lsm::ServiceEntity",
"lsm::ServiceBase",
"lsm::EmbeddedEntity",
"std::Entity",
]:
continue
for name, attr in current.attributes.items():
attributes.setdefault(name, attr)
docstring = parse_docstring(current.comment)
attribute_description_map.update(docstring.get_attribute_description_map())
return attributes, attribute_description_map
@classmethod
def _check_relationship_constraints(
cls, attr: ast.attribute.RelationAttribute
) -> None:
"""
This method checks the following constraints:
* The upperbound of the arity of the relationship to the embedding entity should be 1.
* The name of the attribute referencing the embedding entity should start with an underscore.
"""
# Bidirectional relationship
if attr.end is not None:
# Check arity relationship to embedding entity
if attr.end.low not in [0, 1] or attr.end.high != 1:
raise Exception(
f"Attribute {attr.end.name} of relationship {attr.end.type}.{attr.name} -- "
f"{attr.type}.{attr.end.name} should have a lower bound of 0 or 1 and an upper "
"bound of 1 for its arity. The entity structure of embedded entities should "
"form a tree."
)
# Check name variable referencing embedding entity
if not attr.end.name.startswith("_"):
attribute_to_embedding_entity = (
f"{attr.end.entity.get_full_name()}.{attr.end.name}"
)
raise Exception(
f"Attribute {attribute_to_embedding_entity} should start with an underscore "
"because it references the embedding entity."
)
@classmethod
def _get_plain_attributes(cls, entity: Entity) -> Set[str]:
"""
Return all attributes except relational and metadata attributes.
"""
attributes, _ = cls._get_all_attributes_compiler_type(entity)
return set(
name
for name, attr in attributes.items()
if not (
name.startswith("_")
or "__" in name
or isinstance(attr, ast.attribute.RelationAttribute)
)
)
@classmethod
def _get_modifier(
cls,
attr: ast.attribute.RelationAttribute,
) -> Optional[str]:
"""
get the modifier from the RelationshipMetadata if found in the
source_annotations of attr.
"""
if attr.source_annotations:
relationship_metadata_entities = [
source_annotation.get_value()
for source_annotation in attr.source_annotations
if isinstance(source_annotation.get_value(), Instance)
and source_annotation.get_value().type.get_full_name()
== "lsm::RelationshipMetadata"
]
if len(relationship_metadata_entities) > 1:
raise Exception(
"More than one attribute modifier annotation (__rwplus__, __rw__ or __r__) found on relationship "
f"{attr}"
)
if relationship_metadata_entities:
return (
relationship_metadata_entities[0]
.get_attribute("modifier")
.get_value()
)
return None
@classmethod
def _get_all_attributes_model_type(
cls,
entity_type: Entity,
attributes_from_compiler: Mapping[str, Attribute],
attribute_description_map: Mapping[str, str],
entity_chain_from_service_entity: Sequence[Entity],
all_bindings: abc.Mapping[str, DynamicProxy],
strict_enforcement: bool = False,
) -> Tuple[
list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation]
]:
"""
Get all attributes of the given entity in types used by the internal model.
:param attribute_description_map: Maps the attribute name to the description present in the docstring of the entity.
:param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type.
:param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True.
"""
defaults = entity_type.get_default_values()
service_attr_dct: dict[str, dict[str, Any]] = defaultdict(dict)
embedded_service_entities: list[EmbeddedServiceEntity] = []
inter_service_relations: list[InterServiceRelation] = []
# Check if loop exists between embedded entities
if entity_type.get_full_name() in [
t.get_full_name() for t in entity_chain_from_service_entity
]:
chain = (
" -> ".join(t.get_full_name() for t in entity_chain_from_service_entity)
+ f" -> {entity_type.get_full_name()}"
)
raise Exception(
f"A cycle exists in the embedded entity structure: {chain}. Embedded entities should form a tree structure."
)
for name, attr in attributes_from_compiler.items():
try:
if name.startswith("_"):
# Ignore attributes starting with _
pass
elif isinstance(attr, ast.attribute.RelationAttribute):
if cls._has_parent_entity(attr.type, "lsm::EmbeddedEntity"):
cls._check_relationship_constraints(attr)
docstring = parse_docstring(attr.type.comment)
(
ref_attributes,
ref_embedded_service_entities,
ref_inter_service_relations,
) = cls._get_attributes(
attr.type,
all_bindings=all_bindings,
is_embedded_entity=True,
entity_chain_from_service_entity=[
*entity_chain_from_service_entity,
entity_type,
],
strict_enforcement=strict_enforcement,
)
name_attr_to_embedder: Optional[str] = (
attr.end.name if attr.end is not None else None
)
ese = EmbeddedServiceEntity(
name=attr.name,
lower_limit=attr.low,
upper_limit=attr.high,
description=docstring.get_description(),
attributes=ref_attributes,
embedded_entities=ref_embedded_service_entities,
inter_service_relations=ref_inter_service_relations,
key_attributes=(
[]
if not strict_enforcement
else cls._get_key_attributes(
attr.type, name_attr_to_embedder, attr.high
)
),
entity_annotations=cls._get_annotations_for_entity(
attr.type
),
attribute_annotations=cls._get_annotations_for_relation(
attr
),
)
modifier = cls._get_modifier(attr)
if modifier:
ese.modifier = modifier
embedded_service_entities.append(ese)
elif attr.source_annotations:
inter_service_relations_entries = [
entity.get_value()
for entity in attr.source_annotations
if isinstance(entity.get_value(), Instance)
and entity.get_value().type.get_full_name()
== "lsm::InterServiceRelation"
]
if len(inter_service_relations_entries) == 0:
pass
elif len(inter_service_relations_entries) > 1:
raise Exception(
"More than one `__service__` annotation found on relationship "
f"{attr}"
)
else:
docstring = parse_docstring(attr.type.comment)
isr = InterServiceRelation(
name=attr.name,
lower_limit=attr.low,
upper_limit=attr.high,
description=docstring.get_description(),
# use service name rather than entity name as entity type
entity_type=cls._get_entity_binding_for_target_inter_service_rel(
attr, all_bindings
).service.service_entity_name,
attribute_annotations=cls._get_annotations_for_relation(
attr
),
)
modifier = cls._get_modifier(attr)
if modifier:
isr.modifier = modifier
inter_service_relations.append(isr)
else:
# values and metadata
obj = re.search("(.*)__(.*)", name)
if obj:
# metadata
source_attr_name, opt = obj.groups()
# Make sure that attribute documentation via the docstring takes precedence
# over attribute documentation via the <attr_name>__description field.
if (
opt == "description"
and source_attr_name in attribute_description_map
):
continue
if opt == "annotations":
annotations = cls._get_annotation_in_attribute(
attr, entity_type
)
service_attr_dct[source_attr_name][
"attribute_annotations"
] = annotations
continue
service_attr_dct[source_attr_name][opt] = defaults[
name
].execute_direct({})
else:
service_attr_dct[name]["name"] = name
service_attr_dct[name]["type"] = get_type_as_str(attr)
service_attr_dct[name]["description"] = (
attribute_description_map.get(name, None)
)
(
validation_type,
validation_params,
) = get_validation_type_and_params(attr)
service_attr_dct[name]["validation_type"] = validation_type
service_attr_dct[name][
"validation_parameters"
] = validation_params
service_attr_dct[name]["default_value_set"] = name in defaults
if name in defaults and defaults[name] is not None:
value = do_execute_direct(
defaults[name],
f"Invalid default value for attribute {name} "
"of service entity {entity_binding.service.service_entity_name}",
)
attr.validate(DynamicProxy.unwrap(value))
service_attr_dct[name]["default_value"] = unwrap(value)
except Exception:
LOGGER.exception("Failed to process attribute %s.%s", entity_type, name)
raise
service_attributes: list[ServiceAttribute] = []
for attribute, values in service_attr_dct.items():
try:
if "name" not in values.keys():
if (
"attribute_annotations" in values
and attribute in entity_type.attributes
and isinstance(
entity_type.attributes[attribute],
ast.attribute.RelationAttribute,
)
):
# The annotations for a relationship were specified using the syntax for an attribute annotation.
cls.raise_exception_to_use_annotation_on_relationship(
entity_type, attribute, values["attribute_annotations"]
)
all_metadata_attributes = [
f'{attribute}__{value if value != "attribute_annotations" else "annotations"}'
for value in values.keys()
]
raise Exception(
f"Metadata attribute(s): { ', '.join(all_metadata_attributes) } "
f"of entity {entity_type.get_full_name()} specified without a non-relational attribute {attribute}"
)
service_attributes.append(ServiceAttribute(**values))
except pydantic.ValidationError as e:
LOGGER.exception(
"Failed to parse attribute %s due to an validation exception",
attribute,
)
raise e
return service_attributes, embedded_service_entities, inter_service_relations
@classmethod
def _get_entity_binding_for_target_inter_service_rel(
cls,
attribute: ast.attribute.RelationAttribute,
all_bindings: abc.Mapping[str, DynamicProxy],
) -> DynamicProxy:
"""
:attr attribute: The RelationAttribute for the inter-service relationship.
:attr all_bindings: Mapping from the fully-qualified name of a service entity to its ServiceEntityBinding.
"""
if str(attribute.type) in all_bindings:
return all_bindings[str(attribute.type)]
# No service entity binding exists for the type of the given attribute. Maybe polymorphism was used
# in the model. Search for bindings associated with child entities.
bindings_for_children = [
all_bindings[str(child.get_full_name())]
for child in attribute.type.get_all_child_entities()
if str(child.get_full_name()) in all_bindings
]
if len(bindings_for_children) == 0:
raise Exception(
f"No service entity binding found for target of inter-service relationship {attribute}"
)
elif len(bindings_for_children) > 1:
service_entity_names = ", ".join(
sorted(
binding.service.service_entity_name
for binding in bindings_for_children
)
)
raise Exception(
f"Could not infer the correct type for inter-service relationship {attribute}:"
f" more than one service entity binding exists for the target type: {service_entity_names}"
)
return bindings_for_children[0]
@classmethod
def raise_exception_to_use_annotation_on_relationship(
cls, entity_type: Entity, attribute_name: str, annotations: dict[str, str]
) -> None:
"""
This method raises an exception about the fact that annotations on the relational attribute `attribute_name` were
defined using the attribute annotation syntax instead of the syntax for relational attributes.
:param entity_type: The entity the relational attribute belongs to.
:param attribute_name: The name of the relational attribute.
:param annotations: The annotations that were defined by the attribute annotation.
"""
metadata_attribute_name = f"{attribute_name}__annotations"
assert metadata_attribute_name in entity_type.attributes
relationship = entity_type.attributes[attribute_name]
assert isinstance(relationship, ast.attribute.RelationAttribute)
def _format_limits(relationship: ast.attribute.RelationAttribute) -> str:
"""
Returns a textual representation of the upper and lower arity of the given relationship.
"""
if relationship.high is None:
return f"[{relationship.low}:]"
elif relationship.low == relationship.high:
return f"[{relationship.low}]"
else:
return f"[{relationship.low}:{relationship.high}]"
def _format_path_for_relationship(
relationship_location: ast.Location,
entity_type: Entity,
name_relational_attribute: Optional[str] = None,
) -> str:
"""
Return the path to an entity or a relational attribute as part of the textual representation of a relationship.
:param relationship_location: The location of the relationship definition.
:param entity_type: The entity type referenced by the relationship.
:param name_relational_attribute: The name of the relational attribute. Can be set to None for the side of a
unidirectional relationship that doesn't have a relational attribute.
"""
if relationship_location.file == entity_type.location.file:
# The relationship and the entity are defined in the same namespace.
# We don't have to reference the entity with its fully-qualified name.
result = entity_type.name
else:
result = entity_type.get_full_name()
if name_relational_attribute:
result = f"{result}.{name_relational_attribute}"
return result
def _format_relationship() -> str:
"""
Return a textual representation of the relationship. This method returns a relationship where
the -- in the middle of the relationship definition is replaced with __annotations__.
"""
path_relational_attr_lhs = _format_path_for_relationship(
relationship.location, entity_type, relationship.name
)
formatted_lhs = f"{path_relational_attr_lhs} {_format_limits(relationship)}"
if relationship.end:
path_relational_attr_rhs = _format_path_for_relationship(
relationship.location, relationship.type, relationship.end.name
)
formatted_rhs = (
f"{path_relational_attr_rhs} {_format_limits(relationship.end)}"
)
else:
formatted_rhs = _format_path_for_relationship(
relationship.location, relationship.type
)
return f"{formatted_lhs} __annotations__ {formatted_rhs}"
raise Exception(
f"""
Metadata attribute(s): {metadata_attribute_name} of entity {entity_type.get_full_name()} on the relation {attribute_name}
is specified as an attribute annotation. Write:
__annotations__ = lsm::RelationAnnotations(
annotations={annotations}
)
{_format_relationship()}
instead and remove the metadata attribute at {entity_type.attributes[metadata_attribute_name].location}
""".strip()
)
@classmethod
def _get_key_attributes(
cls,
embedded_entity: Entity,
name_attr_to_embedder: Optional[str],
arity_upper_limit: Optional[int],
) -> list[str]:
"""
Return the list of attributes that uniquely identify the given embedded entity.
:param arity_upper_limit: The upper limit of the arity in the relationship to the embedded entity.
"""
key_attributes: list[str]
key_attributes_attr_name = "__lsm_key_attributes"
try:
key_attributes = embedded_entity.get_default(
key_attributes_attr_name
).execute_direct({})
except AttributeError:
raise Exception(
f"A default value should be defined for the attribute {key_attributes_attr_name} "
f"of entity {embedded_entity.get_full_name()}"
)
potential_indices: list[Set[str]] = (
cls._get_potential_indices_for_object_identity(
embedded_entity, name_attr_to_embedder
)
)
if not isinstance(key_attributes, NoneValue):
# Verify whether an index was defined for these key_attributes
key_attribute_set = set(key_attributes)
if key_attribute_set in potential_indices:
pass
elif (
name_attr_to_embedder is not None
and key_attribute_set.union({name_attr_to_embedder})
in potential_indices
):
pass
else:
raise Exception(
f"Key attributes '{key_attributes}' defined with `__lsm_key_attributes` for entity "
f"{embedded_entity.get_full_name()} is not an index."
)
else:
# Derive key attributes from defined indices
if not potential_indices:
if arity_upper_limit is not None and arity_upper_limit == 1:
# If the upper arity is 1, there is no need to define an index to uniquely identity the embedded entity.
return []
else:
raise Exception(
f"Failed to identify key attributes of embedded entity {embedded_entity.get_full_name()}. "
f"Attribute `__lsm_key_attributes` is null and no identity could be derived from the indices of "
f"the entity."
)
if len(potential_indices) > 1:
raise Exception(
f"Failed to identify the key attributes of embedded entity {embedded_entity.get_full_name()}. Attribute "
f"`__lsm_key_attributes` is null and more than one index was found that could be used to define "
f"the identity of the entity. Use the `__lsm_key_attributes` attribute to define the identifying "
f"attributes explicitly."
)
key_attributes = potential_indices[0]
# Filter out relation to embedding entity
return sorted(
[attr for attr in key_attributes if attr != name_attr_to_embedder]
)
@classmethod
def _get_potential_indices_for_object_identity(
cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str]
) -> list[Set[str]]:
"""
Return a subset of all the indices defined on the given embedded entity, that
can be used to define the identity of the embedded entity from the perspective
of the north-bound API.
:param name_attr_to_embedder: The name of the relational attribute, defined on the embedded entity,
that references to the embedding entity. Set to None is no such
relationship exists.
"""
def is_valid_index(attribute_names_in_index: Sequence[str]) -> bool:
"""
Return True iff the given index can be used to define the identity of an embedded entity.
"""
for attribute_name in attribute_names_in_index:
if attribute_name == name_attr_to_embedder:
continue
if attribute_name.startswith("_"):
LOGGER.debug(
"Ignoring index %s on entity %s as potential key attributes for the embedded entity. "
"The identifying index can only contain "
"attributes exposed via the north-bound API",
attribute_names_in_index,
embedded_entity.get_full_name(),
)
return False
attribute: Optional[Attribute] = embedded_entity.get_attribute(
attribute_name
)
assert attribute is not None # Make mypy happy
if isinstance(attribute, ast.attribute.RelationAttribute):
LOGGER.debug(
"Ignoring index %s on entity %s as potential key attributes for the embedded entity. "
"The identifying index cannot contain "
"relationships except for a relationship to the entity this entity is embedded in",
attribute_names_in_index,
embedded_entity.get_full_name(),
)
return False
attribute_type: ast_type.Type = attribute.get_type().get_base_type()
if isinstance(attribute_type, ast_type.List):
LOGGER.info(
"Ignoring index %s on entity %s as potential key attributes for the embedded entity because it"
" contains the attribute %s of type list.",
attribute_names_in_index,
embedded_entity.get_full_name(),
attribute.name,
)
return False
if isinstance(attribute_type, ast_type.Dict):
LOGGER.info(
"Ignoring index %s on entity %s as potential key attributes for the embedded entity because it"
" contains the attribute %s of type dict.",
attribute_names_in_index,
embedded_entity.get_full_name(),
attribute.name,
)
return False
return True
return [
set(index)
for index in embedded_entity.get_indices()
if is_valid_index(index)
]
@classmethod
def _log_warning_for_missing_documentation(
cls,
entity_type: Entity,
attribute_description_map: dict[str, str],
service_attributes: list[ServiceAttribute],
) -> None:
"""
Log a warning when the docstring contains documentation for an
attribute that is not defined in the entity.
"""
attributes_in_docstring = set(attribute_description_map.keys())
attributes_in_service_definition = set([s.name for s in service_attributes])
undefined_attributes_in_docstring = (
attributes_in_docstring - attributes_in_service_definition
)
for attr_name in undefined_attributes_in_docstring:
if attr_name.startswith("_") or "__" in attr_name:
# Documenting 'private' attributes or metadata attributes shouldn't emit a warning
continue
LOGGER.warning(
"Attribute '%s' is defined in the docstring of service entity '%s', but attribute doesn't exist.",
attr_name,
entity_type.name,
)
def get_type_as_str(attr: ast.attribute.Attribute) -> str:
base_type: ast.type.Type = attr.type.get_base_type()
def get_constraint_basetype(tp: ast.type.ConstraintType):
if isinstance(tp.basetype, ast.type.ConstraintType):
return get_constraint_basetype(tp.basetype)
return tp.basetype
if isinstance(base_type, ast.type.ConstraintType):
return map_attr_type(attr, get_constraint_basetype(base_type))
else:
return map_attr_type(attr)
def get_validation_type_and_params(
attr: ast.attribute.Attribute,
) -> Tuple[Optional[str], Optional[Mapping[str, object]]]:
"""
If the type of the given attribute was defined with a typedef statement and it uses a supported constraint, this method
returns the inmanta_lsm validation type and validation parameters.
"""
import inmanta_plugins.lsm.typedef as typedef
result: Optional[typedef.ValidationType] = typedef.get_validation_type(attr)
return result if result is not None else (None, None)
def do_execute_direct(
statement: ast.statements.DynamicStatement, error_msg: str
) -> Any:
"""
Execute the given statement directly.
"""
try:
return DynamicProxy.return_value(statement.execute_direct({}))
except DirectExecuteException as e:
raise WrappingRuntimeException(None, error_msg, e)
@plugins.plugin
def fsm_to_dot(config: "lsm::LifecycleStateMachine") -> "string":
"""
Generate a dot representation of the state machine
"""
return create_lifecycle(config).to_dot()
@plugins.plugin
def render_dot(fsm: "lsm::LifecycleStateMachine") -> None:
"""
Render a dot graph in the current directory
"""
dot = fsm_to_dot(fsm)
with open("fsm.dot", "w+") as fd:
fd.write(dot)
subprocess.check_call(["dot", "-Tsvg", "-ofsm.svg", "fsm.dot"])
def do_multi_version_export(
env: UUID,
conn: protocol.endpoints.SyncClient,
service_entity: str,
service_entity_versions: ServiceEntityVersions,
) -> None:
"""
Does the export for multi versioned service entities.
:param env: The environment to use.
:param conn: The connection to use.
:param service_entity: The name of the service
:param service_entity_versions: A ServiceEntityVersions object containing the ServiceEntityDefinitions for each of
the versions of the service and the default version
"""
export_result = conn.lsm_service_catalog_update_entity_versions(
tid=env,
service_entity=service_entity,
service_entity_definitions=service_entity_versions.versions,
default_version=service_entity_versions.default_version,
)
if export_result.code != 200:
if export_result.code == 404:
# lsm endpoint is not found
raise Exception(
"A version of the LSM extension that supports this operation is not loaded on the server. "
"Make sure to install a compatible version of the lsm extension (version 4.3.0 or above)."
)
raise Exception(
f"could not upload lifecycle {export_result.result} (code: {export_result.code})"
)
def do_export(
env: UUID,
conn: protocol.endpoints.SyncClient,
service_entity: ServiceEntity,
allow_instance_updates: bool,
) -> None:
"""
Does the export for unversioned service entities.
:param env: The environment to use.
:param conn: The connection to use.
:param service_entity: The definition of the service entity
:param allow_instance_updates: Allow rewriting existing instances when executing an update.
"""
get_entity_result = conn.lsm_service_catalog_get_entity(
tid=env, service_entity=service_entity.name
)
if get_entity_result.code == 404:
export_result = conn.lsm_service_catalog_create_entity(
tid=env, service_entity_definition=service_entity
)
else:
kwargs = {}
if allow_instance_updates:
# Only set parameter if required, to preserve backward compatibility
kwargs["allow_instance_updates"] = True
# Config settings cannot be updated via the update_entity API call
service_entity.config = get_entity_result.result["data"]["config"]
if not service_entity.strict_modifier_enforcement:
LOGGER.warning(
(
"The strict_modifier_enforcement flag is not enabled for service entity binding %s. This behaviour is being"
" deprecated. Please refer to the documentation for more information on how to enable it."
),
service_entity.name,
)
# try export
export_result = conn.lsm_service_catalog_update_entity(
tid=env,
service_entity=service_entity.name,
service_entity_definition=service_entity,
**kwargs,
)
if (
allow_instance_updates
and export_result == 400
and "allow_instance_updates" in export_result.result.get("message", "")
):
# Backward compatibility!
LOGGER.warning(
"The version of inmanta-lsm used by the compiler and server are different, this may cause issues."
)
LOGGER.warning(
"The server version is too old to support type updates or updates on embedded instances."
)
# for backward compatibility with inmanta_lsm<2.0.1.dev
# we do the call again without allow_instance_updates the argument
export_result = conn.lsm_service_catalog_update_entity(
tid=env,
service_entity=service_entity.name,
service_entity_definition=service_entity,
)
if export_result.code != 200:
if export_result.code == 404:
# lsm endpoint is not found
raise Exception(
"The LSM extension is not loaded on the server. Make sure to install the lsm extension, enable it through the"
" server.enabled_extensions option and restart the server."
)
# This update requires the `allow_type_updates` parameter to be set
if "allow_instance_updates" in export_result.result.get("message", ""):
raise Exception(
"could not upload lifecycle, "
"because it would require updating existing instances, "
"use `--export-plugin service_entities_exporter`"
)
raise Exception(
f"could not upload lifecycle {export_result.result} (code: {export_result.code})"
)
@export(
"service_entities_exporter_strict",
"lsm::ServiceEntityBinding",
"lsm::ServiceBinding",
)
def export_service_entities(
exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {}
) -> None:
do_export_service_entities(exporter, types, False)
@export(
"service_entities_exporter",
"lsm::ServiceEntityBinding",
"lsm::ServiceBinding",
)
def service_entities_exporter_with_instance_updates(
exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {}
) -> None:
do_export_service_entities(exporter, types, True)
def do_export_service_entities(
exporter: Exporter,
types: dict[str, Sequence[ProxiedType]],
force: bool = False,
) -> None:
def fetch_entity_type(service_entity: str, service_entity_name: str) -> Entity:
"""
Used to translate an entity path in the model to an Entity object.
:param service_entity: The fully qualified path of the entity in the model
:param service_entity_name: The human-readable name of the service entity
"""
try:
return exporter.types[service_entity]
except KeyError as e:
raise PluginException(
f"Entity {service_entity} in service entity binding for"
f" {service_entity_name} does not exist."
) from e
conn = protocol.endpoints.SyncClient("compiler")
env: Optional[UUID] = config.Config.get("config", "environment", None)
if env is None:
raise Exception(
"The environment of the model should be configured in config>environment"
)
services: list[DynamicProxy] = [
service
for service in (
typing.cast(DynamicProxy, service)
for service in itertools.chain(types["lsm::ServiceBinding"])
)
]
all_bindings: dict[str, DynamicProxy] = {}
# The same service entity in the model can be associated with multiple versions
# So we unpack them like this instead of building the dictionary directly
# We don't care which version ends up here since they all have the same service_entity_name
for service in services:
for version in service.versions:
all_bindings[version.service_entity] = version
for service in services:
versions_obj: list[ServiceEntity] = []
for service_version in service.versions:
entity_type = fetch_entity_type(
service_version.service_entity, service.service_entity_name
)
versions_obj.append(
ServiceEntityBuilder.get_service_entity_from_model(
env, all_bindings, service_version, entity_type
)
)
if len(versions_obj) == 1 and versions_obj[0].version == 0:
# We are dealing with an unversioned service, so we use the old export
do_export(env, conn, versions_obj[0], force)
else:
versioned_entity = ServiceEntityVersions(
service_entity=service.service_entity_name,
versions=versions_obj,
default_version=service.default_version,
)
do_multi_version_export(
env, conn, service.service_entity_name, versioned_entity
)
def get_service_entity(type_class):
defaults = type_class.get_default_values()
if "__service_entity_name" not in defaults:
return None
return defaults["__service_entity_name"].execute(None, None, None)
def is_partial() -> bool:
return os.environ.get(lsm_const.ENV_PARTIAL_COMPILE, "false").lower() == "true"
[docs]
class CacheManager:
"""
Entry point for all internal caches in LSM, accessed via :py:data:`global_cache`.
Also caches a connection to the server.
"""
def __init__(self) -> None:
self._instances_per_binding: dict[str, list[dict]] = {}
self._current_state_cache: dict[str, Tuple[str, int, Optional[int]]] = {}
self._all_instance_cache: dict[str, dict] = {}
self._client = protocol.endpoints.SyncClient("compiler")
# Cache of all bindings, may be unresolvable and cause rescheduling
self._binding_cache_pre: dict[
"inmanta_plugins.lsm.partial.VersionedServiceEntity", DynamicProxy
] = {}
# Cache of all lifecycles, resolved
self._binding_cache_lifecycles: dict[
"inmanta_plugins.lsm.partial.VersionedServiceEntity", dict
] = {}
# Cache of all service entity definitions we pulled from the api
self._service_entities_cache: dict[tuple[str, int | None], ServiceEntity] = {}
self._selector_factory: Optional[
typing.Callable[[str], "inmanta_plugins.lsm.partial.SelectorApi"]
] = None
self._selector: Optional["inmanta_plugins.lsm.partial.SelectorApi"] = None
[docs]
def reset(self) -> None:
"""Reset all state: drop all caches and renew the connection"""
self._instances_per_binding = {}
self._current_state_cache = {}
self._client = protocol.endpoints.SyncClient("compiler")
self._binding_cache_pre = {}
self._binding_cache_lifecycles = {}
self._service_entities_cache = {}
self._all_instance_cache = {}
self._selector = None
[docs]
def get_all_versioned_bindings(
self,
) -> dict["inmanta_plugins.lsm.partial.VersionedServiceEntity", DynamicProxy]:
"""Return all bindings that have been registered"""
return self._binding_cache_pre
[docs]
def get_all_bindings(self) -> dict[str, DynamicProxy]:
"""Return all unversioned service entity bindings that have been registered"""
bindings = self.get_all_versioned_bindings()
unversioned_bindings = {
name: binding
for (name, version), binding in bindings.items()
if version == 0
}
if len(bindings.keys()) != len(unversioned_bindings.keys()):
raise Exception(
"This method was deprecated with the introduction of versioned service entities. "
"Use get_all_versioned_bindings instead."
)
return unversioned_bindings
[docs]
def set_selector_factory(
self,
selector_factory: typing.Optional[
typing.Callable[[str], "inmanta_plugins.lsm.partial.SelectorApi"]
],
) -> None:
"""Set the selector factory that will produce a selector for partial compile
For testing: the factory is not reset when the module is reloaded. To reset, set it to None.
"""
self._selector_factory = selector_factory
def selector(self, environment: str) -> "inmanta_plugins.lsm.partial.SelectorApi":
if self._selector:
return self._selector
if not is_partial():
self._selector = inmanta_plugins.lsm.partial.AllSelector(environment)
elif self._selector_factory:
self._selector = self._selector_factory(environment)
else:
self._selector = inmanta_plugins.lsm.partial.TreeSelector(environment)
return self._selector
def get_client(self) -> protocol.endpoints.SyncClient:
return self._client
[docs]
def get_instance(
self,
env: str,
service_entity_name: Optional[str],
instance_id: Union[str, UUID],
force: bool = False,
include_terminated: bool = False,
) -> Optional[dict]:
"""
Return the service instance with the given environment, service_entity_name and instance_id or None
if no such instance exists.
:param force: when true, the cache is refreshed from the server
:param include_terminated: when trying to pull a specific instance, and it is not in the cache,
try to get it from the API directly, so that it is returned even when terminated.
"""
assert not (
force and not service_entity_name
), "when forcing refresh, service_entity_name must be passed in"
if isinstance(instance_id, UUID):
instance_id = str(instance_id)
if service_entity_name:
self.get_all_instances(env, service_entity_name, force=force)
found = self._all_instance_cache.get(instance_id)
if found is None:
if include_terminated:
# Try to pull it from the API directly in case it is terminated
result = self._client.lsm_services_get_by_id(
tid=env, service_id=instance_id
)
if result.code == 404:
# it is not there at all
return None
if result.code != 200:
message = result.result.get("message") if result.result else None
extra_info = f": {message}" if message else "."
raise Exception(
f"Unable to retrieve service instance from the API (code {result.code}){extra_info}"
)
found = result.result["data"]
if found is None:
return None
if service_entity_name and found["service_entity"] != service_entity_name:
return None
return found
[docs]
def convert_instance(
self, instance: dict, validation: bool, instance_id: Optional[str]
) -> Optional[dict]:
"""
Convert an instance from the API form to the return format of lsm::all
:param instance: The instance dict as returned by the api
:param validation: Whether this is a validation compile
:param instance_id: The id of the instance being validated (if this is a validation compile)
"""
service_entity = instance["service_entity"]
service_entity_version = instance.get("service_entity_version", 0)
ls = self.get_lifecycle(service_entity, service_entity_version)
state = ls[instance["state"]]
if not validation:
if not state.export_resources:
return None
else:
attributes = instance["active_attributes"]
else:
if instance_id == instance["id"]:
take_set = state.validate_self
if not take_set:
return None
else:
attributes = instance[f"{take_set}_attributes"]
else:
take_set = state.validate_others
if not take_set:
return None
else:
attributes = instance[f"{take_set}_attributes"]
if attributes is None:
attributes = {}
return {
"id": instance["id"],
"version": instance["version"],
"desired_state_version": instance.get("desired_state_version", None),
"state": instance["state"],
"delete": instance["deleted"],
"deleted": instance["deleted"],
"service_entity": service_entity,
"service_entity_version": instance.get("service_entity_version", 0),
"attributes": attributes,
"candidate_attributes": instance["candidate_attributes"],
"active_attributes": instance["active_attributes"],
"rollback_attributes": instance["rollback_attributes"],
}
[docs]
def get_all_instances(
self, env: str, service_entity_name: str, force: bool = False
) -> list[dict]:
"""
Get all (non-terminal) instances from the server for a specific environment and service_entity_name.
The result is cached and any subsequent call uses the cache.
:param env: the environment to use.
:param service_entity_name: the name of specific service entity for which to fetch the instances.
:param force: when true, the cache is refreshed from the server
:return: all instances, as dicts, in the format returned by the server.
"""
if force or service_entity_name not in self._instances_per_binding:
# get ALL instances
result = self._client.lsm_services_list(
tid=env, service_entity=service_entity_name
)
if result.code == 404:
# the type probably needs to be registered first
return []
if result.code != 200:
message = result.result.get("message") if result.result else None
extra_info = f": {message}" if message else "."
raise Exception(
f"Unable to retrieve service instance from the API (code {result.code}){extra_info}"
)
out = result.result["data"]
self._instances_per_binding[service_entity_name] = out
for instance in out:
self.cache_instance(instance)
return self._instances_per_binding[service_entity_name]
def cache_instance(self, instance: dict) -> None:
LOGGER.log(
inmanta.const.LogLevel.TRACE.to_int, "Caching instance %s", instance["id"]
)
self._all_instance_cache[instance["id"]] = instance
self._current_state_cache[instance["id"]] = (
instance["state"],
instance["version"],
instance.get("desired_state_version", None),
)
[docs]
def get_instance_state(
self, instance_id: str
) -> "inmanta_plugins.lsm.partial.VersionedServiceEntity":
"""
Get the current state and version for a specific instance.
Can only be called for instances retrieved via get_all_instances
:param instance_id: the uuid for the service instance
:return: current state and version for the specific instance
"""
state_name, current_version, _ = (
self.get_instance_state_with_desired_state_version(instance_id)
)
return state_name, current_version
[docs]
def get_instance_state_with_desired_state_version(
self, instance_id: str
) -> tuple[str, int, Optional[int]]:
"""
Get the current state and version for a specific instance.
Can only be called for instances retrieved via get_all_instances
:param instance_id: the uuid for the service instance
:return: current state, version and desired state version for the specific instance
"""
if instance_id not in self._current_state_cache:
# This should never happen, since the all() plugin is always called before the current_state() plugin
raise Exception(
f"No service instance {instance_id} found, ensure the 'all' plugin has been called before"
)
return self._current_state_cache[instance_id]
[docs]
def register_binding(self, binding) -> dict[str, object]:
"""Register a lifecycle binding and return its lifecycle"""
name = binding.service.service_entity_name
version = binding.version
if (name, version) not in self._binding_cache_pre:
LOGGER.debug("First registered binding %s version %s", name, version)
self._binding_cache_pre[(name, version)] = binding
self.resolve_binding_cache()
return self._binding_cache_lifecycles[(name, version)]
def resolve_binding_cache(self) -> None:
# Resolve all bindings at once, making multiple lsm::all behave more regular
todo = set(self._binding_cache_pre.keys()) - set(
self._binding_cache_lifecycles.keys()
)
for name, version in todo:
binding = self.get_binding(name, version)
self._binding_cache_lifecycles[(name, version)] = collect_states(
binding.lifecycle
)
[docs]
def get_lifecycle(
self, entity_name: str, entity_version: Optional[int] = None
) -> dict[str, dict[str, object]]:
"""
Get the lifecycle for a specific version of service entity.
:param entity_name: The name of the service entity.
:param entity_version: The version of the service entity. Optional for backwards compatibility.
If it is not provided, will only work if there is one registered version.
:return: The lifecycle of the provided service entity version.
"""
if entity_version is None:
matching_versions = [
version
for name, version in self._binding_cache_lifecycles.keys()
if name == entity_name
]
if len(matching_versions) > 1:
raise Exception(
f"Service entity {entity_name} has multiple versions registered {matching_versions},"
f" please specify a version while calling this method "
)
elif len(matching_versions) == 0:
raise Exception(
f"No lifecycle found for any version of {entity_name}, ensure register_binding has been called."
)
entity_version = matching_versions[0]
if (entity_name, entity_version) not in self._binding_cache_lifecycles:
raise Exception(
f"No lifecycle found for {entity_name} and version {entity_version}, ensure register_binding has been called."
)
return self._binding_cache_lifecycles[(entity_name, entity_version)]
[docs]
def get_binding(
self, entity_name: str, entity_version: Optional[int] = None
) -> DynamicProxy:
"""
Takes an entity name and version and returns the appropriate binding.
If entity_version is not provided we will assume it is 0 and that we are dealing with unversioned entities.
If no specific version is requested and multiple versions exist, raises an exception.
The binding returned may not be fully resolved and can raise UnsetExceptions when accessed.
"""
if entity_version is None:
entity_version = 0
entity_versions = [
version
for (name, version) in self._binding_cache_pre.keys()
if name == entity_name
]
if len(entity_versions) > 1:
raise LookupError(
f"No specific version was requested for binding with name {entity_name}. "
f"Found multiple cached versions: {entity_versions} "
)
key = (entity_name, entity_version)
if key not in self._binding_cache_pre:
raise KeyError(
f"No binding found for {entity_name} and version {entity_version}."
)
return self._binding_cache_pre[key]
[docs]
def get_service_entity(
self,
service_entity_name: str,
service_entity_version: int | None = None,
) -> ServiceEntity:
"""
Get the service entity definition with the given name from the api.
:param service_entity_name: The name of the service whose definition we want
to query.
:param service_entity_version: The version of the service entity (if multi-version
is supported by this inmanta server)
"""
cache_key = (service_entity_name, service_entity_version)
if cache_key in self._service_entities_cache:
return self._service_entities_cache[cache_key]
# Get the service definition, if a version is specified, we use the newer
# api, otherwise keep the old one, to ensure backward compatibility with older
# versions of the server
service_entity_result: inmanta.protocol.common.Result
if service_entity_version is not None:
# We have an entity version, we try the new version aware api
service_entity_result = self._client.lsm_service_catalog_get_entity_version(
tid=get_environment(),
service_entity=service_entity_name,
version=service_entity_version,
)
if service_entity_version is None or service_entity_result.code >= 500:
# No entity version or unavailable api, we try the legacy api
service_entity_result = self._client.lsm_service_catalog_get_entity(
tid=get_environment(),
service_entity=service_entity_name,
)
if service_entity_result.code != 200:
message = (
service_entity_result.result.get("message")
if service_entity_result.result
else None
)
extra_info = f": {message}" if message else "."
raise Exception(
f"Unable to retrieve service entity from the API (code {service_entity_result.code}){extra_info}"
)
# Build the service entity object, for easier data access
service_entity = ServiceEntity(**service_entity_result.result["data"])
# Cache the constructed entity for later access
self._service_entities_cache[cache_key] = service_entity
return service_entity
global_cache = CacheManager()
@plugins.plugin
def get_service_binding_version(
context: plugins.Context, service: "lsm::ServiceBinding", version: "int"
) -> "lsm::ServiceBindingVersion":
"""
Filters the specific binding version from a service
:param service: The ServiceBinding that groups the different ServiceBindingVersions
:param version: The version that we want to fetch
:return: The ServiceBindingVersion that matches the version that we provided
"""
version = [
service_binding_version
for service_binding_version in service.versions
if service_binding_version.version == version
]
if len(version) == 0:
raise Exception(
f"ServiceBindingVersion {version} not found for service {service.service_entity_name}"
)
if len(version) > 1:
raise Exception(
f"Multiple ServiceBindingVersions were found on service {service.service_entity_name} with version {version}"
)
return version[0]
@plugins.plugin
def all(
context: plugins.Context,
service: "lsm::ServiceBinding",
min_version: "int" = None,
max_version: "int" = None,
include_embedded_entities_id: "bool" = False,
include_purged_embedded_entities: "bool" = False,
added_attribute: "string" = "_added", # type: ignore
removed_attribute: "string" = "_removed", # type: ignore
) -> "list":
"""
Returns a list of records for the given binding.
:param service: The service that we are unrolling
:param include_embedded_entities_id: Set to True to make sure all embedded entities
which are part of each instance `attributes` have an identity baked in, to allow
the usage of the lsm::get_previous_value plugin.
:param include_purged_embedded_entities: Set to True to enable tracking embedded entities added or removed during
an update. At each step of the update, relevant embedded entities will be flagged as added or removed by the
lsm::all plugin. In practise, this means the `_added` or the `_removed` boolean attributes will be set on
the relevant embedded entities. To use different flagging attributes, use the `added_attribute` and
the `removed_attribute` parameters.
:param min_version: The minimum service_entity_version of the selected instances (including this version).
If left empty, there is no minimum boundary.
:param max_version: The maximum service_entity_version of the selected instances (including this version).
If left empty, there is no maximum boundary.
:param added_attribute: Use in conjunction with `include_purged_embedded_entities=True` to set the name of the
attribute flagging embedded entities that were added during an update.
:param removed_attribute: Use in conjunction with `include_purged_embedded_entities=True` to set the name of the
attribute flagging embedded entities that were removed during an update.
:return: A list of dict with all the defined records.
"""
# make calls that can cause re-scheduling
if service.service_entity_name is None:
raise Exception(
"Only instances of entities that define its service type can be retrieved."
)
is_service_catalog_update: bool = (
os.environ.get(ENV_NO_INSTANCES, "false").lower() == "true"
)
if is_service_catalog_update:
LOGGER.log(
logging.DEBUG,
"lsm::all returning 0 instances for type %s and version range (min version %s and max version %s) "
"during update of the service catalog.",
service.service_entity_name,
str(min_version),
str(max_version),
)
return []
LOGGER.log(
inmanta.const.LogLevel.TRACE.to_int,
"Entering lsm:all for %s",
service.service_entity_name,
)
for version in service.versions:
# this is a failsafe if the selector doesn't do it properly
global_cache.register_binding(version)
# collect environment context
service_entity_name = service.service_entity_name
model_state = os.environ.get(lsm_const.ENV_MODEL_STATE, ModelState.active)
validation = model_state == ModelState.candidate
env = context.get_environment_id()
# Get the selector
selector = global_cache.selector(environment=env)
selector.reload_bindings()
# at this point, we are probably good wrt to re-execution
# Parse instances ids
instance_id_env = os.environ.get(lsm_const.ENV_INSTANCE_ID, None)
if not instance_id_env:
instance_ids: list[str] = []
else:
instance_ids = instance_id_env.split(" ")
# Validate
for instance_id in instance_ids:
try:
uuid.UUID(hex=instance_id)
except ValueError:
raise PluginException(
f"Instance Id set via {lsm_const.ENV_INSTANCE_ID} is not a valid uuid: {instance_id}"
)
if validation:
if len(instance_ids) == 0:
raise Exception(
"Validation compile without instance set! This is not allowed"
)
if len(instance_ids) > 1:
raise Exception(
"Validation compile with multiple instance set! This is not allowed. instances: %s",
instance_ids,
)
instance_to_validate: Optional[str] = instance_ids[0]
else:
instance_to_validate = None
selector.register_instances(instance_ids)
# Select all instances regardless of version
# We do the version filtering below
raw_instances: list[dict] = selector.select(service_entity_name)
allocation_specs: dict[int, Optional[str]] = {
version.version: get_optional_value(version, "allocation_spec")
for version in service.versions
}
instances = []
to_allocate: dict[int, list[dict]] = defaultdict(list)
for data in raw_instances:
service_entity_version = data.get("service_entity_version", 0)
if min_version is not None and min_version > service_entity_version:
continue
if max_version is not None and max_version < service_entity_version:
continue
instance = global_cache.convert_instance(data, validation, instance_to_validate)
if not instance:
continue
instances.append(instance)
if (
validation
and instance_to_validate == data["id"]
and allocation_specs[service_entity_version] is not None
):
to_allocate[service_entity_version].append(instance)
if to_allocate:
import inmanta_plugins.lsm.allocation as allocation
for version in to_allocate.keys():
to_allocate_version = to_allocate[version]
binding = [
binding for binding in service.versions if binding.version == version
][0]
allocate_spec = allocation.global_allocation_collector.get_spec(
binding.allocation_spec
)
LOGGER.log(
logging.DEBUG,
"Performing allocation for %d instances of type %s and version %s: done",
len(to_allocate_version),
service.service_entity_name,
version,
)
allocation.do_allocate_instances(
context, binding, allocate_spec, to_allocate_version
)
LOGGER.log(
logging.DEBUG,
"lsm::all returning %d instances for type %s",
len(instances),
service_entity_name,
)
if include_purged_embedded_entities:
instances = insert_purged_embedded_entities(
instances=instances,
added_attribute=added_attribute,
removed_attribute=removed_attribute,
)
if include_embedded_entities_id:
instances = insert_embedded_entities_id(instances)
return instances
@plugins.plugin
def current_state(ctx: plugins.Context, fsm: "lsm::ServiceEntity") -> "dict":
"""
Returns the current state from the lifecycle and the next version of the instance
"""
instance_id = fsm.instance_id
state_name, current_version, current_desired_state_version = (
global_cache.get_instance_state_with_desired_state_version(instance_id)
)
states = {}
# collect all states in the graph
for transfer in fsm.entity_binding.lifecycle.transfers:
states[transfer.source.name] = transfer.source
states[transfer.target.name] = transfer.target
try:
states[transfer.error.name] = transfer.error
except OptionalValueException:
pass
if state_name not in states:
raise Exception(
"The state returned from the API does not exist in the lifecycle defined in the model."
)
return {
"state": states[state_name]._get_instance(),
"next_version": current_version + 1,
"next_desired_state_version": (
current_desired_state_version + 1
if current_desired_state_version is not None
else None
),
}
@plugins.plugin
def has_current_state(
ctx: plugins.Context, service_instance: "lsm::ServiceEntity", state_name: "string"
) -> "bool":
"""
Check whether the given service instance is currently in the given state of its lifecycle.
:param service_instance: The ServiceEntity object.
:param state_name: The name of the lifecycle state
"""
instance_id = service_instance.instance_id
current_state, _ = global_cache.get_instance_state(instance_id)
return current_state == state_name
@plugins.plugin
def is_validating(instance_id: "string") -> "bool":
"""
Return true if the current compile is a validating compile and the instance
being validated has the given id.
:param instance_id: The id of the instance we want to check for validation.
"""
return (
inmanta_plugins.lsm.allocation_helpers.is_validation_compile()
and inmanta_plugins.lsm.allocation_helpers.get_validated_instance_id()
== instance_id
)
@plugins.plugin
def format(__string: "string", args: "list", kwargs: "dict") -> "string":
"""
Format a string using python string formatter, and accepting statements which
native inmanta f-string doesn't support (such as accessing dict values)
:param __string: The string to apply formatting to
:param args: The positional arguments to feed into the `str.format` method
:param kwargs: The keyword arguments to feed into the `str.format` method
"""
return __string.format(*args, **kwargs)
@plugins.plugin()
def update_read_only_attribute(
service: "lsm::ServiceEntity", # type: ignore
attribute_path: "string", # type: ignore
*,
value: "any", # type: ignore
) -> "any":
"""
Update the value of a read-only (candidate) attribute in the service, in any compile, at any time.
The value will first be compared to the previous set value, and only be sent to the server
if it is different.
:param service: The service on which we want to set the value
:param attribute_path: The path towards the service attribute
:param value: The new value that we want to make sure is currently written in the service.
"""
# Get the instance to check the current value
instance = inmanta_plugins.lsm.allocation_helpers.get_service_instance(
service.entity_binding.service.service_entity_name,
service.instance_id,
)
# The current value is the one in the candidate attributes if they exist, in the
# active attributes otherwise
attributes = instance["candidate_attributes"] or instance["active_attributes"]
try:
current_value = dict_path.to_path(attribute_path).get_element(attributes)
except LookupError:
current_value = None
# Only write the new value if it is different from the previous one
if current_value != value:
inmanta_plugins.lsm.allocation_helpers.set_allocated_value(
service_entity=service,
attribute_path=dict_path.to_path(attribute_path),
value=value,
)
return value
@plugins.plugin()
def validate_service_index(
binding: "lsm::ServiceBindingVersion", # type: ignore
attributes: "dict",
ignored_states: "string[]" = [], # type: ignore
) -> "bool":
"""
Validate that amongst all the services of the given binding that should be taken into
account in this compile, only one of them has the provided set of attributes names and
values.
:param binding: The binding for which we want to check the services.
:param attributes: The attributes that we are looking for, the keys should be the attributes
names, and the values the expected values.
"""
def match(service: dict) -> bool:
"""
Check if the given service has for each attribute we care about, the
value we care about.
"""
# First check if the service should be taken into account in this compile
if not service:
return False
# The service is in a state that we shouldn't consider
if service["state"] in ignored_states:
return False
# Then check for all the relevant attributes
for k, v in attributes.items():
if service["attributes"][k] != v:
return False
return True
# Get all the services and check if any service also has the same set
# of attributes with the same values. If it is the case, raise an explicit
# exception
all_services = [
global_cache.convert_instance(
service,
inmanta_plugins.lsm.allocation_helpers.is_validation_compile(),
inmanta_plugins.lsm.allocation_helpers.get_validated_instance_id() or "",
)
for service in global_cache.get_all_instances(
get_environment(),
binding.service.service_entity_name,
)
]
matching_services = [service["id"] for service in all_services if match(service)]
if len(matching_services) <= 1:
# No issue there
return True
raise inmanta.plugins.PluginException(
f"The attributes {list(attributes.keys())} should form a unique combination across all "
f"services, but multiple services use the same values: {matching_services} ({attributes})"
)
@resources.resource("lsm::LifecycleTransfer", agent="agent", id_attribute="instance_id")
class LifecycleTransferResource(resources.PurgeableResource):
"""
A resource that collects deploy events and send them to the lsm
"""
service_entity: str
instance_id: uuid.UUID
next_version: int
next_desired_state_version: Optional[int]
fields = (
"service_entity",
"instance_id",
"next_version",
"next_desired_state_version",
"resources",
)
@staticmethod
def get_resources(exporter, transfer):
resource_list = []
unregistered_resources = set()
for res in transfer.resources:
try:
current_resource_id = resources.to_id(res)
if current_resource_id is None:
unregistered_resources.add(str(DynamicProxy.unwrap(res)))
else:
resource_list.append(current_resource_id)
except IgnoreResourceException:
pass
if len(unregistered_resources) > 0:
raise RuntimeError(
f"Unregistered resources are present in this export: {unregistered_resources}"
)
sorted_resources = sorted(resource_list)
return sorted_resources
# This class should ideally inherit from the more generic `HandlerAPI` instead of `CRUDHandler` due to its limited adherence
# to the interface. However, since the `HandlerAPI` class is only available on iso7+ and we want to remain compatible with
# iso6 this cannot yet be done.
@handler.provider("lsm::LifecycleTransfer", name="local_state")
class LifecycleTransferHandler(handler.CRUDHandler):
"""
A handler that collects all resource statuses from resources that are part of a service instance.
The deploy() method is used to determine whether the resources of a service instance are deployed successfully or not.
"""
client: Optional[protocol.Client]
def pre(
self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
) -> None:
self.client = protocol.Client("agent")
def deploy(
self,
ctx: handler.HandlerContext,
resource: LifecycleTransferResource,
requires: dict[ResourceIdStr, const.ResourceState],
) -> None:
self.pre(ctx, resource)
try:
all_resources_are_deployed_successfully = self._send_current_state(
ctx, resource, requires
)
if all_resources_are_deployed_successfully:
ctx.set_status(const.ResourceState.deployed)
else:
ctx.set_status(const.ResourceState.failed)
finally:
self.post(ctx, resource)
def _send_current_state(
self,
ctx: handler.HandlerContext,
resource: LifecycleTransferResource,
fine_grained_resource_states: dict[ResourceIdStr, const.ResourceState],
) -> bool:
"""
Report the resource states for the resources in this service instance to inmanta LSM. This method raises a SkipResource
exception when one of the given resource states is a transient state (i.e. it doesn't match the failed or deployed
state).
:param resource: This LifecycleTransfer resource.
:param fine_grained_resource_states: The resource state for each resource part of this service instance.
:return: True iff all the given resource states are equal to deployed.
"""
# If a resource is not in events, it means that it was deployed before so we can mark it as success
resource_states = {res: ResourceState.success for res in resource.resources}
is_failed = False
skipped_resources = []
# Convert inmanta.const.ResourceState to inmanta_lsm.model.ResourceState
for resource_id, state in fine_grained_resource_states.items():
if state == const.ResourceState.failed:
resource_states[resource_id] = ResourceState.error
is_failed = True
elif state == const.ResourceState.deployed:
# same as default list
pass
elif resource_id in resource_states:
# some transient state that is not failed and not success, so lets skip
skipped_resources.append(
f"skipped because the `{resource_id}` is `{state.value}`"
)
# failure takes precedence over transient
# transient takes precedence over success
if len(skipped_resources) > 0 and not is_failed:
raise SkipResource("\n".join(skipped_resources))
ctx.info(
"Sending %(states_str)s",
states_str=", ".join([f"{k}: {v}" for k, v in resource_states.items()]),
states=resource_states,
)
set_state_args = {
"tid": self._agent.environment,
"service_entity": resource.service_entity,
"service_id": resource.instance_id,
"resource_states": resource_states,
}
if resource.next_desired_state_version is not None:
# Makes mypy happy
assert resource.next_desired_state_version is not None
set_state_args["current_desired_state_version"] = (
resource.next_desired_state_version - 1
)
set_state_args["current_version"] = None
else:
set_state_args["current_version"] = resource.next_version - 1
def call() -> typing.Awaitable[protocol.Result]:
return self.client.lsm_services_resources_set_state(**set_state_args)
response = self.run_sync(call)
if response.code == 409:
# Conflict: Transfer already happened
return not is_failed
if response.code != 200:
ctx.error(
"Failed to set resource state on lsm.",
code=response.code,
result=response.result,
)
# and raise exception to mark this resource as failed
raise Exception("Failed to set resource state on lsm.")
return not is_failed
def _diff(
self, current: LifecycleTransferResource, desired: LifecycleTransferResource
) -> dict[str, dict[str, Any]]:
changes = super()._diff(current, desired)
if "next_version" in changes:
# no changes required when current > desired
current = changes["next_version"]["current"]
desired = changes["next_version"]["desired"]
if current > desired:
del changes["next_version"]
return changes
def read_resource(
self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
) -> None:
"""
This method is used by the dry_run code path.
"""
def call():
# get the state from lsm
return self.client.lsm_services_get(
self._agent.environment, resource.service_entity, resource.instance_id
)
response = self.run_sync(call)
if response.code != 200:
ctx.error(
"Failed to retrieve service instance",
message=response.result,
status=response.code,
)
raise Exception("Failed to retrieve service instance")
ctx.set("instance", response.result["data"])
ctx.set("transferred", False)
ctx.info(
"Retrieved instance with version %(version)s",
version=response.result["data"]["version"],
)
resource.next_version = response.result["data"]["version"]
resource.next_desired_state_version = response.result["data"].get(
"next_desired_state_version", None
)
def create_resource(
self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
) -> None:
raise Exception(
"This method should not be called, because this handler overrides the deploy() method."
)
def delete_resource(
self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
) -> None:
raise Exception(
"This method should not be called, because this handler overrides the deploy() method."
)
def update_resource(
self,
ctx: handler.HandlerContext,
changes: dict,
resource: LifecycleTransferResource,
) -> None:
raise Exception(
"This method should not be called, because this handler overrides the deploy() method."
)
def post(
self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
) -> None:
self.client = None
@resources.resource("lsm::ServiceInstance", agent="agent", id_attribute="instance_id")
class ServiceInstanceResource(resources.PurgeableResource):
fields = (
"service_entity",
"instance_id",
"attributes",
"skip_update_states",
"rejected_states",
)
@staticmethod
def get_attributes(exporter, instance):
return instance.attributes
@staticmethod
def get_instance_id(_, instance):
return instance.instance_id
@staticmethod
def get_service_entity(_, instance):
return instance.service_entity
@handler.provider("lsm::ServiceInstance", name="service_instance")
class ServiceInstanceHandler(handler.CRUDHandler):
client: protocol.SyncClient
def pre(self, ctx, resource) -> None:
self.client = protocol.Client("agent")
def read_instance(self, service_instance: ServiceInstanceResource):
def call():
return self.client.lsm_services_get(
self._agent.environment,
service_instance.service_entity,
service_instance.instance_id,
)
return call
def read_resource(
self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
) -> None:
# get the state from lsm
def call():
return self.client.lsm_services_get(
self._agent.environment,
service_instance.service_entity,
service_instance.instance_id,
)
response = self.run_sync(call)
if response.code == 404:
service_instance.purged = True
return
if response.code != 200:
ctx.error(
"Failed to retrieve service instance",
message=response.result,
status=response.code,
)
raise Exception("Failed to retrieve service instance")
ctx.set("instance", response.result["data"])
ctx.info(
"Retrieved instance with version %(version)s",
version=response.result["data"]["version"],
)
service_instance.attributes = (
response.result["data"]["candidate_attributes"]
if not response.result["data"]["active_attributes"]
else response.result["data"]["active_attributes"]
)
def create_resource(
self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
) -> None:
def call():
return self.client.lsm_services_create(
self._agent.environment,
service_instance.service_entity,
service_instance.attributes,
service_instance.instance_id,
)
response = self.run_sync(call)
if response.code != 200:
ctx.error(
"Failed to create service instance on lsm.",
code=response.code,
result=response.result,
)
raise Exception(
f"Failed to set resource state on lsm. {str(response.result)}"
)
ctx.set("instance", response.result["data"])
ctx.info(
"Created instance with version %(version)s",
version=response.result["data"]["version"],
)
service_instance.instance_id = response.result["data"]["id"]
ctx.set_created()
def delete_resource(
self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
) -> None:
instance_read_response = self.run_sync(self.read_instance(service_instance))
instance_version = instance_read_response.result["data"]["version"]
instance_current_state = instance_read_response.result["data"]["state"]
def call():
return self.client.lsm_services_delete(
self._agent.environment,
service_instance.service_entity,
service_instance.instance_id,
instance_version,
)
response = self.run_sync(call)
if response.code == 404 and "cannot be updated" in response.result["message"]:
raise SkipResource(
f"Instance {instance_read_response.result['data']['id']} "
f"cannot be deleted while in state {instance_current_state}"
)
if response.code != 200:
ctx.error(
"Failed to delete service instance on lsm.",
code=response.code,
result=response.result,
)
raise Exception("Failed to delete service instance on lsm.")
ctx.set("instance", None)
ctx.set_purged()
def calculate_diff(
self,
ctx: handler.HandlerContext,
current: ServiceInstanceResource,
desired: ServiceInstanceResource,
) -> dict[str, dict[str, Any]]:
changes = super()._diff(current, desired)
if "attributes" not in changes:
return changes
# Desired might not contain the attributes that have default value
for attribute_name, attribute_value in changes["attributes"]["current"].items():
if attribute_name not in changes["attributes"]["desired"].keys():
changes["attributes"]["desired"][attribute_name] = attribute_value
if changes["attributes"]["current"] == changes["attributes"]["desired"]:
del changes["attributes"]
return changes
def update_resource(
self,
ctx: handler.HandlerContext,
changes: dict,
service_instance: ServiceInstanceResource,
) -> None:
read_response = self.run_sync(self.read_instance(service_instance))
state = read_response.result["data"]["state"]
instance_version = read_response.result["data"]["version"]
if state in service_instance.skip_update_states:
raise SkipResource(f"Instance cannot be updated from state {state}")
if state in service_instance.rejected_states:
raise Exception(f"Instance is in a rejected state: {state}")
updated = {}
if "attributes" in changes:
# Check the desired attributes
for attribute_name, attribute_value in changes["attributes"][
"desired"
].items():
# If it's already in the candidates, but with a different value, update the value
if (
read_response.result["data"]["candidate_attributes"]
and read_response.result["data"]["candidate_attributes"][
attribute_name
]
!= attribute_value
):
updated[attribute_name] = attribute_value
# If it's different from the active attribute, and not already in the candidates, update it
if (
read_response.result["data"]["active_attributes"]
and read_response.result["data"]["active_attributes"][
attribute_name
]
!= attribute_value
and attribute_name not in updated.keys()
):
updated[attribute_name] = attribute_value
del changes["attributes"]
if changes:
raise Exception(
f"Changing {changes.keys()} of an instance is not supported"
)
def call():
return self.client.lsm_services_update(
self._agent.environment,
service_instance.service_entity,
service_instance.instance_id,
instance_version,
updated,
)
if updated:
ctx.debug(
f"Updating instance {service_instance.id} with attributes {str(updated)}"
)
response = self.run_sync(call)
if response.code != 200:
ctx.error(
"Failed to update service instance on lsm.",
code=response.code,
result=response.result,
)
raise Exception(
f"Failed to update service instance on lsm. {str(response.result)}"
)
ctx.set_updated()
@dependency_manager
def lsm_dependency_manager(entities: ModelDict, resources: ResourceDict) -> None:
for res in resources.values():
if res.id.entity_type == "lsm::LifecycleTransfer":
# Ensure that the resources list contains only resources that exist.
# The requires set was already filtered from resources that don't exist.
res.resources = sorted([r.resource_str() for r in res.requires])
def inmanta_reset_state() -> None:
"""
Reset the cached state for this module.
"""
global_cache.reset()
@inmanta.plugins.plugin()
def insert_purged_embedded_entities(
instances: "dict[]", # type: ignore
*,
removed_attribute: "string" = "_removed", # type: ignore
added_attribute: "string" = "_added", # type: ignore
) -> "dict[]": # type: ignore
"""
Update the instances in the provided list to include in the current attribute set
all the embedded entities that have been removed in the current update. If no update
is happening for an instance, its attributes are left untouched.
:param instances: The list of instance to update, the original dicts are not modified,
the returned values are a modified copy. Instances can be of different types.
:param removed_attribute: The attribute that should be inserted in the embedded entities
which are inserted back into the current state, to mark the entity as purged.
:param added_attribute: The attribute that should be inserted in the embedded entities
which are added into the current state, to mark the entity as "new".
"""
cache: dict[tuple[str, int | None], list[dict_path.WildDictPath]] = {}
# Go over each service and transform its attributes to include any deleted embedded
# entity
updated_instances: list[dict] = []
for instance in instances:
# Bring the attributes into the python domain so that dict path can handle it
# and we can modify its values
# This method makes a deepcopy of the original object so we can safely modify it
instance = inmanta_plugins.lsm.unwrap(instance)
previous_attributes = get_previous_attributes(instance)
if previous_attributes is None:
# We don't have a previous attributes set, there can't be any removed embedded
# entities to insert back, we can continue
updated_instances.append(instance)
continue
# The selection of the current attributes is done automatically by lsm::all, based on
# the validate_self and validate_others attributes of the state we are in.
current_attributes = instance["attributes"]
# Get all the embedded entities that were added/removed
removed_embedded_entities: set[str] = set()
added_embedded_entities: set[str] = set()
for (
path
) in inmanta_plugins.lsm.embedded_entities_tracker.get_embedded_entities_paths_or_cache(
instance["service_entity"],
instance.get("service_entity_version", None),
cache=cache,
):
# We treat any null value as a removed object, for this we need to filter them
# out of the attributes first
previous_paths = {
str(p)
for p in path.resolve_wild_cards(
inmanta_plugins.lsm.embedded_entities_tracker.remove_none_values(
previous_attributes
)
)
}
current_paths = {
str(p)
for p in path.resolve_wild_cards(
inmanta_plugins.lsm.embedded_entities_tracker.remove_none_values(
current_attributes
)
)
}
removed_embedded_entities |= previous_paths - current_paths
added_embedded_entities |= current_paths - previous_paths
for removed in sorted(removed_embedded_entities):
removed_path = dict_path.to_path(removed)
embedded_entity = copy.copy(removed_path.get_element(previous_attributes))
embedded_entity[removed_attribute] = True
removed_path.set_element(
current_attributes, embedded_entity, construct=True
)
for added in sorted(added_embedded_entities):
added_path = dict_path.to_path(added)
embedded_entity = added_path.get_element(current_attributes)
embedded_entity[added_attribute] = True
updated_instances.append(instance)
return updated_instances
@inmanta.plugins.plugin()
def insert_embedded_entities_id(instances: "dict[]") -> "dict[]": # type: ignore
"""
Extend the instance attributes with information about the identity of each
embedded entity that is part of it. This includes the id of the service it is
part of (in the `_instance_id` attribute) and the dict path that leads to this
specific entity, from the root of the instance attributes (in the `_path` attribute).
:param instances: The instances, as returned by lsm::all, which should get
augmented with the embedded entities identities. All injected attributes
(`_instance_id` and `_path`) are only added to the `attributes` attribute
of the instance.
"""
# Go over each service and make sure to convert any dynamic proxy object into its
# mainstream python counterpart
updated_instances: list[dict] = [inmanta_plugins.lsm.unwrap(i) for i in instances]
def add_embedded_entity_id(instance: dict, path: str, attributes: dict) -> None:
"""
Update the attributes to make sure it contains its identity (as an embedded entity
of a service).
:param instance: The instance the attributes is a part of
:param path: The path to this embedded entity
:param attributes: The attributes of the embedded entity that we will
modify in place
"""
attributes["_instance_id"] = instance["id"]
attributes["_path"] = path
# Register the identity in each embedded entity of each instance
inmanta_plugins.lsm.embedded_entities_tracker.for_each_embedded_entity(
updated_instances,
add_embedded_entity_id,
)
return updated_instances
@inmanta.plugins.plugin()
def get_previous_attributes(instance: "dict") -> "dict?": # type: ignore
"""
Get the previous attributes set for a given instance if it is in an updating state.
The selected attribute set depends on whether the instance is being validated or not
:param instance: Map representation of the instance
"""
validating = is_validating(instance["id"])
service_entity = str(instance.get("service_entity"))
service_entity_version = int(instance.get("service_entity_version", 0))
lifecycle = global_cache.get_lifecycle(service_entity, service_entity_version)
state = lifecycle[instance["state"]]
previous_attributes_set = (
state.previous_attr_set_on_validate
if validating
else state.previous_attr_set_on_export
)
if previous_attributes_set is None:
# There is no previous attributes to merge into the current ones
return None
return instance[str(previous_attributes_set) + "_attributes"]
@inmanta.plugins.plugin()
def get_previous_value(
instance_element: "dict",
attribute: "string",
*,
default: "any" = None,
) -> "any":
"""
Get the value that a specific attribute had previously. This plugin
can be called in update states, when unrolling an lsm service, to figure
out if a specific attribute has been modified during the update.
The plugin can be called at the root of the instance dict, or on any
embedded entity that is part of the current attributes of the instance.
Each embedded entity is expected to have the following keys set, which is
automatically added when using `lsm::all` with `include_embedded_entities_id=true`.
:param instance_element: A dict that is either an instance coming out of lsm::all, or
a part of the `attributes` dict of an instance coming out of lsm::all.
:param attribute: The name of an attribute that is supposed to exist on the current
instance_element, in the previous version of the attributes.
:param default: A value to return in case the attribute doesn't exist in the previous
version of the attributes. Either because it never existed, or because the embedded
entity it is a part of didn't exist, or because we are not in an updating state
(meaning there is no notion of previous attributes in that case).
"""
# First we need to figure out whether the argument we received is an
# instance or an embedded entity
if "_path" in instance_element and "_instance_id" in instance_element:
# This is an embedded entity, we can resolve the full instance and
# the embedded entity that is being targeted
instance = global_cache.get_instance(
get_environment(), None, instance_element["_instance_id"]
)
path = instance_element["_path"]
elif "id" in instance_element and "service_entity" in instance_element:
# This is the full instance
instance = instance_element
path = "."
else:
raise ValueError(
"Unexpected instance element format. "
"Expected a dict coming out of lsm::all, which is either the instance itself, or "
f"a part of its attributes. But it doesn't look like any of this: {instance_element}"
)
# Get the previous attributes for our instance
previous = get_previous_attributes(instance)
if previous is None:
# No previous attributes, not an updated state
return default
# Get the attribute in the previous attributes
try:
return dict_path.to_path(path).get_element(previous)[attribute]
except LookupError:
# Either the embedded entity or the attribute doesn't exist in
# the previous attributes, return the default instead
return default