diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
index 21da2e1e6a27..6f64d58087c4 100644
--- a/datafusion/src/datasource/file_format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -21,6 +21,7 @@ pub mod avro;
 pub mod csv;
 pub mod json;
 pub mod parquet;
+pub mod rocksdb;
 
 use std::any::Any;
 use std::fmt;
diff --git a/datafusion/src/datasource/file_format/rocksdb.rs b/datafusion/src/datasource/file_format/rocksdb.rs
new file mode 100644
index 000000000000..ed88e075f475
--- /dev/null
+++ b/datafusion/src/datasource/file_format/rocksdb.rs
@@ -0,0 +1,54 @@
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::datatypes::{Schema, SchemaRef};
+use async_trait::async_trait;
+
+use super::FileFormat;
+use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
+use crate::error::Result;
+use crate::logical_plan::Expr;
+use crate::physical_plan::file_format::{FileScanConfig, RocksdbExec};
+use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::Statistics;
+
+/// File format for RocksDB-backed tables. The schema is supplied up front
+/// because it cannot be inferred from the underlying key-value data.
+#[derive(Debug)]
+pub struct RocksdbFormat {
+    schema: Schema,
+}
+
+impl RocksdbFormat {
+    /// Create a new format that exposes the given schema.
+    pub fn new(schema: Schema) -> Self {
+        Self { schema }
+    }
+}
+
+#[async_trait]
+impl FileFormat for RocksdbFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    async fn infer_schema(&self, _readers: ObjectReaderStream) -> Result<SchemaRef> {
+        // The schema was supplied explicitly, so there is nothing to infer.
+        Ok(Arc::new(self.schema.to_owned()))
+    }
+
+    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
+        Ok(Statistics::default())
+    }
+
+    async fn create_physical_plan(
+        &self,
+        conf: FileScanConfig,
+        _filters: &[Expr],
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let exec = RocksdbExec::new(conf);
+        Ok(Arc::new(exec))
+    }
+}
\ No newline at end of file
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 5a913e91b0f8..0b3abcf80ee9 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -27,6 +27,7 @@ use crate::{
         avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
         csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
         parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
+        rocksdb::RocksdbFormat,
         FileFormat,
     },
     MemTable,
@@ -101,6 +102,7 @@ use super::{
     options::{AvroReadOptions, CsvReadOptions},
     DiskManager, MemoryManager,
 };
+use crate::arrow::datatypes::{DataType, Field, Schema};
 
 /// ExecutionContext is the main interface for executing queries with DataFusion. The context
 /// provides the following functionality:
@@ -464,6 +466,41 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Registers a RocksDB data source so that it can be referenced from SQL statements
+    /// executed against this context.
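+    ///
+    /// A minimal usage sketch (illustrative only; the table name is arbitrary
+    /// and the schema is currently hard-coded inside this method):
+    ///
+    /// ```no_run
+    /// use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// ctx.register_rocksdb("tab").await?;
+    /// let df = ctx.sql("SELECT boolvalue FROM tab").await?;
+    /// let batches = df.collect().await?;
+    /// # Ok(())
+    /// # }
+    /// ```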
+    pub async fn register_rocksdb(&mut self, name: &str) -> Result<()> {
+        // TODO: fetch the table metadata from a metadata service at runtime.
+        // For now the schema is hard-coded; both columns are declared Boolean
+        // because the demo reader below only produces boolean arrays.
+        let field = Field::new("boolvalue", DataType::Boolean, false);
+        let field1 = Field::new("intvalue", DataType::Boolean, false);
+        let schema = Schema::new_with_metadata(vec![field, field1], HashMap::new());
+
+        let file_format = RocksdbFormat::new(schema.clone());
+
+        // Scan options for the listing table.
+        let listing_options = ListingOptions {
+            format: Arc::new(file_format),
+            collect_stat: false,
+            file_extension: String::new(),
+            target_partitions: self.state.lock().config.target_partitions,
+            table_partition_cols: vec![],
+        };
+
+        // The listing table needs a location (the table's "directory"); point
+        // it at an arbitrary existing file as a placeholder until a real
+        // RocksDB location is wired in.
+        let mut uri = String::from(env!("CARGO_MANIFEST_DIR"));
+        uri.push_str("/src/execution/mod.rs");
+        self.register_listing_table(
+            name,
+            uri.as_str(),
+            listing_options,
+            Some(Arc::new(schema)),
+        )
+        .await?;
+
+        Ok(())
+    }
+
     /// Registers a CSV data source so that it can be referenced from SQL statements
     /// executed against this context.
     pub async fn register_csv(
@@ -1317,6 +1354,7 @@ mod tests {
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
     use tempfile::TempDir;
+    use crate::arrow::util::display::array_value_to_string;
 
     #[tokio::test]
     async fn shared_memory_and_disk_manager() {
@@ -3360,4 +3398,41 @@ mod tests {
             ctx.read_parquet("dummy").await.unwrap()
         }
     }
+
+    #[tokio::test]
+    async fn test_register_rocksdb() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        ctx.register_rocksdb("tab").await?;
+
+        let df = ctx.sql("select boolvalue from tab").await?;
+
+        let batches = df.collect().await.expect("query failed");
+        for row in collect_answer(&batches) {
+            println!("{}", row);
+        }
+
+        Ok(())
+    }
+
+    /// Renders each row of the given batches as a pipe-separated string.
+    pub fn collect_answer(records: &[RecordBatch]) -> Vec<String> {
+        let mut result = Vec::new();
+
+        for batch in records {
+            for row in 0..batch.num_rows() {
+                let mut cells = Vec::new();
+                for col in 0..batch.num_columns() {
+                    let column = batch.column(col);
+                    cells.push(array_value_to_string(column, row).unwrap());
+                }
+
+                result.push(cells.join("|"));
+            }
+        }
+
+        result
+    }
 }
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index 7658addd3561..612afbdac417 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -22,6 +22,7 @@ mod csv;
 mod file_stream;
 mod json;
 mod parquet;
+mod rocksdb;
 
 pub use self::parquet::ParquetExec;
 use arrow::{
@@ -34,6 +35,7 @@ use arrow::{
 pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
+pub use rocksdb::RocksdbExec;
 
 use crate::error::DataFusionError;
 use crate::{
diff --git a/datafusion/src/physical_plan/file_format/rocksdb.rs b/datafusion/src/physical_plan/file_format/rocksdb.rs
new file mode 100644
index 000000000000..36180b0d78df
--- /dev/null
+++ b/datafusion/src/physical_plan/file_format/rocksdb.rs
@@ -0,0 +1,206 @@
+use std::any::Any;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, BooleanBuilder};
+use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::error::Result;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+
+use super::file_stream::{BatchIter, FileStream};
+use crate::error::DataFusionError;
+use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::file_format::FileScanConfig;
+use crate::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+};
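+
+// A sketch of how the pieces fit together, mirroring the other file-format
+// executors such as CsvExec: the planner wraps a FileScanConfig in a
+// RocksdbExec; execute() builds one FileStream per partition, and the
+// FileStream drives a RocksdbReader iterator that yields RecordBatches until
+// it returns None.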
+
+/// Execution plan for scanning one or more RocksDB partitions
+#[derive(Debug, Clone)]
+pub struct RocksdbExec {
+    base_config: FileScanConfig,
+    projected_statistics: Statistics,
+    projected_schema: SchemaRef,
+}
+
+impl RocksdbExec {
+    /// Create a new RocksDB reader execution plan from the provided file list and schema.
+    pub fn new(base_config: FileScanConfig) -> Self {
+        let (projected_schema, projected_statistics) = base_config.project();
+        Self {
+            base_config,
+            projected_statistics,
+            projected_schema,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for RocksdbExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.projected_schema)
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        // One output partition per file group.
+        Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        // This is a leaf node; it has no children.
+        vec![]
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> crate::error::Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            Ok(Arc::new(self.clone()))
+        } else {
+            Err(DataFusionError::Internal(format!(
+                "Children cannot be replaced in {:?}",
+                self
+            )))
+        }
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+        runtime: Arc<RuntimeEnv>,
+    ) -> crate::error::Result<SendableRecordBatchStream> {
+        let batch_size = runtime.batch_size;
+        let file_schema = Arc::clone(&self.base_config.file_schema);
+        let projection = self.base_config.file_column_projection_indices();
+        let fun = move |_file, _remaining: &Option<usize>| {
+            Box::new(RocksdbReader::new(
+                Arc::clone(&file_schema),
+                batch_size,
+                projection.clone(),
+            )) as BatchIter
+        };
+
+        Ok(Box::pin(FileStream::new(
+            Arc::clone(&self.base_config.object_store),
+            self.base_config.file_groups[partition].clone(),
+            fun,
+            Arc::clone(&self.projected_schema),
+            self.base_config.limit,
+            self.base_config.table_partition_cols.clone(),
+        )))
+    }
+
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "RocksdbExec: limit={:?}, files={}",
+                    self.base_config.limit,
+                    super::FileGroupsDisplay(&self.base_config.file_groups),
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        self.projected_statistics.clone()
+    }
+}
+
+pub struct RocksdbReader {
+    /// Explicit schema for the RocksDB table
+    schema: SchemaRef,
+    /// Optional projection for which columns to load (zero-based column indices)
+    projection: Option<Vec<usize>>,
+    /// Number of records per batch
+    batch_size: usize,
+    /// Position of the previous read, so data can be consumed in batches
+    last_id: Option<i64>,
+}
+
+impl RocksdbReader {
+    pub fn new(
+        schema: SchemaRef,
+        batch_size: usize,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
+        Self {
+            schema,
+            projection,
+            batch_size,
+            last_id: Some(std::i64::MIN),
+        }
+    }
+
+    /// Returns the schema of the reader, useful for getting the schema without reading
+    /// record batches
+    pub fn schema(&self) -> SchemaRef {
+        match &self.projection {
+            Some(projection) => {
+                let fields = self.schema.fields();
+                let projected_fields: Vec<Field> =
+                    projection.iter().map(|i| fields[*i].clone()).collect();
+
+                Arc::new(Schema::new(projected_fields))
+            }
+            None => self.schema.clone(),
+        }
+    }
+}
+
+impl Iterator for RocksdbReader {
+    type Item = Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // last_id == i64::MAX marks the reader as exhausted.
+        if self.last_id == Some(std::i64::MAX) {
+            return None;
+        }
+
+        // Produce the data for this batch.
+        let fields = self.schema.fields();
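+        // Resolve the projection: the requested column indices if one was
+        // supplied, otherwise every column in schema order.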
+        let projection: Vec<usize> = match self.projection {
+            Some(ref v) => v.clone(),
+            None => fields.iter().enumerate().map(|(i, _)| i).collect(),
+        };
+
+        let projected_fields: Vec<Field> = projection
+            .iter()
+            .map(|i| fields[*i].clone())
+            .collect();
+        let projected_schema = Arc::new(Schema::new_with_metadata(
+            projected_fields,
+            self.schema.metadata().clone(),
+        ));
+
+        // Hard-coded demo rows; reading from RocksDB itself is not implemented
+        // in this demo yet. A real implementation would fetch up to batch_size
+        // values starting after last_id.
+        let mut builder = BooleanBuilder::new(10);
+        for v in [true, true, true, true, true, false, false, false, false, false] {
+            let _ = builder.append_value(v);
+        }
+
+        let arc: ArrayRef = Arc::new(builder.finish());
+        let arr = vec![arc];
+
+        let result = RecordBatch::try_new(projected_schema, arr);
+
+        // Update last_id to record where this read stopped; i64::MAX marks the
+        // reader as finished after this single batch.
+        self.last_id = Some(std::i64::MAX);
+        Some(result)
+    }
+}
\ No newline at end of file