"""
Copyright 2021 Inmanta
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Contact: code@inmanta.com
"""
import configparser
import importlib
import itertools
import logging
import operator
import os
import re
import subprocess
import sys
import tempfile
import textwrap
import traceback
import types
import typing
import warnings
from abc import ABC, abstractmethod
from collections import abc, defaultdict
from collections.abc import Iterable, Iterator, Mapping, Sequence
from configparser import ConfigParser
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from importlib.abc import Loader
from importlib.metadata import PackageNotFoundError
from io import BytesIO, TextIOBase
from itertools import chain
from subprocess import CalledProcessError
from tarfile import TarFile
from time import time
from typing import Annotated, ClassVar, Dict, Generic, List, NewType, Optional, TextIO, TypeVar, Union, cast
import more_itertools
import pydantic
import yaml
from pydantic import BaseModel, Field, NameEmail, StringConstraints, ValidationError, field_validator
import inmanta.data.model
import packaging.requirements
import packaging.utils
import packaging.version
from inmanta import RUNNING_TESTS, const, env, loader, plugins
from inmanta.ast import CompilerException, LocatableString, Location, Namespace, Range, WrappingRuntimeException
from inmanta.ast.blocks import BasicBlock
from inmanta.ast.statements import BiStatement, DefinitionStatement, DynamicStatement, Statement
from inmanta.ast.statements.define import DefineImport
from inmanta.env import assert_pip_has_source
from inmanta.file_parser import PreservativeYamlParser, RequirementsTxtParser
from inmanta.parser import plyInmantaParser
from inmanta.parser.plyInmantaParser import cache_manager
from inmanta.stable_api import stable_api
from inmanta.util import get_compiler_version
from inmanta.warnings import InmantaWarning
from packaging.specifiers import SpecifierSet
from packaging.version import Version
from ruamel.yaml.comments import CommentedMap
try:
from typing import TYPE_CHECKING
except ImportError:
TYPE_CHECKING = False
LOGGER = logging.getLogger(__name__)
Path = NewType("Path", str)
ModuleName = NewType("ModuleName", str)
T = TypeVar("T")
TModule = TypeVar("TModule", bound="Module")
TProject = TypeVar("TProject", bound="Project")
TInmantaModuleRequirement = TypeVar("TInmantaModuleRequirement", bound="InmantaModuleRequirement")
@stable_api
class InmantaModuleRequirement:
"""
Represents a requirement on an inmanta module. This is a wrapper around Requirement. This class is provided for the
following reasons:
1. Work around some particulars of Requirement's semantics with respect to naming conventions.
2. Improve readability and clarity of purpose where a requirement on either a Python package or an inmanta module is
used by distinguishing the two on a type level.
"""
def __init__(self, requirement: inmanta.util.CanonicalRequirement) -> None:
if requirement.name.startswith(ModuleV2.PKG_NAME_PREFIX):
raise ValueError(
f"InmantaModuleRequirement instances work with inmanta module names, not python package names. "
f"Problematic case: {str(requirement)}"
)
self._requirement = requirement
@property
def project_name(self) -> str:
warnings.warn(InmantaWarning("The `project_name` property has been deprecated in favor of `name`"))
return self.name
@property
def name(self) -> str:
return self._requirement.name.replace("-", "_")
@property
def key(self) -> str:
# Requirement converts all "_" to "-". Inmanta modules use "_"
warnings.warn(InmantaWarning("The `key` property has been deprecated in favor of `name`"))
return self.name
@property
def specifier(self) -> SpecifierSet:
return self._requirement.specifier
def __eq__(self, other: object) -> bool:
if not isinstance(other, InmantaModuleRequirement):
return NotImplemented
return self._requirement == other._requirement
def __contains__(self, version: packaging.version.Version | str) -> bool:
return self._requirement.specifier.contains(version, prereleases=True)
def __str__(self) -> str:
return str(self._requirement).replace("-", "_")
def __hash__(self) -> int:
return self._requirement.__hash__()
@classmethod
def parse(cls: type[TInmantaModuleRequirement], spec: str) -> TInmantaModuleRequirement:
if spec.startswith(ModuleV2.PKG_NAME_PREFIX):
raise ValueError(
"Invalid Inmanta module requirement: Use the Inmanta module name instead of the Python package name"
)
if "-" in spec:
raise ValueError("Invalid Inmanta module requirement: Inmanta module names use '_', not '-'.")
return cls(inmanta.util.parse_requirement(requirement=spec))
def get_python_package_requirement(self) -> inmanta.util.CanonicalRequirement:
"""
Return a Requirement with the name of the Python distribution package for this module requirement.
"""
module_name = self.name
pkg_name = ModuleV2Source.get_package_name_for(module_name)
pkg_req_str = str(self).replace(module_name, pkg_name, 1) # Replace max 1 occurrence
return inmanta.util.parse_requirement(requirement=pkg_req_str)
class CompilerExceptionWithExtendedTrace(CompilerException):
"""
A compiler exception that adds additional information about the cause of this exception
to the formatted trace.
"""
def format_trace(self, indent: str = "", indent_level: int = 0) -> str:
"""Make a representation of this exception and its causes"""
# can have a cause of any type
out = indent * indent_level + self.format()
if self.__cause__ is not None:
part = traceback.format_exception_only(self.__cause__.__class__, self.__cause__)
out += "\n" + indent * indent_level + "caused by:\n"
for line in part:
out += indent * (indent_level + 1) + line
return out
[docs]
@stable_api
class InvalidModuleException(CompilerExceptionWithExtendedTrace):
"""
This exception is raised if a module is invalid.
"""
class ModuleNotFoundException(CompilerExceptionWithExtendedTrace):
"""
This exception is raised if a module is not found in any of the repositories.
"""
class ModuleLoadingException(WrappingRuntimeException):
"""
Wrapper around an InvalidModuleException or a ModuleNotFoundException that contains extra information
about the specific DefinedImport statement that cannot not be processed correctly.
"""
def __init__(
self,
name: str,
stmt: "DefineImport",
cause: Union[InvalidModuleException, ModuleNotFoundException],
msg: Optional[str] = None,
) -> None:
"""
:param name: The name of the module that could not be loaded.
:param stmt: The DefinedImport statement that triggered the failure
:param cause: The InvalidModuleException or ModuleNotFoundException that was raised
:param msg: A description of the error.
"""
if msg is None:
msg = "Failed to load module %s" % name
WrappingRuntimeException.__init__(self, stmt, msg, cause)
self.name = name
def importantance(self) -> int:
return 5
class ModuleMetadataFileNotFound(InvalidModuleException):
pass
class ModuleV2InV1PathException(InvalidModuleException):
def __init__(self, project: Optional["Project"], module: "ModuleV2", msg: str) -> None:
super().__init__(msg)
self.project: Optional[Project] = project
self.module: ModuleV2 = module
class ModuleDeprecationWarning(InmantaWarning):
pass
class ProjectConfigurationWarning(InmantaWarning):
pass
[docs]
@stable_api
class ProjectNotFoundException(CompilerException):
"""
This exception is raised when inmanta is unable to find a valid project
"""
class PluginModuleLoadException(Exception):
"""
Exception representing an error during plugin module loading.
"""
def __init__(self, cause: Exception, module: str, fq_import: str, path: str, lineno: Optional[int]) -> None:
"""
:param cause: The exception raised by the import.
:param module: The name of the Inmanta module.
:param fq_import: The fully qualified import to the Python module for which the loading failed.
:param path: The path to the file on disk that belongs to `fq_import`.
:param lineno: Optionally, the line number in `path` that causes the loading issue.
"""
self.cause: Exception = cause
self.module: str = module
self.fq_import: str = fq_import
self.path: str = path
self.lineno: Optional[int] = lineno
lineno_suffix = f":{self.lineno}" if self.lineno is not None else ""
super().__init__(
"%s while loading plugin module %s at %s: %s"
% (
self.get_cause_type_name(),
self.module,
f"{self.path}{lineno_suffix}",
self.cause,
)
)
def get_cause_type_name(self) -> str:
module: Optional[str] = type(self.cause).__module__
name: str = type(self.cause).__qualname__
return name if module is None or module == "builtins" else f"{module}.{name}"
def to_compiler_exception(self) -> CompilerException:
module: Optional[str] = type(self.cause).__module__
name: str = type(self.cause).__qualname__
cause_type_name = name if module is None or module == "builtins" else f"{module}.{name}"
exception = CompilerException(
f"Unable to load all plug-ins for module {self.module}:"
f"\n\t{cause_type_name} while loading plugin module {self.fq_import}: {self.cause}"
)
exception.set_location(Location(self.path, self.lineno if self.lineno is not None else 0))
return exception
class UntrackedFilesMode(Enum):
"""
The different options that can be passed to the --untracked-files option of the `git status` command.
"""
ALL = "all"
NORMAL = "normal"
NO = "no"
class GitProvider:
def clone(self, src: str, dest: str) -> None:
pass
def fetch(self, repo: str) -> None:
pass
@abstractmethod
def status(self, repo: str, untracked_files_mode: Optional[UntrackedFilesMode] = None) -> str:
pass
@abstractmethod
def get_all_tags(self, repo: str) -> list[str]:
pass
@abstractmethod
def get_version_tags(self, repo: str, only_return_stable_versions: bool = False) -> list[packaging.version.Version]:
pass
@abstractmethod
def get_file_for_version(self, repo: str, tag: str, file: str) -> str:
pass
@abstractmethod
def checkout_tag(self, repo: str, tag: str) -> None:
pass
@abstractmethod
def commit(
self,
repo: str,
message: str,
commit_all: bool,
add: Optional[abc.Sequence[str]] = None,
raise_exc_when_nothing_to_commit: bool = True,
) -> None:
pass
def tag(self, repo: str, tag: str) -> None:
pass
@abstractmethod
def push(self, repo: str) -> str:
pass
@abstractmethod
def pull(self, repo: str) -> str:
pass
@abstractmethod
def get_remote(self, repo: str) -> Optional[str]:
pass
@abstractmethod
def is_git_repository(self, repo: str) -> bool:
pass
def git_init(self, repo: str) -> None:
pass
def add(self, repo: str, files: list[str]) -> None:
pass
class CLIGitProvider(GitProvider):
def clone(self, src: str, dest: str) -> None:
process_env = os.environ.copy()
process_env["GIT_ASKPASS"] = "true"
cmd = ["git", "clone", src, dest]
return_code, _ = env.CommandRunner(LOGGER).run_command_and_stream_output(cmd, env_vars=process_env)
if return_code != 0:
raise Exception(f"An unexpected error occurred while cloning into {dest} from {src}.")
def fetch(self, repo: str) -> None:
env = os.environ.copy()
env["GIT_ASKPASS"] = "true"
subprocess.check_call(
["git", "fetch", "--tags"], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env
)
def status(self, repo: str, untracked_files_mode: Optional[UntrackedFilesMode] = None) -> str:
"""
Return the output of the `git status --porcelain` command.
:param repo: The path to the directory that contains the git repository on which the git command should be executed.
:param untracked_files_mode: If provided, the --untracked-files option will be passed to `git status` command.
"""
extra_args = []
if untracked_files_mode:
extra_args.append(f"--untracked-files={untracked_files_mode.value}")
return subprocess.check_output(["git", "status", "--porcelain", *extra_args], cwd=repo).decode("utf-8")
def get_all_tags(self, repo: str) -> list[str]:
return subprocess.check_output(["git", "tag"], cwd=repo).decode("utf-8").splitlines()
def get_version_tags(self, repo: str, only_return_stable_versions: bool = False) -> list[packaging.version.Version]:
"""
Return the Git tags that represent version numbers as version.Version objects. Only PEP440 compliant
versions will be returned.
:param repo: The path to the directory that contains the git repository on which the git command should be executed.
:param only_return_stable_versions: Return only version for stable releases.
"""
result = []
all_tags: list[str] = sorted(self.get_all_tags(repo))
for tag in all_tags:
try:
parsed_version: packaging.version.Version = packaging.version.Version(tag)
except packaging.version.InvalidVersion:
continue
if not only_return_stable_versions or not parsed_version.is_prerelease:
result.append(parsed_version)
return sorted(result)
def checkout_tag(self, repo: str, tag: str) -> None:
subprocess.check_call(["git", "checkout", tag], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def commit(
self,
repo: str,
message: str,
commit_all: bool,
add: Optional[abc.Sequence[str]] = None,
raise_exc_when_nothing_to_commit: bool = True,
) -> None:
"""
Execute the `git commit` command.
:param repo: The path to the directory that contains the git repository on which the git command should be executed.
:param message: The commit message.
:param commit_all: Commit changes to all tracked files using the `-a` option.
:param add: Paths to files that have to be staged for the commit.
:param raise_exc_when_nothing_to_commit: True iff there are no changes to commit.
"""
if add is None:
add = []
self.add(repo=repo, files=add)
if (
not raise_exc_when_nothing_to_commit
and not self.status(repo=repo, untracked_files_mode=UntrackedFilesMode.NO).strip()
):
# Nothing to commit
return
if not commit_all:
subprocess.check_call(
["git", "commit", "-m", message], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
else:
subprocess.check_call(
["git", "commit", "-a", "-m", message], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
def tag(self, repo: str, tag: str) -> None:
subprocess.check_call(
["git", "tag", "-a", "-m", "auto tag by module tool", tag],
cwd=repo,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def pull(self, repo: str) -> str:
return subprocess.check_output(["git", "pull"], cwd=repo, stderr=subprocess.DEVNULL).decode("utf-8")
def get_remote(self, repo: str) -> Optional[str]:
"""
Returns the remote tracking repo given a local repo or None if the remote is not yet configured
"""
try:
remote = subprocess.check_output(
["git", "config", "--get", "remote.origin.url"], cwd=repo, stderr=subprocess.DEVNULL
).decode("utf-8")
except CalledProcessError:
remote = None
return remote
def push(self, repo: str) -> str:
return subprocess.check_output(
["git", "push", "--follow-tags", "--porcelain"], cwd=repo, stderr=subprocess.DEVNULL
).decode("utf-8")
def get_file_for_version(self, repo: str, tag: str, file: str) -> str:
data = subprocess.check_output(["git", "archive", "--format=tar", tag, file], cwd=repo, stderr=subprocess.DEVNULL)
tf = TarFile(fileobj=BytesIO(data))
tfile = tf.next()
assert tfile is not None
b = tf.extractfile(tfile)
assert b is not None
return b.read().decode("utf-8")
def is_git_repository(self, repo: str) -> bool:
"""
Return True iff the given directory is a git repository.
"""
try:
self.status(repo=repo)
except subprocess.CalledProcessError:
return False
else:
return True
def git_init(self, repo: str) -> None:
"""
Execute `git init` in the given repository.
"""
subprocess.check_call(["git", "init"], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def add(self, repo: str, files: abc.Sequence[str]) -> None:
if files:
subprocess.check_call(["git", "add", *files], cwd=repo, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
gitprovider = CLIGitProvider()
[docs]
class ModuleSource(Generic[TModule]):
[docs]
def get_installed_module(self, project: Optional["Project"], module_name: str) -> Optional[TModule]:
"""
Returns a module object for a module if it is installed.
:param project: The project associated with the module.
:param module_name: The name of the module.
"""
path: Optional[str] = self.path_for(module_name)
return self.from_path(project, module_name, path) if path is not None else None
def get_module(
self, project: "Project", module_spec: list[InmantaModuleRequirement], install: bool = False
) -> Optional[TModule]:
"""
Returns the appropriate module instance for a given module spec.
:param project: The project associated with the module.
:param module_spec: The module specification including any constraints on its version. In this case,
the project is responsible for verifying constraint compatibility.
:param install: Whether to attempt to install the module if it hasn't been installed yet.
"""
module_name: str = self._get_module_name(module_spec)
installed: Optional[TModule] = self.get_installed_module(project, module_name)
def _should_install_module() -> bool:
"""
Return True iff the given module should get installed
"""
if not install:
# No install was requested
return False
if installed is None:
# Package is not installed
return True
if isinstance(installed, ModuleV2):
python_pkg_req = [r.get_python_package_requirement() for r in module_spec]
if not project.virtualenv.are_installed(python_pkg_req):
# Package could define an extra that is not installed yet
return True
# Already installed
return False
if _should_install_module():
return self.install(project, module_spec)
return installed
def _format_constraints(self, module_name: str, module_spec: list[InmantaModuleRequirement]) -> str:
"""
Returns the constraints on a given inmanta module as a string.
:param module_name: The name of the module.
:param module_spec: List of inmanta requirements in which to look for the module.
"""
constraints_on_module: list[str] = [
str(req) for req in module_spec if module_name == req.name and len(req.specifier) > 0
]
if constraints_on_module:
from_constraints = f"(with constraints {' '.join(constraints_on_module)})"
else:
from_constraints = "(with no version constraints)"
return from_constraints
@abstractmethod
def log_pre_install_information(self, module_name: str, module_spec: list[InmantaModuleRequirement]) -> None:
"""
Display information about this module's installation before the actual installation.
:param module_name: The module's name.
"""
raise NotImplementedError("Abstract method")
def _log_version_snapshot(self, header: Optional[str], version_snapshot: dict[str, packaging.version.Version]) -> None:
if version_snapshot:
out = [header] if header is not None else []
out.extend(f"{mod}: {version}" for mod, version in version_snapshot.items())
LOGGER.debug("\n".join(out))
def _log_snapshot_difference(
self,
version_snapshot: dict[str, packaging.version.Version],
previous_snapshot: dict[str, packaging.version.Version],
header: Optional[str],
) -> None:
set_pre_install: set[tuple[str, packaging.version.Version]] = set(previous_snapshot.items())
set_post_install: set[tuple[str, packaging.version.Version]] = set(version_snapshot.items())
updates_and_additions: set[tuple[str, packaging.version.Version]] = set_post_install - set_pre_install
if version_snapshot:
out = [header] if header is not None else []
for inmanta_module_name, package_version in sorted(version_snapshot.items()):
if inmanta_module_name not in previous_snapshot.keys():
# new module that wasn't previously installed
out.append("+ " + inmanta_module_name + ": " + str(package_version))
elif inmanta_module_name in [elmt[0] for elmt in updates_and_additions]:
# module has a different version
out.append("+ " + inmanta_module_name + ": " + str(package_version))
out.append("- " + inmanta_module_name + ": " + str(previous_snapshot[inmanta_module_name]))
LOGGER.debug("\n".join(out))
@abstractmethod
def install(self, project: "Project", module_spec: list[InmantaModuleRequirement]) -> Optional[TModule]:
"""
Attempt to install a module given a module spec. Updates a module that is already installed only if it does not match
the constraints.
:param project: The project associated with the module.
:param module_spec: The module specification including any constraints on its version.
:return: The module object when the module was installed. When the module could not be found, None is returned.
"""
raise NotImplementedError("Abstract method")
@abstractmethod
def path_for(self, name: str) -> Optional[str]:
"""
Returns the path to the module root directory. Should be called prior to configuring the module finder for v1 modules.
"""
raise NotImplementedError("Abstract method")
@classmethod
@abstractmethod
def from_path(cls, project: Optional["Project"], module_name: str, path: str) -> TModule:
"""
Returns a module instance given a path to it.
"""
raise NotImplementedError("Abstract method")
def _get_module_name(self, module_spec: list[InmantaModuleRequirement]) -> str:
module_names: set[str] = {req.name for req in module_spec}
module_name: str = more_itertools.one(
module_names,
too_short=ValueError("module_spec should contain at least one requirement"),
too_long=ValueError("module_spec should contain requirements for exactly one module"),
)
return module_name
[docs]
@stable_api
class ModuleV2Source(ModuleSource["ModuleV2"]):
def __init__(self, urls: Sequence[str] = []) -> None:
"""
:param urls: retained for backward compatibility
"""
self.urls: list[str] = [url if not os.path.exists(url) else os.path.abspath(url) for url in urls]
@classmethod
def get_installed_version(cls, module_name: str) -> Optional[packaging.version.Version]:
"""
Returns the version for a module if it is installed.
"""
if module_name.startswith(ModuleV2.PKG_NAME_PREFIX):
raise ValueError("PythonRepo instances work with inmanta module names, not Python package names.")
try:
return packaging.version.Version(importlib.metadata.version(ModuleV2Source.get_package_name_for(module_name)))
except PackageNotFoundError:
return None
@classmethod
def get_inmanta_module_name(cls, python_package_name: str) -> str:
if not python_package_name.startswith(ModuleV2.PKG_NAME_PREFIX):
raise ValueError(f"Invalid python package name: should start with {ModuleV2.PKG_NAME_PREFIX}")
result: str = python_package_name[len(ModuleV2.PKG_NAME_PREFIX) :].replace("-", "_")
if not result:
raise ValueError("Invalid python package name: empty module name part.")
return result
@classmethod
def get_package_name_for(cls, module_name: str) -> str:
module_name = module_name.replace("_", "-")
return f"{ModuleV2.PKG_NAME_PREFIX}{module_name}"
@classmethod
def get_namespace_package_name(cls, module_name: str) -> str:
return f"{const.PLUGINS_PACKAGE}.{module_name}"
def install(self, project: "Project", module_spec: list[InmantaModuleRequirement]) -> Optional["ModuleV2"]:
module_name: str = self._get_module_name(module_spec)
assert_pip_has_source(project.metadata.pip, f"a v2 module {module_name}")
requirements: list[inmanta.util.CanonicalRequirement] = [req.get_python_package_requirement() for req in module_spec]
preinstalled: Optional[ModuleV2] = self.get_installed_module(project, module_name)
# Get known requires and add them to prevent invalidating constraints through updates
# These could be constraints (-c) as well, but that requires additional sanitation
# Because for pip not every valid -r is a valid -c
current_requires = project.get_strict_python_requirements_as_list()
requirements += inmanta.util.parse_requirements(current_requires)
if preinstalled is not None:
# log warning if preinstalled version does not match constraints
preinstalled_version: str = str(preinstalled.version)
if not all(preinstalled_version in constraint for constraint in module_spec):
LOGGER.warning(
"Currently installed %s-%s does not match constraint %s: updating to compatible version.",
module_name,
preinstalled_version,
",".join(str(constraint.specifier) for constraint in module_spec if len(constraint.specifier) > 0),
)
try:
self.log_pre_install_information(module_name, module_spec)
modules_pre_install = self.take_v2_modules_snapshot(header="Modules versions before installation:")
env.process_env.install_for_config(
requirements,
project.metadata.pip,
)
self.log_post_install_information(module_name)
self.log_snapshot_difference_v2_modules(modules_pre_install, header="Modules versions after installation:")
except env.PackageNotFound:
return None
path: Optional[str] = self.path_for(module_name)
if path is None:
python_package: str = ModuleV2Source.get_package_name_for(module_name)
namespace_package: str = self.get_namespace_package_name(module_name)
raise InvalidModuleException(f"{python_package} does not contain a {namespace_package} module.")
return self.from_path(project, module_name, path)
def log_pre_install_information(self, module_name: str, module_spec: list[InmantaModuleRequirement]) -> None:
LOGGER.debug("Installing module %s (v2) %s.", module_name, super()._format_constraints(module_name, module_spec))
def take_v2_modules_snapshot(self, header: Optional[str] = None) -> dict[str, packaging.version.Version]:
"""
Log and return a dictionary containing currently installed v2 modules and their versions.
:param header: Optional text to be displayed before logging the modules and their versions
"""
packages = env.PythonWorkingSet.get_packages_in_working_set(inmanta_modules_only=True)
version_snapshot = {self.get_inmanta_module_name(mod): version for mod, version in packages.items()}
super()._log_version_snapshot(header, version_snapshot)
return version_snapshot
def log_snapshot_difference_v2_modules(
self, previous_snapshot: dict[str, packaging.version.Version], header: Optional[str] = None
) -> None:
"""
Logs a diff view of v2 inmanta modules currently installed (in alphabetical order) and their version.
:param previous_snapshot: Mapping of inmanta module names to their respective versions. This is the baseline against
which the currently installed versions will be compared.
:param header: Optional text to be displayed before logging the diff view
"""
packages = env.PythonWorkingSet.get_packages_in_working_set(inmanta_modules_only=True)
version_snapshot = {self.get_inmanta_module_name(mod): version for mod, version in packages.items()}
super()._log_snapshot_difference(version_snapshot, previous_snapshot, header)
def log_post_install_information(self, module_name: str) -> None:
"""
Display information about this module's installation after the actual installation.
:param module_name: The module's name.
"""
installed_version: Optional[packaging.version.Version] = self.get_installed_version(module_name)
LOGGER.debug("Successfully installed module %s (v2) version %s", module_name, installed_version)
def path_for(self, name: str) -> Optional[str]:
"""
Returns the path to the module root directory. Should be called prior to configuring the module finder for v1 modules.
"""
if name.startswith(ModuleV2.PKG_NAME_PREFIX):
raise ValueError("PythonRepo instances work with inmanta module names, not Python package names.")
package: str = self.get_namespace_package_name(name)
mod_spec: Optional[tuple[Optional[str], Loader]] = env.ActiveEnv.get_module_file(package)
if mod_spec is None:
return None
init, mod_loader = mod_spec
if isinstance(mod_loader, loader.PluginModuleLoader):
# Module was found in the environment but it is associated with the v1 module loader. Since the v2 loader has
# precedence, we can conclude the module has not been installed in v2 mode. If it were, the module could never
# be associated with the v1 loader.
return None
if init is None:
raise InvalidModuleException(f"Package {package} was installed but no __init__.py file could be found.")
# In case of editable installs the path may contain symlinks to actual location (see
# PythonEnvironment.get_module_file docstring). Since modules contain non-Python files (of which setuptools may not be
# aware, therefore they may not exist in the link structure), we need the real path.
pkg_installation_dir = os.path.dirname(os.path.realpath(init))
if os.path.exists(os.path.join(pkg_installation_dir, ModuleV2.MODULE_FILE)):
# normal install: __init__.py is in module root
return pkg_installation_dir
else:
# editable install: __init__.py is in `inmanta_plugins/<mod_name>`
module_root_dir = os.path.normpath(os.path.join(pkg_installation_dir, os.pardir, os.pardir))
if os.path.exists(os.path.join(module_root_dir, ModuleV2.MODULE_FILE)):
return module_root_dir
raise InvalidModuleException(
f"Invalid module at {pkg_installation_dir}: found module package but it has no {ModuleV2.MODULE_FILE}. "
"This occurs when you install or build modules from source incorrectly. "
"Always use the `inmanta module build` command followed by `pip install ./dist/<dist-package>` to "
"respectively build a module from source and install the distribution package. "
"Make sure to uninstall the broken package first."
)
@classmethod
def from_path(cls, project: Optional["Project"], module_name: str, path: str) -> "ModuleV2":
return ModuleV2(
project,
path,
is_editable_install=os.path.exists(os.path.join(path, const.PLUGINS_PACKAGE)),
installed_version=cls.get_installed_version(module_name),
)
def _get_module_name(self, module_spec: list[InmantaModuleRequirement]) -> str:
module_name: str = super()._get_module_name(module_spec)
if module_name.startswith(ModuleV2.PKG_NAME_PREFIX.replace("-", "_")):
raise ValueError("PythonRepo instances work with inmanta module names, not Python package names.")
return module_name
class ModuleV1Source(ModuleSource["ModuleV1"]):
def __init__(self, local_repo: "ModuleRepo", remote_repo: "ModuleRepo") -> None:
self.local_repo: ModuleRepo = local_repo
self.remote_repo: ModuleRepo = remote_repo
def log_pre_install_information(self, module_name: str, module_spec: list[InmantaModuleRequirement]) -> None:
LOGGER.debug("Installing module %s (v1) %s.", module_name, super()._format_constraints(module_name, module_spec))
def take_modules_snapshot(self, project: "Project", header: Optional[str] = None) -> dict[str, packaging.version.Version]:
"""
Log and return a dictionary containing currently loaded modules and their versions.
:param header: Optional text to be displayed before logging the modules and their versions
"""
version_snapshot = {module_name: module.version for module_name, module in project.modules.items()}
super()._log_version_snapshot(header, version_snapshot)
return version_snapshot
def log_snapshot_difference_v1_modules(
self, project: "Project", previous_snapshot: dict[str, packaging.version.Version], header: Optional[str] = None
) -> None:
"""
Logs a diff view on inmanta modules (both v1 and v2) currently loaded (in alphabetical order) and their version.
:param project: The currently active project.
:param previous_snapshot: Mapping of inmanta module names to their respective versions. This is the baseline against
which the currently installed versions will be compared.
:param header: Optional text to be displayed before logging the diff view.
"""
version_snapshot = {module_name: module.version for module_name, module in project.modules.items()}
super()._log_snapshot_difference(version_snapshot, previous_snapshot, header)
def log_post_install_information(self, module: TModule) -> None:
"""
Display information about this module's installation after the actual installation.
:param module: The module.
"""
local_repo = module.path
remote_repo = gitprovider.get_remote(local_repo)
remote_repo = f" from {remote_repo.strip()}" if remote_repo is not None else ""
LOGGER.debug(
"Successfully installed module %s (v1) version %s in %s%s.",
module.name,
module.version,
module.path,
remote_repo,
)
def install(self, project: "Project", module_spec: list[InmantaModuleRequirement]) -> Optional["ModuleV1"]:
module_name: str = self._get_module_name(module_spec)
preinstalled: Optional[ModuleV1] = self.get_installed_module(project, module_name)
if preinstalled is not None:
preinstalled_version: str = str(preinstalled.version)
if all(preinstalled_version in constraint for constraint in module_spec):
return preinstalled
else:
LOGGER.warning(
"Currently installed %s-%s does not match constraint %s: updating to compatible version.",
module_name,
preinstalled_version,
",".join(str(constraint.specifier) for constraint in module_spec if len(constraint.specifier) > 0),
)
self.log_pre_install_information(module_name, module_spec)
modules_pre_install = self.take_modules_snapshot(project, header="Modules versions before installation:")
module = ModuleV1.update(
project, module_name, module_spec, preinstalled.path, fetch=False, install_mode=project.install_mode
)
self.log_snapshot_difference_v1_modules(
project, modules_pre_install, header="Modules versions after installation:"
)
self.log_post_install_information(module)
return module
else:
if project.downloadpath is None:
raise CompilerException(
f"Can not install module {module_name} because 'downloadpath' is not set in {project.PROJECT_FILE}"
)
download_path: str = os.path.join(project.downloadpath, module_name)
result = self.remote_repo.clone(module_name, project.downloadpath)
if not result:
return None
self.log_pre_install_information(module_name, module_spec)
modules_pre_install = self.take_modules_snapshot(project, header="Modules versions before installation:")
module = ModuleV1.update(
project, module_name, module_spec, download_path, fetch=False, install_mode=project.install_mode
)
self.log_snapshot_difference_v1_modules(project, modules_pre_install, header="Modules versions after installation:")
self.log_post_install_information(module)
return module
def path_for(self, name: str) -> Optional[str]:
return self.local_repo.path_for(name)
@classmethod
def from_path(cls, project: Optional["Project"], module_name: str, path: str) -> "ModuleV1":
return ModuleV1(project, path)
class ModuleRepo:
def clone(self, name: str, dest: str) -> bool:
raise NotImplementedError("Abstract method")
def path_for(self, name: str) -> Optional[str]:
# same class is used for search path and remote repos, perhaps not optimal
raise NotImplementedError("Abstract method")
def is_empty(self) -> bool:
"""
Return true if this repo will never produce any repo.
Used to distinguish an empty compose repo from a non-empty one
"""
return False
class CompositeModuleRepo(ModuleRepo):
def __init__(self, children: list[ModuleRepo]) -> None:
self.children = children
def clone(self, name: str, dest: str) -> bool:
for child in self.children:
if child.clone(name, dest):
return True
return False
def path_for(self, name: str) -> Optional[str]:
for child in self.children:
result = child.path_for(name)
if result is not None:
return result
return None
def is_empty(self) -> bool:
return reduce(operator.and_, (child.is_empty() for child in self.children), True)
class LocalFileRepo(ModuleRepo):
def __init__(self, root: str, parent_root: Optional[str] = None) -> None:
if parent_root is None:
self.root = os.path.abspath(root)
else:
self.root = os.path.join(parent_root, root)
def clone(self, name: str, dest: str) -> bool:
try:
gitprovider.clone(os.path.join(self.root, name), os.path.join(dest, name))
return True
except Exception:
LOGGER.debug("could not clone repo", exc_info=True)
return False
def path_for(self, name: str) -> Optional[str]:
path = os.path.join(self.root, name)
if os.path.exists(path):
return path
return None
def is_empty(self) -> bool:
# May have or receive items
return False
class RemoteRepo(ModuleRepo):
def __init__(self, baseurl: str) -> None:
self.baseurl = baseurl
def clone(self, name: str, dest: str) -> bool:
url, nbr_substitutions = re.subn(r"{}", name, self.baseurl)
if nbr_substitutions > 1:
raise InvalidMetadata(msg=f"Wrong repo path at {self.baseurl} : should only contain at most one {{}} pair")
elif nbr_substitutions == 0:
url = self.baseurl + name
try:
gitprovider.clone(url, os.path.join(dest, name))
return True
except Exception:
LOGGER.debug("could not clone repo", exc_info=True)
return False
def path_for(self, name: str) -> Optional[str]:
raise NotImplementedError("Should only be called on local repos")
def is_empty(self) -> bool:
# May have or receive items
return False
def make_repo(path: str, root: Optional[str] = None) -> Union[LocalFileRepo, RemoteRepo]:
"""
Returns the appropriate `ModuleRepo` instance (v1) for the given path.
"""
# check that the second char is not a colon (windows)
if ":" in path and path[1] != ":":
return RemoteRepo(path)
else:
return LocalFileRepo(path, parent_root=root)
def merge_specs(mainspec: "Dict[str, List[InmantaModuleRequirement]]", new: "List[InmantaModuleRequirement]") -> None:
"""Merge two maps str->[TMetadata] by concatting their lists."""
for req in new:
key = req.name
if key not in mainspec:
mainspec[key] = [req]
else:
mainspec[key] = mainspec[key] + [req]
[docs]
@stable_api
class InstallMode(str, Enum):
"""
The module install mode determines what version of a module should be selected when a module is downloaded.
"""
release = "release"
"""
Only use a released version that is compatible with the current compiler and any version constraints defined in the
``requires`` lists for the project or any other modules (see :class:`ProjectMetadata`, :class:`ModuleV1Metadata` and
:class:`ModuleV2Metadata`).
A module is considered released in the following situations:
* For V1 modules: There is a tag on a commit. This tag is a valid, pep440 compliant version identifier and it's not a
prelease version.
* For V2 modules: The python package was published on a Python package repository, the version identifier is pep440
compliant and is not a prerelease version.
"""
prerelease = "prerelease"
"""
Similar to :attr:`InstallMode.release` but prerelease versions are allowed as well.
"""
master = "master"
"""
For V1 modules: Use the module's master branch.
For V2 modules: Equivalent to :attr:`InstallMode.prerelease`
"""
INSTALL_OPTS: list[str] = [mode.value for mode in InstallMode] # Part of the stable API
"""
List of possible module install modes, kept for backwards compatibility. New code should use :class:`InstallMode` instead.
"""
@stable_api
class FreezeOperator(str, Enum):
eq = "=="
compatible = "~="
ge = ">="
@classmethod
def get_regex_for_validation(cls) -> str:
all_values = [re.escape(o.value) for o in cls]
return f"^({'|'.join(all_values)})$"
class RawParser(ABC):
@classmethod
@abstractmethod
def parse(cls, source: Union[str, TextIO]) -> Mapping[str, object]:
raise NotImplementedError()
class YamlParser(RawParser):
@classmethod
def parse(cls, source: Union[str, TextIO]) -> Mapping[str, object]:
try:
return yaml.safe_load(source)
except yaml.YAMLError as e:
if isinstance(source, TextIOBase):
raise InvalidMetadata(msg=f"Invalid yaml syntax in {source.name}:\n{str(e)}") from e
else:
raise InvalidMetadata(msg=str(e)) from e
class CfgParser(RawParser):
@classmethod
def parse(cls, source: Union[str, TextIO]) -> Mapping[str, object]:
try:
config: configparser.ConfigParser = configparser.ConfigParser()
config.read_string(source if isinstance(source, str) else source.read())
if config.has_option("options", "install_requires"):
install_requires = [r for r in config.get("options", "install_requires").split("\n") if r]
else:
install_requires = []
version_tag: str = config.get("egg_info", "tag_build", fallback="")
return {**config["metadata"], "install_requires": install_requires, "version_tag": version_tag}
except configparser.Error as e:
if isinstance(source, TextIOBase):
raise InvalidMetadata(msg=f"Invalid syntax in {source.name}:\n{str(e)}") from e
else:
raise InvalidMetadata(msg=str(e)) from e
except KeyError as e:
if isinstance(source, TextIOBase):
raise InvalidMetadata(msg=f"Metadata file {source.name} doesn't have a metadata section.") from e
else:
raise InvalidMetadata(msg="Metadata file doesn't have a metadata section.") from e
class RequirementsTxtFile:
"""
This class caches the requirements specified in the requirements.txt file in memory.
As such, this class will miss any updated applied after the object is constructed.
"""
def __init__(self, filename: str, create_file_if_not_exists: bool = False) -> None:
self._filename = filename
if not os.path.exists(self._filename):
if create_file_if_not_exists:
with open(self._filename, "w", encoding="utf-8"):
pass
else:
raise FileNotFoundError(f"File {filename} does not exist")
self._requirements = RequirementsTxtParser.parse(filename)
def has_requirement_for(self, pkg_name: str) -> bool:
"""
Returns True iff this requirements.txt file contains the given package name. The given `pkg_name` is matched
case insensitive against the requirements in this RequirementsTxtFile.
"""
canonicalized: str = packaging.utils.canonicalize_name(pkg_name)
return any(r.name == canonicalized for r in self._requirements)
def set_requirement_and_write(self, requirement: inmanta.util.CanonicalRequirement) -> None:
"""
Add the given requirement to the requirements.txt file and update the file on disk, replacing any existing constraints
on this package.
"""
new_content_file = RequirementsTxtParser.get_content_with_dep_removed(
self._filename, remove_dep_on_pkg=requirement.name
)
new_content_file = new_content_file.rstrip()
if new_content_file:
new_content_file = f"{new_content_file}\n{requirement}"
else:
new_content_file = str(requirement)
self._write(new_content_file)
def remove_requirement_and_write(self, pkg_name: str) -> None:
"""
Remove the dependency on the given package and update the file on disk.
"""
if not self.has_requirement_for(pkg_name):
return
new_content_file = RequirementsTxtParser.get_content_with_dep_removed(self._filename, remove_dep_on_pkg=pkg_name)
self._write(new_content_file)
def _write(self, new_content_file: str) -> None:
"""
Write the file to disk.
"""
with open(self._filename, "w", encoding="utf-8") as fd:
fd.write(new_content_file)
self._requirements = RequirementsTxtParser.parse(self._filename)
TMetadata = TypeVar("TMetadata", bound="Metadata")
@stable_api
class Metadata(BaseModel):
model_config: typing.ClassVar[pydantic.ConfigDict] = pydantic.ConfigDict(
# By default pydantic does not coerce to strings. However, our raw metadata may be typed incorrectly,
# e.g. in case of yaml, 1.0.0 is a string but 1.0 is a float
coerce_numbers_to_str=True
)
name: str
description: Optional[str] = None
freeze_recursive: bool = False
freeze_operator: str = Field(default="~=", pattern=FreezeOperator.get_regex_for_validation())
_raw_parser: typing.ClassVar[type[RawParser]]
@classmethod
def parse(cls: type[TMetadata], source: Union[str, TextIO]) -> TMetadata:
raw: Mapping[str, object] = cls._raw_parser.parse(source)
try:
return cls(
**raw,
)
except ValidationError as e:
if isinstance(source, TextIOBase):
raise InvalidMetadata(msg=f"Metadata defined in {source.name} is invalid:", validation_error=e) from e
else:
raise InvalidMetadata(msg=str(e), validation_error=e) from e
class MetadataFieldRequires(BaseModel):
requires: list[str] = []
@classmethod
def to_list(cls, v: object) -> list[object]:
if v is None:
return []
if not isinstance(v, list):
return [v]
return v
@field_validator("requires", mode="before")
@classmethod
def requires_to_list(cls, v: object) -> object:
return cls.to_list(v)
TModuleMetadata = TypeVar("TModuleMetadata", bound="ModuleMetadata")
@stable_api
class ModuleV1Metadata(ModuleMetadata, MetadataFieldRequires):
"""
:param name: The name of the module.
:param description: (Optional) The description of the module
:param version: The version of the inmanta module.
:param license: The license for this module
:param compiler_version: (Optional) The minimal compiler version required to compile this module.
:param requires: (Optional) Model files import other modules. These imports do not determine a version, this
is based on the install_mode setting of the project. Modules and projects can constrain a version in the
requires setting. Similar to the module, version constraints are defined using
`PEP440 syntax <https://www.python.org/dev/peps/pep-0440/#version-specifiers>`_.
:param freeze_recursive: (Optional) This key determined if the freeze command will behave recursively or not. If
freeze_recursive is set to false or not set, the current version of all modules imported directly in any submodule of
this module will be set in module.yml. If it is set to true, all modules imported in any of those modules will also be
set.
:param freeze_operator: (Optional) This key determines the comparison operator used by the freeze command.
Valid values are [==, ~=, >=]. *Default is '~='*
"""
compiler_version: Optional[str] = None
_raw_parser: typing.ClassVar[type[YamlParser]] = YamlParser
@field_validator("compiler_version", mode="after")
@classmethod
def is_pep440_version_v1(cls, v: str) -> str:
return cls.is_pep440_version(v)
@classmethod
def _substitute_version(cls: type[TModuleMetadata], source: str, new_version: str, version_tag: str = "") -> str:
new_version_obj: packaging.version.Version = cls._compose_full_version(new_version, version_tag)
return re.sub(r"([\s]version\s*:\s*['\"\s]?)[^\"'}\s]+(['\"]?)", rf"\g<1>{new_version_obj}\g<2>", source)
def get_full_version(self) -> packaging.version.Version:
return packaging.version.Version(self.version)
def to_v2(self) -> "ModuleV2Metadata":
values = self.dict()
if values["description"] is not None:
values["description"] = values["description"].replace("\n", " ")
del values["compiler_version"]
install_requires = [ModuleV2Source.get_package_name_for(r) for r in values["requires"]]
del values["requires"]
values["name"] = ModuleV2Source.get_package_name_for(values["name"])
values["version"], values["version_tag"] = ModuleV2Metadata.split_version(packaging.version.Version(values["version"]))
return ModuleV2Metadata(**values, install_requires=install_requires)
@stable_api
class ModuleV2Metadata(ModuleMetadata):
"""
:param name: The name of the python package that is generated when packaging this module.
This name should follow the format "inmanta-module-<module-name>"
:param description: (Optional) The description of the module
:param version: The version of the inmanta module. Should not contain build tag.
:param version_tag: The build tag for this module version, i.e. "dev0"
:param license: The license for this module
:param freeze_recursive: (Optional) This key determined if the freeze command will behave recursively or not. If
freeze_recursive is set to false or not set, the current version of all modules imported directly in any submodule of
this module will be set in setup.cfg. If it is set to true, all modules imported in any of those modules will also be
set.
:param freeze_operator: (Optional) This key determines the comparison operator used by the freeze command.
Valid values are [==, ~=, >=]. *Default is '~='*
:param install_requires: The Python packages this module depends on.
:param four_digit_version: Whether to use a four-digit version format (e.g. 23.4.1.0) instead of the standard
three-digit format (e.g. 23.4.1).
Even without setting this option to True, a fourth digit is allowed as a revision digit. However, the tooling
will not use it by default. For example, you can use the '--revision' flag with the command
'inmanta module release --dev --revision' even if 'four_digit_version' is set to False. But if you don't specify
'--revision' explicitly, the tooling will consider 'patch' to be the lowest increment.
"""
install_requires: list[str]
version_tag: str = ""
four_digit_version: bool = False
_raw_parser: typing.ClassVar[type[CfgParser]] = CfgParser
@field_validator("version")
@classmethod
def is_base_version(cls, v: str) -> str:
version_obj: packaging.version.Version = packaging.version.Version(v)
if str(version_obj) != version_obj.base_version:
raise ValueError(
"setup.cfg version should be a base version without tag. Use egg_info.tag_build to configure a tag"
)
return v
@classmethod
def split_version(cls, v: packaging.version.Version) -> tuple[str, str]:
"""
Splits a full version in a base version and a tag.
"""
def get_version_tag(v: packaging.version.Version) -> str:
if v.is_devrelease:
return f"dev{v.dev}"
if v.is_prerelease:
# e.g. rc
assert v.pre is not None
return f"{v.pre[0]}{v.pre[1]}"
if v.is_postrelease:
return f"post{v.post}"
return ""
return v.base_version, get_version_tag(v)
@field_validator("version_tag")
@classmethod
def is_valid_version_tag(cls, v: str) -> str:
try:
cls._compose_full_version("1.0.0", v)
except packaging.version.InvalidVersion as e:
raise ValueError(f"Version tag {v} is not PEP440 compliant") from e
return v
@field_validator("name")
@classmethod
def validate_name_field(cls, v: str) -> str:
"""
The name field of a V2 module should follow the format "inmanta-module-<module-name>"
"""
if not v.startswith(ModuleV2.PKG_NAME_PREFIX) or not len(v) > len(ModuleV2.PKG_NAME_PREFIX):
raise ValueError(f'The name field should follow the format "{ModuleV2.PKG_NAME_PREFIX}<module-name>"')
if "_" in v:
raise ValueError("Module names should not contain underscores, use '-' instead.")
return v
def get_full_version(self) -> packaging.version.Version:
return self._compose_full_version(self.version, self.version_tag)
@classmethod
def _substitute_version(cls: type[TModuleMetadata], source: str, new_version: str, version_tag: str = "") -> str:
result = re.sub(
r"(\[metadata\][^\[]*?\bversion[ \t\f\v]*=[ \t\f\v]*)[\S]+(\n|$)", rf"\g<1>{new_version}\n", source, flags=re.DOTALL
)
if "[egg_info]" not in result:
result = f"{result}\n[egg_info]\ntag_build = {version_tag}"
elif "tag_build" not in result:
result = result.replace("[egg_info]", f"[egg_info]\ntag_build = {version_tag}")
else:
result = re.sub(
r"(\[egg_info\][^\[]*[ \t\f\v]*tag_build[ \t\f\v]*=)[ \t\f\v]*[\S]*(\n|$)",
rf"\g<1> {version_tag}\n",
result,
)
return result
def to_config(self, inp: Optional[configparser.ConfigParser] = None) -> configparser.ConfigParser:
if inp:
out = inp
else:
out = configparser.ConfigParser()
if not out.has_section("metadata"):
out.add_section("metadata")
for k, v in self.dict(exclude_none=True, exclude={"install_requires", "version_tag"}).items():
out.set("metadata", k, str(v))
if self.version_tag:
if not out.has_section("egg_info"):
out.add_section("egg_info")
out.set("egg_info", "tag_build", self.version_tag)
return out
@stable_api
class ModuleRepoType(Enum):
git = "git"
package = "package"
@stable_api
class ModuleRepoInfo(BaseModel):
url: str
type: ModuleRepoType = ModuleRepoType.git
@dataclass(frozen=True)
class RelationPrecedenceRule:
"""
Represents a rule defined in the relation precedence policy of the project.yml file.
Indicates that list `first_type.first_relation_name` should be frozen
before `then_type.then_relation_name`.
"""
first_type: str
first_relation_name: str
then_type: str
then_relation_name: str
@classmethod
def from_string(cls, rule: str) -> "RelationPrecedenceRule":
"""
Create a RelationPrecedencePolicy object from its string representation.
"""
match: Optional[re.Match[str]] = ProjectMetadata._re_relation_precedence_rule_compiled.fullmatch(rule.strip())
if not match:
raise Exception(
f"Invalid rule in relation precedence policy: {rule}. "
f"Expected syntax: '<entity-type>.<relation-name> before <entity-type>.<relation-name>'"
)
group_dict = match.groupdict()
return cls(
first_type=group_dict["ft"],
first_relation_name=group_dict["fr"],
then_type=group_dict["tt"],
then_relation_name=group_dict["tr"],
)
def __str__(self) -> str:
return f"{self.first_type}.{self.first_relation_name} before {self.then_type}.{self.then_relation_name}"
[docs]
@stable_api
class ProjectPipConfig(inmanta.data.model.PipConfig):
"""
:param index_url: one pip index url for this project.
:param extra_index_url: additional pip index urls for this project. This is generally only
recommended if all configured indexes are under full control of the end user to protect against dependency
confusion attacks. See the `pip install documentation <https://pip.pypa.io/en/stable/cli/pip_install/>`_ and
`PEP 708 (draft) <https://peps.python.org/pep-0708/>`_ for more information.
:param pre: allow pre-releases when installing Python packages, i.e. pip --pre.
If null, behaves like false unless pip.use-system-config=true, in which case system config is respected.
:param use_system_config: defaults to false.
When true, sets the pip's index url, extra index urls and pre according to the respective settings outlined above
but otherwise respect any pip environment variables and/or config in the pip config file,
including any extra-index-urls.
If no indexes are configured in pip.index-url/pip.extra-index-url, this option falls back to pip's default behavior,
meaning it uses the pip index url from the environment, the config file, or PyPi, in that order.
For development, it is recommended to set this option to false, both for portability
(and related compatibility with tools like pytest-inmanta-lsm) and for security
(dependency confusion attacks could affect users that aren't aware that inmanta installs Python packages).
See the :ref:`section<specify_location_pip>` about setting up pip index for more information.
"""
@pydantic.model_validator(mode="before")
@classmethod
def __alert_extra_field__(cls, values: dict[str, object]) -> dict[str, object]:
extra_fields = values.keys() - cls.model_fields.keys() - {v.alias for v in cls.model_fields.values()}
for extra_field in extra_fields:
# This is cfr adr 0000
warnings.warn(
ProjectConfigurationWarning(f"Found unexpected configuration value 'pip.{extra_field}' in 'project.yml'")
)
return values
[docs]
@stable_api
class ModuleLike(ABC, Generic[TMetadata]):
"""
Commons superclass for projects and modules, which are both versioned by git
:ivar name: The name for this module like instance, in the context of the Inmanta DSL.
"""
def __init__(self, path: str) -> None:
"""
:param path: root git directory
"""
self._path = path
self._metadata = self._get_metadata_from_disk()
self.name = self.get_name_from_metadata(self._metadata)
[docs]
@classmethod
@abstractmethod
# Union[Project, ModuleV1, ModuleV2] would be more strict than ModuleLike[Any] but very restrictive with potential stable
# API extension in mind.
def from_path(cls, path: str) -> Optional["ModuleLike"]:
"""
Get a concrete module like instance from a path. Returns None when no project or module is present at the given path.
"""
subs: tuple[type[ModuleLike], ...] = (Project, Module)
for sub in subs:
instance: Optional[ModuleLike] = sub.from_path(path)
if instance is not None:
return instance
return None
@classmethod
def get_first_directory_containing_file(cls, cur_dir: str, filename: str) -> str:
"""
Travel up in the directory structure until a file with the given name is found.
"""
fq_path_to_filename = os.path.join(cur_dir, filename)
if os.path.exists(fq_path_to_filename):
return cur_dir
parent_dir = os.path.abspath(os.path.join(cur_dir, os.pardir))
if parent_dir == cur_dir:
raise FileNotFoundError(f"No file with name {filename} exists in any of the parent directories")
return cls.get_first_directory_containing_file(parent_dir, filename)
def _get_metadata_from_disk(self) -> TMetadata:
metadata_file_path = self.get_metadata_file_path()
if not os.path.exists(metadata_file_path):
raise ModuleMetadataFileNotFound(f"Metadata file {metadata_file_path} does not exist")
with open(metadata_file_path, encoding="utf-8") as fd:
return self.get_metadata_from_source(source=fd)
def get_metadata_from_source(self, source: Union[str, TextIO]) -> TMetadata:
"""
:param source: Either the yaml content as a string or an input stream from the yaml file
"""
metadata_type: type[TMetadata] = self.get_metadata_file_schema_type()
return metadata_type.parse(source)
@property
def path(self) -> str:
return self._path
@property
def metadata(self) -> TMetadata:
return self._metadata
@property
def freeze_recursive(self) -> bool:
return self._metadata.freeze_recursive
@property
def freeze_operator(self) -> str:
return self._metadata.freeze_operator
@abstractmethod
def get_metadata_file_path(self) -> str:
raise NotImplementedError()
@classmethod
@abstractmethod
def get_metadata_file_schema_type(cls) -> type[TMetadata]:
raise NotImplementedError()
@classmethod
@abstractmethod
def get_name_from_metadata(cls, metadata: TMetadata) -> str:
raise NotImplementedError()
@abstractmethod
def add_module_requirement_persistent(self, requirement: InmantaModuleRequirement, add_as_v1_module: bool) -> None:
"""
Add a new module requirement to the files that define requirements on other modules. This could include the
requirements.txt file next to the metadata file of the project or module. This method updates the files on disk.
This operation may make this object invalid or outdated if the persisted metadata has been updated since creating this
instance.
"""
raise NotImplementedError()
@abstractmethod
def get_module_requirements(self) -> list[str]:
"""
Returns all requirements this module has on other modules, regardless of module generation. Requirements should be on
inmanta module names, not Python package names.
"""
raise NotImplementedError()
def has_module_requirement(self, module_name: str) -> bool:
"""
:param module_name: The module name in lower cases.
:returns: True iff the module defines a dependency on the given module in one of the files that
declare dependencies module dependencies. This could include the requirements.txt file
next to the metadata file of the project or module.
"""
return any(module_name == InmantaModuleRequirement.parse(req).name for req in self.get_module_requirements())
def _load_file(self, ns: Namespace, file: str) -> tuple[list[Statement], BasicBlock]:
ns.location = Location(file, 1)
statements = [] # type: List[Statement]
stmts = plyInmantaParser.parse(ns, file)
block = BasicBlock(ns)
for s in stmts:
if isinstance(s, BiStatement):
statements.append(s)
block.add(s)
elif isinstance(s, DefinitionStatement):
statements.append(s)
block.add_definition(s)
elif isinstance(s, str) or isinstance(s, LocatableString):
pass
else:
assert isinstance(s, DynamicStatement)
block.add(s)
return (statements, block)
def _get_requirements_txt_as_list(self) -> list[str]:
"""
Returns the contents of the requirements.txt file as a list of requirements, if it exists.
"""
file = os.path.join(self._path, "requirements.txt")
if os.path.exists(file):
return RequirementsTxtParser.parse_requirements_as_strs(file)
else:
return []
@abstractmethod
def get_all_python_requirements_as_list(self) -> list[str]:
"""
Returns all Python requirements specified by this module like, including requirements on V2 modules.
"""
raise NotImplementedError()
def get_strict_python_requirements_as_list(self) -> list[str]:
"""
Returns the strict python requirements specified by this module like, meaning all Python requirements excluding those on
inmanta modules.
"""
return [req for req in self.get_all_python_requirements_as_list() if not req.startswith(ModuleV2.PKG_NAME_PREFIX)]
def get_module_v2_requirements(self) -> list[InmantaModuleRequirement]:
"""
Returns all requirements this module like has on v2 modules.
"""
return [
InmantaModuleRequirement.parse(ModuleV2Source.get_inmanta_module_name(req))
for req in self.get_all_python_requirements_as_list()
if req.startswith(ModuleV2.PKG_NAME_PREFIX)
]
class ModuleLikeWithYmlMetadataFile(ABC):
@abstractmethod
def get_metadata_file_path(self) -> str:
raise NotImplementedError()
def add_module_requirement_to_requires_and_write(self, requirement: InmantaModuleRequirement) -> None:
"""
Updates the metadata file of the given project or V1 module by adding the given requirement the `requires` section.
:param requirement: The requirement to add.
"""
# Parse cfg file
content: CommentedMap = PreservativeYamlParser.parse(self.get_metadata_file_path())
# Update requires
if "requires" in content and content["requires"]:
existing_matching_reqs: list[str] = [
r for r in content["requires"] if InmantaModuleRequirement.parse(r).name == requirement.name
]
for r in existing_matching_reqs:
content["requires"].remove(r)
content["requires"].append(str(requirement))
else:
content["requires"] = [str(requirement)]
# Write file back to disk
PreservativeYamlParser.dump(self.get_metadata_file_path(), content)
def has_module_requirement_in_requires(self, module_name: str) -> bool:
"""
Returns true iff the given module is present in the `requires` list of the given project or module metadata file.
"""
content: CommentedMap = PreservativeYamlParser.parse(self.get_metadata_file_path())
if "requires" not in content:
return False
return any(r for r in content["requires"] if InmantaModuleRequirement.parse(r).name == module_name)
def remove_module_requirement_from_requires_and_write(self, module_name: str) -> None:
"""
Updates the metadata file of the given project or module by removing the given requirement frm the `requires` section.
:param module_name: The Inmanta module name is lower case.
"""
if not self.has_module_requirement_in_requires(module_name):
return
content: CommentedMap = PreservativeYamlParser.parse(self.get_metadata_file_path())
content["requires"] = [r for r in content["requires"] if InmantaModuleRequirement.parse(r).name != module_name]
PreservativeYamlParser.dump(self.get_metadata_file_path(), content)
[docs]
@stable_api
class Project(ModuleLike[ProjectMetadata], ModuleLikeWithYmlMetadataFile):
"""
An inmanta project
:ivar modules: The collection of loaded modules for this project.
:ivar module_source: The v2 module source for this project.
"""
PROJECT_FILE = "project.yml"
_project = None
def __init__(
self,
path: str,
autostd: bool = True,
main_file: str = "main.cf",
venv_path: Optional[Union[str, "env.VirtualEnv"]] = None,
attach_cf_cache: bool = True,
strict_deps_check: Optional[bool] = None,
) -> None:
"""
Initialize the project, this includes
* Loading the project.yaml (into self._metadata)
* Setting paths from project.yaml
* Loading all modules in the module path (into self.modules)
It does not include
* verify if project.yml corresponds to the modules in self.modules
Instances of this class can be created by in two different ways:
1) Via the Project.get() method
2) Via the constructor: Always call the Project.set() method after the constructor call.
Project instances should only be created via the constructor in test cases.
:param path: The directory where the project is located
:param venv_path: Path to the directory that will contain the Python virtualenv.
This can be an existing or a non-existing directory.
:param strict_deps_check: Overrides the strict_deps_check configuration option from the project.yml file if the
provided value is different from None.
"""
if not os.path.exists(path):
raise ProjectNotFoundException(f"Directory {path} doesn't exist")
super().__init__(path)
self.project_path = path
self.main_file = main_file
self._ast_cache: Optional[tuple[list[Statement], BasicBlock]] = None # Cache for expensive method calls
self._metadata.modulepath = [os.path.abspath(os.path.join(path, x)) for x in self._metadata.modulepath]
self.module_source: ModuleV2Source = ModuleV2Source()
self.module_source_v1: ModuleV1Source = ModuleV1Source(
local_repo=CompositeModuleRepo([make_repo(x) for x in self.modulepath]),
remote_repo=CompositeModuleRepo(
[make_repo(repo.url, root=path) for repo in self._metadata.repo if repo.type == ModuleRepoType.git]
),
)
if not self.module_source_v1.remote_repo.is_empty():
# This is only relevant if we have a V1 module source
if self._metadata.downloadpath is not None:
self._metadata.downloadpath = os.path.abspath(os.path.join(path, self._metadata.downloadpath))
if self._metadata.downloadpath not in self._metadata.modulepath:
LOGGER.warning("Downloadpath is not in module path! Module install will not work as expected")
if not os.path.exists(self._metadata.downloadpath):
os.mkdir(self._metadata.downloadpath)
self.virtualenv: env.ActiveEnv
if venv_path is None:
self.virtualenv = env.process_env
else:
if isinstance(venv_path, env.VirtualEnv):
self.virtualenv = venv_path
else:
venv_path = os.path.abspath(venv_path)
self.virtualenv = env.VirtualEnv(venv_path)
self.loaded = False
self.modules: dict[str, Module] = {}
self.root_ns = Namespace("__root__")
self.autostd = autostd
if attach_cf_cache:
cache_manager.attach_to_project(path)
if strict_deps_check is not None:
self.strict_deps_check = strict_deps_check
else:
self.strict_deps_check = self._metadata.strict_deps_check
self._complete_ast: Optional[tuple[list[Statement], list[BasicBlock]]] = None
# Cache for the complete ast
def get_relation_precedence_policy(self) -> list[RelationPrecedenceRule]:
return self._metadata.get_relation_precedence_rules()
@classmethod
def from_path(cls: type[TProject], path: str) -> Optional[TProject]:
return cls(path=path) if os.path.exists(os.path.join(path, cls.PROJECT_FILE)) else None
def install_module(self, module_req: InmantaModuleRequirement, install_as_v1_module: bool) -> None:
"""
Install the given module. If attempting to as v2, this method implicitly trusts any Python package with the
corresponding name.
Does not reinstall if the given module requirement is already met.
"""
installed_module: Optional[Module]
if install_as_v1_module:
installed_module = self.module_source_v1.install(self, module_spec=[module_req])
else:
installed_module = self.module_source.install(self, module_spec=[module_req])
if not installed_module:
raise ModuleNotFoundException(
f"Failed to install module {module_req} as {'V1' if install_as_v1_module else 'V2'} module"
)
self.modules[installed_module.name] = installed_module
@classmethod
def get_name_from_metadata(cls, metadata: ProjectMetadata) -> str:
return metadata.name
@property
def install_mode(self) -> InstallMode:
return self._metadata.install_mode
@property
def modulepath(self) -> list[str]:
return self._metadata.modulepath
@property
def downloadpath(self) -> Optional[str]:
return self._metadata.downloadpath
def get_metadata_file_path(self) -> str:
return os.path.join(self._path, Project.PROJECT_FILE)
@classmethod
def get_metadata_file_schema_type(cls) -> type[ProjectMetadata]:
return ProjectMetadata
@classmethod
def get_project_dir(cls, cur_dir: str) -> str:
"""
Find the project directory where we are working in. Traverse up until we find Project.PROJECT_FILE or reach /
"""
try:
return cls.get_first_directory_containing_file(cur_dir, Project.PROJECT_FILE)
except FileNotFoundError:
raise ProjectNotFoundException("Unable to find an inmanta project (project.yml expected)")
[docs]
@classmethod
def get(cls, main_file: str = "main.cf", strict_deps_check: Optional[bool] = None) -> "Project":
"""
Get the instance of the project
"""
if cls._project is None:
cls._project = Project(cls.get_project_dir(os.curdir), main_file=main_file, strict_deps_check=strict_deps_check)
return cls._project
[docs]
@classmethod
def set(cls, project: "Project", *, clean: bool = True) -> None:
"""
Set the instance of the project.
:param clean: Clean up all side effects of any previously loaded projects. Clears the registered plugins and loaded
Python plugins packages.
"""
cls._project = project
os.chdir(project._path)
if clean:
plugins.PluginMeta.clear()
loader.unload_inmanta_plugins()
loader.PluginModuleFinder.reset()
[docs]
def install_modules(self, *, bypass_module_cache: bool = False, update_dependencies: bool = False) -> None:
"""
Installs all modules, both v1 and v2.
:param bypass_module_cache: Fetch the module data from disk even if a cache entry exists.
:param update_dependencies: Update all Python dependencies (recursive) to their latest versions.
"""
if not self.is_using_virtual_env():
self.use_virtual_env()
self.load_module_recursive(install=True, bypass_module_cache=bypass_module_cache)
# Verify non-python part
self.verify_modules_cache()
self.verify_module_version_compatibility()
# do python install
pyreq: list[inmanta.util.CanonicalRequirement] = inmanta.util.parse_requirements(self.collect_python_requirements())
if len(pyreq) > 0:
# upgrade both direct and transitive module dependencies: eager upgrade strategy
self.virtualenv.install_for_config(
pyreq,
config=self.metadata.pip,
upgrade=update_dependencies,
upgrade_strategy=env.PipUpgradeStrategy.EAGER,
)
self.verify()
[docs]
def load(self, install: bool = False) -> None:
"""
Load this project's AST and plugins.
:param install: Whether to install the project's modules before attempting to load it.
"""
if not self.loaded:
if not self.is_using_virtual_env():
self.use_virtual_env()
if install:
self.install_modules()
self.get_complete_ast()
self.loaded = True
start = time()
self.verify()
self.load_plugins()
end = time()
LOGGER.debug("Plugin loading took %0.03f seconds", end - start)
def invalidate_state(self, module: Optional[str] = None) -> None:
"""
Invalidate this project's state, forcing a reload next time load is called.
:param module: Invalidate the state for a single module. If omitted, invalidates the state for all modules.
"""
if module is not None:
if module in self.modules:
del self.modules[module]
else:
self.modules = {}
self.loaded = False
self._ast_cache = None
self._complete_ast = None
def get_ast(self) -> tuple[list[Statement], BasicBlock]:
if self._ast_cache is None:
self._ast_cache = self.__load_ast()
return self._ast_cache
def get_imports(self) -> list[DefineImport]:
(statements, _) = self.get_ast()
imports = [x for x in statements if isinstance(x, DefineImport)]
if self.autostd:
std_locatable = LocatableString("std", Range("__internal__", 1, 1, 1, 1), -1, self.root_ns)
imp = DefineImport(std_locatable, std_locatable)
imp.location = std_locatable.location
imports.insert(0, imp)
return imports
def get_complete_ast(self) -> tuple[list[Statement], list[BasicBlock]]:
if self._complete_ast is not None:
return self._complete_ast
start = time()
# load ast
(statements, block) = self.get_ast()
blocks = [block]
statements = [x for x in statements]
for _, nstmt, nb in self.load_module_recursive():
statements.extend(nstmt)
blocks.append(nb)
end = time()
LOGGER.debug("Parsing took %0.03f seconds", end - start)
cache_manager.log_stats()
self._complete_ast = (statements, blocks)
return self._complete_ast
def __load_ast(self) -> tuple[list[Statement], BasicBlock]:
main_ns = Namespace("__config__", self.root_ns)
return self._load_file(main_ns, os.path.join(self.project_path, self.main_file))
def get_modules(self) -> dict[str, "Module"]:
self.load()
return self.modules
def get_module(
self,
full_module_name: str,
*,
allow_v1: bool = False,
install_v1: bool = False,
install_v2: bool = False,
bypass_module_cache: bool = False,
) -> "Module":
"""
Get a module instance for a given module name. Caches modules by top level name for later access. The install parameters
allow to install the module if it has not been installed yet. If both install parameters are False, the module is
expected to be preinstalled.
:param full_module_name: The full name of the module. If this is a submodule, the corresponding top level module is
used.
:param allow_v1: Allow this module to be loaded as v1.
:param install_v1: Allow installing this module as v1 if it has not yet been installed. This option is ignored if
allow_v1=False.
:param install_v2: Allow installing this module as v2 if it has not yet been installed, implicitly trusting any Python
package with the corresponding name.
:param bypass_module_cache: Fetch the module data from disk even if a cache entry exists.
"""
parts = full_module_name.split("::")
module_name = parts[0]
def use_module_cache() -> bool:
if bypass_module_cache:
return False
if module_name not in self.modules:
return False
if not allow_v1 and not isinstance(self.modules[module_name], ModuleV2):
# Reload module because it was loaded as a V1 module, while it should be loaded as a V2 module
return False
return True
if use_module_cache():
return self.modules[module_name]
return self.load_module(module_name, allow_v1=allow_v1, install_v1=install_v1, install_v2=install_v2)
def load_module_recursive(
self, install: bool = False, bypass_module_cache: bool = False
) -> list[tuple[str, list[Statement], BasicBlock]]:
"""
Loads this project's modules and submodules by recursively following import statements starting from the project's main
file.
For each imported submodule, return a triple of name, statements, basicblock
:param install: Run in install mode, installing any modules that have not yet been installed. If install is False,
all modules are expected to be preinstalled. For security reasons installation of v2 modules is based on explicit
Python requirements rather than on imports.
:param bypass_module_cache: Fetch the module data from disk even if a cache entry exists.
"""
ast_by_top_level_mod: dict[str, list[tuple[str, list[Statement], BasicBlock]]] = defaultdict(list)
# List of imports that still have to be loaded.
# get imports: don't use a set because this collection is used to drive control flow and we want to keep control flow as
# deterministic as possible
imports: list[DefineImport] = [x for x in self.get_imports()]
# All imports of the entire project
all_imports: set[DefineImport] = set(imports)
v2_modules: set[str] = set()
"""
Set of modules that should be loaded as a V2 module.
"""
set_up: set[str] = set()
"""
Set of top level modules that have been set up (setup_module()).
"""
done: dict[str, dict[str, DefineImport]] = defaultdict(dict)
"""
Submodules, grouped by top level that have been fully loaded: AST has been loaded into ast_by_top_level_mod and its
imports have been added to the queue (load_sub_module()).
"""
def require_v2(module_name: str) -> None:
"""
Ensure that the module with the given name gets loaded as a V2 module in a next iteration.
"""
if module_name in v2_modules:
# already v2
return
v2_modules.add(module_name)
if module_name in set_up:
set_up.remove(module_name)
if module_name in done:
# some submodules already loaded as v1 => reload
add_imports_to_be_loaded(done[module_name].values())
del done[module_name]
if module_name in ast_by_top_level_mod:
del ast_by_top_level_mod[module_name]
def load_module_v2_requirements(module_like: ModuleLike) -> None:
"""
Loads all v2 modules explicitly required by the supplied module like instance, installing them if install=True. If
any of these requirements have already been loaded as v1, queues them for reload.
"""
for requirement in module_like.get_module_v2_requirements():
# load module
self.get_module(
requirement.name,
allow_v1=False,
install_v2=install,
bypass_module_cache=bypass_module_cache,
)
# queue AST reload
require_v2(requirement.name)
def setup_module(module: Module) -> None:
"""
Sets up a top level module, making sure all its v2 requirements are loaded correctly. V2 modules do not support
import-based installation because of security reasons (it would mean we implicitly trust any `inmanta-module-x`
package for the module we're trying to load). As a result we need to make sure all required v2 modules are present
in a set up stage.
"""
if module.name in set_up:
# already set up
return
if isinstance(module, ModuleV2):
# register it as a v2 module so that any subsequent require_v2 calls
require_v2(module.name)
load_module_v2_requirements(module)
set_up.add(module.name)
def load_sub_module(module: Module, imp: DefineImport) -> None:
"""
Loads a submodule's AST and processes its imports. Enforces dependency generation directionality (v1 can depend on
v2 but not the other way around). If any modules have already been loaded with an incompatible generation, queues
them for reload.
Does not install any v2 modules.
"""
parts: list[str] = imp.name.split("::")
for i in range(1, len(parts) + 1):
subs = "::".join(parts[0:i])
if subs in done[module.name]:
continue
(nstmt, nb) = module.get_ast(subs)
done[module.name][subs] = imp
ast_by_top_level_mod[module.name].append((subs, nstmt, nb))
# get imports and add to list
subs_imports: list[DefineImport] = module.get_imports(subs)
add_imports_to_be_loaded(subs_imports)
if isinstance(module, ModuleV2):
# A V2 module can only depend on V2 modules. Ensure that all dependencies
# of this module will be loaded as a V2 module.
for dep_module_name in (subs_imp.name.split("::")[0] for subs_imp in subs_imports):
require_v2(dep_module_name)
def add_imports_to_be_loaded(new_imports: Iterable[DefineImport]) -> None:
imports.extend(new_imports)
all_imports.update(new_imports)
# load this project's v2 requirements
load_module_v2_requirements(self)
# Loop over imports. For each import:
# 1. Load the top level module. For v1, install if install=True, for v2 import-based installation is disabled for
# security reasons. v2 modules installation is done in step 2.
# 2. Set up top level module if it has not been set up yet, loading v2 requirements and installing them if install=True.
# 3. Load AST for imported submodule and its parent modules, queueing any transitive imports.
while len(imports) > 0:
imp: DefineImport = imports.pop()
ns: str = imp.name
module_name: str = ns.split("::")[0]
if ns in done[module_name]:
continue
try:
# get module
module: Module = self.get_module(
module_name,
allow_v1=module_name not in v2_modules,
install_v1=install,
install_v2=False,
bypass_module_cache=bypass_module_cache,
)
setup_module(module)
load_sub_module(module, imp)
except (InvalidModuleException, ModuleNotFoundException) as e:
raise ModuleLoadingException(ns, imp, e)
# Remove modules from self.modules that were not part of an import statement.
# This happens when a module or a project defines a V2 module requirement in
# its dependencies, but the requirement is never imported anywhere.
loaded_modules: set[str] = set(self.modules.keys())
imported_modules: set[str] = {i.name.split("::")[0] for i in all_imports}
for module_to_unload in loaded_modules - imported_modules:
self.invalidate_state(module_to_unload)
return list(chain.from_iterable(ast_by_top_level_mod.values()))
def load_module(
self,
module_name: str,
*,
allow_v1: bool = False,
install_v1: bool = False,
install_v2: bool = False,
) -> "Module":
"""
Get a module instance for a given module name. The install parameters allow to install the module if it has not been
installed yet. If both install parameters are False, the module is expected to be preinstalled.
:param module_name: The name of the module.
:param allow_v1: Allow this module to be loaded as v1.
:param install_v1: Allow installing this module as v1 if it has not yet been installed. This option is ignored if
allow_v1=False.
:param install_v2: Allow installing this module as v2 if it has not yet been installed, implicitly trusting any Python
package with the corresponding name.
"""
if not self.is_using_virtual_env():
self.use_virtual_env()
reqs: Mapping[str, list[InmantaModuleRequirement]] = self.collect_requirements()
module_reqs: list[InmantaModuleRequirement] = (
list(reqs[module_name]) if module_name in reqs else [InmantaModuleRequirement.parse(module_name)]
)
module: Optional[Union[ModuleV1, ModuleV2]]
try:
module = self.module_source.get_module(self, module_reqs, install=install_v2)
if module is not None and self.module_source_v1.path_for(module_name) is not None:
LOGGER.warning("Module %s is installed as a V1 module and a V2 module: V1 will be ignored.", module_name)
if module is None and allow_v1:
module = self.module_source_v1.get_module(self, module_reqs, install=install_v1)
except InvalidModuleException:
raise
except env.ConflictingRequirements:
raise
except env.PackageNotFound:
raise
except Exception as e:
raise InvalidModuleException(f"Could not load module {module_name}") from e
if module is None:
raise ModuleNotFoundException(
f"Could not find module {module_name}. Please make sure to add any module v2 requirements with"
" `inmanta module add --v2` and to install all the project's dependencies with `inmanta project install`."
)
if isinstance(module, ModuleV1):
warnings.warn(
InmantaWarning(
f"Loaded V1 module {module.name}. The use of V1 modules is deprecated."
" Use the equivalent V2 module instead."
)
)
self.modules[module_name] = module
return module
def load_plugins(self) -> None:
"""
Load all plug-ins
"""
if not self.loaded:
LOGGER.warning("loading plugins on project that has not been loaded completely")
# ensure the loader is properly configured
loader.PluginModuleFinder.configure_module_finder(self.modulepath)
for module in self.modules.values():
module.load_plugins()
def verify(self) -> None:
"""
Verifies the integrity of the loaded project, with respect to both inter-module requirements and the Python environment.
"""
LOGGER.info("verifying project")
self.verify_modules_cache()
self.verify_module_version_compatibility()
self.verify_python_requires()
def verify_python_environment(self) -> None:
"""
Verifies the integrity of the loaded project with respect to the Python environment, over which the project has no
direct control.
"""
self.verify_modules_cache()
self.verify_python_requires()
def verify_modules_cache(self) -> None:
if not self._modules_cache_is_valid():
raise CompilerException(
"Not all modules were loaded correctly as a result of transitive dependencies. A recompile should load them"
" correctly."
)
def verify_module_version_compatibility(self) -> None:
"""
Check if all the required modules for this module have been loaded. Assumes the modules cache is valid and up to date.
:raises CompilerException: When one or more of the requirements of the project is not satisfied.
"""
requirements: dict[str, list[InmantaModuleRequirement]] = self.collect_requirements()
exc_message = ""
for name, spec in requirements.items():
if name not in self.modules:
# the module is in the project requirements but it is not part of the loaded AST so there is no need to verify
# its compatibility
LOGGER.warning("Module %s is present in requires but it is not used by the model.", name)
continue
module = self.modules[name]
current_version = Version(version=str(module.version))
for r in spec:
if current_version not in r:
exc_message += f"\n\t* requirement {r} on module {name} not fulfilled, now at version {current_version}."
if exc_message:
exc_message = f"The following requirements were not satisfied:{exc_message}"
if self.metadata.install_mode == InstallMode.master:
exc_message += (
"\nThe release type of the project is set to 'master'. Set it to a value that is "
"appropriate for the version constraint or remove the version constraint. After that, "
"run `inmanta project update` to resolve this issue."
)
else:
exc_message += "\nRun `inmanta project update` to resolve this."
raise CompilerException(exc_message)
def verify_python_requires(self) -> None:
"""
Verifies no incompatibilities exist within the Python environment with respect to installed module v2 requirements.
"""
if self.strict_deps_check:
constraints: list[inmanta.util.CanonicalRequirement] = inmanta.util.parse_requirements(
self.collect_python_requirements()
)
env.process_env.check(strict_scope=re.compile(f"{ModuleV2.PKG_NAME_PREFIX}.*"), constraints=constraints)
else:
if not env.process_env.check_legacy(in_scope=re.compile(f"{ModuleV2.PKG_NAME_PREFIX}.*")):
raise CompilerException(
"Not all installed modules are compatible: requirements conflicts were found. Please resolve any conflicts"
" before attempting another compile. Run `pip check` to check for any incompatibilities."
)
def _modules_cache_is_valid(self) -> bool:
"""
Verify the modules cache after changes have been made to the Python environment. Returns False if any modules
somehow got installed as another generation or with another version as the one that has been loaded into the AST.
When this situation occurs, the compiler state is invalid and the compile needs to either abort or attempt recovery.
The modules cache, from which the AST was loaded, is out of date, therefore at least a partial AST regeneration
would be required to recover.
Scenario's that could trigger this state:
1.
- latest v2 mod a is installed
- some v1 mod depends on v2 mod b, which depends on a<2
- during loading, after a has been loaded, mod b is installed
- Python downgrades transitive dependency a to a<2
2.
- latest v2 mod a is installed
- some v1 (or even v2 when in install mode) mod depends on a<2
- after loading, during plugin requirements install, `pip install a<2` is run
- Python downgrades direct dependency a to a<2
In both cases, a<2 might be a valid version, but since it was installed transitively after the compiler has loaded
module a, steps would need to be taken to take this change into account.
"""
result: bool = True
for name, module in self.modules.items():
installed: Optional[ModuleV2] = self.module_source.get_installed_module(self, name)
if installed is None:
if module.GENERATION == ModuleGeneration.V1:
# Loaded module as V1 and no installed V2 module found: no issues with this module
continue
raise CompilerException(
f"Invalid state: compiler has loaded module {name} as v2 but it is nowhere to be found."
)
else:
if module.GENERATION == ModuleGeneration.V1:
LOGGER.warning(
"Compiler has loaded module %s as v1 but it has later been installed as v2 as a side effect.", name
)
result = False
elif installed.version != module.version:
LOGGER.warning(
"Compiler has loaded module %s==%s but %s==%s has later been installed as a side effect.",
name,
module.version,
name,
installed.version,
)
result = False
return result
def is_using_virtual_env(self) -> bool:
return self.virtualenv.is_using_virtual_env()
def use_virtual_env(self) -> None:
"""
Use the virtual environment. This activates the environment for the current process.
"""
self.virtualenv.use_virtual_env()
def sorted_modules(self) -> list["Module"]:
"""
Return a list of all modules, sorted on their name
"""
names = list(self.modules.keys())
names = sorted(names)
mod_list = []
for name in names:
mod_list.append(self.modules[name])
return mod_list
def log_installed_modules(self) -> None:
"""
Log the name, version and generation (v1 or v2) of all installed modules.
"""
LOGGER.info("The following modules are currently installed:")
sorted_modules: list["Module"] = self.sorted_modules()
def get_modules_with_gen(gen: ModuleGeneration) -> Sequence["Module"]:
return list(filter(lambda mod: mod.GENERATION == gen, sorted_modules))
v1_modules: Sequence["ModuleV1"] = cast(list["ModuleV1"], get_modules_with_gen(ModuleGeneration.V1))
v2_modules: Sequence["ModuleV2"] = cast(list["ModuleV2"], get_modules_with_gen(ModuleGeneration.V2))
if v2_modules:
LOGGER.info("V2 modules:")
for v2_mod in v2_modules:
path = f" ({v2_mod.path})" if v2_mod._is_editable_install else ""
LOGGER.info(f" {v2_mod.name}: {v2_mod.version}{path}")
if v1_modules:
LOGGER.info("V1 modules:")
for v1_mod in v1_modules:
LOGGER.info(f" {v1_mod.name}: {v1_mod.version}")
def add_module_requirement_persistent(self, requirement: InmantaModuleRequirement, add_as_v1_module: bool) -> None:
# Add requirement to metadata file
if add_as_v1_module:
self.add_module_requirement_to_requires_and_write(requirement)
# Refresh in-memory metadata
with open(self.get_metadata_file_path(), encoding="utf-8") as fd:
self._metadata = ProjectMetadata.parse(fd)
# Update requirements.txt file
requirements_txt_file_path = os.path.join(self._path, "requirements.txt")
if not add_as_v1_module:
requirements_txt_file = RequirementsTxtFile(requirements_txt_file_path, create_file_if_not_exists=True)
requirements_txt_file.set_requirement_and_write(requirement.get_python_package_requirement())
elif os.path.exists(requirements_txt_file_path):
requirements_txt_file = RequirementsTxtFile(requirements_txt_file_path)
requirements_txt_file.remove_requirement_and_write(requirement.get_python_package_requirement().name)
def get_module_requirements(self) -> list[str]:
return [*self.metadata.requires, *(str(req) for req in self.get_module_v2_requirements())]
def requires(self) -> "List[InmantaModuleRequirement]":
"""
Get the requires for this project
"""
# filter on import stmt
reqs = []
for spec in self._metadata.requires:
req = [inmanta.util.parse_requirement(requirement=spec)]
if len(req) > 1:
print(f"Module file for {self._path} has bad line in requirements specification {spec}")
reqe = InmantaModuleRequirement(req[0])
reqs.append(reqe)
return [*reqs, *self.get_module_v2_requirements()]
def collect_requirements(self) -> "Dict[str, List[InmantaModuleRequirement]]":
"""
Collect the list of all module requirements of all modules in the project.
"""
specs: dict[str, list[InmantaModuleRequirement]] = {}
merge_specs(specs, self.requires())
for module in self.modules.values():
reqs = module.requires()
merge_specs(specs, reqs)
return specs
def collect_imported_requirements(self) -> "Dict[str, List[InmantaModuleRequirement]]":
imports = {x.name.split("::")[0] for x in self.get_complete_ast()[0] if isinstance(x, DefineImport)}
if self.autostd:
imports.add("std")
specs: dict[str, list[InmantaModuleRequirement]] = self.collect_requirements()
def get_spec(name: str) -> "List[InmantaModuleRequirement]":
if name in specs:
return specs[name]
return [InmantaModuleRequirement.parse(name)]
return {name: get_spec(name) for name in imports}
def collect_python_requirements(self) -> Sequence[str]:
"""
Collect the list of all python requirements of all modules in this project, excluding those on inmanta modules.
"""
reqs = chain(
chain.from_iterable([mod.get_strict_python_requirements_as_list() for mod in self.modules.values()]),
self.get_strict_python_requirements_as_list(),
)
return list(set(reqs))
def get_root_namespace(self) -> Namespace:
return self.root_ns
def get_freeze(self, mode: str = "==", recursive: bool = False) -> dict[str, str]:
# collect in scope modules
if not recursive:
modules = {m.name: m for m in (self.get_module(imp.name, allow_v1=True) for imp in self.get_imports())}
else:
modules = self.get_modules()
out = {}
for name, mod in modules.items():
version = str(mod.version)
out[name] = mode + " " + version
return out
def get_all_python_requirements_as_list(self) -> list[str]:
return self._get_requirements_txt_as_list()
def module_v2_source_configured(self) -> bool:
"""
Returns True iff this project has one or more module v2 sources configured.
"""
return any(True for repo in self._metadata.repo if repo.type == ModuleRepoType.package)
@stable_api
class DummyProject(Project):
"""Placeholder project that does nothing"""
def __init__(self, autostd: bool = True) -> None:
super().__init__(tempfile.gettempdir(), autostd=autostd, attach_cf_cache=False)
def _get_metadata_from_disk(self) -> ProjectMetadata:
return ProjectMetadata(name="DUMMY")
@stable_api
class ModuleGeneration(Enum):
"""
The generation of a module. This might affect the on-disk structure of a module as well as how it's distributed.
"""
V1 = 1
V2 = 2
[docs]
@stable_api
class Module(ModuleLike[TModuleMetadata], ABC):
"""
This class models an inmanta configuration module
"""
MODEL_DIR = "model"
MODULE_FILE: str
GENERATION: ModuleGeneration
def __init__(self, project: Optional[Project], path: str) -> None:
"""
Create a new configuration module
:param project: A reference to the project this module belongs to.
:param path: Where is the module stored
"""
if not os.path.exists(path):
raise InvalidModuleException(f"Directory {path} doesn't exist")
super().__init__(path)
if self.metadata.deprecated:
warnings.warn(ModuleDeprecationWarning(f"Module {self.name} has been deprecated"))
self._project: Optional[Project] = project
self.ensure_versioned()
self.model_dir = os.path.join(self.path, Module.MODEL_DIR)
self._ast_cache: dict[str, tuple[list[Statement], BasicBlock]] = {} # Cache for expensive method calls
self._import_cache: dict[str, list[DefineImport]] = {} # Cache for expensive method calls
self._dir_cache: Dict[str, list[str]] = {} # Cache containing all the filepaths present in a dir
self._plugin_file_cache: Optional[list[tuple[Path, ModuleName]]] = None
[docs]
@classmethod
@abstractmethod
def from_path(cls, path: str) -> Optional["Module"]:
subs: tuple[type[Module], ...] = (ModuleV1, ModuleV2)
for sub in subs:
instance: Optional[Module] = sub.from_path(path)
if instance is not None:
return instance
return None
def requires(self) -> "List[InmantaModuleRequirement]":
"""
Return all requirements this module has to other modules as a list of requirements.
"""
reqs = []
for spec in self.get_module_requirements():
req = [inmanta.util.parse_requirement(requirement=spec)]
if len(req) > 1:
print(f"Module file for {self._path} has bad line in requirements specification {spec}")
reqe = InmantaModuleRequirement(req[0])
reqs.append(reqe)
return reqs
@classmethod
def get_module_dir(cls, module_subdirectory: str) -> str:
"""
Find the top level module from the given subdirectory of a module.
"""
try:
return cls.get_first_directory_containing_file(module_subdirectory, cls.MODULE_FILE)
except FileNotFoundError:
raise InvalidModuleException(f"Directory {module_subdirectory} is not part of a valid {cls.GENERATION.name} module")
def rewrite_version(self, new_version: str, version_tag: str = "") -> None:
new_version = str(new_version) # make sure it is a string!
with open(self.get_metadata_file_path(), encoding="utf-8") as fd:
module_def = fd.read()
new_module_def, new_metadata = self.get_metadata_file_schema_type().rewrite_version(
module_def, new_version, version_tag
)
with open(self.get_metadata_file_path(), "w+", encoding="utf-8") as fd:
fd.write(new_module_def)
self._metadata = new_metadata
def get_version(self) -> packaging.version.Version:
"""
Return the version of this module. This is the actually installed version, which might differ from the version declared
in its metadata (e.g. by a .dev0 tag).
"""
return packaging.version.Version(self._metadata.version)
version = property(get_version)
def ensure_versioned(self) -> None:
"""
Check if this module is versioned using Git. If not a warning is logged.
"""
if not os.path.exists(os.path.join(self.path, ".git")):
LOGGER.warning("Module %s is not version controlled, we recommend you do this as soon as possible.", self.name)
def get_ast(self, name: str) -> tuple[list[Statement], BasicBlock]:
if self._project is None:
raise ValueError("Can only get module's AST in the context of a project.")
# Check local cache
hit = self._ast_cache.get(name, None)
if hit is not None:
return hit
if name == self.name:
file = os.path.join(self.model_dir, "_init.cf")
else:
parts = name.split("::")
parts = parts[1:]
if os.path.isdir(os.path.join(self.model_dir, *parts)):
path_elements = [self.model_dir] + parts + ["_init.cf"]
else:
path_elements = [self.model_dir] + parts[:-1] + [parts[-1] + ".cf"]
file = os.path.join(*path_elements)
ns = self._project.get_root_namespace().get_ns_or_create(name)
try:
out = self._load_file(ns, file)
# Set local cache before returning
self._ast_cache[name] = out
return out
except FileNotFoundError as e:
raise InvalidModuleException("could not locate module with name: %s" % name) from e
def get_freeze(self, submodule: str, recursive: bool = False, mode: str = ">=") -> dict[str, str]:
if self._project is None:
raise ValueError("Can only get module's freeze in the context of a project.")
imports = [statement.name for statement in self.get_imports(submodule)]
out: dict[str, str] = {}
todo: list[str] = imports
for impor in todo:
if impor not in out:
v1_mode: bool = self.GENERATION == ModuleGeneration.V1
mainmod = self._project.get_module(impor, install_v1=v1_mode, allow_v1=v1_mode)
vers: packaging.version.Version = mainmod.version
# track submodules for cycle avoidance
out[impor] = mode + " " + str(vers)
if recursive:
todo.extend([statement.name for statement in mainmod.get_imports(impor)])
# drop submodules
return {x: v for x, v in out.items() if "::" not in x}
def get_imports(self, name: str) -> list[DefineImport]:
# Check local cache
hit = self._import_cache.get(name, None)
if hit is not None:
return hit
if self._project is None:
raise ValueError("Can only get module's imports in the context of a project.")
(statements, block) = self.get_ast(name)
imports = [x for x in statements if isinstance(x, DefineImport)]
if self.name != "std" and self._project.autostd:
std_locatable = LocatableString("std", Range("__internal__", 1, 1, 1, 1), -1, block.namespace)
imp = DefineImport(std_locatable, std_locatable)
imp.location = std_locatable.location
imports.insert(0, imp)
# Set local cache before returning
self._import_cache[name] = imports
return imports
def _get_model_files(self, curdir: str) -> list[str]:
files: list[str] = []
init_cf = os.path.join(curdir, "_init.cf")
if not os.path.exists(init_cf):
return files
for entry in os.listdir(curdir):
entry = os.path.join(curdir, entry)
if os.path.isdir(entry):
files.extend(self._get_model_files(entry))
elif entry[-3:] == ".cf":
files.append(entry)
return files
def get_all_submodules(self) -> list[str]:
"""
Get all submodules of this module
"""
modules = []
files = self._get_model_files(self.model_dir)
for f in files:
name = f[len(self.model_dir) + 1 : -3]
parts = name.split("/")
if parts[-1] == "_init":
parts = parts[:-1]
parts.insert(0, self.name)
name = "::".join(parts)
modules.append(name)
return modules
@abstractmethod
def get_plugin_dir(self) -> Optional[str]:
"""
Return directory containing the python files which define handlers and plugins.
If no such directory is defined, this method returns None.
"""
raise NotImplementedError()
def _list_python_files(self, plugin_dir: str) -> list[str]:
"""
Generate a list of all Python files in the given plugin directory.
This method prioritizes .pyc files over .py files, uses caching to avoid duplicate directory walks,
includes namespace packages and excludes the `model`, `files` and `templates` directories.
"""
# Return cached results if this directory has been processed before
if plugin_dir in self._dir_cache:
return self._dir_cache[plugin_dir]
files: dict[str, str] = {}
model_dir_path: str = os.path.join(plugin_dir, "model")
files_dir_path: str = os.path.join(plugin_dir, "files")
templates_dir_path: str = os.path.join(plugin_dir, "templates")
for dirpath, dirnames, filenames in os.walk(plugin_dir, topdown=True):
# Modify dirnames in-place to stop os.walk from descending into any more subdirectories of the model directory
if (
dirpath.startswith(model_dir_path)
or dirpath.startswith(files_dir_path)
or dirpath.startswith(templates_dir_path)
):
dirnames[:] = []
continue
# Skip this directory if it's already in the cache
if dirpath in self._dir_cache:
cached_files = self._dir_cache[dirpath]
for file in cached_files:
base_file_path = os.path.splitext(file)[0]
files[base_file_path] = file
continue
current_path_files = []
for filename in filenames:
file_path = os.path.join(dirpath, filename)
# Skip files in the default cache directory
if "__pycache__" in file_path:
continue
base_file_path = os.path.splitext(file_path)[0]
# Prioritize .pyc files over .py files
if file_path.endswith(".pyc"):
files[base_file_path] = file_path
current_path_files.append(file_path)
elif file_path.endswith(".py") and base_file_path not in files:
files[base_file_path] = file_path
current_path_files.append(file_path)
# Update the cache with files found in the current directory
self._dir_cache[dirpath] = current_path_files
# Cache the final list of files for the root directory and return it
self._dir_cache[plugin_dir] = list(files.values())
return self._dir_cache[plugin_dir]
[docs]
def get_plugin_files(self) -> Iterator[tuple[Path, ModuleName]]:
"""
Returns a tuple (absolute_path, fq_mod_name) of all python files in this module.
"""
if self._plugin_file_cache is not None:
return iter(self._plugin_file_cache)
plugin_dir: Optional[str] = self.get_plugin_dir()
if plugin_dir is None:
return iter(())
if not os.path.exists(os.path.join(plugin_dir, "__init__.py")) and not os.path.exists(
os.path.join(plugin_dir, "__init__.pyc")
):
raise InvalidModuleException(f"Directory {plugin_dir} should be a valid python package with a __init__.py file")
self._plugin_file_cache = [
(
Path(file_name),
ModuleName(self._get_fq_mod_name_for_py_file(file_name, plugin_dir, self.name)),
)
for file_name in self._list_python_files(plugin_dir)
]
return iter(self._plugin_file_cache)
def load_plugins(self) -> None:
"""
Load all plug-ins from a configuration module
"""
for path_to_file, fq_mod_name in self.get_plugin_files():
LOGGER.log(const.LOG_LEVEL_TRACE, "Loading Python module %s", fq_mod_name)
try:
importlib.import_module(fq_mod_name)
except Exception as e:
tb: Optional[types.TracebackType] = sys.exc_info()[2]
stack: traceback.StackSummary = traceback.extract_tb(tb)
lineno: Optional[int] = more_itertools.first(
(frame.lineno for frame in reversed(stack) if frame.filename == path_to_file), None
)
raise PluginModuleLoadException(e, self.name, fq_mod_name, path_to_file, lineno).to_compiler_exception()
# This method is not part of core's stable API but it is currently used by pytest-inmanta (inmanta/pytest-inmanta#76)
def _get_fq_mod_name_for_py_file(self, py_file: str, plugin_dir: str, mod_name: str) -> str:
"""
Returns the fully qualified Python module name for an inmanta module.
:param py_file: The Python file for the module, relative to the plugin directory.
:param plugin_dir: The plugin directory relative to the inmanta module's root directory.
:param mod_name: The top-level name of this module.
"""
rel_py_file = os.path.relpath(py_file, start=plugin_dir)
return loader.convert_relative_path_to_module(os.path.join(mod_name, loader.PLUGIN_DIR, rel_py_file))
[docs]
def unload(self) -> None:
"""
Unloads this module instance from the project, the registered plugins and the loaded Python modules.
"""
loader.unload_inmanta_plugins(self.name)
plugins.PluginMeta.clear(self.name)
if self._project is not None:
self._project.invalidate_state(self.name)
[docs]
@stable_api
class ModuleV1(Module[ModuleV1Metadata], ModuleLikeWithYmlMetadataFile):
MODULE_FILE = "module.yml"
GENERATION = ModuleGeneration.V1
def __init__(self, project: Optional[Project], path: str):
try:
super().__init__(project, path)
except InvalidMetadata as e:
raise InvalidModuleException(f"The module found at {path} is not a valid V1 module") from e
except ModuleMetadataFileNotFound:
if os.path.exists(os.path.join(path, ModuleV2.MODULE_FILE)):
raise ModuleV2InV1PathException(
project=project,
module=ModuleV2(project, path),
msg=f"Module at {path} looks like a v2 module. Please have a look at the documentation on how to use v2"
" modules.",
)
raise
# Only show the warning when we are not running tests. Especially on jenkins the directory of the module often does not
# have the correct name.
if self.name != os.path.basename(self._path) and not RUNNING_TESTS:
LOGGER.warning(
"The name in the module file (%s) does not match the directory name (%s)",
self.name,
os.path.basename(self._path),
)
[docs]
@classmethod
def from_path(cls: type[TModule], path: str) -> Optional[TModule]:
return cls(project=None, path=path) if os.path.exists(os.path.join(path, cls.MODULE_FILE)) else None
def get_metadata_file_path(self) -> str:
return os.path.join(self.path, self.MODULE_FILE)
@classmethod
def get_name_from_metadata(cls, metadata: ModuleV1Metadata) -> str:
return metadata.name
@property
def compiler_version(self) -> Optional[str]:
"""
Get the minimal compiler version required for this module version. Returns none is the compiler version is not
constrained.
"""
return str(self._metadata.compiler_version)
def get_all_requires(self) -> list[InmantaModuleRequirement]:
"""
:return: all modules required by an import from any sub-modules, with all constraints applied
"""
# get all constraints
spec: dict[str, InmantaModuleRequirement] = {req.name: req for req in self.requires()}
# find all imports
imports = {imp.name.split("::")[0] for subm in sorted(self.get_all_submodules()) for imp in self.get_imports(subm)}
return [spec[r] if spec.get(r) else InmantaModuleRequirement.parse(r) for r in imports]
@classmethod
def update(
cls,
project: Project,
modulename: str,
requirements: Iterable[InmantaModuleRequirement],
path: Optional[str] = None,
fetch: bool = True,
install_mode: InstallMode = InstallMode.release,
) -> "ModuleV1":
"""
Update a module, return module object
"""
if path is None:
mypath = project.module_source_v1.path_for(modulename)
assert mypath is not None, f"trying to update module {modulename} not found on disk "
else:
mypath = path
if fetch:
LOGGER.info("Performing fetch on %s", mypath)
gitprovider.fetch(mypath)
if install_mode == InstallMode.master:
LOGGER.info("Checking out master on %s", mypath)
gitprovider.checkout_tag(mypath, "master")
if fetch:
LOGGER.info("Pulling master on %s", mypath)
gitprovider.pull(mypath)
else:
release_only = install_mode == InstallMode.release
version = cls.get_suitable_version_for(modulename, requirements, mypath, release_only=release_only)
if version is None:
print("no suitable version found for module %s" % modulename)
else:
LOGGER.info("Checking out %s on %s", str(version), mypath)
gitprovider.checkout_tag(mypath, str(version))
return cls(project, mypath)
@classmethod
def get_suitable_version_for(
cls, modulename: str, requirements: Iterable[InmantaModuleRequirement], path: str, release_only: bool = True
) -> Optional[packaging.version.Version]:
versions_str = gitprovider.get_all_tags(path)
def try_parse(x: str) -> Optional[packaging.version.Version]:
try:
return Version(version=x)
except Exception:
return None
versions: list[packaging.version.Version] = [x for x in [try_parse(v) for v in versions_str] if x is not None]
versions = sorted(versions, reverse=True)
for r in requirements:
versions = [x for x in r.specifier.filter(iterable=versions, prereleases=not release_only)]
comp_version_raw = get_compiler_version()
comp_version = Version(version=comp_version_raw)
return cls.__best_for_compiler_version(modulename, versions, path, comp_version)
@classmethod
def __best_for_compiler_version(
cls, modulename: str, versions: list[packaging.version.Version], path: str, comp_version: packaging.version.Version
) -> Optional[packaging.version.Version]:
def get_cv_for(best: packaging.version.Version) -> Optional[packaging.version.Version]:
cfg_text: str = gitprovider.get_file_for_version(path, str(best), cls.MODULE_FILE)
metadata: ModuleV1Metadata = cls.get_metadata_file_schema_type().parse(cfg_text)
if metadata.compiler_version is None:
return None
v = metadata.compiler_version
if isinstance(v, (int, float)):
v = str(v)
return Version(version=v)
if not versions:
return None
best = versions[0]
atleast = get_cv_for(best)
if atleast is None or comp_version >= atleast:
return best
# binary search
hi = len(versions)
lo = 1
while lo < hi:
mid = (lo + hi) // 2
atleast = get_cv_for(versions[mid])
if atleast is not None and atleast > comp_version:
lo = mid + 1
else:
hi = mid
if hi == len(versions):
LOGGER.warning("Could not find version of module %s suitable for this compiler, try a newer compiler" % modulename)
return None
return versions[lo]
@classmethod
def get_metadata_file_schema_type(cls) -> type[ModuleV1Metadata]:
return ModuleV1Metadata
def get_plugin_dir(self) -> Optional[str]:
plugins_dir = os.path.join(self._path, loader.PLUGIN_DIR)
if not os.path.exists(plugins_dir):
return None
return plugins_dir
def get_all_python_requirements_as_list(self) -> list[str]:
return self._get_requirements_txt_as_list()
def get_module_requirements(self) -> list[str]:
return [*self.metadata.requires, *(str(req) for req in self.get_module_v2_requirements())]
def add_module_requirement_persistent(self, requirement: InmantaModuleRequirement, add_as_v1_module: bool) -> None:
requirements_txt_file_path = os.path.join(self._path, "requirements.txt")
if add_as_v1_module:
# Add requirement to module.yml file
self.add_module_requirement_to_requires_and_write(requirement)
# Refresh in-memory metadata
with open(self.get_metadata_file_path(), encoding="utf-8") as fd:
self._metadata = ModuleV1Metadata.parse(fd)
# Remove requirement from requirements.txt file
if os.path.exists(requirements_txt_file_path):
requirements_txt_file = RequirementsTxtFile(requirements_txt_file_path)
requirements_txt_file.remove_requirement_and_write(requirement.get_python_package_requirement().name)
else:
# Add requirement to requirements.txt
requirements_txt_file = RequirementsTxtFile(requirements_txt_file_path, create_file_if_not_exists=True)
requirements_txt_file.set_requirement_and_write(requirement.get_python_package_requirement())
# Remove requirement from module.yml file
self.remove_module_requirement_from_requires_and_write(requirement.name)
def versions(self) -> list[packaging.version.Version]:
"""
Provide a list of all versions available in the repository
"""
versions_str: list[str] = gitprovider.get_all_tags(self._path)
def try_parse(x: str) -> Optional[packaging.version.Version]:
try:
return Version(version=x)
except Exception:
return None
versions = [x for x in [try_parse(v) for v in versions_str] if x is not None]
versions = sorted(versions, reverse=True)
return versions
def status(self) -> None:
"""
Run a git status on this module
"""
try:
output = gitprovider.status(self._path)
files = [x.strip() for x in output.split("\n") if x != ""]
if len(files) > 0:
print(f"Module {self.name} ({self._path})")
for f in files:
print("\t%s" % f)
print()
else:
print(f"Module {self.name} ({self._path}) has no changes")
except Exception:
print("Failed to get status of module")
LOGGER.exception("Failed to get status of module %s")
def push(self) -> None:
"""
Run a git push on this module
"""
sys.stdout.write(f"{self.name} ({self._path}) ")
sys.stdout.flush()
try:
print(gitprovider.push(self._path))
except CalledProcessError:
print("Cloud not push module %s" % self.name)
else:
print("done")
print()
[docs]
@stable_api
class ModuleV2(Module[ModuleV2Metadata]):
MODULE_FILE = "setup.cfg"
GENERATION = ModuleGeneration.V2
PKG_NAME_PREFIX = const.MODULE_PKG_NAME_PREFIX
def __init__(
self,
project: Optional[Project],
path: str,
is_editable_install: bool = False,
installed_version: Optional[packaging.version.Version] = None,
) -> None:
self._is_editable_install = is_editable_install
self._version: Optional[packaging.version.Version] = installed_version
super().__init__(project, path)
if not os.path.exists(os.path.join(self.model_dir, "_init.cf")):
raise InvalidModuleException(
f"The module at {path} contains no _init.cf file. This occurs when you install or build modules from source"
" incorrectly. Always use the `inmanta module build` command followed by `pip install ./dist/<dist-package>` "
"to respectively build a module from source and install the distribution package. "
"Make sure to uninstall the broken package first."
)
[docs]
@classmethod
def from_path(cls: type[TModule], path: str) -> Optional[TModule]:
try:
return cls(project=None, path=path) if os.path.exists(os.path.join(path, cls.MODULE_FILE)) else None
except InvalidModuleException:
# setup.cfg is a generic Python config file: if the metadata does not match an inmanta module's, return None
return None
def get_version(self) -> packaging.version.Version:
return self._version if self._version is not None else self._metadata.get_full_version()
version = property(get_version)
[docs]
def is_editable(self) -> bool:
"""
Returns True iff this module has been installed in editable mode.
"""
return self._is_editable_install
def ensure_versioned(self) -> None:
if self._is_editable_install:
super().ensure_versioned()
else:
# Only editable installs can be checked for versioning
pass
def get_metadata_file_path(self) -> str:
return os.path.join(self.path, ModuleV2.MODULE_FILE)
@classmethod
def get_name_from_metadata(cls, metadata: ModuleV2Metadata) -> str:
return metadata.name[len(cls.PKG_NAME_PREFIX) :].replace("-", "_")
@classmethod
def get_metadata_file_schema_type(cls) -> type[ModuleV2Metadata]:
return ModuleV2Metadata
def get_plugin_dir(self) -> str:
if self._is_editable_install:
return os.path.join(self.path, const.PLUGINS_PACKAGE, self.name)
else:
return self.path
def get_all_python_requirements_as_list(self) -> list[str]:
return list(self.metadata.install_requires)
def get_module_requirements(self) -> list[str]:
return [str(req) for req in self.get_module_v2_requirements()]
def add_module_requirement_persistent(self, requirement: InmantaModuleRequirement, add_as_v1_module: bool) -> None:
if add_as_v1_module:
raise Exception("Cannot add V1 requirement to a V2 module")
# Parse config file
config_parser = ConfigParser()
config_parser.read(self.get_metadata_file_path())
python_pkg_requirement: inmanta.util.CanonicalRequirement = requirement.get_python_package_requirement()
if config_parser.has_option("options", "install_requires"):
new_install_requires = [
r
for r in config_parser.get("options", "install_requires").split("\n")
if r and inmanta.util.parse_requirement(requirement=r).name != python_pkg_requirement.name
]
new_install_requires.append(str(python_pkg_requirement))
else:
new_install_requires = [str(python_pkg_requirement)]
config_parser.set("options", "install_requires", "\n".join(new_install_requires))
# Write config back to disk
with open(self.get_metadata_file_path(), "w", encoding="utf-8") as fd:
config_parser.write(fd)
# Reload in-memory state
with open(self.get_metadata_file_path(), encoding="utf-8") as fd:
self._metadata = ModuleV2Metadata.parse(fd)