Data Quality & Pipeline Integrity
Write your logic once. Run it as batch, incremental delta stream, point-in-time snapshot, or frozen replay — same query, same result, mathematically proven.
A world model is only as reliable as the facts it contains. Stale observations, contradicting signals, and lineage gaps don't show up as errors — they show up as wrong decisions. ArcFlow treats data quality as a first-class concern: live views maintain expectations continuously, confidence scores surface degrading facts before they corrupt downstream queries, and the temporal layer lets you replay exactly what the model knew at any point in time. No separate pipeline. No separate framework. The same engine that serves your queries enforces their integrity.
Batch == Delta: One Definition, All Execution Modes#
The hardest problem in data pipelines is maintaining two codebases: one for batch ETL and one for streaming. They diverge silently. Bugs appear only in production, only at midnight, only under Monday morning load.
ArcFlow's Z-set algebra eliminates the split. Every live view is both a batch query and a delta-incremental query simultaneously. The engine chooses the execution mode automatically:
-- This query definition works in all four modes:
CREATE LIVE VIEW revenue_by_region AS
MATCH (o:Order)-[:PLACED_IN]->(r:Region)
WHERE o.status = 'completed'
RETURN r.name AS region, sum(o.total) AS revenue, count(o) AS order_count
-- Mode 1: Full batch scan (initial load)
MATCH (row) FROM VIEW revenue_by_region
RETURN row.region, row.revenue
-- Mode 2: Incremental delta (a new Order arrives → only that region updates)
-- Happens automatically. No code change.
-- Mode 3: Frozen snapshot (point-in-time replay)
MATCH (row) FROM VIEW revenue_by_region AS OF seq 1000
RETURN row.region, row.revenue
-- Mode 4: Delta batch (replay deltas from seq 1000 to 2000, no live updates)
CALL db.replayDeltaRange(1000, 2000)

The result is mathematically identical in all four modes. This is not an approximation — it is proven via Z-set algebra and validated against large real-world analytical workloads (15.4M rows, 246M cells, 99.48% cell-by-cell match).
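The equivalence can be illustrated with a toy Z-set-style aggregator. This is a minimal Python sketch of the idea (signed-weight deltas maintaining the same aggregate a full scan would produce), not ArcFlow's engine; the `batch_revenue` and `IncrementalRevenue` names are hypothetical:

```python
from collections import defaultdict

def batch_revenue(orders):
    """Mode 1: full scan, recompute revenue per region from scratch."""
    totals = defaultdict(float)
    for o in orders:
        if o["status"] == "completed":
            totals[o["region"]] += o["total"]
    return dict(totals)

class IncrementalRevenue:
    """Mode 2: maintain the same aggregate by applying signed deltas."""
    def __init__(self):
        self.totals = defaultdict(float)

    def apply(self, order, weight=1):
        # weight +1 = insertion, -1 = retraction (Z-set style)
        if order["status"] == "completed":
            self.totals[order["region"]] += weight * order["total"]

orders = [
    {"region": "EMEA", "total": 50.0, "status": "completed"},
    {"region": "APAC", "total": 30.0, "status": "pending"},
    {"region": "EMEA", "total": 20.0, "status": "completed"},
]
inc = IncrementalRevenue()
for o in orders:
    inc.apply(o)

# Same definition, same result, regardless of execution mode.
assert batch_revenue(orders) == dict(inc.totals)
```

Because every operator is linear over signed weights, a retraction (`weight=-1`) undoes an insertion exactly, which is what makes the batch and delta paths coincide by construction rather than by testing.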
Why the equivalence matters#
| Old world (two codebases) | ArcFlow (one definition) |
|---|---|
| Batch ETL job + streaming Flink job | Single live view |
| Diverge silently in edge cases | Mathematically identical by construction |
| Debug "why does the stream give a different number?" | Impossible: same algebra, same operators |
| Backfill requires rewriting the streaming job | AS OF query on the same view |
| Schema change breaks both pipelines | Change the live view definition once |
Pipeline Prune: Automatic Staleness Removal#
Data pipelines accumulate stale facts. Product prices change, users deactivate, sensor readings expire. Without pruning, stale facts silently corrupt downstream aggregations.
ArcFlow pruning works at three levels:
1. Confidence Decay Policies#
Attach a decay policy to any node label. Confidence decreases automatically as facts age, and nodes below the threshold are pruned from live view results:
-- Facts about prices lose confidence after 24 hours, expire after 7 days
CREATE DECAY POLICY price_freshness
ON :Price
HALF_LIFE 86400000 -- confidence halves every 24h
EXPIRE_AFTER 604800000 -- hard-delete after 7 days
-- Sensor readings: 1-hour half-life
CREATE DECAY POLICY sensor_decay
ON :SensorReading
HALF_LIFE 3600000
EXPIRE_AFTER 86400000

After policy creation, every live view that touches :Price or :SensorReading automatically excludes expired facts and downweights decayed ones — no ETL job, no cron, no deletion script.
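Assuming HALF_LIFE means exponential decay (confidence halves every interval) and EXPIRE_AFTER is a hard cutoff, the arithmetic behind the policy looks like this sketch; the function names are illustrative, not ArcFlow API:

```python
DAY_MS = 86_400_000  # the HALF_LIFE value in the price_freshness policy above

def decayed_confidence(initial, age_ms, half_life_ms):
    """Exponential half-life decay: confidence halves every half_life_ms."""
    return initial * 0.5 ** (age_ms / half_life_ms)

def is_expired(age_ms, expire_after_ms=7 * DAY_MS):
    """Hard expiry, mirroring EXPIRE_AFTER 604800000 (7 days)."""
    return age_ms > expire_after_ms

# A price fact observed 24h ago has half its original confidence...
assert decayed_confidence(1.0, DAY_MS, DAY_MS) == 0.5
# ...a quarter after 48h, and is hard-deleted past 7 days.
assert decayed_confidence(1.0, 2 * DAY_MS, DAY_MS) == 0.25
assert is_expired(8 * DAY_MS)
```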
2. Explicit Prune in Pipeline Runs#
-- Before a pipeline run: remove facts below confidence threshold
MATCH (n) WHERE n._confidence < 0.3
DELETE n
-- Or scope to a specific label
MATCH (n:Prediction) WHERE n._confidence < 0.5
AND n._created_at < (timestamp() - 604800000)
DELETE n

3. REPROCESS EDGES — Prune-and-Rebuild#
When source data changes, REPROCESS EDGES removes stale derived edges and re-runs extractors only on affected nodes:
-- Reprocess only nodes that changed since last run
REPROCESS EDGES (:Document)-[:EXTRACTED_ENTITY]->(:Entity)
WHERE _updated_at > $last_run_at

This is O(changed nodes), not O(all nodes) — a pipeline that processes 10K documents daily reruns only the 200 that changed.
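The O(changed nodes) behavior reduces to filtering on the update timestamp before running the extractor. A hedged sketch (the `reprocess_changed` helper is hypothetical, standing in for the REPROCESS EDGES machinery):

```python
def reprocess_changed(documents, last_run_at, extract):
    """Re-run the extractor only on documents updated since last_run_at."""
    changed = [d for d in documents if d["updated_at"] > last_run_at]
    return {d["id"]: extract(d) for d in changed}

docs = [
    {"id": 1, "updated_at": 100, "text": "alpha"},  # unchanged since last run
    {"id": 2, "updated_at": 500, "text": "beta"},   # updated, needs rework
]
result = reprocess_changed(docs, last_run_at=200,
                           extract=lambda d: d["text"].upper())

# Only the changed document is reprocessed; the other keeps its derived edges.
assert result == {2: "BETA"}
```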
Automated Data Quality#
Traditional DQ frameworks (Great Expectations, dbt tests) run as separate pipeline steps on a schedule. They check batch snapshots. They miss the moment a bad fact enters the graph.
ArcFlow DQ runs continuously as live views. Every mutation is checked against every DQ rule instantly, in-process, with zero network hops:
Contradiction Detection#
-- Register a live contradiction check (runs on every mutation)
CREATE LIVE VIEW contradictions AS
CALL algo.factContradiction()
YIELD nodeId1, nodeId2, contradiction_type, confidence
RETURN nodeId1, nodeId2, contradiction_type, confidence
ORDER BY confidence DESC
-- Query current violations
MATCH (row) FROM VIEW contradictions
RETURN row.nodeId1, row.nodeId2, row.contradiction_type

When node A says "Product X costs $50" and node B says "Product X costs $80," the contradiction fires immediately — not at the next scheduled DQ run.
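The simplest form of this check is grouping facts by (entity, attribute) and flagging keys asserted with more than one distinct value. The sketch below is a simplified model of value conflicts, not the internals of algo.factContradiction():

```python
from collections import defaultdict

def find_contradictions(facts):
    """Flag (entity, attribute) pairs asserted with conflicting values."""
    by_key = defaultdict(set)
    for entity, attribute, value in facts:
        by_key[(entity, attribute)].add(value)
    return {key: values for key, values in by_key.items() if len(values) > 1}

facts = [
    ("product_x", "price", 50),
    ("product_x", "price", 80),   # conflicts with the fact above
    ("product_y", "price", 10),
]

# The $50 vs $80 disagreement surfaces as soon as the second fact lands.
assert find_contradictions(facts) == {("product_x", "price"): {50, 80}}
```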
Freshness Checks#
-- Entities not observed recently are flagged as stale
CREATE LIVE VIEW stale_entities AS
CALL algo.entityFreshness()
YIELD nodeId, name, freshness, last_observed_at
WHERE freshness < 0.4
RETURN nodeId, name, freshness, last_observed_at
-- Alert on stale critical data
MATCH (row) FROM VIEW stale_entities
MATCH (n) WHERE id(n) = row.nodeId AND n:CriticalFact
RETURN row.name, row.last_observed_at

Schema Constraints as DQ Rules#
-- Enforce that every Order has an amount
CREATE CONSTRAINT order_amount_required
FOR (o:Order) REQUIRE o.amount IS NOT NULL
-- Enforce uniqueness
CREATE CONSTRAINT unique_product_sku
FOR (p:Product) REQUIRE p.sku IS UNIQUE
-- Enforce value ranges
CREATE CONSTRAINT confidence_range
FOR (n) REQUIRE n._confidence >= 0.0 AND n._confidence <= 1.0

Constraint violations are caught at write time — the mutation is rejected with a structured error before the bad data enters the graph.
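Write-time rejection means the check runs before the mutation is applied, and the error carries the name of the violated constraint. A minimal sketch of that flow; `ConstraintViolation` and `check_order` are hypothetical names, not ArcFlow's error types:

```python
class ConstraintViolation(Exception):
    """Structured rejection raised before the mutation is applied."""
    def __init__(self, constraint, detail):
        self.constraint = constraint
        self.detail = detail
        super().__init__(f"{constraint}: {detail}")

def check_order(props):
    """Mirrors order_amount_required and confidence_range from above."""
    if props.get("amount") is None:
        raise ConstraintViolation("order_amount_required", "amount IS NULL")
    conf = props.get("_confidence", 1.0)
    if not 0.0 <= conf <= 1.0:
        raise ConstraintViolation("confidence_range", f"_confidence={conf}")
    return props  # mutation accepted

check_order({"amount": 42.0, "_confidence": 0.9})  # passes

try:
    check_order({"_confidence": 0.9})  # missing amount: rejected at write time
except ConstraintViolation as e:
    assert e.constraint == "order_amount_required"
```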
Custom DQ Rules as Live Views#
Any invariant you can express in GQL becomes a continuous DQ check:
-- Orders without a customer
CREATE LIVE VIEW orphaned_orders AS
MATCH (o:Order)
WHERE NOT (o)<-[:PLACED]-(:Customer)
RETURN o.id, o.total, o._created_at
-- Revenue anomaly: order total > 3σ from mean
CREATE LIVE VIEW revenue_anomalies AS
MATCH (o:Order)
WITH avg(o.total) AS mean_total, stdev(o.total) AS std_total
MATCH (o:Order) WHERE o.total > mean_total + (3 * std_total)
RETURN o.id, o.total, mean_total, std_total

Automated Feature Checks#
Feature pipelines need to verify that computed features are in range, not degenerate (all NaN, all zero), and consistent across cohorts. ArcFlow's proof gates run these checks automatically as part of the pipeline:
-- Register a proof gate that blocks downstream steps if features are degenerate
CALL db.proofGates()
-- Define feature validity rules
CREATE CONSTRAINT embedding_non_null
FOR (n:Entity) WHERE exists(n.embedding) REQUIRE size(n.embedding) = 128
-- Flag nodes with stale embeddings (source changed after last embed)
CALL algo.staleEmbeddings()
YIELD nodeId, name
RETURN nodeId, name -- these need re-embedding
-- Distribution check: flag if >5% of a cohort has null features
CREATE LIVE VIEW null_feature_rate AS
MATCH (n:Entity)
WITH count(n) AS total,
count(CASE WHEN n.feature_score IS NULL THEN 1 END) AS null_count
RETURN (toFloat(null_count) / total) AS null_rate
HAVING null_rate > 0.05

The flywheel CLI exposes these checks as composable pipeline stages:
# Run the full validation suite and gate on failures
arcflow flywheel run --gate proof_gates_passing
# Score current data quality
arcflow flywheel score --view null_feature_rate
# Compare DQ metrics across two snapshots
arcflow flywheel compare --before seq:1000 --after seq:2000

Auto DAG / Lineage Documentation#
Every CREATE LIVE VIEW is a node in the dependency graph. Every MATCH FROM VIEW creates an edge. ArcFlow tracks this automatically — the data lineage DAG is the graph, not a separate metadata system:
-- Inspect the live view dependency DAG
CALL db.liveViews
-- Returns: name, query, dependencies, dependents, last_updated_ms
-- For each view, see what it reads from and writes to
CALL db.viewStats('revenue_by_region')
-- Returns: upstream_views, downstream_views, memory_bytes, avg_update_ms

This means:
- Schema changes propagate automatically. Rename a property → the engine flags affected views.
- Impact analysis is a graph query. Which views does :Order.total affect?
- Documentation is always current. The graph IS the documentation.
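Impact analysis over a dependency DAG is transitive reachability: start from the changed property and walk the read edges downstream. A hedged sketch over a hypothetical in-memory `reads` map (ArcFlow tracks these edges itself, via the views' MATCH FROM VIEW references):

```python
def downstream_views(reads, changed_property):
    """Transitively find every view affected by a changed property.

    `reads` maps each view name to the set of properties and views it reads.
    """
    affected, frontier = set(), {changed_property}
    while frontier:
        frontier = {
            view for view, sources in reads.items()
            if sources & frontier and view not in affected
        }
        affected |= frontier
    return affected

reads = {
    "revenue_by_region": {"Order.total", "Order.status"},
    "revenue_anomalies": {"revenue_by_region"},          # reads another view
    "stale_entities": {"Entity.last_observed_at"},
}

# Renaming Order.total breaks the view that reads it AND the view downstream.
assert downstream_views(reads, "Order.total") == {
    "revenue_by_region", "revenue_anomalies",
}
```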
-- Which live views would break if we rename Order.total to Order.amount?
MATCH (v:LiveView)-[:READS]->(p:Property {node: 'Order', name: 'total'})
RETURN v.name, v.query
-- Full upstream lineage for a metric
MATCH path = (source:Table)-[:FEEDS*1..10]->(target:LiveView {name: 'revenue_by_region'})
RETURN [n IN nodes(path) | n.name] AS lineage_chain

For pipeline documentation generation, the CLI synth command emits a structured lineage report:
arcflow agent-context synth --format lineage --output lineage.json

Drill-Through & Reconciliation#
The historical BI gap: a number on a dashboard is wrong, and you have no way to find out why. You can see the aggregated result, but you cannot drill into the facts that produced it.
ArcFlow closes this gap via temporal queries and the mutation log. Every number is reproducible. Every disagreement is traceable.
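Conceptually, point-in-time reconstruction is a fold over the ordered mutation log: replay every mutation up to the requested sequence number and stop. This sketch uses a flat key-value model for brevity, not ArcFlow's storage format:

```python
def state_as_of(mutations, seq):
    """Rebuild state by replaying the mutation log up to sequence `seq`."""
    state = {}
    for mutation in mutations:  # mutations are ordered by sequence number
        if mutation["seq"] > seq:
            break
        if mutation["op"] == "set":
            state[mutation["key"]] = mutation["value"]
        elif mutation["op"] == "delete":
            state.pop(mutation["key"], None)
    return state

log = [
    {"seq": 1, "op": "set", "key": "EMEA_revenue", "value": 1000},
    {"seq": 2, "op": "set", "key": "EMEA_revenue", "value": 1200},
    {"seq": 3, "op": "delete", "key": "EMEA_revenue"},
]

# Every historical value is recoverable from the log alone.
assert state_as_of(log, 1) == {"EMEA_revenue": 1000}
assert state_as_of(log, 2) == {"EMEA_revenue": 1200}
assert state_as_of(log, 3) == {}
```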
Reproduce Any Historical Value#
-- What was revenue_by_region showing last Tuesday at 9 AM?
MATCH (row) FROM VIEW revenue_by_region AS OF 1710565200000
WHERE row.region = 'EMEA'
RETURN row.revenue
-- Step back through the WAL to find when the number changed
CALL db.changesSince(42)
-- Returns: every mutation after sequence 42, in order

Compare Two Snapshots#
-- What changed between the Monday report and Tuesday report?
CALL db.temporalCompare(
'MATCH (o:Order) RETURN sum(o.total) AS revenue', -- as of Monday 9AM
'MATCH (o:Order) RETURN sum(o.total) AS revenue' -- as of Tuesday 9AM
)
-- Highlights: +$12,400 from new orders, -$3,200 from refunds processed

Find the Source of a Discrepancy#
-- The dashboard shows $1.2M but the GL shows $1.18M. Find the diff.
MATCH (o:Order) WHERE o.status = 'completed'
AND o._updated_at BETWEEN $period_start AND $period_end
WITH sum(o.total) AS arcflow_total
MATCH (g:GLEntry) WHERE g._source = 'ERP'
AND g.date BETWEEN $period_start AND $period_end
WITH sum(g.amount) AS gl_total
RETURN arcflow_total, gl_total, (arcflow_total - gl_total) AS gap
-- Drill into the gap: orders in ArcFlow not yet in GL
MATCH (o:Order) WHERE o.status = 'completed'
AND NOT (o)-[:POSTED_TO]->(:GLEntry)
RETURN o.id, o.total, o._created_at

Deterministic Replay#
If you need to reproduce an exact pipeline run from a prior point in time:
-- Replay all mutations from seq 500 to 1000 and verify the result matches
CALL db.temporalReplay('pipeline_run_20260315')
-- Cryptographic fingerprint of current state (for audit)
CALL db.fingerprint()
-- Returns: SHA-256 of the full graph state — reproducible across replays

The BI / ML Infrastructure Gap#
For years, data quality infrastructure was effectively ML-specific. The toolchain — Great Expectations for expectation testing, dbt for lineage and tests, MLflow for model tracking, feature stores for versioned features — required significant engineering investment. The pattern was: ML teams have a data platform; BI teams have a BI tool.
The gap was structural:
| Capability | ML ecosystem | BI ecosystem (historically) |
|---|---|---|
| Expectation testing | Great Expectations, dbt tests | Manual spot checks |
| Drill-through to raw facts | Feature store lineage, MLflow | None (pivot table → dead end) |
| Continuous DQ monitoring | Airflow pipelines on a schedule | Alerts on final metrics only |
| Data lineage | dbt DAG, feature store graph | Tool-specific, often manual |
| Point-in-time replay | Feast / Hopsworks point-in-time correct joins | None |
| Automated feature checks | Feature platform validation | Not applicable |
Why ML infrastructure had it first: ML training requires reproducible datasets. A model trained on stale or incorrect features fails silently — you don't find out until the model degrades in production weeks later. That forcing function created the investment in DQ infrastructure. BI teams, by contrast, got blamed after the fact for wrong numbers, not before.
Why expert AI prompts + backends have filled part of the gap: Some teams built DQ workflows on top of LLMs — an AI that reads a schema and generates test cases, or an agent that navigates lineage metadata to explain a number. This works, but it requires a backend API, a prompt engineering specialist, and ongoing maintenance. The AI generates the check; a separate system runs it.
What ArcFlow provides instead: The graph IS the lineage system. Live views ARE the expectation tests. Temporal queries ARE the drill-through mechanism. These are not separate systems calling each other — they are first-class query features in the same embedded database. A DQ rule is a CREATE LIVE VIEW. Drill-through is AS OF. Expectation testing is a CREATE CONSTRAINT. No AI prompt needed to generate the check — you write it once in GQL and the engine maintains it continuously.
For scenarios where dynamic expectation generation is valuable (e.g., an LLM that proposes DQ rules based on schema analysis), ArcFlow provides the enforcement backend that the LLM-generated rules run against. The expert AI prompt generates the CREATE LIVE VIEW definition; ArcFlow runs it incrementally with zero additional infrastructure.
See Also#
- Live Queries — standing queries, live views, and always-current state
- Event Sourcing — WAL, AS OF queries, deterministic replay
- Algorithms — algo.factContradiction(), algo.entityFreshness(), algo.staleEmbeddings()
- Trusted RAG — confidence scoring and provenance trails
- Skills — PROCESS NODE, REPROCESS EDGES pipeline patterns