diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index efec07abbca0..9022e340cd36 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
     use datafusion_catalog::Session;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::config::CsvOptions;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
     use datafusion_common::test_util::{arrow_test_data, batches_to_string};
@@ -795,6 +796,62 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_csv_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+
+        Ok(())
+    }
+
     /// Read a single empty csv file with header
     ///
     /// empty.csv:
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07fb..d818187bb307 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,6 +31,7 @@ mod tests {
     use arrow_schema::Schema;
     use bytes::Bytes;
     use datafusion_catalog::Session;
+    use datafusion_common::config::JsonOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_datasource::decoder::{
         BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -257,6 +258,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_json_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[test]
     fn test_json_deserializer_finish() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6a5c19829c1c..dcd20f8a267d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1246,6 +1246,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_parquet_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 2968bd1ee044..8a86b11a4743 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,12 +22,14 @@ use crate::sink::DataSink;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
 use crate::ListingTableUrl;
 
+use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 
 use async_trait::async_trait;
 use object_store::ObjectStore;
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink {
             .runtime_env()
             .object_store(&config.object_store_url)?;
         let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        self.spawn_writer_tasks_and_join(
-            context,
-            demux_task,
-            file_stream_rx,
-            object_store,
-        )
-        .await
+        let mut num_rows = self
+            .spawn_writer_tasks_and_join(
+                context,
+                demux_task,
+                file_stream_rx,
+                Arc::clone(&object_store),
+            )
+            .await?;
+        if num_rows == 0 {
+            // If no rows were written, then no files are output either.
+            // In this case, send an empty record batch through to ensure the output file is generated
+            let schema = Arc::clone(&config.output_schema);
+            let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
+            let data = Box::pin(RecordBatchStreamAdapter::new(
+                schema,
+                futures::stream::iter(vec![Ok(empty_batch)]),
+            ));
+            let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+            num_rows = self
+                .spawn_writer_tasks_and_join(
+                    context,
+                    demux_task,
+                    file_stream_rx,
+                    Arc::clone(&object_store),
+                )
+                .await?;
+        }
+        Ok(num_rows)
     }
 }
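For reviewers, a short end-to-end sketch of the behavior the FileSink::write_all change guarantees, mirroring Case 1 of the CSV test above. This is an illustrative standalone program, not part of the patch: the main wrapper and the output path are assumptions, while write_csv, DataFrameWriteOptions::with_single_file_output, and CsvOptions::with_has_header are the same APIs the tests exercise.

// A hedged sketch: assumes the `datafusion` and `tokio` crates and an
// illustrative output path. Before this patch, a zero-row write produced
// no output file; with the empty-batch fallback in write_all, an empty
// file is created instead.
use datafusion::common::config::CsvOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // A query that matches zero rows.
    let df = ctx.sql("SELECT 1 LIMIT 0").await?;
    df.write_csv(
        "/tmp/empty.csv", // hypothetical path
        DataFrameWriteOptions::new().with_single_file_output(true),
        Some(CsvOptions::default().with_has_header(true)),
    )
    .await?;
    // The output file now exists even though no data rows were written.
    assert!(std::path::Path::new("/tmp/empty.csv").exists());
    Ok(())
}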