Skip to content

Commit bc3d8f4

Browse files
committed
Fix default schema adapter for the Arrow file format
1 parent fc538fb commit bc3d8f4

File tree

3 files changed

+125
-16
lines changed

3 files changed

+125
-16
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use arrow::datatypes::SchemaRef;
1919
use arrow::{array::RecordBatch, compute::concat_batches};
2020
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
21-
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
21+
use datafusion_common::{config::ConfigOptions, internal_err, Result};
2222
use datafusion_datasource::{
2323
file::FileSource, file_scan_config::FileScanConfig,
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,

datafusion/datasource-arrow/src/file_format.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,21 @@ impl FileFormat for ArrowFormat {
208208
conf.table_partition_cols().clone(),
209209
);
210210

211-
let source: Arc<dyn FileSource> =
211+
let mut source: Arc<dyn FileSource> =
212212
match is_object_in_arrow_ipc_file_format(object_store, object_location).await
213213
{
214214
Ok(true) => Arc::new(ArrowSource::new_file_source(table_schema)),
215215
Ok(false) => Arc::new(ArrowSource::new_stream_file_source(table_schema)),
216216
Err(e) => Err(e)?,
217217
};
218218

219+
// Preserve projection from the original file source
220+
if let Some(projection) = conf.file_source.projection() {
221+
if let Some(new_source) = source.try_pushdown_projection(projection)? {
222+
source = new_source;
223+
}
224+
}
225+
219226
let config = FileScanConfigBuilder::from(conf)
220227
.with_source(source)
221228
.build();

datafusion/datasource-arrow/src/source.rs

Lines changed: 116 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@
3434
use std::sync::Arc;
3535
use std::{any::Any, io::Cursor};
3636

37-
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
37+
use datafusion_datasource::schema_adapter::{
38+
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
39+
};
3840
use datafusion_datasource::{as_file_source, TableSchema};
3941

4042
use arrow::buffer::Buffer;
43+
use arrow::datatypes::SchemaRef;
4144
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
4245
use datafusion_common::error::Result;
4346
use datafusion_common::exec_datafusion_err;
@@ -51,7 +54,7 @@ use datafusion_physical_plan::projection::ProjectionExprs;
5154

5255
use datafusion_datasource::file_stream::FileOpenFuture;
5356
use datafusion_datasource::file_stream::FileOpener;
54-
use futures::StreamExt;
57+
use futures::{StreamExt, TryStreamExt};
5558
use itertools::Itertools;
5659
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
5760

@@ -68,6 +71,8 @@ enum ArrowFormat {
6871
pub(crate) struct ArrowStreamFileOpener {
6972
object_store: Arc<dyn ObjectStore>,
7073
projection: Option<Vec<usize>>,
74+
projected_schema: Option<SchemaRef>,
75+
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
7176
}
7277

7378
impl FileOpener for ArrowStreamFileOpener {
@@ -79,27 +84,52 @@ impl FileOpener for ArrowStreamFileOpener {
7984
}
8085
let object_store = Arc::clone(&self.object_store);
8186
let projection = self.projection.clone();
87+
let projected_schema = self.projected_schema.clone();
88+
let schema_adapter_factory = self.schema_adapter_factory.clone();
89+
8290
Ok(Box::pin(async move {
8391
let r = object_store
8492
.get(&partitioned_file.object_meta.location)
8593
.await?;
86-
match r.payload {
94+
95+
let stream = match r.payload {
8796
#[cfg(not(target_arch = "wasm32"))]
88-
GetResultPayload::File(file, _) => Ok(futures::stream::iter(
97+
GetResultPayload::File(file, _) => futures::stream::iter(
8998
StreamReader::try_new(file.try_clone()?, projection.clone())?,
9099
)
91100
.map(|r| r.map_err(Into::into))
92-
.boxed()),
101+
.boxed(),
93102
GetResultPayload::Stream(_) => {
94103
let bytes = r.bytes().await?;
95104
let cursor = Cursor::new(bytes);
96-
Ok(futures::stream::iter(StreamReader::try_new(
105+
futures::stream::iter(StreamReader::try_new(
97106
cursor,
98107
projection.clone(),
99108
)?)
100109
.map(|r| r.map_err(Into::into))
101-
.boxed())
110+
.boxed()
102111
}
112+
};
113+
114+
// If we have a schema adapter factory and projected schema, use them to normalize the schema
115+
if let (Some(factory), Some(proj_schema)) =
116+
(schema_adapter_factory, projected_schema)
117+
{
118+
Ok(stream
119+
.and_then(move |batch| {
120+
let factory = factory.clone();
121+
let proj_schema = proj_schema.clone();
122+
async move {
123+
let schema_adapter =
124+
factory.create_with_projected_schema(proj_schema);
125+
let (schema_mapper, _) =
126+
schema_adapter.map_schema(batch.schema().as_ref())?;
127+
schema_mapper.map_batch(batch)
128+
}
129+
})
130+
.boxed())
131+
} else {
132+
Ok(stream)
103133
}
104134
}))
105135
}
@@ -109,36 +139,62 @@ impl FileOpener for ArrowStreamFileOpener {
109139
pub(crate) struct ArrowFileOpener {
110140
object_store: Arc<dyn ObjectStore>,
111141
projection: Option<Vec<usize>>,
142+
projected_schema: Option<SchemaRef>,
143+
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
112144
}
113145

114146
impl FileOpener for ArrowFileOpener {
115147
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
116148
let object_store = Arc::clone(&self.object_store);
117149
let projection = self.projection.clone();
150+
let projected_schema = self.projected_schema.clone();
151+
let schema_adapter_factory = self.schema_adapter_factory.clone();
152+
118153
Ok(Box::pin(async move {
119154
let range = partitioned_file.range.clone();
120155
match range {
121156
None => {
122157
let r = object_store
123158
.get(&partitioned_file.object_meta.location)
124159
.await?;
125-
match r.payload {
160+
let stream = match r.payload {
126161
#[cfg(not(target_arch = "wasm32"))]
127-
GetResultPayload::File(file, _) => Ok(futures::stream::iter(
162+
GetResultPayload::File(file, _) => futures::stream::iter(
128163
FileReader::try_new(file.try_clone()?, projection.clone())?,
129164
)
130165
.map(|r| r.map_err(Into::into))
131-
.boxed()),
166+
.boxed(),
132167
GetResultPayload::Stream(_) => {
133168
let bytes = r.bytes().await?;
134169
let cursor = Cursor::new(bytes);
135-
Ok(futures::stream::iter(FileReader::try_new(
170+
futures::stream::iter(FileReader::try_new(
136171
cursor,
137172
projection.clone(),
138173
)?)
139174
.map(|r| r.map_err(Into::into))
140-
.boxed())
175+
.boxed()
141176
}
177+
};
178+
179+
// Apply schema adaptation if available
180+
if let (Some(factory), Some(proj_schema)) =
181+
(schema_adapter_factory, projected_schema)
182+
{
183+
Ok(stream
184+
.and_then(move |batch| {
185+
let factory = factory.clone();
186+
let proj_schema = proj_schema.clone();
187+
async move {
188+
let schema_adapter =
189+
factory.create_with_projected_schema(proj_schema);
190+
let (schema_mapper, _) = schema_adapter
191+
.map_schema(batch.schema().as_ref())?;
192+
schema_mapper.map_batch(batch)
193+
}
194+
})
195+
.boxed())
196+
} else {
197+
Ok(stream)
142198
}
143199
}
144200
Some(range) => {
@@ -228,7 +284,7 @@ impl FileOpener for ArrowFileOpener {
228284
)
229285
.await?;
230286

231-
Ok(futures::stream::iter(
287+
let stream = futures::stream::iter(
232288
recordbatches
233289
.into_iter()
234290
.zip(recordbatch_results)
@@ -239,7 +295,29 @@ impl FileOpener for ArrowFileOpener {
239295
}),
240296
)
241297
.map(|r| r.map_err(Into::into))
242-
.boxed())
298+
.boxed();
299+
300+
// Apply schema adaptation if available
301+
if let (Some(factory), Some(proj_schema)) =
302+
(schema_adapter_factory, projected_schema)
303+
{
304+
Ok(stream
305+
.and_then(move |batch| {
306+
let factory = factory.clone();
307+
let proj_schema = proj_schema.clone();
308+
async move {
309+
let schema_adapter =
310+
factory.create_with_projected_schema(proj_schema);
311+
let (schema_mapper, projection) = schema_adapter
312+
.map_schema(batch.schema().as_ref())?;
313+
let batch = batch.project(&projection)?;
314+
schema_mapper.map_batch(batch)
315+
}
316+
})
317+
.boxed())
318+
} else {
319+
Ok(stream)
320+
}
243321
}
244322
}
245323
}))
@@ -290,14 +368,32 @@ impl FileSource for ArrowSource {
290368
_partition: usize,
291369
) -> Result<Arc<dyn FileOpener>> {
292370
let split_projection = self.projection.clone();
371+
// For schema adaptation, we only use the file schema (not partition columns)
372+
let projected_file_schema = SchemaRef::from(
373+
self.table_schema
374+
.file_schema()
375+
.project(&split_projection.file_indices)?,
376+
);
377+
378+
// Use provided schema adapter factory, or default to DefaultSchemaAdapterFactory
379+
// This ensures schema normalization (removing metadata differences) happens during execution
380+
let schema_adapter_factory = self
381+
.schema_adapter_factory
382+
.clone()
383+
.unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory));
384+
293385
let opener: Arc<dyn FileOpener> = match self.format {
294386
ArrowFormat::File => Arc::new(ArrowFileOpener {
295387
object_store,
296388
projection: Some(split_projection.file_indices.clone()),
389+
projected_schema: Some(projected_file_schema.clone()),
390+
schema_adapter_factory: Some(schema_adapter_factory),
297391
}),
298392
ArrowFormat::Stream => Arc::new(ArrowStreamFileOpener {
299393
object_store,
300394
projection: Some(split_projection.file_indices.clone()),
395+
projected_schema: Some(projected_file_schema),
396+
schema_adapter_factory: Some(schema_adapter_factory),
301397
}),
302398
};
303399
Ok(ProjectionOpener::new(split_projection, opener)?)
@@ -429,6 +525,8 @@ impl ArrowOpener {
429525
inner: Arc::new(ArrowFileOpener {
430526
object_store,
431527
projection,
528+
projected_schema: None,
529+
schema_adapter_factory: None,
432530
}),
433531
}
434532
}
@@ -441,6 +539,8 @@ impl ArrowOpener {
441539
inner: Arc::new(ArrowStreamFileOpener {
442540
object_store,
443541
projection,
542+
projected_schema: None,
543+
schema_adapter_factory: None,
444544
}),
445545
}
446546
}
@@ -636,6 +736,8 @@ mod tests {
636736
let opener = ArrowStreamFileOpener {
637737
object_store,
638738
projection: Some(vec![0]), // just the first column
739+
projected_schema: None,
740+
schema_adapter_factory: None,
639741
};
640742

641743
let mut stream = opener.open(partitioned_file)?.await?;

0 commit comments

Comments (0)