Merged
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -149,6 +149,7 @@ parking_lot = { workspace = true }
parquet = { workspace = true, optional = true, default-features = true }
rand = { workspace = true }
regex = { workspace = true }
rstest = { workspace = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
sqlparser = { workspace = true, optional = true }
tempfile = { workspace = true }
7 changes: 6 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -102,6 +102,7 @@ impl DataFrame {

#[cfg(test)]
mod tests {
use rstest::rstest;
use std::collections::HashMap;
use std::sync::Arc;

@@ -247,9 +248,12 @@ mod tests {
Ok(())
}

#[rstest]
#[cfg(feature = "parquet_encryption")]
#[tokio::test]
async fn roundtrip_parquet_with_encryption() -> Result<()> {
async fn roundtrip_parquet_with_encryption(
#[values(false, true)] allow_single_file_parallelism: bool,
) -> Result<()> {
use parquet::encryption::decrypt::FileDecryptionProperties;
use parquet::encryption::encrypt::FileEncryptionProperties;

@@ -278,6 +282,7 @@ mod tests {
// Write encrypted parquet using write_parquet
let mut options = TableParquetOptions::default();
options.crypto.file_encryption = Some((&encrypt).into());
options.global.allow_single_file_parallelism = allow_single_file_parallelism;

df.write_parquet(
tempfile_str.as_str(),
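Side note on the test change above: `#[rstest]` combined with `#[values(false, true)]` makes rstest expand roundtrip_parquet_with_encryption into two test cases, one per value of allow_single_file_parallelism, so both the serial and the new parallel encrypted write path are exercised. A minimal, self-contained sketch of that pattern (the function and test names below are made up for illustration, and it assumes rstest and tokio are available as dev-dependencies):

use rstest::rstest;

// Hypothetical helper standing in for the write path under test.
fn write_mode(parallel: bool) -> &'static str {
    if parallel { "parallel" } else { "serial" }
}

// rstest expands this into two test cases, one per value of `parallel`,
// mirroring how roundtrip_parquet_with_encryption now covers both paths.
#[rstest]
#[tokio::test]
async fn covers_both_write_paths(#[values(false, true)] parallel: bool) {
    let mode = write_mode(parallel);
    assert!(!mode.is_empty());
}

rstest generates the cartesian product of all #[values] parameters, so adding a second parameterized argument would multiply the generated cases accordingly.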
84 changes: 36 additions & 48 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -71,11 +71,11 @@ use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_writer::{
compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter,
ArrowLeafColumn, ArrowWriterOptions,
compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
ArrowRowGroupWriterFactory, ArrowWriterOptions,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowSchemaConverter, AsyncArrowWriter};
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;

use crate::metadata::DFParquetMetadata;
@@ -1128,14 +1128,7 @@ impl ParquetSink {
runtime: &Arc<RuntimeEnv>,
path: &Path,
) -> Result<WriterProperties> {
let schema = if self.parquet_options.global.allow_single_file_parallelism {
// If parallelizing writes, we may be also be doing hive style partitioning
// into multiple files which impacts the schema per file.
// Refer to `get_writer_schema()`
&get_writer_schema(&self.config)
} else {
self.config.output_schema()
};
let schema = self.config.output_schema();

// TODO: avoid this clone in follow up PR, where the writer properties & schema
// are calculated once on `ParquetSink::new`
@@ -1249,16 +1242,6 @@ impl FileSink for ParquetSink {
object_store: Arc<dyn ObjectStore>,
) -> Result<u64> {
let parquet_opts = &self.parquet_options;
let mut allow_single_file_parallelism =
parquet_opts.global.allow_single_file_parallelism;

if parquet_opts.crypto.file_encryption.is_some()
|| parquet_opts.crypto.factory_id.is_some()
{
// For now, arrow-rs does not support parallel writes with encryption
// See https://github.com/apache/arrow-rs/issues/7359
allow_single_file_parallelism = false;
}

let mut file_write_tasks: JoinSet<
std::result::Result<(Path, FileMetaData), DataFusionError>,
@@ -1276,7 +1259,7 @@

while let Some((path, mut rx)) = file_stream_rx.recv().await {
let parquet_props = self.create_writer_props(&runtime, &path).await?;
if !allow_single_file_parallelism {
if !parquet_opts.global.allow_single_file_parallelism {
let mut writer = self
.create_async_arrow_writer(
&path,
@@ -1316,6 +1299,7 @@ impl FileSink for ParquetSink {
.build()?;
let schema = get_writer_schema(&self.config);
let props = parquet_props.clone();
let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
let parallel_options_clone = parallel_options.clone();
let pool = Arc::clone(context.memory_pool());
file_write_tasks.spawn(async move {
@@ -1324,6 +1308,7 @@ impl FileSink for ParquetSink {
rx,
schema,
&props,
skip_arrow_metadata,
parallel_options_clone,
pool,
)
@@ -1404,13 +1389,10 @@ type ColSender = Sender<ArrowLeafColumn>;
/// Returns join handles for each columns serialization task along with a send channel
/// to send arrow arrays to each serialization task.
fn spawn_column_parallel_row_group_writer(
schema: Arc<Schema>,
parquet_props: Arc<WriterProperties>,
col_writers: Vec<ArrowColumnWriter>,
max_buffer_size: usize,
pool: &Arc<dyn MemoryPool>,
) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
let schema_desc = ArrowSchemaConverter::new().convert(&schema)?;
let col_writers = get_column_writers(&schema_desc, &parquet_props, &schema)?;
let num_columns = col_writers.len();

let mut col_writer_tasks = Vec::with_capacity(num_columns);
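To make the doc comment on spawn_column_parallel_row_group_writer above more concrete, here is a deliberately simplified, self-contained sketch of the per-column fan-out it describes: one task plus one bounded channel per column, with the caller sending data in and joining the handles afterwards. This is not DataFusion's code; plain Vec<i64> payloads and bare tokio primitives stand in for ArrowLeafColumn, SpawnedTask, and the memory-pool accounting:

use tokio::sync::mpsc::{channel, Sender};
use tokio::task::JoinHandle;

type ColumnData = Vec<i64>; // stand-in for ArrowLeafColumn

fn spawn_per_column_tasks(
    num_columns: usize,
    max_buffer: usize,
) -> (Vec<JoinHandle<usize>>, Vec<Sender<ColumnData>>) {
    let mut handles = Vec::with_capacity(num_columns);
    let mut senders = Vec::with_capacity(num_columns);
    for _ in 0..num_columns {
        let (tx, mut rx) = channel::<ColumnData>(max_buffer);
        // Each task drains its own channel, standing in for column serialization.
        let handle = tokio::spawn(async move {
            let mut rows_written = 0;
            while let Some(col) = rx.recv().await {
                rows_written += col.len();
            }
            rows_written
        });
        handles.push(handle);
        senders.push(tx);
    }
    (handles, senders)
}

#[tokio::main]
async fn main() {
    let (handles, senders) = spawn_per_column_tasks(2, 8);
    for tx in &senders {
        tx.send(vec![1, 2, 3]).await.unwrap();
    }
    drop(senders); // close the channels so the per-column tasks finish
    for handle in handles {
        println!("rows written by column task: {}", handle.await.unwrap());
    }
}

In the real code each task owns an ArrowColumnWriter and hands it back (along with its memory reservation) once its channel closes, which is what the returned join handles are used for by the row-group finalization step.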
@@ -1505,6 +1487,7 @@ fn spawn_rg_join_and_finalize_task(
/// across both columns and row_groups, with a theoretical max number of parallel tasks
/// given by n_columns * num_row_groups.
fn spawn_parquet_parallel_serialization_task(
row_group_writer_factory: ArrowRowGroupWriterFactory,
mut data: Receiver<RecordBatch>,
serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
Expand All @@ -1515,13 +1498,11 @@ fn spawn_parquet_parallel_serialization_task(
SpawnedTask::spawn(async move {
let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
let max_row_group_rows = writer_props.max_row_group_size();
let mut row_group_index = 0;
Contributor review comment on the two lines below (a standalone sketch of this pattern follows at the end of this file's diff): It is really nice to use the ArrowRowGroupWriterFactory API here for all writing on both paths 👍

let col_writers =
row_group_writer_factory.create_column_writers(row_group_index)?;
let (mut column_writer_handles, mut col_array_channels) =
spawn_column_parallel_row_group_writer(
Arc::clone(&schema),
Arc::clone(&writer_props),
max_buffer_rb,
&pool,
)?;
spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
let mut current_rg_rows = 0;

while let Some(mut rb) = data.recv().await {
@@ -1567,10 +1548,12 @@ fn spawn_parquet_parallel_serialization_task(
current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);

row_group_index += 1;
let col_writers = row_group_writer_factory
.create_column_writers(row_group_index)?;
(column_writer_handles, col_array_channels) =
spawn_column_parallel_row_group_writer(
Arc::clone(&schema),
Arc::clone(&writer_props),
col_writers,
max_buffer_rb,
&pool,
)?;
@@ -1601,29 +1584,21 @@ fn spawn_parquet_parallel_serialization_task(
/// Consume RowGroups serialized by other parallel tasks and concatenate them in
/// to the final parquet file, while flushing finalized bytes to an [ObjectStore]
async fn concatenate_parallel_row_groups(
mut parquet_writer: SerializedFileWriter<SharedBuffer>,
merged_buff: SharedBuffer,
mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let mut file_reservation =
MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);

let schema_desc = ArrowSchemaConverter::new().convert(schema.as_ref())?;
let mut parquet_writer = SerializedFileWriter::new(
merged_buff.clone(),
schema_desc.root_schema_ptr(),
writer_props,
)?;

while let Some(task) = serialize_rx.recv().await {
let result = task.join_unwind().await;
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, mut rg_reservation, _cnt) =
result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

let mut rg_out = parquet_writer.next_row_group()?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
rg_reservation.free();
@@ -1661,6 +1636,7 @@ async fn output_single_parquet_file_parallelized(
data: Receiver<RecordBatch>,
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
skip_arrow_metadata: bool,
parallel_options: ParallelParquetWriterOptions,
pool: Arc<dyn MemoryPool>,
) -> Result<FileMetaData> {
@@ -1670,7 +1646,19 @@
mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);

let arc_props = Arc::new(parquet_props.clone());
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
let options = ArrowWriterOptions::new()
.with_properties(parquet_props.clone())
.with_skip_arrow_metadata(skip_arrow_metadata);
let writer = ArrowWriter::try_new_with_options(
merged_buff.clone(),
Arc::clone(&output_schema),
options,
)?;
let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;

let launch_serialization_task = spawn_parquet_parallel_serialization_task(
row_group_writer_factory,
data,
serialize_tx,
Arc::clone(&output_schema),
Expand All @@ -1679,9 +1667,9 @@ async fn output_single_parquet_file_parallelized(
Arc::clone(&pool),
);
let file_metadata = concatenate_parallel_row_groups(
writer,
merged_buff,
serialize_rx,
Arc::clone(&output_schema),
Arc::clone(&arc_props),
object_store_writer,
pool,
)
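Taken together, the last hunk and the review comment above describe the new writer-construction flow: build one ArrowWriter, split it with into_serialized_writer() into a SerializedFileWriter plus an ArrowRowGroupWriterFactory, and let the factory create the column writers for every row group on both the serial and parallel paths. A condensed, standalone sketch of that flow for a single in-memory row group (this is not the ParquetSink code; it assumes arrow and parquet crate versions that expose the APIs used in this diff):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::{compute_leaves, ArrowWriterOptions};
use parquet::arrow::ArrowWriter;
use parquet::errors::Result;

fn write_single_row_group(batch: &RecordBatch) -> Result<Vec<u8>> {
    let mut buf = Vec::new();
    let writer = ArrowWriter::try_new_with_options(
        &mut buf,
        batch.schema(),
        ArrowWriterOptions::new(),
    )?;
    // Split the high-level ArrowWriter into a low-level SerializedFileWriter plus a
    // factory that hands out column writers per row group index.
    let (mut file_writer, factory) = writer.into_serialized_writer()?;

    // Encode every column of row group 0; ParquetSink runs each writer on its own task.
    let mut col_writers = factory.create_column_writers(0)?;
    let schema = batch.schema();
    for ((col_writer, field), column) in col_writers
        .iter_mut()
        .zip(schema.fields())
        .zip(batch.columns())
    {
        for leaf in compute_leaves(field, column)? {
            col_writer.write(&leaf)?;
        }
    }

    // Append the finished column chunks to the file and close it.
    let mut rg_writer = file_writer.next_row_group()?;
    for col_writer in col_writers {
        col_writer.close()?.append_to_row_group(&mut rg_writer)?;
    }
    rg_writer.close()?;
    file_writer.close()?;
    Ok(buf)
}

fn main() -> Result<()> {
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("id", ids)])?;
    let bytes = write_single_row_group(&batch)?;
    println!("wrote {} parquet bytes", bytes.len());
    Ok(())
}

DataFusion's version drives the same factory from spawn_parquet_parallel_serialization_task, incrementing row_group_index each time a row group fills up, while concatenate_parallel_row_groups appends the finished chunks to the SerializedFileWriter it now receives as an argument.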