Skip to content

Commit 8a193c2

Browse files
authored
fix: describe Parquet schema with coerce_int96 (#15750)
* fix: parquet coerce_int96 schema
* move test to parquet.slt
* update based on comphead's suggestion
1 parent f07fb10 commit 8a193c2

File tree

3 files changed

+35
-3
lines changed

3 files changed

+35
-3
lines changed

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6161
use datafusion_session::Session;
6262

6363
use crate::can_expr_be_pushed_down_with_schemas;
64-
use crate::source::ParquetSource;
64+
use crate::source::{parse_coerce_int96_string, ParquetSource};
6565
use async_trait::async_trait;
6666
use bytes::Bytes;
6767
use datafusion_datasource::source::DataSourceExec;
@@ -304,9 +304,10 @@ async fn fetch_schema_with_location(
304304
store: &dyn ObjectStore,
305305
file: &ObjectMeta,
306306
metadata_size_hint: Option<usize>,
307+
coerce_int96: Option<TimeUnit>,
307308
) -> Result<(Path, Schema)> {
308309
let loc_path = file.location.clone();
309-
let schema = fetch_schema(store, file, metadata_size_hint).await?;
310+
let schema = fetch_schema(store, file, metadata_size_hint, coerce_int96).await?;
310311
Ok((loc_path, schema))
311312
}
312313

@@ -337,12 +338,17 @@ impl FileFormat for ParquetFormat {
337338
store: &Arc<dyn ObjectStore>,
338339
objects: &[ObjectMeta],
339340
) -> Result<SchemaRef> {
341+
let coerce_int96 = match self.coerce_int96() {
342+
Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
343+
None => None,
344+
};
340345
let mut schemas: Vec<_> = futures::stream::iter(objects)
341346
.map(|object| {
342347
fetch_schema_with_location(
343348
store.as_ref(),
344349
object,
345350
self.metadata_size_hint(),
351+
coerce_int96,
346352
)
347353
})
348354
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
@@ -825,13 +831,19 @@ async fn fetch_schema(
825831
store: &dyn ObjectStore,
826832
file: &ObjectMeta,
827833
metadata_size_hint: Option<usize>,
834+
coerce_int96: Option<TimeUnit>,
828835
) -> Result<Schema> {
829836
let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
830837
let file_metadata = metadata.file_metadata();
831838
let schema = parquet_to_arrow_schema(
832839
file_metadata.schema_descr(),
833840
file_metadata.key_value_metadata(),
834841
)?;
842+
let schema = coerce_int96
843+
.and_then(|time_unit| {
844+
coerce_int96_to_resolution(file_metadata.schema_descr(), &schema, &time_unit)
845+
})
846+
.unwrap_or(schema);
835847
Ok(schema)
836848
}
837849

datafusion/datasource-parquet/src/source.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,9 @@ impl ParquetSource {
439439
}
440440

441441
/// Parses datafusion.common.config.ParquetOptions.coerce_int96 String to an arrow_schema.datatype.TimeUnit
442-
fn parse_coerce_int96_string(str_setting: &str) -> datafusion_common::Result<TimeUnit> {
442+
pub(crate) fn parse_coerce_int96_string(
443+
str_setting: &str,
444+
) -> datafusion_common::Result<TimeUnit> {
443445
let str_setting_lower: &str = &str_setting.to_lowercase();
444446

445447
match str_setting_lower {

datafusion/sqllogictest/test_files/parquet.slt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,3 +629,21 @@ physical_plan
629629

630630
statement ok
631631
drop table foo
632+
633+
634+
statement ok
635+
set datafusion.execution.parquet.coerce_int96 = ms;
636+
637+
statement ok
638+
CREATE EXTERNAL TABLE int96_from_spark
639+
STORED AS PARQUET
640+
LOCATION '../../parquet-testing/data/int96_from_spark.parquet';
641+
642+
# Print schema
643+
query TTT
644+
describe int96_from_spark;
645+
----
646+
a Timestamp(Millisecond, None) YES
647+
648+
statement ok
649+
set datafusion.execution.parquet.coerce_int96 = ns;

0 commit comments

Comments
 (0)