Skip to content

Commit 6c13b61

Browse files
authored
feat: arrow Interval and Duration type encoding (#275)
* feat: add interval type support * feat: implement duration and list for interval * chore: switch pgwire to released version
1 parent 4e66e42 commit 6c13b61

File tree

5 files changed

+197
-24
lines changed

5 files changed

+197
-24
lines changed

Cargo.lock

Lines changed: 15 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bytes = "1.11.0"
1919
chrono = { version = "0.4", features = ["std"] }
2020
datafusion = { version = "52", default-features = false }
2121
futures = "0.3"
22-
pgwire = { version = "0.37", default-features = false }
22+
pgwire = { version = "0.37.1", default-features = false }
2323
postgres-types = "0.2"
2424
rust_decimal = { version = "1.40", features = ["db-postgres"] }
2525
tokio = { version = "1", default-features = false }

arrow-pg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ datafusion = { workspace = true, optional = true }
2727
futures.workspace = true
2828
geoarrow = { version = "0.7", optional = true }
2929
geoarrow-schema = { version = "0.7", optional = true }
30+
pg_interval = { version = "0.5.1", package = "pg_interval_2" }
3031
pgwire = { workspace = true, default-features = false, features = ["server-api", "pg-ext-types"] }
3132
postgres-types.workspace = true
3233
rust_decimal.workspace = true

arrow-pg/src/encoder.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use chrono::NaiveTime;
77
use chrono::{NaiveDate, NaiveDateTime};
88
#[cfg(feature = "datafusion")]
99
use datafusion::arrow::{array::*, datatypes::*};
10+
use pg_interval::Interval as PgInterval;
1011
use pgwire::api::results::{DataRowEncoder, FieldInfo};
1112
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
1213
use pgwire::types::ToSqlText;
@@ -377,6 +378,78 @@ pub fn encode_value<T: Encoder>(
377378
}
378379
}
379380
},
381+
DataType::Interval(interval_unit) => match interval_unit {
382+
IntervalUnit::YearMonth => {
383+
let interval_array = arr
384+
.as_any()
385+
.downcast_ref::<IntervalYearMonthArray>()
386+
.unwrap();
387+
let months = IntervalYearMonthType::to_months(interval_array.value(idx));
388+
encoder.encode_field(&PgInterval::new(months, 0, 0), pg_field)?;
389+
}
390+
IntervalUnit::DayTime => {
391+
let interval_array = arr.as_any().downcast_ref::<IntervalDayTimeArray>().unwrap();
392+
let (days, millis) = IntervalDayTimeType::to_parts(interval_array.value(idx));
393+
encoder
394+
.encode_field(&PgInterval::new(0, days, millis as i64 * 1000i64), pg_field)?;
395+
}
396+
IntervalUnit::MonthDayNano => {
397+
let interval_array = arr
398+
.as_any()
399+
.downcast_ref::<IntervalMonthDayNanoArray>()
400+
.unwrap();
401+
let (months, days, nanoseconds) =
402+
IntervalMonthDayNanoType::to_parts(interval_array.value(idx));
403+
404+
encoder.encode_field(
405+
&PgInterval::new(months, days, nanoseconds / 1000i64),
406+
pg_field,
407+
)?;
408+
}
409+
},
410+
DataType::Duration(unit) => match unit {
411+
TimeUnit::Second => {
412+
if arr.is_null(idx) {
413+
return encoder.encode_field(&None::<PgInterval>, pg_field);
414+
}
415+
let duration_array = arr.as_any().downcast_ref::<DurationSecondArray>().unwrap();
416+
let microseconds = duration_array.value(idx) * 1_000_000i64;
417+
encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
418+
}
419+
TimeUnit::Millisecond => {
420+
if arr.is_null(idx) {
421+
return encoder.encode_field(&None::<PgInterval>, pg_field);
422+
}
423+
let duration_array = arr
424+
.as_any()
425+
.downcast_ref::<DurationMillisecondArray>()
426+
.unwrap();
427+
let microseconds = duration_array.value(idx) * 1_000i64;
428+
encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
429+
}
430+
TimeUnit::Microsecond => {
431+
if arr.is_null(idx) {
432+
return encoder.encode_field(&None::<PgInterval>, pg_field);
433+
}
434+
let duration_array = arr
435+
.as_any()
436+
.downcast_ref::<DurationMicrosecondArray>()
437+
.unwrap();
438+
let microseconds = duration_array.value(idx);
439+
encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
440+
}
441+
TimeUnit::Nanosecond => {
442+
if arr.is_null(idx) {
443+
return encoder.encode_field(&None::<PgInterval>, pg_field);
444+
}
445+
let duration_array = arr
446+
.as_any()
447+
.downcast_ref::<DurationNanosecondArray>()
448+
.unwrap();
449+
let microseconds = duration_array.value(idx) / 1_000i64;
450+
encoder.encode_field(&PgInterval::new(0, 0, microseconds), pg_field)?;
451+
}
452+
},
380453
DataType::List(_) | DataType::FixedSizeList(_, _) | DataType::LargeList(_) => {
381454
if arr.is_null(idx) {
382455
return encoder.encode_field(&None::<&[i8]>, pg_field);

arrow-pg/src/list_encoder.rs

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ use std::{str::FromStr, sync::Arc};
44
use arrow::{
55
array::{
66
timezone::Tz, Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
7-
Decimal128Array, Decimal256Array, DurationMicrosecondArray, LargeBinaryArray,
8-
LargeListArray, LargeStringArray, ListArray, MapArray, PrimitiveArray, StringArray,
9-
StringViewArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
10-
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
11-
TimestampNanosecondArray, TimestampSecondArray,
7+
Decimal128Array, Decimal256Array, DurationMicrosecondArray, DurationMillisecondArray,
8+
DurationNanosecondArray, DurationSecondArray, IntervalDayTimeArray,
9+
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray, LargeListArray,
10+
LargeStringArray, ListArray, MapArray, PrimitiveArray, StringArray, StringViewArray,
11+
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
12+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
13+
TimestampSecondArray,
1214
},
1315
datatypes::{
1416
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
@@ -21,11 +23,13 @@ use arrow::{
2123
use datafusion::arrow::{
2224
array::{
2325
timezone::Tz, Array, BinaryArray, BinaryViewArray, BooleanArray, Date32Array, Date64Array,
24-
Decimal128Array, Decimal256Array, DurationMicrosecondArray, LargeBinaryArray,
25-
LargeListArray, LargeStringArray, ListArray, MapArray, PrimitiveArray, StringArray,
26-
StringViewArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
27-
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
28-
TimestampNanosecondArray, TimestampSecondArray,
26+
Decimal128Array, Decimal256Array, DurationMicrosecondArray, DurationMillisecondArray,
27+
DurationNanosecondArray, DurationSecondArray, IntervalDayTimeArray,
28+
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeBinaryArray, LargeListArray,
29+
LargeStringArray, ListArray, MapArray, PrimitiveArray, StringArray, StringViewArray,
30+
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
31+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
32+
TimestampSecondArray,
2933
},
3034
datatypes::{
3135
DataType, Date32Type, Date64Type, Float32Type, Float64Type, Int16Type, Int32Type,
@@ -36,6 +40,7 @@ use datafusion::arrow::{
3640
};
3741

3842
use chrono::{DateTime, TimeZone, Utc};
43+
use pg_interval::Interval as PgInterval;
3944
use pgwire::api::results::FieldInfo;
4045
use pgwire::error::{PgWireError, PgWireResult};
4146
use rust_decimal::Decimal;
@@ -446,17 +451,98 @@ pub fn encode_list<T: Encoder>(
446451
encoder.encode_field(&value, pg_field)?;
447452
Ok(())
448453
}
449-
DataType::Duration(_) => {
450-
// Convert duration to microseconds for now
451-
let value: Vec<Option<i64>> = arr
452-
.as_any()
453-
.downcast_ref::<DurationMicrosecondArray>()
454-
.unwrap()
455-
.iter()
456-
.collect();
457-
encoder.encode_field(&value, pg_field)?;
458-
Ok(())
459-
}
454+
DataType::Duration(unit) => match unit {
455+
TimeUnit::Second => {
456+
let value: Vec<Option<PgInterval>> = arr
457+
.as_any()
458+
.downcast_ref::<DurationSecondArray>()
459+
.unwrap()
460+
.iter()
461+
.map(|val| val.map(|v| PgInterval::new(0, 0, v * 1_000_000i64)))
462+
.collect();
463+
encoder.encode_field(&value, pg_field)?;
464+
Ok(())
465+
}
466+
TimeUnit::Millisecond => {
467+
let value: Vec<Option<PgInterval>> = arr
468+
.as_any()
469+
.downcast_ref::<DurationMillisecondArray>()
470+
.unwrap()
471+
.iter()
472+
.map(|val| val.map(|v| PgInterval::new(0, 0, v * 1_000i64)))
473+
.collect();
474+
encoder.encode_field(&value, pg_field)?;
475+
Ok(())
476+
}
477+
TimeUnit::Microsecond => {
478+
let value: Vec<Option<PgInterval>> = arr
479+
.as_any()
480+
.downcast_ref::<DurationMicrosecondArray>()
481+
.unwrap()
482+
.iter()
483+
.map(|val| val.map(|v| PgInterval::new(0, 0, v)))
484+
.collect();
485+
encoder.encode_field(&value, pg_field)?;
486+
Ok(())
487+
}
488+
TimeUnit::Nanosecond => {
489+
let value: Vec<Option<PgInterval>> = arr
490+
.as_any()
491+
.downcast_ref::<DurationNanosecondArray>()
492+
.unwrap()
493+
.iter()
494+
.map(|val| val.map(|v| PgInterval::new(0, 0, v / 1_000i64)))
495+
.collect();
496+
encoder.encode_field(&value, pg_field)?;
497+
Ok(())
498+
}
499+
},
500+
DataType::Interval(interval_unit) => match interval_unit {
501+
arrow::datatypes::IntervalUnit::YearMonth => {
502+
let value: Vec<Option<PgInterval>> = arr
503+
.as_any()
504+
.downcast_ref::<IntervalYearMonthArray>()
505+
.unwrap()
506+
.iter()
507+
.map(|val| val.map(|v| PgInterval::new(v, 0, 0)))
508+
.collect();
509+
encoder.encode_field(&value, pg_field)?;
510+
Ok(())
511+
}
512+
arrow::datatypes::IntervalUnit::DayTime => {
513+
let value: Vec<Option<PgInterval>> = arr
514+
.as_any()
515+
.downcast_ref::<IntervalDayTimeArray>()
516+
.unwrap()
517+
.iter()
518+
.map(|val| {
519+
val.map(|v| {
520+
let (days, millis) = arrow::datatypes::IntervalDayTimeType::to_parts(v);
521+
PgInterval::new(0, days, millis as i64 * 1000i64)
522+
})
523+
})
524+
.collect();
525+
encoder.encode_field(&value, pg_field)?;
526+
Ok(())
527+
}
528+
arrow::datatypes::IntervalUnit::MonthDayNano => {
529+
let value: Vec<Option<PgInterval>> = arr
530+
.as_any()
531+
.downcast_ref::<IntervalMonthDayNanoArray>()
532+
.unwrap()
533+
.iter()
534+
.map(|val| {
535+
val.map(|v| {
536+
let (months, days, nanos) =
537+
arrow::datatypes::IntervalMonthDayNanoType::to_parts(v);
538+
PgInterval::new(months, days, nanos / 1000i64)
539+
})
540+
})
541+
.collect();
542+
encoder.encode_field(&value, pg_field)?;
543+
Ok(())
544+
}
545+
},
460546
DataType::List(_) => {
461547
// Support for nested lists (list of lists)
462548
// For now, convert to string representation

0 commit comments

Comments
 (0)