ArcFlow
Company
Managed Services
Markets
  • News
  • LOG IN
  • GET STARTED

OZ brings Visual Intelligence to physical venues, a managed edge layer that lets real-world environments see, understand, and act in real time.

Talk to us

ArcFlow

  • World Models
  • Sensors

Managed Services

  • OZ VI Venue 1
  • Case Studies

Markets

  • Sports
  • Broadcasting
  • Robotics

Company

  • About
  • Technology
  • Careers
  • Contact

Ready to see it live?

Talk to the OZ team about deploying at your venues, from a single pilot match to a full regional rollout.

Schedule a deployment review

© 2026 OZ. All rights reserved.

LinkedIn
ArcFlow Docs
Start
  • Quickstart
  • Installation
  • Bindings
  • Platforms
  • Get Started
Concepts
  • World Model
  • Graph Model
  • Evidence Model
  • Observations
  • Confidence & Provenance
  • Proof Artifacts & Gates
  • SQL vs GQL
  • Graph Patterns
  • Parameters
  • Query Results
  • Persistence & WAL
  • Snapshot-Pinned Reads
  • Error Handling
  • Execution Models
  • Causal Edges
  • Adapter Discipline
  • Time Decay
  • Layers
  • 1. Perception Lake
  • 2. World Graph
  • 3. Query Engine
  • 4. Live Surface
  • 5. Event Bus
  • 6. Behavior Engine
  • 7. Algorithm Library
WorldCypher
  • Overview
  • Statements
  • MATCH
  • WHERE
  • RETURN
  • OPTIONAL MATCH
  • CREATE
  • SET
  • MERGE
  • DELETE
  • REMOVE
  • Composition
  • WITH
  • UNION
  • UNWIND
  • CASE
  • Schema
  • Schema Overview
  • Indexes
  • Constraints
  • Functions
  • Built-in Functions
  • Aggregations
  • Procedures
  • Shortest Path
  • EXPLAIN
  • PROFILE
  • Temporal Queriesfacet
  • Spatial Queriesfacet
  • Algorithmsfacet
  • Triggers
Capabilities
  • Live Queries
  • Vector Search
  • Trusted RAG
  • Spatial Knowledge
  • Temporal
  • Behavior Graphs
  • Graph Algorithms
  • Skills
  • CREATE SKILL
  • PROCESS NODE
  • REPROCESS EDGES
  • Sync
  • Programs
  • GPU Acceleration
  • Agent-Native
  • MCP Server
  • Event Sourcing
  • Intent Relay
  • Event Bus
Use Cases
  • Agent Tooling
  • Trusted RAG
  • Knowledge Management
  • Behavior Graphs
  • Autonomous Systems
  • Physical AI
  • Digital Twins
  • Robotics & Perception
  • Sports Analytics
  • Grounded Neural Objects
  • Fraud Detection
Walkthroughs
    Guides
  • Agent Integration
  • Building a World Model
  • Modeling a Social Graph
  • Build a RAG Pipeline
  • Using Skills
  • Behavior Graphs
  • Swarm & Multi-Agent
  • Fleet Coordination
  • Migrate from Cypher / Neo4j
  • From SQL to GQL
  • Filesystem Workspace
  • Data Quality
  • Code Intelligence
  • Scale Patterns
  • v0.7 → v0.8 Lakehouse Fast-Path
  • Tutorials
  • Knowledge Graph
  • Entity Linking
  • Vector Search
  • Graph Algorithms
  • Recipes
  • CRUD
  • Multi-MATCH
  • MERGE (Upsert)
  • Full-Text Search
  • Batch Projection
  • Multi-Source Observation
  • Sports Analytics
Operations
  • CLI
  • REPL Commands
  • Snapshot & Restore
  • Filesystem Projection
  • Server Modes & PG Wire
  • Persistence (ops)
  • Import & Export
  • Deployment
  • Daemon (UDS)
  • Architecture
  • Engine Architecture
  • Cloud Architecture
  • Sync Protocol (Deep Dive)
  • World Graph Substrate (Preview)
Reference
  • TypeScript API
  • Glossary
  • Naming & Domain Map
  • Data Types
  • Operators
  • Error Codes
  • GQL Reference
  • Known Issues
  • Versioning
  • Licensing
  • Conformance
  • GQL Conformance
  • openCypher TCK
  • Extension Regressions
GQL Conformance
  • Conformance Dashboard
  • openCypher TCK Results
  • Extension Regressions
GQL Features
  • MATCH Basic
  • CREATE Nodes Edges
  • SET REMOVE Properties
  • DELETE Detach DELETE
  • RETURN WITH WHERE
  • Order BY Limit Skip
  • Order BY Nulls First Last
  • UNWIND
  • Aggregate Functions
  • OPTIONAL MATCH
  • Variable Length Paths
  • Label OR AND NOT Expressions
  • Label Wildcard
  • Quantified Path Sugar
  • Path Modes Walk Trail Simple Acyclic
  • Shortest Path Variants
  • IS Labeled Predicate
  • Element ID Function
  • IS Type Predicate
  • Binary Literals
  • Line Comments Solidus
  • Line Comments Minus
  • GQLSTATUS Result Codes
  • GQL Error Code Mapping
  • Transaction Control Syntax
  • SET Session
  • Conditional Execution WHEN THEN ELSE
  • RETURN NEXT Pipeline
  • Primary Key Constraint
  • Unique Constraint
  • Deterministic MERGE Via PK
  • Undirected Edge MATCH
  • Cast Type Conversion
  • GQL Directories
  • Multiple Labels Per Node
  • GQL Flagger
  • NEXT Linear Composition
  • Cardinality Function
  • INT64 BIGINT Type Names
  • FLOAT64 Double Type Names
  • Log10 Log2 Functions
  • Trim Leading Trailing Both
  • FILTER Clause
  • LET Statement
  • Group BY Explicit
  • EXCEPT SET Operations
  • INTERSECT SET Operations
  • ALL Different Predicate
  • Same Predicate
  • Property Exists Function
  • Path Variable Binding
  • USE Graph Clause
  • FOR IN List
  • Typed Temporal Literals
  • Session SET Value Params
  • Typed List Annotations
  • arcflow.cosine() function
  • arcflow.embed() function
  • arcflow.similar() procedure
  • arcflow.graphrag() procedure
ArcFlow Extensions
  • LIVE Queries
  • Triggered Write-Back Views
  • Evidence Algebra
  • Relationship Skills
  • AI Function Namespace
  • Graph Embedding Algorithms
  • ASOF JOIN
  • Durable Workflows
  • Incremental Z-Set Engine
  • GPU GraphBLAS
  • Triggers
  • HNSW Vector Index
  • Extensions Moat

Event bus

Canonical surface for the Event Bus layer.

ArcFlow ships an in-process publish/subscribe + request-reply substrate that lives in the same memory space as the graph store. No socket, no broker hop, no second store to operate. The wire-equivalent semantics mirror core NATS so services migrating from NATS or Memgraph+NATS keep their mental model.

The lower-level Cypher surface (CREATE TOPIC, PUBLISH, retention, cursor replay) is covered on the Live queries page. This page covers the NATS-style extensions: wildcard subscriptions, durable consumers, queue groups, and synchronous request-reply.

Topic wildcards#

A subscriber names a pattern instead of a literal topic. The engine fans out events from every existing and future topic whose name matches.

Semantics are identical to NATS:

TokenMeaning
*Matches exactly one segment (no dots)
>Matches one or more trailing segments. Only valid as the final token.

Examples:

PatternMatchesDoes not match
all_tracks.*all_tracks.cam1, all_tracks.cam_27all_tracks, all_tracks.cam1.detail
oz.>oz.x, oz.x.y.zoz, arcflow.x
a.*.ca.b.c, a.zz.ca.c, a.b.b.c
>every non-empty topic—

Subscribe a stored query to a pattern#

SUBSCRIBE PATTERN 'all_tracks.*' AS tracks_view
  QUERY 'MATCH (t:Track) RETURN t.id, t.position'

The subscription fires for every event published to any topic that matches all_tracks.* — current and future. Future topics that match are picked up automatically without re-subscribing.

Pattern-following durable consumers#

A pattern consumer receives every event from every matching topic and tracks an independent offset per topic, so acks on all_tracks.cam1 don't advance the cursor on all_tracks.cam2.

from arcflow import ArcFlow
from arcflow.bus import Bus, START_FROM_BEGINNING
 
db = ArcFlow()
bus = Bus(db)
 
consumer = bus.register_pattern_consumer(
    "tracks",
    "all_tracks.*",
    start=START_FROM_BEGINNING,
)
 
# Future topics that match auto-attach at LatestOnly:
bus.create_topic("all_tracks.cam99")
bus.publish("all_tracks.cam99", {"id": 42, "x": 1.0})
 
# Drain pending events across every matching topic. ack=True (default)
# advances each topic's offset to the last seq seen.
for topic, event in consumer.drain():
    handle(topic, event.data)

Resolving the current match set#

topics = bus.topics_matching_pattern("all_tracks.*")
# -> ['all_tracks.cam1', 'all_tracks.cam2', ...] sorted

The matcher is a pure function in arcflow_core::store::TopicPattern and runs in O(segments) per topic. Pattern subscriptions live in their own registry; the publish hot path skips pattern evaluation when no patterns are active.

WAL durability#

Pattern subscriptions and pattern consumers are journaled via the WAL ops SubscribePattern, RegisterPatternConsumer, and DropPatternConsumer. Post-restart, the registrations replay and the per-topic durable consumers reconstruct from the regular RegisterConsumer journal — no separate "pattern replay" path.

Synchronous request-reply#

A publisher sends a request to a topic and waits — with a timeout — for a single matching reply. The engine generates a correlation ID, allocates a per-request reply topic, and routes the reply back when the handler calls reply(correlation_id, …).

Python sync flow#

from arcflow import ArcFlow
from arcflow.bus import Bus
 
db = ArcFlow()
bus = Bus(db)
 
# In the requester:
reply = bus.request(
    "video.query.status",
    {"query": "active_cameras"},
    timeout_ms=5_000,
)
print(reply.data["count"])

bus.request(...) blocks until either the reply arrives or timeout_ms expires. On timeout it raises ArcFlowError with code REQUEST_TIMEOUT.

Decoupled handle + reply#

When the requester and the handler share the same process but live on different threads, drive the call manually:

handle = bus.request_async("video.query.status", {"q": "n_cams"}, timeout_ms=2_000)
# ... handler reads the request event from the topic ...
bus.reply(handle.correlation_id, {"count": 4})
# ... requester thread takes the reply ...
reply = bus.wait_for_reply(handle.correlation_id, deadline_ms=handle.deadline_ms)

Request envelope#

The published request event carries two engine-controlled headers (both win on collision with caller-supplied headers, so a malicious caller cannot redirect the reply):

HeaderMeaning
_correlation_idEngine-allocated _REQ.<n>
_reply_toPer-request reply topic _REPLY.<correlation_id>

The reply event is published to the reply topic so callers who prefer the durable-consumer attach pattern over poll_reply can register a consumer against it.

Sweep + cancel#

The engine does not run its own timer. Long-lived services tick the sweep on their own cadence:

expired = bus.sweep_expired_requests()
# -> list of correlation_ids whose deadline elapsed; each is set to
# TimedOut so a late reply returns REQUEST_TIMED_OUT instead of
# REQUEST_NOT_FOUND.

Explicit cancellation drops a pending entry without resolving it:

bus.cancel_request(handle.correlation_id)

Pending requests are not WAL-journaled. A process restart drops in-flight requests (NATS-equivalent semantics); callers re-issue.

C ABI#

The pub/sub primitives are exported as C functions for any language binding:

FunctionPurpose
arcflow_pubsub_create_topic / _delete_topic / _topic_countTopic CRUD
arcflow_pubsub_publish (data + headers JSON) / _events_afterPublish + read
arcflow_pubsub_register_consumer / _drop_consumer / _ack / _pending_forDurable consumers
arcflow_pubsub_subscribe_pattern / _register_pattern_consumer / _drop_pattern_consumer / _topics_matching_patternWildcards
arcflow_pubsub_request / _reply / _poll_reply / _wait_for_reply / _cancel_request / _sweep_expired_requestsRequest-reply
arcflow_pubsub_string_freeFree heap-allocated JSON returns

JSON crosses the boundary as UTF-8. All write entry points route through MVCC's begin → commit so concurrent writers serialise and readers stay lock-free.

Compared to NATS / NATS+JetStream#

ConcernArcFlow event bus
TransportIn-process (no socket; FFI call)
Wildcards* (single segment) + > (tail) — same as NATS
Queue groupsregister_consumer with shared name (NATS queue-group equivalent)
Request-replyrequest + reply + correlation ID + reply topic
HeadersFirst-class, NATS 2.2+ shape
DurabilityPer-topic retention policy + WAL replay
JetStreamNot implemented (per-topic retention plus durable consumers cover the common case; durable streams + replay-from-arbitrary-offset are follow-up)
Multi-processOne writer + multi-reader via arcflow-daemon over UDS (see Daemon)
← PreviousIntent RelayNext →Agent Tooling