Source code for flameiq.schema.v1.models
"""FlameIQ Performance Schema — Version 1.
This module defines the canonical, versioned data contract for all FlameIQ
performance snapshots. Schema v1 is **stable and immutable** — fields will
never be removed or renamed. New optional fields may appear in minor patch
releases.
All classes use standard-library ``dataclasses`` only.
Zero external runtime dependencies in this module.
Schema version: 1
Stability: Production / Stable
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
[docs]
class Environment(str, Enum):
"""The execution environment for a benchmark run."""
CI = "ci"
LOCAL = "local"
STAGING = "staging"
CUSTOM = "custom"
@classmethod
def _missing_(cls, value: object) -> Environment:
return cls.CUSTOM
# ---------------------------------------------------------------------------
# Sub-models
# ---------------------------------------------------------------------------
[docs]
@dataclass
class LatencyMetrics:
"""Latency measurements in **milliseconds**.
At least one field must be provided. ``p95`` is the primary regression
signal used by the comparison engine.
Args:
mean: Arithmetic mean latency (ms).
p50: Median latency (ms).
p95: 95th-percentile latency (ms). **Primary regression signal.**
p99: 99th-percentile latency (ms). Tail latency signal.
"""
mean: float | None = None
p50: float | None = None
p95: float | None = None
p99: float | None = None
[docs]
def __post_init__(self) -> None:
"""Validate that at least one latency metric is provided and all are non-negative."""
if all(v is None for v in (self.mean, self.p50, self.p95, self.p99)):
raise ValueError("LatencyMetrics requires at least one value (mean, p50, p95, or p99).")
for name, val in (
("mean", self.mean),
("p50", self.p50),
("p95", self.p95),
("p99", self.p99),
):
if val is not None and (not isinstance(val, int | float) or val < 0):
raise ValueError(
f"LatencyMetrics.{name} must be a non-negative number, got {val!r}"
)
[docs]
@dataclass
class Metrics:
"""All performance measurements for a single benchmark run.
At least one metric must be non-null/non-empty.
Args:
latency: Latency breakdown in milliseconds.
throughput: Operations or requests per second.
memory_mb: Peak memory usage in megabytes.
cpu_percent: Average CPU utilisation (0–100).
custom: User-defined numeric metrics. Keys are dotted names.
"""
latency: LatencyMetrics | None = None
throughput: float | None = None
memory_mb: float | None = None
cpu_percent: float | None = None
custom: dict[str, float] = field(default_factory=dict)
[docs]
def __post_init__(self) -> None:
"""Validate that at least one metric is provided and all numeric values are non-negative."""
has_std = any(
[
self.latency is not None,
self.throughput is not None,
self.memory_mb is not None,
self.cpu_percent is not None,
]
)
if not has_std and not self.custom:
raise ValueError(
"Metrics must contain at least one measurement "
"(latency, throughput, memory_mb, cpu_percent, or custom)."
)
[docs]
def flat(self) -> dict[str, float]:
"""Return a flat ``{key: value}`` dict of all non-null metrics.
Keys follow a dotted naming convention:
``latency.mean``, ``latency.p95``, ``throughput``, ``memory_mb``,
``cpu_percent``, ``custom.<name>``.
Returns:
Ordered dict of all non-null metric values.
"""
result: dict[str, float] = {}
if self.latency:
for attr in ("mean", "p50", "p95", "p99"):
v = getattr(self.latency, attr)
if v is not None:
result[f"latency.{attr}"] = float(v)
if self.throughput is not None:
result["throughput"] = float(self.throughput)
if self.memory_mb is not None:
result["memory_mb"] = float(self.memory_mb)
if self.cpu_percent is not None:
result["cpu_percent"] = float(self.cpu_percent)
for k, v in self.custom.items():
result[f"custom.{k}"] = float(v)
return result
[docs]
@dataclass
class SnapshotMetadata:
"""Contextual metadata attached to a :class:`PerformanceSnapshot`.
Args:
run_id: UUID4 uniquely identifying this run. Auto-generated.
commit: Git commit hash (short or full SHA).
branch: Git branch name.
timestamp: UTC datetime of the run. Auto-generated if omitted.
environment: Execution environment label.
tags: Arbitrary ``str → str`` metadata for user context.
"""
run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
commit: str | None = None
branch: str | None = None
timestamp: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))
environment: Environment = Environment.CI
tags: dict[str, str] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Top-level snapshot
# ---------------------------------------------------------------------------
[docs]
@dataclass
class PerformanceSnapshot:
"""A complete, versioned performance snapshot.
This is the primary data unit in FlameIQ. Every benchmark run produces
exactly one ``PerformanceSnapshot``. Snapshots are immutable in intent —
do not mutate after construction.
Args:
metrics: All performance measurements for this run.
schema_version: Must be ``1`` for v1 snapshots.
metadata: Run context (commit, branch, timestamp, etc.).
Example::
snapshot = PerformanceSnapshot(
metadata=SnapshotMetadata(commit="abc123", branch="main"),
metrics=Metrics(
latency=LatencyMetrics(mean=120.5, p95=180.0, p99=240.0),
throughput=950.2,
memory_mb=512.0,
),
)
"""
metrics: Metrics
schema_version: int = 1
metadata: SnapshotMetadata = field(default_factory=SnapshotMetadata)
[docs]
def __post_init__(self) -> None:
"""Validate that the snapshot conforms to the expected schema version."""
if self.schema_version != 1:
raise ValueError(
f"Unsupported schema_version: {self.schema_version!r}. "
"FlameIQ v1.0 supports schema version 1 only."
)
# ------------------------------------------------------------------
# Serialisation
# ------------------------------------------------------------------
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialise to a plain ``dict`` suitable for JSON serialisation.
Returns:
A ``dict`` that round-trips through :meth:`from_dict`.
"""
m = self.metadata
metrics_d: dict[str, Any] = {}
if self.metrics.latency:
lat = self.metrics.latency
metrics_d["latency"] = {
k: getattr(lat, k)
for k in ("mean", "p50", "p95", "p99")
if getattr(lat, k) is not None
}
if self.metrics.throughput is not None:
metrics_d["throughput"] = self.metrics.throughput
if self.metrics.memory_mb is not None:
metrics_d["memory_mb"] = self.metrics.memory_mb
if self.metrics.cpu_percent is not None:
metrics_d["cpu_percent"] = self.metrics.cpu_percent
if self.metrics.custom:
metrics_d["custom"] = dict(self.metrics.custom)
return {
"schema_version": self.schema_version,
"metadata": {
"run_id": m.run_id,
"commit": m.commit,
"branch": m.branch,
"timestamp": m.timestamp.isoformat(),
"environment": m.environment.value,
"tags": dict(m.tags),
},
"metrics": metrics_d,
}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PerformanceSnapshot:
"""Deserialise a ``PerformanceSnapshot`` from a plain ``dict``.
Args:
data: A dict as produced by :meth:`to_dict`, or a compatible
JSON structure conforming to the FlameIQ v1 schema.
Returns:
A validated :class:`PerformanceSnapshot`.
Raises:
ValueError: If required fields are missing or invalid.
TypeError: If field types are wrong.
"""
schema_version = data.get("schema_version", 1)
if schema_version != 1:
raise ValueError(f"Unsupported schema_version: {schema_version!r}")
# --- metadata ---
raw_meta = data.get("metadata", {})
ts_raw = raw_meta.get("timestamp")
if ts_raw:
try:
ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
except (ValueError, AttributeError):
ts = datetime.now(tz=timezone.utc)
else:
ts = datetime.now(tz=timezone.utc)
env_raw = raw_meta.get("environment", "ci")
try:
env = Environment(env_raw)
except ValueError:
env = Environment.CUSTOM
metadata = SnapshotMetadata(
run_id=raw_meta.get("run_id", str(uuid.uuid4())),
commit=raw_meta.get("commit"),
branch=raw_meta.get("branch"),
timestamp=ts,
environment=env,
tags=dict(raw_meta.get("tags", {})),
)
# --- metrics ---
raw_metrics = data.get("metrics")
if not raw_metrics:
raise ValueError("'metrics' is required and must not be empty.")
latency: LatencyMetrics | None = None
lat_raw = raw_metrics.get("latency")
if lat_raw:
latency = LatencyMetrics(
mean=lat_raw.get("mean"),
p50=lat_raw.get("p50"),
p95=lat_raw.get("p95"),
p99=lat_raw.get("p99"),
)
metrics = Metrics(
latency=latency,
throughput=raw_metrics.get("throughput"),
memory_mb=raw_metrics.get("memory_mb"),
cpu_percent=raw_metrics.get("cpu_percent"),
custom=dict(raw_metrics.get("custom", {})),
)
return cls(
schema_version=schema_version,
metadata=metadata,
metrics=metrics,
)