Time-Varying Filters: Delayed Emission in Streaming SQL


Streaming systems are optimized for speed. Records arrive from sources like Kafka, flow through a pipeline of operators, and reach downstream consumers — dashboards, APIs, alerting systems — as fast as possible. That’s the default, and it’s usually what you want.

But not all consumers want data immediately. A data provider might serve both real-time and delayed tiers of the same feed — same processing, different emission schedules. Or a pipeline might ingest embargoed data (economic indicators, earnings reports) and process it ahead of time, but be unable to publish until the embargo lifts. In both cases the processing logic is the same — it’s the timing of emission that changes.

The typical solution is a separate buffering service: consume the stream, hold records in some queue, re-emit them after the delay. That works, but it’s an entirely separate system with its own state management, failure handling, and deployment lifecycle.

At my day job, I work on a streaming dataflow engine built on Apache DataFusion, and this problem came up recently. The question was: could we express delayed emission in SQL? It turns out other streaming systems have explored this idea — RisingWave and Materialize both support what they call temporal filters, which use time-based predicates to control when records are emitted. I wanted to bring the same capability to our engine.

Filtering on Time

Consider this query:

SELECT *
FROM events
WHERE event_time + INTERVAL '5' SECOND <= WATERMARK_TS()

WATERMARK_TS() returns the source’s current watermark — a monotonically advancing timestamp that tracks how far the stream has progressed in event time. The straightforward interpretation of this filter is to evaluate it on each record as it arrives, substituting the current watermark value. But if the watermark is derived from event_time — which is common — then at the moment a record arrives, the watermark sits roughly at that record’s event time. The predicate becomes something like event_time + 5s <= ~event_time, which is always false. Every record gets filtered out. Not useful.
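To make the degenerate case concrete, here is a toy sketch (plain Rust, not engine code; the 5-second delay is hard-coded and all timestamps are nanoseconds) of naive per-record evaluation when the watermark tracks event time:

```rust
// Toy sketch: naive, point-in-time evaluation of the predicate
// `event_time + 5s <= watermark`. All timestamps are nanoseconds.
const DELAY_NS: i64 = 5_000_000_000;

fn naive_filter(event_time_ns: i64, watermark_ns: i64) -> bool {
    event_time_ns + DELAY_NS <= watermark_ns
}

fn main() {
    // If the watermark is derived from event_time, then on arrival
    // watermark ≈ event_time — and the predicate can never hold.
    for event_time_ns in [1_000_000_000, 2_000_000_000, 3_000_000_000] {
        let watermark_ns = event_time_ns; // watermark tracks event time
        assert!(!naive_filter(event_time_ns, watermark_ns));
    }
    // Only once the watermark has advanced 5s past the record does it pass.
    assert!(naive_filter(1_000_000_000, 6_000_000_000));
    println!("ok");
}
```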

The more interesting interpretation comes from noticing that watermarks are monotonic. A condition that’s false now may become true later. When a record arrives with event_time = 10:00:01, the predicate requires the watermark to reach at least 10:00:06. It hasn’t yet — but it will, once future events push the watermark forward. At that point, the condition holds.

This is the key insight: a predicate that depends on the watermark is not a static filter — it’s a time-varying one. Its truth value changes as the stream progresses. Instead of evaluating once and discarding, the system can buffer records and release them later, once the watermark has advanced far enough.

Making this work in a DataFusion-based engine required a custom marker UDF, a dedicated optimizer rule, and a specialized physical operator — each one building on the extensibility points that DataFusion provides.

A Function That Never Runs

WATERMARK_TS() is what we call a marker UDF. It has no physical implementation — if it ever reaches execution, it throws an error. Its sole purpose is to be recognized and removed by the optimizer. This is a useful pattern: register a function so that users can reference it in SQL, but handle all of its semantics at plan time through rewrite rules.

The UDF is registered with the DataFusion session context during initialization:

ctx.register_udf(watermark_ts_udf());

Its implementation is intentionally minimal:

pub fn watermark_ts_udf() -> ScalarUDF {
    ScalarUDF::new_from_impl(WatermarkTs::new())
}

#[derive(Debug)]
pub struct WatermarkTs {
    signature: Signature,
}

impl WatermarkTs {
    pub fn new() -> Self {
        Self {
            // Volatility::Volatile prevents constant folding at plan time
            signature: Signature::any(0, Volatility::Volatile),
        }
    }
}

impl ScalarUDFImpl for WatermarkTs {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "watermark_ts"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
        Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
    }

    fn invoke_batch(
        &self,
        _args: &[ColumnarValue],
        _number_of_rows: usize,
    ) -> Result<ColumnarValue> {
        internal_err!(
            "watermark_ts is a marker UDF and should not \
            reach physical execution"
        )
    }
}

Marking it as volatile prevents DataFusion’s constant-folding optimizer from evaluating it at plan time — we need it to survive into the plan so our custom rule can find it.

From Filter to TimeVaryingFilter

The optimizer rule is where the interesting work happens. It runs after DataFusion’s standard optimizations and scans the logical plan for Filter nodes whose predicates contain calls to WATERMARK_TS(). When it finds one, it rewrites the Filter into a custom logical plan node: TimeVaryingFilter.

The rewrite isn’t a simple 1:1 replacement. The rule needs to decompose the predicate into a release plan — a structured representation of when and under what conditions records should be emitted. This matters because time-varying conditions can be mixed with static ones:

SELECT *
FROM events
WHERE
  (status = 'final' AND event_time + INTERVAL '5' SECOND <= WATERMARK_TS())
  OR status = 'preliminary'

Here, status = 'preliminary' records should be emitted immediately — there’s no time-varying condition on that branch. But status = 'final' records should be delayed by 5 seconds. The optimizer rule needs to decompose this into separate branches, each with its own release semantics.

Building a Release Plan

The rule’s analysis proceeds in three stages:

1. Split into disjuncts. The top-level predicate is broken apart along OR boundaries. Each disjunct becomes a candidate branch in the release plan.

// Split A OR B OR C into [A, B, C]
let disjuncts = split_disjunction(filter.predicate());
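DataFusion provides split helpers for this; as a toy illustration of what the split does (a minimal expression enum, not DataFusion’s Expr), it is a recursive walk that flattens nested OR nodes:

```rust
// Toy expression tree: just OR nodes and named leaves.
#[derive(Debug, PartialEq)]
enum Expr {
    Or(Box<Expr>, Box<Expr>),
    Leaf(&'static str),
}

// Flatten A OR B OR C into [A, B, C], recursing through nested ORs.
fn split_disjunction(e: &Expr) -> Vec<&Expr> {
    match e {
        Expr::Or(l, r) => {
            let mut out = split_disjunction(l);
            out.extend(split_disjunction(r));
            out
        }
        leaf => vec![leaf],
    }
}

fn main() {
    // (A OR B) OR C
    let pred = Expr::Or(
        Box::new(Expr::Or(
            Box::new(Expr::Leaf("A")),
            Box::new(Expr::Leaf("B")),
        )),
        Box::new(Expr::Leaf("C")),
    );
    let parts = split_disjunction(&pred);
    assert_eq!(parts.len(), 3);
    println!("ok");
}
```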

2. Classify each disjunct. Within each disjunct, conjuncts (AND-connected expressions) are separated into static conditions (no WATERMARK_TS() reference) and time-varying conditions (contains WATERMARK_TS()). The rule enforces two constraints:

  • At most one time-varying condition per conjunction. Multiple time-varying conditions in the same AND would create ambiguity about what release time to use.
  • WATERMARK_TS() must appear as an upper bound. Using it as a lower bound (e.g. WATERMARK_TS() <= event_time) would mean the condition is true initially and becomes permanently false as the watermark advances — useless in an append-only system.

for conjunct in split_conjunction(&disjunct) {
    if contains_watermark_ts(conjunct) {
        // Validate: upper-bound only, extract release_at expression
        let release_at = extract_release_expression(conjunct)?;
        time_varying = Some(release_at);
    } else {
        static_conjuncts.push(conjunct.clone());
    }
}
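The detection helper is simple in spirit. A toy version (again over a minimal expression enum rather than DataFusion’s Expr; contains_watermark_ts is the name used above) just walks the tree looking for the marker call:

```rust
// Toy expression tree with function calls, columns, and binary ops.
enum Expr {
    Call(&'static str, Vec<Expr>),
    Binary(Box<Expr>, &'static str, Box<Expr>),
    Column(&'static str),
    Literal(i64),
}

// True if any node in the tree is a call to the marker UDF.
fn contains_watermark_ts(e: &Expr) -> bool {
    match e {
        Expr::Call(name, args) => {
            *name == "watermark_ts" || args.iter().any(contains_watermark_ts)
        }
        Expr::Binary(l, _, r) => {
            contains_watermark_ts(l) || contains_watermark_ts(r)
        }
        Expr::Column(_) | Expr::Literal(_) => false,
    }
}

fn main() {
    // event_time + 5 <= watermark_ts()
    let time_varying = Expr::Binary(
        Box::new(Expr::Binary(
            Box::new(Expr::Column("event_time")),
            "+",
            Box::new(Expr::Literal(5)),
        )),
        "<=",
        Box::new(Expr::Call("watermark_ts", vec![])),
    );
    // status = 1 (no watermark reference: a static conjunct)
    let static_cond = Expr::Binary(
        Box::new(Expr::Column("status")),
        "=",
        Box::new(Expr::Literal(1)),
    );
    assert!(contains_watermark_ts(&time_varying));
    assert!(!contains_watermark_ts(&static_cond));
    println!("ok");
}
```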

3. Build branches. Each disjunct becomes a ReleaseBranch with two optional components:

  • A guard: the conjunction of all static conditions in that branch. If the guard is None, the branch matches unconditionally.
  • A release_at: the time-varying expression, rewritten to remove the WATERMARK_TS() reference (since the physical operator will compare against the watermark directly). If release_at is None, matching records are emitted immediately.

pub struct ReleasePlan {
    pub branches: Vec<ReleaseBranch>,
}

pub struct ReleaseBranch {
    pub guard: Option<Arc<dyn PhysicalExpr>>,
    pub release_at: Option<Arc<dyn PhysicalExpr>>,
}

For the SQL example above, the release plan would have two branches:

Branch  Guard                   release_at
------  ----------------------  --------------------------
1       status = 'final'        event_time + INTERVAL '5s'
2       status = 'preliminary'  None (immediate)

The ordering of branches matters: the physical operator evaluates them top to bottom, and the first matching branch determines the record’s fate.
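A toy sketch of why order matters (plain Rust; guards reduced to string predicates over the record’s status, and delays to plain integers): with overlapping guards, the first accepting branch claims the record.

```rust
// Each branch: a guard over the record's status, and an optional delay.
struct Branch {
    guard: Option<&'static str>, // None = matches unconditionally
    delay_secs: Option<i64>,     // None = emit immediately
}

// First-match semantics: the first branch whose guard accepts the record
// decides whether (and how long) it is delayed.
fn fate(status: &str, branches: &[Branch]) -> Option<Option<i64>> {
    branches
        .iter()
        .find(|b| b.guard.map_or(true, |g| g == status))
        .map(|b| b.delay_secs)
}

fn main() {
    let delayed_first = [
        Branch { guard: Some("final"), delay_secs: Some(5) },
        Branch { guard: None, delay_secs: None }, // catch-all: immediate
    ];
    // 'final' records hit the delayed branch before the catch-all…
    assert_eq!(fate("final", &delayed_first), Some(Some(5)));
    // …but with the catch-all first, it would claim everything.
    let catchall_first = [
        Branch { guard: None, delay_secs: None },
        Branch { guard: Some("final"), delay_secs: Some(5) },
    ];
    assert_eq!(fate("final", &catchall_first), Some(None));
    println!("ok");
}
```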

The Physical Operator

Once the optimizer produces a TimeVaryingFilter logical node, the planner lowers it into a physical operator: TimeVaryingFilterOperator. This operator is responsible for the actual buffering and release logic at runtime.

The core data structure is a BTreeMap<i64, Vec<BufferedBatch>>, keyed by release time in nanoseconds. The BTreeMap keeps entries sorted, which is exactly what we need — when a watermark arrives, we can efficiently drain all entries whose release time has been reached.

Processing Batches

When a batch arrives from the input stream, the operator evaluates the release plan’s branches in order:

fn process_batch(
    &mut self,
    batch: RecordBatch,
    arrival_watermark: i64,
) -> Result<Option<RecordBatch>> {
    // Rows not yet claimed by an earlier branch — first match wins
    let mut remaining = BooleanArray::from(vec![true; batch.num_rows()]);
    let mut immediate: Vec<RecordBatch> = Vec::new();

    for branch in self.release_plan.branches.iter() {
        // Apply guard filter to find which rows match this branch
        let guard_mask = match &branch.guard {
            Some(guard) => evaluate_boolean(guard, &batch)?,
            None => BooleanArray::from(vec![true; batch.num_rows()]),
        };
        let mask = and(&remaining, &guard_mask)?;

        let matched = filter_record_batch(&batch, &mask)?;
        if matched.num_rows() == 0 {
            continue;
        }
        // Claimed rows are excluded from all later branches
        remaining = and(&remaining, &not(&guard_mask)?)?;

        match &branch.release_at {
            None => {
                // Immediate branch — emit matched rows now
                immediate.push(matched);
            }
            Some(release_at_expr) => {
                // Time-varying branch — compute release times and buffer
                let release_times = evaluate_i64(release_at_expr, &matched)?;
                self.buffer_batch(matched, &release_times, arrival_watermark);
            }
        }
    }

    if immediate.is_empty() {
        Ok(None)
    } else {
        Ok(Some(concat_batches(&batch.schema(), &immediate)?))
    }
}

Buffering and Draining

Buffering inserts each batch’s rows into the BTreeMap at their computed release time. Multiple batches can share the same release time — they’re appended to the same Vec.
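buffer_batch itself isn’t shown above; a simplified sketch of the insertion logic (rows reduced to i64 payloads instead of Arrow RecordBatches, and the buffer passed in explicitly) might look like:

```rust
use std::collections::BTreeMap;

// Simplified stand-in for the operator's buffered unit: the rows that share
// one release time, plus the watermark in force when they arrived.
struct BufferedBatch {
    rows: Vec<i64>,
    arrival_watermark: i64,
}

// Group incoming rows by computed release time, then append each group to
// the BTreeMap entry for that time.
fn buffer_batch(
    buffer: &mut BTreeMap<i64, Vec<BufferedBatch>>,
    rows: Vec<i64>,
    release_times: &[i64],
    arrival_watermark: i64,
) {
    let mut by_time: BTreeMap<i64, Vec<i64>> = BTreeMap::new();
    for (row, &t) in rows.into_iter().zip(release_times) {
        by_time.entry(t).or_default().push(row);
    }
    for (t, rows) in by_time {
        buffer
            .entry(t)
            .or_default()
            .push(BufferedBatch { rows, arrival_watermark });
    }
}

fn main() {
    let mut buffer = BTreeMap::new();
    // Rows 1 and 3 release at t=6, row 2 at t=7; arrival watermark is 3.
    buffer_batch(&mut buffer, vec![1, 2, 3], &[6, 7, 6], 3);
    assert_eq!(buffer.len(), 2); // two distinct release times: 6 and 7
    assert_eq!(buffer[&6][0].rows, vec![1, 3]);
    assert_eq!(buffer[&7][0].rows, vec![2]);
    println!("ok");
}
```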

When a watermark arrives, the operator drains all entries that are ready:

fn drain_up_to(&mut self, watermark_nanos: i64) -> Vec<(RecordBatch, i64)> {
    let mut released = Vec::new();

    // split_off returns everything > watermark_nanos,
    // leaving everything <= watermark_nanos in self.buffer
    let remaining = self.buffer.split_off(&(watermark_nanos + 1));
    let ready = std::mem::replace(&mut self.buffer, remaining);

    for (_release_time, batches) in ready {
        for buffered in batches {
            released.push((
                buffered.batch,
                buffered.arrival_watermark,
            ));
        }
    }

    released
}

The BTreeMap::split_off call is what makes this efficient — it splits the tree in O(log n), handing us every entry up to the watermark in one operation. No iteration through irrelevant entries, no scanning the entire buffer.
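The trick is easy to demonstrate standalone on plain keys:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut buffer: BTreeMap<i64, &str> =
        BTreeMap::from([(6, "R1"), (7, "R2"), (8, "R3")]);
    let watermark = 7;

    // split_off(&(watermark + 1)) moves keys >= 8 into `later`,
    // leaving keys <= 7 behind; the swap then extracts the ready set.
    let later = buffer.split_off(&(watermark + 1));
    let ready = std::mem::replace(&mut buffer, later);

    assert_eq!(ready.keys().copied().collect::<Vec<_>>(), vec![6, 7]);
    assert_eq!(buffer.keys().copied().collect::<Vec<_>>(), vec![8]);
    println!("ok");
}
```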

Seeing It in Action

Let’s trace how the operator handles three records with a 5-second delay, stage by stage.

Three records arrive from the source. The operator computes each record's release time: event_time + 5s.

R1: event_time = :01 → release_at = :01 + 5s = :06
R2: event_time = :02 → release_at = :02 + 5s = :07
R3: event_time = :03 → release_at = :03 + 5s = :08
watermark = :03

For each record, release_at > watermark (:03), so all three are buffered in a BTreeMap ordered by release time — ready to be drained efficiently when the watermark advances.

BTreeMap<release_time, Vec<Batch>>
  :06 → [R1]
  :07 → [R2]
  :08 → [R3]
watermark = :03

The watermark advances to :07. The operator drains all entries with release_time ≤ :07, emitting R1 and R2 downstream. R3 remains buffered.

BTreeMap<release_time, Vec<Batch>>
  :06 → [R1]  (released)
  :07 → [R2]  (released)
  :08 → [R3]
watermark = :07
↓ downstream: R1, R2

The watermark reaches :08. R3 is released and the buffer is empty. All three records have been emitted with their intended delay.

BTreeMap<release_time, Vec<Batch>>
  (empty)
watermark = :08
↓ downstream: R1, R2, R3

Watermark Propagation

There’s a subtle correctness concern with watermark forwarding. In a normal filter, watermarks pass through unchanged — a filter only removes rows, it doesn’t change the temporal semantics of the stream. But a time-varying filter delays rows. If we forwarded the input watermark directly, downstream operators would see a watermark claiming “all events up to time T have been processed,” when in reality the time-varying filter is still holding some of them.

The operator takes a conservative approach: the output watermark is derived from the arrival watermark of released records — specifically, the maximum arrival watermark across all records emitted in the current drain. This ensures the output watermark still correlates with the actual data that has been emitted, rather than racing ahead of it.

fn compute_output_watermark(
    &self,
    released: &[(RecordBatch, i64)],
) -> Option<i64> {
    released
        .iter()
        .map(|(_, arrival_wm)| *arrival_wm)
        .max()
}

Limitations

This implementation is straightforward, but it has sharp edges under certain workloads. The buffer is entirely in-memory — every record waiting for release lives on the heap as a full RecordBatch. For a high-throughput stream with a long delay (say, 15 minutes on a feed producing millions of records per second), that’s a lot of resident memory. There’s no spill-to-disk mechanism; if the buffer outgrows available RAM, the operator will exert memory pressure on the entire process.

Watermark stalls are another concern. If the source goes idle — no new events, no watermark advancement — buffered records sit indefinitely with no way to release them. Our engine supports synthetic watermark advancement during idle periods, which mitigates this, but it’s worth calling out as a dependency.

Finally, the buffer’s contents are included in checkpoint snapshots (serialized as Arrow IPC). A large buffer means large checkpoints, which increases both snapshot latency and storage costs. For workloads where the buffer is consistently large, this could become the bottleneck in the checkpoint cycle.

For now, this is good enough — the workloads we’re targeting have moderate throughput and short delays. But a production-hardened version would likely need managed memory with disk spillover, buffer size limits with backpressure, and incremental checkpointing to avoid serializing the entire buffer on every snapshot.

Why Upper-Bound Only?

We require WATERMARK_TS() to appear as the upper bound of the comparison — that is, the side that the time-varying expression must be less than or equal to. This isn’t arbitrary.

Consider the inverse: WATERMARK_TS() <= event_time + INTERVAL '5' SECOND. When a record arrives with event_time = 10:00:01, the condition is WATERMARK_TS() <= 10:00:06. If the watermark is at 10:00:03, this is true. But watermarks are monotonic — once the watermark passes 10:00:06, the condition becomes permanently false. In an append-only system, that means the record should have been emitted immediately and can never be emitted later. There’s no useful “wait” semantic — it’s just a regular filter with extra steps.
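Concretely (a toy sketch with timestamps abbreviated to whole seconds):

```rust
// Lower-bound form: watermark <= event_time + 5s. True at first, then
// permanently false once the watermark passes the bound — never "wait".
fn lower_bound(watermark_s: i64, event_time_s: i64) -> bool {
    watermark_s <= event_time_s + 5
}

// Upper-bound form: event_time + 5s <= watermark. False at first, then
// permanently true — exactly the "wait until later" semantic we want.
fn upper_bound(watermark_s: i64, event_time_s: i64) -> bool {
    event_time_s + 5 <= watermark_s
}

fn main() {
    let event_time = 1; // record arrives at :01
    assert!(lower_bound(3, event_time));   // true early…
    assert!(!lower_bound(7, event_time));  // …permanently false later
    assert!(!upper_bound(3, event_time));  // false early…
    assert!(upper_bound(7, event_time));   // …permanently true later
    println!("ok");
}
```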

The upper-bound constraint ensures that time-varying conditions always express “wait until later” — a predicate that is false now but will become true in the future. This is the only direction that makes sense for buffered, delayed emission.

SQL as a Scheduling Language

What I find most interesting about this feature isn’t the implementation details — it’s what it says about SQL in a streaming context. A WHERE clause that references a watermark isn’t really filtering. It’s a scheduling directive: “emit this record when the watermark says it’s time.” The predicate encodes temporal semantics that the system can reason about, optimize, and execute — all within the existing query lifecycle.

DataFusion’s extension points made this practical. ScalarUDFImpl for the marker function, OptimizerRule for the rewrite, UserDefinedLogicalNodeCore for the custom plan node — each one is a well-defined boundary where streaming semantics can be introduced without forking the framework. The time-varying filter participates in the same plan lifecycle as every other node: optimized by the framework’s existing passes, lowered through the same planner, executed alongside standard operators.

The result is that delayed emission — something that would normally require a separate buffering system — becomes a one-line SQL change. The implementation has rough edges, but the abstraction is right: teach the optimizer what a watermark predicate means, and the rest follows from machinery that already exists.