Source code for inmanta.data.model

"""
    Copyright 2019 Inmanta

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

    Contact: code@inmanta.com
"""
import datetime
import uuid
from enum import Enum
from itertools import chain
from typing import Any, ClassVar, NewType, Optional, Union

import pydantic
import pydantic.schema
from pydantic import Extra, root_validator, validator
from pydantic.fields import ModelField
from pydantic.types import StrictFloat, StrictInt, StrictStr

import inmanta
import inmanta.ast.export as ast_export
from inmanta import const, data, protocol, resources
from inmanta.stable_api import stable_api
from inmanta.types import ArgumentTypes, JsonType, SimpleTypes, StrictNonIntBool

# This reference to the actual pydantic field_type_schema method is only loaded once
old_field_type_schema = pydantic.schema.field_type_schema


def patch_pydantic_field_type_schema() -> None:
    """
    This ugly patch fixes the serialization of models containing Optional in them.
    https://github.com/samuelcolvin/pydantic/issues/1270

    The fix for this issue will be included in pydantic V2.
    """

    def patch_nullable(field: ModelField, **kwargs):
        f_schema, definitions, nested_models = old_field_type_schema(field, **kwargs)
        if field.allow_none:
            f_schema["nullable"] = True
        return f_schema, definitions, nested_models

    pydantic.schema.field_type_schema = patch_nullable


def api_boundary_datetime_normalizer(value: datetime.datetime) -> datetime.datetime:
    if value.tzinfo is None:
        return value.replace(tzinfo=datetime.timezone.utc)
    else:
        return value


def validator_timezone_aware_timestamps(value: object) -> object:
    """
    A Pydantic validator to ensure that all datetime times are timezone aware.
    """
    if isinstance(value, datetime.datetime):
        return api_boundary_datetime_normalizer(value)
    else:
        return value


[docs] @stable_api class BaseModel(pydantic.BaseModel): """ Base class for all data objects in Inmanta """ _normalize_timestamps: ClassVar[classmethod] = pydantic.validator("*", allow_reuse=True)( validator_timezone_aware_timestamps )
[docs] class Config: """ Pydantic config. """ # Populate models with the value property of enums, rather than the raw enum. # This is useful to serialise model.dict() later use_enum_values = True
class ExtensionStatus(BaseModel): """ Status response for extensions loaded in the server """ name: str version: str package: str class SliceStatus(BaseModel): """ Status response for slices loaded in the the server """ name: str status: dict[str, ArgumentTypes] class FeatureStatus(BaseModel): """ Status of the feature """ slice: str name: str value: Optional[Any] class StatusResponse(BaseModel): """ Response for the status method call """ product: str edition: str version: str license: Union[str, dict[str, SimpleTypes]] extensions: list[ExtensionStatus] slices: list[SliceStatus] features: list[FeatureStatus]
[docs] @stable_api class CompileData(BaseModel): """ Top level structure of compiler data to be exported. """ errors: list[ast_export.Error] """ All errors occurred while trying to compile. """
class CompileRunBase(BaseModel): id: uuid.UUID remote_id: Optional[uuid.UUID] environment: uuid.UUID requested: Optional[datetime.datetime] started: Optional[datetime.datetime] do_export: bool force_update: bool metadata: JsonType environment_variables: dict[str, str] partial: bool removed_resource_sets: list[str] exporter_plugin: Optional[str] notify_failed_compile: Optional[bool] failed_compile_message: Optional[str] class CompileRun(CompileRunBase): compile_data: Optional[CompileData] class CompileReport(CompileRunBase): completed: Optional[datetime.datetime] success: Optional[bool] version: Optional[int] class CompileRunReport(BaseModel): id: uuid.UUID started: datetime.datetime completed: Optional[datetime.datetime] command: str name: str errstream: str outstream: str returncode: Optional[int] class CompileDetails(CompileReport): compile_data: Optional[CompileData] reports: Optional[list[CompileRunReport]] ResourceVersionIdStr = NewType("ResourceVersionIdStr", str) # Part of the stable API """ The resource id with the version included. """ ResourceIdStr = NewType("ResourceIdStr", str) # Part of the stable API """ The resource id without the version """ ResourceType = NewType("ResourceType", str) """ The type of the resource """ class AttributeStateChange(BaseModel): """ Changes in the attribute """ current: Optional[Any] = None desired: Optional[Any] = None @validator("current", "desired") @classmethod def check_serializable(cls, v: Optional[Any]) -> Optional[Any]: """ Verify whether the value is serializable (https://github.com/inmanta/inmanta-core/issues/3470) """ try: protocol.common.json_encode(v) except TypeError: if inmanta.RUNNING_TESTS: # Fail the test when the value is not serializable raise Exception(f"Failed to serialize attribute {v}") else: # In production, try to cast the non-serializable value to str to prevent the handler from failing. return str(v) return v EnvSettingType = Union[StrictNonIntBool, StrictInt, StrictFloat, StrictStr, dict[str, Union[str, int, StrictNonIntBool]]] class Environment(BaseModel): """ An inmanta environment. :note: repo_url and repo_branch will be moved to the settings. """ id: uuid.UUID name: str project_id: uuid.UUID repo_url: str repo_branch: str settings: dict[str, EnvSettingType] halted: bool description: Optional[str] icon: Optional[str] class Project(BaseModel): """ An inmanta environment. """ id: uuid.UUID name: str environments: list[Environment] class EnvironmentSetting(BaseModel): """A class to define a new environment setting. :param name: The name of the setting. :param type: The type of the value. This type is mainly used for documentation purpose. :param default: An optional default value for this setting. When a default is set and the is requested from the database, it will return the default value and also store the default value in the database. :param doc: The documentation/help string for this setting :param recompile: Trigger a recompile of the model when a setting is updated? :param update_model: Update the configuration model (git pull on project and repos) :param agent_restart: Restart autostarted agents when this settings is updated. :param allowed_values: list of possible values (if type is enum) """ name: str type: str default: EnvSettingType doc: str recompile: bool update_model: bool agent_restart: bool allowed_values: Optional[list[EnvSettingType]] class EnvironmentSettingsReponse(BaseModel): settings: dict[str, EnvSettingType] definition: dict[str, EnvironmentSetting] class ModelMetadata(BaseModel): """Model metadata""" inmanta_compile_state: const.Compilestate = const.Compilestate.success message: str type: str extra_data: Optional[JsonType] class Config: fields = {"inmanta_compile_state": {"alias": "inmanta:compile:state"}} class ModelVersionInfo(BaseModel): """Version information that can be associated with an orchestration model :param export_metadata: Metadata associated with this version :param model: A serialization of the complete orchestration model """ export_metadata: ModelMetadata model: Optional[JsonType] class ResourceMinimal(BaseModel): """ Represents a resource object as it comes in over the API. Provides strictly required validation only. """ id: ResourceVersionIdStr @classmethod @validator("id") def id_is_resource_version_id(cls, v): if resources.Id.is_resource_version_id(v): return v raise ValueError(f"id {v} is not of type ResourceVersionIdStr") class Config: extra = Extra.allow class Resource(BaseModel): environment: uuid.UUID model: int resource_id: ResourceIdStr resource_type: ResourceType resource_version_id: ResourceVersionIdStr resource_id_value: str agent: str last_deploy: Optional[datetime.datetime] attributes: JsonType status: const.ResourceState resource_set: Optional[str] class ResourceAction(BaseModel): environment: uuid.UUID version: int resource_version_ids: list[ResourceVersionIdStr] action_id: uuid.UUID action: const.ResourceAction started: datetime.datetime finished: Optional[datetime.datetime] messages: Optional[list[JsonType]] status: Optional[const.ResourceState] changes: Optional[JsonType] change: Optional[const.Change] send_event: Optional[bool] = None # Deprecated field class ResourceDeploySummary(BaseModel): """ :param total: The total number of resources :param by_state: The number of resources by state in the latest released version """ total: int by_state: dict[str, int] @classmethod def create_from_db_result(cls, summary_by_state: dict[str, int]) -> "ResourceDeploySummary": full_summary_by_state = cls._ensure_summary_has_all_states(summary_by_state) total = cls._count_all_resources(full_summary_by_state) return ResourceDeploySummary(by_state=full_summary_by_state, total=total) @classmethod def _ensure_summary_has_all_states(cls, summary_by_state: dict[str, int]) -> dict[str, int]: full_summary = summary_by_state.copy() for state in const.ResourceState: if state not in summary_by_state.keys() and state != const.ResourceState.dry: full_summary[state] = 0 return full_summary @classmethod def _count_all_resources(cls, summary_by_state: dict[str, int]) -> int: return sum(resource_count for resource_count in summary_by_state.values()) class LogLine(BaseModel): class Config: """ Pydantic config. """ # Override the setting from the BaseModel class as such that the level field is # serialises using the name of the enum instead of its value. This is required # to make sure that data sent to the API endpoints resource_action_update # and resource_deploy_done are serialized consistently using the name of the enum. use_enum_values = False level: const.LogLevel msg: str args: list[Optional[ArgumentTypes]] = [] kwargs: JsonType = {} timestamp: datetime.datetime class ResourceIdDetails(BaseModel): resource_type: ResourceType agent: str attribute: str resource_id_value: str class OrphanedResource(str, Enum): orphaned = "orphaned" class StrEnum(str, Enum): """Enum where members are also (and must be) strs""" ReleasedResourceState = StrEnum( "ReleasedResourceState", [(i.name, i.value) for i in chain(const.ResourceState, OrphanedResource)] ) class VersionedResource(BaseModel): resource_id: ResourceIdStr resource_version_id: ResourceVersionIdStr id_details: ResourceIdDetails requires: list[ResourceVersionIdStr] @property def all_fields(self) -> dict[str, Any]: return {**self.dict(), **self.id_details.dict()} class LatestReleasedResource(VersionedResource): status: ReleasedResourceState class PagingBoundaries: """Represents the lower and upper bounds that should be used for the next and previous pages when listing domain entities""" def __init__( self, start: Optional["inmanta.data.PRIMITIVE_SQL_TYPES"], # Can be none if user selected field is nullable end: Optional["inmanta.data.PRIMITIVE_SQL_TYPES"], # Can be none if user selected field is nullable first_id: Optional["inmanta.data.PRIMITIVE_SQL_TYPES"], # Can be none if single keyed last_id: Optional["inmanta.data.PRIMITIVE_SQL_TYPES"], # Can be none if single keyed ) -> None: self.start = start self.end = end self.first_id = first_id self.last_id = last_id class ResourceDetails(BaseModel): """The details of a resource :param resource_id: The id of the resource :param resource_type: The type of the resource :param agent: The agent associated with this resource :param id_attribute: The name of the identifying attribute of the resource :param id_attribute_value: The value of the identifying attribute of the resource :param attributes: The attributes of the resource """ resource_id: ResourceIdStr resource_type: ResourceType agent: str id_attribute: str id_attribute_value: str attributes: JsonType class VersionedResourceDetails(ResourceDetails): """The details of a resource version :param resource_version_id: The id of the resource :param version: The version of the resource """ resource_version_id: ResourceVersionIdStr version: int @root_validator @classmethod def ensure_version_field_set_in_attributes(cls, v: JsonType) -> JsonType: # Due to a bug, the version field has always been present in the attributes dictionary. # This bug has been fixed in the database. For backwards compatibility reason we here make sure that the # version field is present in the attributes dictionary served out via the API. if "version" not in v["attributes"]: v["attributes"]["version"] = v["version"] return v class ReleasedResourceDetails(ResourceDetails): """The details of a released resource :param last_deploy: The value of the last_deploy on the latest released version of the resource :param first_generated_time: The first time this resource was generated :param first_generated_version: The first model version this resource was in :param status: The current status of the resource :param requires_status: The id and status of the resources this resource requires """ last_deploy: Optional[datetime.datetime] first_generated_time: datetime.datetime first_generated_version: int status: ReleasedResourceState requires_status: dict[ResourceIdStr, ReleasedResourceState] class ResourceHistory(BaseModel): resource_id: ResourceIdStr date: datetime.datetime attributes: JsonType attribute_hash: str requires: list[ResourceIdStr] class ResourceLog(LogLine): action_id: uuid.UUID action: const.ResourceAction class ResourceDiffStatus(str, Enum): added = "added" modified = "modified" deleted = "deleted" unmodified = "unmodified" agent_down = "agent_down" undefined = "undefined" skipped_for_undefined = "skipped_for_undefined" class AttributeDiff(BaseModel): """ :param from_value: The value of the attribute in the earlier version :param to_value: The value of the attribute in the later version :param from_value_compare: A stringified, diff-friendly form of the 'from_value' field :param to_value_compare: A stringified, diff-friendly form of the 'to_value' field """ from_value: Optional[object] = None to_value: Optional[object] = None from_value_compare: str to_value_compare: str class ResourceDiff(BaseModel): """ :param resource_id: The id of the resource the diff is about (without version) :param attributes: The diff between the attributes of two versions of the resource :param status: The kind of diff between the versions of the resource """ resource_id: ResourceIdStr attributes: dict[str, AttributeDiff] status: ResourceDiffStatus class Parameter(BaseModel): id: uuid.UUID name: str value: str environment: uuid.UUID source: str updated: Optional[datetime.datetime] metadata: Optional[JsonType] class Fact(Parameter): resource_id: ResourceIdStr class Agent(BaseModel): """ :param environment: Id of the agent's environment :param name: The name of the agent :param last_failover: The time of the last failover :param paused: Whether the agent is paused or not :param unpause_on_resume: Whether the agent should be unpaused when the environment is resumed :param status: The current status of the agent :param process_id: The id of the agent process that belongs to this agent, if there is one :param process_name: The name of the agent process that belongs to this agent, if there is one """ environment: uuid.UUID name: str last_failover: Optional[datetime.datetime] paused: bool process_id: Optional[uuid.UUID] process_name: Optional[str] unpause_on_resume: Optional[bool] status: const.AgentStatus class AgentProcess(BaseModel): sid: uuid.UUID hostname: str environment: uuid.UUID first_seen: Optional[datetime.datetime] last_seen: Optional[datetime.datetime] expired: Optional[datetime.datetime] state: Optional[dict[str, Union[dict[str, list[str]], dict[str, str], dict[str, float], str]]] class DesiredStateLabel(BaseModel): name: str message: str class DesiredStateVersion(BaseModel): version: int date: datetime.datetime total: int labels: list[DesiredStateLabel] status: const.DesiredStateVersionStatus class NoPushTriggerMethod(str, Enum): no_push = "no_push" PromoteTriggerMethod = StrEnum( "PromoteTriggerMethod", [(i.name, i.value) for i in chain(const.AgentTriggerMethod, NoPushTriggerMethod)] ) class DryRun(BaseModel): id: uuid.UUID environment: uuid.UUID model: int date: Optional[datetime.datetime] total: int = 0 todo: int = 0 class DryRunReport(BaseModel): summary: DryRun diff: list[ResourceDiff] class Notification(BaseModel): """ :param id: The id of this notification :param environment: The environment this notification belongs to :param created: The date the notification was created at :param title: The title of the notification :param message: The actual text of the notification :param severity: The severity of the notification :param uri: A link to an api endpoint of the server, that is relevant to the message, and can be used to get further information about the problem. For example a compile related problem should have the uri: `/api/v2/compilereport/<compile_id>` :param read: Whether the notification was read or not :param cleared: Whether the notification was cleared or not """ id: uuid.UUID environment: uuid.UUID created: datetime.datetime title: str message: str severity: const.NotificationSeverity uri: str read: bool cleared: bool class Source(BaseModel): """Model for source code""" hash: str is_byte_code: bool module_name: str requirements: list[str] class EnvironmentMetricsResult(BaseModel): """ A container for metrics as returned by the /metrics endpoint. :param start: The starting of the aggregation interval. :param end: The end of the aggregation interval. :param timestamps: The timestamps that belongs to the aggregated metrics present in the `metrics` dictionary. :param metrics: A dictionary that maps the name of a metric to a list of aggregated datapoints. For metrics that are not grouped on a specific property, this list only contains the values of the metrics. For metrics that are grouped by a specific property, this list contains a dictionary where the key is the grouping attribute and the value is the value of the metric. The value is None when no data is available for that specific time window. """ start: datetime.datetime end: datetime.datetime timestamps: list[datetime.datetime] metrics: dict[str, list[Optional[Union[float, dict[str, float]]]]] class AuthMethod(str, Enum): database = "database" oidc = "oidc" class User(BaseModel): """A user""" username: str auth_method: AuthMethod class LoginReturn(BaseModel): """ Login information :param token: A token representing the user's authentication session :param user: The user object for which the token was created """ token: str user: User class DiscoveredResource(BaseModel): """ :param discovered_resource_id: The name of the resource :param values: The actual resource """ discovered_resource_id: ResourceIdStr values: JsonType @validator("discovered_resource_id") @classmethod def discovered_resource_id_is_resource_id(cls, v: str) -> Optional[Any]: if resources.Id.is_resource_id(v): return v raise ValueError(f"id {v} is not of type ResourceIdStr") def to_dao(self, env: uuid) -> "data.DiscoveredResource": return data.DiscoveredResource( discovered_resource_id=self.discovered_resource_id, values=self.values, discovered_at=datetime.datetime.now(), environment=env, )