-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathlocalhost_run.rs
More file actions
78 lines (67 loc) · 2.32 KB
/
localhost_run.rs
File metadata and controls
78 lines (67 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;
use datafusion::common::DataFusionError;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, WorkerResolver, display_plan_ascii,
};
use futures::TryStreamExt;
use std::error::Error;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;
/// Command-line arguments for the localhost Distributed DataFusion runner.
///
/// Parsed via `StructOpt::from_args()` in `main`.
#[derive(StructOpt)]
#[structopt(name = "run", about = "A localhost Distributed DataFusion runner")]
struct Args {
    /// The SQL query to run.
    // Positional argument: the first free-standing value on the command line.
    #[structopt()]
    query: String,
    /// The ports holding Distributed DataFusion workers.
    // Comma-separated list, e.g. `--cluster-ports 8000,8001,8002`
    // (`use_delimiter` splits a single value on commas).
    #[structopt(long = "cluster-ports", use_delimiter = true)]
    cluster_ports: Vec<u16>,
    /// Whether the distributed plan should be rendered instead of executing the query.
    // Boolean flag: present => true, absent => false.
    #[structopt(long)]
    show_distributed_plan: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::from_args();
let localhost_resolver = LocalhostWorkerResolver {
ports: args.cluster_ports,
};
let state = SessionStateBuilder::new()
.with_default_features()
.with_distributed_worker_resolver(localhost_resolver)
.with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
.with_distributed_files_per_task(1)?
.build();
let ctx = SessionContext::from(state);
ctx.register_parquet("weather", "testdata/weather", ParquetReadOptions::default())
.await?;
let df = ctx.sql(&args.query).await?;
if args.show_distributed_plan {
let plan = df.create_physical_plan().await?;
println!("{}", display_plan_ascii(plan.as_ref(), false));
} else {
let stream = df.execute_stream().await?;
let batches = stream.try_collect::<Vec<_>>().await?;
let formatted = pretty_format_batches(&batches)?;
println!("{formatted}");
}
Ok(())
}
/// Resolves Distributed DataFusion workers as HTTP endpoints on localhost.
///
/// Each configured port becomes one `http://localhost:<port>` worker URL
/// (see the `WorkerResolver` impl below).
#[derive(Clone)]
struct LocalhostWorkerResolver {
    // TCP ports on localhost where workers are listening.
    ports: Vec<u16>,
}
#[async_trait]
impl WorkerResolver for LocalhostWorkerResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
Ok(self
.ports
.iter()
.map(|port| Url::parse(&format!("http://localhost:{port}")).unwrap())
.collect())
}
}