2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -37,7 +37,7 @@ pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFac

pub use json::{JsonOpener, JsonSource};

pub use arrow::{ArrowOpener, ArrowSource};
pub use arrow::{ArrowFileOpener, ArrowFileSource, ArrowStreamOpener, ArrowStreamSource};
pub use csv::{CsvOpener, CsvSource};
pub use datafusion_datasource::file::FileSource;
pub use datafusion_datasource::file_groups::FileGroup;
@@ -23,7 +23,7 @@ use bytes::{BufMut, BytesMut};
use datafusion::common::Result;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource,
ArrowFileSource, CsvSource, FileSource, JsonSource, ParquetSource,
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
@@ -282,9 +282,9 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
// Create a test factory
let factory = Arc::new(UppercaseAdapterFactory {});

// Test ArrowSource
// Test ArrowFileSource
{
let source = ArrowSource::default();
let source = ArrowFileSource::default();
let source_with_adapter = source
.clone()
.with_schema_adapter_factory(factory.clone())
266 changes: 167 additions & 99 deletions datafusion/datasource-arrow/src/file_format.rs
@@ -23,12 +23,13 @@ use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::{Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::ipc::writer::IpcWriteOptions;
use arrow::ipc::{root_as_message, CompressionType};
use datafusion_common::error::Result;
@@ -49,7 +50,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::source::ArrowSource;
use crate::source::{ArrowFileSource, ArrowStreamSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::file_compression_type::FileCompressionType;
@@ -61,7 +62,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
@@ -150,12 +151,18 @@ impl FileFormat for ArrowFormat {
let schema = match r.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let reader = FileReader::try_new(&mut file, None)?;
reader.schema()
}
GetResultPayload::Stream(stream) => {
infer_schema_from_file_stream(stream).await?
match FileReader::try_new(&mut file, None) {
Ok(reader) => reader.schema(),
Err(_) => {
// Not the IPC file format, but FileReader read some bytes
// while trying to parse the file, so rewind it to the
// beginning before falling back to the stream format
file.seek(SeekFrom::Start(0))?;
StreamReader::try_new(&mut file, None)?.schema()
}
}
}
GetResultPayload::Stream(stream) => infer_ipc_schema(stream).await?,
};
schemas.push(schema.as_ref().clone());
}
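
The fallback above can be exercised in isolation; a minimal sketch, assuming a local Arrow IPC file whose flavor (file vs. stream format) is unknown, with the helper name read_ipc_schema being illustrative only:

use std::fs::File;
use std::io::{Seek, SeekFrom};

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::ipc::reader::{FileReader, StreamReader};

fn read_ipc_schema(path: &str) -> Result<SchemaRef, ArrowError> {
    let mut file = File::open(path)?;
    match FileReader::try_new(&mut file, None) {
        Ok(reader) => Ok(reader.schema()),
        Err(_) => {
            // FileReader may have consumed bytes while probing, so rewind
            // before retrying with the stream-format reader.
            file.seek(SeekFrom::Start(0))?;
            Ok(StreamReader::try_new(&mut file, None)?.schema())
        }
    }
}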
@@ -175,10 +182,39 @@

async fn create_physical_plan(
&self,
_state: &dyn Session,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(ArrowSource::default());
let is_stream_format = if let Some(first_group) = conf.file_groups.first() {
Review comment:
Maybe worth pulling this out into a helper method that's easy to test. This method would then also read a bit cleaner, with just an is_stream_format() check instead of this block of logic, which is not directly relevant to creating a physical plan.
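
A minimal sketch of what such a helper could look like, lifted from the block below; the name is_stream_format and its exact signature are illustrative only, not part of this PR:

// Hypothetical helper extracted from create_physical_plan; assumes the same
// Session, FileScanConfig, GetOptions/GetRange, and ARROW_MAGIC items that
// are already in scope in this file.
async fn is_stream_format(state: &dyn Session, conf: &FileScanConfig) -> Result<bool> {
    let Some(first_file) = conf
        .file_groups
        .first()
        .and_then(|group| group.files().first())
    else {
        // No files to probe; default to the file format.
        return Ok(false);
    };

    let object_store = state.runtime_env().object_store(&conf.object_store_url)?;
    let get_opts = GetOptions {
        range: Some(GetRange::Bounded(0..6)),
        ..Default::default()
    };
    let bytes = object_store
        .get_opts(&first_file.object_meta.location, get_opts)
        .await?
        .bytes()
        .await?;

    // Stream format if the file is too short or does not start with "ARROW1".
    Ok(bytes.len() < 6 || bytes[0..6] != ARROW_MAGIC)
}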

if let Some(first_file) = first_group.files().first() {
let object_store =
state.runtime_env().object_store(&conf.object_store_url)?;

let get_opts = GetOptions {
range: Some(GetRange::Bounded(0..6)),
..Default::default()
};
let result = object_store
.get_opts(&first_file.object_meta.location, get_opts)
.await?;
let bytes = result.bytes().await?;

// assume stream format if the file is too short
// or the file does not start with the magic number
bytes.len() < 6 || bytes[0..6] != ARROW_MAGIC
} else {
false // no files, default to file format
}
} else {
false // no file groups, default to file format
};

let source: Arc<dyn FileSource> = if is_stream_format {
Arc::new(ArrowStreamSource::default())
} else {
Arc::new(ArrowFileSource::default())
};

let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();
@@ -203,7 +239,9 @@ impl FileFormat for ArrowFormat {
}

fn file_source(&self) -> Arc<dyn FileSource> {
Arc::new(ArrowSource::default())
// defaulting to the file format source since it's
// more capable in general
Arc::new(ArrowFileSource::default())
}
}

@@ -344,40 +382,68 @@ impl DataSink for ArrowFileSink {
}
}

// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
// See <https://github.com/apache/arrow-rs/issues/5021>
Review comment (Contributor):
I haven't fully reviewed this PR, but just curious whether you've managed to check if this code has been upstreamed to arrow-rs by now, so we might be able to leverage its code?


const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See <https://github.com/apache/arrow-rs/issues/5021>
async fn infer_schema_from_file_stream(
async fn infer_ipc_schema(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
// Expected format:
// <magic number "ARROW1"> - 6 bytes
// <empty padding bytes [to 8 byte boundary]> - 2 bytes
// <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
// <metadata_size: int32> - 4 bytes
// <metadata_flatbuffer: bytes>
// <rest of file bytes>

// So in first read we need at least all known sized sections,
// which is 6 + 2 + 4 + 4 = 16 bytes.
// Expected IPC format is either:
//
// stream:
// <continuation: 0xFFFFFFFF> - 4 bytes (added in v0.15.0+)
// <metadata_size: int32> - 4 bytes
// <metadata_flatbuffer: bytes>
// <rest of file bytes>
//
// file:
// <magic number "ARROW1"> - 6 bytes
// <empty padding bytes [to 8 byte boundary]> - 2 bytes
// <stream format above>

// Perform the initial read such that we always have the metadata size
let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;

// Files should start with these magic bytes
if bytes[0..6] != ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
))?;
}

// Since continuation marker bytes added in later versions
let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
(&bytes[12..16], 16)
// The preamble size is everything before the metadata size
let preamble_size = if bytes[0..6] == ARROW_MAGIC {
// File format starts with magic number "ARROW1"
if bytes[8..12] == CONTINUATION_MARKER {
// Continuation marker was added in v0.15.0
12
} else {
// File format before v0.15.0
8
}
} else if bytes[0..4] == CONTINUATION_MARKER {
// Stream format after v0.15.0 starts with continuation marker
4
} else {
(&bytes[8..12], 12)
// Stream format before v0.15.0 does not have a preamble
0
};

infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await
}
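
For illustration only, a hedged sketch (not part of the PR) of how the preamble sizes computed above map onto concrete byte prefixes, written as a unit test against the ARROW_MAGIC and CONTINUATION_MARKER constants defined earlier in this file:

#[test]
fn preamble_prefixes_match_constants() {
    // File format, v0.15.0+: "ARROW1", two padding bytes, then the continuation
    // marker; infer_ipc_schema treats the first 12 bytes as preamble.
    let file_v015: [u8; 12] = [
        b'A', b'R', b'R', b'O', b'W', b'1', 0, 0, 0xff, 0xff, 0xff, 0xff,
    ];
    // Stream format, v0.15.0+: starts directly with the continuation marker,
    // so the preamble is only 4 bytes.
    let stream_v015: [u8; 4] = [0xff, 0xff, 0xff, 0xff];

    assert_eq!(file_v015[0..6], ARROW_MAGIC);
    assert_eq!(file_v015[8..12], CONTINUATION_MARKER);
    assert_eq!(stream_v015[0..4], CONTINUATION_MARKER);
}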

async fn infer_ipc_schema_ignoring_preamble_bytes(
bytes: Vec<u8>,
preamble_size: usize,
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
bytes[preamble_size..preamble_size + 4]
.try_into()
.map_err(|err| {
ArrowError::ParseError(format!(
"Unable to read IPC message as metadata length: {err:?}"
))
})?,
preamble_size + 4,
);
Comment on lines +436 to +445
Review comment (@jdcasale, Nov 5, 2025):
Am I reading this right that rest_of_bytes_start_index is always just preamble_size + 4?

If that's the case, it may be clearer to do two separate assignments, i.e.:

Suggested change
let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
bytes[preamble_size..preamble_size + 4]
.try_into()
.map_err(|err| {
ArrowError::ParseError(format!(
"Unable to read IPC message as metadata length: {err:?}"
))
})?,
preamble_size + 4,
);
let rest_of_bytes_start_index: usize = preamble_size + 4;
let meta_len: [u8; 4] = bytes[preamble_size..rest_of_bytes_start_index]
.try_into()
.map_err(|err| {
ArrowError::ParseError(format!(
"Unable to read IPC message as metadata length: {err:?}"
))
})?;


let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
let meta_len = i32::from_le_bytes(meta_len);
Review comment (Member):
I think it should be possible to (manually) manipulate the file's bytes in such a way that it produces a negative i32 here.
Then the cast to usize below will lead to problems.
What is the reason for meta_len being i32 instead of u32?

Reply (Contributor, PR author):
I'm honestly not sure; this was from the code that was there previously. This is the PR that introduced it initially: #7962. I don't see any information about why this choice was made -- @Jefffrey, do you recall why i32 instead of u32? I'm happy to change it, but I don't understand the implications.

Reply (Contributor):
Hmm, maybe I was referring to the spec: https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format

And saw it say <metadata_size: int32> so I defaulted to i32 🤔

Checking for a valid i32 (i.e. non-negative) does sound reasonable for robustness.
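
A hedged sketch of the check being discussed, assuming meta_len is the i32 produced by i32::from_le_bytes above; the helper name and error message are illustrative only:

use arrow::error::ArrowError;

// Rejects a (possibly crafted) negative metadata length before it is used as
// a usize; usize::try_from fails for negative i32 values.
fn validate_meta_len(meta_len: i32) -> Result<usize, ArrowError> {
    usize::try_from(meta_len).map_err(|_| {
        ArrowError::ParseError(format!(
            "Arrow IPC metadata length must be non-negative, got {meta_len}"
        ))
    })
}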


@@ -427,7 +493,8 @@ async fn collect_at_least_n_bytes(
if buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected end of byte stream for Arrow IPC file".to_string(),
))?;
)
.into());
Review comment:
Why?

}
Ok(buf)
}
@@ -524,79 +591,80 @@ mod tests {

#[tokio::test]
async fn test_infer_schema_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

let arrow_format = ArrowFormat {};
let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
for file in ["example.arrow", "example_stream.arrow"] {
let mut bytes = std::fs::read(format!("tests/data/{file}"))?;
bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
let location = Path::parse(file)?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

// Test chunk sizes where too small so we keep having to read more bytes
// And when large enough that first read contains all we need
for chunk_size in [7, 3000] {
let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
let inferred_schema = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await?;
let actual_fields = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(expected, actual_fields);
let arrow_format = ArrowFormat {};
let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];

// Test chunk sizes where too small so we keep having to read more bytes
// And when large enough that first read contains all we need
for chunk_size in [7, 3000] {
let store =
Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
let inferred_schema = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await?;
let actual_fields = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(expected, actual_fields);
}
}

Ok(())
}

#[tokio::test]
async fn test_infer_schema_short_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(20); // should cause error that file shorter than expected
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};
for file in ["example.arrow", "example_stream.arrow"] {
let mut bytes = std::fs::read(format!("tests/data/{file}"))?;
bytes.truncate(20); // should cause error that file shorter than expected
let location = Path::parse(file)?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

let arrow_format = ArrowFormat {};
let arrow_format = ArrowFormat {};

let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
let err = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await;
let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
let err = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await;

assert!(err.is_err());
assert_eq!(
"Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
err.unwrap_err().to_string().lines().next().unwrap()
);
assert!(err.is_err());
assert_eq!( "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", err.unwrap_err().to_string().lines().next().unwrap());
}

Ok(())
}