Source code for momba.ext.jani.dump_model

# -*- coding:utf-8 -*-
#
# Copyright (C) 2019-2020, Maximilian Köhl <mkoehl@cs.uni-saarland.de>

from __future__ import annotations

import typing as t

import dataclasses
import enum
import functools
import json
import warnings

from ... import model
from ...model import effects, context, expressions, operators, properties, types
from ...utils import checks
from ...metadata import version


# XXX: ignore this type definition, mypy does not support recursive types
JSON = t.Union[None, int, float, str, t.Sequence["JSON"], t.Mapping[str, "JSON"]]  # type: ignore

_JANIMap = t.Dict[str, JSON]  # type: ignore


class ModelFeature(enum.Enum):
    ARRAYS = "arrays"
    DATATYPES = "datatypes"
    DERIVED_OPERATORS = "derived-operators"
    EDGE_PRIORITIES = "edge-priorities"
    FUNCTIONS = "functions"
    HYPERBOLIC_FUNCTIONS = "hyperbolic-functions"
    NAMED_EXPRESSIONS = "named-expressions"
    NONDET_EXPRESSIONS = "nondet-expressions"
    STATE_EXIT_REWARDS = "state-exit-rewards"
    TRADEOFF_PROPERTIES = "tradeoff-properties"
    TRIGONOMETRIC_FUNCTIONS = "trigonometric-functions"

    X_MOMBA_OPERATORS = "x-momba-operators"
    X_MOMBA_VALUE_PASSING = "x-momba-value-passing"


_NamedObject = t.Union[model.Location, model.Automaton]


@dataclasses.dataclass
class JANIContext:
    allow_momba_operators: bool = False

    _name_counter: int = 0

    _actions: t.Set[str] = dataclasses.field(default_factory=set)

    _names: t.Dict[_NamedObject, str] = dataclasses.field(default_factory=dict)

    _features: t.Set[ModelFeature] = dataclasses.field(default_factory=set)

    @property
    def features(self) -> t.AbstractSet[ModelFeature]:
        return self._features

    @property
    def actions(self) -> t.AbstractSet[str]:
        return self._actions

    def get_unique_name(self) -> str:
        name = f"__momba_{self._name_counter}"
        self._name_counter += 1
        return name

    def get_name(self, obj: _NamedObject) -> str:
        if obj.name is None:
            try:
                return self._names[obj]
            except KeyError:
                name = self._names[obj] = self.get_unique_name()
                return name
        return obj.name

    def require_action(self, action: str) -> str:
        self._actions.add(action)
        return action

    def require(self, feature: ModelFeature) -> None:
        self._features.add(feature)


@functools.singledispatch
def _dump_type(typ: model.Type, ctx: JANIContext) -> JSON:
    raise NotImplementedError(f"dump has not been implemented for type {typ}")


@_dump_type.register
def _dump_type_integer(typ: types.IntegerType, ctx: JANIContext) -> JSON:
    return "int"


@_dump_type.register
def _dump_type_real(typ: types.RealType, ctx: JANIContext) -> JSON:
    return "real"


@_dump_type.register
def _dump_type_bool(typ: types.BoolType, ctx: JANIContext) -> JSON:
    return "bool"


@_dump_type.register
def _dump_type_clock(typ: types.ClockType, ctx: JANIContext) -> JSON:
    return "clock"


@_dump_type.register
def _dump_type_continuous(typ: types.ContinuousType, ctx: JANIContext) -> JSON:
    return "continuous"


@_dump_type.register
def _dump_bounded_type(typ: types.BoundedType, ctx: JANIContext) -> JSON:
    jani_type: _JANIMap = {"kind": "bounded", "base": _dump_type(typ.base, ctx)}
    if typ.upper_bound:
        jani_type["upper-bound"] = _dump_prop(typ.upper_bound, ctx)
    if typ.lower_bound:
        jani_type["lower-bound"] = _dump_prop(typ.lower_bound, ctx)
    return jani_type


@_dump_type.register
def _dump_array_type(typ: types.ArrayType, ctx: JANIContext) -> JSON:
    ctx.require(ModelFeature.ARRAYS)
    return {"kind": "array", "base": _dump_type(typ.base, ctx)}


checks.check_singledispatch(_dump_type, types.Type)


@functools.singledispatch
def _dump_prop(prop: model.Property, ctx: JANIContext) -> JSON:
    raise NotImplementedError(f"dump has not been implemented for property {prop}")


@_dump_prop.register
def _dump_identifier(expr: expressions.Identifier, ctx: JANIContext) -> JSON:
    return expr.name


@_dump_prop.register
def _dump_boolean_constant(expr: expressions.BooleanConstant, ctx: JANIContext) -> JSON:
    return expr.boolean


@_dump_prop.register
def _dump_integer_constant(expr: expressions.IntegerConstant, ctx: JANIContext) -> JSON:
    return expr.integer


@_dump_prop.register
def _dump_real_constant(expr: expressions.RealConstant, ctx: JANIContext) -> JSON:
    if isinstance(expr.real, expressions.NamedReal):
        return {"constant": expr.real.symbol}
    if not isinstance(expr.real, float) and expr.real != float(expr.real):
        warnings.warn(
            f"imprecise conversion: JSON does not support the number type {type(expr.real)}"
        )
    return float(expr.real)


@_dump_prop.register
def _dump_conditional(expr: expressions.Conditional, ctx: JANIContext) -> JSON:
    return {
        "op": "ite",
        "if": _dump_prop(expr.condition, ctx),
        "then": _dump_prop(expr.consequence, ctx),
        "else": _dump_prop(expr.alternative, ctx),
    }


_DERIVED_OPERATORS = {
    operators.BooleanOperator.IMPLY,
    operators.ComparisonOperator.GT,
    operators.ComparisonOperator.GE,
    operators.ArithmeticOperator.MIN,
    operators.ArithmeticOperator.MAX,
}


_Transform = t.Callable[[expressions.Expression], expressions.Expression]

_MOMBA_OPERATORS: t.Mapping[operators.BinaryOperator, _Transform] = {
    operators.BooleanOperator.XOR: expressions.normalize_xor,
    operators.BooleanOperator.EQUIV: expressions.normalize_equiv,
    operators.ArithmeticOperator.FLOOR_DIV: expressions.normalize_floor_div,
}


@_dump_prop.register
def _dump_binary_expression(
    expr: expressions.BinaryExpression, ctx: JANIContext
) -> JSON:
    if expr.operator in _DERIVED_OPERATORS:
        ctx.require(ModelFeature.DERIVED_OPERATORS)
    if expr.operator in _MOMBA_OPERATORS:
        if ctx.allow_momba_operators:
            ctx.require(ModelFeature.X_MOMBA_OPERATORS)
        else:
            return _dump_prop(_MOMBA_OPERATORS[expr.operator](expr), ctx)
    return {
        "op": expr.operator.symbol,
        "left": _dump_prop(expr.left, ctx),
        "right": _dump_prop(expr.right, ctx),
    }


@_dump_prop.register
def _dump_unary_expression(expr: expressions.UnaryExpression, ctx: JANIContext) -> JSON:
    return {"op": expr.operator.symbol, "exp": _dump_prop(expr.operand, ctx)}


@_dump_prop.register
def _dump_derivative(expr: expressions.Derivative, ctx: JANIContext) -> JSON:
    return {"op": "der", "var": expr.identifier}


@_dump_prop.register
def _dump_sample(expr: expressions.Sample, ctx: JANIContext) -> JSON:
    return {
        "distribution": expr.distribution.jani_name,
        "args": [_dump_prop(argument, ctx) for argument in expr.arguments],
    }


@_dump_prop.register
def _dump_selection(expr: expressions.Selection, ctx: JANIContext) -> JSON:
    ctx.require(ModelFeature.NONDET_EXPRESSIONS)
    return {
        "op": "nondet",
        "var": expr.name,
        "exp": _dump_prop(expr.condition, ctx),
    }


def _dump_prop_interval(pi: properties.PropertyInterval, ctx: JANIContext) -> JSON:
    prop_interval: _JANIMap = {}
    if pi.lower is not None:
        prop_interval["lower"] = _dump_prop(pi.lower, ctx)
    if pi.lower_exclusive is not None:
        prop_interval["lower-exclusive"] = _dump_prop(pi.lower_exclusive, ctx)
    if pi.upper is not None:
        prop_interval["upper"] = _dump_prop(pi.upper, ctx)
    if pi.lower_exclusive is not None:
        prop_interval["upper-exclusive"] = _dump_prop(pi.upper_exclusive, ctx)
    return prop_interval


def _dump_reward_bound(rb: properties.RewardBound, ctx: JANIContext) -> JSON:
    return {
        "exp": _dump_prop(rb.expression, ctx),
        "accumulate": [elem.value for elem in rb.accumulate],
        "bounds": _dump_prop_interval(rb.bounds, ctx),
    }


def _dump_reward_instant(ri: properties.RewardInstant, ctx: JANIContext) -> JSON:
    return {
        "exp": _dump_prop(ri.expression, ctx),
        "accumulate": [elem.value for elem in ri.accumulate],
        "instant": _dump_prop(ri.instant, ctx),
    }


@_dump_prop.register
def _dump_filter(prop: properties.Filter, ctx: JANIContext) -> JSON:
    return {
        "op": "filter",
        "fun": prop.function.value,
        "values": _dump_prop(prop.values, ctx),
        "states": _dump_prop(prop.states, ctx),
    }


@_dump_prop.register
def _dump_probability(prop: properties.ProbabilityProp, ctx: JANIContext) -> JSON:
    return {
        "op": prop.operator.value,
        "exp": _dump_prop(prop.expression, ctx),
    }


@_dump_prop.register
def _dump_path_formula(prop: properties.PathFormula, ctx: JANIContext) -> JSON:
    return {
        "op": prop.operator.value,
        "exp": _dump_prop(prop.expression, ctx),
    }


@_dump_prop.register
def _dump_expected(prop: properties.Expected, ctx: JANIContext) -> JSON:
    expected: _JANIMap = {
        "op": prop.operator.value,
        "exp": _dump_prop(prop.expression, ctx),
    }
    if prop.accumulate is not None:
        expected["accumulate"] = [elem.value for elem in prop.accumulate]
    if prop.reach is not None:
        expected["reach"] = _dump_prop(prop.reach, ctx)
    if prop.step_instant is not None:
        expected["step-instant"] = _dump_prop(prop.step_instant, ctx)
    if prop.time_instant is not None:
        expected["time-instant"] = _dump_prop(prop.time_instant, ctx)
    if prop.reward_instants is not None:
        expected["reward-instants"] = [
            _dump_reward_instant(ri, ctx) for ri in prop.reward_instants
        ]
    return expected


@_dump_prop.register
def _dump_steady(prop: properties.Steady, ctx: JANIContext) -> JSON:
    steady: _JANIMap = {
        "op": prop.operator.value,
        "exp": _dump_prop(prop.expression, ctx),
    }
    if prop.accumulate is not None:
        steady["accumulate"] = [elem.value for elem in prop.accumulate]
    return steady


@_dump_prop.register
def _dump_timed(prop: properties.Timed, ctx: JANIContext) -> JSON:
    timed: _JANIMap = {
        "op": prop.operator.value,
        "left": _dump_prop(prop.left, ctx),
        "right": _dump_prop(prop.right, ctx),
    }
    if prop.step_bounds is not None:
        timed["step-bounds"] = _dump_prop_interval(prop.step_bounds, ctx)
    if prop.time_bounds is not None:
        timed["time-bounds"] = _dump_prop_interval(prop.time_bounds, ctx)
    if prop.reward_bounds is not None:
        timed["reward-bounds"] = [
            _dump_reward_bound(rb, ctx) for rb in prop.reward_bounds
        ]
    return timed


@_dump_prop.register
def _dump_state_predicates(sp: properties.StatePredicates, ctx: JANIContext) -> JSON:
    return {"op": sp.value}


checks.check_singledispatch(_dump_prop, model.Property)


@functools.singledispatch
def _dump_target(target: effects.Target, ctx: JANIContext) -> JSON:
    raise NotImplementedError(f"_dump_target has not been implemented for {target}")


@_dump_target.register
def _dump_target_identifier(target: effects.Identifier, ctx: JANIContext) -> JSON:
    return target.name


checks.check_singledispatch(_dump_target, effects.Target)


def _dump_var_decl(decl: context.VariableDeclaration, ctx: JANIContext) -> JSON:
    jani_declaration: _JANIMap = {
        "name": decl.identifier,
        "type": _dump_type(decl.typ, ctx),
    }
    if decl.is_transient is not None:
        jani_declaration["transient"] = decl.is_transient
    if decl.initial_value is not None:
        jani_declaration["initial-value"] = _dump_prop(decl.initial_value, ctx)
    return jani_declaration


def _dump_const_decl(decl: context.ConstantDeclaration, ctx: JANIContext) -> JSON:
    jani_declaration: _JANIMap = {
        "name": decl.identifier,
        "type": _dump_type(decl.typ, ctx),
    }
    if decl.value is not None:
        jani_declaration["value"] = _dump_prop(decl.value, ctx)
    return jani_declaration


def _dump_assignment(assignment: effects.Assignment, ctx: JANIContext) -> JSON:
    jani_assignment: _JANIMap = {
        "ref": _dump_target(assignment.target, ctx),
        "value": _dump_prop(assignment.value, ctx),
    }
    if assignment.index != 0:
        jani_assignment["index"] = assignment.index
    return jani_assignment


def _dump_location(loc: model.Location, ctx: JANIContext) -> JSON:
    jani_location: _JANIMap = {
        "name": ctx.get_name(loc),
        "x-momba-anonymous": loc.name is None,
    }
    if loc.progress_invariant is not None:
        jani_location["time-progress"] = {
            "exp": _dump_prop(loc.progress_invariant, ctx)
        }
    if loc.transient_values is not None:
        jani_location["transient-values"] = [
            _dump_assignment(assignment, ctx) for assignment in loc.transient_values
        ]
    return jani_location


def _dump_destination(dst: model.Destination, ctx: JANIContext) -> JSON:
    jani_destination: _JANIMap = {"location": ctx.get_name(dst.location)}
    if dst.probability is not None:
        jani_destination["probability"] = {"exp": _dump_prop(dst.probability, ctx)}
    if dst.assignments:
        jani_destination["assignments"] = [
            _dump_assignment(assignment, ctx) for assignment in dst.assignments
        ]
    return jani_destination


def _dump_edge(edge: model.Edge, ctx: JANIContext) -> JSON:
    jani_edge: _JANIMap = {
        "location": ctx.get_name(edge.location),
        "destinations": [_dump_destination(dst, ctx) for dst in edge.destinations],
    }
    if edge.action_pattern is not None:
        jani_edge["action"] = _dump_action_pattern(edge.action_pattern, ctx)
    if edge.rate is not None:
        jani_edge["rate"] = {"exp": _dump_prop(edge.rate, ctx)}
    if edge.guard is not None:
        jani_edge["guard"] = {"exp": _dump_prop(edge.guard, ctx)}
    return jani_edge


def _dump_automaton(automaton: model.Automaton, ctx: JANIContext) -> JSON:
    jani_automaton: _JANIMap = {
        "name": ctx.get_name(automaton),
        "x-momba-anonymous": automaton.name is None,
        "variables": [
            _dump_var_decl(var_decl, ctx)
            for var_decl in automaton.scope.variable_declarations
        ],
        "locations": [_dump_location(loc, ctx) for loc in automaton.locations],
        "edges": [_dump_edge(edge, ctx) for edge in automaton.edges],
        "initial-locations": [ctx.get_name(loc) for loc in automaton.initial_locations],
    }
    if automaton.initial_restriction is not None:
        jani_automaton["restrict-initial"] = {
            "exp": _dump_prop(automaton.initial_restriction, ctx)
        }
    return jani_automaton


def _dump_action_pattern(
    pattern: t.Optional[model.ActionPattern], ctx: JANIContext
) -> JSON:
    if pattern is not None:
        assert (
            not pattern.arguments
        ), "exporting action patterns with arguments to Jani is currently not supported"
        # FIXME: add support for exporting patterns with arguments
        # if pattern.identifiers:
        #     ctx.require(ModelFeature.X_MOMBA_VALUE_PASSING)
        #     return {
        #         "name": pattern.action_type.name,
        #         "identifiers": list(pattern.identifiers),
        #     }
        # else:
        return pattern.action_type.name
    return None


def _dump_sync(
    instance_vector: t.Sequence[model.Instance],
    sync: model.Synchronization,
    ctx: JANIContext,
) -> JSON:
    jani_sync: _JANIMap = {
        "synchronise": [
            _dump_action_pattern(sync.vector.get(instance, None), ctx)
            for instance in instance_vector
        ],
    }
    if sync.result is not None:
        jani_sync["result"] = _dump_action_pattern(sync.result, ctx)
    return jani_sync


def _dump_system(network: model.Network, ctx: JANIContext) -> JSON:
    instances: t.Set[model.Instance] = set()
    for composition in network.system:
        instances |= composition.instances
    instance_vector = list(instances)
    synchronizations: t.Set[model.Synchronization] = set()
    for composition in network.system:
        synchronizations |= composition.synchronizations
    jani_elements: t.List[_JANIMap] = []
    for instance in instance_vector:
        jani_instance: _JANIMap = {"automaton": ctx.get_name(instance.automaton)}
        if instance.input_enable:
            jani_instance["input-enable"] = list(instance.input_enable)
        jani_elements.append(jani_instance)
    return {
        "elements": jani_elements,
        "syncs": [
            _dump_sync(instance_vector, synchronization, ctx)
            for synchronization in synchronizations
        ],
    }


def _dump_metadata(model_ctx: model.Context) -> _JANIMap:
    jani_metadata: _JANIMap = {}
    for field in {"version", "author", "description", "doi", "url"}:
        try:
            jani_metadata[field] = model_ctx.metadata[field]
        except KeyError:
            pass
    return jani_metadata


def _dump_action_type(action_type: model.ActionType, ctx: JANIContext) -> _JANIMap:
    jani_action: _JANIMap = {"name": action_type.name}
    jani_parameters: t.List[_JANIMap] = []
    for parameter in action_type.parameters:
        ctx.require(ModelFeature.X_MOMBA_VALUE_PASSING)
        jani_parameter: _JANIMap = {"type": _dump_type(parameter.typ, ctx)}
        if parameter.comment is not None:
            jani_parameter["comment"] = parameter.comment
        jani_parameters.append(jani_parameter)
    if jani_parameters:
        jani_action["parameters"] = jani_parameters
    if action_type.comment is not None:
        jani_action["comment"] = action_type.comment
    return jani_action


def dump_structure(
    network: model.Network, *, allow_momba_operators: bool = False
) -> JSON:
    ctx = JANIContext(allow_momba_operators=allow_momba_operators)
    jani_metadata: t.Dict[str, str] = {}
    if "name" in network.ctx.metadata:
        jani_metadata
    jani_model: _JANIMap = {
        "jani-version": 1,
        "x-generator": f"Momba (v{version})",
        "x-momba-release": version,
        "name": network.name or "A Momba Model",
        "x-momba-anonymous": network.name is None,
        "metadata": _dump_metadata(network.ctx),
        "x-momba-metadata": dict(network.ctx.metadata),
        "type": network.ctx.model_type.name.lower(),
        "variables": [
            _dump_var_decl(var_decl, ctx)
            for var_decl in network.ctx.global_scope.variable_declarations
        ],
        "constants": [
            _dump_const_decl(const_decl, ctx)
            for const_decl in network.ctx.global_scope.constant_declarations
        ],
        "actions": [
            _dump_action_type(action_type, ctx)
            for action_type in network.ctx.action_types
        ],
        "automata": [
            _dump_automaton(automaton, ctx) for automaton in network.ctx.automata
        ],
        "properties": [
            {"name": prop.name, "expression": _dump_prop(prop.prop, ctx)}
            for prop in network.ctx.properties
        ],
        "system": _dump_system(network, ctx),
        # important: has to be at the end, because we collect the features while dumping
        "features": [feature.value for feature in ctx.features],
    }
    if network.initial_restriction is not None:
        jani_model["restrict-initial"] = {
            "exp": _dump_prop(network.initial_restriction, ctx)
        }
    return jani_model


[docs]def dump_model( network: model.Network, *, indent: t.Optional[int] = None, allow_momba_operators: bool = False, ) -> bytes: """ Takes a Momba automata network and exports it to the JANI format. Arguments: network: The Momba automata network to export to JANI. indent: Indentation of the final JSON. Returns: The model in UTF-8 encoded JANI format. """ return json.dumps( dump_structure(network, allow_momba_operators=allow_momba_operators), indent=indent, ensure_ascii=False, ).encode("utf-8")