Language Bindings
One engine, every language. The full install matrix is sourced from the engine release manifest — no hand-rolled commands here.
Available now
- Native CLI binary
  curl -fsSL https://staging.oz.com/install/arcflow | sh
  Platforms: darwin-arm64, linux-x86_64-gnu, linux-arm64-gnu, linux-x86_64-musl, linux-arm64-musl, windows-x86_64, windows-arm64
- Standalone arcflow-daemon binary (UDS / named-pipe JSON-RPC)
  curl -fsSL https://github.com/ozinc/arcflow/releases/latest/download/arcflow-daemon-${VERSION}-${PLATFORM}.tar.gz | tar -xz
  Platforms: darwin-arm64, linux-x86_64-gnu, linux-arm64-gnu, linux-x86_64-musl, linux-arm64-musl, windows-x86_64, windows-arm64
- ArcFlow MCP server (stdio JSON-RPC for AI agents)
  curl -fsSL https://github.com/ozinc/arcflow/releases/latest/download/arcflow-mcp-${VERSION}-${PLATFORM}.tar.gz | tar -xz
  Platforms: darwin-arm64, linux-x86_64-gnu, linux-arm64-gnu, linux-x86_64-musl, linux-arm64-musl, windows-x86_64, windows-arm64
- libarcflow shared library (C ABI cdylib)
  curl -fsSL https://github.com/ozinc/arcflow/releases/latest/download/libarcflow-${VERSION}-${PLATFORM}.tar.gz | tar -xz
  Platforms: darwin-arm64, linux-x86_64-gnu, linux-arm64-gnu, windows-x86_64, windows-arm64
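The daemon, MCP, and library tarballs share one URL pattern. The sketch below is a hypothetical helper (`artifact_url` is not part of any ArcFlow tooling) that fills in `${VERSION}` and `${PLATFORM}` using the platform lists above and refuses combinations the matrix does not list, such as a musl build of libarcflow. The version string in the usage line is only an example.

```python
# Hypothetical helper: build a release-tarball URL from the install matrix above.
ALL_PLATFORMS = (
    "darwin-arm64", "linux-x86_64-gnu", "linux-arm64-gnu",
    "linux-x86_64-musl", "linux-arm64-musl", "windows-x86_64", "windows-arm64",
)

# libarcflow has no musl builds in the matrix; the other artifacts cover every platform.
PLATFORMS = {
    "arcflow-daemon": ALL_PLATFORMS,
    "arcflow-mcp": ALL_PLATFORMS,
    "libarcflow": tuple(p for p in ALL_PLATFORMS if not p.endswith("-musl")),
}

BASE = "https://github.com/ozinc/arcflow/releases/latest/download"


def artifact_url(artifact: str, version: str, platform: str) -> str:
    """Return the tarball URL, refusing platforms the matrix does not list."""
    supported = PLATFORMS.get(artifact)
    if supported is None:
        raise ValueError(f"unknown artifact: {artifact}")
    if platform not in supported:
        raise ValueError(f"{artifact} is not published for {platform}")
    return f"{BASE}/{artifact}-{version}-{platform}.tar.gz"


if __name__ == "__main__":
    print(artifact_url("arcflow-mcp", "0.8.0", "linux-x86_64-gnu"))
```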
In progress
- pip install oz-arcflow (2026-Q3)
- npm install arcflow (2026-Q3)
- cargo add arcflow (2026-Q4)
Manifest-driven. When a binding flips to shipped in RELEASE-MATRIX.toml, every install surface picks it up automatically.
By design — not provided
- Docker image
  Refused 2026-04-30. ArcFlow is a 5 MB statically linked embedded library; Docker would add ~20 MB of container overhead and subvert the in-process design. Customers should:
  - import the language binding (oz-arcflow Python wheel, arcflow npm package, arcflow Rust crate) for embedded use
  - use the native CLI for one-shot or scripting use
  - run the WASM playground at https://staging.oz.com/engine for the zero-install demo case
  If a future use case (a long-running arcflow-mcp service, a shared multi-tenant deployment) needs a containerized shape, add a separate `kind = "mcp-docker"` or `kind = "service-docker"` entry rather than reviving a generic engine Docker image.
ArcFlow ships as a 5MB embedded library — Browser, Node, Python, Rust, native CLI, MCP. Containerization would add overhead and subvert the in-process design. The engine is small enough to fit every deployment shape that earns its weight.
All bindings wrap a single C ABI shared library (libarcflow). Direct function calls when in-process. TCP / HTTP / MCP when you need network access.
If you are writing a custom binding, read Adapter Discipline first — it is the boundary contract every binding inherits. The short version: the edge translates; only the core decides.
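As a rough illustration of that contract, the hypothetical binding below only converts host values on the way in and on the way out; every decision about the query (parsing, validation, planning, execution) is left to the core. `core_execute` is a stand-in for the engine entry point, not an ArcFlow function, and JSON is used here purely as an example wire format.

```python
import json
from typing import Any, Callable, Optional

# Stand-in signature for the engine entry point: (query text, JSON params) -> JSON result.
CoreExecute = Callable[[str, str], str]


def make_binding(core_execute: CoreExecute) -> Callable[..., Any]:
    def execute(query: str, params: Optional[dict] = None) -> Any:
        # Edge: translate host-language values into the core's input shape.
        payload = json.dumps(params or {})
        # Core: parse, validate, plan, run. The edge never inspects the query text.
        raw = core_execute(query, payload)
        # Edge: translate the core's answer back into host-language values.
        return json.loads(raw)

    return execute


if __name__ == "__main__":
    def fake_core(query: str, payload: str) -> str:
        # Echoes its inputs so the round trip is visible.
        return json.dumps({"query": query, "params": json.loads(payload)})

    execute = make_binding(fake_core)
    print(execute("RETURN $x AS x", {"x": 1}))
```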
Rust SDK
The primary interface.
use arcflow::{Label, Properties};
// In-memory graph — direct store access
let mut store = arcflow::open();
let id = store.create_node(
vec![Label::new("Person")],
Properties::new(),
);
// Thread-safe concurrent access — recommended for applications
let db = arcflow::open_concurrent();
db.execute("CREATE (n:Person {name: 'Alice', age: 30})").unwrap();
let result = db.execute("MATCH (n:Person) RETURN n.name AS name").unwrap();
for row in &result.rows {
println!("{}", row.get("name").unwrap());
}

Key Types
| Type | Description |
|---|---|
| GraphStore | Single-threaded graph store with full low-level API |
| ConcurrentStore | Thread-safe wrapper with execute() — auto-detects read vs write |
| Engine | Read-only query engine |
| MutableEngine | Read-write query engine |
| QueryResult | Rows + columns from query execution |
| compile() | Parse WorldCypher text into executable IR |
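ConcurrentStore's execute() routes each statement to the read or write path on its own. Purely to illustrate what that routing has to decide, here is a naive keyword-based classifier. It is not ArcFlow's detector, and it would misclassify edge cases such as a property literally named `set`.

```python
import re

# Clauses that mutate the graph in Cypher.
WRITE_CLAUSES = {"CREATE", "MERGE", "SET", "DELETE", "DETACH", "REMOVE"}

_STRING_LITERAL = re.compile(r"'[^']*'|\"[^\"]*\"")
_WORD = re.compile(r"[A-Za-z_]+")


def is_write(query: str) -> bool:
    """Naively decide whether a Cypher statement needs the write path."""
    # Blank out string literals so {note: 'DELETE me'} is not read as a clause.
    code_only = _STRING_LITERAL.sub("''", query)
    words = {w.upper() for w in _WORD.findall(code_only)}
    return not WRITE_CLAUSES.isdisjoint(words)


if __name__ == "__main__":
    print(is_write("CREATE (n:Person {name: 'Alice', age: 30})"))  # True
    print(is_write("MATCH (n:Person) RETURN n.name"))  # False
```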
Compile + Execute
let db = arcflow::open_concurrent();
db.execute("CREATE (n:Person {name: 'Alice'})").unwrap();
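// Pre-compile for repeated execution: compile() parses WorldCypher text into executable IR
let query = arcflow::compile("MATCH (n:Person) RETURN n.name").unwrap();
// Run a query through the concurrent store (execute() takes query text, not the compiled IR)
let result = db.execute("CALL algo.pageRank()").unwrap();
assert!(!result.rows.is_empty());

Python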
Native bindings over the C ABI. No compile step needed — loads the shared library at runtime.
from arcflow import ArcFlow
# In-memory or persistent
db = ArcFlow() # in-memory
db = ArcFlow("/tmp/mydb") # persistent (WAL-journaled)
# Execute queries — typed parameters, no manual string escaping
db.execute("CREATE (n:Person {name: 'Alice', age: 30})")
result = db.execute(
"MATCH (p:Person) WHERE p.name = $name RETURN p.age",
params={"name": "Alice"},
)
for row in result:
    print(row)
# Algorithms work the same way
db.execute("CALL algo.pageRank()")
# Context-manager for deterministic cleanup
with ArcFlow() as db:
    db.execute("CALL db.demo()")
# Version — both return the same string. See /docs/reference/versioning
import arcflow
print(arcflow.__version__) # "0.8.0"
print(ArcFlow.version())    # "0.8.0"

Bulk ingest — three throughput tiers
For ingest, pick the highest-throughput path that matches your input shape. Each successive tier removes one layer of Python overhead.
| Path | Input | Throughput | Use when |
|---|---|---|---|
| db.execute("CREATE …") | parser-bound Cypher | 3–10 K wps | one-off writes, exploratory |
| bulk_create_nodes / _relationships | list of (labels, props) / (from, to, props) tuples | ~360 K wps | rows already in Python |
| bulk_create_nodes_from_arrow / _relationships_from_arrow | pyarrow.Table / RecordBatch | ~1 M+ wps | Parquet / Arrow / Polars pipelines |
| load_parquet / load_csv | path on disk | parquet ≈ Arrow path | bulk import from a file |
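To put the tiers in perspective, the snippet below does the back-of-envelope arithmetic for the OBSERVED_AT edges built in the example that follows (95 entities × 46,000 frames = 4,370,000 relationships), using the table's approximate rates and taking "~1 M+" as exactly 1 M. Real throughput depends on hardware and data shape, so treat the results as orders of magnitude.

```python
# Approximate rates from the table above (the Arrow tier is a lower bound).
RATES = {
    "db.execute, low end": 3_000,
    "db.execute, high end": 10_000,
    "bulk_create_* tuples": 360_000,
    "*_from_arrow": 1_000_000,
}

EDGES = 95 * 46_000  # entities x frames in the Tier 1 example


def seconds_to_ingest(n_rows: int, rate: float) -> float:
    return n_rows / rate


for path, rate in RATES.items():
    secs = seconds_to_ingest(EDGES, rate)
    print(f"{path:22s} {secs:8.1f} s  ({secs / 60:.1f} min)")
```

At the parser-bound rates the same load takes roughly 7 to 24 minutes; the tuple path brings it to about 12 seconds and the Arrow path to about 4.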
import pyarrow as pa
# Tier 1 — Python tuples (most common shape)
entity_ids = db.bulk_create_nodes([
(["Entity"], {"id": i, "x": float(i), "y": 0.0})
for i in range(95)
])
frame_ids = db.bulk_create_nodes([
(["Frame"], {"id": f"frame_{t}", "ts_ms": t * 16})
for t in range(46_000)
])
edges = [(eid, fid, {"x": float(ei), "y": float(fi)})
for fi, fid in enumerate(frame_ids)
for ei, eid in enumerate(entity_ids)]
db.bulk_create_relationships("OBSERVED_AT", edges)
# Tier 2 — Arrow-direct (no JSON marshal, saturates the engine)
tbl = pa.table({
"id": pa.array(range(95), type=pa.int64()),
"x": pa.array([float(i) for i in range(95)], type=pa.float64()),
})
db.bulk_create_nodes_from_arrow("Entity", tbl)
# Spatial Struct columns ({x: f64, y: f64[, z]}) round-trip as native Point/Point3d.
# Tier 3 — File-direct (no Python in the loop at all)
n_loaded = db.load_parquet("data/tracking.parquet", "TrackingObs")
db.load_csv("data/players.csv", "Player", chunk_size=100_000)

bulk_create_* and load_* have CREATE semantics: every input row allocates
a new node or edge, even when an edge between the same pair already exists.
Use Cypher MERGE if you need find-or-create.
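The difference matters when a load is re-run. The toy store below (illustrative, not ArcFlow code) contrasts the two behaviours: `create` always allocates a new edge id, as bulk_create_* and load_* do, while `merge` returns the existing edge for the same (source, target, type) and only creates one when none matches, which is the find-or-create behaviour Cypher MERGE provides.

```python
from itertools import count
from typing import Dict, Tuple

Edge = Tuple[int, int, str]  # (source id, target id, relationship type)


class ToyEdgeStore:
    def __init__(self) -> None:
        self._ids = count(1)
        self.edges: Dict[int, Edge] = {}

    def create(self, src: int, dst: int, rel: str) -> int:
        # CREATE semantics: always allocate a new edge, duplicates included.
        eid = next(self._ids)
        self.edges[eid] = (src, dst, rel)
        return eid

    def merge(self, src: int, dst: int, rel: str) -> int:
        # MERGE semantics: reuse a matching edge if one exists, else create it.
        for eid, edge in self.edges.items():
            if edge == (src, dst, rel):
                return eid
        return self.create(src, dst, rel)


store = ToyEdgeStore()
first = store.create(1, 2, "OBSERVED_AT")
store.create(1, 2, "OBSERVED_AT")         # a second, distinct edge
same = store.merge(1, 2, "OBSERVED_AT")   # finds the first one instead
print(len(store.edges), same == first)    # 2 True
```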
Prepared statements — db.prepare(query)
Compile a Cypher query once for repeated execution. Each .execute(params)
call skips the parser and goes straight to the executor. For analytics
workloads that issue the same query thousands of times with different
parameters (dashboards, real-time scoring, ML feature pipelines), this
avoids the per-call parse cost.
stmt = db.prepare("MATCH (e:Entity {id: $eid}) RETURN e.x, e.y, e.speed")
for eid in entity_ids:
    row = next(iter(stmt.execute({"eid": eid})))
stmt.close()

The statement borrows from this database; close it before closing the database, or use it as a context manager.
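The saving comes from moving the parse out of the loop. This hypothetical model (`ToyDB` and `ToyStatement` are illustrative, not ArcFlow classes) counts parses to show that a prepared statement is parsed once no matter how many times it executes, and that the context-manager form closes it automatically.

```python
class ToyStatement:
    def __init__(self, params: tuple) -> None:
        self._params = params  # the "compiled plan": parameter names in order
        self.closed = False

    def execute(self, values: dict) -> list:
        if self.closed:
            raise RuntimeError("statement is closed")
        # Bind-and-run only; no parsing happens on this path.
        return [tuple(values[name] for name in self._params)]

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "ToyStatement":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()


class ToyDB:
    def __init__(self) -> None:
        self.parse_count = 0

    def prepare(self, query: str) -> ToyStatement:
        self.parse_count += 1  # the only place a "parse" happens
        # Stand-in for parsing: collect $parameter names from the query text.
        tokens = query.replace("}", " ").replace(",", " ").split()
        params = tuple(tok[1:] for tok in tokens if tok.startswith("$"))
        return ToyStatement(params)


db = ToyDB()
with db.prepare("MATCH (e:Entity {id: $eid}) RETURN e.x, e.y") as stmt:
    rows = [stmt.execute({"eid": i}) for i in range(1_000)]
print(db.parse_count, rows[-1])  # 1 [(999,)]
```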
Zero-copy result handoff — result.to_arrow() / to_polars() / to_pandas()
Materializing a 1M-row result as list[dict] is slow and lossy. The Arrow
handoff hands typed Arrow buffers to PyArrow / Polars / Pandas / DuckDB
zero-copy:
result = db.execute("MATCH (p:Player) RETURN p.jersey AS jersey, p.name AS name, p.age AS age")
tbl = result.to_arrow() # pyarrow.RecordBatch
df = result.to_polars() # polars.DataFrame (Arrow-backed, zero-copy)
df = result.to_pandas() # pandas.DataFrame (Arrow-backed since pandas 2)
# Direct DuckDB interop:
import duckdb
con = duckdb.connect()
con.register("players", tbl)
con.execute("SELECT count(*) FROM players WHERE jersey > 50").fetchone()

Column types are honoured exactly: a Player whose name is "42" stays a
String, not coerced to int.
Per-column type mapping:
| Cypher / Engine type | Arrow DataType |
|---|---|
| Int | Int64 |
| Float | Float64 |
| String | Utf8 |
| Bool | Boolean |
| [Int…] | List<Int64> |
| [Float…] | List<Float64> |
| [String…] | List<Utf8> |
| Node ID / Rel ID | UInt64 |
| Point / Polygon | Utf8 (geometry as WKT) |
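One way to see the "types are honoured exactly" rule: the hypothetical helper below maps Python values to the Arrow type names in the table by dispatching on each value's own type, so the string "42" maps to Utf8 rather than Int64. It is an illustration only; the engine does its own mapping from its column types, and Node/Rel IDs or geometry cannot be recognised from plain Python values this way.

```python
from typing import Any

# Scalar rows of the table above, keyed by Python type.
SCALAR_ARROW_TYPES = {int: "Int64", float: "Float64", str: "Utf8"}


def arrow_type_for(value: Any) -> str:
    # bool first: in Python, bool is a subclass of int.
    if isinstance(value, bool):
        return "Boolean"
    if isinstance(value, list):
        if not value:
            raise ValueError("cannot infer the element type of an empty list")
        return f"List<{arrow_type_for(value[0])}>"
    for py_type, arrow_name in SCALAR_ARROW_TYPES.items():
        if isinstance(value, py_type):
            return arrow_name
    raise TypeError(f"no mapping for {type(value).__name__}")


print(arrow_type_for("42"))        # Utf8: a numeric-looking string stays a string
print(arrow_type_for([1.0, 2.5]))  # List<Float64>
```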
Snapshot-pinned reads — db.query_at(query, seq)
Run a query against a specific past WAL sequence. Bit-for-bit deterministic replay of the world model at any point in its history.
# Capture the seq before running a decision loop
anchor_seq = db.execute("RETURN walSeq() AS s").one()["s"]
# … mutations happen …
# Replay the world as it looked when the decision was made
result = db.query_at(
"MATCH (a:Account {id: 'ACC-789'}) RETURN a.balance, a.risk_score",
seq=anchor_seq,
)

Equivalent to inlining AS OF seq <seq> in the Cypher, with the AS-OF
mechanics kept out of caller code.
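The semantics can be modelled in a few lines: the state "as of" a sequence number is the replay of every WAL entry with a sequence at or below it, so later mutations cannot leak into the pinned read. This sketch only illustrates that idea; `WalEntry` and `state_as_of` are hypothetical and say nothing about how ArcFlow stores or indexes its WAL.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class WalEntry:
    seq: int
    key: str
    value: float


def state_as_of(wal: List[WalEntry], seq: int) -> Dict[str, float]:
    """Replay every entry with entry.seq <= seq, in sequence order."""
    state: Dict[str, float] = {}
    for entry in sorted(wal, key=lambda e: e.seq):
        if entry.seq > seq:
            break
        state[entry.key] = entry.value
    return state


wal = [
    WalEntry(1, "ACC-789.balance", 100.0),
    WalEntry(2, "ACC-789.risk_score", 0.2),
    # anchor_seq is captured here; later mutations follow
    WalEntry(3, "ACC-789.balance", 40.0),
    WalEntry(4, "ACC-789.risk_score", 0.9),
]
anchor_seq = 2
print(state_as_of(wal, anchor_seq))  # {'ACC-789.balance': 100.0, 'ACC-789.risk_score': 0.2}
print(state_as_of(wal, 4))           # {'ACC-789.balance': 40.0, 'ACC-789.risk_score': 0.9}
```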
Live subscriptions — db.subscribe(view_name)
Push-based subscription to a CREATE LIVE VIEW. The engine runs the
standing query incrementally and pushes {added, removed, seq} events
to the subscriber.
db.execute("""
CREATE LIVE VIEW high_conf AS
MATCH (n:Detection) WHERE n.confidence > 0.9
RETURN n.id AS id, n.confidence AS conf
""")
with db.subscribe("high_conf") as sub:
    for event in sub:          # blocks until next frontier advance
        for row in event["added"]:
            handle_new_detection(row)
        for row in event["removed"]:
            retract(row)

Each subscription owns a background poll thread; closing the subscription (or exiting the context manager) shuts it down deterministically.
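The {added, removed, seq} shape is a set difference between the view's results before and after a change. The sketch below computes it by diffing two whole result sets; ArcFlow maintains the view incrementally instead, so treat this only as a model of what an event contains. `view_event` is a hypothetical helper.

```python
from typing import Iterable, Tuple

Row = Tuple[str, float]  # (id, conf), matching the view's RETURN columns


def view_event(before: Iterable[Row], after: Iterable[Row], seq: int) -> dict:
    """Diff two result sets into the {added, removed, seq} event shape."""
    old, new = set(before), set(after)
    return {
        "added": sorted(new - old),
        "removed": sorted(old - new),
        "seq": seq,
    }


before = [("det-1", 0.95), ("det-2", 0.92)]
after = [("det-2", 0.92), ("det-3", 0.97)]  # det-1 no longer matches, det-3 arrived
print(view_event(before, after, seq=7))
# {'added': [('det-3', 0.97)], 'removed': [('det-1', 0.95)], 'seq': 7}
```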
CDC stream — db.changes_since(seq, limit=…)
Return mutations recorded after a given WAL sequence. Use it to drive a pull-based change-data-capture loop without taking on a separate CDC service.
import time

last_seq = 0
while True:
    changes = db.changes_since(last_seq, limit=1_000)
    if not changes:
        time.sleep(0.5)
        continue
    publish(changes)
    last_seq = max(int(c["sequence"]) for c in changes)

Pair with db.fingerprint() (content hash) and db.snapshot_uri()
(content-addressed snapshot URI) when downstream consumers need to verify
the mutations they applied match the source state exactly.
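A self-contained model of that loop plus the verification step follows. `FakeSource` stands in for the database's changes_since(), and `fingerprint()` stands in for db.fingerprint(); the SHA-256-over-sorted-JSON hash is an assumption made for the sketch, not ArcFlow's documented algorithm. The loop exits when the log is drained so the example terminates, where a real consumer would sleep and poll again.

```python
import hashlib
import json
from typing import Dict, List


def fingerprint(state: Dict[str, int]) -> str:
    # Assumed hash: SHA-256 over canonical (sorted-key) JSON.
    return hashlib.sha256(json.dumps(state, sort_keys=True).encode()).hexdigest()


class FakeSource:
    """Stand-in for the source database: an ordered change log plus current state."""

    def __init__(self, changes: List[dict]) -> None:
        self.changes = changes
        self.state: Dict[str, int] = {}
        for c in changes:
            self.state[c["key"]] = c["value"]

    def changes_since(self, seq: int, limit: int) -> List[dict]:
        return [c for c in self.changes if int(c["sequence"]) > seq][:limit]


def replicate(source: FakeSource, limit: int = 2) -> Dict[str, int]:
    replica: Dict[str, int] = {}
    last_seq = 0
    while True:
        changes = source.changes_since(last_seq, limit=limit)
        if not changes:
            break  # a long-running consumer would sleep and poll again here
        for c in changes:  # apply in sequence order
            replica[c["key"]] = c["value"]
        last_seq = max(int(c["sequence"]) for c in changes)
    return replica


source = FakeSource([{"sequence": str(i), "key": f"k{i % 2}", "value": i} for i in range(1, 6)])
replica = replicate(source)
print(fingerprint(replica) == fingerprint(source.state))  # True
```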
Event bus — arcflow.bus.Bus
arcflow.bus.Bus exposes the engine's in-process pub/sub + request-reply
surface to Python. Topics, durable consumers, NATS-style wildcard
subscriptions, and synchronous request-reply all reach the same engine
that db.execute() writes to. See Event bus for the
full semantics.
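For orientation, here is a sketch of NATS-style subject matching as NATS defines it: subjects are dot-separated tokens, `*` matches exactly one token, and a trailing `>` matches one or more remaining tokens. `subject_matches` is a hypothetical helper, and whether ArcFlow's bus also accepts `>` is not stated on this page; the example below it uses only `*`.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """NATS-style matching: '*' = exactly one token, trailing '>' = one or more."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # Only valid as the last pattern token, and must cover at least one token.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


print(subject_matches("all_tracks.*", "all_tracks.cam1"))      # True
print(subject_matches("all_tracks.*", "all_tracks.cam1.raw"))  # False
```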
from arcflow import ArcFlow
from arcflow.bus import Bus, EventPublisher, START_FROM_BEGINNING
db = ArcFlow()
bus = Bus(db)
bus.create_topic("orders.us")
seq = bus.publish("orders.us", {"order_id": 42, "amount": 100.0})
# Durable consumer — pull-style, per-consumer offset
bus.register_consumer("billing", "orders.us", start=START_FROM_BEGINNING)
for event in bus.pending("billing", "orders.us"):
    process(event.data)
    bus.ack("billing", "orders.us", event.seq)
# Wildcard consumer — fans out across every matching topic
tracks = bus.register_pattern_consumer("worker", "all_tracks.*")
for topic, event in tracks.drain():
    handle(topic, event.data)
# Synchronous request-reply
reply = bus.request("video.query.status", {"q": "n_cams"}, timeout_ms=5_000)
print(reply.data["count"])

EventPublisher is the application-layer convenience for rate-limited,
per-camera, severity-tagged event streams — mirrors oz_core.events.EventPublisher
for services migrating from NATS-and-Memgraph stacks:
pub = EventPublisher(bus, topic_prefix="framing.alert")
pub.set_rate_limit("ball_lost", interval_ms=10_000)
pub.emit("ball_lost", {"conf": 0.2}, camera="cam1", severity="warning")
# -> publishes to framing.alert.warning.ball_lost.cam1 (auto-creates the topic)

Repositories — arcflow.repository.BaseRepository
A drop-in shape for services migrating off the Memgraph Bolt driver.
Subclass and add domain methods that call self.execute(...) /
self.execute_rows(...) / self.execute_scalar(...). Constructor takes
data_dir=None (in-memory) or a path (persistent) — no host / port,
no connection pool, no BAD_SESSION reconnection class because the
engine is same-process FFI.
from arcflow.repository import BaseRepository, RepositoryTimeoutError
class PlayerRepo(BaseRepository):
    def get(self, player_id: int) -> dict | None:
        return self.execute_one(
            "MATCH (p:Player {id: $id}) RETURN p",
            {"id": player_id},
        )

with PlayerRepo("/var/lib/arcflow") as repo:
    alice = repo.get(42)

execute(..., timeout_s=5.0) enforces a per-call deadline via a
worker-thread join; exceeding the deadline raises
RepositoryTimeoutError. Pass timeout_s=None to disable per-call
timeouts. Multiple repositories can share one open ArcFlow via the
shared_db= constructor argument.
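The worker-thread-join technique can be sketched in plain Python. `call_with_deadline` is a hypothetical helper and the `RepositoryTimeoutError` below is a local stand-in for the class of the same name, not an import from arcflow. Note the inherent limit of this approach: join() stops the caller from *waiting*, but the worker keeps running until the underlying call returns.

```python
import threading
from typing import Any, Callable, Optional


class RepositoryTimeoutError(TimeoutError):
    """Local stand-in for arcflow.repository.RepositoryTimeoutError."""


def call_with_deadline(fn: Callable[[], Any], timeout_s: Optional[float]) -> Any:
    if timeout_s is None:
        return fn()  # per-call timeouts disabled
    outcome: dict = {}

    def worker() -> None:
        try:
            outcome["value"] = fn()
        except BaseException as exc:  # re-raised in the caller's thread below
            outcome["error"] = exc

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join(timeout_s)
    if t.is_alive():
        raise RepositoryTimeoutError(f"call exceeded {timeout_s}s")
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


print(call_with_deadline(lambda: 42, timeout_s=1.0))  # 42
```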
Streaming functions — arcflow.stream_fn
Per-stream stateful scalar functions (lag, lead, delta) that collapse
per-step derivative patterns (current value minus previous, and similar)
into a single engine-side call. State is held in the engine, not in the
calling service: register_stream_fn and feed_stream_fn return immediately
with whatever output the function produces for that feed (zero or one
entries per feed).
from arcflow import ArcFlow
from arcflow.stream_fn import StreamFn, Kind
db = ArcFlow()
pan_delta = StreamFn(db, "pan_delta", Kind.Delta)
ts_delta = StreamFn(db, "ts_delta", Kind.Delta)
for pan, ts in samples:
    pan_out = pan_delta.feed(pan)
    ts_out = ts_delta.feed(ts)
    if pan_out and ts_out and ts_out[0].fn_value != 0:  # skip repeated timestamps
        pan_rate = pan_out[0].fn_value / ts_out[0].fn_value

See Streaming functions for the
full semantics (Lag(n), Lead(n), and Delta).
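To make the "zero or one entries per feed" behaviour concrete, here is a pure-Python model of Delta and Lag(n), assuming the usual window semantics: Delta emits x[t] - x[t-1] and Lag(n) emits x[t-n], each producing nothing until enough history exists. `ToyStreamFn` is hypothetical, not arcflow.stream_fn.StreamFn, and ArcFlow's exact emission rules are on the Streaming functions page.

```python
from collections import deque
from typing import List


class ToyStreamFn:
    def __init__(self, kind: str, n: int = 1) -> None:
        if kind not in ("delta", "lag"):
            raise ValueError(f"unsupported kind: {kind}")
        if n < 1:
            raise ValueError("n must be >= 1")
        self.kind = kind
        self.n = 1 if kind == "delta" else n
        self.window: deque = deque(maxlen=self.n + 1)  # state lives here, not in the caller

    def feed(self, x: float) -> List[float]:
        self.window.append(x)
        if len(self.window) <= self.n:
            return []  # not enough history yet: zero outputs
        if self.kind == "delta":
            return [self.window[-1] - self.window[-2]]
        return [self.window[0]]  # lag(n): the value fed n steps ago


# The pan-rate pattern from above, including a repeated timestamp that is skipped.
pan_delta, ts_delta = ToyStreamFn("delta"), ToyStreamFn("delta")
rates = []
for pan, ts in [(0.0, 0), (1.5, 16), (3.0, 32), (3.0, 32)]:
    p, t = pan_delta.feed(pan), ts_delta.feed(ts)
    if p and t and t[0] != 0:
        rates.append(p[0] / t[0])
print(rates)  # [0.09375, 0.09375]
```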
Threading concurrency — concurrent.futures works
ctypes releases the Python GIL for every native call, and ArcFlow's
ConcurrentStore is MVCC-safe for parallel readers, so multiple Python threads
sharing one ArcFlow instance run truly in parallel instead of being serialized:
from concurrent.futures import ThreadPoolExecutor
queries = [
"MATCH (p:Person) WHERE p.age > 30 RETURN count(*)",
"MATCH (p:Player) RETURN avg(p.jersey)",
# ...
]
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(db.execute, queries))  # reads — lock-free

Reads parallelize cleanly via ArcSwap snapshots; no coordination is needed. Writes through a single handle are guarded: a second write on the same handle while one is in flight returns HANDLE_BUSY_CONCURRENT_WRITER immediately (rather than queueing through the write mutex and silently regressing 10×). The two supported parallel-write patterns are multiple handles (arcflow.sharded) and threading.Lock() around a single handle.
See Threading Model for the complete read/write matrix, the typed-error payload, and substrate detail (writer_busy: AtomicBool + RAII WriterClaim guard).
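The threading.Lock() pattern can be modelled without the engine. In this sketch `ToyHandle` and `HandleBusyError` are hypothetical stand-ins: the handle claims its writer slot with a non-blocking acquire and fails fast when another write is in flight, and the application serializes its own writes with a lock so eight worker threads never trip that error.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class HandleBusyError(RuntimeError):
    pass


class ToyHandle:
    def __init__(self) -> None:
        self._writer_busy = threading.Lock()  # plays the role of the writer_busy flag
        self.writes = 0

    def write(self, _query: str) -> None:
        # Non-blocking claim: fail immediately if another write is in flight.
        if not self._writer_busy.acquire(blocking=False):
            raise HandleBusyError("HANDLE_BUSY_CONCURRENT_WRITER")
        try:
            time.sleep(0.01)  # simulate work while holding the claim
            self.writes += 1
        finally:
            self._writer_busy.release()


handle = ToyHandle()
write_lock = threading.Lock()  # application-side serialization of writes


def safe_write(query: str) -> None:
    with write_lock:
        handle.write(query)


with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(safe_write, [f"CREATE (:N {{i: {i}}})" for i in range(16)]))

print(handle.writes)  # 16
```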
TypeScript / Node.js
napi-rs native addon. The Rust engine runs in-process inside Node.js — function calls, not HTTP, no serialization, microsecond latency.
import { open, openInMemory, ArcflowError } from '@ozinc/arcflow'
// In-memory (testing, short-lived)
const db = openInMemory()
// Persistent (WAL-journaled, survives crashes): use instead of the line above
// const db = open('./data/graph')
// Queries return typed values — numbers are numbers, not strings
db.mutate("CREATE (n:Person {name: 'Alice', age: 30})")
const result = db.query("MATCH (n:Person) RETURN n.name, n.age")
result.rows[0].get('name') // "Alice"
result.rows[0].get('age') // 30 (number, not "30")
// Parameters (prevent injection)
db.query("MATCH (n:Person {name: $name}) RETURN n", { name: 'Alice' })
// Atomic batch mutations
db.batchMutate([
"MERGE (a:Person {id: 'p1', name: 'Alice'})",
"MERGE (b:Org {id: 'o1', name: 'Acme'})",
])
// Paginated cursor for large result sets
const cursor = db.cursor('MATCH (n:Log) RETURN n.ts ORDER BY n.ts', undefined, 500)
let page
while ((page = cursor.next()) !== null) handlePage(page.rows)  // handlePage: your own handler (Node's global `process` is not callable)
cursor.close()
// Live subscriptions — callback fires with added/removed events on each relevant mutation
const sub = db.subscribe(
'MATCH (n:Alert) WHERE n.level = "critical" RETURN n.id, n.message',
({ added, removed }) => console.log('new alerts', added)
)
// later:
sub.cancel()
// Sync
db.syncPending() // 0 if up to date
db.fingerprint() // hash of current state
// Error handling
try { db.query("INVALID") } catch (e) {
if (e instanceof ArcflowError) console.log(e.code, e.category, e.suggestion)
}
db.close()

Install
See the Installation page for the manifest-driven install
matrix. The TypeScript binding ships in the same arcflow package — pick
the npm row in the matrix above.
React#
React hooks for live graph data in components. @arcflow/react ships
alongside arcflow; install both per the Installation page.
import { openInMemory } from '@ozinc/arcflow'
import { useQuery, useLiveQuery } from '@arcflow/react'
const db = openInMemory()
db.mutate("CREATE (n:Alert {level: 'critical', message: 'Disk 95%'})")
function AlertPanel() {
// One-time query
const { data, loading, error } = useQuery(db, 'MATCH (n:Alert) RETURN n.level, n.message')
// Live subscription — re-renders on every graph mutation that affects results
const { rows } = useLiveQuery(db, 'MATCH (n:Alert) WHERE n.level = "critical" RETURN n.message')
if (loading) return <div>Loading...</div>
return (
<ul>
{rows?.map((row, i) => <li key={i}>{String(row.message)}</li>)}
</ul>
)
}

| Hook | Returns | Notes |
|---|---|---|
| useQuery(db, query, params?, deps?) | { data, loading, error } | One-time query |
| useLiveQuery(db, query, deps?, pollIntervalMs?) | { rows, loading, error } | Live subscription; re-renders on every graph mutation that affects results |
C ABI
The foundation all bindings build on. Stable extern "C" interface with opaque handles.
Header: arcflow.h
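#include <stdint.h>
#include <stdio.h>
#include "arcflow.h"

int main(void) {
    // Open runtime (NULL for in-memory, or path for persistent).
    // Assumption: a NULL return signals failure; check arcflow.h.
    arcflow_runtime_t* rt = arcflow_open(NULL);
    if (rt == NULL) {
        fprintf(stderr, "open failed: %s\n", arcflow_last_error());
        return 1;
    }
    // Open session
    arcflow_session_t* session = arcflow_session_open(rt);
    // Execute query
    arcflow_result_t* result = arcflow_execute(session, "MATCH (n) RETURN n.name");
    if (result == NULL) {
        // Error handling: the last error message is thread-local
        fprintf(stderr, "query failed: %s\n", arcflow_last_error());
    } else {
        // Read results
        int64_t rows = arcflow_result_row_count(result);
        int64_t cols = arcflow_result_column_count(result);
        printf("%lld row(s), %lld column(s)\n", (long long)rows, (long long)cols);
        for (int64_t r = 0; r < rows; r++) {
            const char* name = arcflow_result_get_string(result, r, 0);
            printf("%s\n", name ? name : "(null)");
        }
        arcflow_result_free(result);
    }
    // Cleanup, in reverse order of acquisition
    arcflow_session_close(session);
    arcflow_close(rt);
    return 0;
}

Functions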
| Function | Description |
|---|---|
| arcflow_open(path) | Open runtime. NULL = in-memory |
| arcflow_close(rt) | Close runtime and free memory |
| arcflow_session_open(rt) | Open lightweight session on runtime |
| arcflow_session_close(session) | Close session |
| arcflow_execute(session, query) | Execute WorldCypher query |
| arcflow_result_row_count(result) | Number of result rows |
| arcflow_result_column_count(result) | Number of result columns |
| arcflow_result_column_name(result, idx) | Column name by index |
| arcflow_result_get_string(result, row, col) | Cell value as string |
| arcflow_result_free(result) | Free result memory |
| arcflow_last_error() | Last error message (thread-local) |
| arcflow_version() | ArcFlow version string. See reference/versioning |
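Since the Python binding sits on this ABI through ctypes, here is a sketch of how such a binding might declare the functions above. The parameter and return types are assumptions inferred from the C example (opaque pointers as `c_void_p`, `int64_t` counts and indices, `const char*` strings), so check them against arcflow.h. With `c_char_p` as the return type, ctypes copies the C string into a Python `bytes` object (or returns `None` for NULL); who frees the original buffer is a question for the header. Calls made through `ctypes.CDLL` release the GIL, which is what the threading section relies on.

```python
import ctypes

# Assumed signatures: (argtypes, restype) for each function in the table above.
SIGNATURES = {
    "arcflow_open": ([ctypes.c_char_p], ctypes.c_void_p),
    "arcflow_close": ([ctypes.c_void_p], None),
    "arcflow_session_open": ([ctypes.c_void_p], ctypes.c_void_p),
    "arcflow_session_close": ([ctypes.c_void_p], None),
    "arcflow_execute": ([ctypes.c_void_p, ctypes.c_char_p], ctypes.c_void_p),
    "arcflow_result_row_count": ([ctypes.c_void_p], ctypes.c_int64),
    "arcflow_result_column_count": ([ctypes.c_void_p], ctypes.c_int64),
    "arcflow_result_column_name": ([ctypes.c_void_p, ctypes.c_int64], ctypes.c_char_p),
    "arcflow_result_get_string": (
        [ctypes.c_void_p, ctypes.c_int64, ctypes.c_int64], ctypes.c_char_p),
    "arcflow_result_free": ([ctypes.c_void_p], None),
    "arcflow_last_error": ([], ctypes.c_char_p),
    "arcflow_version": ([], ctypes.c_char_p),
}


def declare(lib) -> None:
    """Attach argtypes/restype to each exported function of a loaded library.

    Usage (library path is illustrative): declare(ctypes.CDLL("libarcflow.so"))
    """
    for name, (argtypes, restype) in SIGNATURES.items():
        fn = getattr(lib, name)
        fn.argtypes = argtypes
        fn.restype = restype
```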
C++
Header-only RAII wrapper over the C ABI. Include arcflow.hpp and link with -larcflow.
#include <cstdint>
#include <iostream>
#include "arcflow.hpp"
int main() {
// RAII: runtime auto-freed on scope exit
arcflow::Runtime rt; // in-memory
// arcflow::Runtime rt("/path/to/data"); // persistent
auto session = rt.session();
session.execute("CREATE (n:Person {name: 'Alice'})");
auto result = session.execute("MATCH (n:Person) RETURN n.name");
for (int64_t r = 0; r < result.row_count(); ++r) {
std::cout << result.get(r, 0) << std::endl; // "Alice"
}
// result, session, rt auto-freed on scope exit
}

Build
g++ -std=c++17 \
-I include/arcflow \
main.cpp \
-L /path/to/arcflow/lib -larcflow \
-o myapp

MCP Server
For cloud chat interfaces (ChatGPT, Claude.ai, Gemini web) that have no local shell. See MCP Server for setup, tools, and configuration.
Architecture
All bindings target one engine. Pick the layer that matches your use case:

                     ArcFlow Rust engine
                              │
                    C ABI (libarcflow.so)
                              │
        ┌─────────────────────┼──────────────────────┐
        │                     │                      │
      Python         TypeScript / React             C++
     (ctypes)      (napi-rs — in-process)      (arcflow.hpp)
                              │
                            WASM
                (browser / edge — zero-copy)

One engine. No protocol translation. Function calls go directly into the Rust engine. For the browser, the WASM build runs the same engine with zero serialization via the WASM memory model.
See Also
- Installation — pre-built binaries for all platforms
- Platform — runtime environments: browser WASM, Cloudflare Workers, mobile
- Agent-Native Database — integration surfaces: napi-rs, CLI binary, MCP server
- MCP Server — cloud chat UI integration (the fourth binding surface)