Source code for inmanta.ast.variables

"""
    Copyright 2017 Inmanta

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

    Contact: code@inmanta.com
"""

import logging
from collections import abc
from typing import Generic, Optional, TypeVar, Union

import inmanta.execute.dataflow as dataflow
from inmanta.ast import LocatableString, Location, NotFoundException, OptionalValueException, Range, RuntimeException
from inmanta.ast.statements import (
    AssignStatement,
    AttributeAssignmentLHS,
    ExpressionStatement,
    RawResumer,
    Statement,
    VariableReferenceHook,
    VariableResumer,
)
from inmanta.ast.statements.assign import Assign, SetAttribute
from inmanta.execute.dataflow import DataflowGraph
from inmanta.execute.runtime import (
    QueueScheduler,
    RawUnit,
    Resolver,
    ResultCollector,
    ResultVariable,
    ResultVariableProxy,
    VariableABC,
    WrappedValueVariable,
)
from inmanta.execute.util import NoneValue, Unknown
from inmanta.parser import ParserException
from inmanta.stable_api import stable_api

LOGGER = logging.getLogger(__name__)


R = TypeVar("R", bound="Reference")


[docs] @stable_api class Reference(ExpressionStatement): """ This class represents a reference to a value :ivar name: The name of the Reference as a string. """ __slots__ = ("locatable_name", "name", "full_name") def __init__(self, name: LocatableString) -> None: ExpressionStatement.__init__(self) self.locatable_name = name self.name = str(name) self.full_name = str(name) def normalize(self, *, lhs_attribute: Optional[AttributeAssignmentLHS] = None) -> None: split: abc.Sequence[str] = self.name.rsplit("::", maxsplit=1) if len(split) > 1: # fail-fast if namespace does not exist try: self.namespace.lookup_namespace(split[0]) except NotFoundException as e: e.set_statement(self) raise def requires(self) -> list[str]: return [self.full_name] def requires_emit( self, resolver: Resolver, queue: QueueScheduler, *, propagate_unset: bool = False ) -> dict[object, VariableABC]: requires: dict[object, VariableABC] = super().requires_emit(resolver, queue) # FIXME: may be done more efficient? requires[self.name] = resolver.lookup(self.full_name) return requires def requires_emit_gradual( self, resolver: Resolver, queue: QueueScheduler, resultcollector: ResultCollector, *, propagate_unset: bool = False ) -> dict[object, VariableABC]: result: dict[object, VariableABC] = self.requires_emit(resolver, queue, propagate_unset=propagate_unset) var: VariableABC = result[self.name] assert isinstance(var, ResultVariable) listener_registered: bool = var.listener(resultcollector, self.location) if not listener_registered: # pass on resultcollector for explicit reporting in execute result[(self, ResultCollector)] = WrappedValueVariable(resultcollector) return result def _resolve(self, requires: dict[object, object], resolver: Resolver, queue: QueueScheduler) -> object: return requires[self.name] def execute_direct(self, requires: abc.Mapping[str, object]) -> object: if self.name not in requires: raise NotFoundException(self, "Could not resolve the value %s in this static context" % self.name) return requires[self.name] def get_root_variable(self) -> "Reference": """ Returns the root reference node. e.g. for a.b.c.d, returns the reference for a. """ return self def fully_qualified(self: R) -> R: """ If this reference is already fully qualified, returns it unchanged. Otherwise, returns a new fully qualified reference to this name in this reference's namespace. This fully qualified reference is not guaranteed to resolve to the same object. It is the caller's responsibility to only request a fully qualified name when appropriate. """ if "::" in self.name: return self fully_qualified_name: str = f"{self.namespace.name}::{self.name}" locatable: LocatableString = LocatableString( fully_qualified_name, Range("__internal__", 1, 1, 1, 1), -1, self.namespace ) result: R = self.__class__(locatable) result.location = locatable.location return result def as_assign(self, value: ExpressionStatement, list_only: bool = False) -> AssignStatement: if list_only: raise ParserException(self.location, "+=", "Can not perform += on variable %s" % self.name) return Assign(self.locatable_name, value) def root_in_self(self) -> "Reference": if self.name == "self": return self else: ref = Reference("self") self.copy_location(ref) attr_ref = AttributeReference(ref, self.locatable_name) self.copy_location(attr_ref) return attr_ref def get_dataflow_node(self, graph: DataflowGraph) -> dataflow.AssignableNodeReference: return graph.resolver.get_dataflow_node(self.name) def __str__(self) -> str: return self.name def __repr__(self) -> str: return self.name
T = TypeVar("T") class VariableReader(VariableResumer, Generic[T]): """ Resumes execution on a variable when it becomes avaiable, then connects the target proxy variable to it. Optionally subscribes a result collector to intermediate values. """ __slots__ = ("target",) def __init__(self, target: ResultVariableProxy[T]) -> None: super().__init__() self.target: ResultVariableProxy[T] = target def variable_resume( self, variable: VariableABC[T], resolver: Resolver, queue_scheduler: QueueScheduler, ) -> None: self.target.connect(variable) class IsDefinedGradual(VariableResumer, RawResumer, ResultCollector[object]): """ Fill target variable with is defined result as soon as it gets known. """ __slots__ = ("owner", "target") def __init__(self, owner: Statement, target: ResultVariable[bool]) -> None: super().__init__() self.owner: Statement = owner self.target: ResultVariable[bool] = target def pure_gradual(self) -> bool: # freezing an empty variable causes progress return False def receive_result(self, value: object, location: Location) -> bool: """ Gradually receive an assignment to the referenced variable. Sets the target variable to True because to receive a single value implies that the variable is defined. """ if isinstance(value, Unknown): # value may or may not be defined, nothing can be decided yet return False self.target.set_value(True, self.owner.location) return True def variable_resume( self, variable: VariableABC[object], resolver: Resolver, queue_scheduler: QueueScheduler, ) -> None: if variable.is_ready(): self.target.set_value(self._target_value(variable), self.owner.location) else: # gradual execution: as soon as a value comes in, the result is known variable.listener(self, self.owner.location) # wait for variable completeness in case no value comes in at all RawUnit(queue_scheduler, resolver, {self: variable}, self, override_exception_location=False) def resume(self, requires: dict[object, VariableABC], resolver: Resolver, queue_scheduler: QueueScheduler) -> None: self.target.set_value(self._target_value(requires[self]), self.owner.location) def _target_value(self, variable: VariableABC[object]) -> Union[bool, Unknown]: """ Returns the target value based on the attribute variable's value or absence of a value. """ try: value = variable.get_value() if isinstance(value, Unknown): return Unknown(self) if isinstance(value, list): if len(value) == 0: return False elif all(isinstance(v, Unknown) for v in value): return Unknown(self) else: return True elif isinstance(value, NoneValue): return False return True except OptionalValueException: return False def emit(self, resolver: Resolver, queue: QueueScheduler) -> None: raise RuntimeException(self, "%s is not an actual AST node, it should never be executed" % self.__class__.__name__) def execute(self, requires: dict[object, object], resolver: Resolver, queue: QueueScheduler) -> object: raise RuntimeException(self, "%s is not an actual AST node, it should never be executed" % self.__class__.__name__) class AttributeReference(Reference): """ This variable refers to an attribute. This is mostly used to refer to attributes of a class or class instance. """ __slots__ = ("attribute", "instance") def __init__(self, instance: Reference, attribute: LocatableString) -> None: range: Range = Range( instance.locatable_name.location.file, instance.locatable_name.lnr, instance.locatable_name.start, attribute.elnr, attribute.end, ) reference: LocatableString = LocatableString( f"{instance.full_name}.{attribute}", range, instance.locatable_name.lexpos, instance.namespace ) Reference.__init__(self, reference) self.attribute = attribute # a reference to the instance self.instance = instance def requires(self) -> list[str]: return self.instance.requires() def requires_emit( self, resolver: Resolver, queue: QueueScheduler, *, propagate_unset: bool = False ) -> dict[object, VariableABC]: return self.requires_emit_gradual(resolver, queue, None, propagate_unset=propagate_unset) def requires_emit_gradual( self, resolver: Resolver, queue: QueueScheduler, resultcollector: Optional[ResultCollector], *, propagate_unset: bool = False, ) -> dict[object, VariableABC]: requires: dict[object, VariableABC] = self._requires_emit_promises(resolver, queue) # The tricky one! # introduce proxy variable to point to the eventual result of this stmt proxy: ResultVariableProxy[object] = ResultVariableProxy( listener=(resultcollector, self.location) if resultcollector is not None else None, ) # construct waiter reader: VariableReader = VariableReader(target=proxy) hook: VariableReferenceHook = VariableReferenceHook( self.instance, str(self.attribute), variable_resumer=reader, propagate_unset=propagate_unset, ) self.copy_location(hook) hook.schedule(resolver, queue) # wait for the attribute value requires[self] = proxy return requires def _resolve(self, requires: dict[object, object], resolver: Resolver, queue: QueueScheduler) -> object: # helper returned: return result return requires[self] def get_root_variable(self) -> "Reference": """ Returns the root reference node. e.g. for a.b.c.d, returns the reference for a. """ return self.instance.get_root_variable() def as_assign(self, value: ExpressionStatement, list_only: bool = False) -> AssignStatement: return SetAttribute(self.instance, str(self.attribute), value, list_only) def root_in_self(self) -> Reference: out = AttributeReference(self.instance.root_in_self(), str(self.attribute)) self.copy_location(out) return out def get_dataflow_node(self, graph: DataflowGraph) -> dataflow.AttributeNodeReference: assert self.instance is not None return dataflow.AttributeNodeReference(self.instance.get_dataflow_node(graph), str(self.attribute)) def __repr__(self) -> str: return f"{repr(self.instance)}.{str(self.attribute)}"