Source code for

    Copyright 2019 Inmanta

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

    Contact: code@inmanta.com
import datetime
import uuid
from enum import Enum
from itertools import chain
from typing import Any, ClassVar, Dict, List, NewType, Optional, Union

import pydantic
import pydantic.schema
from pydantic.fields import ModelField

import inmanta.ast.export as ast_export
from inmanta import const
from inmanta.stable_api import stable_api
from inmanta.types import ArgumentTypes, JsonType, SimpleTypes, StrictNonIntBool

# Module-level reference to pydantic.schema.field_type_schema as it was when this module
# was imported. It is bound only once, so the wrapper installed by
# patch_pydantic_field_type_schema() always delegates to this reference rather than to
# whatever pydantic.schema.field_type_schema points at by the time the wrapper runs.
old_field_type_schema = pydantic.schema.field_type_schema

def patch_pydantic_field_type_schema() -> None:
    """
    This ugly patch fixes the serialization of models containing Optional in them.

    It replaces ``pydantic.schema.field_type_schema`` with a wrapper that calls the
    function captured in ``old_field_type_schema`` and then marks the generated field
    schema as ``"nullable": True`` whenever the field allows ``None``.
    """

    def patch_nullable(field: ModelField, **kwargs):
        # Delegate to the captured pydantic implementation, then post-process its result.
        f_schema, definitions, nested_models = old_field_type_schema(field, **kwargs)
        if field.allow_none:
            f_schema["nullable"] = True
        return f_schema, definitions, nested_models

    pydantic.schema.field_type_schema = patch_nullable

def validator_timezone_aware_timestamps(value: object) -> object:
    """
    A Pydantic validator to ensure that all datetime times are timezone aware.

    :param value: The value to validate; it may be of any type.
    :return: A copy of the value with its tzinfo set to UTC when it is a naive
        datetime; otherwise the value itself, unchanged.
    """
    if isinstance(value, datetime.datetime) and value.tzinfo is None:
        # Naive timestamp: attach UTC without shifting the wall-clock time.
        return value.replace(tzinfo=datetime.timezone.utc)
    # Already timezone aware, or not a datetime at all: pass through untouched.
    return value

@stable_api
class BaseModel(pydantic.BaseModel):
    """
    Base class for all data objects in Inmanta
    """

    # Registered for every field ("*"); allow_reuse permits sharing the same validator
    # function across model classes.
    _normalize_timestamps: ClassVar[classmethod] = pydantic.validator("*", allow_reuse=True)(
        validator_timezone_aware_timestamps
    )

    class Config:
        """
        Pydantic config.
        """

        # Populate models with the value property of enums, rather than the raw enum.
        # This is useful to serialise model.dict() later
        use_enum_values = True
class ExtensionStatus(BaseModel):
    """
    Status response for extensions loaded in the server
    """

    # Three plain string fields identifying one extension.
    name: str
    version: str
    package: str


class SliceStatus(BaseModel):
    """
    Status response for slices loaded in the server
    """

    name: str
    # Free-form status entries, keyed by string.
    status: Dict[str, ArgumentTypes]


class FeatureStatus(BaseModel):
    """
    Status of the feature
    """

    slice: str
    name: str
    # Value of the feature; may be of any type or absent.
    value: Optional[Any]


class StatusResponse(BaseModel):
    """
    Response for the status method call
    """

    product: str
    edition: str
    version: str
    # Either a plain string or a mapping of simple values.
    license: Union[str, Dict[str, SimpleTypes]]
    # One entry per extension / slice / feature, using the models defined above.
    extensions: List[ExtensionStatus]
    slices: List[SliceStatus]
    features: List[FeatureStatus]
@stable_api
class CompileData(BaseModel):
    """
    Top level structure of compiler data to be exported.
    """

    errors: List[ast_export.Error]
    """
        All errors that occurred while trying to compile.
    """
# Fields shared by the compile-run models below (CompileRun and CompileReport extend it).
class CompileRunBase(BaseModel):
    id: uuid.UUID
    remote_id: Optional[uuid.UUID]
    environment: uuid.UUID
    requested: Optional[datetime.datetime]
    started: Optional[datetime.datetime]
    do_export: bool
    force_update: bool
    metadata: JsonType
    environment_variables: Dict[str, str]


# CompileRunBase plus the optional exported compiler data.
class CompileRun(CompileRunBase):
    compile_data: Optional[CompileData]


# CompileRunBase plus completion information.
class CompileReport(CompileRunBase):
    completed: Optional[datetime.datetime]
    success: Optional[bool]
    version: Optional[int]


# A single command entry: command line, captured streams and return code.
class CompileRunReport(BaseModel):
    id: uuid.UUID
    started: datetime.datetime
    completed: Optional[datetime.datetime]
    command: str
    name: str
    errstream: str
    outstream: str
    returncode: Optional[int]


# CompileReport extended with the compiler data and the list of CompileRunReport entries.
class CompileDetails(CompileReport):
    compile_data: Optional[CompileData]
    reports: Optional[List[CompileRunReport]]


ResourceVersionIdStr = NewType("ResourceVersionIdStr", str)  # Part of the stable API
"""
    The resource id with the version included.
"""

ResourceIdStr = NewType("ResourceIdStr", str)  # Part of the stable API
"""
    The resource id without the version
"""

ResourceType = NewType("ResourceType", str)
"""
    The type of the resource
"""


class AttributeStateChange(BaseModel):
    """
    Changes in the attribute
    """

    current: Optional[Any] = None
    desired: Optional[Any] = None


# Value types accepted for an environment setting.
EnvSettingType = Union[StrictNonIntBool, int, str, Dict[str, Union[str, int, StrictNonIntBool]]]


class Environment(BaseModel):
    """
    An inmanta environment.

    :note: repo_url and repo_branch will be moved to the settings.
    """

    id: uuid.UUID
    name: str
    project_id: uuid.UUID
    repo_url: str
    repo_branch: str
    settings: Dict[str, EnvSettingType]
    halted: bool
    description: Optional[str]
    icon: Optional[str]


class Project(BaseModel):
    """
    An inmanta project.
    """

    id: uuid.UUID
    name: str
    environments: List[Environment]


class EnvironmentSetting(BaseModel):
    """
    A class to define a new environment setting.

    :param name: The name of the setting.
    :param type: The type of the value. This type is mainly used for documentation purpose.
    :param default: An optional default value for this setting. When a default is set and the
                    setting is requested from the database, it will return the default value and
                    also store the default value in the database.
    :param doc: The documentation/help string for this setting
    :param recompile: Trigger a recompile of the model when a setting is updated?
    :param update_model: Update the configuration model (git pull on project and repos)
    :param agent_restart: Restart autostarted agents when this setting is updated.
    :param allowed_values: list of possible values (if type is enum)
    """

    name: str
    type: str
    default: EnvSettingType
    doc: str
    recompile: bool
    update_model: bool
    agent_restart: bool
    allowed_values: Optional[List[EnvSettingType]]


# Current setting values together with the definition of each setting, both keyed by string.
class EnvironmentSettingsReponse(BaseModel):
    settings: Dict[str, EnvSettingType]
    definition: Dict[str, EnvironmentSetting]


class ModelMetadata(BaseModel):
    """Model metadata"""

    inmanta_compile_state: const.Compilestate = const.Compilestate.success
    message: str
    type: str
    extra_data: Optional[JsonType]

    class Config:
        # The attribute name cannot contain ":", so the field is exposed under this alias.
        fields = {"inmanta_compile_state": {"alias": "inmanta:compile:state"}}


class ModelVersionInfo(BaseModel):
    """Version information that can be associated with an orchestration model

    :param export_metadata: Metadata associated with this version
    :param model: A serialization of the complete orchestration model
    """

    export_metadata: ModelMetadata
    model: Optional[JsonType]


class Resource(BaseModel):
    environment: uuid.UUID
    model: int
    resource_id: ResourceVersionIdStr
    resource_type: ResourceType
    resource_version_id: ResourceVersionIdStr
    resource_id_value: str
    agent: str
    last_deploy: Optional[datetime.datetime]
    attributes: JsonType
    status: const.ResourceState


class ResourceAction(BaseModel):
    environment: uuid.UUID
    version: int
    resource_version_ids: List[ResourceVersionIdStr]
    action_id: uuid.UUID
    action: const.ResourceAction
    started: datetime.datetime
    finished: Optional[datetime.datetime]
    messages: Optional[List[JsonType]]
    status: Optional[const.ResourceState]
    changes: Optional[JsonType]
    change: Optional[const.Change]
    send_event: Optional[bool] = None  # Deprecated field


class ResourceDeploySummary(BaseModel):
    """
    :param total: The total number of resources
    :param by_state: The number of resources by state in the latest released version
    """

    total: int
    by_state: Dict[str, int]

    @classmethod
    def create_from_db_result(cls, summary_by_state: Dict[str, int]) -> "ResourceDeploySummary":
        """
        Build a summary from per-state counts: missing states are filled in with zero and
        the total is the sum over all states.
        """
        full_summary_by_state = cls._ensure_summary_has_all_states(summary_by_state)
        total = cls._count_all_resources(full_summary_by_state)
        return ResourceDeploySummary(by_state=full_summary_by_state, total=total)

    @classmethod
    def _ensure_summary_has_all_states(cls, summary_by_state: Dict[str, int]) -> Dict[str, int]:
        """
        Return a copy of the given counts in which every member of const.ResourceState,
        except `dry`, is present; states that were missing get a count of 0.
        """
        full_summary = summary_by_state.copy()
        for state in const.ResourceState:
            # NOTE(review): the enum member itself is used as key; presumably ResourceState
            # is a str-based enum so this matches string keys - confirm in inmanta.const.
            if state not in summary_by_state and state != const.ResourceState.dry:
                full_summary[state] = 0
        return full_summary

    @classmethod
    def _count_all_resources(cls, summary_by_state: Dict[str, int]) -> int:
        """Return the sum of all per-state counts."""
        return sum(summary_by_state.values())


class LogLine(BaseModel):
    class Config:
        """
        Pydantic config.
        """

        # Override the setting from the BaseModel class such that the level field is
        # serialised using the name of the enum instead of its value. This is required
        # to make sure that data sent to the API endpoints resource_action_update
        # and resource_deploy_done are serialized consistently using the name of the enum.
        use_enum_values = False

    level: const.LogLevel
    msg: str
    args: List[Optional[ArgumentTypes]] = []
    kwargs: Dict[str, Optional[ArgumentTypes]] = {}
    timestamp: datetime.datetime


class ResourceIdDetails(BaseModel):
    resource_type: ResourceType
    agent: str
    attribute: str
    resource_id_value: str


# Single extra state that is chained onto const.ResourceState in ReleasedResourceState below.
class OrphanedResource(str, Enum):
    orphaned = "orphaned"


class StrEnum(str, Enum):
    """Enum where members are also (and must be) strs"""


# Functional enum API: one (name, value) pair per member of both source enums.
ReleasedResourceState = StrEnum(
    "ReleasedResourceState", [(i.name, i.value) for i in chain(const.ResourceState, OrphanedResource)]
)


class VersionedResource(BaseModel):
    resource_id: ResourceIdStr
    resource_version_id: ResourceVersionIdStr
    id_details: ResourceIdDetails
    requires: List[ResourceVersionIdStr]

    @property
    def all_fields(self) -> Dict[str, Any]:
        """
        The fields of this model merged with the fields of `id_details` into one flat dict;
        on a key clash the value from `id_details` wins.
        """
        return {**self.dict(), **self.id_details.dict()}


class LatestReleasedResource(VersionedResource):
    status: ReleasedResourceState


class PagingBoundaries:
    """Represents the lower and upper bounds that should be used for the next and previous pages
    when listing domain entities"""

    def __init__(
        self,
        start: Union[datetime.datetime, int, str],
        end: Union[datetime.datetime, int, str],
        first_id: Optional[Union[uuid.UUID, str]],
        last_id: Optional[Union[uuid.UUID, str]],
    ) -> None:
        self.start = start
        self.end = end
        self.first_id = first_id
        self.last_id = last_id


class ResourceDetails(BaseModel):
    """The details of a resource

    :param resource_id: The id of the resource
    :param resource_type: The type of the resource
    :param agent: The agent associated with this resource
    :param id_attribute: The name of the identifying attribute of the resource
    :param id_attribute_value: The value of the identifying attribute of the resource
    :param attributes: The attributes of the resource
    """

    resource_id: ResourceIdStr
    resource_type: ResourceType
    agent: str
    id_attribute: str
    id_attribute_value: str
    attributes: JsonType


class VersionedResourceDetails(ResourceDetails):
    """The details of a resource version

    :param resource_version_id: The id of the resource
    :param version: The version of the resource
    """

    resource_version_id: ResourceVersionIdStr
    version: int


class ReleasedResourceDetails(ResourceDetails):
    """The details of a released resource

    :param last_deploy: The value of the last_deploy on the latest released version of the resource
    :param first_generated_time: The first time this resource was generated
    :param first_generated_version: The first model version this resource was in
    :param status: The current status of the resource
    :param requires_status: The id and status of the resources this resource requires
    """

    last_deploy: Optional[datetime.datetime]
    first_generated_time: datetime.datetime
    first_generated_version: int
    status: ReleasedResourceState
    requires_status: Dict[ResourceIdStr, ReleasedResourceState]


class ResourceHistory(BaseModel):
    resource_id: ResourceIdStr
    date: datetime.datetime
    attributes: JsonType
    attribute_hash: str
    requires: List[ResourceIdStr]


# A LogLine extended with the id and type of the resource action it belongs to.
class ResourceLog(LogLine):
    action_id: uuid.UUID
    action: const.ResourceAction


class Parameter(BaseModel):
    id: uuid.UUID
    name: str
    value: str
    environment: uuid.UUID
    resource_id: ResourceIdStr
    source: str
    updated: Optional[datetime.datetime]
    metadata: Optional[JsonType]


class Agent(BaseModel):
    """
    :param environment: Id of the agent's environment
    :param name: The name of the agent
    :param last_failover: The time of the last failover
    :param paused: Whether the agent is paused or not
    :param unpause_on_resume: Whether the agent should be unpaused when the environment is resumed
    :param status: The current status of the agent
    :param process_id: The id of the agent process that belongs to this agent, if there is one
    :param process_name: The name of the agent process that belongs to this agent, if there is one
    """

    environment: uuid.UUID
    name: str
    last_failover: Optional[datetime.datetime]
    paused: bool
    process_id: Optional[uuid.UUID]
    process_name: Optional[str]
    unpause_on_resume: Optional[bool]
    status: const.AgentStatus


class AgentProcess(BaseModel):
    sid: uuid.UUID
    hostname: str
    environment: uuid.UUID
    first_seen: Optional[datetime.datetime]
    last_seen: Optional[datetime.datetime]
    expired: Optional[datetime.datetime]
    state: Optional[Dict[str, Union[Dict[str, List[str]], Dict[str, str], Dict[str, float], str]]]


class DesiredStateLabel(BaseModel):
    name: str
    message: str


class DesiredStateVersion(BaseModel):
    version: int
    date: datetime.datetime
    total: int
    labels: List[DesiredStateLabel]
    status: const.DesiredStateVersionStatus


# Single extra member that is chained onto const.AgentTriggerMethod in PromoteTriggerMethod below.
class NoPushTriggerMethod(str, Enum):
    no_push = "no_push"


# Functional enum API: one (name, value) pair per member of both source enums.
PromoteTriggerMethod = StrEnum(
    "PromoteTriggerMethod", [(i.name, i.value) for i in chain(const.AgentTriggerMethod, NoPushTriggerMethod)]
)