From c25a1e3ec63438e6372ff6dc6b08c41037a96b0e Mon Sep 17 00:00:00 2001
From: Matthew Turner
Date: Mon, 28 Feb 2022 00:40:41 -0500
Subject: [PATCH 1/2] Add write_ipc to ExecutionContext

---
 datafusion/src/dataframe.rs                   |  15 +-
 datafusion/src/execution/context.rs           |  14 +-
 .../src/physical_plan/file_format/ipc.rs      | 173 ++++++++++++++++++
 .../src/physical_plan/file_format/mod.rs      |   2 +
 4 files changed, 202 insertions(+), 2 deletions(-)
 create mode 100644 datafusion/src/physical_plan/file_format/ipc.rs

diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 7ea4fb5b6211a..162e7b6dbb934 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -17,6 +17,7 @@
 
 //! DataFrame API for building and executing query plans.
 
+use crate::arrow::ipc::writer::IpcWriteOptions;
 use crate::arrow::record_batch::RecordBatch;
 use crate::error::Result;
 use crate::logical_plan::{
@@ -35,7 +36,7 @@ use crate::arrow::util::pretty;
 use crate::datasource::TableProvider;
 use crate::datasource::TableType;
 use crate::execution::context::{ExecutionContext, ExecutionContextState};
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_ipc, plan_to_parquet};
 use crate::physical_plan::{collect, collect_partitioned};
 use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
 use crate::scalar::ScalarValue;
@@ -579,6 +580,18 @@ impl DataFrame {
         let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
         plan_to_parquet(&ctx, plan, path, writer_properties).await
     }
+
+    /// Write a `DataFrame` to an IPC file.
+    pub async fn write_ipc(
+        &self,
+        path: &str,
+        writer_properties: IpcWriteOptions,
+    ) -> Result<()> {
+        let plan = self.create_physical_plan().await?;
+        let state = self.ctx_state.lock().clone();
+        let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+        plan_to_ipc(&ctx, plan, path, writer_properties).await
+    }
 }
 
 #[async_trait]
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 49644c11bb6b4..c16c5215af7c3 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -47,6 +47,7 @@ use std::string::String;
 use std::sync::Arc;
 
 use arrow::datatypes::{DataType, SchemaRef};
+use arrow::ipc::writer::IpcWriteOptions;
 
 use crate::catalog::{
     catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -77,7 +78,7 @@ use crate::physical_optimizer::repartition::Repartition;
 
 use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use crate::logical_plan::plan::Explain;
-use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_ipc, plan_to_parquet};
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
 use crate::physical_plan::udaf::AggregateUDF;
 use crate::physical_plan::udf::ScalarUDF;
@@ -88,6 +89,7 @@ use crate::sql::{
     planner::{ContextProvider, SqlToRel},
 };
 use crate::variable::{VarProvider, VarType};
+
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
 use parquet::file::properties::WriterProperties;
@@ -726,6 +728,16 @@ impl ExecutionContext {
         plan_to_parquet(self, plan, path, writer_properties).await
     }
 
+    /// Executes a query and writes the results to an Arrow IPC file.
+    pub async fn write_ipc(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: impl AsRef<str>,
+        writer_properties: IpcWriteOptions,
+    ) -> Result<()> {
+        plan_to_ipc(self, plan, path, writer_properties).await
+    }
+
     /// Optimizes the logical plan by applying optimizer rules, and
     /// invoking observer function after each call
     fn optimize_internal(
diff --git a/datafusion/src/physical_plan/file_format/ipc.rs b/datafusion/src/physical_plan/file_format/ipc.rs
new file mode 100644
index 0000000000000..3d9497b51e501
--- /dev/null
+++ b/datafusion/src/physical_plan/file_format/ipc.rs
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Writing query results to Arrow IPC files
+
+use crate::error::{DataFusionError, Result};
+use crate::execution::context::ExecutionContext;
+use crate::physical_plan::ExecutionPlan;
+
+use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
+use futures::{StreamExt, TryStreamExt};
+use std::fs;
+use std::path::Path;
+use std::sync::Arc;
+use tokio::task::{self, JoinHandle};
+
+pub async fn plan_to_ipc(
+    context: &ExecutionContext,
+    plan: Arc<dyn ExecutionPlan>,
+    path: impl AsRef<str>,
+    writer_options: IpcWriteOptions,
+) -> Result<()> {
+    let path = path.as_ref();
+    // create directory to contain the IPC files (one per partition)
+    let fs_path = Path::new(path);
+    let runtime = context.runtime_env();
+    match fs::create_dir(fs_path) {
+        Ok(()) => {
+            let mut tasks = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {
+                let plan = plan.clone();
+                let filename = format!("part-{}.csv", i);
+                let path = fs_path.join(&filename);
+                let file = fs::File::create(path)?;
+                let stream = plan.execute(i, runtime.clone()).await?;
+                let schema = stream.schema();
+                let mut writer = FileWriter::try_new_with_options(
+                    file,
+                    schema.as_ref(),
+                    writer_options.clone(),
+                )?;
+                let handle: JoinHandle<Result<()>> = task::spawn(async move {
+                    stream
+                        .map(|batch| writer.write(&batch?))
+                        .try_collect()
+                        .await
+                        .map_err(DataFusionError::from)
+                });
+                tasks.push(handle);
+            }
+            futures::future::join_all(tasks).await;
+            Ok(())
+        }
+        Err(e) => Err(DataFusionError::Execution(format!(
+            "Could not create directory {}: {:?}",
+            path, e
+        ))),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::prelude::*;
+    use crate::test_util::aggr_test_schema_with_missing_col;
+    use crate::{
+        datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
+        scalar::ScalarValue,
+        test_util::aggr_test_schema,
+    };
+    use arrow::datatypes::*;
+    use futures::StreamExt;
+    use std::fs::File;
+    use std::io::Write;
+    use tempfile::TempDir;
+
+    /// Generate CSV partitions within the supplied directory
+    fn populate_csv_partitions(
+        tmp_dir: &TempDir,
+        partition_count: usize,
+        file_extension: &str,
+    ) -> Result<SchemaRef> {
+        // define schema for data source (csv file)
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+            Field::new("c3", DataType::Boolean, false),
+        ]));
+
+        // generate a partitioned file
+        for partition in 0..partition_count {
+            let filename = format!("partition-{}.{}", partition, file_extension);
+            let file_path = tmp_dir.path().join(&filename);
+            let mut file = File::create(file_path)?;
+
+            // generate some data
+            for i in 0..=10 {
+                let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+                file.write_all(data.as_bytes())?;
+            }
+        }
+
+        Ok(schema)
+    }
+
+    #[tokio::test]
+    async fn write_ipc_results() -> Result<()> {
+        // create partitioned input file and context
+        let tmp_dir = TempDir::new()?;
+        let mut ctx = ExecutionContext::with_config(
+            ExecutionConfig::new().with_target_partitions(8),
+        );
+
+        let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;
+
+        // register csv file with the execution context
+        ctx.register_csv(
+            "test",
+            tmp_dir.path().to_str().unwrap(),
+            CsvReadOptions::new().schema(&schema),
+        )
+        .await?;
+
+        // execute a simple query and write the results to CSV
+        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+        df.write_csv(&out_dir).await?;
+
+        // create a new context and verify that the results were saved to a partitioned csv file
+        let mut ctx = ExecutionContext::new();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::UInt32, false),
+            Field::new("c2", DataType::UInt64, false),
+        ]));
+
+        // register each partition as well as the top level dir
+        let csv_read_option = CsvReadOptions::new().schema(&schema);
+        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
+            .await?;
+        ctx.register_csv("allparts", &out_dir, csv_read_option)
+            .await?;
+
+        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+        let allparts = ctx
+            .sql("SELECT c1, c2 FROM allparts")
+            .await?
+            .collect()
+            .await?;
+
+        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+        assert_eq!(part0[0].schema(), allparts[0].schema());
+
+        assert_eq!(allparts_count, 80);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 0e1e8596c7cb9..d78911c889ff5 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -20,6 +20,7 @@
 mod avro;
 mod csv;
 mod file_stream;
+mod ipc;
 mod json;
 mod parquet;
 
@@ -35,6 +36,7 @@ use arrow::{
 pub use avro::AvroExec;
 pub(crate) use csv::plan_to_csv;
 pub use csv::CsvExec;
+pub(crate) use ipc::plan_to_ipc;
 pub use json::NdJsonExec;
 
 use crate::error::DataFusionError;

From c9930e72daf7070b046591129f33306d32c126cf Mon Sep 17 00:00:00 2001
From: Matthew Turner
Date: Tue, 15 Mar 2022 10:36:34 -0400
Subject: [PATCH 2/2] Update test

---
 .../src/physical_plan/file_format/ipc.rs | 22 +++++++++----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/datafusion/src/physical_plan/file_format/ipc.rs b/datafusion/src/physical_plan/file_format/ipc.rs
index 3d9497b51e501..1881a8d8ac0cd 100644
--- a/datafusion/src/physical_plan/file_format/ipc.rs
+++ b/datafusion/src/physical_plan/file_format/ipc.rs
@@ -43,7 +43,7 @@ pub async fn plan_to_ipc(
             let mut tasks = vec![];
             for i in 0..plan.output_partitioning().partition_count() {
                 let plan = plan.clone();
-                let filename = format!("part-{}.csv", i);
+                let filename = format!("part-{}.arrow", i);
                 let path = fs_path.join(&filename);
                 let file = fs::File::create(path)?;
                 let stream = plan.execute(i, runtime.clone()).await?;
@@ -76,14 +76,8 @@ pub async fn plan_to_ipc(
 mod tests {
     use super::*;
     use crate::prelude::*;
-    use crate::test_util::aggr_test_schema_with_missing_col;
-    use crate::{
-        datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
-        scalar::ScalarValue,
-        test_util::aggr_test_schema,
-    };
     use arrow::datatypes::*;
-    use futures::StreamExt;
+    use arrow::ipc::writer::IpcWriteOptions;
     use std::fs::File;
     use std::io::Write;
     use tempfile::TempDir;
@@ -138,7 +132,9 @@ mod tests {
         // execute a simple query and write the results to CSV
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
         let df = ctx.sql("SELECT c1, c2 FROM test").await?;
-        df.write_csv(&out_dir).await?;
+        let opts = IpcWriteOptions::default();
+
+        df.write_ipc(&out_dir, opts).await?;
 
         // create a new context and verify that the results were saved to a partitioned csv file
         let mut ctx = ExecutionContext::new();
@@ -150,8 +146,12 @@ mod tests {
 
         // register each partition as well as the top level dir
         let csv_read_option = CsvReadOptions::new().schema(&schema);
-        ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
-            .await?;
+        ctx.register_csv(
+            "part0",
+            &format!("{}/part-0.arrow", out_dir),
+            csv_read_option,
+        )
+        .await?;
         ctx.register_csv("allparts", &out_dir, csv_read_option)
             .await?;
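
A minimal usage sketch of the API added by this patch series, assuming the patches are applied; the table name "example", the input and output paths, and the choice of IpcWriteOptions::default() are illustrative placeholders rather than part of the change:

    use datafusion::arrow::ipc::writer::IpcWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Any registered table works; the CSV path here is a placeholder.
        let mut ctx = ExecutionContext::new();
        ctx.register_csv("example", "example.csv", CsvReadOptions::new())
            .await?;

        // Run a query and write one `part-{i}.arrow` file per output partition
        // into the target directory, using the default IPC write options.
        let df = ctx.sql("SELECT * FROM example").await?;
        df.write_ipc("/tmp/example_ipc_out", IpcWriteOptions::default())
            .await?;

        Ok(())
    }

Under the hood this goes through plan_to_ipc, which creates the output directory and writes one Arrow IPC file per partition of the physical plan, mirroring the existing write_csv and write_parquet paths.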