diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index f6e4627ab2c80..3e82e8f3e5bcd 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -850,7 +850,7 @@ mod roundtrip_tests {
     use core::panic;
     use datafusion::datasource::listing::ListingTable;
     use datafusion::datasource::object_store::{
-        FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
+        FileMetaStream, ListEntryStream, ObjectReaderWrapper, ObjectStore, SizedFile,
     };
     use datafusion::error::DataFusionError;
     use datafusion::{
@@ -895,7 +895,7 @@ mod roundtrip_tests {
         fn file_reader(
             &self,
             _file: SizedFile,
-        ) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
+        ) -> datafusion::error::Result<ObjectReaderWrapper> {
             Err(DataFusionError::NotImplemented(
                 "this is only a test object store".to_string(),
             ))
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index fa02d1ae28336..e926c0a7dc739 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -27,7 +27,7 @@ use futures::StreamExt;
 
 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
-use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
+use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
@@ -49,7 +49,7 @@ impl FileFormat for AvroFormat {
     async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
         let mut schemas = vec![];
         while let Some(obj_reader) = readers.next().await {
-            let mut reader = obj_reader?.sync_reader()?;
+            let mut reader = obj_reader?;
             let schema = read_avro_schema_from_reader(&mut reader)?;
             schemas.push(schema);
         }
@@ -57,7 +57,7 @@ impl FileFormat for AvroFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index 6aa0d21235a43..85d0e0bd86695 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -26,7 +26,7 @@ use async_trait::async_trait;
 use futures::StreamExt;
 
 use super::FileFormat;
-use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
+use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
@@ -98,7 +98,7 @@ impl FileFormat for CsvFormat {
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);
 
         while let Some(obj_reader) = readers.next().await {
-            let mut reader = obj_reader?.sync_reader()?;
+            let mut reader = obj_reader?;
             let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
                 &mut reader,
                 self.delimiter,
@@ -119,7 +119,7 @@ impl FileFormat for CsvFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index bdd5ef81d5592..b23f13375e23d 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;
 
 use super::FileFormat;
 use super::FileScanConfig;
-use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
+use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::NdJsonExec;
@@ -64,7 +64,7 @@ impl FileFormat for JsonFormat {
         let mut schemas = Vec::new();
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
         while let Some(obj_reader) = readers.next().await {
-            let mut reader = BufReader::new(obj_reader?.sync_reader()?);
+            let mut reader = BufReader::new(obj_reader?);
             let iter = ValueIter::new(&mut reader, None);
             let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
                 let should_take = records_to_read > 0;
@@ -81,7 +81,7 @@ impl FileFormat for JsonFormat {
         Ok(Arc::new(schema))
     }
 
-    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 21da2e1e6a276..293f919b3c0fc 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -32,9 +32,10 @@ use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
+use crate::datasource::object_store::ObjectReaderWrapper;
 use async_trait::async_trait;
 
-use super::object_store::{ObjectReader, ObjectReaderStream};
+use super::object_store::ObjectReaderStream;
 
 /// This trait abstracts all the file format specific implementations
 /// from the `TableProvider`. This helps code re-utilization accross
@@ -53,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
 
     /// Infer the statistics for the provided object. The cost and accuracy of the
     /// estimated statistics might vary greatly between file formats.
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
+    async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics>;
 
     /// Take a list of files and convert it to the appropriate executor
     /// according to this file format.
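Note on the format changes above: schema inference used to call `obj_reader?.sync_reader()?` to obtain a boxed `std::io::Read`; with this patch the stream already yields `ObjectReaderWrapper`, which itself implements `Read`, so it is handed straight to the Avro/CSV/JSON readers. A minimal, self-contained sketch of that calling convention (the `first_line` helper and the byte-slice input are illustrative stand-ins, not part of the patch):

```rust
use std::io::{BufRead, BufReader, Read};

// Any `Read` value (such as the patch's ObjectReaderWrapper) can be passed
// directly to a format reader; no `sync_reader()` indirection is needed.
fn first_line<R: Read>(reader: R) -> std::io::Result<String> {
    let mut buffered = BufReader::new(reader);
    let mut line = String::new();
    buffered.read_line(&mut line)?;
    Ok(line)
}

fn main() -> std::io::Result<()> {
    // A byte slice stands in for a reader returned by an object store.
    let data: &[u8] = b"a,b,c\n1,2,3\n";
    println!("{}", first_line(data)?);
    Ok(())
}
```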
diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
index 4afb2f54c3abc..aea4ce61392c3 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -18,7 +18,7 @@
 //! Parquet format abstractions
 
 use std::any::Any;
-use std::io::Read;
+use std::io::SeekFrom;
 use std::sync::Arc;
 
 use arrow::datatypes::Schema;
@@ -40,7 +40,7 @@ use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
 };
 use crate::arrow::datatypes::{DataType, Field};
-use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
+use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::DataFusionError;
 use crate::error::Result;
@@ -98,7 +98,7 @@ impl FileFormat for ParquetFormat {
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+    async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics> {
         let stats = fetch_statistics(reader)?;
         Ok(stats)
     }
@@ -268,9 +268,8 @@ fn summarize_min_max(
 }
 
 /// Read and parse the schema of the Parquet file at location `path`
-fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
-    let obj_reader = ChunkObjectReader(object_reader);
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
+fn fetch_schema(object_reader: ObjectReaderWrapper) -> Result<Schema> {
+    let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
 
@@ -278,9 +277,8 @@ fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
 }
 
 /// Read and parse the statistics of the Parquet file at location `path`
-fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
-    let obj_reader = ChunkObjectReader(object_reader);
-    let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
+fn fetch_statistics(object_reader: ObjectReaderWrapper) -> Result<Statistics> {
+    let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
     let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
     let schema = arrow_reader.get_schema()?;
     let num_fields = schema.fields().len();
@@ -336,22 +334,21 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
     Ok(statistics)
 }
 
-/// A wrapper around the object reader to make it implement `ChunkReader`
-pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);
-
-impl Length for ChunkObjectReader {
+impl Length for ObjectReaderWrapper {
     fn len(&self) -> u64 {
-        self.0.length()
+        self.0.lock().length()
     }
 }
 
-impl ChunkReader for ChunkObjectReader {
-    type T = Box<dyn Read + Send + Sync>;
+impl ChunkReader for ObjectReaderWrapper {
+    type T = Self;
 
     fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
-        self.0
-            .sync_chunk_reader(start, length)
-            .map_err(|e| ParquetError::ArrowError(e.to_string()))
+        let mut r = self.0.lock();
+        r.seek(SeekFrom::Start(start))
+            .map_err(|e| ParquetError::ArrowError(e.to_string()))?;
+        r.set_limit(length);
+        Ok(self.clone())
     }
 }
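The `ChunkReader` impl above is the heart of the change: parquet's `get_read` takes `&self`, so instead of opening a fresh file descriptor per chunk, the wrapper seeks the shared handle to `start`, caps the readable bytes at `length`, and returns a clone. A simplified, self-contained sketch of that seek-then-limit idea over a plain `std::fs::File` (the `SharedChunk`/`window` names and the use of `std::sync::Mutex` instead of `parking_lot` are illustrative assumptions, not the patch's API):

```rust
use std::fs::File;
use std::io::{Read, Result, Seek, SeekFrom};
use std::sync::{Arc, Mutex};

/// A shared, seekable handle with a soft read budget: a stand-in for the
/// ObjectReaderWrapper + set_limit mechanism introduced by this patch.
#[derive(Clone)]
struct SharedChunk {
    inner: Arc<Mutex<(File, usize)>>, // (handle, bytes still allowed to be read)
}

impl SharedChunk {
    /// Position the shared handle, cap how many bytes subsequent reads may
    /// return, then hand back a clone, mirroring `ChunkReader::get_read(&self, ...)`.
    fn window(&self, start: u64, length: usize) -> Result<Self> {
        let mut guard = self.inner.lock().unwrap();
        guard.0.seek(SeekFrom::Start(start))?;
        guard.1 = length;
        Ok(self.clone())
    }
}

impl Read for SharedChunk {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let mut guard = self.inner.lock().unwrap();
        let (file, budget) = &mut *guard;
        if *budget == 0 {
            return Ok(0); // window exhausted: behave like EOF
        }
        let cap = buf.len().min(*budget);
        let n = file.read(&mut buf[..cap])?;
        *budget -= n;
        Ok(n)
    }
}

fn main() -> Result<()> {
    // Exercise the sketch against a small temporary file.
    let path = std::env::temp_dir().join("shared_chunk_demo.bin");
    std::fs::write(&path, b"0123456789")?;
    let chunk = SharedChunk { inner: Arc::new(Mutex::new((File::open(&path)?, 0))) };
    let mut window = chunk.window(2, 4)?; // bytes 2..6
    let mut out = Vec::new();
    window.read_to_end(&mut out)?;
    assert_eq!(&out[..], b"2345");
    Ok(())
}
```

The patch applies the same idea through `ObjectReader::set_limit`, so a single buffered descriptor can serve every chunk read of a file.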
diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs
index edfe5e2cecd6a..4bbe44fa9cbfe 100644
--- a/datafusion/src/datasource/object_store/local.rs
+++ b/datafusion/src/datasource/object_store/local.rs
@@ -23,9 +23,11 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
+use parking_lot::Mutex;
 
 use crate::datasource::object_store::{
-    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
+    FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderWrapper,
+    ObjectStore,
 };
 use crate::datasource::PartitionedFile;
 use crate::error::{DataFusionError, Result};
@@ -55,18 +57,46 @@ impl ObjectStore for LocalFileSystem {
         todo!()
     }
 
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
-        Ok(Arc::new(LocalFileReader::new(file)?))
+    fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper> {
+        Ok(ObjectReaderWrapper(Arc::new(Mutex::new(
+            LocalFileReader::new(file)?,
+        ))))
     }
 }
 
 struct LocalFileReader {
-    file: SizedFile,
+    r: BufReader<File>,
+    total_size: u64,
+    limit: usize,
 }
 
 impl LocalFileReader {
     fn new(file: SizedFile) -> Result<Self> {
-        Ok(Self { file })
+        Ok(Self {
+            r: BufReader::new(File::open(file.path)?),
+            total_size: file.size,
+            limit: file.size as usize,
+        })
+    }
+}
+
+impl Read for LocalFileReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // read from current position to limit
+        if self.limit > 0 {
+            let read_len = std::cmp::min(self.limit, buf.len());
+            let read_len = self.r.read(&mut buf[..read_len])?;
+            self.limit -= read_len;
+            Ok(read_len)
+        } else {
+            Ok(0)
+        }
+    }
+}
+
+impl Seek for LocalFileReader {
+    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+        self.r.seek(pos)
     }
 }
 
@@ -82,23 +112,12 @@ impl ObjectReader for LocalFileReader {
         )
     }
 
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
-        // A new file descriptor is opened for each chunk reader.
-        // This okay because chunks are usually fairly large.
-        let mut file = File::open(&self.file.path)?;
-        file.seek(SeekFrom::Start(start))?;
-
-        let file = BufReader::new(file.take(length as u64));
-
-        Ok(Box::new(file))
+    fn set_limit(&mut self, limit: usize) {
+        self.limit = limit;
     }
 
     fn length(&self) -> u64 {
-        self.file.size
+        self.total_size
     }
 }
 
@@ -167,7 +186,7 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
 }
 
 /// Helper method to convert a file location to a `LocalFileReader`
-pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
+pub fn local_object_reader(file: String) -> ObjectReaderWrapper {
     LocalFileSystem
         .file_reader(local_unpartitioned_file(file).file_meta.sized_file)
         .expect("File not found")
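The limit bookkeeping in `LocalFileReader::read` above reproduces what `std::io::Read::take` does; the trait doc comment in the following object_store/mod.rs hunk says as much, noting that `take` cannot be used directly because the reader sits behind an unsized trait object. A quick, runnable illustration of the semantics being imitated:

```rust
use std::io::Read;

fn main() -> std::io::Result<()> {
    // set_limit in this patch mimics Read::take: once the byte budget is
    // spent, further reads return Ok(0), i.e. they look like EOF.
    let data: &[u8] = b"0123456789";
    let mut limited = data.take(4);
    let mut buf = Vec::new();
    limited.read_to_end(&mut buf)?;
    assert_eq!(&buf[..], b"0123");
    Ok(())
}
```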
diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs
index aad70e70a3087..e6929861848d7 100644
--- a/datafusion/src/datasource/object_store/mod.rs
+++ b/datafusion/src/datasource/object_store/mod.rs
@@ -19,10 +19,10 @@
 
 pub mod local;
 
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
-use std::io::Read;
+use std::io::{Read, Seek};
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -39,27 +39,34 @@ use crate::error::{DataFusionError, Result};
 /// Note that the dynamic dispatch on the reader might
 /// have some performance impacts.
 #[async_trait]
-pub trait ObjectReader: Send + Sync {
+pub trait ObjectReader: Read + Seek + Send {
     /// Get reader for a part [start, start + length] in the file asynchronously
     async fn chunk_reader(&self, start: u64, length: usize) -> Result<Arc<dyn AsyncRead>>;
 
-    /// Get reader for a part [start, start + length] in the file
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>>;
-
-    /// Get reader for the entire file
-    fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
-        self.sync_chunk_reader(0, self.length() as usize)
-    }
+    /// Set the max number of bytes to be read from the underlying file, until it's reset.
+    /// Imitate [`std::io::Read::take`] since we are not [`Sized`]
+    fn set_limit(&mut self, limit: usize);
 
-    /// Get the size of the file
+    /// Total length of the underlying file. It's currently only used by Parquet reader
+    /// to read metadata from the end.
     fn length(&self) -> u64;
 }
 
+#[derive(Clone)]
+/// A wrapper over ObjectReader that reads file contents out.
+///
+/// Note: We use Arc<Mutex<dyn ObjectReader>> over the reader mainly to reuse the same underlying
+/// file handle while conforming to Parquet ChunkReader's [`parquet::file::reader::ChunkReader::get_read`]
+/// over immutable reference.
+pub struct ObjectReaderWrapper(pub Arc<Mutex<dyn ObjectReader>>);
+
+impl Read for ObjectReaderWrapper {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.0.lock().read(buf)
+    }
+}
+
 /// Represents a specific file or a prefix (folder) that may
 /// require further resolution
 #[derive(Debug)]
@@ -123,7 +130,7 @@ pub type ListEntryStream =
 
 /// Stream readers opened on a given object store
 pub type ObjectReaderStream =
-    Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
+    Pin<Box<dyn Stream<Item = Result<ObjectReaderWrapper>> + Send + Sync>>;
 
 /// A ObjectStore abstracts access to an underlying file/object storage.
 /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
@@ -158,7 +165,7 @@ pub trait ObjectStore: Sync + Send + Debug {
     ) -> Result<ListEntryStream>;
 
     /// Get object reader for one file
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
+    fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper>;
 }
 
 static LOCAL_SCHEME: &str = "file";
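To make the reshaped trait above concrete: an object reader is now itself a blocking `Read + Seek` value, and `ObjectReaderWrapper` only adds the `Arc<Mutex<...>>` sharing on top. Below is a self-contained sketch of a toy in-memory implementor written against a simplified copy of the trait (the async `chunk_reader` method is omitted, `std::sync::Mutex` stands in for `parking_lot::Mutex`, and `InMemoryReader` is an illustrative name, not part of the patch):

```rust
use std::io::{Cursor, Read, Result, Seek, SeekFrom};
use std::sync::{Arc, Mutex};

// Simplified copy of the patched trait: readers are Read + Seek and carry a byte limit.
trait ObjectReader: Read + Seek + Send {
    fn set_limit(&mut self, limit: usize);
    fn length(&self) -> u64;
}

struct InMemoryReader {
    data: Cursor<Vec<u8>>,
    limit: usize,
}

impl Read for InMemoryReader {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // Respect the current limit, like LocalFileReader::read in this patch.
        let cap = buf.len().min(self.limit);
        let n = self.data.read(&mut buf[..cap])?;
        self.limit -= n;
        Ok(n)
    }
}

impl Seek for InMemoryReader {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        self.data.seek(pos)
    }
}

impl ObjectReader for InMemoryReader {
    fn set_limit(&mut self, limit: usize) {
        self.limit = limit;
    }
    fn length(&self) -> u64 {
        self.data.get_ref().len() as u64
    }
}

fn main() -> Result<()> {
    let bytes = b"hello object store".to_vec();
    let limit = bytes.len();
    let reader = InMemoryReader { data: Cursor::new(bytes), limit };
    // The real wrapper is just `pub struct ObjectReaderWrapper(pub Arc<Mutex<dyn ObjectReader>>)`
    // plus Read and Clone impls that forward through the lock.
    let shared: Arc<Mutex<dyn ObjectReader>> = Arc::new(Mutex::new(reader));
    let mut buf = [0u8; 5];
    shared.lock().unwrap().read_exact(&mut buf)?;
    assert_eq!(&buf, b"hello");
    Ok(())
}
```

The same shared handle can then serve both as parquet's `ChunkReader` and as the plain `Read` passed to CSV/JSON/Avro schema inference.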
 pub trait FormatReaderOpener:
-    FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter + Send + Unpin + 'static
+    FnMut(ObjectReaderWrapper, &Option<usize>) -> BatchIter + Send + Unpin + 'static
 {
 }
 
 impl<T> FormatReaderOpener for T where
-    T: FnMut(Box<dyn Read + Send + Sync>, &Option<usize>) -> BatchIter
-        + Send
-        + Unpin
-        + 'static
+    T: FnMut(ObjectReaderWrapper, &Option<usize>) -> BatchIter + Send + Unpin + 'static
 {
 }
 
@@ -123,7 +120,6 @@ impl<F: FormatReaderOpener> FileStream<F> {
                     self.partition_values = f.partition_values;
                     self.object_store
                         .file_reader(f.file_meta.sized_file)
-                        .and_then(|r| r.sync_reader())
                         .map_err(|e| ArrowError::ExternalError(Box::new(e)))
                         .and_then(|f| {
                             self.batch_iter = (self.file_reader)(f, &self.remain);
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 1ae3012464e87..b2b9173f72117 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -21,7 +21,6 @@ use std::fmt;
 use std::sync::Arc;
 use std::{any::Any, convert::TryInto};
 
-use crate::datasource::file_format::parquet::ChunkObjectReader;
 use crate::datasource::object_store::ObjectStore;
 use crate::datasource::PartitionedFile;
 use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -462,8 +461,7 @@ fn read_partition(
     );
     let object_reader =
         object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?;
-    let mut file_reader =
-        SerializedFileReader::new(ChunkObjectReader(object_reader))?;
+    let mut file_reader = SerializedFileReader::new(object_reader)?;
     if let Some(pruning_predicate) = pruning_predicate {
         let row_group_predicate = build_row_group_predicate(
             pruning_predicate,
@@ -960,10 +958,7 @@ mod tests {
         let mut results = parquet_exec.execute(0, runtime).await?;
         let batch = results.next().await.unwrap();
         // invalid file should produce an error to that effect
-        assert_contains!(
-            batch.unwrap_err().to_string(),
-            "External error: Parquet error: Arrow: IO error"
-        );
+        assert_contains!(batch.unwrap_err().to_string(), "External error: IO error");
         assert!(results.next().await.is_none());
 
         Ok(())
diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs
index e93b4cd2d410d..452c5874d093c 100644
--- a/datafusion/src/test/object_store.rs
+++ b/datafusion/src/test/object_store.rs
@@ -16,12 +16,10 @@
 // under the License.
 
 //! Object store implem used for testing
-use std::{
-    io,
-    io::{Cursor, Read},
-    sync::Arc,
-};
+use std::io::Seek;
+use std::{io, io::Read, sync::Arc};
 
+use crate::datasource::object_store::ObjectReaderWrapper;
 use crate::{
     datasource::object_store::{
         FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
@@ -30,6 +28,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use futures::{stream, AsyncRead, StreamExt};
+use parking_lot::Mutex;
 
 #[derive(Debug)]
 /// An object store implem that is useful for testing.
@@ -78,11 +77,11 @@ impl ObjectStore for TestObjectStore {
         unimplemented!()
     }
 
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+    fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper> {
         match self.files.iter().find(|item| file.path == item.0) {
-            Some((_, size)) if *size == file.size => {
-                Ok(Arc::new(EmptyObjectReader(*size)))
-            }
+            Some((_, size)) if *size == file.size => Ok(ObjectReaderWrapper(Arc::new(
+                Mutex::new(EmptyObjectReader(*size)),
+            ))),
             Some(_) => Err(DataFusionError::IoError(io::Error::new(
                 io::ErrorKind::NotFound,
                 "found in test list but wrong size",
@@ -97,6 +96,19 @@ impl ObjectStore for TestObjectStore {
 
 struct EmptyObjectReader(u64);
 
+impl Read for EmptyObjectReader {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        buf.fill(0);
+        Ok(buf.len())
+    }
+}
+
+impl Seek for EmptyObjectReader {
+    fn seek(&mut self, _pos: io::SeekFrom) -> io::Result<u64> {
+        Ok(0)
+    }
+}
+
 #[async_trait]
 impl ObjectReader for EmptyObjectReader {
     async fn chunk_reader(
@@ -107,12 +119,8 @@ impl ObjectReader for EmptyObjectReader {
         unimplemented!()
     }
 
-    fn sync_chunk_reader(
-        &self,
-        _start: u64,
-        _length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>> {
-        Ok(Box::new(Cursor::new(vec![0; self.0 as usize])))
+    fn set_limit(&mut self, limit: usize) {
+        self.0 = limit as u64;
     }
 
     fn length(&self) -> u64 {
diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs
index 178e318775c9f..1d88969c38a85 100644
--- a/datafusion/tests/path_partition.rs
+++ b/datafusion/tests/path_partition.rs
@@ -20,6 +20,7 @@ use std::{fs, io, sync::Arc};
 
 use async_trait::async_trait;
+use datafusion::datasource::object_store::ObjectReaderWrapper;
 use datafusion::{
     assert_batches_sorted_eq,
     datasource::{
@@ -27,7 +28,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig},
         object_store::{
             local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream,
-            ObjectReader, ObjectStore, SizedFile,
+            ObjectStore, SizedFile,
         },
     },
     error::{DataFusionError, Result},
@@ -376,7 +377,7 @@ impl ObjectStore for MirroringObjectStore {
         unimplemented!()
     }
 
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
+    fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper> {
         assert_eq!(
             self.file_size, file.size,
             "Requested files should have the same size as the mirrored file"