Source code for inmanta.env

"""
    Copyright 2021 Inmanta

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

    Contact: code@inmanta.com
"""
import contextlib
import enum
import functools
import importlib.util
import json
import logging
import os
import re
import site
import subprocess
import sys
import tempfile
import venv
from dataclasses import dataclass
from importlib.abc import Loader
from importlib.machinery import ModuleSpec
from itertools import chain
from subprocess import CalledProcessError
from typing import Any, Dict, Iterator, List, Optional, Pattern, Sequence, Set, Tuple, TypeVar

import pkg_resources
from pkg_resources import DistInfoDistribution, Requirement

from inmanta import const
from inmanta.stable_api import stable_api
from packaging import version

try:
    from typing import TYPE_CHECKING
except ImportError:
    TYPE_CHECKING = False

if TYPE_CHECKING:
    from packaging.requirements import InvalidRequirement
else:
    from pkg_resources.extern.packaging.requirements import InvalidRequirement

LOGGER = logging.getLogger(__name__)


class PackageNotFound(Exception):
    pass


class PythonWorkingSet:
    @classmethod
    def get_packages_in_working_set(cls) -> Dict[str, version.Version]:
        """
        Return all packages present in `pkg_resources.working_set` together with the version of the package.
        """
        return {dist_info.key: version.Version(dist_info.version) for dist_info in pkg_resources.working_set}

    @classmethod
    def rebuild_working_set(cls) -> None:
        pkg_resources.working_set = pkg_resources.WorkingSet._build_master()


@dataclass
class LocalPackagePath:
    path: str
    editable: bool = False


class PipListFormat(enum.Enum):
    """
    The different output formats that can be passed to the `pip list` command.
    """

    columns = "columns"
    freeze = "freeze"
    json = "json"


class PipUpgradeStrategy(enum.Enum):
    """
    The upgrade strategy used by pip (`--upgrade-strategy` option). Determines upgrade behavior for dependencies of packages to
    upgrade.
    """

    EAGER = "eager"
    ONLY_IF_NEEDED = "only-if-needed"


class PipCommandBuilder:
    """
    Class used to compose pip commands.
    """

    @classmethod
    def compose_install_command(
        cls,
        python_path: str,
        requirements: Optional[List[Requirement]] = None,
        paths: Optional[List[LocalPackagePath]] = None,
        index_urls: Optional[List[str]] = None,
        upgrade: bool = False,
        upgrade_strategy: PipUpgradeStrategy = PipUpgradeStrategy.ONLY_IF_NEEDED,
        allow_pre_releases: bool = False,
        constraints_files: Optional[List[str]] = None,
        requirements_files: Optional[List[str]] = None,
    ) -> List[str]:
        """
        Generate `pip install` command from the given arguments.

        :param python_path: The python interpreter to use in the command
        :param requirements: The requirements that should be installed
        :param paths: Paths to python projects on disk that should be installed in the venv.
        :param index_urls: The Python package repositories to use. When set to None, the system default will be used.
        :param upgrade: Upgrade the specified packages to the latest version.
        :param upgrade_strategy: The upgrade strategy to use for requirements' dependencies.
        :param allow_pre_releases: Allow the installation of packages with pre-releases and development versions.
        :param constraints_files: Files that should be passed to pip using the `-c` option.
        :param requirements_files: Files that should be passed to pip using the `-r` option.
        """
        requirements = requirements if requirements is not None else []
        paths = paths if paths is not None else []
        local_paths: Iterator[LocalPackagePath] = (
            # make sure we only try to install from a local source: add leading `./` and trailing `/` to explicitly tell pip
            # we're pointing to a local directory.
            LocalPackagePath(path=os.path.join(".", path.path, ""), editable=path.editable)
            for path in paths
        )
        index_args: List[str] = (
            []
            if index_urls is None
            else ["--index-url", index_urls[0], *chain.from_iterable(["--extra-index-url", url] for url in index_urls[1:])]
            if index_urls
            else ["--no-index"]
        )
        constraints_files = constraints_files if constraints_files is not None else []
        requirements_files = requirements_files if requirements_files is not None else []
        return [
            python_path,
            "-m",
            "pip",
            "install",
            *(["--upgrade", "--upgrade-strategy", upgrade_strategy.value] if upgrade else []),
            *(["--pre"] if allow_pre_releases else []),
            *chain.from_iterable(["-c", f] for f in constraints_files),
            *chain.from_iterable(["-r", f] for f in requirements_files),
            *(str(requirement) for requirement in requirements),
            *chain.from_iterable(["-e", path.path] if path.editable else [path.path] for path in local_paths),
            *index_args,
        ]

    @classmethod
    def compose_list_command(
        cls, python_path: str, format: Optional[PipListFormat] = None, only_editable: bool = False
    ) -> List[str]:
        """
        Generate a `pip list` command for the given arguments.

        :param python_path: The python interpreter to use in the command.
        :param format: The output format to use.
        :param only_editable: Whether the output should only contain project installed in editable mode.
        """
        return [
            python_path,
            "-m",
            "pip",
            "list",
            *(["--format", format.value] if format else []),
            *(["--editable"] if only_editable else []),
        ]


class PythonEnvironment:
    """
    A generic Python environment.

    The implementation of this class is based on the invariant that the version of the inmanta-core and the
    Inmanta product packages doesn't change. This call will make sure that the version of these packages is
    never changed.
    """

    def __init__(self, *, env_path: Optional[str] = None, python_path: Optional[str] = None) -> None:
        if (env_path is None) == (python_path is None):
            raise ValueError("Exactly one of `env_path` and `python_path` needs to be specified")
        self.env_path: str
        self.python_path: str
        if env_path is not None:
            self.env_path = env_path
            self.python_path = self.get_python_path_for_env_path(self.env_path)
        else:
            assert python_path is not None
            self.python_path = python_path
            self.env_path = self.get_env_path_for_python_path(self.python_path)
        self.site_packages_dir: str = self.get_site_dir_for_env_path(self.env_path)

    @classmethod
    def get_python_path_for_env_path(cls, env_path: str) -> str:
        """
        For the given venv directory (`env_path`) return the path to the Python interpreter.
        """
        python_name: str = os.path.basename(sys.executable)
        return (
            os.path.join(env_path, "Scripts", python_name)
            if sys.platform == "win32"
            else os.path.join(env_path, "bin", python_name)
        )

    @classmethod
    def get_site_dir_for_env_path(cls, env_path: str) -> str:
        """
        Return the site directory for a given venv directory.
        """
        return (
            os.path.join(env_path, "Lib", "site-packages")
            if sys.platform == "win32"
            else os.path.join(
                env_path, "lib", "python%s" % ".".join(str(digit) for digit in sys.version_info[:2]), "site-packages"
            )
        )

    @classmethod
    def get_env_path_for_python_path(cls, python_path: str) -> str:
        """
        For a given path to a python binary, return the path to the venv directory.
        """
        return os.path.dirname(os.path.dirname(python_path))

    def get_installed_packages(self, only_editable: bool = False) -> Dict[str, version.Version]:
        """
        Return a list of all installed packages in the site-packages of a python interpreter.

        :param only_editable: List only packages installed in editable mode.
        :return: A dict with package names as keys and versions as values
        """
        cmd = PipCommandBuilder.compose_list_command(self.python_path, format=PipListFormat.json, only_editable=only_editable)
        output = self._run_command_and_log_output(cmd, stderr=subprocess.DEVNULL, env=os.environ.copy())
        return {r["name"]: version.Version(r["version"]) for r in json.loads(output)}

    def install_from_index(
        self,
        requirements: List[Requirement],
        index_urls: Optional[List[str]] = None,
        upgrade: bool = False,
        allow_pre_releases: bool = False,
        constraint_files: Optional[List[str]] = None,
    ) -> None:
        if len(requirements) == 0:
            raise Exception("install_from_index requires at least one requirement to install")
        constraint_files = constraint_files if constraint_files is not None else []
        with requirements_txt_file(content=self._get_constraint_on_inmanta_package()) as filename:
            try:
                cmd: List[str] = PipCommandBuilder.compose_install_command(
                    python_path=self.python_path,
                    requirements=requirements,
                    index_urls=index_urls,
                    upgrade=upgrade,
                    allow_pre_releases=allow_pre_releases,
                    constraints_files=[*constraint_files, filename],
                )
                self._run_command_and_log_output(cmd, stderr=subprocess.PIPE)
            except CalledProcessError as e:
                stderr: str = e.stderr.decode()
                not_found: List[str] = [
                    requirement.project_name
                    for requirement in requirements
                    if f"No matching distribution found for {requirement.project_name}" in stderr
                ]
                if not_found:
                    raise PackageNotFound("Packages %s were not found in the given indexes." % ", ".join(not_found))
                raise e

    def install_from_source(self, paths: List[LocalPackagePath], constraint_files: Optional[List[str]] = None) -> None:
        """
        Install one or more packages from source. Any path arguments should be local paths to a package directory or wheel.
        """
        if len(paths) == 0:
            raise Exception("install_from_source requires at least one package to install")
        constraint_files = constraint_files if constraint_files is not None else []
        with requirements_txt_file(content=self._get_constraint_on_inmanta_package()) as filename:
            cmd: List[str] = PipCommandBuilder.compose_install_command(
                python_path=self.python_path, paths=paths, constraints_files=[*constraint_files, filename]
            )
            self._run_command_and_log_output(cmd, stderr=subprocess.PIPE)

    @functools.lru_cache(maxsize=1)
    def _get_constraint_on_inmanta_package(self) -> str:
        """
        Returns the content of the constraint file that should be supplied to each `pip install` invocation
        to make sure that no Inmanta packages gets overridden.
        """
        workingset: Dict[str, version.Version] = PythonWorkingSet.get_packages_in_working_set()
        inmanta_packages = ["inmanta-service-orchestrator", "inmanta", "inmanta-core"]
        for pkg in inmanta_packages:
            if pkg in workingset:
                return f"{pkg}=={workingset[pkg]}"
        # No inmanta product or inmanta-core package installed -> Leave constraint empty
        return ""

    @classmethod
    def _run_command_and_log_output(
        cls, cmd: List[str], env: Optional[Dict[str, str]] = None, stderr: Optional[int] = None
    ) -> str:
        output: bytes = b""  # Make sure the var is always defined in the except bodies
        try:
            output = subprocess.check_output(cmd, stderr=stderr, env=env)
        except CalledProcessError as e:
            if e.stderr:
                msg = e.stderr.decode()
            elif e.output:
                msg = e.output.decode()
            else:
                msg = ""
            LOGGER.error("%s: %s", cmd, msg)
            raise
        except Exception:
            LOGGER.error("%s: %s", cmd, output.decode())
            raise
        else:
            LOGGER.debug("%s: %s", cmd, output.decode())
            return output.decode()


@contextlib.contextmanager
def requirements_txt_file(content: str) -> Iterator[str]:
    with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=True) as fd:
        fd.write(content)
        fd.flush()
        yield fd.name


req_list = TypeVar("req_list", Sequence[str], Sequence[Requirement])


class ActiveEnv(PythonEnvironment):
    """
    The active Python environment. Method implementations assume this environment is active when they're called.
    Activating another environment that inherits from this one is allowed.
    """

    _egg_fragment_re = re.compile(r"#egg=(?P<name>[^&]*)")
    _at_fragment_re = re.compile(r"^(?P<name>[^@]+)@(?P<req>.+)")

    def __init__(self, *, env_path: Optional[str] = None, python_path: Optional[str] = None) -> None:
        super(ActiveEnv, self).__init__(env_path=env_path, python_path=python_path)

    def is_using_virtual_env(self) -> bool:
        return True

    def use_virtual_env(self) -> None:
        """
        Activate the virtual environment.
        """
        return

    @classmethod
    def _get_as_requirements_type(cls, requirements: req_list) -> Sequence[Requirement]:
        """
        Convert requirements from Union[Sequence[str], Sequence[Requirement]] to Sequence[Requirement]
        """
        if isinstance(requirements[0], str):
            return [Requirement.parse(r) for r in requirements]
        else:
            return requirements

    def are_installed(self, requirements: req_list) -> bool:
        """
        Return True iff the given requirements are installed in this venv.
        """
        if not requirements:
            return True
        reqs_as_requirements: Sequence[Requirement] = self._get_as_requirements_type(requirements)
        installed_packages: Dict[str, version.Version] = PythonWorkingSet.get_packages_in_working_set()
        return all(r.key in installed_packages and str(installed_packages[r.key]) in r for r in reqs_as_requirements)

    def install_from_index(
        self,
        requirements: List[Requirement],
        index_urls: Optional[List[str]] = None,
        upgrade: bool = False,
        allow_pre_releases: bool = False,
        constraint_files: Optional[List[str]] = None,
    ) -> None:
        if not upgrade and self.are_installed(requirements):
            return
        try:
            super(ActiveEnv, self).install_from_index(requirements, index_urls, upgrade, allow_pre_releases, constraint_files)
        finally:
            self.notify_change()

    def install_from_source(self, paths: List[LocalPackagePath], constraint_files: Optional[List[str]] = None) -> None:
        try:
            super().install_from_source(paths, constraint_files)
        finally:
            self.notify_change()

    @classmethod
    def _parse_line(cls, req_line: str) -> Tuple[Optional[str], str]:
        """
        Parse the requirement line
        """
        at = VirtualEnv._at_fragment_re.search(req_line)
        if at is not None:
            d = at.groupdict()
            return d["name"], d["req"] + "#egg=" + d["name"]

        egg = VirtualEnv._egg_fragment_re.search(req_line)
        if egg is not None:
            d = egg.groupdict()
            return d["name"], req_line

        return None, req_line

    @classmethod
    def _gen_content_requirements_file(cls, requirements_list: Sequence[str]) -> str:
        """Generate a new requirements file based on the requirements list.
        :param requirements_list:  A list of Python requirements as strings.
        :return: A string that can be written to a requirements file that pip understands.
        """
        modules: Dict[str, Any] = {}
        for req in requirements_list:
            parsed_name, req_spec = cls._parse_line(req)

            if parsed_name is None:
                name = req
            else:
                name = parsed_name

            url = None
            version = None
            marker = None
            try:
                # this will fail if an url is supplied
                parsed_req = list(pkg_resources.parse_requirements(req_spec))
                if len(parsed_req) > 0:
                    item = parsed_req[0]
                    if hasattr(item, "name"):
                        name = item.name
                    elif hasattr(item, "unsafe_name"):
                        name = item.unsafe_name
                    version = item.specs
                    marker = item.marker
                    if hasattr(item, "url"):
                        url = item.url
            except InvalidRequirement:
                url = req_spec

            if name not in modules:
                modules[name] = {"name": name, "version": [], "markers": []}

            if version is not None:
                modules[name]["version"].extend(version)

            if marker is not None:
                modules[name]["markers"].append(marker)

            if url is not None:
                modules[name]["url"] = url

        requirements_file = ""
        for module, info in modules.items():
            version_spec = ""
            markers: str = ""
            if len(info["version"]) > 0:
                version_spec = " " + (", ".join(["%s %s" % (a, b) for a, b in info["version"]]))

            if len(info["markers"]) > 0:
                markers = " ; " + (" and ".join(map(str, info["markers"])))

            if "url" in info:
                module = info["url"]

            requirements_file += module + version_spec + markers + "\n"

        return requirements_file

    def install_from_list(
        self,
        requirements_list: Sequence[str],
        *,
        upgrade: bool = False,
        upgrade_strategy: PipUpgradeStrategy = PipUpgradeStrategy.ONLY_IF_NEEDED,
    ) -> None:
        """
        Install requirements from a list of requirement strings. This method uses the Python package repositories
        configured on the host.

        :param requirements_list: List of requirement strings to install.
        :param upgrade: Upgrade requirements to the latest compatible version.
        :param upgrade_strategy: The upgrade strategy to use for requirements' dependencies.
        """
        if not upgrade and self.are_installed(requirements_list):
            # don't fork subprocess if requirements are already met
            return
        try:
            self._install_from_list(requirements_list, upgrade=upgrade, upgrade_strategy=upgrade_strategy)
        finally:
            self.notify_change()

    def _install_from_list(
        self,
        requirements_list: Sequence[str],
        *,
        upgrade: bool = False,
        upgrade_strategy: PipUpgradeStrategy = PipUpgradeStrategy.ONLY_IF_NEEDED,
    ) -> None:
        """
        This method differs from the `install_from_index()` method in the sense that it calls
        `_gen_content_requirements_file()`, which rewrites the requirements from pep440 format to a format that pip understands.
        This method is maintained for V1 modules only: V2 modules do not require this conversion. It is currently used for both
        v1 and v2 for consistency but it can be substituted by `install_from_index` once V1 support is removed.
        """
        content_requirements_file = self._gen_content_requirements_file(requirements_list)
        with requirements_txt_file(content=content_requirements_file) as requirements_file:
            with requirements_txt_file(content=self._get_constraint_on_inmanta_package()) as constraint_file:
                cmd: List[str] = PipCommandBuilder.compose_install_command(
                    python_path=self.python_path,
                    requirements_files=[requirements_file],
                    constraints_files=[constraint_file],
                    upgrade=upgrade,
                    upgrade_strategy=upgrade_strategy,
                )
                try:
                    self._run_command_and_log_output(cmd, stderr=subprocess.STDOUT)
                except Exception:
                    LOGGER.error("requirements: %s", content_requirements_file)
                    raise

    @classmethod
    def check(cls, in_scope: Pattern[str], constraints: Optional[List[Requirement]] = None) -> bool:
        """
        Check this Python environment for incompatible dependencies in installed packages.

        :param in_scope: A full pattern representing the package names that are considered in scope for the installed packages'
            compatibility check. Only in scope packages' dependencies will be considered for conflicts. The pattern is matched
            against an all-lowercase package name.
        :param constraints: In addition to checking for compatibility within the environment, also verify that the environment's
            packages meet the given constraints. All listed packages are expected to be installed.
        :return: True iff the check succeeds.
        """

        dist_info: DistInfoDistribution
        # add all requirements of all in scope packages installed in this environment
        all_constraints: Set[Requirement] = set(constraints if constraints is not None else []).union(
            requirement
            for dist_info in pkg_resources.working_set
            if in_scope.fullmatch(dist_info.key)
            for requirement in dist_info.requires()
        )

        installed_versions: Dict[str, version.Version] = PythonWorkingSet.get_packages_in_working_set()
        constraint_violations: List[Tuple[Requirement, Optional[version.Version]]] = [
            (constraint, installed_versions.get(constraint.key, None))
            for constraint in all_constraints
            if constraint.key not in installed_versions or str(installed_versions[constraint.key]) not in constraint
        ]

        for constraint, v in constraint_violations:
            LOGGER.warning("Incompatibility between constraint %s and installed version %s", constraint, v)
        return len(constraint_violations) == 0

    @classmethod
    def get_module_file(cls, module: str) -> Optional[Tuple[Optional[str], Loader]]:
        """
        Get the location of the init file for a Python module within the active environment.

        :return: A tuple of the path and the associated loader, if the module is found.
        """
        spec: Optional[ModuleSpec]
        try:
            spec = importlib.util.find_spec(module)
        # inmanta.loader.PluginModuleLoader raises ImportError if module is not found
        except (ImportError, ModuleNotFoundError):
            spec = None
        return (spec.origin, spec.loader) if spec is not None else None

    def notify_change(self) -> None:
        """
        This method must be called when a package is installed or removed from the environment in order for Python to detect
        the change. Namespace packages installed in editable mode in particular require this method to allow them to be found by
        get_module_file().
        """
        # Make sure that the .pth files in the site-packages directory are processed.
        # This is required to make editable installs work.
        site.addsitedir(self.site_packages_dir)
        importlib.invalidate_caches()

        if const.PLUGINS_PACKAGE in sys.modules:
            mod = sys.modules[const.PLUGINS_PACKAGE]
            if mod is not None:
                # Make mypy happy
                assert mod.__spec__.submodule_search_locations is not None
                if self.site_packages_dir not in mod.__spec__.submodule_search_locations and os.path.exists(
                    os.path.join(self.site_packages_dir, const.PLUGINS_PACKAGE)
                ):
                    """
                    A V2 module was installed in this virtual environment, but the inmanta_plugins package was already
                    loaded before this venv was activated. Reload the inmanta_plugins package to ensure that all V2 modules
                    installed in this virtual environment are discovered correctly.

                    This is required to cover the following scenario:

                        * Two venvs are stacked on top of each other. The parent venv contains the inmanta-core package and
                          the subvenv is empty.
                        * The inmanta_plugins package gets loaded before a V2 module is installed in the subvenv. This way,
                          the module object in sys.modules, doesn't have the site dir of the subvenv in its
                          submodule_search_locations. This field caches where the loader should look for the namespace packages
                          that are part of the inmanta_plugins namespace.
                        * When a V2 module gets installed in the subvenv now, the loader will not find the newly installed V2
                          module, because it's not considering the site dir of the subvenv.

                    The above-mentioned scenario can only be triggered by test cases, because:
                        1) The compiler venv was removed. As such, no new venv are activated on the fly by production code
                           paths.
                        2) The compiler service creates a new subvenv for each inmanta environment, but the inmanta commands
                           are executed in a subprocess.
                    """
                    importlib.reload(mod)
        PythonWorkingSet.rebuild_working_set()


process_env: ActiveEnv = ActiveEnv(python_path=sys.executable)
"""
Singleton representing the Python environment this process is running in.
"""


[docs]@stable_api def mock_process_env(*, python_path: Optional[str] = None, env_path: Optional[str] = None) -> None: """ Overrides the process environment information. This forcefully sets the environment that is recognized as the outer Python environment. This function should only be called when a Python environment has been set up dynamically and this environment should be treated as if this process was spawned from it, and even then with great care. :param python_path: The path to the python binary. Only one of `python_path` and `env_path` should be set. :param env_path: The path to the python environment directory. Only one of `python_path` and `env_path` should be set. """ process_env.__init__(python_path=python_path, env_path=env_path) # type: ignore
[docs]@stable_api class VirtualEnv(ActiveEnv): """ Creates and uses a virtual environment for this process. This virtualenv inherits from the previously active one. """ def __init__(self, env_path: str) -> None: LOGGER.info("Creating new virtual environment in %s", env_path) super(VirtualEnv, self).__init__(env_path=env_path) self.env_path: str = env_path self.virtual_python: Optional[str] = None self.__using_venv: bool = False self._parent_python: Optional[str] = None self._path_pth_file = os.path.join(self.site_packages_dir, "inmanta-inherit-from-parent-venv.pth") def exists(self) -> bool: """ Returns True iff the venv exists on disk. """ return os.path.exists(self.python_path) and os.path.exists(self._path_pth_file)
[docs] def init_env(self) -> None: """ Initialize the virtual environment. """ self._parent_python = sys.executable # check if the virtual env exists if not os.path.exists(self.python_path): # venv requires some care when the .env folder already exists # https://docs.python.org/3/library/venv.html if not os.path.exists(self.env_path): path = self.env_path else: # venv has problems with symlinks path = os.path.realpath(self.env_path) # --clear is required in python prior to 3.4 if the folder already exists try: venv.create(path, clear=True, with_pip=False) self._write_pip_binary() self._write_pth_file() except CalledProcessError as e: raise VenvCreationFailedError(msg=f"Unable to create new virtualenv at {self.env_path} ({e.stdout.decode()})") except Exception: raise VenvCreationFailedError(msg=f"Unable to create new virtualenv at {self.env_path}") LOGGER.debug("Created a new virtualenv at %s", self.env_path) elif not os.path.exists(self._path_pth_file): # Venv was created using an older version of Inmanta -> Update pip binary and set sitecustomize.py file self._write_pip_binary() self._write_pth_file() # set the path to the python and the pip executables self.virtual_python = self.python_path
def is_using_virtual_env(self) -> bool: return self.__using_venv
[docs] def use_virtual_env(self) -> None: """ Activate the virtual environment. """ if self.__using_venv: raise Exception(f"Already using venv {self.env_path}.") self.init_env() self._activate_that() mock_process_env(python_path=self.python_path) # patch up pkg self.notify_change() self.__using_venv = True
def _write_pip_binary(self) -> None: """ write out a "stub" pip binary so that pip list works in the virtual env. """ pip_path = os.path.join(self.env_path, "bin", "pip") with open(pip_path, "w", encoding="utf-8") as fd: fd.write( """#!/usr/bin/env bash source "$(dirname "$0")/activate" python -m pip $@ """.strip() ) os.chmod(pip_path, 0o755) def _write_pth_file(self) -> None: """ Write an inmanta-inherit-from-parent-venv.pth file to the venv to ensure that an activation of this venv will also activate the parent venv. The site directories of the parent venv should appear later in sys.path than the ones of this venv. """ site_dir_strings: List[str] = ['"' + p.replace('"', r"\"") + '"' for p in list(sys.path)] add_site_dir_statements: str = "\n".join( [f"site.addsitedir({p}) if {p} not in sys.path else None" for p in site_dir_strings] ) script = f""" import os import site import sys # Ensure inheritance from all parent venvs + process their .pth files {add_site_dir_statements} # Also set the PYTHONPATH environment variable for any subprocess os.environ["PYTHONPATH"] = os.pathsep.join(sys.path) """ script_as_oneliner = "; ".join( [line for line in script.split("\n") if line.strip() and not line.strip().startswith("#")] ) with open(self._path_pth_file, "w", encoding="utf-8") as fd: fd.write(script_as_oneliner) def _update_sys_path(self) -> None: """ Updates sys.path by adding self.site_packages_dir. This method ensures that .pth files are processed. """ prev_sys_path = list(sys.path) site.addsitedir(self.site_packages_dir) # Move the added items to the front of the path new_sys_path = [e for e in list(sys.path) if e not in prev_sys_path] new_sys_path += prev_sys_path # Set sys.path sys.path = new_sys_path def _activate_that(self) -> None: # adapted from https://github.com/pypa/virtualenv/blob/master/virtualenv_embedded/activate_this.py # MIT license # Copyright (c) 2007 Ian Bicking and Contributors # Copyright (c) 2009 Ian Bicking, The Open Planning Project # Copyright (c) 2011-2016 The virtualenv developers binpath: str = os.path.dirname(self.python_path) base: str = os.path.dirname(binpath) old_os_path = os.environ.get("PATH", "") os.environ["PATH"] = binpath + os.pathsep + old_os_path sys.real_prefix = sys.prefix sys.prefix = base self._update_sys_path() # Also set the python path environment variable for any subprocess os.environ["PYTHONPATH"] = os.pathsep.join(sys.path) def install_from_index( self, requirements: List[Requirement], index_urls: Optional[List[str]] = None, upgrade: bool = False, allow_pre_releases: bool = False, constraint_files: Optional[List[str]] = None, ) -> None: if not self.__using_venv: raise Exception(f"Not using venv {self.env_path}. use_virtual_env() should be called first.") super(VirtualEnv, self).install_from_index( requirements, index_urls, upgrade, allow_pre_releases, constraint_files, ) def install_from_source(self, paths: List[LocalPackagePath], constraint_files: Optional[List[str]] = None) -> None: if not self.__using_venv: raise Exception(f"Not using venv {self.env_path}. use_virtual_env() should be called first.") super(VirtualEnv, self).install_from_source(paths, constraint_files) def install_from_list( self, requirements_list: Sequence[str], *, upgrade: bool = False, upgrade_strategy: PipUpgradeStrategy = PipUpgradeStrategy.ONLY_IF_NEEDED, ) -> None: if not self.__using_venv: raise Exception(f"Not using venv {self.env_path}. use_virtual_env() should be called first.") super(VirtualEnv, self).install_from_list(requirements_list, upgrade=upgrade, upgrade_strategy=upgrade_strategy)
class VenvCreationFailedError(Exception): def __init__(self, msg: str) -> None: super().__init__(msg) self.msg = msg