Event bus
Canonical surface for the Event Bus layer.
ArcFlow ships an in-process publish/subscribe + request-reply substrate that lives in the same memory space as the graph store. No socket, no broker hop, no second store to operate. The wire-equivalent semantics mirror core NATS so services migrating from NATS or Memgraph+NATS keep their mental model.
The lower-level Cypher surface (CREATE TOPIC, PUBLISH, retention,
cursor replay) is covered on the Live queries page.
This page covers the NATS-style extensions: wildcard subscriptions,
durable consumers, queue groups, and synchronous
request-reply.
Topic wildcards
A subscriber names a pattern instead of a literal topic. The engine fans out events from every existing and future topic whose name matches.
Semantics are identical to NATS:
| Token | Meaning |
|---|---|
| * | Matches exactly one segment (no dots) |
| > | Matches one or more trailing segments. Only valid as the final token. |
Examples:
| Pattern | Matches | Does not match |
|---|---|---|
| all_tracks.* | all_tracks.cam1, all_tracks.cam_27 | all_tracks, all_tracks.cam1.detail |
| oz.> | oz.x, oz.x.y.z | oz, arcflow.x |
| a.*.c | a.b.c, a.zz.c | a.c, a.b.b.c |
| > | every non-empty topic | — |
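The matching rules in the tables above can be sketched in a few lines of Python. This is an illustrative model only, not ArcFlow's matcher: the function name `topic_matches` is hypothetical, and treating a non-final `>` as an error is an assumption drawn from the "only valid as the final token" rule.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches the NATS-style `pattern` (illustrative sketch)."""
    if not topic:
        return False  # patterns only match non-empty topics
    pattern_segments = pattern.split(".")
    topic_segments = topic.split(".")
    for i, token in enumerate(pattern_segments):
        if token == ">":
            # Assumption: reject '>' anywhere except the final position.
            if i != len(pattern_segments) - 1:
                raise ValueError("'>' is only valid as the final token")
            # '>' needs at least one remaining topic segment.
            return len(topic_segments) > i
        if i >= len(topic_segments):
            return False  # topic ran out of segments
        if token != "*" and token != topic_segments[i]:
            return False  # literal segment mismatch
    # Without a trailing '>', segment counts must agree exactly.
    return len(topic_segments) == len(pattern_segments)


print(topic_matches("all_tracks.*", "all_tracks.cam1"))
print(topic_matches("oz.>", "oz"))
```

The loop visits each pattern segment once, so the cost grows with the number of segments rather than with the length of the topic name.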
Subscribe a stored query to a pattern

SUBSCRIBE PATTERN 'all_tracks.*' AS tracks_view
QUERY 'MATCH (t:Track) RETURN t.id, t.position'

The subscription fires for every event published to any topic that
matches all_tracks.*, current and future. Future topics that match
are picked up automatically without re-subscribing.
Pattern-following durable consumers
A pattern consumer receives every event from every matching topic
and tracks an independent offset per topic, so acks on all_tracks.cam1
don't advance the cursor on all_tracks.cam2.
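To make the per-topic offset idea concrete, here is a toy in-memory model. It is a sketch under stated assumptions, not the ArcFlow implementation: the class `PatternConsumerSketch`, its `pending` and `ack` methods, and the per-topic sequence numbers starting at 1 are all hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Event = Tuple[int, dict]  # (per-topic sequence number, payload); assumed shape


class PatternConsumerSketch:
    """Toy consumer that keeps one acked offset per matching topic."""

    def __init__(self, matches: Callable[[str], bool]) -> None:
        self._matches = matches
        self.offsets: Dict[str, int] = {}  # topic -> last acked seq

    def pending(self, log: Dict[str, List[Event]]) -> List[Tuple[str, int, dict]]:
        """Return unacked events from every matching topic, topic by topic."""
        out: List[Tuple[str, int, dict]] = []
        for topic in sorted(log):
            if not self._matches(topic):
                continue
            last_acked = self.offsets.get(topic, 0)
            out.extend((topic, seq, data) for seq, data in log[topic] if seq > last_acked)
        return out

    def ack(self, topic: str, seq: int) -> None:
        """Advance the offset of one topic only; other topics are untouched."""
        self.offsets[topic] = max(self.offsets.get(topic, 0), seq)


log = {
    "all_tracks.cam1": [(1, {"id": 1}), (2, {"id": 2})],
    "all_tracks.cam2": [(1, {"id": 9})],
}
consumer = PatternConsumerSketch(
    lambda t: t.startswith("all_tracks.") and t.count(".") == 1
)
consumer.ack("all_tracks.cam1", 2)
remaining = consumer.pending(log)
print(remaining)
```

After acking both events on all_tracks.cam1, the event on all_tracks.cam2 is still pending, because each topic has its own cursor.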
from arcflow import ArcFlow
from arcflow.bus import Bus, START_FROM_BEGINNING

db = ArcFlow()
bus = Bus(db)

consumer = bus.register_pattern_consumer(
    "tracks",
    "all_tracks.*",
    start=START_FROM_BEGINNING,
)

# Future topics that match auto-attach at LatestOnly:
bus.create_topic("all_tracks.cam99")
bus.publish("all_tracks.cam99", {"id": 42, "x": 1.0})

# Drain pending events across every matching topic. ack=True (default)
# advances each topic's offset to the last seq seen.
# handle() stands in for your own event callback.
for topic, event in consumer.drain():
    handle(topic, event.data)

Resolving the current match set
topics = bus.topics_matching_pattern("all_tracks.*")
# -> ['all_tracks.cam1', 'all_tracks.cam2', ...] sorted

The matcher is a pure function in arcflow_core::store::TopicPattern
and runs in O(segments) per topic. Pattern subscriptions live in their
own registry; the publish hot path skips pattern evaluation when no
patterns are active.
WAL durability
Pattern subscriptions and pattern consumers are journaled via the WAL
ops SubscribePattern, RegisterPatternConsumer, and
DropPatternConsumer. Post-restart, the registrations replay and the
per-topic durable consumers reconstruct from the regular
RegisterConsumer journal — no separate "pattern replay" path.
Synchronous request-reply
A publisher sends a request to a topic and waits — with a timeout —
for a single matching reply. The engine generates a correlation ID,
allocates a per-request reply topic, and routes the reply back when
the handler calls reply(correlation_id, …).
Python sync flow
from arcflow import ArcFlow
from arcflow.bus import Bus

db = ArcFlow()
bus = Bus(db)

# In the requester:
reply = bus.request(
    "video.query.status",
    {"query": "active_cameras"},
    timeout_ms=5_000,
)
print(reply.data["count"])

bus.request(...) blocks until either the reply arrives or
timeout_ms expires. On timeout it raises ArcFlowError with code
REQUEST_TIMEOUT.
Decoupled handle + reply
When the requester and the handler share the same process but live on different threads, drive the call manually:
handle = bus.request_async("video.query.status", {"q": "n_cams"}, timeout_ms=2_000)

# ... handler reads the request event from the topic ...
bus.reply(handle.correlation_id, {"count": 4})

# ... requester thread takes the reply ...
reply = bus.wait_for_reply(handle.correlation_id, deadline_ms=handle.deadline_ms)

Request envelope
The published request event carries two engine-controlled headers (both win on collision with caller-supplied headers, so a malicious caller cannot redirect the reply):
| Header | Meaning |
|---|---|
| _correlation_id | Engine-allocated _REQ.<n> |
| _reply_to | Per-request reply topic _REPLY.<correlation_id> |
The reply event is published to the reply topic so callers who
prefer the durable-consumer attach pattern over poll_reply can
register a consumer against it.
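The "engine headers win on collision" rule can be illustrated with a plain dictionary merge. This is a hedged sketch, not the engine's code: the helper name `build_request_headers` and the `request_number` parameter are hypothetical, and the header values follow a literal reading of the table above.

```python
def build_request_headers(caller_headers: dict, request_number: int) -> dict:
    """Merge caller headers with engine-controlled ones; engine values win."""
    correlation_id = f"_REQ.{request_number}"
    engine_headers = {
        "_correlation_id": correlation_id,
        "_reply_to": f"_REPLY.{correlation_id}",
    }
    # Later entries override earlier ones, so a caller-supplied _reply_to
    # or _correlation_id is silently replaced by the engine's value.
    return {**caller_headers, **engine_headers}


headers = build_request_headers(
    {"_reply_to": "attacker.topic", "trace": "abc"},
    request_number=7,
)
print(headers)
```

Caller headers that do not collide, such as `trace` here, pass through unchanged.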
Sweep + cancel
The engine does not run its own timer. Long-lived services tick the sweep on their own cadence:
expired = bus.sweep_expired_requests()
# -> list of correlation_ids whose deadline elapsed; each is set to
# TimedOut so a late reply returns REQUEST_TIMED_OUT instead of
# REQUEST_NOT_FOUND.

Explicit cancellation drops a pending entry without resolving it:

bus.cancel_request(handle.correlation_id)

Pending requests are not WAL-journaled. A process restart drops in-flight requests (NATS-equivalent semantics); callers re-issue.
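The difference between a swept request and a cancelled one can be modelled with a small state table. The sketch below is an assumption-laden illustration, not ArcFlow internals: the class `PendingRequestsSketch`, its method names, the string result codes, and the caller-supplied `now_ms` clock are all hypothetical.

```python
from typing import Dict, List, Optional, Tuple


class PendingRequestsSketch:
    """Toy pending-request table with caller-driven sweeping."""

    def __init__(self) -> None:
        # correlation_id -> ("pending", deadline_ms) or ("timed_out", None)
        self._entries: Dict[str, Tuple[str, Optional[int]]] = {}

    def register(self, correlation_id: str, deadline_ms: int) -> None:
        self._entries[correlation_id] = ("pending", deadline_ms)

    def sweep(self, now_ms: int) -> List[str]:
        """Mark every pending request whose deadline elapsed as timed out."""
        expired = [
            cid
            for cid, (state, deadline) in self._entries.items()
            if state == "pending" and deadline is not None and now_ms >= deadline
        ]
        for cid in expired:
            self._entries[cid] = ("timed_out", None)  # keep a tombstone
        return expired

    def cancel(self, correlation_id: str) -> None:
        """Drop the entry entirely, leaving no tombstone behind."""
        self._entries.pop(correlation_id, None)

    def reply(self, correlation_id: str) -> str:
        entry = self._entries.get(correlation_id)
        if entry is None:
            return "REQUEST_NOT_FOUND"
        if entry[0] == "timed_out":
            return "REQUEST_TIMED_OUT"
        del self._entries[correlation_id]  # resolved; a second reply is unknown
        return "OK"


table = PendingRequestsSketch()
table.register("_REQ.1", deadline_ms=100)
table.register("_REQ.2", deadline_ms=500)
swept = table.sweep(now_ms=200)
table.cancel("_REQ.2")
print(swept, table.reply("_REQ.1"), table.reply("_REQ.2"))
```

Keeping a tombstone for swept requests is what lets a late reply be reported as timed out rather than unknown, while a cancelled request leaves nothing behind.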
C ABI
The pub/sub primitives are exported as C functions for any language binding:
| Function | Purpose |
|---|---|
| arcflow_pubsub_create_topic / _delete_topic / _topic_count | Topic CRUD |
| arcflow_pubsub_publish (data + headers JSON) / _events_after | Publish + read |
| arcflow_pubsub_register_consumer / _drop_consumer / _ack / _pending_for | Durable consumers |
| arcflow_pubsub_subscribe_pattern / _register_pattern_consumer / _drop_pattern_consumer / _topics_matching_pattern | Wildcards |
| arcflow_pubsub_request / _reply / _poll_reply / _wait_for_reply / _cancel_request / _sweep_expired_requests | Request-reply |
| arcflow_pubsub_string_free | Free heap-allocated JSON returns |
JSON crosses the boundary as UTF-8. All write entry points route
through MVCC's begin → commit so concurrent writers serialise and
readers stay lock-free.
Compared to NATS / NATS+JetStream
| Concern | ArcFlow event bus |
|---|---|
| Transport | In-process (no socket; FFI call) |
| Wildcards | * (single segment) + > (tail) — same as NATS |
| Queue groups | register_consumer with shared name (NATS queue-group equivalent) |
| Request-reply | request + reply + correlation ID + reply topic |
| Headers | First-class, NATS 2.2+ shape |
| Durability | Per-topic retention policy + WAL replay |
| JetStream | Not implemented (per-topic retention plus durable consumers cover the common case; durable streams + replay-from-arbitrary-offset are follow-up) |
| Multi-process | One writer + multi-reader via arcflow-daemon over UDS (see Daemon) |