The One-Line Fix That Eliminated a Full Sort

rust query-optimization datafusion

I was staring at a query plan that didn’t make sense. The data was already sorted — every partition declared an output ordering — and the plan should have been a cheap streaming merge. Instead, DataFusion’s physical optimizer was inserting a full SortExec: materializing everything, sorting it from scratch, then merging. Somewhere in the optimizer pipeline, the ordering was being quietly destroyed and then expensively reconstructed. One pass was undoing work that another pass depended on, and neither knew about the other.

The fix was a single if guard. But finding it meant tracing ordering properties through two independent optimizer passes and a multi-level plan tree.

The Setup

Our system has a custom execution node — call it RemoteExec — that routes partitions to remote workers for execution. It requires SinglePartition input (the partitions need to be merged before being sent over the network) and it maintains input order — whatever ordering goes in comes out unchanged.
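
To make those two properties concrete, here is a minimal sketch of how a node like RemoteExec might declare them. The method names echo DataFusion's ExecutionPlan trait (required_input_distribution, required_input_ordering, maintains_input_order), but the PlanNode trait, the trimmed-down Distribution enum, and the Vec<String> ordering type are simplified stand-ins invented for illustration, not the real API.

```rust
/// Simplified stand-in for DataFusion's `Distribution` enum (hash keys reduced to names).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Hypothetical toy trait: one entry per child, as in DataFusion's `ExecutionPlan`.
trait PlanNode {
    /// What partitioning this node needs from each child.
    fn required_input_distribution(&self) -> Vec<Distribution>;
    /// What ordering this node needs from each child (`None` = no requirement).
    fn required_input_ordering(&self) -> Vec<Option<Vec<String>>>;
    /// Whether this node passes each child's ordering through unchanged.
    fn maintains_input_order(&self) -> Vec<bool>;
}

/// Toy version of the custom node described above.
struct RemoteExec;

impl PlanNode for RemoteExec {
    fn required_input_distribution(&self) -> Vec<Distribution> {
        // Partitions must be merged before being sent over the network.
        vec![Distribution::SinglePartition]
    }

    fn required_input_ordering(&self) -> Vec<Option<Vec<String>>> {
        // The node itself does not need sorted input...
        vec![None]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // ...but whatever ordering comes in goes out unchanged.
        vec![true]
    }
}
```

The combination to notice is the last two methods: no ordering requirement, yet the order is maintained. That combination is what the rest of this post turns on.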

The plan in question looked something like this after our custom optimizer rules ran:

SortPreservingMergeExec: [col ASC]
  UnionExec
    RemoteExec
      DataSourceExec (2 partitions, sorted [col ASC])
    RemoteExec
      DataSourceExec (3 partitions, sorted [col ASC])

Each DataSourceExec produces sorted partitions. RemoteExec requires them merged into a single partition, and EnforceDistribution correctly inserts a SortPreservingMergeExec under each RemoteExec to satisfy this — merging multiple sorted streams into one sorted stream. Since RemoteExec maintains order, the ordering flows through, and the top-level SortPreservingMergeExec can do a streaming merge across the UnionExec branches. No full sort needed.

That’s what should happen. Here’s what actually happened:

SortExec: [col ASC]                    <-- full sort, shouldn't be here
  CoalescePartitionsExec
    UnionExec
      RemoteExec
        CoalescePartitionsExec         <-- ordering destroyed
          DataSourceExec (2 partitions, sorted [col ASC])
      RemoteExec
        CoalescePartitionsExec         <-- ordering destroyed
          DataSourceExec (3 partitions, sorted [col ASC])

EnforceDistribution replaced the SortPreservingMergeExec under each RemoteExec with CoalescePartitionsExec — which merges partitions into one but doesn’t preserve ordering. The ordering is gone. UnionExec now has unordered children, and the subsequent EnforceSorting pass has no choice but to insert a full SortExec to recover the ordering. A streaming merge became a full materialization and sort.
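
The difference between the two plans can be modeled with a toy ordering-propagation check. This is only a sketch: the Plan enum and the helper functions are hypothetical and mirror the operator names in the plans above, not DataFusion's actual property machinery.

```rust
/// Toy plan nodes named after the operators in the plans above.
#[derive(Debug)]
enum Plan {
    Source { partitions: usize, sorted: bool },
    SortPreservingMerge(Box<Plan>),
    CoalescePartitions(Box<Plan>),
    Remote(Box<Plan>),
    Union(Vec<Plan>),
}

/// Number of output partitions a node produces.
fn partition_count(plan: &Plan) -> usize {
    match plan {
        Plan::Source { partitions, .. } => *partitions,
        Plan::SortPreservingMerge(_) | Plan::CoalescePartitions(_) => 1,
        Plan::Remote(input) => partition_count(input),
        Plan::Union(children) => children.iter().map(partition_count).sum(),
    }
}

/// Whether every output partition is still sorted on `col`.
fn is_sorted(plan: &Plan) -> bool {
    match plan {
        Plan::Source { sorted, .. } => *sorted,
        // Merges sorted streams into one sorted stream.
        Plan::SortPreservingMerge(input) => is_sorted(input),
        // Interleaves partitions arbitrarily; order survives only for a single input partition.
        Plan::CoalescePartitions(input) => is_sorted(input) && partition_count(input) <= 1,
        // Maintains input order.
        Plan::Remote(input) => is_sorted(input),
        // Each output partition comes from one child, so all children must be sorted.
        Plan::Union(children) => children.iter().all(is_sorted),
    }
}

/// Which operator is needed on top to produce one sorted stream.
fn top_operator(plan: &Plan) -> &'static str {
    if is_sorted(plan) { "SortPreservingMergeExec" } else { "SortExec" }
}

/// One RemoteExec branch over a sorted source, with the given merge operator in between.
fn branch(partitions: usize, merge: fn(Box<Plan>) -> Plan) -> Plan {
    let source = Plan::Source { partitions, sorted: true };
    Plan::Remote(Box::new(merge(Box::new(source))))
}

fn expected_plan() -> Plan {
    Plan::Union(vec![
        branch(2, Plan::SortPreservingMerge),
        branch(3, Plan::SortPreservingMerge),
    ])
}

fn actual_plan() -> Plan {
    Plan::Union(vec![
        branch(2, Plan::CoalescePartitions),
        branch(3, Plan::CoalescePartitions),
    ])
}
```

In this model, top_operator(&expected_plan()) yields "SortPreservingMergeExec" and top_operator(&actual_plan()) yields "SortExec": swapping the merge operator two levels down is enough to force the full sort at the top.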

The Bug

The issue is in ensure_distribution, the core function of the EnforceDistribution optimizer pass. When a child node satisfies its parent’s distribution requirement and the parent places no ordering requirement on it, the optimizer considers replacing order-preserving variants (like SortPreservingMergeExec) with cheaper alternatives (like CoalescePartitionsExec). The logic is: if nobody above you needs the data sorted, why pay the cost of a merge that preserves order?

The problem is that the SinglePartition | HashPartitioned branch did this unconditionally:

Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
    // Since there is no ordering requirement, preserving ordering
    // is pointless
    child = replace_order_preserving_variants(child)?;
}

But “no ordering requirement from the parent” is not the same as “nobody needs this ordering.” If the parent maintains input order, the ordering flows through it to ancestors — and one of those ancestors might need it. That’s exactly our case: RemoteExec doesn’t require ordering, but it maintains it, and the SortPreservingMergeExec above the UnionExec depends on it.

The UnspecifiedDistribution branch already handled this correctly:

Distribution::UnspecifiedDistribution => {
    if !maintains {
        child = replace_order_preserving_variants(child)?;
    }
}

The !maintains guard was simply missing from the other branch. Here maintains is the parent’s maintains_input_order flag for that child.

The Fix

Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
    if !maintains {
        child = replace_order_preserving_variants(child)?;
    }
}

That’s it. One condition, aligning the SinglePartition | HashPartitioned branch with the UnspecifiedDistribution branch. The SortPreservingMergeExec nodes survive under each RemoteExec, ordering flows through the UnionExec, and the SortExec disappears from the plan entirely.
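
One way to see the change is as a truth table over the distribution requirement and the maintains flag. The sketch below models only the "parent has no ordering requirement" path. The Distribution enum is a simplified stand-in with hash keys dropped, and both function names are hypothetical; each function returns true when the order-preserving variants would be replaced.

```rust
/// Simplified stand-in for the parent's distribution requirement (hash keys omitted).
#[derive(Debug, Clone, Copy)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned,
}

/// Before the fix: would the order-preserving variants be replaced?
fn replaces_before_fix(requirement: Distribution, maintains: bool) -> bool {
    match requirement {
        // Replaced unconditionally, even when the parent maintains order.
        Distribution::SinglePartition | Distribution::HashPartitioned => true,
        Distribution::UnspecifiedDistribution => !maintains,
    }
}

/// After the fix: both branches respect the `maintains` flag.
fn replaces_after_fix(requirement: Distribution, maintains: bool) -> bool {
    match requirement {
        Distribution::SinglePartition | Distribution::HashPartitioned => !maintains,
        Distribution::UnspecifiedDistribution => !maintains,
    }
}
```

For a RemoteExec-like parent (SinglePartition, maintains = true) the answer flips from true to false. Every other combination is unchanged, so parents that do not maintain order still get the cheaper CoalescePartitionsExec.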

Optimizer Passes Interact

What makes bugs like this subtle is that EnforceDistribution and EnforceSorting are separate passes that run independently. EnforceDistribution doesn’t think about sort requirements — that’s EnforceSorting’s job. And EnforceSorting doesn’t know that the ordering it’s repairing was needlessly destroyed by a previous pass. Each pass is locally reasonable; the bug only manifests in their interaction.

The fix is small, but the debugging wasn’t — it required reading both passes, understanding when order-preserving variants are replaced and why, and tracing the ordering properties through a multi-level plan tree. The one-line change just happened to be where all that understanding converged.

The impact, though, is significant. A streaming merge of k sorted inputs costs O(n log k), which is effectively linear when k is small, and it needs minimal memory because it pulls one row at a time from each sorted input. A full sort is O(n log n), must materialize the entire dataset before emitting anything, and is pipeline-breaking: it can’t produce any output until it has seen every row. For any system serving low-latency queries, that difference matters a lot, because a streaming merge can start emitting rows immediately. As cardinality grows, the gap widens, and under the right conditions we saw the corrected plans run an order of magnitude faster. For a one-line fix, that’s a good return.
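
To illustrate the streaming-versus-blocking difference, here is a small, self-contained sketch of a k-way merge built on a binary heap, next to a full sort. It is not DataFusion's SortPreservingMergeExec, which works on record batches and arbitrary sort keys. The KWayMerge type and the full_sort function are hypothetical names, and rows are reduced to plain i64 values.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Lazily merges already-sorted inputs into one sorted stream.
struct KWayMerge<I: Iterator<Item = i64>> {
    inputs: Vec<I>,
    /// Min-heap holding the current head of each non-exhausted input: (value, input index).
    heap: BinaryHeap<Reverse<(i64, usize)>>,
}

impl<I: Iterator<Item = i64>> KWayMerge<I> {
    fn new(mut inputs: Vec<I>) -> Self {
        let mut heap = BinaryHeap::with_capacity(inputs.len());
        // Prime the heap with the first row of every input.
        for (idx, input) in inputs.iter_mut().enumerate() {
            if let Some(value) = input.next() {
                heap.push(Reverse((value, idx)));
            }
        }
        Self { inputs, heap }
    }
}

impl<I: Iterator<Item = i64>> Iterator for KWayMerge<I> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        // Emit the smallest head, then refill from the input it came from.
        let Reverse((value, idx)) = self.heap.pop()?;
        if let Some(next) = self.inputs[idx].next() {
            self.heap.push(Reverse((next, idx)));
        }
        Some(value)
    }
}

/// Blocking alternative: collect every row first, then sort.
fn full_sort(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut all: Vec<i64> = inputs.into_iter().flatten().collect();
    all.sort();
    all
}
```

The merge holds at most one row per input in its heap and can return its first row after reading a single row from each input, so it works even on unbounded inputs. full_sort cannot return anything until every input has been drained.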

The upstream issue and fix are at apache/datafusion#21096 and apache/datafusion#21097.