-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix: create file for empty stream #16342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -22,12 +22,14 @@ use crate::sink::DataSink; | |||
| use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver}; | ||||
| use crate::ListingTableUrl; | ||||
|
|
||||
| use arrow::array::RecordBatch; | ||||
| use arrow::datatypes::{DataType, SchemaRef}; | ||||
| use datafusion_common::Result; | ||||
| use datafusion_common_runtime::SpawnedTask; | ||||
| use datafusion_execution::object_store::ObjectStoreUrl; | ||||
| use datafusion_execution::{SendableRecordBatchStream, TaskContext}; | ||||
| use datafusion_expr::dml::InsertOp; | ||||
| use datafusion_physical_plan::stream::RecordBatchStreamAdapter; | ||||
|
|
||||
| use async_trait::async_trait; | ||||
| use object_store::ObjectStore; | ||||
|
|
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink { | |||
| .runtime_env() | ||||
| .object_store(&config.object_store_url)?; | ||||
| let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context); | ||||
| self.spawn_writer_tasks_and_join( | ||||
| context, | ||||
| demux_task, | ||||
| file_stream_rx, | ||||
| object_store, | ||||
| ) | ||||
| .await | ||||
| let mut num_rows = self | ||||
| .spawn_writer_tasks_and_join( | ||||
| context, | ||||
| demux_task, | ||||
| file_stream_rx, | ||||
| Arc::clone(&object_store), | ||||
| ) | ||||
| .await?; | ||||
| if num_rows == 0 { | ||||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||||
| // If no rows were written, then no files are output either. | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You say no rows => no file was created. But then you say write an empty RecordBatch => ensure a file gets created. Except an empty RecordBatch has no rows (at least when written to a Parquet file). Your two sentences don't make sense together. In practice, this PR caused a regression: we cannot write an empty RecordBatch to Parquet anymore, as the code here tries to write it a second time, and we get an error.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi, I couldn't reproduce your problem. Could you please share your test? I will check it ASAP.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @alamb I think we need to clearly define 'empty' in DataFusion. Currently it can mean either vec![] or
vec![empty_record_batch].
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, I write one empty RecordBatch. Here is a short repro of the issue: brunal@aed5316. I don't think num_rows should be used to determine whether a file was created. |
||||
| // In this case, send an empty record batch through to ensure the output file is generated | ||||
| let schema = Arc::clone(&config.output_schema); | ||||
| let empty_batch = RecordBatch::new_empty(Arc::clone(&schema)); | ||||
| let data = Box::pin(RecordBatchStreamAdapter::new( | ||||
| schema, | ||||
| futures::stream::iter(vec![Ok(empty_batch)]), | ||||
| )); | ||||
| let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context); | ||||
| num_rows = self | ||||
| .spawn_writer_tasks_and_join( | ||||
| context, | ||||
| demux_task, | ||||
| file_stream_rx, | ||||
| Arc::clone(&object_store), | ||||
| ) | ||||
| .await?; | ||||
| } | ||||
| Ok(num_rows) | ||||
| } | ||||
| } | ||||
|
|
||||
|
|
||||
Uh oh!
There was an error while loading. Please reload this page.