Draft for initial implementation in iris-vector-graph
Implement temporal property-graph ingest and query in iris-vector-graph, using the existing ^KG model and Cypher layer as the primary runtime abstraction.
The package is positioned as an IRIS-based graph/vector/text engine with SQL, openCypher, and GraphQL over a unified graph substrate, so temporal edges should extend that substrate rather than create a parallel one.
Reference:
This spec covers:
- Dataset targets for first-wave implementation
- Canonical interchange schema
- Conversion rules from observability datasets into the canonical graph model
- ^KG storage and temporal indexing extensions
- Python and ObjectScript APIs
- Cypher query behavior and procedure surface
- Import/export formats
- Acceptance criteria and implementation order
This spec does not cover:
- Full temporal Cypher language redesign
- TTL / expiry policies
- Streaming Kafka / Pulsar pipelines
- Long-term retention compaction
- Full RDF-first ingest for observability workloads
Why:
- Explicit metrics, logs, and traces RCA benchmark for microservices
- Suitable for service-call graph, incident graph, and temporal path queries
- Good primary target for validating root-cause-oriented temporal graph patterns
- Zenodo: https://zenodo.org/records/6979726
- Train-Ticket system: https://github.com/FudanSELab/train-ticket/
Why:
- Includes logs, Jaeger traces, and Prometheus KPI data
- Smaller and easier first ingest target than RCAEval full benchmark family
- Good first integration target for temporal service-call and KPI graphs
Why:
- Trace-centric RCA workflow
- Clean fit for span graph and service-call temporal graph derivation
- Root repo: https://github.com/alibaba/clusterdata
- README: https://github.com/alibaba/clusterdata/blob/master/cluster-trace-microservices-v2021/README.md
Why:
- Production-scale microservice trace/runtime dataset
- Best scale validation for CALLS_AT edges and time-window scans
- GitHub: https://github.com/Microsoft/cloud-monitoring-dataset
- README: https://github.com/Microsoft/cloud-monitoring-dataset/blob/master/README.md
Why:
- KPI anomaly corpus
- Good validation set for metric-oriented temporal edges and burst windows
- Zenodo: https://zenodo.org/records/14062900
- Paper / description: https://arxiv.org/pdf/2411.09047
Why:
- Extremely wide telemetry table
- Good for validating metric-heavy and bucket-heavy workloads
- GitHub: https://github.com/Azure/AzurePublicDataset
- README: https://github.com/Azure/AzurePublicDataset/blob/master/README.md
Why:
- Useful workload traces and infra traces
- Secondary target after service-centric observability ingestion is stable
Why:
- Good reference model for deriving service-to-service graphs from traces
- Useful as the semantic baseline for OTel / Jaeger trace adapters
The canonical internal model is a temporal property graph.
- Nodes have stable string IDs
- Nodes may have one or more labels
- Relationships have a single type / predicate
- Temporal relationships carry a required integer Unix timestamp
- Rich attributes are stored as relationship properties
This is the first interchange format to implement.

```json
{
  "kind": "temporal_edge",
  "source": "service:checkout",
  "predicate": "CALLS_AT",
  "target": "service:payment",
  "timestamp": 1712000000,
  "weight": 1.0,
  "source_labels": ["Service"],
  "target_labels": ["Service"],
  "attrs": {
    "dataset": "RCAEval_RE2_TT",
    "case_id": "tt_case_001",
    "trace_id": "abc123",
    "span_id": "def456",
    "latency_ms": 237,
    "status_code": 500,
    "error": true
  }
}
```

```json
{
  "kind": "node",
  "id": "service:checkout",
  "labels": ["Service"],
  "properties": {
    "name": "checkout",
    "namespace": "train-ticket"
  }
}
```

```json
{
  "kind": "edge",
  "source": "span:def456",
  "predicate": "BELONGS_TO",
  "target": "trace:abc123",
  "weight": 1.0,
  "attrs": {}
}
```

Node labels: Service, Host, Pod, Trace, Span, Metric, MetricSample, LogEvent, Incident

Relationship types: CALLS_AT, CHILD_OF_AT, EMITS_METRIC_AT, OBSERVED_AT, IMPACTS_AT, HOSTS_AT, BELONGS_TO, HAS_ROOT_CAUSE
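A minimal Python reader sketch for the canonical NDJSON schema above. The required-key sets per `kind` follow the records shown; any validation beyond "required keys present and timestamp is an integer" is an assumption, not part of the spec.

```python
import json

# Required keys per record kind, taken from the canonical schema above.
REQUIRED = {
    "temporal_edge": {"kind", "source", "predicate", "target", "timestamp"},
    "edge": {"kind", "source", "predicate", "target"},
    "node": {"kind", "id"},
}

def read_canonical_ndjson(lines):
    """Yield validated records from an iterable of NDJSON lines."""
    for lineno, line in enumerate(lines, 1):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        rec = json.loads(line)
        kind = rec.get("kind")
        if kind not in REQUIRED:
            raise ValueError(f"line {lineno}: unknown kind {kind!r}")
        missing = REQUIRED[kind] - rec.keys()
        if missing:
            raise ValueError(f"line {lineno}: {kind} record missing {sorted(missing)}")
        if kind == "temporal_edge" and not isinstance(rec["timestamp"], int):
            raise ValueError(f"line {lineno}: timestamp must be an integer Unix epoch")
        yield rec
```

This keeps the ingest path strict about the required integer timestamp while leaving optional fields (weight, labels, attrs) open.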
Use the OTel service graph model as the reference for service-call derivation.
Reference:
For a client/server or producer/consumer interaction:

- upsert Service(caller)
- upsert Service(callee)
- emit:

```json
{
  "kind": "temporal_edge",
  "source": "service:<caller>",
  "predicate": "CALLS_AT",
  "target": "service:<callee>",
  "timestamp": "<span_start_epoch_s>",
  "weight": 1.0,
  "attrs": {
    "trace_id": "<trace_id>",
    "span_id": "<span_id>",
    "latency_ms": "<duration_ms>",
    "status_code": "<status_code>",
    "error": "<bool>"
  }
}
```

- Span(parent) -[CHILD_OF_AT]-> Span(child) at child start time
- Span -[BELONGS_TO]-> Trace
For each metric point:

- create or upsert the source node, usually Service or Host
- either:
  - emit EMITS_METRIC_AT to a MetricSample node, or
  - keep metric details in edge attrs only

Recommended first pass:

```json
{
  "kind": "temporal_edge",
  "source": "service:checkout",
  "predicate": "EMITS_METRIC_AT",
  "target": "metric:latency_ms",
  "timestamp": 1712000000,
  "weight": 237.0,
  "attrs": {
    "metric_name": "latency_ms",
    "aggregation": "avg",
    "dataset": "MicrosoftCloudMonitoring"
  }
}
```

For each log event:
```json
{
  "kind": "temporal_edge",
  "source": "log:<event_id>",
  "predicate": "OBSERVED_AT",
  "target": "service:<service_name>",
  "timestamp": 1712000000,
  "weight": 1.0,
  "attrs": {
    "level": "ERROR",
    "template": "...",
    "trace_id": "abc123"
  }
}
```

For a labeled RCA case:
```json
{
  "kind": "temporal_edge",
  "source": "incident:<case_id>",
  "predicate": "IMPACTS_AT",
  "target": "service:<service_name>",
  "timestamp": 1712000000,
  "weight": 1.0,
  "attrs": {
    "fault_type": "resource_exhaustion",
    "dataset": "RCAEval_RE2_TT"
  }
}
```

Optionally:
```json
{
  "kind": "edge",
  "source": "incident:<case_id>",
  "predicate": "HAS_ROOT_CAUSE",
  "target": "service:<root_service>",
  "weight": 1.0,
  "attrs": {}
}
```

Extend the existing ^KG("out"/"in"/"deg"/"label"/"prop") pattern with temporal subscripts.
```
^KG("out", source, predicate, target) = weight
^KG("in", target, predicate, source) = weight
^KG("tout", ts, source, predicate, target) = weight
^KG("tin", ts, target, predicate, source) = weight
^KG("bucket", bucket_id, source) = ""
^KG("bucketCount", bucket_id, source) = count
^KG("label", label, node) = ""
^KG("prop", node, key) = value
^KG("edgeprop", ts, source, predicate, target, key) = value
```

bucket_id = floor(ts / bucket_size_seconds)

- rich attributes belong in edgeprop, not in tout or tin
- bucketCount is strongly recommended because it makes velocity queries O(1) per bucket rather than requiring bucket membership scans
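An executable sketch of the bucket math, using a plain dict as a stand-in for the ^KG("bucketCount", bucket_id, source) subscript tree. Summing per-bucket counters turns a velocity query into a handful of O(1) lookups; the trade-off is bucket-granularity precision at the window edges. The 60-second bucket size is an assumption, since the spec leaves bucket_size_seconds configurable.

```python
from collections import defaultdict

BUCKET_SIZE_SECONDS = 60  # assumed value; bucket_size_seconds is configurable

# Stand-in for ^KG("bucketCount", bucket_id, source).
bucket_count: dict[tuple[int, str], int] = defaultdict(int)

def bucket_id(ts: int) -> int:
    # bucket_id = floor(ts / bucket_size_seconds)
    return ts // BUCKET_SIZE_SECONDS

def record_edge(ts: int, source: str) -> None:
    bucket_count[(bucket_id(ts), source)] += 1

def edge_velocity(node: str, now: int, window_seconds: int = 60) -> int:
    # Sum every bucket overlapping (now - window_seconds, now];
    # counts are bucket-granular, not exact at the window boundaries.
    first = bucket_id(now - window_seconds + 1)
    last = bucket_id(now)
    return sum(bucket_count.get((b, node), 0) for b in range(first, last + 1))
```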
Primary operational format.
```
IRISGraphEngine.import_graph_ndjson(path, upsert_nodes=True, batch_size=10000)
IRISGraphEngine.import_temporal_edges_ndjson(path, batch_size=10000)
IRISGraphEngine.export_graph_ndjson(path)
IRISGraphEngine.export_temporal_edges_ndjson(path, start=None, end=None, predicate=None)
```

Best compatibility with property-graph tooling and Neo4j-style bulk loaders.
Reference:
Nodes CSV:

```
id:ID,labels:LABEL,name,namespace
service:checkout,Service,checkout,train-ticket
```

Edges CSV:

```
:START_ID,:END_ID,:TYPE,ts:long,weight:double,latency_ms:long,status_code:long,error:boolean
service:checkout,service:payment,CALLS_AT,1712000000,1.0,237,500,true
```

```
IRISGraphEngine.import_graph_csv(nodes_path, edges_path, delimiter=",")
IRISGraphEngine.import_temporal_edges_csv(edges_path, delimiter=",")
```

Implement export first, import later.
Reference:

```
IRISGraphEngine.export_graphml(path, include_temporal_edges=True, start=None, end=None)
```

- node labels become a labels attribute, pipe-joined if multiple
- edge predicate becomes type
- ts stored as an edge attribute
- edge attrs flattened as GraphML key/value attributes
```python
def create_edge_temporal(
    source: str,
    predicate: str,
    target: str,
    timestamp: int | None = None,
    weight: float = 1.0,
    attrs: dict | None = None,
    source_labels: list[str] | None = None,
    target_labels: list[str] | None = None,
) -> None:
    ...

def bulk_create_edges_temporal(
    batch: list[dict],
    batch_size: int = 10000,
) -> dict:
    ...

def get_edges_in_window(
    source: str | None = None,
    target: str | None = None,
    predicate: str | None = None,
    start: int | None = None,
    end: int | None = None,
    direction: str = "out",
) -> list[dict]:
    ...

def get_edge_velocity(
    node_id: str,
    window_seconds: int = 60,
    predicate: str | None = None,
    now: int | None = None,
) -> int:
    ...

def find_burst_nodes(
    label: str | None = None,
    predicate: str | None = None,
    window_seconds: int = 60,
    threshold: int = 50,
    now: int | None = None,
) -> list[dict]:
    ...

def import_graph_ndjson(path: str, upsert_nodes: bool = True, batch_size: int = 10000) -> dict:
    ...

def import_temporal_edges_ndjson(path: str, batch_size: int = 10000) -> dict:
    ...

def export_graph_ndjson(path: str) -> dict:
    ...

def export_temporal_edges_ndjson(
    path: str,
    start: int | None = None,
    end: int | None = None,
    predicate: str | None = None,
) -> dict:
    ...

def import_graph_csv(nodes_path: str, edges_path: str, delimiter: str = ",") -> dict:
    ...

def import_temporal_edges_csv(edges_path: str, delimiter: str = ",") -> dict:
    ...

def export_graphml(
    path: str,
    include_temporal_edges: bool = True,
    start: int | None = None,
    end: int | None = None,
) -> dict:
    ...
```
```objectscript
Class Graph.KG.TemporalIndex Extends %RegisteredObject
{

ClassMethod CreateEdge(
    source As %String,
    predicate As %String,
    target As %String,
    timestamp As %BigInt = "",
    weight As %Double = 1.0,
    attrs As %DynamicObject = ""
) As %Status

ClassMethod BulkInsert(batchJSON As %String) As %Integer

ClassMethod GetEdgesInWindow(
    source As %String = "",
    target As %String = "",
    predicate As %String = "",
    startTS As %BigInt = "",
    endTS As %BigInt = "",
    direction As %String = "out"
) As %DynamicArray

ClassMethod GetEdgeVelocity(
    nodeId As %String,
    windowSeconds As %Integer = 60,
    predicate As %String = "",
    nowTS As %BigInt = ""
) As %Integer

ClassMethod FindBurstNodes(
    label As %String = "",
    predicate As %String = "",
    windowSeconds As %Integer = 60,
    threshold As %Integer = 50,
    nowTS As %BigInt = ""
) As %DynamicArray

ClassMethod PurgeTemporalIndex() As %Status

}
```

Start with normal edge properties.
```cypher
MATCH (a:Service)-[r:CALLS_AT]->(b:Service)
WHERE r.ts >= $start AND r.ts < $end
RETURN a, b, r
ORDER BY r.ts ASC
LIMIT 100
```

- r.ts maps to the temporal edge timestamp
- results come from ^KG("tout") when r.ts constraints are present
- if only topology is requested and no temporal predicate exists, existing ^KG("out") behavior remains valid
- r.latency_ms, r.status_code, and similar fields resolve from ^KG("edgeprop", ...)
Outbound calls from one service:

```cypher
MATCH (a:Service)-[r:CALLS_AT]->(b:Service)
WHERE a.name = $service
  AND r.ts >= $start
  AND r.ts < $end
RETURN a.name AS source, b.name AS target, r.ts AS ts, r.latency_ms AS latency_ms
ORDER BY r.ts ASC
```

Inbound calls to one service:

```cypher
MATCH (a:Service)-[r:CALLS_AT]->(b:Service)
WHERE b.name = $service
  AND r.ts >= $start
  AND r.ts < $end
RETURN a.name AS caller, b.name AS callee, r.ts AS ts, r.status_code AS status_code
ORDER BY r.ts ASC
```

Error calls in a window:

```cypher
MATCH (a:Service)-[r:CALLS_AT]->(b:Service)
WHERE r.ts >= $start
  AND r.ts < $end
  AND r.error = true
RETURN a.name, b.name, r.ts, r.status_code
ORDER BY r.ts ASC
```

Incident-local neighborhood:

```cypher
MATCH (i:Incident)-[x:IMPACTS_AT]->(s:Service)-[r:CALLS_AT]->(t:Service)
WHERE i.id = $case_id
  AND r.ts >= $start
  AND r.ts < $end
RETURN i.id, s.name, t.name, r.ts
ORDER BY r.ts ASC
```

Bounded temporal paths:

```cypher
MATCH p = (a:Service)-[:CALLS_AT*1..3]->(b:Service)
WHERE a.name = $source
  AND b.name = $target
  AND ALL(rel IN relationships(p) WHERE rel.ts >= $start AND rel.ts < $end)
RETURN p
LIMIT 20
```

Monotonic (time-ordered) paths:

```cypher
MATCH p = (a:Service)-[:CALLS_AT*1..4]->(b:Service)
WHERE a.name = $source
  AND b.name = $target
  AND ALL(rel IN relationships(p) WHERE rel.ts >= $start AND rel.ts < $end)
  AND ALL(i IN range(0, size(relationships(p)) - 2)
          WHERE relationships(p)[i].ts <= relationships(p)[i+1].ts)
RETURN p
LIMIT 20
```

Max-gap paths (at most 60 seconds between consecutive hops):

```cypher
MATCH p = (a:Service)-[:CALLS_AT*1..4]->(b:Service)
WHERE a.name = $source
  AND b.name = $target
  AND ALL(i IN range(0, size(relationships(p)) - 2)
          WHERE relationships(p)[i+1].ts - relationships(p)[i].ts <= 60)
RETURN p
LIMIT 20
```

Add procedures before extending full Cypher syntax deeply.
```cypher
CALL ivg.temporal.window($source, $predicate, $start, $end)
YIELD source, target, ts, weight, attrs
RETURN source, target, ts, attrs
ORDER BY ts
```

```cypher
CALL ivg.temporal.velocity($node_id, 60, $predicate)
YIELD node, velocity
RETURN node, velocity
```

```cypher
CALL ivg.temporal.bursts("Service", "CALLS_AT", 60, 50)
YIELD node, velocity
RETURN node, velocity
ORDER BY velocity DESC
```

```cypher
CALL ivg.temporal.inbound($target, "CALLS_AT", $start, $end)
YIELD source, target, ts, attrs
RETURN source, target, ts, attrs
ORDER BY ts
```

- traces → CALLS_AT, CHILD_OF_AT, BELONGS_TO
- metrics → EMITS_METRIC_AT
- logs → OBSERVED_AT
- fault labels → IMPACTS_AT, optional HAS_ROOT_CAUSE
- service dependency / call runtime records → CALLS_AT
- use attrs for response time, call rate, and errors

- source node is service or host
- target is metric id
- emit EMITS_METRIC_AT
- weight carries scalar value
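The metric mapping above can be sketched as a one-row converter: the scalar sample rides in weight, identity fields go to attrs. The input field names (service, metric, value, epoch_s) are assumptions about the adapter's intermediate row shape, not the dataset's native columns.

```python
def metric_row_to_edge(row: dict, dataset: str = "MicrosoftCloudMonitoring") -> dict:
    """Convert one metric sample row into an EMITS_METRIC_AT temporal edge."""
    return {
        "kind": "temporal_edge",
        "source": f"service:{row['service']}",   # or host:<id> for host metrics
        "predicate": "EMITS_METRIC_AT",
        "target": f"metric:{row['metric']}",
        "timestamp": int(row["epoch_s"]),
        "weight": float(row["value"]),            # weight carries the scalar value
        "attrs": {"metric_name": row["metric"], "dataset": dataset},
    }
```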
```
{"kind":"node","id":"service:checkout","labels":["Service"],"properties":{"name":"checkout"}}
{"kind":"node","id":"service:payment","labels":["Service"],"properties":{"name":"payment"}}
{"kind":"temporal_edge","source":"service:checkout","predicate":"CALLS_AT","target":"service:payment","timestamp":1712000000,"weight":1.0,"source_labels":["Service"],"target_labels":["Service"],"attrs":{"latency_ms":237,"status_code":500,"error":true}}
```

Nodes CSV:

```
id:ID,labels:LABEL,name
service:checkout,Service,checkout
service:payment,Service,payment
```

Edges CSV:

```
:START_ID,:END_ID,:TYPE,ts:long,weight:double,latency_ms:long,status_code:long,error:boolean
service:checkout,service:payment,CALLS_AT,1712000000,1.0,237,500,true
```

- create one temporal edge and verify presence in out, in, tout, tin
- create same (source, predicate, target) at different timestamps and verify both timestamps exist
- create same (source, predicate, target, timestamp) twice and verify second write is idempotent
- verify node labels and properties are upserted correctly
- verify edge attrs land in edgeprop
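The idempotency and multi-timestamp expectations can be made concrete with an executable sketch, using a plain dict as a stand-in for the ^KG("tout") subscript tree (the real tests would go through the engine API):

```python
# Stand-in for ^KG("tout", ts, source, predicate, target) = weight.
tout: dict[tuple[int, str, str, str], float] = {}

def create_edge_temporal(source: str, predicate: str, target: str,
                         ts: int, weight: float = 1.0) -> None:
    # The full (ts, source, predicate, target) tuple is the key: repeating an
    # identical write overwrites the same subscript (idempotent), while the
    # same triple at a different ts creates a distinct temporal edge.
    tout[(ts, source, predicate, target)] = weight

create_edge_temporal("service:a", "CALLS_AT", "service:b", 100)
create_edge_temporal("service:a", "CALLS_AT", "service:b", 100)  # idempotent rewrite
create_edge_temporal("service:a", "CALLS_AT", "service:b", 160)  # new timestamp
```

After these three writes exactly two edges exist, which is the behavior the second and third test bullets above require.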
- get all outbound calls from one service in a 5-minute window
- get all inbound calls to one service in a 5-minute window
- empty-window query returns empty result with no error
- exact bucket count for one node over one window
- threshold burst query returns only expected nodes
- velocity with predicate filter behaves correctly
- edge property filter on r.ts
- edge property projection from edgeprop
- incident-local neighborhood query
- procedure calls for window, velocity, and bursts
- NDJSON round-trip preserves node ids, labels, edge types, timestamps, and attrs
- CSV export/import round-trip preserves graph structure
- GraphML export contains edge timestamps and flattened attrs
- canonical NDJSON schema implemented
- CSV nodes/edges format implemented
- per-dataset adapters for RCAEval and Train-Ticket implemented first
- create_edge_temporal() writes to out, in, tout, tin, bucket, and bucketCount
- Cypher edge-property filtering on r.ts works
- procedure APIs for window, velocity, and bursts work
- phase-2 bounded temporal path support works
- bulk_create_edges_temporal() of 100K edges under 2 seconds on standard IRIS hardware
- 1-minute window query on million-edge graph under 10 ms
- burst detection on bucket counts under 100 ms
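A minimal timing-harness sketch for checking the latency criteria above. The op argument is any callable under test (for example, a wrapper around bulk_create_edges_temporal); the helper name and threshold handling are illustrative, not part of the spec.

```python
import time

def time_op(op, *args, threshold_s: float, **kwargs):
    """Run op once and return (result, elapsed_seconds, within_threshold)."""
    t0 = time.perf_counter()
    result = op(*args, **kwargs)
    elapsed = time.perf_counter() - t0
    return result, elapsed, elapsed <= threshold_s
```

In practice each criterion would be run several times on warmed caches and the median compared against its threshold.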
- canonical NDJSON schema
- Graph.KG.TemporalIndex plus Python wrappers
- RCAEval and Train-Ticket adapters
- Cypher r.ts filtering
- temporal procedures
- CSV import/export
- GraphML export
- Alibaba-scale validation
- phase-2 temporal path semantics
The implementation strategy is:
- use iris-vector-graph first
- extend ^KG rather than creating a parallel temporal store
- make NDJSON the canonical operational interchange format
- support CSV for bulk graph exchange
- support GraphML export for tool interoperability
- keep Cypher as the primary query language
- layer temporal procedures first, then deeper temporal path semantics