© 2026 OZ. All rights reserved.


Daemon — UDS local IPC

ArcFlow ships two surfaces. They are not alternatives to each other; they're for different problems.

| Surface | Use it when |
|---|---|
| In-process (libarcflow.dylib / WASM / napi-rs) | One process owns the graph and never shares it. Browser apps, single-process daemons, embedded SDK use. |
| Daemon mode (arcflow-daemon over UDS) | Multiple processes on the same machine need to share one graph + pub/sub bus. TS shell + Python workers + Rust capture all talking to the same engine. |

If you have more than one process that needs to read or write the same ArcFlow state, use the daemon over UDS. That's the recommended default for IPC-based deployments.

Why UDS

Unix Domain Sockets are the right transport for same-machine IPC:

  • No TCP overhead. Loopback TCP carries network-stack cost per packet; UDS skips it.
  • No port allocation, no firewall surface. Socket lives at a filesystem path that the OS treats like any other file. POSIX permissions gate who can connect.
  • Sub-millisecond latency. A round trip is a memcpy plus a context switch, roughly 3-5 µs on modern hardware.
  • Same-machine guarantee. A process that connects to your socket is on your host, full stop. No need to authenticate "is this the same machine" — the kernel already enforced it.

ArcFlow's daemon binds one UDS by default at /tmp/arcflow.sock. Override with --socket <path>. Lock down access with standard Unix file permissions on the path.

Quick start

# Start the daemon (in-memory, no durability — fine for development)
arcflow-daemon --socket /tmp/arcflow.sock
 
# In another terminal: ping
echo '{"id":1,"method":"ping","params":{}}' | nc -U /tmp/arcflow.sock
# → {"id":1,"result":{"pong":true}}
 
# Create a topic + publish
echo '{"id":2,"method":"topic.create","params":{"name":"frames"}}' \
  | nc -U /tmp/arcflow.sock
echo '{"id":3,"method":"topic.publish","params":{"topic":"frames","data":{"frame_no":1}}}' \
  | nc -U /tmp/arcflow.sock
# → {"id":3,"result":{"sequence":1}}

Wire protocol: newline-delimited JSON-RPC

Every request is one JSON object on its own line, and so is every response. Pipelining works: send N requests, then read N responses in the same order.

// Request
{"id": 42, "method": "topic.publish", "params": {"topic": "t", "data": {"k": "v"}}}
 
// Success response
{"id": 42, "result": {"sequence": 17}}
 
// Typed error response
{"id": 42, "error": {"code": "TOPIC_NOT_FOUND", "message": "Topic 't' does not exist"}}

The full method catalog lives in the daemon's source (the dispatch function in crates/arcflow-daemon/src/lib.rs), but the surface is small: pub/sub (topic.*), pull-style consumers (consumer.*), queue groups (group.*), plus ping and daemon.info.
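As an illustration of the framing, here is a minimal Python client sketch. It relies only on what this page states: one JSON object per line in each direction, and responses returned in request order. The helper names encode_request, read_lines, and call_pipelined are hypothetical and are not part of any ArcFlow SDK.

```python
import json
import socket
from typing import Iterator, List, Optional, Tuple

def encode_request(req_id: int, method: str, params: Optional[dict] = None) -> bytes:
    """Serialize one JSON-RPC request as a single newline-terminated frame."""
    msg = {"id": req_id, "method": method, "params": params or {}}
    # json.dumps without indent never emits a raw newline, so one object == one line.
    return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8")

def read_lines(sock: socket.socket) -> Iterator[dict]:
    """Yield one decoded JSON object per newline-terminated line until the peer closes."""
    buf = b""
    while True:
        chunk = sock.recv(65536)
        if not chunk:
            return
        buf += chunk
        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)

def call_pipelined(sock: socket.socket, requests: List[Tuple[int, str, dict]]) -> List[dict]:
    """Write all requests back to back, then read exactly one response per request."""
    sock.sendall(b"".join(encode_request(i, m, p) for i, m, p in requests))
    reader = read_lines(sock)
    responses = []
    for _ in requests:
        resp = next(reader, None)
        if resp is None:
            raise ConnectionError("daemon closed the connection before replying")
        responses.append(resp)
    return responses
```

Against a running daemon, you would create a socket.socket(socket.AF_UNIX, socket.SOCK_STREAM), call connect("/tmp/arcflow.sock"), and then call call_pipelined(sock, [(1, "ping", {})]). Writing every request before reading any response is what makes pipelining cheaper than one round trip per call.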

Throughput: pick the right publish pattern

The daemon's per-request cost is dominated by JSON parsing and the UDS round trip, so the publish pattern largely determines throughput.

| Pattern | Use when |
|---|---|
| topic.publish_batch (recommended for bursts) | The producer has N events ready and doesn't need a per-event ack. |
| topic.publish, pipelined (concurrent reader thread) | The producer can fire and forget but needs each event journaled separately (e.g. per-event WAL replay semantics). |
| topic.publish, one round trip per request | The producer needs a per-event ack before sending the next one (rare; usually a footgun). |

If you're seeing low throughput, you're almost certainly on the round-trip-per-request path. Switch to publish_batch with a batch size of 100–1000 for substantially higher event rates. Measure on your host with the topic.publish* benchmarks in the ozinc/arcflow repo.

// Batch publish — one RPC, many events
{
  "id": 1,
  "method": "topic.publish_batch",
  "params": {
    "topic": "frames",
    "events": [
      {"data": {"frame_no": 1}},
      {"data": {"frame_no": 2}, "headers": {"trace_id": "abc"}},
      {"data": {"frame_no": 3}}
    ]
  }
}
 
// Response — sequences in input order
{"id": 1, "result": {"sequences": [1, 2, 3], "count": 3}}

Batches use first-failure-stops semantics, matching Kafka producer batches. If event K fails (e.g. an invalid payload), events 0..K-1 are published, and the response carries their sequences plus the typed error and partial: true. The caller resumes from index K, which equals the length of the returned sequences array.
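The sketch below shows a producer helper that splits events into topic.publish_batch calls and reports where a partial failure stopped. send_batch is a hypothetical callable you would build on a JSON-RPC client. It is assumed to return the result payload with sequences and, on a partial failure, partial: true plus the typed error. Exactly where the error sits in the real response envelope is also an assumption.

```python
from typing import Callable, Dict, List

class PartialPublishError(Exception):
    """The daemon stopped a batch at its first failing event."""

    def __init__(self, failed_index: int, error: object, sequences: List[int]):
        super().__init__(f"publish stopped at event {failed_index}: {error}")
        self.failed_index = failed_index  # index into the caller's full event list
        self.error = error
        self.sequences = sequences  # sequences committed before the failure

def publish_all(events: List[dict], send_batch: Callable[[List[dict]], Dict],
                batch_size: int = 500) -> List[int]:
    """Publish events in chunks of batch_size; return all sequences in input order."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    sequences: List[int] = []
    start = 0
    while start < len(events):
        chunk = events[start:start + batch_size]
        reply = send_batch(chunk)
        committed = reply.get("sequences", [])
        sequences.extend(committed)
        if reply.get("partial"):
            # chunk[0..K-1] are published; K == len(committed) is the failed event.
            raise PartialPublishError(start + len(committed), reply.get("error"), sequences)
        start += len(chunk)
    return sequences
```

The default batch_size of 500 falls inside the 100-1000 range recommended above. After fixing or dead-lettering events[err.failed_index], a caller can continue with publish_all(events[err.failed_index + 1:], send_batch).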

Live retention changes: topic.set_retention

Topics created with one retention policy can have it tightened or loosened at runtime without a restart. Useful for handling burst days, bounding queue depth on a noisy topic, or relaxing a too-tight policy that's dropping events you wanted to keep.

// Tighten a topic to keep at most the last 1000 events.
{
  "id": 1,
  "method": "topic.set_retention",
  "params": {
    "topic": "frames",
    "max_events": 1000
  }
}
 
// Both fields are independent and optional. `null` (or omitted)
// means "unbounded on that axis". Passing the bare topic name with
// no bounds restores fully-unbounded retention.
{
  "method": "topic.set_retention",
  "params": { "topic": "frames" }
}

Tightening prunes already-resident events immediately in the same lock pass. The new policy is WAL-journaled, so it survives daemon restart. Returns TOPIC_NOT_FOUND if the topic doesn't exist.

Dropping a topic entirely: topic.delete

Removes a topic along with all of its events, consumers, and queue groups. A subsequent topic.create with the same name creates a fresh topic with a new internal id. Previous consumer registrations are gone, which is correct because they were bound to the old id.

{"method": "topic.delete",
 "params": {"topic": "frames"}}
 
// → {"topic":             "frames",
//    "events_evicted":    4,
//    "consumers_dropped": 1,
//    "groups_dropped":    1}

After delete + recreate, the first publish on the new topic gets sequence 1, not the prior topic's high-water mark. This is the deliberate difference from topic.purge, described below.

The deletion is WAL-journaled, so it survives restart. Returns TOPIC_NOT_FOUND if the topic doesn't exist.

Clearing a topic's events: topic.purge

Drops all resident events from a topic without dropping the topic itself. Retention, consumers, and groups survive. The topic's next_seq is also preserved, so consumer offsets stay meaningful (the next publish gets last_seq + 1, not 1). Useful for testing, debugging, and "drain and start fresh" workflows.

{"method": "topic.purge",
 "params": {"topic": "frames"}}
 
// → {"topic": "frames", "evicted": 7}

After the purge:

  • topic.events_after returns nothing
  • topic.list still shows the topic with its retention intact
  • consumer.list still shows registered consumers with their last_acked_seq unchanged
  • The next topic.publish gets a sequence past the prior high-water mark (e.g. if the last event was seq 7, the new one gets seq 8). A consumer with last_acked_seq=3 therefore sees only the new event as pending, rather than treating everything as pending.

The purge is WAL-journaled so the cleared state survives daemon restart. Returns TOPIC_NOT_FOUND if the topic doesn't exist.
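The sequencing rules for delete versus purge fit in a few lines. TopicModel is a hypothetical toy, not daemon code. It mirrors only the documented behavior: purge keeps next_seq, delete + recreate starts over at 1, and "pending" means any sequence greater than last_acked_seq.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class TopicModel:
    next_seq: int = 1
    events: Dict[int, object] = field(default_factory=dict)  # seq -> payload

    def publish(self, data: object) -> int:
        seq = self.next_seq
        self.events[seq] = data
        self.next_seq += 1
        return seq

    def purge(self) -> int:
        """Evict resident events but keep next_seq, so offsets stay meaningful."""
        evicted = len(self.events)
        self.events.clear()
        return evicted

    def pending(self, last_acked_seq: int) -> List[int]:
        return sorted(s for s in self.events if s > last_acked_seq)
```

In this model, delete + recreate is simply a fresh TopicModel(). That is why its first publish returns 1, while a purged topic continues from its old high-water mark.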

Rewinding a consumer: consumer.reset

Consumer offsets are monotonic by design. Both consumer.register (idempotent) and consumer.ack (a no-op on a stale seq) refuse to rewind. When operators need a consumer to re-read a window (reprocessing after a downstream bug, replaying a debug session), use consumer.reset:

{
  "id": 1,
  "method": "consumer.reset",
  "params": {
    "name":   "vision",
    "topic":  "frames",
    "to_seq": 1000
  }
}
 
// Response
{"id": 1,
 "result": {"id": 17,
            "name": "vision",
            "topic": "frames",
            "last_acked_seq": 1000}}

After this, consumer.pending returns events with sequence > 1000. Internally the daemon drops and re-registers the consumer in one lock pass, producing two WAL ops (DropConsumer + RegisterConsumer) that replay correctly. As a result, the consumer's id may change, because the re-register assigns a fresh id.

Strict semantics: consumer.reset requires the consumer to already exist and returns CONSUMER_NOT_FOUND if it doesn't. To create a consumer at a specific seq, use consumer.register with start: <seq>. This split prevents a typo in the consumer name from silently spawning a new consumer.
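The monotonic-offset rules, and the reason reset is kept separate from register, can be modeled as follows. ConsumerRegistry and ConsumerNotFound are hypothetical illustrations of the documented semantics, not the daemon's implementation. This sketch also does not model the id change on reset.

```python
from typing import Dict

class ConsumerNotFound(LookupError):
    pass

class ConsumerRegistry:
    def __init__(self) -> None:
        self._offsets: Dict[str, int] = {}  # consumer name -> last_acked_seq

    def register(self, name: str, start: int = 0) -> int:
        # Idempotent: registering an existing consumer never moves its offset.
        return self._offsets.setdefault(name, start)

    def ack(self, name: str, seq: int) -> int:
        # Stale acks are a no-op, so offsets only move forward.
        if seq > self._offsets[name]:
            self._offsets[name] = seq
        return self._offsets[name]

    def reset(self, name: str, to_seq: int) -> int:
        # The only way to rewind, and only for a consumer that already exists.
        if name not in self._offsets:
            raise ConsumerNotFound(f"CONSUMER_NOT_FOUND: {name}")
        self._offsets[name] = to_seq
        return to_seq
```

If reset were allowed to create consumers, a typo such as "visoin" would quietly start a second consumer at seq 1000. The strict version fails loudly instead.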

Durability: --data-dir

Without --data-dir the daemon runs purely in-memory. A crash loses all events. That's fine for development, integration tests, or workloads that can replay from the source.

For production, pass --data-dir <path>:

arcflow-daemon --socket /tmp/arcflow.sock --data-dir /var/lib/arcflow

On startup, the daemon:

  1. Opens (or creates) <data_dir>/wal.bin.
  2. Replays every existing entry to rebuild graph state: topics, events, consumers, and queue groups are all restored at the offsets they had when the daemon last shut down.
  3. Attaches the async WAL lane. Every subsequent pub/sub mutation is journaled fire-and-forget by a background thread that batches ops and fsyncs them. The publisher returns at memory speed (~50 ns for the mpsc send) while durability is asynchronous. An event can therefore be lost if the daemon crashes before its batch is fsynced; the flush settings below bound that window.

After a daemon crash + restart, consumer offsets are preserved. Workers resume from last_acked_seq + 1 instead of from the beginning. Same for queue-group offsets.

The startup line on stderr tells you what was replayed:

arcflow-daemon: replaying 12834 WAL entries from /var/lib/arcflow/wal.bin
arcflow-daemon: WAL durability enabled at /var/lib/arcflow/wal.bin (replayed 12834 entries; flush_count=1000, flush_interval_ms=10)
arcflow-daemon: UDS listening on /tmp/arcflow.sock (Ctrl+C to stop)
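The fire-and-forget lane from step 3 works as follows. Publishers enqueue an op and return immediately, and one background thread batches whatever is queued and hands it to a sink. Shutdown drains the lane explicitly. The Python sketch below follows that pattern. DiskLane is hypothetical, and its sink stands in for the write + fsync. The real lane is Rust code inside the daemon, and it also applies the flush-count and flush-interval thresholds described next.

```python
import queue
import threading
from typing import Callable, List

class DiskLane:
    _STOP = object()

    def __init__(self, sink: Callable[[List[object]], None]) -> None:
        self._q: "queue.Queue[object]" = queue.Queue()
        self._sink = sink  # called once per batch; stands in for write + fsync
        # daemon=True mimics the hazard: without an explicit drain, exit kills this thread.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, op: object) -> None:
        """Fire-and-forget: returns as soon as the op is queued."""
        self._q.put(op)

    def _run(self) -> None:
        batch: List[object] = []
        while True:
            op = self._q.get()
            if op is self._STOP:
                if batch:
                    self._sink(batch)
                return
            batch.append(op)
            if self._q.empty():  # nothing else waiting: flush what we have
                self._sink(batch)
                batch = []

    def drain(self, timeout: float = 5.0) -> bool:
        """Flush everything queued so far, stop the thread, report whether it exited."""
        self._q.put(self._STOP)
        self._thread.join(timeout)
        return not self._thread.is_alive()
```

Because the stop marker is queued behind every pending op, drain() returns only after all earlier submissions have reached the sink. This is the property the daemon's shutdown sequence relies on.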

Tuning the disk lane

The async WAL lane batches ops before each fsync. Two flags control the batching policy; both apply only when --data-dir is in use.

| Flag | Default | Meaning |
|---|---|---|
| --wal-flush-count <N> | 1000 | Fsync after this many ops accumulate. |
| --wal-flush-interval-ms <N> | 10 | Fsync at most this long after the first buffered op, even if the count threshold isn't hit. |

Whichever threshold is reached first triggers the fsync for that batch.

Pick a preset by workload:

# Strict durability (every-op fsync; expensive but bounds data loss to ≤1 op)
arcflow-daemon --data-dir /var/lib/arcflow \
  --wal-flush-count 1 --wal-flush-interval-ms 1
 
# Balanced (default — good for most pub/sub workloads)
arcflow-daemon --data-dir /var/lib/arcflow
 
# High-throughput batch ingestion (fewer fsyncs, larger crash window)
arcflow-daemon --data-dir /var/lib/arcflow \
  --wal-flush-count 10000 --wal-flush-interval-ms 100

Both flags must be > 0: a 0 count would disable the count-driven flush path; a 0 timeout would spin the disk-lane thread. The daemon rejects either with an explicit error at startup.
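The "first threshold wins" rule and the > 0 validation can be expressed as a small decision object. FlushPolicy is a hypothetical model for reasoning about the presets, not the daemon's code. Times are in seconds from an injected clock, so the model can be tested deterministically.

```python
from typing import Optional

class FlushPolicy:
    """Fsync after flush_count ops or flush_interval_ms after the first buffered op."""

    def __init__(self, flush_count: int = 1000, flush_interval_ms: int = 10) -> None:
        if flush_count <= 0 or flush_interval_ms <= 0:
            raise ValueError("both thresholds must be > 0")
        self.flush_count = flush_count
        self.flush_interval = flush_interval_ms / 1000.0
        self.buffered = 0
        self.first_at: Optional[float] = None  # arrival time of the oldest unflushed op

    def on_op(self, now: float) -> bool:
        if self.buffered == 0:
            self.first_at = now
        self.buffered += 1
        return self.should_flush(now)

    def should_flush(self, now: float) -> bool:
        if self.buffered == 0 or self.first_at is None:
            return False
        return (self.buffered >= self.flush_count
                or now - self.first_at >= self.flush_interval)

    def flushed(self) -> None:
        self.buffered = 0
        self.first_at = None
```

With the strict preset (1 op, 1 ms), every op triggers a flush on arrival. With the high-throughput preset (10000 ops, 100 ms), a quiet topic is still fsynced within about 100 ms of its first buffered op.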

Bounding request size: --max-line-bytes

By default the daemon caps a single newline-delimited UDS request at 16 MiB (16 777 216 bytes). Override with --max-line-bytes <N>. The floor is 1024 (1 KiB): a smaller cap would reject normal JSON-RPC envelopes, so the daemon refuses it at startup.

Why this exists: the UDS reader was previously unbounded, so a buggy client streaming bytes without a terminating \n could OOM the daemon. The bounded path closes the connection with a typed error after the first oversized line:

{"id": null, "result": null,
 "error": {
   "code": "LINE_TOO_LARGE",
   "message": "request exceeds --max-line-bytes (16777216); closing connection. …"
 }}

Once the daemon aborts mid-line, the stream is no longer frame-aligned, so the daemon must close the socket and clients should reconnect. Pick a tighter cap (e.g. --max-line-bytes 65536) when running in adversarial or shared environments. Pick a looser cap (e.g. --max-line-bytes 67108864 for 64 MiB) when your topic.publish_batch calls legitimately ship very large frames.
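A client or proxy that wants the same protection can bound its own reads. Below is a minimal sketch of a capped newline reader in Python. read_bounded_line and LineTooLarge are hypothetical names. The sketch also assumes the cap excludes the trailing newline; the daemon may count that byte differently.

```python
import socket

class LineTooLarge(Exception):
    """More than max_line_bytes arrived without a newline; framing is lost."""

def read_bounded_line(sock: socket.socket, buf: bytearray, max_line_bytes: int) -> bytes:
    """Return the next line (without its newline), keeping leftovers in buf.

    Memory stays bounded at roughly max_line_bytes plus one recv chunk.
    """
    while True:
        nl = buf.find(b"\n")
        if nl != -1:
            if nl > max_line_bytes:
                raise LineTooLarge(f"line of {nl} bytes exceeds {max_line_bytes}")
            line = bytes(buf[:nl])
            del buf[:nl + 1]
            return line
        if len(buf) > max_line_bytes:
            raise LineTooLarge(f"no newline within {max_line_bytes} bytes")
        chunk = sock.recv(65536)
        if not chunk:
            raise ConnectionError("peer closed the connection")
        buf += chunk
```

After LineTooLarge, the only safe move is to close the connection. The rest of the oversized line is still in flight, so the next "line" would start mid-payload.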

The same cap covers the HTTP/SSE bridge. parse_request enforces it as a cumulative budget across the request line plus every header, so an attacker can't bypass the per-line limit by sending many small headers. Oversized HTTP requests get a 413 Payload Too Large response and a closed socket:

HTTP/1.1 413 Payload Too Large
Content-Type: text/plain; charset=utf-8
Connection: close

request exceeds --max-line-bytes (16777216); closing connection

Bounding concurrent connections: --max-connections

By default the daemon caps active connections (across UDS + HTTP) at 1024. Override with --max-connections <N>. The floor is 1; zero is rejected at startup because it would lock out every client.

When the cap is full, each newly accepted connection gets a typed BUSY frame and is closed immediately. It is not held open in a queue, because that would only defer the file-descriptor pressure. Rejections are counted in arcflow_rejected_connections_total; any non-zero rate calls for operator action.

// UDS reject — newline-delimited JSON-RPC frame, then close
{"id": null, "result": null,
 "error": {"code": "BUSY", "message": "daemon at --max-connections"}}

// HTTP reject: 503, then close
HTTP/1.1 503 Service Unavailable
Content-Type: text/plain; charset=utf-8
Connection: close

daemon at --max-connections

The slot is acquired in the accept loop (CAS retry, no double-counting under contention) and released by an RAII guard when the handler thread exits — so even a panicking handler returns the slot rather than leaking it.
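The accept-side accounting can be sketched in Python with a lock-protected counter, using try/finally in place of the RAII guard. Note that the daemon uses a CAS retry loop rather than a lock. ConnectionSlots and guarded are hypothetical names.

```python
import threading
from typing import Callable, TypeVar, Union

T = TypeVar("T")

class ConnectionSlots:
    def __init__(self, max_connections: int) -> None:
        if max_connections < 1:
            raise ValueError("--max-connections must be >= 1")
        self._max = max_connections
        self._active = 0
        self._lock = threading.Lock()
        self.rejected_total = 0  # analogue of arcflow_rejected_connections_total

    @property
    def active(self) -> int:
        with self._lock:
            return self._active

    def try_acquire(self) -> bool:
        with self._lock:
            if self._active >= self._max:
                self.rejected_total += 1
                return False
            self._active += 1
            return True

    def release(self) -> None:
        with self._lock:
            self._active -= 1

def guarded(slots: ConnectionSlots, handler: Callable[[], T]) -> Union[T, str]:
    """Run handler inside a slot, or return "BUSY" (caller sends the frame and closes)."""
    if not slots.try_acquire():
        return "BUSY"
    try:
        return handler()
    finally:
        slots.release()  # runs even if the handler raises, so the slot never leaks
```

The check and the increment happen under one lock, so two concurrent accepts can never both claim the last slot. A CAS loop gives the same guarantee without blocking.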

Snapshot for backup: topic.checkpoint

Materializes the daemon's current pub/sub state as a compacted sequence of WAL ops and atomically writes it to a snapshot file. Useful for:

  • Backups before upgrades / config changes
  • Cold-restore from offsite storage
  • Forensics (load snapshot.bin into a separate daemon to inspect the state at checkpoint time)

// Default: snapshot.bin under --data-dir
{"method": "topic.checkpoint", "params": {}}
 
// Override: write to a custom path (e.g. NFS-mounted backup volume)
{"method": "topic.checkpoint",
 "params": {"path": "/mnt/backup/arcflow-2026-05-11.bin"}}
 
// → {"snapshot_path": "/var/lib/arcflow/snapshot.bin",
//    "ops":           1834,         // # of WAL ops in the snapshot
//    "bytes":         287512}       // serialized size

The snapshot file uses the same BinaryWal format as wal.bin, so it's loadable with the same BinaryWal::open(...).replay() path. The snapshot is a compaction of the historical WAL. Redundant ops (multiple AckEvents for the same consumer, dropped consumers, retention-pruned events) are folded out, so it is never larger than the WAL it summarizes and is usually much smaller.

The write goes through the same atomic .tmp + fsync + rename sequence the disk lane uses for its own checkpoint command. Either the new snapshot is fully durable or the old one is untouched; there is never a half-written file.
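The .tmp + fsync + rename pattern is standard POSIX practice, and a minimal Python version is shown below. atomic_write is a hypothetical helper. The final directory fsync makes the rename itself durable; it is a common hardening step, but this page does not say whether the daemon performs it.

```python
import os

def atomic_write(path: str, data: bytes) -> None:
    """Replace path with data so readers see either the old or the new file, never a mix."""
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        f.write(data)
        f.flush()             # push Python's buffer to the OS
        os.fsync(f.fileno())  # make the new contents durable before the rename
    os.replace(tmp, path)     # atomic rename on POSIX (same filesystem)
    dir_fd = os.open(os.path.dirname(os.path.abspath(path)), os.O_RDONLY)
    try:
        os.fsync(dir_fd)      # persist the directory entry change
    finally:
        os.close(dir_fd)
```

The ordering matters. If the rename happened before the fsync, a crash could leave a correctly named file with missing contents.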

For automatic restart-cost reduction, pass truncate_wal: true:

// Atomic compact: replace wal.bin with the snapshot in one
// lock pass + reopen the disk lane. No restart needed.
{"method": "topic.checkpoint",
 "params": {"truncate_wal": true}}
 
// → {"snapshot_path": null,           // no separate file written
//    "wal_truncated": true,
//    "ops":           1834,
//    "bytes":         287512}
 
// Or both: snapshot to backup AND compact wal.bin in one call.
{"method": "topic.checkpoint",
 "params": {"path":          "/mnt/backup/arcflow-2026-05-11.bin",
            "truncate_wal":  true}}

When truncate_wal: true, the daemon:

  1. Materializes pub/sub state under the store lock,
  2. Drains the disk lane (shutdown — flush + thread join),
  3. Writes wal.bin.tmp (compacted form) + fsync,
  4. Atomically renames wal.bin.tmp → wal.bin,
  5. Reopens a fresh AsyncWalLane on the new wal.bin,
  6. Releases the lock — publishers resume.

Crash points are all recoverable:

  • Before rename: the old wal.bin is intact, and replay restores the pre-checkpoint state. The leftover .tmp file is garbage and is overwritten by the next checkpoint.
  • After rename, before lane reopen: the new wal.bin is in place. If the daemon goes down here, a restart picks up the truncated form.
  • Mid-rename: POSIX rename is atomic, so it is either fully done or not done at all.

No separate snapshot loader is needed at startup. The existing init_store_with_wal replay reads the compacted wal.bin directly.

Typed errors:

  • WAL_NOT_ENABLED: truncate_wal: true was passed without --data-dir, or snapshot-only mode was requested with no path and no --data-dir.
  • WAL_TRUNCATE_WRITE_FAILED / WAL_TRUNCATE_RENAME_FAILED: the daemon attempts to reopen the original lane before returning the error, so it stays operational and the operator can retry.
  • WAL_LANE_REOPEN_FAILED: wal.bin was truncated successfully, but the lane couldn't be reopened. Restart the daemon to restore durability (the on-disk state is correct).

Periodic auto-checkpoint: --auto-checkpoint-secs

For unattended production deployments, pass --auto-checkpoint-secs <N> to the daemon. A background thread fires topic.checkpoint{truncate_wal: true} every N seconds so wal.bin stays bounded without operator intervention:

# Compact wal.bin every 5 minutes
arcflow-daemon \
  --data-dir /var/lib/arcflow \
  --auto-checkpoint-secs 300

| Property | Behavior |
|---|---|
| Default | 0 (disabled); operators must opt in. |
| Floor | 10 seconds when enabled. A sub-10 s cadence spends more on lane shutdown/reopen than it saves in restart cost, so it is rejected at startup. |
| Realistic values | 60 (per minute), 300 (5 min), 3600 (hourly). |
| Without --data-dir | Silently ignored (no WAL to compact). |
| Shutdown latency | The thread sleeps in 1-second slices, so a SIGTERM during a long period (e.g. --auto-checkpoint-secs 3600) doesn't make the daemon wait up to an hour to exit. |
| Failure | Logged to stderr, with no retry until the next tick. Alarm pattern: rate(arcflow_wal_checkpoints_total[1h]) == 0 while the flag is set means the compactor stalled. |

Both manual and automatic checkpoints increment the same counter (arcflow_wal_checkpoints_total), so a fleet-wide checkpoint-rate dashboard works regardless of which path triggered the compaction.
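A scheduler with the same properties fits in a few lines of Python: 0 disables it, enabled values have a 10 s floor, shutdown is prompt, and a failure is logged before waiting for the next tick. In Python, threading.Event.wait(timeout) returns as soon as the event is set, which gives the same responsiveness as the daemon's 1-second sleep slices. The function names are hypothetical, and checkpoint stands in for an RPC that issues topic.checkpoint with truncate_wal: true.

```python
import sys
import threading
from typing import Callable, Optional

def validate_auto_checkpoint_secs(secs: int) -> Optional[int]:
    """0 disables the feature; any other value must respect the 10 s floor."""
    if secs == 0:
        return None
    if secs < 10:
        raise ValueError("--auto-checkpoint-secs must be 0 (disabled) or >= 10")
    return secs

def checkpoint_loop(checkpoint: Callable[[], None], period_s: float,
                    stop: threading.Event) -> None:
    """Call checkpoint every period_s seconds until stop is set."""
    # Event.wait returns True as soon as stop is set, so shutdown never waits out a long period.
    while not stop.wait(period_s):
        try:
            checkpoint()
        except Exception as exc:  # log and wait for the next tick; no immediate retry
            print(f"WARN auto-checkpoint failed: {exc}", file=sys.stderr)
```

Swallowing the exception is intentional. A single failed compaction should not kill the scheduler, and the arcflow_wal_checkpoints_total alarm catches a compactor that keeps failing.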

Daemon identity: daemon.info

A cheap RPC that returns the daemon's version, uptime, active flag values, and live counts. It is useful for "is this the daemon I think I'm talking to?" debugging, and for client SDKs that want to gate features on a minimum daemon version.

{"method": "daemon.info", "params": {}}
 
// → {
//   "version":         "0.8.0",
//   "uptime_seconds":  3672,
//   "wal_durability":  true,           // mirrors `--data-dir` presence
//   "limits": {
//     "max_line_bytes":  16777216,
//     "max_connections": 1024
//   },
//   "counts": {
//     "topics":          42,
//     "consumers":       18,
//     "consumer_groups": 3
//   }
// }

The same version is exposed as the arcflow_build_info{version="…"} gauge label on /metrics, and uptime as arcflow_uptime_seconds.

Bounded pull pagination: limit + has_more

The pull-style RPCs (topic.events_after, consumer.pending, group.pending) accept an optional limit parameter and return a has_more: bool field alongside events. Without this bound, a fresh consumer catching up after a long disconnect could request millions of events in one RPC. The daemon would then materialize the whole pending set in memory and serialize it into one giant JSON array.

// Page 1
{"id": 1, "method": "topic.events_after",
 "params": {"topic": "frames", "after_seq": 0, "limit": 1000}}
 
// → {"events": [...1000 events...], "has_more": true}
 
// Page 2 — cursor at the last sequence returned
{"id": 2, "method": "topic.events_after",
 "params": {"topic": "frames", "after_seq": 1000, "limit": 1000}}
 
// → {"events": [...next page...], "has_more": false}

| Field | Default | Notes |
|---|---|---|
| limit | 1000 | Per-RPC override, clamped to [1, 100000]. Larger values are silently capped rather than rejected; has_more tells the client to keep paging. |
| has_more | response-only | true means at least one more event exists past the returned set; the client should issue the next RPC. |

For hot-loop catch-up, the streaming consumer.subscribe path is the right tool; pagination is for episodic or dashboard pulls.
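A catch-up loop that follows has_more might look like the sketch below. events_after is a hypothetical callable that wraps the topic.events_after RPC. The cursor advances to the sequence of the last event actually returned, not to after_seq + limit. Pages therefore stay correct even when the first resident event is well past after_seq, for example after retention pruning or a purge.

```python
from typing import Callable, Dict, Iterator

def iter_events(events_after: Callable[..., Dict], topic: str,
                after_seq: int = 0, limit: int = 1000) -> Iterator[dict]:
    """Yield every event after after_seq, one bounded page per RPC."""
    cursor = after_seq
    while True:
        page = events_after(topic=topic, after_seq=cursor, limit=limit)
        events = page.get("events", [])
        for ev in events:
            cursor = ev["sequence"]  # cursor = last sequence actually returned
            yield ev
        if not page.get("has_more") or not events:
            return
```

Stopping on an empty page as well as on has_more: false guards against an infinite loop if a server ever reports has_more with no events.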

Streaming subscribers: long-lived push delivery

For subscribers that want events pushed to them as they arrive (rather than polling consumer.pending), open one connection and send consumer.subscribe. The daemon takes over that connection and streams {"event": …} lines as events fire, until the client disconnects.

echo '{"id":1,"method":"consumer.subscribe","params":{"name":"vision","topic":"frames","poll_ms":30}}' \
  | nc -U /tmp/arcflow.sock
 
# Initial ack
{"id":1,"result":{"streaming":true,"mode":"consumer"}}
 
# Then one line per event as they arrive
{"event":{"sequence":1,"data":{...},"headers":{...}}}
{"event":{"sequence":2,"data":{...}}}
…

The default poll interval is 30 ms (just under one frame at 30 fps). Override it per subscription with poll_ms. Events are auto-acked as they are delivered, so this path has at-most-once semantics. Use consumer.pending + consumer.ack for at-least-once.

The same shape works for queue-group members via group.subscribe.

Browser / agent clients: HTTP/SSE bridge

For web clients (browser EventSource, agent runtimes that don't link the C ABI), pass --http <addr:port> to bind an HTTP listener alongside the UDS:

arcflow-daemon \
  --socket /tmp/arcflow.sock \
  --http 127.0.0.1:7771 \
  --data-dir /var/lib/arcflow

Routes:

| Route | Purpose |
|---|---|
| GET /livez | Liveness probe: 200 OK + ok\n; 503 only mid-shutdown |
| GET /readyz | Readiness probe: 200 OK + per-check report; 503 if any check fails |
| GET /healthz | Backward-compat alias for /livez |
| GET /metrics | Prometheus exposition (counters + gauges) |
| GET /v1/sse/events?consumer=…&topic=…&poll_ms=… | SSE stream of pub/sub events for a consumer |
| GET /v1/sse/group?group=…&member=…&topic=…&poll_ms=… | SSE stream for a queue-group member |

Browser usage:

const stream = new EventSource(
  'http://localhost:7771/v1/sse/events?consumer=vision&topic=frames'
);
stream.addEventListener('arcflow.event', (e) => {
  const ev = JSON.parse(e.data);
  // ev.sequence, ev.data, ev.headers, ev.published_at_ms
});

The SSE stream sends a ": keep-alive" comment line after every 15 s of idle time, so reverse proxies (nginx, ELB, …) don't close the socket.
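Non-browser clients, such as a Python agent, can parse the event stream without an EventSource. This sketch implements only the subset of the WHATWG event-stream format needed here: event and data fields, comment lines, and blank-line dispatch. It ignores id and retry. It also assumes the daemon labels each message with "event: arcflow.event", which is inferred from the addEventListener call in the browser example above.

```python
from typing import Iterable, Iterator, Tuple

def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_type, data) for each event dispatched by a blank line."""
    event_type, data = "message", []  # "message" is the spec's default type
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if data:
                yield event_type, "\n".join(data)
            event_type, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. the ": keep-alive" heartbeat
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the spec strips one leading space
        if name == "event":
            event_type = value
        elif name == "data":
            data.append(value)
        # id, retry, and unknown fields are ignored in this sketch
```

You can feed it decoded lines from any HTTP client, for example (line.decode("utf-8") for line in urllib.request.urlopen(url)), and then json.loads the data of each arcflow.event.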

Observability: Prometheus /metrics

When --http is enabled, /metrics exposes Prometheus text format. There are no per-topic or per-consumer series, because their cardinality would explode under a busy session. Aggregate counters and gauges are read from the store under one lock per scrape.

Counters:

  • arcflow_rpc_publish_total
  • arcflow_rpc_publish_batch_total
  • arcflow_rpc_publish_batch_events_total (sum of events committed across batches — rate(events) / rate(batches) reports average batch size)
  • arcflow_rpc_ack_total / arcflow_rpc_nack_total
  • arcflow_rpc_register_consumer_total / arcflow_rpc_register_group_member_total / arcflow_rpc_drop_consumer_total
  • arcflow_rpc_unknown_method_total / arcflow_rpc_invalid_json_total / arcflow_rpc_malformed_params_total
  • arcflow_rpc_typed_errors_total

Gauges (snapshot at scrape time):

  • arcflow_build_info{version="…"} — constant 1 with the daemon crate version baked into a label. Standard Prometheus convention; use count by (version) (arcflow_build_info) to count daemons by release across a fleet.
  • arcflow_uptime_seconds — seconds since the daemon started serving traffic. Restart detection: delta(arcflow_uptime_seconds[5m]) < 0.
  • arcflow_topics
  • arcflow_consumers
  • arcflow_consumer_groups
  • arcflow_dlq_nodes — failed events parked for re-drive
  • arcflow_sliding_windows
  • arcflow_max_consumer_lag — worst-case head_seq - last_acked_seq across every durable consumer and consumer group on every topic. Single aggregate gauge by design; per-consumer detail lives in the consumer.list / group.list RPC responses (see below) so the metrics surface stays low-cardinality.
  • arcflow_active_connections — active client connections across UDS + HTTP. Bounded by --max-connections.

Counters (cont'd):

  • arcflow_rejected_connections_total — connections refused because --max-connections was full. Any non-zero rate indicates clients silently failing to connect; alarm worthy.
  • arcflow_wal_checkpoints_total — successful topic.checkpoint calls (manual + automatic). Alarm: rate(...[1h]) == 0 while --auto-checkpoint-secs is set means the compactor stalled.

WAL durability metrics (only emitted when --data-dir is in use):

| Metric | Type | Meaning |
|---|---|---|
| arcflow_wal_replay_entries | gauge | WAL entries replayed at startup. Set once; never increments mid-process. Restart-cost indicator. |
| arcflow_wal_ops_buffered_total | counter | Ops sent to the disk lane since startup. |
| arcflow_wal_ops_flushed_total | counter | Ops successfully fsynced. |
| arcflow_wal_pending | gauge | Buffered but not yet flushed. Alarm if persistently above the lane's flush_count threshold. |
| arcflow_wal_bytes_written_total | counter | Bytes written to wal.bin. |
| arcflow_wal_flushes_total | counter | Fsync cycles completed by the disk lane. |
| arcflow_wal_last_flush_us | gauge | Duration of the most recent fsync cycle, in µs. |

Daemons started without --data-dir emit only arcflow_wal_replay_entries 0 and skip the lane-derived metrics, because the WAL infrastructure isn't running in that mode.

Sample Prometheus scrape config:

scrape_configs:
  - job_name: arcflow-daemon
    static_configs:
      - targets: ['127.0.0.1:7771']

Useful PromQL once it's wired:

# Events per second flowing through the daemon
rate(arcflow_rpc_publish_total[1m])
  + rate(arcflow_rpc_publish_batch_events_total[1m])
 
# Average events per batch — is the producer batching effectively?
rate(arcflow_rpc_publish_batch_events_total[1m])
  / rate(arcflow_rpc_publish_batch_total[1m])
 
# Failed-event backlog — operator alarm if non-zero for more than N minutes
arcflow_dlq_nodes
 
# Sustained durability throughput — should track publish rates under steady state
rate(arcflow_wal_ops_flushed_total[1m])
 
# Disk lane backpressure — alarm if persistently > a few thousand
arcflow_wal_pending
 
# Average bytes per fsync over the last 5 minutes, useful for sizing flush_count / flush_interval
rate(arcflow_wal_bytes_written_total[5m])
  / rate(arcflow_wal_flushes_total[5m])
 
# Restart cost — schedule snapshot+truncate work when this grows large
arcflow_wal_replay_entries
 
# Worst-case consumer lag — one-number indicator that some subscriber
# is falling behind. Drill into consumer.list / group.list (lag field)
# to see which.
arcflow_max_consumer_lag

The matching consumer.list and group.list RPC responses include head_seq and lag per row, so an admin tool can print a per-consumer table without a second round trip:

// → consumer.list response
{
  "consumers": [
    {
      "id": 1,
      "name": "vision",
      "topic": "frames",
      "last_acked_seq": 1234,
      "head_seq": 1240,
      "lag": 6
    },
    // …
  ]
}
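An admin tool can turn that response into a worst-first table with no extra round trip. lag_table is a hypothetical helper. As a cross-check, it recomputes head_seq - last_acked_seq and compares the result with the lag field, which should always agree.

```python
from typing import Dict, List, Tuple

def lag_table(consumer_list: Dict) -> List[Tuple[str, str, int]]:
    """Return (name, topic, lag) rows from a consumer.list result, sorted worst-first."""
    rows = []
    for c in consumer_list["consumers"]:
        lag = c["head_seq"] - c["last_acked_seq"]
        if lag != c["lag"]:
            raise ValueError(f"inconsistent lag for {c['name']}: {lag} != {c['lag']}")
        rows.append((c["name"], c["topic"], lag))
    return sorted(rows, key=lambda r: r[2], reverse=True)
```

The first row answers "who is behind?" when arcflow_max_consumer_lag fires. Keep in mind that the gauge also covers queue groups, so group.list needs the same treatment.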

When NOT to use the daemon

Use in-process instead when:

  • A single process owns the entire graph end-to-end and nothing else needs to read it. The C ABI / napi-rs / WASM path skips JSON-RPC and IPC entirely — every operation is a function call into the same address space.
  • You're shipping a self-contained binary that bundles ArcFlow as a library. Adding a daemon process complicates deployment.
  • You need cross-machine access. The daemon's UDS transport is same-machine only by design. For network-distributed setups, use the WAL-stream replication mode (one writer, many readers via WAL tailing); see the Sync architecture page.

Operational notes

  • Stderr log format: every operationally meaningful event the daemon writes to stderr now carries an ISO-8601 UTC timestamp and a level prefix. There are three levels: INFO (lifecycle), WARN (recoverable degradation), and ERROR (fatal to the process).

    2026-05-11T13:45:23Z INFO  arcflow-daemon: WAL durability enabled at /var/lib/arcflow/wal.bin (replayed 12834 entries; flush_count=1000, flush_interval_ms=10)
    2026-05-11T13:45:23Z INFO  arcflow-daemon: UDS listening on /tmp/arcflow.sock (Ctrl+C to stop)
    2026-05-11T13:45:24Z WARN  arcflow-daemon: HTTP accept failed — Connection reset by peer (os error 104)
    

    No new dependencies are involved: the format is hand-rolled to keep the static binary small for low-overhead deploys. Operators on systemd / k8s already get timestamps from journald or the container runtime. The prefix mainly helps deploys that redirect stderr to a file, ad-hoc nc testing, and log scrapers that filter by level prefix. The usage banner printed by --help is intentionally NOT prefixed (it's not a log event).

  • Liveness vs readiness: wire GET /livez to your liveness probe and GET /readyz to your readiness probe.

    • /livez returns 200 + ok\n whenever the process is alive (503 only mid-shutdown). Failure here means restart the pod.
    • /readyz returns 200 + per-check report (store_lock: ok, wal_pending: ok (N), wal_last_flush_us: ok (N)) when every subsystem check passes; 503 + the same report listing failed checks otherwise. Failure here means drain traffic, not restart.
    • /healthz is kept as a backward-compat alias for /livez.
  • Shutdown: the daemon traps SIGINT and SIGTERM. After the signal:

    1. The accept loop stops (no new connections);
    2. The async WAL lane is explicitly drained: its disk-lane thread is told to flush, and main() blocks on the join. The buffered fsync batch is guaranteed durable before the process exits, regardless of how many connection-handler threads are still alive holding Arc clones. (Without this explicit drain, Drop on the lane would never run, because the shared GraphStore is never dropped before main() returns. The disk thread would then be torn down at process exit, possibly mid-fsync.)
    3. The socket file is unlinked.

    Operators see two stderr lines on a successful shutdown:

    arcflow-daemon: draining WAL on shutdown…
    arcflow-daemon: WAL drained — exit clean
    

    In-memory daemons (no --data-dir) skip both lines, since there is no durability surface to drain.

  • Socket permissions: the daemon creates the UDS with default umask. Restrict access by setting umask before exec, or running the daemon as a dedicated user whose primary group is the only one allowed to read/write the socket.

  • WAL retention: without compaction, wal.bin grows monotonically in long-running deployments. Bound it with topic.checkpoint and truncate_wal: true, or let --auto-checkpoint-secs do it on a schedule.

  • One daemon per --data-dir: the daemon takes an exclusive hold on the data directory, so a second daemon pointed at the same directory will fail to start. This is the SWMR (single-writer, multi-reader) invariant; see the engine's replication.rs.
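Two of these notes can be illustrated with standard POSIX calls from Python: restricting the socket through the umask, and refusing a second writer on the same directory. This is a hypothetical launcher-side sketch. The daemon's own locking mechanism isn't documented on this page, so fcntl.flock and the .lock file name are assumptions chosen for illustration.

```python
import fcntl
import os
import socket

def bind_private_socket(path: str, umask: int = 0o077) -> socket.socket:
    """Bind a UDS whose file gets no group/other access (use 0o007 to keep group)."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    old = os.umask(umask)  # umask is process-wide: set it before starting threads
    try:
        sock.bind(path)
    except OSError:
        sock.close()
        raise
    finally:
        os.umask(old)
    return sock

def lock_data_dir(data_dir: str) -> int:
    """Take an exclusive, non-blocking lock; keep the fd open for the process lifetime."""
    fd = os.open(os.path.join(data_dir, ".lock"), os.O_RDWR | os.O_CREAT, 0o600)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        os.close(fd)
        raise RuntimeError(f"{data_dir} is already in use by another process") from None
    return fd
```

To apply a umask to the real daemon instead, a Python launcher can pass umask=0o007 to subprocess.Popen (Python 3.9+), which sets it in the child just before exec.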

Reference

  • Source: crates/arcflow-daemon/
  • Method catalog: dispatch function in lib.rs
  • HTTP routes: handle_http_connection in http.rs
  • WAL replay: init_store_with_wal in main.rs