Action Flow Latency Analysis Labs

Experimental feature

This example uses experimental unit conversion functionality. Enable with experimental_quantities=True.

This example computes the total end-to-end latency of a sequential action pipeline by following the succession chain from start and summing the latency attribute on each step. It uses the same DataPipeline model as Action Flow Port Connectivity and Action Flow Type Compatibility.

Each action definition in the model carries a attribute latency : DurationValue = N [min] using SI units. The script evaluates these attributes using the Syside expression compiler.

Concepts Used

API

Purpose

SuccessionAsUsage

Links adjacent steps in an action sequence

ConnectorAsUsage

Endpoints of a succession (source_feature / target_features)

Compiler.evaluate_feature()

Evaluates an attribute expression within a given scope, returning a numeric value

Definition.owned_attributes

Attributes defined directly on an action definition

Example Model

package DataPipeline {
    private import ISQBase::DurationValue;
    private import SI::min;

    item def RawData;
    item def CleanData;
    item def AnalysisResult;
    item def Report;
    item def Tag;

    action def Ingest {
        doc /* Read raw data from a source. */
        out item raw : RawData;
        attribute latency : DurationValue = 2 [min];
    }

    action def Clean {
        doc /* Remove noise and normalize the raw data. */
        in item raw : RawData;
        out item clean : CleanData;
        attribute latency : DurationValue = 5 [min];
    }

    action def Analyze {
        doc /* Run statistical analysis on clean data. */
        in item clean : CleanData;
        out item result : AnalysisResult;
        attribute latency : DurationValue = 10 [min];
    }

    action def Publish {
        doc /* Format and publish the result.
             Expects a Report but receives an AnalysisResult — a type mismatch. */
        in item report : Report;
        in item tag : Tag;  // intentionally left unconnected
        attribute latency : DurationValue = 1 [min];
    }

    action def Pipeline {
        first start;
        then action ingest : Ingest;
        then action clean : Clean;
        then action analyze : Analyze;
        then action publish : Publish;

        flow from ingest.raw to clean.raw;
        flow from clean.clean to analyze.clean;
        // Type mismatch: AnalysisResult is not a subtype of Report
        flow from analyze.result to publish.report;
        // publish.tag is intentionally left without a flow
    }
}

Example Script

import pathlib
import syside

EXAMPLE_DIR = pathlib.Path(__file__).parent
MODEL_FILE_PATH = EXAMPLE_DIR / "example_model.sysml"

STANDARD_LIBRARY = syside.Environment.get_default().lib

# DurationValue evaluates to seconds; divide to get minutes.
SECONDS_PER_MINUTE = 60.0


def get_action_attribute_value(
    compiler: syside.Compiler,
    action: syside.ActionUsage,
    attribute_name: str,
) -> float | None:
    """Evaluate a named attribute on the action's project-defined definition.
    Returns the numeric value in SI base units, or None if absent."""

    definition = next(
        (
            d
            for d in action.definitions.collect()
            if isinstance(d, syside.ActionDefinition)
            and d.document.document_tier == syside.DocumentTier.Project
        ),
        None,
    )
    if definition is None:
        return None

    attribute = next(
        (
            a
            for a in definition.owned_attributes.collect()
            if a.name == attribute_name
        ),
        None,
    )
    if attribute is None:
        return None

    value, _report = compiler.evaluate_feature(
        feature=attribute,
        scope=definition,
        stdlib=STANDARD_LIBRARY,
        experimental_quantities=True,
    )
    return float(value) if isinstance(value, (int, float)) else None


def ordered_actions(
    model: syside.Model, action_def_name: str
) -> list[syside.ActionUsage]:
    """Return the sub-actions of a named action definition in execution
    order by following the succession chain from ``start``."""

    # Build a map from source qualified name to target ActionUsage.
    succession_map: dict[str, syside.ActionUsage] = {}
    start_qname: str | None = None

    for succession in model.nodes(syside.SuccessionAsUsage):
        owning_def = succession.owning_definition
        if owning_def is None or owning_def.name != action_def_name:
            continue
        source = succession.source_feature
        target_list = succession.target_features.collect()
        if source is None or not target_list:
            continue
        if not isinstance(target_list[0], syside.ActionUsage):
            continue
        source_qname = str(source.qualified_name)
        succession_map[source_qname] = target_list[0]
        # The start node is the succession source named "start".
        if source.declared_name == "start":
            start_qname = source_qname

    if start_qname is None:
        return []

    # Walk the chain from start to end.
    result: list[syside.ActionUsage] = []
    current = start_qname
    while current in succession_map:
        next_action = succession_map[current]
        result.append(next_action)
        current = str(next_action.qualified_name)

    return result


def analyze_latency(model: syside.Model, action_def_name: str) -> None:
    """Print the latency of each step and the total end-to-end latency."""

    actions = ordered_actions(model, action_def_name)
    compiler = syside.Compiler()
    total_seconds: float = 0

    print(f"Latency breakdown for {action_def_name}:")
    for action in actions:
        seconds = get_action_attribute_value(compiler, action, "latency")
        if seconds is not None:
            total_seconds += seconds
            minutes = seconds / SECONDS_PER_MINUTE
            print(f"  {action.name}: {minutes:g} min")
        else:
            print(f"  {action.name}: (no latency attribute)")

    total_minutes = total_seconds / SECONDS_PER_MINUTE
    print(f"Total latency: {total_minutes:g} min")


def main() -> None:
    (model, diagnostics) = syside.load_model([MODEL_FILE_PATH])
    assert not diagnostics.contains_errors(warnings_as_errors=True)

    analyze_latency(model, "Pipeline")


if __name__ == "__main__":
    main()

Output

Latency breakdown for Pipeline:
  ingest: 2 min
  clean: 5 min
  analyze: 10 min
  publish: 1 min
Total latency: 18 min

Download

Download this example here.