10x Faster Protobuf-to-Arrow by Ditching Reflection


At Massive, our streaming data engine — built on Apache DataFusion and Apache Arrow — ingests real-time financial market data from Kafka, decodes it from protobuf, and processes it through SQL-defined dataflows. The decode step sits on the hottest path in the system: every single message passes through it. So when profiling revealed that protobuf decoding was consuming the majority of our CPU budget, it was worth taking a closer look.

The fix was conceptually simple — replace runtime reflection with compile-time code generation — and the result was roughly a 10x throughput improvement with significantly lower resource usage. But the interesting part isn’t the fix itself. It’s why the original approach was so expensive, and how the abstraction that made the system flexible became the bottleneck that nearly broke it.

Let the Profile Lead

I was load-testing our options quotes pipeline — the kind of workload where OPRA pushes millions of quote updates per second across thousands of tickers. The test simulated 10,000 distinct groups over ~1GB of data (~20 million messages). I expected the bottleneck to be somewhere in the execution layer, but the flamegraph pointed squarely at one place: DynamicMessage::decode.

Throughput topped out at ~200k msgs/sec. The system was CPU-bound on parsing, not on the actual data processing.

To understand why, let’s dive into how the decoding pipeline works.

How We Got Here

Our streaming engine, atlas-flow, has a pluggable codec layer. You declare an encoding in SQL when defining a source:

```sql
CREATE SOURCE opra_quotes (
    ticker VARCHAR,
    timestamp TIMESTAMP,
    bid_price DOUBLE,
    ask_price DOUBLE,
    ...
) WITH (
    'connector' = 'kafka',
    'encoding'  = 'protobuf',
    'message'   = 'stocks.QuoteNBBO',
    'location'  = '/descriptors/stocks.pb'
);
```

Notice that the SQL schema is decoupled from the protobuf definition — the user declares which fields and types they want as Arrow columns, and the codec layer bridges the gap. The protobuf codec loads a compiled .pb descriptor file at runtime, then uses prost-reflect’s DynamicMessage to decode arbitrary protobuf messages without needing generated Rust types. This is a common pattern in SQL-based stream processors — systems like RisingWave use the same approach for the same reason: it lets users work with any protobuf schema without recompilation.
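To make the dynamic model concrete, here is a hedged, stdlib-only sketch of what a reflection-style message conceptually looks like — these are illustrative stand-in types, not prost-reflect's actual API. Fields live in a map keyed by name, and every value is wrapped in an enum covering the possible protobuf types:

```rust
use std::collections::HashMap;

// Conceptual stand-ins (not prost-reflect's real DynamicMessage/Value types).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    F64(f64),
    Str(String),
}

struct DynMessage {
    fields: HashMap<String, Value>,
}

impl DynMessage {
    // Every access is a string-keyed lookup followed by a variant match --
    // exactly the two per-field costs the static path later removes.
    fn get_f64(&self, name: &str) -> Option<f64> {
        match self.fields.get(name) {
            Some(Value::F64(v)) => Some(*v),
            _ => None,
        }
    }
}

fn main() {
    let mut fields = HashMap::new();
    fields.insert("bid_price".to_string(), Value::F64(101.25));
    fields.insert("ticker".to_string(), Value::Str("SPXW".to_string()));
    let msg = DynMessage { fields };
    println!("{:?}", msg.get_f64("bid_price"));
}
```

The flexibility is real — any schema fits in this shape — but so is the cost: nothing about the message's structure is visible to the compiler.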

The decoding path looked something like this:

```rust
fn append_message(&mut self, bytes: &[u8]) -> Result<()> {
    // Step 1: Decode into a DynamicMessage
    let dynamic_message = DynamicMessage::decode(
        self.message_descriptor.clone(),  // clone the descriptor
        bytes,
    )?;

    // Step 2: For each field in the Arrow schema...
    for (idx, arrow_field) in self.schema.fields().iter().enumerate() {
        // ...look up the protobuf field by name (HashMap lookup)
        let field_descriptor = self.message_descriptor
            .get_field_by_name(arrow_field.name())?;

        // ...extract the value (returns a Cow<Value> enum)
        let field_value = dynamic_message.get_field(&field_descriptor);

        // ...pattern-match on the Value variant and append to the
        // appropriate Arrow builder (Box<dyn ArrayBuilder>)
        append_value_to_builder(
            &mut self.builders[idx],  // vtable dispatch
            &field_value,
            // ... type metadata ...
        )?;
    }
    Ok(())
}
```

Death by a Thousand Lookups

At 200k+ messages per second, each with 12 fields, the per-message costs in that loop add up fast:

  1. Descriptor cloning: DynamicMessage::decode takes ownership of a MessageDescriptor, which involves reference-counted interior data. Not free at scale.
  2. HashMap lookups per field: get_field_by_name does a string-keyed lookup for every field of every message. For a 12-field message, that’s 2.4 million hash lookups per second at 200k msgs/sec — and 24 million per second at the 2M msgs/sec the feed can demand.
  3. Dynamic value extraction: get_field returns a Cow<Value> — an enum that wraps every possible protobuf type. Extracting the actual value requires pattern matching and, in some cases, heap allocation.
  4. Vtable dispatch on builders: The builders are stored as Vec<Box<dyn ArrayBuilder>>, so every append call goes through dynamic dispatch. The compiler can’t inline anything.
  5. Presence checks and cardinality metadata: Each field requires checking has_field, supports_presence, is_list, and cardinality — all method calls on the descriptor.

The common thread: every one of these operations answers a question whose answer is known at compile time. Which fields does this message have? What are their types? How should they map to Arrow? The dynamic decoder asks and re-answers these questions for every single message. Millions of times per second, we’re rediscovering facts about a schema that hasn’t changed since the binary was built.
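The vtable cost in particular is worth seeing side by side. Below is a hypothetical, simplified sketch — not the real arrow-rs builder API — of the two dispatch styles: a type-erased builder behind `Box<dyn ...>` versus a concrete builder the compiler can see through and inline.

```rust
// Hypothetical simplified builder trait, standing in for arrow's ArrayBuilder.
trait AnyBuilder {
    fn append_f64(&mut self, v: f64);
    fn last(&self) -> Option<f64>;
}

struct Float64Builder {
    values: Vec<f64>,
}

impl AnyBuilder for Float64Builder {
    fn append_f64(&mut self, v: f64) {
        self.values.push(v);
    }
    fn last(&self) -> Option<f64> {
        self.values.last().copied()
    }
}

// Dynamic path: the builder's concrete type is erased, so every append is
// an indirect (vtable) call the optimizer cannot inline.
fn demo_dynamic() -> Option<f64> {
    let mut builders: Vec<Box<dyn AnyBuilder>> =
        vec![Box::new(Float64Builder { values: Vec::new() })];
    builders[0].append_f64(101.25);
    builders[0].last()
}

// Static path: concrete type, direct method call, freely inlineable.
fn demo_static() -> Option<f64> {
    let mut builder = Float64Builder { values: Vec::new() };
    builder.append_f64(101.25);
    builder.values.last().copied()
}

fn main() {
    println!("{:?} {:?}", demo_dynamic(), demo_static());
}
```

Both paths produce the same result; the difference is invisible in the source and very visible in the flamegraph once the call happens millions of times per second.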

Compile-Time Answers to Runtime Questions

For any given dataflow, the protobuf schema is fixed at deploy time. We know exactly which message type we’re decoding, which fields it has, and what Arrow types they map to. So what if we just… told the compiler?

In a monolithic streaming database, you’d be stuck. The decoding pipeline is an internal implementation detail — you can’t swap it out without forking the project. But atlas-flow is a library, not a standalone system. It’s designed to be embedded, and its codec layer is one of several extension points exposed to the host application. So adding a custom decoding path means implementing a trait and registering it — not patching framework internals.

We introduced a DecoderFactory trait and a Custom encoding variant that lets the host application register statically-typed decoders:

```rust
ctx.register_encoding(
    "custom.proto+stocks.QuoteNBBO",
    Some(Arc::new(StocksQuoteNbboDecoderFactory)),
    None, // no custom encoder needed
)?;
```

Referenced in SQL with a single line change:

```sql
CREATE SOURCE opra_quotes (...) WITH (
    'connector' = 'kafka',
    'encoding'  = 'custom.proto+stocks.QuoteNBBO'
);
```

The custom decoder uses prost::Message (the derive macro, not the reflection API) and concrete Arrow builder types:

```rust
pub struct StocksQuoteNbboDecoder {
    // Concrete builder types — no Box<dyn>, no vtable
    ticker: StringBuilder,
    timestamp: TimestampNanosecondBuilder,
    sequence_number: Int64Builder,
    bid_price: Float64Builder,
    bid_size: UInt32Builder,
    // ...
    batch_size: usize,
    len: usize,
}

impl StocksQuoteNbboDecoder {
    #[inline]
    fn decode_message(&mut self, bytes: &[u8]) -> Result<()> {
        // prost::Message::decode — static, generated code
        let msg = proto_lib::stocks::QuoteNbbo::decode(bytes)?;

        // Direct field access — no lookups, no pattern matching
        if let Some(base) = &msg.base {
            self.ticker.append_value(&base.ticker);
            self.timestamp.append_value(base.timestamp);
            self.sequence_number.append_value(base.sequence_number);
        } else {
            self.ticker.append_null();
            self.timestamp.append_null();
            self.sequence_number.append_null();
        }

        self.bid_price.append_value(msg.bid_price);
        self.bid_size.append_value(msg.bid_size);
        // ...

        self.len += 1;
        Ok(())
    }
}
```

Every source of overhead from the dynamic path is gone:

| Dynamic (DynamicMessage) | Static (prost::Message) |
|---|---|
| HashMap lookup per field | Direct struct field access |
| Cow<Value> enum extraction | Concrete types, no wrapping |
| Box<dyn ArrayBuilder> vtable calls | Concrete builder types, inlineable |
| Descriptor clone per message | No descriptors at runtime |
| Presence/cardinality checks per field | Compile-time known structure |

The prost::Message derive macro generates a decode implementation that knows the exact wire format at compile time. It matches on tag numbers directly — no hash tables, no string comparisons, no dynamic dispatch. The generated code is essentially a hand-optimized state machine over the protobuf wire format. And with concrete builder types instead of Box<dyn ArrayBuilder>, the compiler can inline the append calls — eliminating the vtable indirection that, in a hot loop processing millions of items per second, quietly compounds into a real bottleneck.
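To make "matches on tag numbers directly" concrete, here is a hand-written, stdlib-only sketch of what such generated code boils down to, for a hypothetical two-field message `{ double bid_price = 1; uint32 bid_size = 2; }`. It is illustrative only — prost's actual generated code handles every wire type, field merging, and malformed input.

```rust
// Illustrative tag-dispatch decoder over the protobuf wire format for a
// hypothetical message { double bid_price = 1; uint32 bid_size = 2; }.
// The core idea: a direct match on tag numbers, no hash tables or strings.

#[derive(Default, Debug, PartialEq)]
struct QuoteSketch {
    bid_price: f64,
    bid_size: u32,
}

// Read a base-128 varint, advancing `pos`.
fn read_varint(buf: &[u8], pos: &mut usize) -> u64 {
    let mut value = 0u64;
    let mut shift = 0;
    loop {
        let byte = buf[*pos];
        *pos += 1;
        value |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return value;
        }
        shift += 7;
    }
}

fn decode_quote(buf: &[u8]) -> QuoteSketch {
    let mut msg = QuoteSketch::default();
    let mut pos = 0;
    while pos < buf.len() {
        // Each field key packs (tag << 3) | wire_type into one varint.
        let key = read_varint(buf, &mut pos);
        let (tag, wire_type) = (key >> 3, key & 0x7);
        match (tag, wire_type) {
            // Field 1, wire type 1 (64-bit): double bid_price
            (1, 1) => {
                let bytes: [u8; 8] = buf[pos..pos + 8].try_into().unwrap();
                msg.bid_price = f64::from_le_bytes(bytes);
                pos += 8;
            }
            // Field 2, wire type 0 (varint): uint32 bid_size
            (2, 0) => msg.bid_size = read_varint(buf, &mut pos) as u32,
            // Unknown field: skip (this sketch only skips varints)
            _ => {
                read_varint(buf, &mut pos);
            }
        }
    }
    msg
}

fn main() {
    // Wire bytes for { bid_price: 101.25, bid_size: 40 }:
    // key 0x09 = (1 << 3) | 1 then 8 LE bytes; key 0x10 = (2 << 3) | 0 then varint.
    let mut buf = vec![0x09];
    buf.extend_from_slice(&101.25f64.to_le_bytes());
    buf.extend_from_slice(&[0x10, 40]);
    println!("{:?}", decode_quote(&buf));
}
```

Everything the dynamic path resolved per message — which tag maps to which field, what type it holds — is baked into that `match` at compile time.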

200k to 2M

Same benchmark — ~1GB of data, ~20 million messages, 10,000 distinct groups — now with the static decoder:

Throughput jumped from ~200k msgs/sec to roughly 2M msgs/sec. CPU utilization during the run dropped dramatically, and the time to churn through the full dataset fell from minutes to seconds.

This isn’t a synthetic micro-benchmark. It’s the actual workload shape we see for US options quotes (OPRA), which generate millions of quote updates per second across thousands of tickers. At 200k msgs/sec, we couldn’t keep up with the feed. At 2M msgs/sec, we have headroom.

Living With Both

The obvious cost is that custom decoders must be written and maintained per message type. When the protobuf schema changes, the decoder needs updating too. More code, tightly coupled to the schema.

But the schemas that need this are the high-throughput ones — and there aren’t many. The long tail of low-volume message types continues using the dynamic decoder, where flexibility matters more than raw speed. Both paths coexist; the encoding name in the SQL declaration is all that determines which one runs.

This is where the library design pays off. The DecoderFactory trait isn’t a special-case escape hatch — it’s the same interface the built-in protobuf and JSON codecs implement. Custom decoders are first-class citizens, registered with the execution context alongside everything else. The framework doesn’t need to know or care what custom.proto+stocks.QuoteNBBO does internally; it just calls create_decoder and gets back something that produces RecordBatches. The optimization lives entirely in user code, outside the framework boundary, which means it doesn’t add complexity to the core engine and can evolve independently of it.
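The overall shape of that boundary can be sketched in a few lines. This is a hedged, stdlib-only stand-in — the trait names echo the post, but the signatures are simplified, not atlas-flow's actual API.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the codec extension point described above.
trait Decoder {
    fn append(&mut self, bytes: &[u8]);
    fn len(&self) -> usize;
}

trait DecoderFactory {
    // The framework calls this and gets back "something that decodes";
    // it never needs to know the concrete type behind the trait object.
    fn create_decoder(&self) -> Box<dyn Decoder>;
}

// Stand-in for a statically-typed decoder like StocksQuoteNbboDecoder.
struct CountingDecoder {
    len: usize,
}

impl Decoder for CountingDecoder {
    fn append(&mut self, _bytes: &[u8]) {
        self.len += 1;
    }
    fn len(&self) -> usize {
        self.len
    }
}

struct CountingFactory;

impl DecoderFactory for CountingFactory {
    fn create_decoder(&self) -> Box<dyn Decoder> {
        Box::new(CountingDecoder { len: 0 })
    }
}

fn main() {
    // Encoding name -> factory: the SQL-declared encoding string is the
    // only thing that selects which decoder runs.
    let mut registry: HashMap<String, Arc<dyn DecoderFactory>> = HashMap::new();
    registry.insert(
        "custom.proto+stocks.QuoteNBBO".to_string(),
        Arc::new(CountingFactory),
    );

    let mut decoder = registry["custom.proto+stocks.QuoteNBBO"].create_decoder();
    decoder.append(b"\x0a\x02");
    println!("messages decoded: {}", decoder.len());
}
```

Note that the trait-object indirection here sits at the framework boundary — once per decoder, not per field — so it costs nothing on the hot path; inside the decoder, the builder types stay concrete.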

In an opaque streaming database, hitting this same prost-reflect bottleneck would mean either contributing an optimization upstream or accepting the throughput ceiling. Here, it was an afternoon of work in our application layer — no framework changes required.