From c3df57db26d2e948d270886a676c3932b2e4caeb Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 09:22:10 -0400 Subject: [PATCH 1/6] add resuable compression context for ipc writer --- arrow-ipc/src/compression.rs | 75 +++++++++++++++++++++++++++----- arrow-ipc/src/reader.rs | 14 +++++- arrow-ipc/src/writer.rs | 83 +++++++++++++++++++++++++++++++----- 3 files changed, 148 insertions(+), 24 deletions(-) diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 47ea7785cbec..9fff54a7dd40 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -22,6 +22,39 @@ use arrow_schema::ArrowError; const LENGTH_NO_COMPRESSED_DATA: i64 = -1; const LENGTH_OF_PREFIX_DATA: i64 = 8; +/// Additional context that may be needed for compression. +/// +/// In the case of zstd, this will contain the zstd context, which can be reused between subsequent +/// compression calls to avoid the performance overhead of initialising a new context for every +/// compression. +pub struct CompressionContext { + #[cfg(feature = "zstd")] + compressor: zstd::bulk::Compressor<'static>, +} + +#[allow(clippy::derivable_impls)] +impl Default for CompressionContext { + fn default() -> Self { + CompressionContext { + // safety: `new` here will only return error here if using an invalid compression level + #[cfg(feature = "zstd")] + compressor: zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL) + .expect("can use default compression level"), + } + } +} + +impl std::fmt::Debug for CompressionContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("CompressionContext"); + + #[cfg(feature = "zstd")] + ds.field("compressor", &"zstd::bulk::Compressor"); + + ds.finish() + } +} + /// Represents compressing a ipc stream using a particular compression algorithm #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum CompressionCodec { @@ -58,6 +91,7 @@ impl CompressionCodec { &self, input: &[u8], output: &mut Vec, + context: &mut CompressionContext, ) -> Result { let uncompressed_data_len = input.len(); let original_output_len = output.len(); @@ -67,7 +101,7 @@ impl CompressionCodec { } else { // write compressed data directly into the output buffer output.extend_from_slice(&uncompressed_data_len.to_le_bytes()); - self.compress(input, output)?; + self.compress(input, output, context)?; let compression_len = output.len() - original_output_len; if compression_len > uncompressed_data_len { @@ -115,10 +149,15 @@ impl CompressionCodec { /// Compress the data in input buffer and write to output buffer /// using the specified compression - fn compress(&self, input: &[u8], output: &mut Vec) -> Result<(), ArrowError> { + fn compress( + &self, + input: &[u8], + output: &mut Vec, + context: &mut CompressionContext, + ) -> Result<(), ArrowError> { match self { CompressionCodec::Lz4Frame => compress_lz4(input, output), - CompressionCodec::Zstd => compress_zstd(input, output), + CompressionCodec::Zstd => compress_zstd(input, output, context), } } @@ -175,17 +214,27 @@ fn decompress_lz4(_input: &[u8], _decompressed_size: usize) -> Result, A } #[cfg(feature = "zstd")] -fn compress_zstd(input: &[u8], output: &mut Vec) -> Result<(), ArrowError> { - use std::io::Write; - let mut encoder = zstd::Encoder::new(output, 0)?; - encoder.write_all(input)?; - encoder.finish()?; +fn compress_zstd( + input: &[u8], + output: &mut Vec, + context: &mut CompressionContext, +) -> Result<(), ArrowError> { + // use std::io::Write; + // let mut encoder = zstd::Encoder::new(output, 0)?; + // encoder.write_all(input)?; + // encoder.finish()?; + let result = context.compressor.compress(input)?; + output.extend_from_slice(&result); Ok(()) } #[cfg(not(feature = "zstd"))] #[allow(clippy::ptr_arg)] -fn compress_zstd(_input: &[u8], _output: &mut Vec) -> Result<(), ArrowError> { +fn compress_zstd( + _input: &[u8], + _output: &mut Vec, + _context: &mut CompressionContext, +) -> Result<(), ArrowError> { Err(ArrowError::InvalidArgumentError( "zstd IPC compression requires the zstd feature".to_string(), )) @@ -227,7 +276,9 @@ mod tests { let input_bytes = b"hello lz4"; let codec = super::CompressionCodec::Lz4Frame; let mut output_bytes: Vec = Vec::new(); - codec.compress(input_bytes, &mut output_bytes).unwrap(); + codec + .compress(input_bytes, &mut output_bytes, &mut Default::default()) + .unwrap(); let result = codec .decompress(output_bytes.as_slice(), input_bytes.len()) .unwrap(); @@ -240,7 +291,9 @@ mod tests { let input_bytes = b"hello zstd"; let codec = super::CompressionCodec::Zstd; let mut output_bytes: Vec = Vec::new(); - codec.compress(input_bytes, &mut output_bytes).unwrap(); + codec + .compress(input_bytes, &mut output_bytes, &mut Default::default()) + .unwrap(); let result = codec .decompress(output_bytes.as_slice(), input_bytes.len()) .unwrap(); diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index dfb9f3f75d8f..d0c90c1dcda5 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -2702,7 +2702,12 @@ mod tests { let gen = IpcDataGenerator {}; let mut dict_tracker = DictionaryTracker::new(false); let (_, encoded) = gen - .encoded_batch(&batch, &mut dict_tracker, &Default::default()) + .encoded_batch( + &batch, + &mut dict_tracker, + &Default::default(), + &mut Default::default(), + ) .unwrap(); let message = root_as_message(&encoded.ipc_message).unwrap(); @@ -2740,7 +2745,12 @@ mod tests { let gen = IpcDataGenerator {}; let mut dict_tracker = DictionaryTracker::new(false); let (_, encoded) = gen - .encoded_batch(&batch, &mut dict_tracker, &Default::default()) + .encoded_batch( + &batch, + &mut dict_tracker, + &Default::default(), + &mut Default::default(), + ) .unwrap(); let message = root_as_message(&encoded.ipc_message).unwrap(); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 59a1a3c0a190..f17ffa4f44d8 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -42,6 +42,7 @@ use arrow_data::{layout, ArrayData, ArrayDataBuilder, BufferSpec}; use arrow_schema::*; use crate::compression::CompressionCodec; +pub use crate::compression::CompressionContext; use crate::convert::IpcSchemaEncoder; use crate::CONTINUATION_MARKER; @@ -167,7 +168,7 @@ impl Default for IpcWriteOptions { /// # use std::sync::Arc; /// # use arrow_array::UInt64Array; /// # use arrow_array::RecordBatch; -/// # use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +/// # use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; /// /// // Create a record batch /// let batch = RecordBatch::try_from_iter(vec![ @@ -179,11 +180,13 @@ impl Default for IpcWriteOptions { /// let options = IpcWriteOptions::default(); /// let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); /// +/// let mut compression_context = CompressionContext::default(); +/// /// // encode the batch into zero or more encoded dictionaries /// // and the data for the actual array. /// let data_gen = IpcDataGenerator::default(); /// let (encoded_dictionaries, encoded_message) = data_gen -/// .encoded_batch(&batch, &mut dictionary_tracker, &options) +/// .encoded_batch(&batch, &mut dictionary_tracker, &options, &mut compression_context) /// .unwrap(); /// # } /// ``` @@ -231,6 +234,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id: &mut I, + compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Struct(fields) => { @@ -243,6 +247,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } } @@ -264,6 +269,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } DataType::List(field) => { @@ -275,6 +281,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } DataType::LargeList(field) => { @@ -286,6 +293,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } DataType::FixedSizeList(field, _) => { @@ -300,6 +308,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } DataType::Map(field, _) => { @@ -318,6 +327,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; // values @@ -328,6 +338,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } DataType::Union(fields, _) => { @@ -341,6 +352,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id, + compression_context, )?; } } @@ -350,6 +362,7 @@ impl IpcDataGenerator { Ok(()) } + #[allow(clippy::too_many_arguments)] fn encode_dictionaries>( &self, field: &Field, @@ -358,6 +371,7 @@ impl IpcDataGenerator { dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, dict_id_seq: &mut I, + compression_context: &mut CompressionContext, ) -> Result<(), ArrowError> { match column.data_type() { DataType::Dictionary(_key_type, _value_type) => { @@ -372,6 +386,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, + compression_context, )?; // It's important to only take the dict_id at this point, because the dict ID @@ -393,6 +408,7 @@ impl IpcDataGenerator { dict_values, write_options, false, + compression_context, )?); } DictionaryUpdate::Delta(data) => { @@ -401,6 +417,7 @@ impl IpcDataGenerator { &data, write_options, true, + compression_context, )?); } } @@ -411,6 +428,7 @@ impl IpcDataGenerator { dictionary_tracker, write_options, dict_id_seq, + compression_context, )?, } @@ -425,6 +443,7 @@ impl IpcDataGenerator { batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, ) -> Result<(Vec, EncodedData), ArrowError> { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.flattened_fields().len()); @@ -440,10 +459,12 @@ impl IpcDataGenerator { dictionary_tracker, write_options, &mut dict_id, + compression_context, )?; } - let encoded_message = self.record_batch_to_bytes(batch, write_options)?; + let encoded_message = + self.record_batch_to_bytes(batch, write_options, compression_context)?; Ok((encoded_dictionaries, encoded_message)) } @@ -453,6 +474,7 @@ impl IpcDataGenerator { &self, batch: &RecordBatch, write_options: &IpcWriteOptions, + compression_context: &mut CompressionContext, ) -> Result { let mut fbb = FlatBufferBuilder::new(); @@ -487,6 +509,7 @@ impl IpcDataGenerator { array.len(), array.null_count(), compression_codec, + compression_context, write_options, )?; @@ -545,6 +568,7 @@ impl IpcDataGenerator { array_data: &ArrayData, write_options: &IpcWriteOptions, is_delta: bool, + compression_context: &mut CompressionContext, ) -> Result { let mut fbb = FlatBufferBuilder::new(); @@ -575,6 +599,7 @@ impl IpcDataGenerator { array_data.len(), array_data.null_count(), compression_codec, + compression_context, write_options, )?; @@ -1008,6 +1033,8 @@ pub struct FileWriter { custom_metadata: HashMap, data_gen: IpcDataGenerator, + + compression_context: CompressionContext, } impl FileWriter> { @@ -1069,6 +1096,7 @@ impl FileWriter { dictionary_tracker, custom_metadata: HashMap::new(), data_gen, + compression_context: CompressionContext::default(), }) } @@ -1089,6 +1117,7 @@ impl FileWriter { batch, &mut self.dictionary_tracker, &self.write_options, + &mut self.compression_context, )?; for encoded_dictionary in encoded_dictionaries { @@ -1293,6 +1322,8 @@ pub struct StreamWriter { dictionary_tracker: DictionaryTracker, data_gen: IpcDataGenerator, + + compression_context: CompressionContext, } impl StreamWriter> { @@ -1343,6 +1374,7 @@ impl StreamWriter { finished: false, dictionary_tracker, data_gen, + compression_context: CompressionContext::default(), }) } @@ -1356,7 +1388,12 @@ impl StreamWriter { let (encoded_dictionaries, encoded_message) = self .data_gen - .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options) + .encoded_batch( + batch, + &mut self.dictionary_tracker, + &self.write_options, + &mut self.compression_context, + ) .expect("StreamWriter is configured to not error on dictionary replacement"); for encoded_dictionary in encoded_dictionaries { @@ -1667,6 +1704,7 @@ fn write_array_data( num_rows: usize, null_count: usize, compression_codec: Option, + compression_context: &mut CompressionContext, write_options: &IpcWriteOptions, ) -> Result { let mut offset = offset; @@ -1696,6 +1734,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } @@ -1710,6 +1749,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } @@ -1727,6 +1767,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } @@ -1739,6 +1780,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } @@ -1771,6 +1813,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } else if matches!(data_type, DataType::Boolean) { @@ -1786,6 +1829,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } else if matches!( @@ -1808,6 +1852,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; offset = write_array_data( @@ -1819,6 +1864,7 @@ fn write_array_data( sliced_child_data.len(), sliced_child_data.null_count(), compression_codec, + compression_context, write_options, )?; return Ok(offset); @@ -1839,6 +1885,7 @@ fn write_array_data( child_data.len(), child_data.null_count(), compression_codec, + compression_context, write_options, )?; return Ok(offset); @@ -1850,6 +1897,7 @@ fn write_array_data( arrow_data, offset, compression_codec, + compression_context, write_options.alignment, )?; } @@ -1872,6 +1920,7 @@ fn write_array_data( data_ref.len(), data_ref.null_count(), compression_codec, + compression_context, write_options, )?; } @@ -1889,6 +1938,7 @@ fn write_array_data( data_ref.len(), data_ref.null_count(), compression_codec, + compression_context, write_options, )?; } @@ -1915,10 +1965,11 @@ fn write_buffer( arrow_data: &mut Vec, // output stream offset: i64, // current output stream offset compression_codec: Option, + compression_context: &mut CompressionContext, alignment: u8, ) -> Result { let len: i64 = match compression_codec { - Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?, + Some(compressor) => compressor.compress_to_vec(buffer, arrow_data, compression_context)?, None => { arrow_data.extend_from_slice(buffer); buffer.len() @@ -2250,7 +2301,7 @@ mod tests { false, )])); - let gen = IpcDataGenerator {}; + let gen = IpcDataGenerator::default(); let mut dict_tracker = DictionaryTracker::new(false); gen.schema_to_bytes_with_dictionary_tracker( &schema, @@ -2260,8 +2311,13 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap(); - gen.encoded_batch(&batch, &mut dict_tracker, &Default::default()) - .unwrap(); + gen.encoded_batch( + &batch, + &mut dict_tracker, + &Default::default(), + &mut Default::default(), + ) + .unwrap(); // The encoder will assign dict IDs itself to ensure uniqueness and ignore the dict ID in the schema // so we expect the dict will be keyed to 0 @@ -2293,7 +2349,7 @@ mod tests { false, )])); - let gen = IpcDataGenerator {}; + let gen = IpcDataGenerator::default(); let mut dict_tracker = DictionaryTracker::new(false); gen.schema_to_bytes_with_dictionary_tracker( &schema, @@ -2303,8 +2359,13 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap(); - gen.encoded_batch(&batch, &mut dict_tracker, &Default::default()) - .unwrap(); + gen.encoded_batch( + &batch, + &mut dict_tracker, + &Default::default(), + &mut Default::default(), + ) + .unwrap(); assert!(dict_tracker.written.contains_key(&0)); } From 1b26f99e7e5aa726c4146e8eb40cb8e3e9b08c06 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 09:26:33 -0400 Subject: [PATCH 2/6] fix compiler errors and clippies in flight --- arrow-flight/src/encode.rs | 9 ++++++--- arrow-flight/src/utils.rs | 4 +++- .../src/flight_client_scenarios/integration_test.rs | 12 ++++++++++-- .../src/flight_server_scenarios/integration_test.rs | 7 ++++++- arrow-ipc/src/compression.rs | 2 ++ 5 files changed, 27 insertions(+), 7 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 49910a3ee2b0..dd1e6e65bace 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -20,7 +20,7 @@ use std::{collections::VecDeque, fmt::Debug, pin::Pin, sync::Arc, task::Poll}; use crate::{error::Result, FlightData, FlightDescriptor, SchemaAsIpc}; use arrow_array::{Array, ArrayRef, RecordBatch, RecordBatchOptions, UnionArray}; -use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +use arrow_ipc::writer::{CompressionContext, DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef, UnionMode}; use bytes::Bytes; @@ -647,6 +647,7 @@ struct FlightIpcEncoder { options: IpcWriteOptions, data_gen: IpcDataGenerator, dictionary_tracker: DictionaryTracker, + compression_context: CompressionContext, } impl FlightIpcEncoder { @@ -655,6 +656,7 @@ impl FlightIpcEncoder { options, data_gen: IpcDataGenerator::default(), dictionary_tracker: DictionaryTracker::new(error_on_replacement), + compression_context: CompressionContext::default(), } } @@ -668,7 +670,7 @@ impl FlightIpcEncoder { fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec, FlightData)> { let (encoded_dictionaries, encoded_batch) = self.data_gen - .encoded_batch(batch, &mut self.dictionary_tracker, &self.options)?; + .encoded_batch(batch, &mut self.dictionary_tracker, &self.options, &mut self.compression_context)?; let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); let flight_batch = encoded_batch.into(); @@ -1596,9 +1598,10 @@ mod tests { ) -> (Vec, FlightData) { let data_gen = IpcDataGenerator::default(); let mut dictionary_tracker = DictionaryTracker::new(false); + let mut compression_context = CompressionContext::default(); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, &mut dictionary_tracker, options) + .encoded_batch(batch, &mut dictionary_tracker, options, &mut compression_context) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index a304aedcfaee..a58face18c22 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; use arrow_ipc::convert::fb_to_schema; +use arrow_ipc::writer::CompressionContext; use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions}; use arrow_schema::{ArrowError, Schema, SchemaRef}; @@ -91,10 +92,11 @@ pub fn batches_to_flight_data( let data_gen = writer::IpcDataGenerator::default(); let mut dictionary_tracker = writer::DictionaryTracker::new(false); + let mut compression_context = CompressionContext::default(); for batch in batches.iter() { let (encoded_dictionaries, encoded_batch) = - data_gen.encoded_batch(batch, &mut dictionary_tracker, &options)?; + data_gen.encoded_batch(batch, &mut dictionary_tracker, &options, &mut compression_context)?; dictionaries.extend(encoded_dictionaries.into_iter().map(Into::into)); flight_data.push(encoded_batch.into()); diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index bd41ab602ee5..df61843f3bd3 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -24,7 +24,10 @@ use arrow::{ array::ArrayRef, buffer::Buffer, datatypes::SchemaRef, - ipc::{self, reader, writer}, + ipc::{ + self, reader, + writer::{self, CompressionContext}, + }, record_batch::RecordBatch, }; use arrow_flight::{ @@ -90,6 +93,8 @@ async fn upload_data( let mut original_data_iter = original_data.iter().enumerate(); + let mut compression_context = CompressionContext::default(); + if let Some((counter, first_batch)) = original_data_iter.next() { let metadata = counter.to_string().into_bytes(); // Preload the first batch into the channel before starting the request @@ -99,6 +104,7 @@ async fn upload_data( first_batch, &options, &mut dict_tracker, + &mut compression_context, ) .await?; @@ -121,6 +127,7 @@ async fn upload_data( batch, &options, &mut dict_tracker, + &mut compression_context, ) .await?; @@ -150,11 +157,12 @@ async fn send_batch( batch: &RecordBatch, options: &writer::IpcWriteOptions, dictionary_tracker: &mut writer::DictionaryTracker, + compression_context: &mut CompressionContext, ) -> Result { let data_gen = writer::IpcDataGenerator::default(); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, dictionary_tracker, options) + .encoded_batch(batch, dictionary_tracker, options, compression_context) .expect("DictionaryTracker configured above to not error on replacement"); let dictionary_flight_data: Vec = diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index d608a4753723..8a072f958dd2 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -144,7 +144,12 @@ impl FlightService for FlightServiceImpl { .enumerate() .flat_map(|(counter, batch)| { let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, &mut dictionary_tracker, &options) + .encoded_batch( + batch, + &mut dictionary_tracker, + &options, + &mut Default::default(), + ) .expect("DictionaryTracker configured above to not error on replacement"); let dictionary_flight_data = encoded_dictionaries.into_iter().map(Into::into); diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 9fff54a7dd40..8a7176fe4924 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -32,6 +32,8 @@ pub struct CompressionContext { compressor: zstd::bulk::Compressor<'static>, } +// the reason we allow derivable_impls here is because when zstd feature is not enabled, this +// becomes derivable. however with zstd feature want to be explicit about the compression level. #[allow(clippy::derivable_impls)] impl Default for CompressionContext { fn default() -> Self { From f829fe4f37c6990a0cf5ed07cba7e7407f96de5d Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 09:41:11 -0400 Subject: [PATCH 3/6] remove commented code --- arrow-ipc/src/compression.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 8a7176fe4924..9bbc6e752c12 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -221,10 +221,6 @@ fn compress_zstd( output: &mut Vec, context: &mut CompressionContext, ) -> Result<(), ArrowError> { - // use std::io::Write; - // let mut encoder = zstd::Encoder::new(output, 0)?; - // encoder.write_all(input)?; - // encoder.finish()?; let result = context.compressor.compress(input)?; output.extend_from_slice(&result); Ok(()) From e4b33d15c6a1f4a2fc4be81c0b675e1bdd8c81a9 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 09:45:11 -0400 Subject: [PATCH 4/6] fix fmt in flight --- arrow-flight/src/encode.rs | 16 ++++++++++++---- arrow-flight/src/utils.rs | 8 ++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index dd1e6e65bace..eaef8cd7fe86 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -668,9 +668,12 @@ impl FlightIpcEncoder { /// Convert a `RecordBatch` to a Vec of `FlightData` representing /// dictionaries and a `FlightData` representing the batch fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec, FlightData)> { - let (encoded_dictionaries, encoded_batch) = - self.data_gen - .encoded_batch(batch, &mut self.dictionary_tracker, &self.options, &mut self.compression_context)?; + let (encoded_dictionaries, encoded_batch) = self.data_gen.encoded_batch( + batch, + &mut self.dictionary_tracker, + &self.options, + &mut self.compression_context, + )?; let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); let flight_batch = encoded_batch.into(); @@ -1601,7 +1604,12 @@ mod tests { let mut compression_context = CompressionContext::default(); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, &mut dictionary_tracker, options, &mut compression_context) + .encoded_batch( + batch, + &mut dictionary_tracker, + options, + &mut compression_context, + ) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index a58face18c22..ed9cc97c88c9 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -95,8 +95,12 @@ pub fn batches_to_flight_data( let mut compression_context = CompressionContext::default(); for batch in batches.iter() { - let (encoded_dictionaries, encoded_batch) = - data_gen.encoded_batch(batch, &mut dictionary_tracker, &options, &mut compression_context)?; + let (encoded_dictionaries, encoded_batch) = data_gen.encoded_batch( + batch, + &mut dictionary_tracker, + &options, + &mut compression_context, + )?; dictionaries.extend(encoded_dictionaries.into_iter().map(Into::into)); flight_data.push(encoded_batch.into()); From 499fcbe9d38a1a4321d8906c77c1ecc6e762315b Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 17:24:36 -0400 Subject: [PATCH 5/6] PR feedback --- arrow-ipc/src/reader.rs | 4 ++-- arrow-ipc/src/writer.rs | 30 ++++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index d0c90c1dcda5..7702c814e8d3 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -2702,7 +2702,7 @@ mod tests { let gen = IpcDataGenerator {}; let mut dict_tracker = DictionaryTracker::new(false); let (_, encoded) = gen - .encoded_batch( + .encode( &batch, &mut dict_tracker, &Default::default(), @@ -2745,7 +2745,7 @@ mod tests { let gen = IpcDataGenerator {}; let mut dict_tracker = DictionaryTracker::new(false); let (_, encoded) = gen - .encoded_batch( + .encode( &batch, &mut dict_tracker, &Default::default(), diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index f17ffa4f44d8..ed05998ad106 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -186,7 +186,7 @@ impl Default for IpcWriteOptions { /// // and the data for the actual array. /// let data_gen = IpcDataGenerator::default(); /// let (encoded_dictionaries, encoded_message) = data_gen -/// .encoded_batch(&batch, &mut dictionary_tracker, &options, &mut compression_context) +/// .encode(&batch, &mut dictionary_tracker, &options, &mut compression_context) /// .unwrap(); /// # } /// ``` @@ -438,7 +438,7 @@ impl IpcDataGenerator { /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch). /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once) /// Make sure the [DictionaryTracker] is initialized at the start of the stream. - pub fn encoded_batch( + pub fn encode( &self, batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, @@ -468,6 +468,24 @@ impl IpcDataGenerator { Ok((encoded_dictionaries, encoded_message)) } + /// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch). + /// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once) + /// Make sure the [DictionaryTracker] is initialized at the start of the stream. + #[deprecated(since = "57.0.0", note = "Use `encode` instead")] + pub fn encoded_batch( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + ) -> Result<(Vec, EncodedData), ArrowError> { + self.encode( + batch, + dictionary_tracker, + write_options, + &mut Default::default(), + ) + } + /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the /// other for the batch's data fn record_batch_to_bytes( @@ -1113,7 +1131,7 @@ impl FileWriter { )); } - let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch( + let (encoded_dictionaries, encoded_message) = self.data_gen.encode( batch, &mut self.dictionary_tracker, &self.write_options, @@ -1388,7 +1406,7 @@ impl StreamWriter { let (encoded_dictionaries, encoded_message) = self .data_gen - .encoded_batch( + .encode( batch, &mut self.dictionary_tracker, &self.write_options, @@ -2311,7 +2329,7 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap(); - gen.encoded_batch( + gen.encode( &batch, &mut dict_tracker, &Default::default(), @@ -2359,7 +2377,7 @@ mod tests { let batch = RecordBatch::try_new(schema, vec![struct_array]).unwrap(); - gen.encoded_batch( + gen.encode( &batch, &mut dict_tracker, &Default::default(), From 4a40a43e996fe636451362953fdc179852d52cf2 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 22 Sep 2025 17:41:18 -0400 Subject: [PATCH 6/6] fix compiler errors in flight and integ tests --- arrow-flight/src/encode.rs | 4 ++-- arrow-flight/src/utils.rs | 2 +- .../src/flight_client_scenarios/integration_test.rs | 2 +- .../src/flight_server_scenarios/integration_test.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index eaef8cd7fe86..82a106ce49c1 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -668,7 +668,7 @@ impl FlightIpcEncoder { /// Convert a `RecordBatch` to a Vec of `FlightData` representing /// dictionaries and a `FlightData` representing the batch fn encode_batch(&mut self, batch: &RecordBatch) -> Result<(Vec, FlightData)> { - let (encoded_dictionaries, encoded_batch) = self.data_gen.encoded_batch( + let (encoded_dictionaries, encoded_batch) = self.data_gen.encode( batch, &mut self.dictionary_tracker, &self.options, @@ -1604,7 +1604,7 @@ mod tests { let mut compression_context = CompressionContext::default(); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch( + .encode( batch, &mut dictionary_tracker, options, diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index ed9cc97c88c9..6effb5f86aaf 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -95,7 +95,7 @@ pub fn batches_to_flight_data( let mut compression_context = CompressionContext::default(); for batch in batches.iter() { - let (encoded_dictionaries, encoded_batch) = data_gen.encoded_batch( + let (encoded_dictionaries, encoded_batch) = data_gen.encode( batch, &mut dictionary_tracker, &options, diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index df61843f3bd3..4f4f29cc3d2a 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -162,7 +162,7 @@ async fn send_batch( let data_gen = writer::IpcDataGenerator::default(); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, dictionary_tracker, options, compression_context) + .encode(batch, dictionary_tracker, options, compression_context) .expect("DictionaryTracker configured above to not error on replacement"); let dictionary_flight_data: Vec = diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 8a072f958dd2..9faced000366 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -144,7 +144,7 @@ impl FlightService for FlightServiceImpl { .enumerate() .flat_map(|(counter, batch)| { let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch( + .encode( batch, &mut dictionary_tracker, &options,