Action Flow Latency Analysis Labs
Experimental feature
This example uses experimental unit conversion functionality.
Enable with experimental_quantities=True.
This example computes the total end-to-end latency of a sequential action pipeline by
following the succession chain from start and summing the latency attribute on
each step. It uses the same DataPipeline model as Action Flow Port Connectivity
and Action Flow Type Compatibility.
Each action definition in the model carries a attribute latency : DurationValue = N
[min] using SI units. The script evaluates these attributes using the Syside
expression compiler.
Concepts Used
API |
Purpose |
|---|---|
Links adjacent steps in an action sequence |
|
Endpoints of a succession ( |
|
Evaluates an attribute expression within a given scope, returning a numeric value |
|
Attributes defined directly on an action definition |
Example Model
package DataPipeline {
private import ISQBase::DurationValue;
private import SI::min;
item def RawData;
item def CleanData;
item def AnalysisResult;
item def Report;
item def Tag;
action def Ingest {
doc /* Read raw data from a source. */
out item raw : RawData;
attribute latency : DurationValue = 2 [min];
}
action def Clean {
doc /* Remove noise and normalize the raw data. */
in item raw : RawData;
out item clean : CleanData;
attribute latency : DurationValue = 5 [min];
}
action def Analyze {
doc /* Run statistical analysis on clean data. */
in item clean : CleanData;
out item result : AnalysisResult;
attribute latency : DurationValue = 10 [min];
}
action def Publish {
doc /* Format and publish the result.
Expects a Report but receives an AnalysisResult — a type mismatch. */
in item report : Report;
in item tag : Tag; // intentionally left unconnected
attribute latency : DurationValue = 1 [min];
}
action def Pipeline {
first start;
then action ingest : Ingest;
then action clean : Clean;
then action analyze : Analyze;
then action publish : Publish;
flow from ingest.raw to clean.raw;
flow from clean.clean to analyze.clean;
// Type mismatch: AnalysisResult is not a subtype of Report
flow from analyze.result to publish.report;
// publish.tag is intentionally left without a flow
}
}
Example Script
import pathlib
import syside
EXAMPLE_DIR = pathlib.Path(__file__).parent
MODEL_FILE_PATH = EXAMPLE_DIR / "example_model.sysml"
STANDARD_LIBRARY = syside.Environment.get_default().lib
# DurationValue evaluates to seconds; divide to get minutes.
SECONDS_PER_MINUTE = 60.0
def get_action_attribute_value(
compiler: syside.Compiler,
action: syside.ActionUsage,
attribute_name: str,
) -> float | None:
"""Evaluate a named attribute on the action's project-defined definition.
Returns the numeric value in SI base units, or None if absent."""
definition = next(
(
d
for d in action.definitions.collect()
if isinstance(d, syside.ActionDefinition)
and d.document.document_tier == syside.DocumentTier.Project
),
None,
)
if definition is None:
return None
attribute = next(
(
a
for a in definition.owned_attributes.collect()
if a.name == attribute_name
),
None,
)
if attribute is None:
return None
value, _report = compiler.evaluate_feature(
feature=attribute,
scope=definition,
stdlib=STANDARD_LIBRARY,
experimental_quantities=True,
)
return float(value) if isinstance(value, (int, float)) else None
def ordered_actions(
model: syside.Model, action_def_name: str
) -> list[syside.ActionUsage]:
"""Return the sub-actions of a named action definition in execution
order by following the succession chain from ``start``."""
# Build a map from source qualified name to target ActionUsage.
succession_map: dict[str, syside.ActionUsage] = {}
start_qname: str | None = None
for succession in model.nodes(syside.SuccessionAsUsage):
owning_def = succession.owning_definition
if owning_def is None or owning_def.name != action_def_name:
continue
source = succession.source_feature
target_list = succession.target_features.collect()
if source is None or not target_list:
continue
if not isinstance(target_list[0], syside.ActionUsage):
continue
source_qname = str(source.qualified_name)
succession_map[source_qname] = target_list[0]
# The start node is the succession source named "start".
if source.declared_name == "start":
start_qname = source_qname
if start_qname is None:
return []
# Walk the chain from start to end.
result: list[syside.ActionUsage] = []
current = start_qname
while current in succession_map:
next_action = succession_map[current]
result.append(next_action)
current = str(next_action.qualified_name)
return result
def analyze_latency(model: syside.Model, action_def_name: str) -> None:
"""Print the latency of each step and the total end-to-end latency."""
actions = ordered_actions(model, action_def_name)
compiler = syside.Compiler()
total_seconds: float = 0
print(f"Latency breakdown for {action_def_name}:")
for action in actions:
seconds = get_action_attribute_value(compiler, action, "latency")
if seconds is not None:
total_seconds += seconds
minutes = seconds / SECONDS_PER_MINUTE
print(f" {action.name}: {minutes:g} min")
else:
print(f" {action.name}: (no latency attribute)")
total_minutes = total_seconds / SECONDS_PER_MINUTE
print(f"Total latency: {total_minutes:g} min")
def main() -> None:
(model, diagnostics) = syside.load_model([MODEL_FILE_PATH])
assert not diagnostics.contains_errors(warnings_as_errors=True)
analyze_latency(model, "Pipeline")
if __name__ == "__main__":
main()
Output
Latency breakdown for Pipeline:
ingest: 2 min
clean: 5 min
analyze: 10 min
publish: 1 min
Total latency: 18 min
Download
Download this example here.