"""
Copyright 2017 Inmanta
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Contact: code@inmanta.com
"""
from collections.abc import Iterable, Mapping, Sequence
from copy import copy
from typing import Callable, Optional, Union
from inmanta.ast import NotFoundException, RuntimeException
from inmanta.execute.util import NoneValue, Unknown
from inmanta.stable_api import stable_api
from inmanta.types import PrimitiveTypes
from inmanta.util import JSONSerializable
try:
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False
if TYPE_CHECKING:
from inmanta.ast.attribute import Attribute
from inmanta.ast.entity import Entity
from inmanta.execute.runtime import Instance
class UnsetException(RuntimeException):
"""
This exception is thrown when an attribute is accessed that was not yet
available (i.e. it has not been frozen yet).
"""
def __init__(self, msg: str, instance: Optional["Instance"] = None, attribute: Optional["Attribute"] = None) -> None:
RuntimeException.__init__(self, None, msg)
self.instance: Optional["Instance"] = instance
self.attribute: Optional["Attribute"] = attribute
self.msg = msg
def get_result_variable(self) -> Optional["Instance"]:
return self.instance
class UnknownException(Exception):
"""
This exception is thrown when code tries to access a value that is
unknown and cannot be determined during this evaluation. The code
receiving this exception is responsible for invalidating any results
depending on this value by return an instance of Unknown as well.
"""
def __init__(self, unknown: Unknown):
super().__init__()
self.unknown = unknown
class AttributeNotFound(NotFoundException, AttributeError):
"""
Exception used for backwards compatibility with try-except blocks around some_proxy.some_attr.
This previously raised `NotFoundException` which is currently deprecated in this context.
Its new behavior is to raise an AttributeError for compatibility with Python's builtin `hasattr`.
"""
[docs]
@stable_api
class DynamicProxy:
"""
This class wraps an object and makes sure that a model is never modified
by native code.
"""
def __init__(self, instance: "Instance") -> None:
object.__setattr__(self, "__instance", instance)
def _get_instance(self) -> "Instance":
return object.__getattribute__(self, "__instance")
[docs]
@classmethod
def unwrap(cls, item: object) -> object:
"""
Converts a value from the plugin domain to the internal domain.
"""
if item is None:
return NoneValue()
if isinstance(item, DynamicProxy):
return item._get_instance()
if isinstance(item, list):
return [cls.unwrap(x) for x in item]
if isinstance(item, dict):
def recurse_dict_item(key_value: tuple[object, object]) -> tuple[object, object]:
(key, value) = key_value
if not isinstance(key, str):
raise RuntimeException(
None, f"dict keys should be strings, got {key} of type {type(key)} with dict value {value}"
)
return (key, cls.unwrap(value))
return dict(map(recurse_dict_item, item.items()))
return item
[docs]
@classmethod
def return_value(cls, value: object) -> Union[None, str, tuple[object, ...], int, float, bool, "DynamicProxy"]:
"""
Converts a value from the internal domain to the plugin domain.
"""
if value is None:
return None
if isinstance(value, NoneValue):
return None
if isinstance(value, Unknown):
raise UnknownException(value)
if isinstance(value, (str, tuple, int, float, bool)):
return copy(value)
if isinstance(value, DynamicProxy):
return value
if isinstance(value, dict):
return DictProxy(value)
if hasattr(value, "__len__"):
return SequenceProxy(value)
if hasattr(value, "__call__"):
return CallProxy(value)
return DynamicProxy(value)
def __getattr__(self, attribute: str):
instance = self._get_instance()
try:
value = instance.get_attribute(attribute).get_value()
except NotFoundException as e:
# allow for hasattr(proxy, "some_attr")
raise AttributeNotFound(e.stmt, e.name)
return DynamicProxy.return_value(value)
def __setattr__(self, attribute: str, value: object) -> None:
raise Exception("Readonly object")
def _type(self) -> "Entity":
"""
Return the type of the proxied instance
"""
return self._get_instance().type
def is_unknown(self) -> bool:
"""
Return true if this value is unknown and cannot be determined
during this compilation run
"""
if isinstance(self._get_instance(), Unknown):
return True
return False
def __hash__(self) -> int:
return hash(self._get_instance())
def __eq__(self, other: object) -> bool:
if hasattr(other, "_get_instance"):
other = other._get_instance()
return self._get_instance() == other
def __lt__(self, other: object) -> bool:
if hasattr(other, "_get_instance"):
other = other._get_instance()
return self._get_instance() < other
def __repr__(self) -> str:
return "@%s" % repr(self._get_instance())
class SequenceProxy(DynamicProxy, JSONSerializable):
def __init__(self, iterator: Sequence) -> None:
DynamicProxy.__init__(self, iterator)
def __getitem__(self, key: str) -> object:
instance = self._get_instance()
if isinstance(key, str):
raise RuntimeException(self, f"can not get a attribute {key}, {self._get_instance()} is a list")
return DynamicProxy.return_value(instance[key])
def __len__(self) -> int:
return len(self._get_instance())
def __iter__(self) -> Iterable:
instance = self._get_instance()
return IteratorProxy(instance.__iter__())
def json_serialization_step(self) -> list[PrimitiveTypes]:
# Ensure proper unwrapping by using __getitem__
return [i for i in self]
class DictProxy(DynamicProxy, Mapping, JSONSerializable):
def __init__(self, mydict: dict[object, object]) -> None:
DynamicProxy.__init__(self, mydict)
def __getitem__(self, key):
instance = self._get_instance()
if not isinstance(key, str):
raise RuntimeException(self, f"Expected string key, but got {key}, {self._get_instance()} is a dict")
return DynamicProxy.return_value(instance[key])
def __len__(self) -> int:
return len(self._get_instance())
def __iter__(self):
instance = self._get_instance()
return IteratorProxy(instance.__iter__())
def json_serialization_step(self) -> dict[str, PrimitiveTypes]:
# Ensure proper unwrapping by using __getitem__
return {k: v for k, v in self.items()}
class CallProxy(DynamicProxy):
"""
Proxy a value that implements a __call__ function
"""
def __init__(self, instance: Callable[..., object]) -> None:
DynamicProxy.__init__(self, instance)
def __call__(self, *args, **kwargs):
instance = self._get_instance()
return instance(*args, **kwargs)
class IteratorProxy(DynamicProxy):
"""
Proxy an iterator call
"""
def __init__(self, iterator: Iterable[object]) -> None:
DynamicProxy.__init__(self, iterator)
def __iter__(self):
return self
def __next__(self):
i = self._get_instance()
return DynamicProxy.return_value(next(i))