"""
Copyright 2019 Inmanta
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Contact: code@inmanta.com
"""
import base64
import functools
import hashlib
import importlib
import importlib.util
import inspect
import logging
import os
import pathlib
import shutil
import sys
import types
from collections import abc
from collections.abc import Iterable, Iterator, Sequence
from dataclasses import dataclass
from importlib.abc import FileLoader, MetaPathFinder
from importlib.machinery import ModuleSpec, SourcelessFileLoader
from itertools import chain
from typing import TYPE_CHECKING, Optional
from inmanta import const, module
from inmanta.stable_api import stable_api
from inmanta.util import hash_file_streaming
if TYPE_CHECKING:
from inmanta import protocol
VERSION_FILE = "version"
MODULE_DIR = "modules"
PLUGIN_DIR = "plugins"
LOGGER = logging.getLogger(__name__)
def get_inmanta_module_name(python_module_name: str) -> str:
"""Small utility to convert python module into inmanta module"""
module_parts = python_module_name.split(".")
if module_parts[0] != const.PLUGINS_PACKAGE:
raise Exception(
"All instances from which the source is loaded, should be defined in the inmanta plugins package. "
"%s does not match" % python_module_name
)
return module_parts[1]
class SourceNotFoundException(Exception):
"""This exception is raised when the source of the provided type is not found"""
class SourceInfo:
"""This class is used to store information related to source code information"""
def __init__(self, path: str, module_name: str) -> None:
"""
:param path: The path of the source code file
:param module_name: The fully qualified name of the Python module. Should be a module in the inmanta_plugins namespace.
"""
self.path = path
self._hash: Optional[str] = None
self._content: Optional[bytes] = None
self._requires: Optional[list[str]] = None
self.module_name = module_name
@property
def hash(self) -> str:
"""Get the sha1 hash of the file"""
if self._hash is None:
sha1sum = hashlib.new("sha1")
sha1sum.update(self.content)
self._hash = sha1sum.hexdigest()
return self._hash
@property
def content(self) -> bytes:
"""Get the content of the file"""
if self._content is None:
with open(self.path, "rb") as fd:
self._content = fd.read()
return self._content
def _get_module_name(self) -> str:
"""Get the name of the inmanta module, derived from the python module name"""
return get_inmanta_module_name(self.module_name)
@property
def requires(self) -> list[str]:
"""List of python requirements associated with this source file"""
if self._requires is None:
project: module.Project = module.Project.get()
mod: module.Module = project.modules[self._get_module_name()]
if project.metadata.agent_install_dependency_modules:
self._requires = mod.get_all_python_requirements_as_list()
else:
self._requires = mod.get_strict_python_requirements_as_list()
return self._requires
class CodeManager:
"""This class is responsible for loading and packaging source code for types (resources, handlers, ...) that need to be
available in a remote process (e.g. agent).
__type_file: Maps Inmanta type names (e.g., ``std::testing::NullResource``, ``mymodule::Mytype``)
to sets of filenames containing
the necessary source code (all plugin files in the module).
__file_info: Stores metadata about each individual source code file. The keys are file paths and the values
in this dictionary are ``SourceInfo`` objects.
"""
def __init__(self) -> None:
# Old implementation
# Use by external code
self.__type_file: dict[str, set[str]] = {}
self.__file_info: dict[str, SourceInfo] = {}
# Cache of module to source info
self.__module_to_source_info: dict[str, list[SourceInfo]] = {}
def register_code(self, type_name: str, instance: object) -> None:
"""Register the given type_object under the type_name and register the source associated with this type object.
:param type_name: The inmanta type name for which the source of type_object will be registered.
For example std::testing::NullResource
:param instance: An instance for which the code needs to be registered.
"""
file_name = self.get_object_source(instance)
if file_name is None:
raise SourceNotFoundException(f"Unable to locate source code of instance {inspect} for entity {type_name}")
if type_name not in self.__type_file:
self.__type_file[type_name] = set()
# if file_name is in there, all plugin files should be in there => return
if file_name in self.__type_file[type_name]:
return
# get the module
module_name = get_inmanta_module_name(instance.__module__)
all_plugin_files: list[SourceInfo] = self._get_source_info_for_module(module_name)
self.__type_file[type_name].update(source_info.path for source_info in all_plugin_files)
def _get_source_info_for_module(self, module_name: str) -> list[SourceInfo]:
if module_name in self.__module_to_source_info:
return self.__module_to_source_info[module_name]
sources = [
SourceInfo(path, module_name) for path, module_name in module.Project.get().modules[module_name].get_plugin_files()
]
self.__module_to_source_info[module_name] = sources
# Register files
for file_info in sources:
self.__file_info[file_info.path] = file_info
return sources
def get_object_source(self, instance: object) -> Optional[str]:
"""Get the path of the source file in which type_object is defined"""
try:
return inspect.getsourcefile(instance)
except TypeError:
return None
def get_file_hashes(self) -> Iterable[str]:
"""Return the hashes of all source files"""
return (info.hash for info in self.__file_info.values())
def get_file_content(self, hash: str) -> bytes:
"""Get the file content for the given hash"""
for info in self.__file_info.values():
if info.hash == hash:
return info.content
raise KeyError("No file found with this hash")
def get_types(self) -> Iterable[tuple[str, list[SourceInfo]]]:
"""Get a list of all registered types"""
return ((type_name, [self.__file_info[path] for path in files]) for type_name, files in self.__type_file.items())
@dataclass(frozen=True)
@functools.total_ordering
class ModuleSource:
"""
:param name: the name of the python module. e.g. inmanta_plugins.model.x
:param is_byte_code: is this content python byte code or python source
:param source: the content of the file
:param _client: a protocol client, required when source is not set
"""
name: str
hash_value: str
is_byte_code: bool
source: Optional[bytes] = None
_client: Optional["protocol.SyncClient"] = None
def __lt__(self, other):
if not isinstance(other, ModuleSource):
return NotImplemented
return (self.name, self.hash_value, self.is_byte_code) < (other.name, other.hash_value, other.is_byte_code)
def __eq__(self, other):
if not isinstance(other, ModuleSource):
return False
return (self.name, self.hash_value, self.is_byte_code) == (other.name, other.hash_value, other.is_byte_code)
def get_source_code(self) -> bytes:
"""Load the source code"""
if self.source is not None:
return self.source
if self._client is None:
raise Exception("_client should be set to use this method.")
response: protocol.Result = self._client.get_file(self.hash_value)
if response.code != 200 or response.result is None:
raise Exception(f"Failed to fetch code for {self.name} with hash {self.hash_value}.")
return base64.b64decode(response.result["content"])
def for_transport(self) -> "ModuleSource":
return ModuleSource(name=self.name, hash_value=self.hash_value, is_byte_code=self.is_byte_code, source=self.source)
def with_client(self, client: "protocol.SyncClient") -> "ModuleSource":
return ModuleSource(
name=self.name, hash_value=self.hash_value, is_byte_code=self.is_byte_code, source=self.source, _client=client
)
@dataclass(frozen=True)
class FailedModuleSource:
module_source: ModuleSource
exception: Exception
class CodeLoader:
"""
Class responsible for managing code loaded from modules received from the compiler
:param code_dir: The directory where the code is stored
"""
def __init__(self, code_dir: str, clean: bool = False) -> None:
self.__code_dir = code_dir
self.__modules: dict[str, tuple[str, types.ModuleType]] = {} # A map with all modules we loaded, and its hv
self.__check_dir(clean)
self.mod_dir = os.path.join(self.__code_dir, MODULE_DIR)
PluginModuleFinder.configure_module_finder(modulepaths=[self.mod_dir], prefer=True)
def __check_dir(self, clean: bool = False) -> None:
"""
Check if the code directory
"""
if clean and os.path.exists(self.__code_dir):
shutil.rmtree(self.__code_dir)
# check for the code dir
if not os.path.exists(self.__code_dir):
os.makedirs(self.__code_dir, exist_ok=True)
# check for modules subdir
if not os.path.exists(os.path.join(self.__code_dir, MODULE_DIR)):
os.makedirs(os.path.join(self.__code_dir, MODULE_DIR), exist_ok=True)
def _load_module(self, mod_name: str, hv: str, require_reload: bool = True) -> None:
"""
Load or reload a module
:arg require_reload: if set to true, we will explcitly reload modules, otherwise we keep them as is
"""
# Importing a module -> only the first import loads the code
# cache of loaded modules mechanism -> starts afresh when agent is restarted
try:
if mod_name in self.__modules:
if require_reload:
mod = importlib.reload(self.__modules[mod_name][1])
else:
LOGGER.debug("Not reloading module %s", mod_name)
return
else:
mod = importlib.import_module(mod_name)
self.__modules[mod_name] = (hv, mod)
LOGGER.info("Loaded module %s", mod_name)
except ImportError:
LOGGER.exception("Unable to load module %s", mod_name)
def install_source(self, module_source: ModuleSource) -> bool:
"""
:return: True if this module install requires a reload
"""
# if the module is new, or update
if module_source.name not in self.__modules or module_source.hash_value != self.__modules[module_source.name][0]:
LOGGER.info("Deploying code (hv=%s, module=%s)", module_source.hash_value, module_source.name)
all_modules_dir: str = os.path.join(self.__code_dir, MODULE_DIR)
relative_module_path: str = convert_module_to_relative_path(module_source.name)
# Treat all modules as a package for simplicity: module is a dir with source in __init__.py
module_dir: str = os.path.join(all_modules_dir, relative_module_path)
package_dir: str = os.path.normpath(
os.path.join(all_modules_dir, pathlib.PurePath(pathlib.PurePath(relative_module_path).parts[0]))
)
if module_source.is_byte_code:
init_file = "__init__.pyc"
alternate_file = "__init__.py"
else:
init_file = "__init__.py"
alternate_file = "__init__.pyc"
def touch_inits(directory: str) -> None:
"""
Make sure __init__.py files exist for this package and all parent packages. Required for compatibility
with pre-2020.4 inmanta clients because they don't necessarily upload the whole package.
"""
normdir: str = os.path.normpath(directory)
if normdir == package_dir:
return
if not os.path.exists(os.path.join(normdir, "__init__.py")) and not os.path.exists(
os.path.join(normdir, "__init__.pyc")
):
pathlib.Path(os.path.join(normdir, "__init__.py")).touch()
touch_inits(os.path.dirname(normdir))
# ensure correct package structure
os.makedirs(module_dir, exist_ok=True)
touch_inits(os.path.dirname(module_dir))
source_file = os.path.join(module_dir, init_file)
if os.path.exists(os.path.join(module_dir, alternate_file)):
# A file of the other type exists, we should clean it up
os.remove(os.path.join(module_dir, alternate_file))
if os.path.exists(source_file):
with open(source_file, "rb") as fh:
thehash = hash_file_streaming(fh)
if thehash == module_source.hash_value:
LOGGER.debug(
"Not deploying code (hv=%s, module=%s) because it is already on disk",
module_source.hash_value,
module_source.name,
)
# Force (re)load, because we have it on disk, but not on the in-memory cache
# We may have not loaded it
return True
# write the new source
source_code = module_source.get_source_code()
with open(source_file, "wb+") as fd:
fd.write(source_code)
return True
else:
LOGGER.debug(
"Not deploying code (hv=%s, module=%s) because of cache hit", module_source.hash_value, module_source.name
)
return False
def deploy_version(self, module_sources: Iterable[ModuleSource]) -> None:
to_reload: list[ModuleSource] = []
sources = set(module_sources)
for module_source in sources:
is_changed = self.install_source(module_source)
if is_changed:
to_reload.append(module_source)
# This whole mechanism can go if we spawn a new venv with the new code when required
if len(to_reload) > 0:
importlib.invalidate_caches()
for module_source in to_reload:
# (re)load the new source
self._load_module(module_source.name, module_source.hash_value)
class PluginModuleLoader(FileLoader):
"""
A custom module loader which imports the V1 modules in the inmanta_plugins namespace package.
V2 modules are loaded using the standard Python loader.
"""
def __init__(self, fullname: str, path_to_module: str) -> None:
"""
:param fullname: A fully qualified import path to the module or package to be imported
:param path_to_module: Path to the file on disk that belongs to the import `fullname`. This should be an empty
string when the top-level package inmanta_plugins is imported.
"""
super().__init__(fullname, path_to_module)
self.path: str
def exec_module(self, module: types.ModuleType) -> None:
return super().exec_module(module)
def get_source(self, fullname: str) -> bytes:
# No __init__.py exists for top level package
if self._loading_top_level_package():
return b""
with open(self.path, "rb") as fd:
return fd.read()
def is_package(self, fullname: str) -> bool:
if self._loading_top_level_package():
return True
return os.path.basename(self.path) == "__init__.py"
def _loading_top_level_package(self) -> bool:
return self.path == ""
class ByteCodePluginModuleLoader(SourcelessFileLoader):
def is_package(self, fullname: str) -> bool:
if self._loading_top_level_package():
return True
return os.path.basename(self.path) == "__init__.pyc"
def _loading_top_level_package(self) -> bool:
return self.path == ""
def convert_relative_path_to_module(path: str) -> str:
"""
Returns the fully qualified module name given a path, relative to the module directory.
For example
convert_relative_path_to_module("my_mod/plugins/my_submod")
== convert_relative_path_to_module("my_mod/plugins/my_submod.py")
== convert_relative_path_to_module("my_mod/plugins/my_submod/__init__.py")
== "inmanta_plugins.my_mod.my_submod".
"""
if path.startswith("/"):
raise Exception("Error parsing module path: expected relative path, got %s" % path)
def split(path: str) -> Iterator[str]:
"""
Returns an iterator over path's parts.
"""
if path == "":
return iter(())
init, last = os.path.split(path)
yield from split(init)
if last != "":
yield last
parts: list[str] = list(split(path))
if parts == []:
return const.PLUGINS_PACKAGE
if len(parts) == 1 or parts[1] != PLUGIN_DIR:
raise Exception(f"Error parsing module path: expected 'some_module/{PLUGIN_DIR}/some_submodule', got {path}")
def strip_py(module: list[str]) -> list[str]:
"""
Strip __init__.py or .py file extension from module parts.
"""
if module == []:
return []
init, last = module[:-1], module[-1]
if last == "__init__.py" or last == "__init__.pyc":
return init
if last.endswith(".py"):
return list(chain(init, [last[:-3]]))
if last.endswith(".pyc"):
return list(chain(init, [last[:-4]]))
return module
top_level_inmanta_module: str = parts[0]
inmanta_submodule: list[str] = parts[2:]
# my_mod/plugins/tail -> inmanta_plugins.my_mod.tail
return ".".join(chain([const.PLUGINS_PACKAGE, top_level_inmanta_module], strip_py(inmanta_submodule)))
def convert_module_to_relative_path(full_mod_name: str) -> str:
"""
Returns path to the module, relative to the module directory. Does not differentiate between modules and packages.
For example convert_module_to_relative_path("inmanta_plugins.my_mod.my_submod") == "my_mod/plugins/my_submod".
An empty string is returned when `full_mod_name` equals `inmanta_plugins`.
"""
full_module_parts = full_mod_name.split(".")
if full_module_parts[0] != const.PLUGINS_PACKAGE:
raise Exception(
"PluginModuleLoader is a loader for the inmanta_plugins package."
" Module %s is not part of the inmanta_plugins package." % full_mod_name,
)
module_parts = full_module_parts[1:]
# No __init__.py exists for top level package
if len(module_parts) == 0:
return ""
module_parts.insert(1, PLUGIN_DIR)
if module_parts[-1] == "__init__":
module_parts = module_parts[:-1]
return os.path.join(*module_parts)
[docs]
@stable_api
class PluginModuleFinder(MetaPathFinder):
"""
Custom module finder which handles V1 Inmanta modules. V2 modules are handled using the standard Python finder. This
finder is stored as the last entry in `meta_path`, as such that the default Python Finders detect V2 modules first.
"""
MODULE_FINDER: "PluginModuleFinder" = None
def __init__(self, modulepaths: list[str]) -> None:
"""
:param modulepaths: The module paths for the inmanta project.
"""
self._modulepaths = list(modulepaths)
@classmethod
def get_module_finder(cls) -> "PluginModuleFinder":
if cls.MODULE_FINDER is not None:
return cls.MODULE_FINDER
raise Exception("No PluginModuleFinder configured. Call configure_module_finder() first.")
[docs]
@classmethod
def reset(cls) -> None:
"""
Remove the PluginModuleFinder from sys.meta_path.
"""
if cls.MODULE_FINDER is not None and cls.MODULE_FINDER in sys.meta_path:
sys.meta_path.remove(cls.MODULE_FINDER)
cls.MODULE_FINDER = None
@classmethod
def configure_module_finder(cls, modulepaths: list[str], *, prefer: bool = False) -> None:
"""
Setup a custom module loader to handle imports in .py files of the modules. This finder will be stored
as the last finder in sys.meta_path, unless prefer is True. If the custom module loader has already been
set up, does nothing (i.e. it is not moved to the front or the back of sys.meta_path).
:param modulepaths: The directories where the module finder should look for modules.
:param prefer: Prefer this module finder over others, putting it first in sys.meta_path.
"""
if cls.MODULE_FINDER is not None:
# PluginModuleFinder already present in sys.meta_path
cls.MODULE_FINDER._modulepaths = list(modulepaths)
return
# PluginModuleFinder not yet present in sys.meta_path.
module_finder = PluginModuleFinder(modulepaths)
if prefer:
sys.meta_path.insert(0, module_finder)
else:
sys.meta_path.append(module_finder)
cls.MODULE_FINDER = module_finder
def find_spec(
self, fullname: str, path: Optional[abc.Sequence[str]], target: Optional[types.ModuleType] = None
) -> Optional[ModuleSpec]:
"""
:param fullname: A fully qualified import path to the module or package to be imported.
"""
if self._should_handle_import(fullname):
LOGGER.debug("Loading module: %s", fullname)
path_to_module = self._get_path_to_module(fullname)
if path_to_module is not None:
if path_to_module[-4:] == ".pyc":
return importlib.util.spec_from_loader(fullname, ByteCodePluginModuleLoader(fullname, path_to_module))
return importlib.util.spec_from_loader(fullname, PluginModuleLoader(fullname, path_to_module))
else:
# The given module is not present in self.modulepath.
return None
return None
def _should_handle_import(self, fq_import_path: str) -> bool:
if fq_import_path == const.PLUGINS_PACKAGE:
return False
return fq_import_path.startswith(f"{const.PLUGINS_PACKAGE}.")
def _get_path_to_module(self, fullname: str) -> Optional[str]:
"""
Return the path to the file in the module path that belongs to the module given by `fullname`.
None is returned when the given module is not present in the module path.
:param fullname: A fully-qualified import path to a module.
"""
def find_module(module_path: str, extension: str = "py") -> Optional[str]:
path_to_module = os.path.join(module_path, relative_path)
if os.path.exists(f"{path_to_module}.{extension}"):
return f"{path_to_module}.{extension}"
if os.path.isdir(path_to_module):
path_to_module = os.path.join(path_to_module, f"__init__.{extension}")
if os.path.exists(path_to_module):
return path_to_module
return None
relative_path: str = convert_module_to_relative_path(fullname)
# special case: top-level package
if relative_path == "":
return ""
for module_path in self._modulepaths:
path_to_module = find_module(module_path, extension="pyc")
if path_to_module is not None:
return path_to_module
# try the byte code only version
path_to_module = find_module(module_path, extension="py")
if path_to_module is not None:
return path_to_module
return None
[docs]
@stable_api
def unload_inmanta_plugins(inmanta_module: Optional[str] = None) -> None:
"""
Unloads Python modules associated with inmanta modules (`inmanta_plugins` submodules).
:param inmanta_module: Unload the Python modules for a specific inmanta module. If omitted, unloads the Python modules for
all inmanta modules.
"""
top_level_pkg: str = f"{const.PLUGINS_PACKAGE}.{inmanta_module}" if inmanta_module is not None else const.PLUGINS_PACKAGE
# module created by setuptools for custom Finder
prefix_editable_installed_pkg = "__editable___inmanta_module_"
if inmanta_module is not None:
prefix_editable_installed_pkg = f"{prefix_editable_installed_pkg}{inmanta_module.replace('-', '_')}"
def should_unload(key_in_sys_modules_dct: str) -> bool:
if key_in_sys_modules_dct == top_level_pkg or key_in_sys_modules_dct.startswith(f"{top_level_pkg}."):
return True
if key_in_sys_modules_dct.startswith(prefix_editable_installed_pkg):
return True
return False
loaded_modules: abc.KeysView[str] = sys.modules.keys()
modules_to_unload: Sequence[str] = [fq_name for fq_name in loaded_modules if should_unload(fq_name)]
for k in modules_to_unload:
del sys.modules[k]
if modules_to_unload:
importlib.invalidate_caches()
def unload_modules_for_path(path: str) -> None:
"""
Unload any modules that are loaded from a given path (site-packages dir).
"""
def module_in_prefix(module: types.ModuleType, prefix: str) -> bool:
file: Optional[str] = getattr(module, "__file__", None)
return file.startswith(prefix) if file is not None else False
loaded_modules: list[str] = [mod_name for mod_name, mod in sys.modules.items() if module_in_prefix(mod, path)]
for mod_name in loaded_modules:
del sys.modules[mod_name]
importlib.invalidate_caches()