"""
Inmanta LSM
:copyright: 2019 Inmanta
:contact: code@inmanta.com
:license: Inmanta EULA
"""
import builtins
import contextlib
import itertools
import logging
import os
import re
import subprocess
import typing
from collections import abc, defaultdict
from typing import Any, Mapping, Optional, Sequence, Set, Tuple, Union
from uuid import UUID
import inmanta_plugins.lsm.allocation_helpers
import inmanta_plugins.lsm.partial
import pydantic
from inmanta import ast, config, const, plugins, protocol, resources
from inmanta.agent import handler
from inmanta.agent.handler import SkipResource
from inmanta.ast import (
DirectExecuteException,
OptionalValueException,
WrappingRuntimeException,
)
from inmanta.ast import type as ast_type
from inmanta.ast.attribute import Attribute
from inmanta.ast.entity import Entity
from inmanta.const import LOG_LEVEL_TRACE
from inmanta.data.model import ResourceIdStr
from inmanta.docstring_parser import parse_docstring
from inmanta.execute.proxy import DynamicProxy, UnsetException
from inmanta.execute.runtime import Instance
from inmanta.execute.util import NoneValue
from inmanta.export import (
Exporter,
ModelDict,
ProxiedType,
ResourceDict,
dependency_manager,
export,
)
from inmanta.plugins import PluginException
from inmanta.resources import IgnoreResourceException
from inmanta_lsm import const as lsm_const
from inmanta_lsm import methods # noqa
from inmanta_lsm.model import (
EmbeddedServiceEntity,
InterServiceRelation,
Lifecycle,
LifecycleState,
LifecycleTransfer,
ModelState,
ResourceState,
ServiceAttribute,
ServiceEntity,
)
from inmanta_plugins.lsm.allocation_v2 import entity, framework
LOGGER = logging.getLogger(__name__)
def get_optional_value(value: DynamicProxy, attribute: str) -> Optional[object]:
    """
    Read an attribute from a proxy object, mapping an unset optional value to None.

    :param value: The object to read the attribute from.
    :param attribute: The name of the attribute to read.
    :return: The value of the attribute, or None when reading it raises an OptionalValueException.
    """
    try:
        result = getattr(value, attribute)
    except OptionalValueException:
        result = None
    return result
@plugins.plugin
def context_v2_unwrapper(
    assignments: "dict[]",
    fallback_attribute: "string",
    track_deletes: "bool" = False,
) -> "dict[]":
    """
    Place the values allocated through ContextV2 at the location they belong to in the attribute sets
    of the given service instances.

    This plugin can be used to wrap the instances coming out of lsm::all. It should be used together with
    the ContextV2Wrapper allocator.

    Each assignment is an attribute dict containing a fallback attribute assigned with allocated values as
    produced by the ContextV2 (one-level deep, keys are string representation of the paths, values are
    allocated values). Every non-empty attribute set of each assignment is updated by placing these values
    where their key-path would reach them. The returned value is what has been given as input, except for
    the allocated values being set where they should.

    :param assignments: The list of service instance dictionaries as returned by lsm::all
    :param fallback_attribute: The attribute name at the root of the instance attributes that contains
        all the allocated values
    :param track_deletes: drop deleted embedded entities, even if they still exist in the fallback set.

    e.g.::

        context_v2_unwrapper(
            [
                {
                    "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
                    "id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
                    "service_entity": "carrierEthernetEvc",
                    "version": 4,
                    "config": {},
                    "state": "allocating",
                    "attributes": {
                        "allocated": {
                            "evcEndPoints[identifier=my-evc-ep-1].uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                            "evcEndPoints[identifier=my-evc-ep-2].uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                        "another_key": "any value",
                    },
                    "candidate_attributes": {
                        "allocated": {
                            "evcEndPoints[identifier=my-evc-ep-1].uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                            "evcEndPoints[identifier=my-evc-ep-2].uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                        "another_key": "any value",
                    },
                    "active_attributes": {},
                    "rollback_attributes": {},
                }
            ],
            "allocated",
        )

    will return::

        [
            {
                "environment": "8f7bf3a5-d655-4bcb-bbd4-6222407be999",
                "id": "f93acfad-7894-4a12-9770-b27cbdd85c74",
                "service_entity": "carrierEthernetEvc",
                "version": 4,
                "config": {},
                "state": "allocating",
                "attributes": {
                    "allocated": {
                        "evcEndPoints[identifier=my-evc-ep-1].uni": {
                            "uniref": "inmanta:456-852-789",
                            "test_value": "test_value",
                        },
                        "evcEndPoints[identifier=my-evc-ep-2].uni": {
                            "uniref": "inmanta:123-852-456",
                            "test_value": "test_value",
                        },
                    },
                    "evcEndPoints": [
                        {
                            "identifier": "my-evc-ep-1",
                            "uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                        },
                        {
                            "identifier": "my-evc-ep-2",
                            "uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                    ],
                    "another_key": "any value",
                },
                "candidate_attributes": {
                    "allocated": {
                        "evcEndPoints[identifier=my-evc-ep-1].uni": {
                            "uniref": "inmanta:456-852-789",
                            "test_value": "test_value",
                        },
                        "evcEndPoints[identifier=my-evc-ep-2].uni": {
                            "uniref": "inmanta:123-852-456",
                            "test_value": "test_value",
                        },
                    },
                    "evcEndPoints": [
                        {
                            "identifier": "my-evc-ep-1",
                            "uni": {
                                "uniref": "inmanta:456-852-789",
                                "test_value": "test_value",
                            },
                        },
                        {
                            "identifier": "my-evc-ep-2",
                            "uni": {
                                "uniref": "inmanta:123-852-456",
                                "test_value": "test_value",
                            },
                        },
                    ],
                    "another_key": "any value",
                },
                "active_attributes": {},
                "rollback_attributes": {},
            }
        ]
    """

    def unwrap_attribute_sets(instance: dict) -> dict:
        # Update the instance dict in place, one attribute set at a time.
        for set_name in entity.AttributeSetName:
            key = set_name.value
            current: Optional[dict] = instance.get(key)
            # Attribute sets that are null or empty are left untouched.
            if not isinstance(current, NoneValue) and current:
                instance[key] = framework.unwrap_attributes(
                    current, fallback_attribute, track_deletes=track_deletes
                )
        return instance

    unwrapped_instances = []
    for assignment in assignments:
        unwrapped_instances.append(
            unwrap_attribute_sets(DynamicProxy.unwrap(assignment))
        )
    return unwrapped_instances
def is_instance(ctx: plugins.Context, obj: "any", cls: "string") -> "bool":
    """
    Check whether the instance behind `obj` is accepted by the type named `cls`.

    :param ctx: The plugin context, used to resolve the type name.
    :param obj: The object to check. Its underlying instance is validated against the type.
    :param cls: The name of the type to validate against.
    :return: False when the validation raises a RuntimeException, True otherwise.
    """
    target_type = ctx.get_type(cls)
    try:
        target_type.validate(obj._get_instance())
    except ast.RuntimeException:
        return False
    else:
        return True
def getoptional(obj, *relations):
    """
    Follow a chain of attributes starting at `obj`.

    :param obj: The object to start from.
    :param relations: The names of the attributes to follow, in order.
    :return: The object reached at the end of the chain (`obj` itself when no names are given),
        or None when an OptionalValueException is raised along the way.
    """
    current = obj
    try:
        for relation in relations:
            current = getattr(current, relation)
    except OptionalValueException:
        return None
    return current
def create_lifecycle(fsm_config) -> Lifecycle:
    """
    Build the Lifecycle object describing the state machine defined by `fsm_config`.

    The states of the lifecycle are the states referenced as source, target or error state by at least
    one transfer. They are listed in the order in which the transfers reference them for the first time.

    :param fsm_config: The lifecycle definition. Its `name`, `initial_state` and `transfers` are read.
    :return: The Lifecycle object, after `validate_graph()` was called on it.
    """
    # A dict is used as an insertion-ordered set: a plain set would make the order of the exported
    # states depend on the hash of the state objects.
    referenced_states: dict = {}

    def register(st) -> Optional[str]:
        """
        Record the given state as referenced and return its name. None is passed through.
        """
        if st is None:
            return None
        referenced_states.setdefault(st, None)
        return st.name

    transfers = [
        LifecycleTransfer(
            source=register(tr.source),
            target=register(tr.target),
            # The error state is optional on a transfer
            error=register(getoptional(tr, "error")),
            on_update=tr.on_update,
            on_delete=tr.on_delete,
            api_set_state=tr.api_set_state,
            resource_based=tr.resource_based,
            auto=tr.auto,
            validate=tr.validate,
            config_name=tr.config_name,
            target_operation=tr.target_operation,
            error_operation=tr.error_operation,
            description=tr.description,
        )
        for tr in fsm_config.transfers
    ]
    states = [
        LifecycleState(
            name=st.name,
            label=getoptional(st, "label"),
            export_resources=getoptional(st, "export_resources"),
            validate_self=getoptional(st, "validate_self"),
            validate_others=getoptional(st, "validate_others"),
            purge_resources=st.purge_resources,
            deleted=st.deleted,
            values={k: v for k, v in st.values.items()},
        )
        for st in referenced_states
    ]
    out = Lifecycle(
        name=fsm_config.name,
        states=states,
        transfers=transfers,
        initial_state=fsm_config.initial_state.name,
    )
    out.validate_graph()
    return out
def collect_states(fsm_config) -> "dict":
    """
    Collect the states referenced by the transfers of the given lifecycle definition.

    :param fsm_config: The lifecycle definition. Only its `transfers` are read.
    :return: A dict mapping the name of each state that is used as source, target or error state of a
        transfer to the state object itself.
    """
    referenced = set()
    for transfer in fsm_config.transfers:
        candidates = (
            transfer.source,
            transfer.target,
            # The error state is optional on a transfer
            getoptional(transfer, "error"),
        )
        for candidate in candidates:
            if candidate is not None:
                referenced.add(candidate)
    return {st.name: st for st in referenced}
def map_attr_type(
    attr: ast.attribute.Attribute, thetype: Optional[ast.type.Type] = None
) -> str:
    """
    Return the string representation of the type of the given attribute.

    :param attr: The attribute to get the type of.
    :param thetype: When set, the type of the attribute is first rebased on this type
        (`attr.type.with_base_type(thetype)`) before it is converted to a string.
    :return: The result of `type_string()` on the resulting type.
    :raises Exception: When the type has no string representation.
    """
    tp: ast.type.Type = (
        attr.type if thetype is None else attr.type.with_base_type(thetype)
    )
    result: Optional[str] = tp.type_string()
    if result is None:
        # Report the type that was actually converted. `thetype` is None in the common case.
        raise Exception("Invalid attribute type %s" % tp)
    return result
def unwrap(value: DynamicProxy) -> Any:
    """
    Unwrap a proxied value, converting a null value (NoneValue) to None.

    This code is not quite the compiler domain, nor is it purely the plugin domain.
    As such, the default unwrapping doesn't really work.

    :param value: The value to unwrap.
    :return: None when the unwrapped value is a NoneValue, the unwrapped value otherwise.
    """
    raw = DynamicProxy.unwrap(value)
    return None if isinstance(raw, NoneValue) else raw
class ServiceEntityBuilder:
"""
This class creates a ServiceEntity instance from a ServiceEntity in model.
"""
@classmethod
def get_service_entity_from_model(
    cls,
    env: UUID,
    all_bindings: abc.Mapping[str, DynamicProxy],
    entity_binding: ProxiedType,
    entity_type: Entity,
) -> ServiceEntity:
    """
    Convert the entity type object from the compiler to a ServiceEntity object.

    :param env: The environment, set as `environment` on the resulting ServiceEntity.
    :param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype.
    :param entity_binding: The binding of the service entity to convert.
    :param entity_type: The entity type of the service entity to convert.
    :raises WrappingRuntimeException: When the conversion raises a DirectExecuteException.
    """
    try:
        service_entity = cls._get_service_entity_from_model(
            env, all_bindings, entity_binding, entity_type
        )
    except DirectExecuteException as e:
        message = "Only Literal statements are allowed as defaults in model definition"
        raise WrappingRuntimeException(None, message, e)
    return service_entity
@classmethod
def _get_service_entity_from_model(
    cls,
    env: UUID,
    all_bindings: abc.Mapping[str, DynamicProxy],
    entity_binding: ProxiedType,
    entity_type: Entity,
) -> ServiceEntity:
    """
    Build the ServiceEntity object for the given binding and entity type.

    :param env: The environment, set as `environment` on the resulting ServiceEntity.
    :param all_bindings: All service entity bindings defined in the model.
    :param entity_binding: The binding of the service entity; the name, lifecycle, identity, modifier
        enforcement and owner information are read from it.
    :param entity_type: The entity type; the description, attributes and annotations are derived from it.
    """
    parsed_docstring = parse_docstring(entity_type.comment)
    attributes_by_kind = cls._get_attributes(
        entity_type,
        all_bindings=all_bindings,
        strict_enforcement=entity_binding.strict_modifier_enforcement,
    )
    (
        service_attributes,
        embedded_service_entities,
        inter_service_relations,
    ) = attributes_by_kind

    # The owner is optional. When it is set, only the name of its service entity is exported.
    owner_binding = get_optional_value(entity_binding, "owner")
    owner_name = owner_binding.service_entity_name if owner_binding else owner_binding

    return ServiceEntity(
        environment=env,
        name=entity_binding.service_entity_name,
        description=parsed_docstring.get_description(),
        attributes=service_attributes,
        embedded_entities=embedded_service_entities,
        inter_service_relations=inter_service_relations,
        lifecycle=create_lifecycle(entity_binding.lifecycle),
        # The config field shouldn't be populated. It is populated indirectly via the config_name attribute of a
        # StateTransfer. The lsm_service_entity_config_set() API call should be used to update the config of a
        # ServiceEntity.
        config={},
        service_identity=entity_binding.service_identity,
        service_identity_display_name=entity_binding.service_identity_display_name,
        strict_modifier_enforcement=entity_binding.strict_modifier_enforcement,
        owner=owner_name,
        relation_to_owner=entity_binding.relation_to_owner,
        entity_annotations=cls._get_annotations_for_entity(entity_type),
    )
@classmethod
def _get_annotation_in_attribute(
    cls, attribute: ast.attribute.Attribute, entity_type: Entity
) -> dict[str, str]:
    """
    Return the annotations stored in the default value of the given attribute, after verifying the
    constraints that should hold on an attribute that contains annotations.

    :param attribute: The attribute that contains the annotations in its default values.
    :param entity_type: The entity that attribute belongs to.
    :raises Exception: When the default value cannot be obtained (AttributeError), when it is not a dict
        or when at least one of its values is not a string.
    """
    try:
        default_statement = entity_type.get_default(attribute.name)
        default_value = default_statement.execute_direct({})
    except AttributeError:
        raise Exception(
            f"Attribute {attribute.name} of entity {entity_type.get_full_name()} is missing a default value"
            f" ({attribute.location})"
        )

    if not isinstance(default_value, dict):
        raise Exception(
            f"Default value on attribute {attribute.name} of entity {entity_type.get_full_name()} doesn't have"
            f" type dict ({attribute.location})."
        )

    # Only the values are checked here, not the keys.
    non_string_values = [v for v in default_value.values() if not isinstance(v, str)]
    if non_string_values:
        raise Exception(
            f"The default value of the attribute {attribute.name} of entity {entity_type.get_full_name()} must"
            f" have the type dict[str, str], but some values have a non-string type. Got: {default_value}"
            f" ({attribute.location})"
        )
    return default_value
@classmethod
def _get_annotations_for_entity(cls, entity_type: Entity) -> dict[str, str]:
    """
    Return the annotations of the given entity, as defined by the default value of its `__annotations`
    attribute. An empty dict is returned when the entity has no such attribute.
    """
    attr_name = "__annotations"
    if attr_name in entity_type.attributes:
        return cls._get_annotation_in_attribute(
            entity_type.attributes[attr_name], entity_type
        )
    return {}
@classmethod
def _get_annotations_for_relation(
    cls, relation: ast.attribute.RelationAttribute
) -> dict[str, str]:
    """
    Return the annotations set on the given relation through an lsm::RelationAnnotations instance in its
    source annotations. An empty dict is returned when no such instance is present.

    :raises Exception: When more than one lsm::RelationAnnotations instance is found, or when one of the
        annotation values is not a string.
    """
    if not relation.source_annotations:
        return {}

    annotation_instances = []
    for source_annotation in relation.source_annotations:
        candidate = source_annotation.get_value()
        if (
            isinstance(candidate, Instance)
            and candidate.type.get_full_name() == "lsm::RelationAnnotations"
        ):
            annotation_instances.append(candidate)

    if len(annotation_instances) > 1:
        raise Exception(
            f"More than one lsm::RelationAnnotations instance found on relationship {relation} ({relation.location})"
        )
    if not annotation_instances:
        return {}

    instance = annotation_instances[0]
    attribute = instance.get_attribute("annotations")
    annotations = attribute.get_value()
    non_string_values = [v for v in annotations.values() if not isinstance(v, str)]
    if non_string_values:
        raise Exception(
            f"Attribute annotations of entity {instance.type.get_full_name()} must have the type"
            f" dict[str, str], but some values have a non-string type. Got: {annotations} ({attribute.location})"
        )
    return annotations
@classmethod
def _has_parent_entity(cls, entity_type: Entity, parent_entity: str) -> bool:
    """
    Returns true iff this entity type has parent_entity as a parent entity.

    :param entity_type: The entity type to inspect.
    :param parent_entity: The fully-qualified name of the parent entity to look for.
    """
    for parent in entity_type.get_all_parent_entities():
        if parent.get_full_name() == parent_entity:
            return True
    return False
@classmethod
def _ensure_parent_entity(
    cls, entity_type: Entity, required_parent_entity: str
) -> None:
    """
    Raise an exception when the given entity doesn't have required_parent_entity as a parent entity.

    :param entity_type: The entity type to check.
    :param required_parent_entity: The fully-qualified name of the parent entity that is required.
    """
    if cls._has_parent_entity(entity_type, required_parent_entity):
        return
    raise Exception(
        f"{entity_type.get_full_name()} should be a subclass of {required_parent_entity}"
    )
@classmethod
def _get_attributes(
    cls,
    entity_type: Entity,
    *,
    all_bindings: abc.Mapping[str, DynamicProxy],
    strict_enforcement: bool = False,
    is_embedded_entity: bool = False,
    entity_chain_from_service_entity: Optional[Sequence[Entity]] = None,
) -> Tuple[
    list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation]
]:
    """
    Collect the attributes, embedded entities and inter-service relations of the given entity.

    :param entity_type: The entity to collect the attributes of. It must be a subclass of lsm::EmbeddedEntity
        when `is_embedded_entity` is set and of lsm::ServiceEntity otherwise.
    :param all_bindings: All service entity bindings defined in the model.
    :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True.
    :param is_embedded_entity: Indicates whether the given entity is an embedded entity.
    :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type.
        None is treated as an empty chain.
    :return: A tuple with the service attributes, the embedded entities and the inter-service relations.
    """
    chain: Sequence[Entity] = (
        []
        if entity_chain_from_service_entity is None
        else entity_chain_from_service_entity
    )

    required_parent = (
        "lsm::EmbeddedEntity" if is_embedded_entity else "lsm::ServiceEntity"
    )
    cls._ensure_parent_entity(entity_type, required_parent)

    compiler_attributes: dict[str, Attribute]
    descriptions: dict[str, str]
    compiler_attributes, descriptions = cls._get_all_attributes_compiler_type(
        entity_type
    )

    model_attributes = cls._get_all_attributes_model_type(
        entity_type,
        compiler_attributes,
        descriptions,
        chain,
        all_bindings,
        strict_enforcement,
    )
    service_attrs: list[ServiceAttribute]
    embedded_entities: list[EmbeddedServiceEntity]
    service_attrs, embedded_entities, inter_service_relations = model_attributes

    cls._log_warning_for_missing_documentation(
        entity_type, descriptions, service_attrs
    )
    return service_attrs, embedded_entities, inter_service_relations
@classmethod
def _get_all_attributes_compiler_type(
    cls, entity_type: Entity
) -> Tuple[dict[str, Attribute], dict[str, str]]:
    """
    Get all attributes of the given entity in types used by the compiler.

    The attributes defined on the entity itself take precedence over attributes with the same name
    defined on a parent entity.

    :return: A tuple with as a first element the attributes. The second element
             contains the description of attributes present in the docstring.
    """
    # Parent entities whose attributes and docstring are not taken into account
    excluded_parents = (
        "lsm::ServiceEntity",
        "lsm::ServiceBase",
        "lsm::EmbeddedEntity",
        "std::Entity",
    )

    attributes: dict[str, Attribute] = {
        attr_name: attr for attr_name, attr in entity_type.attributes.items()
    }
    descriptions = {}
    descriptions.update(
        parse_docstring(entity_type.comment).get_attribute_description_map()
    )

    for parent in entity_type.get_all_parent_entities():
        if parent.get_full_name() in excluded_parents:
            continue
        for attr_name, attr in parent.attributes.items():
            if attr_name not in attributes:
                attributes[attr_name] = attr
        # NOTE(review): unlike the attributes above, descriptions found in the docstring of a parent
        # overwrite the descriptions collected earlier for the same attribute name. Confirm this is intended.
        descriptions.update(
            parse_docstring(parent.comment).get_attribute_description_map()
        )
    return attributes, descriptions
@classmethod
def _check_relationship_constraints(
    cls, attr: ast.attribute.RelationAttribute
) -> None:
    """
    Verify the constraints on a relationship towards an embedded entity. Only bidirectional
    relationships are checked:

    * The arity of the relationship to the embedding entity should be exactly one.
    * The name of the attribute referencing the embedding entity should start with an underscore.

    :raises Exception: When one of the constraints is violated.
    """
    opposite_end = attr.end
    if opposite_end is None:
        # Unidirectional relationship: there is no attribute referencing the embedding entity.
        return

    # Check arity relationship to embedding entity
    if not (opposite_end.low == 1 and opposite_end.high == 1):
        raise Exception(
            f"Attribute {attr.end.name} of relationship {attr.end.type}.{attr.name} -- {attr.type}."
            f"{attr.end.name} should have arity one. The entity structure of embedded entities should "
            f"form a tree."
        )

    # Check name variable referencing embedding entity
    if opposite_end.name.startswith("_"):
        return
    attribute_to_embedding_entity = (
        f"{attr.end.entity.get_full_name()}.{attr.end.name}"
    )
    raise Exception(
        f"Attribute {attribute_to_embedding_entity} should start with an underscore "
        "because it references the embedding entity."
    )
@classmethod
def _get_plain_attributes(cls, entity: Entity) -> Set[str]:
    """
    Return the names of all attributes of the given entity, except relational attributes, attributes
    starting with an underscore and metadata attributes (names containing a double underscore).
    """
    all_attributes, _ = cls._get_all_attributes_compiler_type(entity)
    return {
        attr_name
        for attr_name, attr in all_attributes.items()
        if not attr_name.startswith("_")
        and "__" not in attr_name
        and not isinstance(attr, ast.attribute.RelationAttribute)
    }
@classmethod
def _get_modifier(
    cls,
    attr: ast.attribute.RelationAttribute,
) -> Optional[str]:
    """
    Get the modifier from the lsm::RelationshipMetadata instance found in the source_annotations of attr.

    :return: The value of the `modifier` attribute of that instance, or None when attr has no such
        annotation.
    :raises Exception: When more than one lsm::RelationshipMetadata instance is found.
    """
    if not attr.source_annotations:
        return None

    metadata_instances = []
    for source_annotation in attr.source_annotations:
        candidate = source_annotation.get_value()
        if (
            isinstance(candidate, Instance)
            and candidate.type.get_full_name() == "lsm::RelationshipMetadata"
        ):
            metadata_instances.append(candidate)

    if len(metadata_instances) > 1:
        raise Exception(
            "More than one attribute modifier annotation (__rwplus__, __rw__ or __r__) found on relationship "
            f"{attr}"
        )
    if not metadata_instances:
        return None
    return metadata_instances[0].get_attribute("modifier").get_value()
@classmethod
def _get_all_attributes_model_type(
    cls,
    entity_type: Entity,
    attributes_from_compiler: Mapping[str, Attribute],
    attribute_description_map: Mapping[str, str],
    entity_chain_from_service_entity: Sequence[Entity],
    all_bindings: abc.Mapping[str, DynamicProxy],
    strict_enforcement: bool = False,
) -> Tuple[
    list[ServiceAttribute], list[EmbeddedServiceEntity], list[InterServiceRelation]
]:
    """
    Get all attributes of the given entity in types used by the internal model.

    Attributes starting with an underscore are ignored. Relations towards a subclass of lsm::EmbeddedEntity
    become embedded entities, other relations annotated with an lsm::InterServiceRelation instance become
    inter-service relations. The remaining attributes are either plain attributes or metadata attributes
    (`<attr_name>__<option>`) that set an option on the plain attribute `<attr_name>`.

    :param entity_type: The entity whose attributes are converted.
    :param attributes_from_compiler: The attributes of the entity, by name, in the types used by the compiler.
    :param attribute_description_map: Maps the attribute name to the description present in the docstring of the entity.
    :param entity_chain_from_service_entity: All entities that exist between the service entity and this entity_type.
    :param all_bindings: All service entity bindings defined in the model, mapped by corresponding ServiceEntity subtype.
    :param strict_enforcement: True iff `ServiceEntityBinding.strict_modifier_enforcement` is set to True.
    :return: A tuple with the service attributes, the embedded entities and the inter-service relations.
    :raises Exception: When a cycle exists in the embedded entity structure, when more than one `__service__`
        annotation is found on a relationship or when metadata attributes are specified for an attribute
        that is not a plain attribute of the entity.
    """
    defaults = entity_type.get_default_values()
    service_attr_dct: dict[str, dict[str, Any]] = defaultdict(dict)
    embedded_service_entities: list[EmbeddedServiceEntity] = []
    inter_service_relations: list[InterServiceRelation] = []

    # Check if loop exists between embedded entities
    if entity_type.get_full_name() in [
        t.get_full_name() for t in entity_chain_from_service_entity
    ]:
        chain = (
            " -> ".join(t.get_full_name() for t in entity_chain_from_service_entity)
            + f" -> {entity_type.get_full_name()}"
        )
        raise Exception(
            f"A cycle exists in the embedded entity structure: {chain}. Embedded entities should form a tree structure."
        )

    for name, attr in attributes_from_compiler.items():
        try:
            if name.startswith("_"):
                # Ignore attributes starting with _
                continue

            if isinstance(attr, ast.attribute.RelationAttribute):
                if cls._has_parent_entity(attr.type, "lsm::EmbeddedEntity"):
                    # Relation to an embedded entity
                    cls._check_relationship_constraints(attr)
                    docstring = parse_docstring(attr.type.comment)
                    (
                        ref_attributes,
                        ref_embedded_service_entities,
                        ref_inter_service_relations,
                    ) = cls._get_attributes(
                        attr.type,
                        all_bindings=all_bindings,
                        is_embedded_entity=True,
                        entity_chain_from_service_entity=[
                            *entity_chain_from_service_entity,
                            entity_type,
                        ],
                        strict_enforcement=strict_enforcement,
                    )
                    name_attr_to_embedder: Optional[str] = (
                        attr.end.name if attr.end is not None else None
                    )
                    ese = EmbeddedServiceEntity(
                        name=attr.name,
                        lower_limit=attr.low,
                        upper_limit=attr.high,
                        description=docstring.get_description(),
                        attributes=ref_attributes,
                        embedded_entities=ref_embedded_service_entities,
                        inter_service_relations=ref_inter_service_relations,
                        # Key attributes are only derived when strict modifier enforcement is enabled
                        key_attributes=[]
                        if not strict_enforcement
                        else cls._get_key_attributes(
                            attr.type, name_attr_to_embedder, attr.high
                        ),
                        entity_annotations=cls._get_annotations_for_entity(
                            attr.type
                        ),
                        attribute_annotations=cls._get_annotations_for_relation(
                            attr
                        ),
                    )
                    modifier = cls._get_modifier(attr)
                    if modifier:
                        ese.modifier = modifier
                    embedded_service_entities.append(ese)
                elif attr.source_annotations:
                    # Potential inter-service relation
                    inter_service_relations_entries = [
                        annotation.get_value()
                        for annotation in attr.source_annotations
                        if isinstance(annotation.get_value(), Instance)
                        and annotation.get_value().type.get_full_name()
                        == "lsm::InterServiceRelation"
                    ]
                    if len(inter_service_relations_entries) > 1:
                        raise Exception(
                            "More than one `__service__` annotation found on relationship "
                            f"{attr}"
                        )
                    if len(inter_service_relations_entries) == 1:
                        docstring = parse_docstring(attr.type.comment)
                        isr = InterServiceRelation(
                            name=attr.name,
                            lower_limit=attr.low,
                            upper_limit=attr.high,
                            description=docstring.get_description(),
                            # use service name rather than entity name as entity type
                            entity_type=all_bindings[
                                str(attr.type)
                            ].service_entity_name,
                            attribute_annotations=cls._get_annotations_for_relation(
                                attr
                            ),
                        )
                        modifier = cls._get_modifier(attr)
                        if modifier:
                            isr.modifier = modifier
                        inter_service_relations.append(isr)
                continue

            # values and metadata
            obj = re.search("(.*)__(.*)", name)
            if obj:
                # metadata
                source_attr_name, opt = obj.groups()
                # Make sure that attribute documentation via the docstring takes precedence
                # over attribute documentation via the <attr_name>__description field.
                if (
                    opt == "description"
                    and source_attr_name in attribute_description_map
                ):
                    continue
                if opt == "annotations":
                    annotations = cls._get_annotation_in_attribute(
                        attr, entity_type
                    )
                    service_attr_dct[source_attr_name][
                        "attribute_annotations"
                    ] = annotations
                    continue
                service_attr_dct[source_attr_name][opt] = defaults[
                    name
                ].execute_direct({})
            else:
                service_attr_dct[name]["name"] = name
                service_attr_dct[name]["type"] = get_type_as_str(attr)
                service_attr_dct[name][
                    "description"
                ] = attribute_description_map.get(name, None)
                (
                    validation_type,
                    validation_params,
                ) = get_validation_type_and_params(attr)
                service_attr_dct[name]["validation_type"] = validation_type
                service_attr_dct[name][
                    "validation_parameters"
                ] = validation_params
                service_attr_dct[name]["default_value_set"] = name in defaults
                if name in defaults and defaults[name] is not None:
                    value = do_execute_direct(
                        defaults[name],
                        # Both parts have to be f-strings; the entity is named via entity_type because
                        # no binding is available in this method.
                        f"Invalid default value for attribute {name} "
                        f"of entity {entity_type.get_full_name()}",
                    )
                    attr.validate(DynamicProxy.unwrap(value))
                    service_attr_dct[name]["default_value"] = unwrap(value)
        except Exception:
            LOGGER.exception("Failed to process attribute %s.%s", entity_type, name)
            raise

    service_attributes: list[ServiceAttribute] = []
    for attribute, values in service_attr_dct.items():
        try:
            if "name" not in values:
                # Only metadata was collected for this name: there is no plain attribute it applies to.
                if (
                    "attribute_annotations" in values
                    and attribute in entity_type.attributes
                    and isinstance(
                        entity_type.attributes[attribute],
                        ast.attribute.RelationAttribute,
                    )
                ):
                    # The annotations for a relationship were specified using the syntax for an attribute annotation.
                    cls.raise_exception_to_use_annotation_on_relationship(
                        entity_type, attribute, values["attribute_annotations"]
                    )
                all_metadata_attributes = [
                    f'{attribute}__{value if value != "attribute_annotations" else "annotations"}'
                    for value in values.keys()
                ]
                raise Exception(
                    f"Metadata attribute(s): { ', '.join(all_metadata_attributes) } "
                    f"of entity {entity_type.get_full_name()} specified without a non-relational attribute {attribute}"
                )
            service_attributes.append(ServiceAttribute(**values))
        except pydantic.ValidationError:
            LOGGER.exception(
                "Failed to parse attribute %s due to an validation exception",
                attribute,
            )
            raise
    return service_attributes, embedded_service_entities, inter_service_relations
@classmethod
def raise_exception_to_use_annotation_on_relationship(
cls, entity_type: Entity, attribute_name: str, annotations: dict[str, str]
) -> None:
"""
This method raises an exception about the fact that annotations on the relational attribute `attribute_name` were
defined using the attribute annotation syntax instead of the syntax for relational attributes. The message of the
exception contains the relationship definition that should be written instead.
:param entity_type: The entity the relational attribute belongs to.
:param attribute_name: The name of the relational attribute.
:param annotations: The annotations that were defined by the attribute annotation.
:raises Exception: Always, unless one of the sanity-check assertions below fails first.
"""
metadata_attribute_name = f"{attribute_name}__annotations"
# Sanity checks: the metadata attribute must exist and attribute_name must refer to a relationship.
assert metadata_attribute_name in entity_type.attributes
relationship = entity_type.attributes[attribute_name]
assert isinstance(relationship, ast.attribute.RelationAttribute)
def _format_limits(relationship: ast.attribute.RelationAttribute) -> str:
"""
Returns a textual representation of the upper and lower arity of the given relationship:
`[low:]` when there is no upper limit, `[low]` when both limits are equal and `[low:high]` otherwise.
"""
if relationship.high is None:
return f"[{relationship.low}:]"
elif relationship.low == relationship.high:
return f"[{relationship.low}]"
else:
return f"[{relationship.low}:{relationship.high}]"
def _format_path_for_relationship(
relationship_location: ast.Location,
entity_type: Entity,
name_relational_attribute: Optional[str] = None,
) -> str:
"""
Return the path to an entity or a relational attribute as part of the textual representation of a relationship.
:param relationship_location: The location of the relationship definition.
:param entity_type: The entity type referenced by the relationship.
:param name_relational_attribute: The name of the relational attribute. Can be set to None for the side of a
unidirectional relationship that doesn't have a relational attribute.
"""
if relationship_location.file == entity_type.location.file:
# The relationship and the entity are defined in the same namespace.
# We don't have to reference the entity with its fully-qualified name.
result = entity_type.name
else:
result = entity_type.get_full_name()
if name_relational_attribute:
result = f"{result}.{name_relational_attribute}"
return result
def _format_relationship() -> str:
"""
Return a textual representation of the relationship. This method returns a relationship where
the -- in the middle of the relationship definition is replaced with __annotations__.
"""
# Left-hand side: the relational attribute on entity_type, followed by its arity.
path_relational_attr_lhs = _format_path_for_relationship(
relationship.location, entity_type, relationship.name
)
formatted_lhs = f"{path_relational_attr_lhs} {_format_limits(relationship)}"
if relationship.end:
# Bidirectional relationship: the other side has a relational attribute and an arity as well.
path_relational_attr_rhs = _format_path_for_relationship(
relationship.location, relationship.type, relationship.end.name
)
formatted_rhs = (
f"{path_relational_attr_rhs} {_format_limits(relationship.end)}"
)
else:
# Unidirectional relationship: only the entity type is mentioned on the right-hand side.
formatted_rhs = _format_path_for_relationship(
relationship.location, relationship.type
)
return f"{formatted_lhs} __annotations__ {formatted_rhs}"
# Only the leading and trailing whitespace of the message is removed by strip().
raise Exception(
f"""
Metadata attribute(s): {metadata_attribute_name} of entity {entity_type.get_full_name()} on the relation {attribute_name}
is specified as an attribute annotation. Write:
__annotations__ = lsm::RelationAnnotations(
annotations={annotations}
)
{_format_relationship()}
instead and remove the metadata attribute at {entity_type.attributes[metadata_attribute_name].location}
""".strip()
)
@classmethod
def _get_key_attributes(
    cls,
    embedded_entity: Entity,
    name_attr_to_embedder: Optional[str],
    arity_upper_limit: Optional[int],
) -> list[str]:
    """
    Return the list of attributes that uniquely identify the given embedded entity.

    The key attributes are taken from the default value of `__lsm_key_attributes` when it is not null,
    in which case they have to match an index of the entity. Otherwise, they are derived from the
    indices defined on the entity.

    :param embedded_entity: The embedded entity to determine the key attributes for.
    :param name_attr_to_embedder: The name of the relational attribute, defined on the embedded entity, that
        references the embedding entity, or None. It is never part of the returned list.
    :param arity_upper_limit: The upper limit of the arity in the relationship to the embedded entity.
    :return: The sorted names of the key attributes. The list is empty when no index is usable and the
        upper arity is 1.
    :raises Exception: When `__lsm_key_attributes` has no default value, when the explicit key attributes
        are not an index or when no unambiguous identity can be derived from the indices.
    """
    key_attributes: list[str]
    key_attributes_attr_name = "__lsm_key_attributes"
    try:
        default_statement = embedded_entity.get_default(key_attributes_attr_name)
        key_attributes = default_statement.execute_direct({})
    except AttributeError:
        raise Exception(
            f"A default value should be defined for the attribute {key_attributes_attr_name} "
            f"of entity {embedded_entity.get_full_name()}"
        )

    potential_indices: list[Set[str]]
    potential_indices = cls._get_potential_indices_for_object_identity(
        embedded_entity, name_attr_to_embedder
    )

    if isinstance(key_attributes, NoneValue):
        # Derive key attributes from defined indices
        if not potential_indices:
            if arity_upper_limit == 1:
                # If the upper arity is 1, there is no need to define an index to uniquely identity the embedded entity.
                return []
            raise Exception(
                f"Failed to identify key attributes of embedded entity {embedded_entity.get_full_name()}. "
                f"Attribute `__lsm_key_attributes` is null and no identity could be derived from the indices of "
                f"the entity."
            )
        if len(potential_indices) > 1:
            raise Exception(
                f"Failed to identify the key attributes of embedded entity {embedded_entity.get_full_name()}. Attribute "
                f"`__lsm_key_attributes` is null and more than one index was found that could be used to define "
                f"the identity of the entity. Use the `__lsm_key_attributes` attribute to define the identifying "
                f"attributes explicitly."
            )
        key_attributes = potential_indices[0]
    else:
        # Verify whether an index was defined for these key_attributes. The index may additionally
        # contain the relation to the embedding entity.
        key_attribute_set = set(key_attributes)
        matches_index = key_attribute_set in potential_indices or (
            name_attr_to_embedder is not None
            and key_attribute_set.union({name_attr_to_embedder}) in potential_indices
        )
        if not matches_index:
            raise Exception(
                f"Key attributes '{key_attributes}' defined with `__lsm_key_attributes` for entity "
                f"{embedded_entity.get_full_name()} is not an index."
            )

    # Filter out relation to embedding entity
    return sorted(attr for attr in key_attributes if attr != name_attr_to_embedder)
@classmethod
def _get_potential_indices_for_object_identity(
    cls, embedded_entity: Entity, name_attr_to_embedder: Optional[str]
) -> list[Set[str]]:
    """
    Return a subset of all the indices defined on the given embedded entity, that
    can be used to define the identity of the embedded entity from the perspective
    of the north-bound API.

    :param embedded_entity: The embedded entity whose indices are inspected.
    :param name_attr_to_embedder: The name of the relational attribute, defined on the embedded entity,
                                  that references the embedding entity. Set to None if no such
                                  relationship exists.
    :return: The usable indices, each one as the set of its attribute names.
    """

    def usable_as_identity(index_attributes: Sequence[str]) -> bool:
        """
        Return True iff the given index can be used to define the identity of an embedded entity.
        """
        for attr_name in index_attributes:
            if attr_name == name_attr_to_embedder:
                # The relation to the embedding entity is always allowed in the index.
                continue

            if attr_name.startswith("_"):
                LOGGER.debug(
                    "Ignoring index %s on entity %s as potential key attributes for the embedded entity. "
                    "The identifying index can only contain "
                    "attributes exposed via the north-bound API",
                    index_attributes,
                    embedded_entity.get_full_name(),
                )
                return False

            attribute: Optional[Attribute] = embedded_entity.get_attribute(attr_name)
            assert attribute is not None  # Make mypy happy
            if isinstance(attribute, ast.attribute.RelationAttribute):
                LOGGER.debug(
                    "Ignoring index %s on entity %s as potential key attributes for the embedded entity. "
                    "The identifying index cannot contain "
                    "relationships except for a relationship to the entity this entity is embedded in",
                    index_attributes,
                    embedded_entity.get_full_name(),
                )
                return False

            base_type: ast_type.Type = attribute.get_type().get_base_type()
            if isinstance(base_type, ast_type.List):
                LOGGER.info(
                    "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it"
                    " contains the attribute %s of type list.",
                    index_attributes,
                    embedded_entity.get_full_name(),
                    attribute.name,
                )
                return False
            if isinstance(base_type, ast_type.Dict):
                LOGGER.info(
                    "Ignoring index %s on entity %s as potential key attributes for the embedded entity because it"
                    " contains the attribute %s of type dict.",
                    index_attributes,
                    embedded_entity.get_full_name(),
                    attribute.name,
                )
                return False
        return True

    usable_indices: list[Set[str]] = []
    for index in embedded_entity.get_indices():
        if usable_as_identity(index):
            usable_indices.append(set(index))
    return usable_indices
@classmethod
def _log_warning_for_missing_documentation(
    cls,
    entity_type: Entity,
    attribute_description_map: dict[str, str],
    service_attributes: list[ServiceAttribute],
) -> None:
    """
    Warn about attributes that are documented in the docstring of an entity
    but that are not part of its service definition.

    :param entity_type: The entity the docstring belongs to; only its name is used, in the log message.
    :param attribute_description_map: Maps the attribute names found in the docstring to their description.
    :param service_attributes: The attributes that make up the service definition.
    """
    defined_names = {
        service_attribute.name for service_attribute in service_attributes
    }
    for documented_name in set(attribute_description_map.keys()) - defined_names:
        # Documenting 'private' attributes or metadata attributes shouldn't emit a warning
        is_private_or_metadata = (
            documented_name.startswith("_") or "__" in documented_name
        )
        if not is_private_or_metadata:
            LOGGER.warning(
                "Attribute '%s' is defined in the docstring of service entity '%s', but attribute doesn't exist.",
                documented_name,
                entity_type.name,
            )
def get_type_as_str(attr: ast.attribute.Attribute) -> str:
    """
    Return the result of `map_attr_type` for the given attribute.

    When the base type of the attribute is a constraint type, the chain of nested constraint
    types is followed down to the first base type that is not a constraint type, and that
    type is handed to `map_attr_type` as second argument.
    """
    resolved: ast.type.Type = attr.type.get_base_type()
    if not isinstance(resolved, ast.type.ConstraintType):
        return map_attr_type(attr)

    # Peel off constraint layers until the underlying base type is reached.
    underlying = resolved.basetype
    while isinstance(underlying, ast.type.ConstraintType):
        underlying = underlying.basetype
    return map_attr_type(attr, underlying)
def get_validation_type_and_params(
    attr: ast.attribute.Attribute,
) -> Tuple[Optional[str], Optional[Mapping[str, object]]]:
    """
    If the type of the given attribute was defined with a typedef statement and it uses a supported constraint, this method
    returns the inmanta_lsm validation type and validation parameters.

    :return: Whatever `typedef.get_validation_type` returns for the attribute, or `(None, None)` when it returns None.
    """
    # Imported inside the function, as in the original code (presumably to avoid an import cycle -- TODO confirm)
    import inmanta_plugins.lsm.typedef as typedef

    validation: Optional[typedef.ValidationType] = typedef.get_validation_type(attr)
    if validation is None:
        return (None, None)
    return validation
def do_execute_direct(
    statement: ast.statements.DynamicStatement, error_msg: str
) -> Any:
    """
    Execute the given statement directly.

    :param statement: The statement to execute, with an empty set of requires.
    :param error_msg: The message of the exception raised when direct execution is not possible.
    :return: The result of the statement, passed through `DynamicProxy.return_value`.
    :raises WrappingRuntimeException: When the statement raises a DirectExecuteException.
    """
    try:
        raw_value = statement.execute_direct({})
        return DynamicProxy.return_value(raw_value)
    except DirectExecuteException as direct_execute_error:
        raise WrappingRuntimeException(None, error_msg, direct_execute_error)
@plugins.plugin
def fsm_to_dot(config: "lsm::LifecycleStateMachine") -> "string":
    """
    Generate a dot representation of the state machine

    :param config: The lifecycle state machine to convert.
    :return: The output of `to_dot()` on the lifecycle built from the state machine.
    """
    lifecycle = create_lifecycle(config)
    return lifecycle.to_dot()
@plugins.plugin
def render_dot(fsm: "lsm::LifecycleStateMachine") -> None:
    """
    Render a dot graph in the current directory

    The dot source is written to `fsm.dot`, after which the `dot` executable is invoked to
    produce `fsm.svg`.

    :param fsm: The lifecycle state machine to render.
    """
    dot_source = fsm_to_dot(fsm)
    with open("fsm.dot", "w+") as dot_file:
        dot_file.write(dot_source)

    render_command = ["dot", "-Tsvg", "-ofsm.svg", "fsm.dot"]
    subprocess.check_call(render_command)
def do_export(
    env: str,
    conn: protocol.endpoints.SyncClient,
    se: ServiceEntity,
    allow_instance_updates: bool,
) -> None:
    """
    Upload the given service entity definition to the service catalog.

    The entity is created when the lookup on the server answers 404 and updated otherwise.
    On update, the config found on the server is copied onto `se`, because config settings
    cannot be updated via the update_entity API call.

    :param env: The environment to export to.
    :param conn: The client used for the API calls.
    :param se: The service entity definition to upload. Its `config` is overwritten on update.
    :param allow_instance_updates: When true, `allow_instance_updates=True` is passed to the update call. If the
        server rejects that with a 400 that mentions the parameter, the update is retried without it.
    :raises Exception: When the create or update call does not answer with code 200.
    """

    def message_of(response) -> str:
        # The result (or its message) may be absent, never let the error reporting itself crash
        return (response.result or {}).get("message") or ""

    get_entity_result = conn.lsm_service_catalog_get_entity(
        tid=env, service_entity=se.name
    )
    if get_entity_result.code == 404:
        export_result = conn.lsm_service_catalog_create_entity(
            tid=env, service_entity_definition=se
        )
    else:
        kwargs = {}
        if allow_instance_updates:
            # Only set parameter if required, to preserve backward compatibility
            kwargs["allow_instance_updates"] = True

        # Config settings cannot be updated via the update_entity API call
        se.config = get_entity_result.result["data"]["config"]
        if not se.strict_modifier_enforcement:
            LOGGER.warning(
                (
                    "The strict_modifier_enforcement flag is not enabled for service entity binding %s. This behaviour is being"
                    " deprecated. Please refer to the documentation for more information on how to enable it."
                ),
                se.name,
            )

        # try export
        export_result = conn.lsm_service_catalog_update_entity(
            tid=env, service_entity=se.name, service_entity_definition=se, **kwargs
        )
        # Compare the status code of the response: comparing the response object itself
        # to 400 never matches, which made this fallback unreachable.
        if (
            allow_instance_updates
            and export_result.code == 400
            and "allow_instance_updates" in message_of(export_result)
        ):
            # Backward compatibility!
            LOGGER.warning(
                "The version of inmanta-lsm used by the compiler and server are different, this may cause issues."
            )
            LOGGER.warning(
                "The server version is too old to support type updates or updates on embedded instances."
            )
            # for backward compatibility with inmanta_lsm<2.0.1.dev
            # we do the call again without allow_instance_updates the argument
            export_result = conn.lsm_service_catalog_update_entity(
                tid=env, service_entity=se.name, service_entity_definition=se
            )

    if export_result.code != 200:
        # Improve error reporting for
        # This update requires the `allow_type_updates` parameter to be set
        if "allow_instance_updates" in message_of(export_result):
            raise Exception(
                "could not upload lifecycle, "
                "because it would require updating existing instances, "
                "use `--export-plugin service_entities_exporter`"
            )
        raise Exception(
            f"could not upload lifecycle {export_result.result} (code: {export_result.code})"
        )
@export(
    "service_entities_exporter_strict",
    "lsm::ServiceEntityBinding",
    "lsm::ServiceEntityBindingV2",
)
def export_service_entities(
    exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {}
) -> None:
    """
    Export plugin `service_entities_exporter_strict`: export all service entity bindings
    without allowing updates of existing instances.

    :param exporter: The exporter that runs this plugin.
    :param types: The instances of the registered types, keyed by type name.
    """
    do_export_service_entities(exporter, types, force=False)
@export(
    "service_entities_exporter",
    "lsm::ServiceEntityBinding",
    "lsm::ServiceEntityBindingV2",
)
def service_entities_exporter_with_instance_updates(
    exporter: Exporter, types: dict[str, Sequence[ProxiedType]] = {}
) -> None:
    """
    Export plugin `service_entities_exporter`: export all service entity bindings
    and allow updates of existing instances.

    :param exporter: The exporter that runs this plugin.
    :param types: The instances of the registered types, keyed by type name.
    """
    do_export_service_entities(exporter, types, force=True)
def do_export_service_entities(
    exporter: Exporter,
    types: dict[str, Sequence[ProxiedType]],
    force: bool = False,
) -> None:
    """
    Build the service entity definition for every service entity binding in the model
    and upload each of them with `do_export`.

    :param exporter: The exporter; its `types` are used to look up the entity of each binding.
    :param types: Must contain the keys `lsm::ServiceEntityBinding` and `lsm::ServiceEntityBindingV2`.
    :param force: Passed to `do_export` as `allow_instance_updates`.
    :raises Exception: When no environment is configured.
    :raises PluginException: When a binding refers to an entity that the exporter doesn't know.
    """
    conn = protocol.endpoints.SyncClient("compiler")
    env: Optional[UUID] = config.Config.get("config", "environment", None)
    if env is None:
        raise Exception(
            "The environment of the model should be configured in config>environment"
        )

    # Index the bindings on the entity they bind. When several bindings
    # refer to the same entity, the last one wins.
    all_bindings: dict[str, DynamicProxy] = {}
    for proxied in itertools.chain(
        types["lsm::ServiceEntityBinding"], types["lsm::ServiceEntityBindingV2"]
    ):
        binding = typing.cast(DynamicProxy, proxied)
        all_bindings[binding.service_entity] = binding

    for binding in all_bindings.values():
        try:
            entity_type = exporter.types[binding.service_entity]
        except KeyError as e:
            raise PluginException(
                f"Entity {binding.service_entity} in service entity binding for"
                f" {binding.service_entity_name} does not exist."
            ) from e

        service_entity = ServiceEntityBuilder.get_service_entity_from_model(
            env, all_bindings, binding, entity_type
        )
        do_export(env, conn, service_entity, force)
def get_service_entity(type_class):
    """
    Return the value of the `__service_entity_name` default of the given type.

    :param type_class: An object exposing `get_default_values()`.
    :return: The result of executing the default value statement of `__service_entity_name`,
        or None when the type defines no such default.
    """
    defaults = type_class.get_default_values()
    try:
        name_statement = defaults["__service_entity_name"]
    except KeyError:
        return None
    return name_statement.execute(None, None, None)
class CacheManager:
    """
    Entry point for all internal caches in LSM, accessed via :py:data:`global_cache`.

    Also caches a connection to the server.
    """

    def __init__(self) -> None:
        # Instances as returned by the server, per service entity name
        self._instances_per_binding: dict[str, list[dict]] = {}
        # (state, version) per instance id
        self._current_state_cache: dict[str, Tuple[str, int]] = {}
        # Instance as returned by the server, per instance id
        self._all_instance_cache: dict[str, dict] = {}
        self._client = protocol.endpoints.SyncClient("compiler")

        # Cache of all bindings, may be unresolvable and cause rescheduling
        self._binding_cache_pre: dict[str, "lsm::ServiceEntityBinding"] = {}
        # Cache of all lifecycles, resolved
        self._binding_cache_lifecycles: dict[str, dict] = {}

        # Cache of selectors
        self._selector_cache: dict[
            str, "inmanta_plugins.lsm.partial.ParentSelector"
        ] = {}

    def reset(self) -> None:
        """Reset all state: drop all caches and renew the connection"""
        self._instances_per_binding = {}
        self._current_state_cache = {}
        self._client = protocol.endpoints.SyncClient("compiler")
        self._binding_cache_pre = {}
        self._binding_cache_lifecycles = {}
        self._all_instance_cache = {}
        self._selector_cache = {}

    def get_selector(
        self,
        root: "lsm:ServiceBinding",
        env: str,
        current_instance_id: str,
        validate: bool,
    ) -> "inmanta_plugins.lsm.partial.ParentSelector":
        """
        Return the selector for the given root binding, creating it on first use.

        The cache is keyed on the service entity name of the root only: the other
        arguments are only used when the selector is created.
        """
        root_name = root.service_entity_name
        if root_name in self._selector_cache:
            return self._selector_cache[root_name]

        selector = inmanta_plugins.lsm.partial.ParentSelector(
            root, env, current_instance_id, validate
        )
        self._selector_cache[root_name] = selector
        return selector

    def get_client(self) -> protocol.endpoints.SyncClient:
        """Return the cached connection to the server."""
        return self._client

    @staticmethod
    def _retrieval_failure(result) -> Exception:
        """Build the exception reporting that the API answered with an unexpected code."""
        message = result.result.get("message") if result.result else None
        extra_info = f": {message}" if message else "."
        return Exception(
            f"Unable to retrieve service instance from the API (code {result.code}){extra_info}"
        )

    def get_instance(
        self,
        env: str,
        service_entity_name: Optional[str],
        instance_id: Union[str, UUID],
        force: bool = False,
        include_terminated: bool = False,
    ) -> Optional[dict]:
        """
        Return the service instance with the given environment, service_entity_name and instance_id or None
        if no such instance exists.

        :param force: when true, the cache is refreshed from the server
        :param include_terminated: when trying to pull a specific instance, and it is not in the cache,
            try to get it from the API directly, so that it is returned even when terminated.
        """
        assert (
            not force
        ) or service_entity_name, (
            "when forcing refresh, service_entity_name must be passed in"
        )
        instance_key = (
            str(instance_id) if isinstance(instance_id, UUID) else instance_id
        )

        if service_entity_name:
            # Fills the per-instance cache that is consulted right below
            self.get_all_instances(env, service_entity_name, force=force)

        instance = self._all_instance_cache.get(instance_key)
        if instance is None and include_terminated:
            # Try to pull it from the API directly in case it is terminated
            response = self._client.lsm_services_get_by_id(
                tid=env, service_id=instance_key
            )
            if response.code == 404:
                # it is not there at all
                return None
            if response.code != 200:
                raise self._retrieval_failure(response)
            instance = response.result["data"]

        if instance is None:
            return None
        if service_entity_name and instance["service_entity"] != service_entity_name:
            return None
        return instance

    def convert_instance(
        self, instance: dict, validation: bool, instance_id: str
    ) -> Optional[dict]:
        """
        Convert an instance from the API form to the return format of lsm::all

        :param instance: The instance dict as returned by the api
        :param validation: Whether this is a validation compile
        :param instance_id: The id of the instance being validated (if this is a validation compile)
        :return: The converted instance, or None when the lifecycle state of the instance
            excludes it from this compile.
        """
        service_entity = instance["service_entity"]
        lifecycle = self.get_lifecycle(service_entity)
        state = lifecycle[instance["state"]]

        if validation:
            # The state tells which attribute set to validate, for the instance
            # under validation itself and for all other instances.
            if instance_id == instance["id"]:
                selected_set = state.validate_self
            else:
                selected_set = state.validate_others
            if not selected_set:
                return None
            attributes = instance[f"{selected_set}_attributes"]
        else:
            if not state.export_resources:
                return None
            attributes = instance["active_attributes"]

        if attributes is None:
            attributes = {}

        return {
            "id": instance["id"],
            "version": instance["version"],
            "state": instance["state"],
            "delete": instance["deleted"],
            "deleted": instance["deleted"],
            "service_entity": service_entity,
            "attributes": attributes,
            "candidate_attributes": instance["candidate_attributes"],
            "active_attributes": instance["active_attributes"],
            "rollback_attributes": instance["rollback_attributes"],
        }

    def get_all_instances(
        self, env: str, service_entity_name: str, force: bool = False
    ) -> list[dict]:
        """
        Get all (non-terminal) instances from the server for a specific environment and service_entity_name.

        The result is cached and any subsequent call uses the cache.

        :param env: the environment to use.
        :param service_entity_name: the name of specific service entity for which to fetch the instances.
        :param force: when true, the cache is refreshed from the server
        :return: all instances, as dicts, in the format returned by the server.
        """
        must_fetch = force or service_entity_name not in self._instances_per_binding
        if must_fetch:
            # get ALL instances
            response = self._client.lsm_services_list(
                tid=env, service_entity=service_entity_name
            )
            if response.code == 404:
                # the type probably needs to be registered first
                return []
            if response.code != 200:
                raise self._retrieval_failure(response)

            fetched = response.result["data"]
            self._instances_per_binding[service_entity_name] = fetched
            for fetched_instance in fetched:
                self.cache_instance(fetched_instance)

        return self._instances_per_binding[service_entity_name]

    def cache_instance(self, instance: dict) -> None:
        """Store the given instance, and its state and version, in the per-instance caches."""
        instance_id = instance["id"]
        self._all_instance_cache[instance_id] = instance
        self._current_state_cache[instance_id] = (
            instance["state"],
            instance["version"],
        )

    def get_instance_state(self, instance_id: str) -> Tuple[str, int]:
        """
        Get the current state and version for a specific instance.

        Can only be called for instances retrieved via get_all_instances

        :param instance_id: the uuid for the service instance
        :return: current state and version for the specific instance
        """
        if instance_id in self._current_state_cache:
            return self._current_state_cache[instance_id]

        # This should never happen, since the all() plugin is always called before the current_state() plugin
        raise Exception(
            "No service instances found, ensure the 'all' plugin has been called before"
        )

    def register_binding(self, binding) -> dict[str, object]:
        """Register a lifecycle binding and return its lifecycle"""
        binding_name = binding.service_entity_name
        self._binding_cache_pre[binding_name] = binding
        self.resolve_binding_cache()
        return self._binding_cache_lifecycles[binding_name]

    def resolve_binding_cache(self) -> None:
        """Resolve the lifecycle of every registered binding that has no resolved lifecycle yet."""
        # Resolve all bindings at once, making multiple lsm::all behave more regular
        registered = set(self._binding_cache_pre.keys())
        resolved = set(self._binding_cache_lifecycles.keys())
        for pending_name in registered - resolved:
            pending_binding = self._binding_cache_pre[pending_name]
            self._binding_cache_lifecycles[pending_name] = collect_states(
                pending_binding.lifecycle
            )

    def get_lifecycle(self, entity_name: str) -> dict[str, dict[str, object]]:
        """
        Return the resolved lifecycle of the binding registered under the given name.

        :raises Exception: When no lifecycle was resolved for that name.
        """
        if entity_name in self._binding_cache_lifecycles:
            return self._binding_cache_lifecycles[entity_name]

        raise Exception(
            f"No lifecycle found for {entity_name}, ensure register_binding has been called."
        )
# Module-level CacheManager instance used by the plugins in this module; inmanta_reset_state() resets it.
global_cache = CacheManager()
@plugins.plugin
def all(context: plugins.Context, binding: "lsm::ServiceEntityBinding") -> "list":
    """
    Returns a list of records for the given binding.

    The binding, and every binding reachable from the root of its owner tree, is registered in
    the global cache first. In a partial compile (environment variable `ENV_PARTIAL_COMPILE` set
    to "true"), only instances selected for the instance under compilation are returned;
    otherwise all instances of the service entity are.

    :param binding: The entity binding
    :return: A list of dict with all the defined records.
    """
    # make calls that can cause re-scheduling
    if binding.service_entity_name is None:
        raise Exception(
            "Only instances of entities that define its service type can be retrieved."
        )
    global_cache.register_binding(binding)

    partial_compile: bool = (
        os.environ.get(lsm_const.ENV_PARTIAL_COMPILE, "false").lower() == "true"
    )

    LOGGER.log(
        LOG_LEVEL_TRACE,
        "Resolution of service binding %s: start",
        binding.service_entity_name,
    )

    # This may be re-executed a few times
    # but complex to cache, easy to execute, so we don't bother
    root = binding
    try:
        # Climb the owner relation until a binding without owner is reached
        while owner := root.owner:
            root = owner
    except UnsetException:
        LOGGER.warning(
            "Please set the owner relation on the service entity binding for the service entity %s (instantiated at %s) "
            "in the constructor, even if it is null",
            binding.service_entity,
            str(binding.__instance._location),
        )
        # only reraise when strictly required so we don't force the compiler to freeze the relation when unset
        if partial_compile:
            raise
    except OptionalValueException:
        # found root
        pass

    LOGGER.log(
        LOG_LEVEL_TRACE,
        "Resolution of service binding %s: found root %s",
        binding.service_entity_name,
        root.service_entity_name,
    )

    # Collect every binding in the tree below the root
    siblings = set()
    pending = [root]
    while pending:
        candidate = pending.pop()
        if candidate in siblings:
            continue
        siblings.add(candidate)
        # Outside a partial compile, an unset `owned` relation is silently skipped
        with (
            contextlib.nullcontext()
            if partial_compile
            else contextlib.suppress(UnsetException)
        ):
            pending.extend(candidate.owned)

    LOGGER.log(
        LOG_LEVEL_TRACE,
        "Resolution of service binding %s: found siblings [%s]",
        binding.service_entity_name,
        ", ".join([sibling.service_entity_name for sibling in siblings]),
    )

    for sibling in siblings:
        global_cache.register_binding(sibling)

    LOGGER.log(
        LOG_LEVEL_TRACE,
        "Resolution of service binding %s: done",
        binding.service_entity_name,
    )
    # at this point, we are probably good wrt to re-execution

    # collect environment context
    service_entity_name = binding.service_entity_name
    env = context.get_environment_id()
    model_state = os.environ.get(lsm_const.ENV_MODEL_STATE, ModelState.active)
    validation = model_state == ModelState.candidate
    instance_id = os.environ.get(lsm_const.ENV_INSTANCE_ID, None)

    raw_instances: list[dict]
    if not partial_compile:
        raw_instances = global_cache.get_all_instances(env, service_entity_name)
    elif instance_id is None:
        raise Exception(f"Environment variable {lsm_const.ENV_INSTANCE_ID} not set")
    elif len(siblings) == 1:
        single: Optional[dict] = global_cache.get_instance(
            env, service_entity_name, instance_id
        )
        # single is None if the service instance with instance_id belongs
        # to a different service entity than service_entity_name.
        raw_instances = [] if single is None else [single]
    else:
        selector: inmanta_plugins.lsm.partial.ParentSelector = (
            global_cache.get_selector(root, env, instance_id, validation)
        )
        raw_instances = selector.select(service_entity_name)

    allocationspec = None
    if binding.allocation_spec:
        import inmanta_plugins.lsm.allocation as allocation

        allocationspec = allocation.global_allocation_collector.get_spec(
            binding.allocation_spec
        )

    instances = []
    to_allocate = []
    for raw in raw_instances:
        converted = global_cache.convert_instance(raw, validation, instance_id)
        if not converted:
            continue
        instances.append(converted)
        # Only the instance under validation is a candidate for allocation
        if validation and instance_id == raw["id"]:
            to_allocate.append(converted)

    if to_allocate and allocationspec:
        LOGGER.debug(
            "Performing allocation for %d instances of type %s: done",
            len(to_allocate),
            binding.service_entity_name,
        )
        allocation.do_allocate_instances(context, binding, allocationspec, to_allocate)

    LOGGER.debug(
        "lsm::all returning %d instances for type %s",
        len(instances),
        binding.service_entity_name,
    )
    return instances
@plugins.plugin
def current_state(ctx: plugins.Context, fsm: "lsm::ServiceEntity") -> "dict":
    """
    Returns the current state from the lifecycle and the next version of the instance

    :param fsm: The service entity instance to get the state for.
    :return: A dict with the keys `state` and `next_version`.
    """
    state_name, current_version = global_cache.get_instance_state(fsm.instance_id)

    # collect all states in the graph
    known_states = {}
    for transfer in fsm.entity_binding.lifecycle.transfers:
        for endpoint in (transfer.source, transfer.target):
            known_states[endpoint.name] = endpoint
        # The error state of a transfer is optional
        with contextlib.suppress(OptionalValueException):
            error_state = transfer.error
            known_states[error_state.name] = error_state

    if state_name not in known_states:
        raise Exception(
            "The state returned from the API does not exist in the lifecycle defined in the model."
        )

    return {
        "state": known_states[state_name]._get_instance(),
        "next_version": current_version + 1,
    }
@plugins.plugin
def has_current_state(
    ctx: plugins.Context, service_instance: "lsm::ServiceEntity", state_name: "string"
) -> "bool":
    """
    Check whether the given service instance is currently in the given state of its lifecycle.

    :param service_instance: The ServiceEntity object.
    :param state_name: The name of the lifecycle state
    """
    # The cached entry is a (state, version) pair, only the state matters here
    actual_state = global_cache.get_instance_state(service_instance.instance_id)[0]
    return actual_state == state_name
@plugins.plugin
def is_validating(instance_id: "string") -> "bool":
    """
    Return true if the current compile is a validating compile and the instance
    being validated has the given id.

    :param instance_id: The id of the instance we want to check for validation.
    """
    helpers = inmanta_plugins.lsm.allocation_helpers
    # The id of the validated instance is only looked up in a validation compile
    return helpers.is_validation_compile() and (
        helpers.get_validated_instance_id() == instance_id
    )
@plugins.plugin
def format(__string: "string", args: "list", kwargs: "dict") -> "string":
    """
    Format a string using python string formatter, and accepting statements which
    native inmanta f-string doesn't support (such as accessing dict values)

    :param __string: The string to apply formatting to
    :param args: The positional arguments to feed into the `str.format` method
    :param kwargs: The keyword arguments to feed into the `str.format` method
    :return: The formatted string
    """
    formatted = __string.format(*args, **kwargs)
    return formatted
@resources.resource("lsm::LifecycleTransfer", agent="agent", id_attribute="instance_id")
class LifecycleTransferResource(resources.PurgeableResource):
    """
    A resource that collects deploy events and send them to the lsm
    """

    fields = ("service_entity", "instance_id", "next_version", "resources")

    @staticmethod
    def get_resources(exporter, transfer):
        """
        Return the sorted ids of the resources of the given transfer.

        Resources for which `resources.to_id` raises IgnoreResourceException are left out.
        """
        resource_ids = []
        for res in transfer.resources:
            with contextlib.suppress(IgnoreResourceException):
                resource_ids.append(resources.to_id(res))
        return sorted(resource_ids)
# This class should ideally inherit from the more generic `HandlerAPI` instead of `CRUDHandler` due to its limited adherence
# to the interface. However, since the `HandlerAPI` class is only available on iso7+ and we want to remain compatible with
# iso6 this cannot yet be done.
@handler.provider("lsm::LifecycleTransfer", name="local_state")
class LifecycleTransferHandler(handler.CRUDHandler):
    """
    A handler that collects all resource statuses from resources that are part of a service instance.
    The deploy() method is used to determine whether the resources of a service instance are deployed successfully or not.
    """

    # Set by pre() and cleared again by post()
    client: Optional[protocol.Client]

    def pre(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """Create the client used for the API calls of this handler."""
        self.client = protocol.Client("agent")

    def deploy(
        self,
        ctx: handler.HandlerContext,
        resource: LifecycleTransferResource,
        requires: dict[ResourceIdStr, const.ResourceState],
    ) -> None:
        """
        Report the states of the required resources to lsm and set the status of this
        resource to deployed when none of them failed, and to failed otherwise.

        :param requires: The state of each resource this resource requires.
        """
        self.pre(ctx, resource)
        try:
            succeeded = self._send_current_state(ctx, resource, requires)
            ctx.set_status(
                const.ResourceState.deployed
                if succeeded
                else const.ResourceState.failed
            )
        finally:
            self.post(ctx, resource)

    def _send_current_state(
        self,
        ctx: handler.HandlerContext,
        resource: LifecycleTransferResource,
        fine_grained_resource_states: dict[ResourceIdStr, const.ResourceState],
    ) -> bool:
        """
        Report the resource states for the resources in this service instance to inmanta LSM. This method raises a SkipResource
        exception when one of the given resource states is a transient state (i.e. it doesn't match the failed or deployed
        state).

        :param resource: This LifecycleTransfer resource.
        :param fine_grained_resource_states: The resource state for each resource part of this service instance.
        :return: True iff all the given resource states are equal to deployed.
        """
        # If a resource is not in events, it means that it was deployed before so we can mark it as success
        resource_states = dict.fromkeys(resource.resources, ResourceState.success)
        is_failed = False
        is_skip = False

        # Convert inmanta.const.ResourceState to inmanta_lsm.model.ResourceState
        for resource_id, state in fine_grained_resource_states.items():
            if state == const.ResourceState.deployed:
                # same as default list
                continue
            if state == const.ResourceState.failed:
                resource_states[resource_id] = ResourceState.error
                is_failed = True
            elif resource_id in resource_states:
                # some transient state that is not failed and not success, so lets skip
                is_skip = True

        # failure takes precedence over transient
        # transient takes precedence over success
        if is_skip and not is_failed:
            raise SkipResource()

        ctx.info(
            "Sending %(states_str)s",
            states_str=", ".join([f"{k}: {v}" for k, v in resource_states.items()]),
            states=resource_states,
        )

        def call() -> typing.Awaitable[protocol.Result]:
            return self.client.lsm_services_resources_set_state(
                self._agent._env_id,
                resource.service_entity,
                resource.instance_id,
                resource.next_version - 1,
                resource_states,
            )

        response = self.run_sync(call)
        # 409 is a conflict: the transfer already happened, which is treated like a success
        if response.code not in (200, 409):
            ctx.error(
                "Failed to set resource state on lsm.",
                code=response.code,
                result=response.result,
            )
            # and raise exception to mark this resource as failed
            raise Exception("Failed to set resource state on lsm.")

        return not is_failed

    def _diff(
        self, current: LifecycleTransferResource, desired: LifecycleTransferResource
    ) -> dict[str, dict[str, Any]]:
        """
        Calculate the diff like the parent class does, but ignore a change of `next_version`
        when the current version is higher than the desired one.
        """
        changes = super()._diff(current, desired)
        if "next_version" in changes:
            version_change = changes["next_version"]
            # no changes required when current > desired
            if version_change["current"] > version_change["desired"]:
                del changes["next_version"]
        return changes

    def read_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """
        This method is used by the dry_run code path.
        """

        def call():
            # get the state from lsm
            return self.client.lsm_services_get(
                self._agent._env_id, resource.service_entity, resource.instance_id
            )

        response = self.run_sync(call)
        if response.code != 200:
            ctx.error(
                "Failed to retrieve service instance",
                message=response.result,
                status=response.code,
            )
            raise Exception("Failed to retrieve service instance")

        instance = response.result["data"]
        ctx.set("instance", instance)
        ctx.set("transferred", False)
        ctx.info(
            "Retrieved instance with version %(version)s",
            version=instance["version"],
        )
        resource.next_version = instance["version"]

    def create_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """Not supported: always raises, because deploy() is overridden."""
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def delete_resource(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """Not supported: always raises, because deploy() is overridden."""
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def update_resource(
        self,
        ctx: handler.HandlerContext,
        changes: dict,
        resource: LifecycleTransferResource,
    ) -> None:
        """Not supported: always raises, because deploy() is overridden."""
        raise Exception(
            "This method should not be called, because this handler overrides the deploy() method."
        )

    def post(
        self, ctx: handler.HandlerContext, resource: LifecycleTransferResource
    ) -> None:
        """Drop the client created by pre()."""
        self.client = None
@resources.resource("lsm::ServiceInstance", agent="agent", id_attribute="instance_id")
class ServiceInstanceResource(resources.PurgeableResource):
    """
    Resource for the lsm::ServiceInstance entity.
    """

    fields = (
        "service_entity",
        "instance_id",
        "attributes",
        "skip_update_states",
        "rejected_states",
    )

    @staticmethod
    def get_attributes(exporter, instance):
        """Return the `attributes` of the model instance."""
        attributes = instance.attributes
        return attributes

    @staticmethod
    def get_instance_id(_, instance):
        """Return the `instance_id` of the model instance."""
        instance_id = instance.instance_id
        return instance_id

    @staticmethod
    def get_service_entity(_, instance):
        """Return the `service_entity` of the model instance."""
        service_entity = instance.service_entity
        return service_entity
@handler.provider("lsm::ServiceInstance", name="service_instance")
class ServiceInstanceHandler(handler.CRUDHandler):
    """
    Handler for lsm::ServiceInstance resources: reads, creates, updates and deletes a service
    instance through the `lsm_services_*` API calls.
    """

    # pre() assigns a protocol.Client, so that is the type declared here
    client: protocol.Client

    def pre(self, ctx, resource) -> None:
        """Create the client used for the API calls of this handler."""
        self.client = protocol.Client("agent")

    def read_instance(self, service_instance: ServiceInstanceResource):
        """
        Return a callable that fetches the given service instance from the API.
        The callable is meant to be handed to `run_sync`.
        """

        def call():
            return self.client.lsm_services_get(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
            )

        return call

    def _read_instance_data(
        self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
    ) -> dict:
        """
        Fetch the given service instance and return the `data` part of the response.

        :raises Exception: When the API does not answer with code 200.
        """
        response = self.run_sync(self.read_instance(service_instance))
        if response.code != 200:
            ctx.error(
                "Failed to retrieve service instance",
                message=response.result,
                status=response.code,
            )
            raise Exception("Failed to retrieve service instance")
        return response.result["data"]

    def read_resource(
        self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
    ) -> None:
        """
        Load the current attributes of the instance from lsm into the given resource.

        The resource is marked as purged when the API answers 404. Otherwise its attributes are
        set to the active attributes of the instance, or to the candidate attributes when there
        are no active attributes.

        :raises Exception: When the API answers with a code other than 200 or 404.
        """
        # get the state from lsm
        response = self.run_sync(self.read_instance(service_instance))
        if response.code == 404:
            service_instance.purged = True
            return

        if response.code != 200:
            ctx.error(
                "Failed to retrieve service instance",
                message=response.result,
                status=response.code,
            )
            raise Exception("Failed to retrieve service instance")

        data = response.result["data"]
        ctx.set("instance", data)
        ctx.info(
            "Retrieved instance with version %(version)s",
            version=data["version"],
        )
        service_instance.attributes = (
            data["active_attributes"] or data["candidate_attributes"]
        )

    def create_resource(
        self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
    ) -> None:
        """
        Create the service instance on lsm and store the id it was given on the resource.

        :raises Exception: When the API does not answer with code 200.
        """

        def call():
            return self.client.lsm_services_create(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.attributes,
                service_instance.instance_id,
            )

        response = self.run_sync(call)
        if response.code != 200:
            ctx.error(
                "Failed to create service instance on lsm.",
                code=response.code,
                result=response.result,
            )
            # Report the operation that actually failed (this used to mention the resource state)
            raise Exception(
                f"Failed to create service instance on lsm. {str(response.result)}"
            )

        data = response.result["data"]
        ctx.set("instance", data)
        ctx.info(
            "Created instance with version %(version)s",
            version=data["version"],
        )
        service_instance.instance_id = data["id"]
        ctx.set_created()

    def delete_resource(
        self, ctx: handler.HandlerContext, service_instance: ServiceInstanceResource
    ) -> None:
        """
        Delete the service instance on lsm, using the version it currently has on the server.

        :raises SkipResource: When the API answers 404 with a message containing "cannot be updated".
        :raises Exception: When the instance cannot be read, or when the delete call fails otherwise.
        """
        instance = self._read_instance_data(ctx, service_instance)
        instance_version = instance["version"]
        instance_current_state = instance["state"]

        def call():
            return self.client.lsm_services_delete(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
                instance_version,
            )

        response = self.run_sync(call)
        if response.code == 404 and "cannot be updated" in response.result["message"]:
            raise SkipResource(
                f"Instance {instance['id']} "
                f"cannot be deleted while in state {instance_current_state}"
            )
        if response.code != 200:
            ctx.error(
                "Failed to delete service instance on lsm.",
                code=response.code,
                result=response.result,
            )
            raise Exception("Failed to delete service instance on lsm.")

        ctx.set("instance", None)
        ctx.set_purged()

    def calculate_diff(
        self,
        ctx: handler.HandlerContext,
        current: ServiceInstanceResource,
        desired: ServiceInstanceResource,
    ) -> dict[str, dict[str, Any]]:
        """
        Calculate the changes between the current and the desired state of the resource.

        Attributes that only exist in the current state are copied into the desired attributes
        before comparing, so that they do not show up as a change. The `attributes` entry is
        dropped from the result when both sets are equal afterwards.
        """
        changes = super()._diff(current, desired)
        if "attributes" not in changes:
            return changes

        current_attributes = changes["attributes"]["current"]
        desired_attributes = changes["attributes"]["desired"]
        # Desired might not contain the attributes that have default value
        for attribute_name, attribute_value in current_attributes.items():
            desired_attributes.setdefault(attribute_name, attribute_value)

        if current_attributes == desired_attributes:
            del changes["attributes"]
        return changes

    def update_resource(
        self,
        ctx: handler.HandlerContext,
        changes: dict,
        service_instance: ServiceInstanceResource,
    ) -> None:
        """
        Send the desired attributes that differ from the attributes on the server to lsm.

        :param changes: The changes as calculated by `calculate_diff`; only `attributes` may be changed.
        :raises SkipResource: When the instance is in one of the `skip_update_states` of the resource.
        :raises Exception: When the instance cannot be read, is in one of the `rejected_states`, when
            something other than the attributes changed, or when the update call fails.
        """
        instance = self._read_instance_data(ctx, service_instance)
        state = instance["state"]
        instance_version = instance["version"]

        if state in service_instance.skip_update_states:
            raise SkipResource(f"Instance cannot be updated from state {state}")
        if state in service_instance.rejected_states:
            raise Exception(f"Instance is in a rejected state: {state}")

        updated = {}
        if "attributes" in changes:
            candidate_attributes = instance["candidate_attributes"]
            active_attributes = instance["active_attributes"]
            # Check the desired attributes
            for attribute_name, attribute_value in changes["attributes"][
                "desired"
            ].items():
                # If it's already in the candidates, but with a different value, update the value
                if (
                    candidate_attributes
                    and candidate_attributes[attribute_name] != attribute_value
                ):
                    updated[attribute_name] = attribute_value
                # If it's different from the active attribute, and not already in the candidates, update it
                if (
                    active_attributes
                    and active_attributes[attribute_name] != attribute_value
                    and attribute_name not in updated
                ):
                    updated[attribute_name] = attribute_value
            del changes["attributes"]

        if changes:
            raise Exception(
                f"Changing {changes.keys()} of an instance is not supported"
            )

        def call():
            return self.client.lsm_services_update(
                self._agent._env_id,
                service_instance.service_entity,
                service_instance.instance_id,
                instance_version,
                updated,
            )

        if updated:
            ctx.debug(
                f"Updating instance {service_instance.id} with attributes {str(updated)}"
            )
            response = self.run_sync(call)
            if response.code != 200:
                ctx.error(
                    "Failed to update service instance on lsm.",
                    code=response.code,
                    result=response.result,
                )
                raise Exception(
                    f"Failed to update service instance on lsm. {str(response.result)}"
                )
            ctx.set_updated()
@dependency_manager
def lsm_dependency_manager(entities: ModelDict, resources: ResourceDict) -> None:
    """
    Set the `resources` attribute of every lsm::LifecycleTransfer resource to the sorted
    resource ids of the resources it requires.
    """
    for res in resources.values():
        if res.id.entity_type != "lsm::LifecycleTransfer":
            continue
        # Ensure that the resources list contains only resources that exist.
        # The requires set was already filtered from resources that don't exist.
        required_ids = [required.resource_str() for required in res.requires]
        required_ids.sort()
        res.resources = required_ids
def inmanta_reset_state() -> None:
    """
    Reset the cached state for this module.

    This resets the module-level `global_cache`.
    """
    cache = global_cache
    cache.reset()