-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Closed
Labels
help wantedExtra attention is neededExtra attention is needed
Description
Hi, I have seen in the unit tests that it is possible to store Arrow data types in Parquet using ArrowWriter. I created a composite (struct) type, similar to a UUID, to check whether a query against such data works, but it fails. It looks as if the schema cannot be read correctly, or is simply not understood.
The SQL status page ( https://arrow.apache.org/datafusion/user-guide/sql/sql_status.html ) mentions that nested types are not supported,
but there is a way to serialize them via ArrowWriter.
Is something wrong with my approach (code), or is this feature not ready yet?
/// Table location; `sql` registers it as a listing table via `file://{TABLE_ABS_PATH}`.
// NOTE(review): the value looks like a redacted placeholder — presumably the
// directory that contains `SAMPLE_PATH`; confirm before running.
const TABLE_ABS_PATH: &str = "/abc/table/...";
/// Path of the Parquet file that `write` creates and `read` opens.
const SAMPLE_PATH: &str = "/abc/table/sample1.parquet";
/// Entry point: writes the sample Parquet file, reads it back with the Arrow
/// reader, then queries it through SQL.
///
/// An empty line is printed after the write stage and after the read stage to
/// separate their output.
#[tokio::main]
async fn main() -> Result<()> {
    write();
    println!();

    read();
    println!();

    // The SQL stage's outcome is the program's outcome.
    sql().await
}
/// Writes a two-row Parquet file at `SAMPLE_PATH` holding a single
/// non-nullable struct column named `UUID` with two non-nullable `Int64`
/// children, `most` and `least`.
///
/// # Panics
///
/// Panics if the file cannot be created, or if constructing the writer,
/// constructing the record batch, writing the batch, or closing the writer
/// fails.
fn write() {
    let file = std::fs::File::create(Path::new(SAMPLE_PATH)).unwrap();

    // Declare the child fields once so the schema and the array data cannot
    // drift apart.
    let most = Field::new("most", DataType::Int64, false);
    let least = Field::new("least", DataType::Int64, false);

    let uuid = Field::new(
        "UUID",
        DataType::Struct(vec![most.clone(), least.clone()]),
        false,
    );
    let schema = Arc::new(Schema::new(vec![uuid]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).expect("...");

    let data = StructArray::from(vec![
        (most, Arc::new(Int64Array::from(vec![1, 3])) as ArrayRef),
        (least, Arc::new(Int64Array::from(vec![2, 4])) as ArrayRef),
    ]);
    let batch =
        datafusion::arrow::record_batch::RecordBatch::try_new(schema, vec![Arc::new(data)])
            .expect("...");

    // Both calls return a `Result`; discarding it would hide a failed write
    // and leave a file that later stages cannot read.
    writer.write(&batch).expect("failed to write record batch");
    writer.close().expect("failed to close Parquet writer");
}
/// Opens the Parquet file at `SAMPLE_PATH`, reads it through the Arrow reader
/// with a requested batch size of 60, and prints each record batch using its
/// `Debug` representation.
///
/// # Panics
///
/// Panics if the file cannot be opened, the Parquet reader cannot be built,
/// the record reader cannot be created, or any batch fails to decode.
fn read() {
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use std::{fs::File, path::Path};

    let sample = File::open(Path::new(SAMPLE_PATH)).unwrap();
    let parquet_reader = SerializedFileReader::new(sample).expect("...");
    let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));

    arrow_reader
        .get_record_reader(60)
        .expect("...")
        .for_each(|batch| println!("{:?}", batch.unwrap()));
}
/// Registers the files under `TABLE_ABS_PATH` as the listing table
/// `FANCY_TABLE` with an explicit schema (one non-nullable struct column
/// `UUID` with `Int64` children `most` and `least`), then runs
/// `SELECT * FROM FANCY_TABLE` and prints the result.
///
/// # Errors
///
/// Returns an error if table registration, the SQL call, or showing the
/// results fails.
async fn sql() -> Result<()> {
    let mut ctx = ExecutionContext::new();

    let file_format = ParquetFormat::default().with_enable_pruning(false);
    let listing_options = ListingOptions {
        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
        format: Arc::new(file_format),
        table_partition_cols: vec![],
        collect_stat: false,
        target_partitions: 1,
    };

    // The schema is passed explicitly instead of being inferred from the files.
    let uuid_structure = DataType::Struct(vec![
        Field::new("most", DataType::Int64, false),
        Field::new("least", DataType::Int64, false),
    ]);
    let uuid = Field::new("UUID", uuid_structure, false);
    let schema = Arc::new(Schema::new(vec![uuid]));

    // Propagate registration failures like every other fallible step here,
    // rather than panicking inside a function that already returns `Result`.
    ctx.register_listing_table(
        "FANCY_TABLE",
        &format!("file://{}", TABLE_ABS_PATH),
        listing_options,
        Some(schema),
    )
    .await?;

    let df = ctx.sql("SELECT * FROM FANCY_TABLE").await?;
    df.show().await?;
    Ok(())
}
Error:
Invalid argument error: column types must match schema types, expected Struct([Field { name: \"most\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: \"least\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }]) but found Struct([Field { name: \"most\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }]) at column index 0")
Content:
RecordBatch { schema: Schema { fields: [Field { name: "UUID", data_type: Struct([Field { name: "most", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "least", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, columns: [StructArray
[
-- child 0: "most" (Int64)
PrimitiveArray<Int64>
[
1,
3,
]
-- child 1: "least" (Int64)
PrimitiveArray<Int64>
[
2,
4,
]
]] }
Metadata
Metadata
Assignees
Labels
help wantedExtra attention is neededExtra attention is needed