diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
new file mode 100644
index 0000000000000..2a27468f4591f
--- /dev/null
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Apache Arrow format abstractions
+//!
+//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)
+
+use crate::datasource::file_format::FileFormat;
+use crate::error::Result;
+use crate::execution::context::SessionState;
+use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
+use crate::physical_plan::ExecutionPlan;
+use arrow::ipc::reader::FileReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_common::Statistics;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{GetResult, ObjectMeta, ObjectStore};
+use std::any::Any;
+use std::io::{Read, Seek};
+use std::sync::Arc;
+
+/// The default file extension of arrow files
+pub const DEFAULT_ARROW_EXTENSION: &str = ".arrow";
+/// Arrow `FileFormat` implementation.
+#[derive(Default, Debug)]
+pub struct ArrowFormat;
+
+#[async_trait]
+impl FileFormat for ArrowFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    async fn infer_schema(
+        &self,
+        _state: &SessionState,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas = vec![];
+        for object in objects {
+            let schema = match store.get(&object.location).await? {
+                GetResult::File(mut file, _) => read_arrow_schema_from_reader(&mut file)?,
+                r @ GetResult::Stream(_) => {
+                    // TODO: Fetching entire file to get schema is potentially wasteful
+                    let data = r.bytes().await?;
+                    let mut cursor = std::io::Cursor::new(&data);
+                    read_arrow_schema_from_reader(&mut cursor)?
+                }
+            };
+            schemas.push(schema.as_ref().clone());
+        }
+        let merged_schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(merged_schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        _state: &SessionState,
+        _store: &Arc<dyn ObjectStore>,
+        _table_schema: SchemaRef,
+        _object: &ObjectMeta,
+    ) -> Result<Statistics> {
+        Ok(Statistics::default())
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &SessionState,
+        conf: FileScanConfig,
+        _filters: Option<&Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec = ArrowExec::new(conf);
+        Ok(Arc::new(exec))
+    }
+}
+
+fn read_arrow_schema_from_reader<R: Read + Seek>(reader: R) -> Result<SchemaRef> {
+    let reader = FileReader::try_new(reader, None)?;
+    Ok(reader.schema())
+}
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e72b1e579e4a0..58877c7a82e1d 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -38,6 +38,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use flate2::read::MultiGzDecoder;
+use crate::datasource::file_format::arrow::DEFAULT_ARROW_EXTENSION;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 #[cfg(feature = "compression")]
@@ -211,6 +212,8 @@ impl FileCompressionType {
 /// Readable file type
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum FileType {
+    /// Apache Arrow file
+    ARROW,
     /// Apache Avro file
     AVRO,
     /// Apache Parquet file
@@ -224,6 +227,7 @@ impl GetExt for FileType {
     fn get_ext(&self) -> String {
         match self {
+            FileType::ARROW => DEFAULT_ARROW_EXTENSION.to_owned(),
             FileType::AVRO => DEFAULT_AVRO_EXTENSION.to_owned(),
             FileType::PARQUET => DEFAULT_PARQUET_EXTENSION.to_owned(),
             FileType::CSV => DEFAULT_CSV_EXTENSION.to_owned(),
@@ -238,6 +242,7 @@ impl FromStr for FileType {
     fn from_str(s: &str) -> Result<Self> {
         let s = s.to_uppercase();
         match s.as_str() {
+            "ARROW" => Ok(FileType::ARROW),
             "AVRO" => Ok(FileType::AVRO),
             "PARQUET" => Ok(FileType::PARQUET),
             "CSV" => Ok(FileType::CSV),
@@ -256,7 +261,7 @@ impl FileType {
         match self {
             FileType::JSON | FileType::CSV => Ok(format!("{}{}", ext, c.get_ext())),
-            FileType::PARQUET | FileType::AVRO => match c.variant {
+            FileType::PARQUET | FileType::AVRO | FileType::ARROW => match c.variant {
                 UNCOMPRESSED => Ok(ext),
                 _ => Err(DataFusionError::Internal(
                     "FileCompressionType can be specified for CSV/JSON FileType.".into(),
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 52da7285e3734..28f798ade4a8b 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -20,6 +20,7 @@
 /// Default max records to scan to infer the schema
 pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+pub mod arrow;
 pub mod avro;
 pub mod csv;
 pub mod file_type;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index e51edf829e857..4024da331353b 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -23,6 +23,7 @@ use arrow::datatypes::{DataType, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
+use crate::datasource::file_format::arrow::{ArrowFormat, DEFAULT_ARROW_EXTENSION};
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -214,6 +215,52 @@ impl<'a> ParquetReadOptions<'a> {
     }
 }
+/// Options that control the reading of ARROW files.
+///
+/// Note this structure is supplied when a datasource is created and
+/// can not vary from statement to statement. For settings that
+/// can vary statement to statement see
+/// [`ConfigOptions`](crate::config::ConfigOptions).
+#[derive(Clone)]
+pub struct ArrowReadOptions<'a> {
+    /// The data source schema.
+    pub schema: Option<&'a Schema>,
+
+    /// File extension; only files with this extension are selected for data input.
+    /// Defaults to `FileType::ARROW.get_ext().as_str()`.
+    pub file_extension: &'a str,
+
+    /// Partition Columns
+    pub table_partition_cols: Vec<(String, DataType)>,
+}
+
+impl<'a> Default for ArrowReadOptions<'a> {
+    fn default() -> Self {
+        Self {
+            schema: None,
+            file_extension: DEFAULT_ARROW_EXTENSION,
+            table_partition_cols: vec![],
+        }
+    }
+}
+
+impl<'a> ArrowReadOptions<'a> {
+    /// Specify table_partition_cols for partition pruning
+    pub fn table_partition_cols(
+        mut self,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> Self {
+        self.table_partition_cols = table_partition_cols;
+        self
+    }
+
+    /// Specify schema to use for Arrow read
+    pub fn schema(mut self, schema: &'a Schema) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+}
+
 /// Options that control the reading of AVRO files.
 ///
 /// Note this structure is supplied when a datasource is created and
@@ -484,3 +531,25 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
         .await
     }
 }
+
+#[async_trait]
+impl ReadOptions<'_> for ArrowReadOptions<'_> {
+    fn to_listing_options(&self, config: &SessionConfig) -> ListingOptions {
+        let file_format = ArrowFormat::default();
+
+        ListingOptions::new(Arc::new(file_format))
+            .with_file_extension(self.file_extension)
+            .with_target_partitions(config.target_partitions())
+            .with_table_partition_cols(self.table_partition_cols.clone())
+    }
+
+    async fn get_resolved_schema(
+        &self,
+        config: &SessionConfig,
+        state: SessionState,
+        table_path: ListingTableUrl,
+    ) -> Result<SchemaRef> {
+        self._get_resolved_schema(config, state, table_path, self.schema, false)
+            .await
+    }
+}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 94da84ce3b080..8bb3818c91bbb 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -35,8 +35,8 @@ use object_store::ObjectMeta;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
-        avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
-        FileFormat,
+        arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
+        parquet::ParquetFormat, FileFormat,
     },
     get_statistics_with_limit,
     listing::ListingTableUrl,
@@ -135,6 +135,7 @@ impl ListingTableConfig {
             .map_err(|_| DataFusionError::Internal(err_msg))?;
         let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::ARROW => Arc::new(ArrowFormat::default()),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::CSV => Arc::new(
                 CsvFormat::default().with_file_compression_type(file_compression_type),
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 44595e5122c1e..1e2d4051d5962 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -27,6 +27,7 @@
 use datafusion_common::DataFusionError;
 use datafusion_expr::CreateExternalTable;
 use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::arrow::ArrowFormat;
 use crate::datasource::file_format::avro::AvroFormat;
 use crate::datasource::file_format::csv::CsvFormat;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
@@ -81,6 +82,7 @@ impl TableProviderFactory for ListingTableFactory {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
+            FileType::ARROW => Arc::new(ArrowFormat::default()),
         };
         let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index d76e26e1b77c7..0db85c99ea2b3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -114,6 +114,7 @@ use datafusion_sql::planner::object_name_to_table_reference;
 use uuid::Uuid;
 // backwards compatibility
+use crate::execution::options::ArrowReadOptions;
 use crate::physical_optimizer::combine_partial_final_agg::CombinePartialFinalAggregate;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
@@ -844,6 +845,20 @@ impl SessionContext {
         self._read_type(table_paths, options).await
     }
+    /// Creates a [`DataFrame`] for reading an Arrow data source.
+    ///
+    /// For more control such as reading multiple files, you can use
+    /// [`read_table`](Self::read_table) with a [`ListingTable`].
+    ///
+    /// For an example, see [`read_csv`](Self::read_csv)
+    pub async fn read_arrow<P: DataFilePaths>(
+        &self,
+        table_paths: P,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<DataFrame> {
+        self._read_type(table_paths, options).await
+    }
+
     /// Creates an empty DataFrame.
     pub fn read_empty(&self) -> Result<DataFrame> {
         Ok(DataFrame::new(
@@ -1034,6 +1049,27 @@
         Ok(())
     }
+    /// Registers an Arrow file as a table that can be referenced from
+    /// SQL statements executed against this context.
+    pub async fn register_arrow(
+        &self,
+        name: &str,
+        table_path: &str,
+        options: ArrowReadOptions<'_>,
+    ) -> Result<()> {
+        let listing_options = options.to_listing_options(&self.copied_config());
+
+        self.register_listing_table(
+            name,
+            table_path,
+            listing_options,
+            options.schema.map(|s| Arc::new(s.to_owned())),
+            None,
+        )
+        .await?;
+        Ok(())
+    }
+
     /// Registers a named catalog using a custom `CatalogProvider` so that
     /// it can be referenced from SQL statements executed against this
     /// context.
@@ -1360,6 +1396,7 @@
         table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
         table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
         table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new()));
         if config.create_default_catalog_and_schema() {
             let default_catalog = MemoryCatalogProvider::new();
diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
new file mode 100644
index 0000000000000..d229031d37bdf
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/arrow_file.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading Arrow files
+use crate::error::Result;
+use crate::physical_plan::file_format::{
+    FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+};
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
+};
+use arrow_schema::SchemaRef;
+use datafusion_common::Statistics;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::StreamExt;
+use object_store::{GetResult, ObjectStore};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Execution plan for scanning Arrow data source
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+pub struct ArrowExec {
+    base_config: FileScanConfig,
+    projected_statistics: Statistics,
+    projected_schema: SchemaRef,
+    projected_output_ordering: Option<Vec<PhysicalSortExpr>>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl ArrowExec {
+    /// Create a new Arrow reader execution plan provided base configurations
+    pub fn new(base_config: FileScanConfig) -> Self {
+        let (projected_schema, projected_statistics, projected_output_ordering) =
+            base_config.project();
+
+        Self {
+            base_config,
+            projected_schema,
+            projected_statistics,
+            projected_output_ordering,
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+    /// Ref to the base configs
+    pub fn base_config(&self) -> &FileScanConfig {
+        &self.base_config
+    }
+}
+
+impl ExecutionPlan for ArrowExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    }
+
+    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
+        Ok(self.base_config().infinite_source)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.projected_output_ordering.as_deref()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        Vec::new()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        use super::file_stream::FileStream;
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.base_config.object_store_url)?;
+
+        let opener = ArrowOpener {
+            object_store,
+            projection: self.base_config.projection.clone(),
+        };
+        let stream =
+            FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "ArrowExec: {}", self.base_config)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.projected_statistics.clone()
+    }
+}
+
+pub struct ArrowOpener {
+    pub object_store: Arc<dyn ObjectStore>,
+    pub projection: Option<Vec<usize>>,
+}
+
+impl FileOpener for ArrowOpener {
+    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
+        let object_store = self.object_store.clone();
+        let projection = self.projection.clone();
+        Ok(Box::pin(async move {
+            match object_store.get(file_meta.location()).await? {
+                GetResult::File(file, _) => {
+                    let arrow_reader =
+                        arrow::ipc::reader::FileReader::try_new(file, projection)?;
+                    Ok(futures::stream::iter(arrow_reader).boxed())
+                }
+                r @ GetResult::Stream(_) => {
+                    let bytes = r.bytes().await?;
+                    let cursor = std::io::Cursor::new(bytes);
+                    let arrow_reader =
+                        arrow::ipc::reader::FileReader::try_new(cursor, projection)?;
+                    Ok(futures::stream::iter(arrow_reader).boxed())
+                }
+            }
+        }))
+    }
+}
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index aa0f79ced7bf8..1b235d62a3772 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -17,6 +17,7 @@
 //! Execution plans that read file formats
+mod arrow_file;
 mod avro;
 #[cfg(test)]
 mod chunked_store;
@@ -35,6 +36,7 @@ use arrow::{
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
     record_batch::RecordBatch,
 };
+pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 use datafusion_physical_expr::PhysicalSortExpr;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream};
diff --git a/datafusion/core/tests/data/example.arrow b/datafusion/core/tests/data/example.arrow
new file mode 100644
index 0000000000000..5314d9eea1345
Binary files /dev/null and b/datafusion/core/tests/data/example.arrow differ
diff --git a/datafusion/core/tests/sql/arrow_files.rs b/datafusion/core/tests/sql/arrow_files.rs
new file mode 100644
index 0000000000000..e74294b312904
--- /dev/null
+++ b/datafusion/core/tests/sql/arrow_files.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use datafusion::execution::options::ArrowReadOptions;
+
+use super::*;
+
+async fn register_arrow(ctx: &mut SessionContext) {
+    ctx.register_arrow(
+        "arrow_simple",
+        "tests/data/example.arrow",
+        ArrowReadOptions::default(),
+    )
+    .await
+    .unwrap();
+}
+
+#[tokio::test]
+async fn arrow_query() {
+    let mut ctx = SessionContext::new();
+    register_arrow(&mut ctx).await;
+    let sql = "SELECT * FROM arrow_simple";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let expected = vec![
+        "+----+-----+-------+",
+        "| f0 | f1  | f2    |",
+        "+----+-----+-------+",
+        "| 1  | foo | true  |",
+        "| 2  | bar |       |",
+        "| 3  | baz | false |",
+        "| 4  |     | true  |",
+        "+----+-----+-------+",
+    ];
+
+    assert_batches_eq!(expected, &actual);
+}
+
+#[tokio::test]
+async fn arrow_explain() {
+    let mut ctx = SessionContext::new();
+    register_arrow(&mut ctx).await;
+    let sql = "EXPLAIN SELECT * FROM arrow_simple";
+    let actual = execute(&ctx, sql).await;
+    let actual = normalize_vec_for_explain(actual);
+    let expected = vec![
+        vec![
+            "logical_plan",
+            "TableScan: arrow_simple projection=[f0, f1, f2]",
+        ],
+        vec![
+            "physical_plan",
+            "ArrowExec: file_groups={1 group: [[WORKING_DIR/tests/data/example.arrow]]}, projection=[f0, f1, f2]\n",
+        ],
+    ];
+
+    assert_eq!(expected, actual);
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index bd9e213dfe216..7947bffa5cb7c 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -79,6 +79,7 @@ macro_rules! test_expression {
 }
 pub mod aggregates;
+pub mod arrow_files;
 #[cfg(feature = "avro")]
 pub mod avro;
 pub mod create_drop;
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index afb009f89f3c6..006641cac518a 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -310,6 +310,18 @@ DROP TABLE my_table;
 statement ok
 DROP TABLE aggregate_simple
+# Arrow format
+statement ok
+CREATE external table arrow_simple STORED as ARROW LOCATION 'tests/data/example.arrow';
+
+query ITB rowsort
+SELECT * FROM arrow_simple order by f1 LIMIT 1
+----
+2 bar NULL
+
+statement ok
+DROP TABLE arrow_simple
+
 # create_table_with_schema_as_select_mismatch
 statement error table 'datafusion.public.aggregate_simple' not found
 CREATE TABLE my_table(c1 float, c2 double, c3 boolean, c4 varchar) AS SELECT * FROM aggregate_simple;
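Usage sketch (not part of the patch): the snippet below illustrates how the APIs added above (`read_arrow`, `register_arrow`, `ArrowReadOptions`) might be used from an application. The file path and column names (`f0`, `f1`) are taken from the test fixture `tests/data/example.arrow` above; the filter expression is illustrative only.

```rust
use datafusion::error::Result;
use datafusion::execution::options::ArrowReadOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // DataFrame API: read a single Arrow IPC file.
    let df = ctx
        .read_arrow("tests/data/example.arrow", ArrowReadOptions::default())
        .await?;
    df.filter(col("f0").gt(lit(1)))?.show().await?;

    // SQL API: register the file as a table, then query it.
    ctx.register_arrow(
        "arrow_simple",
        "tests/data/example.arrow",
        ArrowReadOptions::default(),
    )
    .await?;
    ctx.sql("SELECT f0, f1 FROM arrow_simple").await?.show().await?;

    Ok(())
}
```

The same table can also be created through SQL with `CREATE EXTERNAL TABLE ... STORED AS ARROW`, as exercised by the `ddl.slt` test above.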