
Conversation

@albertlockett (Member) commented Aug 28, 2025

part of #863

Because some OTAP fields are optional, in a stream of record batches we may receive subsequent batches with different schemas. Parquet doesn't support row groups with different sets of column chunks, which means we need to know the schema a priori when the writer is created.

This PR adds code to normalize the schema of the record batch before writing by:

  • putting all the fields in the same order
  • creating all-null (or default-value) columns for any missing columns (see the sketch below)
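A minimal sketch of the normalization idea in arrow-rs (the function name and signature are illustrative, not the exact code in this PR; default-value fill for non-nullable columns would be analogous):

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Reorder a batch's columns to match `target`, filling any missing column
/// with an all-null placeholder of the right length.
fn normalize_batch(batch: &RecordBatch, target: SchemaRef) -> Result<RecordBatch, ArrowError> {
    let num_rows = batch.num_rows();
    let src_schema = batch.schema();
    let columns: Vec<ArrayRef> = target
        .fields()
        .iter()
        .map(|field| match src_schema.index_of(field.name()) {
            // Column exists in the incoming batch: reuse it, in target order.
            Ok(idx) => Arc::clone(batch.column(idx)),
            // Column is missing: materialize an all-null placeholder.
            Err(_) => new_null_array(field.data_type(), num_rows),
        })
        .collect();
    RecordBatch::try_new(target, columns)
}
```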

The missing columns should add only a small overhead on disk: for an all-null column, parquet writes an essentially empty column chunk (just the null count, no data), and for default-value columns parquet uses dictionary and RLE encoding by default, producing a small column chunk with a single dictionary value and a single run for the keys.
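To illustrate that, here's a rough sketch of writing a batch containing a null placeholder column through arrow-rs's ArrowWriter; the schema, data, and file name are made up for the example, but dictionary encoding really is enabled by default in WriterProperties:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn write_with_placeholder() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("optional_attr", DataType::Utf8, true),
    ]));

    // "optional_attr" was absent from the incoming batch, so it's filled with nulls.
    let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let placeholder = new_null_array(&DataType::Utf8, 3);
    let batch = RecordBatch::try_new(schema.clone(), vec![id, placeholder])?;

    // Default properties enable dictionary encoding, so a null or constant
    // placeholder column turns into a very small column chunk on disk.
    let props = WriterProperties::builder().build();
    let mut writer = ArrowWriter::try_new(File::create("example.parquet")?, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```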

What's unfortunate is that we still materialize an all-null column (with the length of the record batch) before writing. This can be optimized once run-end encoded arrays are supported in parquet, because we could then create a run array with a single run of the null/default value. The Arrow community is currently working on adding support (see apache/arrow-rs#7713 & apache/arrow-rs#8069).

@albertlockett albertlockett requested a review from a team as a code owner August 28, 2025 16:09
@github-actions github-actions bot added the rust Pull requests that update Rust code label Aug 28, 2025

codecov bot commented Aug 28, 2025

Codecov Report

❌ Patch coverage is 95.85799% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.18%. Comparing base (0d3422f) to head (0bd3217).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1024      +/-   ##
==========================================
+ Coverage   81.08%   81.18%   +0.10%     
==========================================
  Files         363      364       +1     
  Lines       89424    90095     +671     
==========================================
+ Hits        72508    73148     +640     
- Misses      16388    16419      +31     
  Partials      528      528              
Components            Coverage Δ
otap-dataflow         81.21% <95.97%> (+0.33%) ⬆️
beaubourg             ∅ <ø> (∅)
otel-arrow-rust       87.07% <80.00%> (-0.01%) ⬇️
query_abstraction     80.61% <ø> (ø)
query_engine          91.05% <ø> (ø)
syslog_cef_receivers  ∅ <ø> (∅)
otel-arrow-go         52.82% <ø> (ø)

Comment on lines +11 to +14
//! This means we can't receive two consecutive OTAP batches for some payload type and write them
//! into the same writer. To handle this, we insert all null columns for missing columns (or all
//! default-value where the column is not nullable), and also arrange the columns so they're always
//! in the same order.
@lquerel (Contributor) commented:
We probably don't have any other simple option for resolving this limitation. However, it's important to keep an eye on the implications this will have, particularly on memory consumption across the parquet -> arrow -> query engine path. It might have no effect, but we'll need to verify that.

@albertlockett (Member, Author) commented Aug 28, 2025:

@lquerel I agree. On memory consumption, there's an optimization to swap the placeholders we create for a more efficient representation. We can do this once run-end encoding is supported in Parquet (work in progress).

Say we're adding a placeholder for a missing Int32 column: today we'd need an all-null Int32Array, which still carries a values buffer of four bytes per row plus the nulls buffer:

values buf = [0, 0, ... ] // len = num rows
nulls buf  = [false, false ... ] // len = num rows

Eventually we'll change to use a RunArray, which would contain an Int32Array of length 1, plus a single run specifying to use the same value for the entire length:

values buf = [0]
nulls buf  = [false]
run ends   = [num_rows]
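A rough sketch of constructing such a placeholder with arrow-rs (in-memory only; writing run-end encoded arrays to parquet is the piece still in progress upstream, and the helper name is hypothetical):

```rust
use arrow::array::{Array, Int32Array, RunArray};
use arrow::datatypes::Int32Type;

fn null_int32_placeholder(num_rows: i32) -> RunArray<Int32Type> {
    // One logical run covering the whole batch, whose single value is null.
    let run_ends = Int32Array::from(vec![num_rows]);          // run ends = [num_rows]
    let values = Int32Array::from(vec![Option::<i32>::None]); // values   = [null]
    let placeholder = RunArray::try_new(&run_ends, &values).expect("valid run array");
    assert_eq!(placeholder.len(), num_rows as usize);
    placeholder
}
```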

I think we could maybe even reuse the values & nulls buffers across multiple column replacements for the same data types.

I will create a follow-up issue to track this optimization.

@albertlockett (Member, Author) commented:

Regarding querying, I think the behaviour will depend on the query engine, but we can definitely verify. I imagine the overhead of the extra metadata from these additional columns would generally be small compared to the size of the data. Depending on the query, we'd still probably have empty arrays in memory for these columns if the user selected them.

@lquerel (Contributor) left a comment:

LGTM. See my comment.

@lquerel lquerel enabled auto-merge August 28, 2025 21:11
@lquerel lquerel added this pull request to the merge queue Aug 28, 2025
Merged via the queue into open-telemetry:main with commit fc324c5 Aug 28, 2025
35 checks passed
clhain pushed a commit to clhain/otel-arrow that referenced this pull request Oct 15, 2025