From 12a5e5b6baccbe5e45cd65644fc2219d4aa72bb7 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 21 Apr 2021 07:24:57 -0600 Subject: [PATCH 1/6] Combine DataFusion and Ballista workspaces --- Cargo.toml | 9 +++++-- ballista/rust/Cargo.toml | 30 ------------------------ ballista/rust/benchmarks/tpch/Cargo.toml | 2 +- ballista/rust/client/Cargo.toml | 2 +- ballista/rust/core/Cargo.toml | 2 +- ballista/rust/executor/Cargo.toml | 2 +- ballista/rust/scheduler/Cargo.toml | 2 +- 7 files changed, 12 insertions(+), 37 deletions(-) delete mode 100644 ballista/rust/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index 0a4ef2a7f2c1..bf36e6ac3fb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,9 +17,14 @@ [workspace] members = [ - "datafusion", - "datafusion-examples", + "datafusion", + "datafusion-examples", "benchmarks", + "ballista/rust/benchmarks/tpch", + "ballista/rust/client", + "ballista/rust/core", + "ballista/rust/executor", + "ballista/rust/scheduler", ] # this package is excluded because it requires different compilation flags, thereby significantly changing diff --git a/ballista/rust/Cargo.toml b/ballista/rust/Cargo.toml deleted file mode 100644 index 5e344e004b83..000000000000 --- a/ballista/rust/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[workspace] - -members = [ - "benchmarks/tpch", - "client", - "core", - "executor", - "scheduler", -] - -#[profile.release] -#lto = true -#codegen-units = 1 diff --git a/ballista/rust/benchmarks/tpch/Cargo.toml b/ballista/rust/benchmarks/tpch/Cargo.toml index 8d62e20e17e1..53499615124c 100644 --- a/ballista/rust/benchmarks/tpch/Cargo.toml +++ b/ballista/rust/benchmarks/tpch/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "tpch" -version = "0.4.2-SNAPSHOT" +version = "0.5.0-SNAPSHOT" homepage = "https://github.com/apache/arrow" repository = "https://github.com/apache/arrow" authors = ["Apache Arrow "] diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index 6ac86875169b..696e80f1ea69 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -19,7 +19,7 @@ name = "ballista" description = "Ballista Distributed Compute" license = "Apache-2.0" -version = "0.4.2-SNAPSHOT" +version = "0.5.0-SNAPSHOT" homepage = "https://github.com/apache/arrow" repository = "https://github.com/apache/arrow" authors = ["Apache Arrow "] diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index e9d7682473f1..55b4fc4a4abb 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -19,7 +19,7 @@ name = "ballista-core" description = "Ballista Distributed Compute" license = "Apache-2.0" -version = "0.4.2-SNAPSHOT" +version = "0.5.0-SNAPSHOT" homepage = "https://github.com/apache/arrow" repository = "https://github.com/apache/arrow" authors = ["Apache Arrow "] diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 79ceabe2dd66..bd6d9242ffd5 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -19,7 +19,7 @@ name = "ballista-executor" description = "Ballista Distributed Compute - Executor" license = "Apache-2.0" -version = "0.4.2-SNAPSHOT" +version = "0.5.0-SNAPSHOT" homepage = "https://github.com/apache/arrow" repository = "https://github.com/apache/arrow" authors = ["Apache Arrow "] diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index ce8ca09e15b2..48086e44177e 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -19,7 +19,7 @@ name = "ballista-scheduler" description = "Ballista Distributed Compute - Scheduler" license = "Apache-2.0" -version = "0.4.2-SNAPSHOT" +version = "0.5.0-SNAPSHOT" homepage = "https://github.com/apache/arrow" repository = "https://github.com/apache/arrow" authors = ["Apache Arrow "] From 39e0485ba17c327e71fdc4c97122decf78263844 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 21 Apr 2021 07:29:45 -0600 Subject: [PATCH 2/6] Update integration test to avoid compiling Ballista separately --- dev/docker/rust.dockerfile | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dev/docker/rust.dockerfile b/dev/docker/rust.dockerfile index 19dd4879eab6..6505f3c1660a 100644 --- a/dev/docker/rust.dockerfile +++ b/dev/docker/rust.dockerfile @@ -59,20 +59,18 @@ ARG RELEASE_FLAG=--release # force build.rs to run to generate configure_me code. ENV FORCE_REBUILD='true' RUN cargo build $RELEASE_FLAG -RUN cd ballista/rust && \ - cargo build $RELEASE_FLAG # put the executor on /executor (need to be copied from different places depending on FLAG) ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-executor /executor; else mv /tmp/ballista/ballista/rust/target/release/ballista-executor /executor; fi +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-executor /executor; else mv /tmp/ballista/target/release/ballista-executor /executor; fi # put the scheduler on /scheduler (need to be copied from different places depending on FLAG) ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/ballista/rust/target/release/ballista-scheduler /scheduler; fi +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/ballista-scheduler /scheduler; else mv /tmp/ballista/target/release/ballista-scheduler /scheduler; fi # put the tpch on /tpch (need to be copied from different places depending on FLAG) ENV RELEASE_FLAG=${RELEASE_FLAG} -RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/ballista/rust/target/debug/tpch /tpch; else mv /tmp/ballista/ballista/rust/target/release/tpch /tpch; fi +RUN if [ -z "$RELEASE_FLAG" ]; then mv /tmp/ballista/target/debug/tpch /tpch; else mv /tmp/ballista/target/release/tpch /tpch; fi # Copy the binary into a new container for a smaller docker image FROM ballistacompute/rust-base:0.4.0-20210213 From 34df3f58c9a4f29e4c39553072acfa0adbb9ac07 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Apr 2021 07:09:56 -0600 Subject: [PATCH 3/6] snmalloc no longer the default --- ballista/rust/executor/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index bd6d9242ffd5..a7b96a7a39f1 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -26,7 +26,6 @@ authors = ["Apache Arrow "] edition = "2018" [features] -default = ["snmalloc"] snmalloc = ["snmalloc-rs"] [dependencies] From 95e0045f3eabc65a283bbb0ee5ac0e45d80a0df4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Apr 2021 07:43:44 -0600 Subject: [PATCH 4/6] remove exclusion --- Cargo.toml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf36e6ac3fb7..0947beadac0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,9 +25,4 @@ members = [ "ballista/rust/core", "ballista/rust/executor", "ballista/rust/scheduler", -] - -# this package is excluded because it requires different compilation flags, thereby significantly changing -# how it is compiled within the workspace, causing the whole workspace to be compiled from scratch -# this way, this is a stand-alone package that compiles independently of the others. -exclude = ["ballista"] +] \ No newline at end of file From 7256c9968911aa6561da1f72740a2e682f0491f5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Apr 2021 09:45:25 -0600 Subject: [PATCH 5/6] cargo fmt + clippy --- ballista/rust/client/src/context.rs | 4 ++-- ballista/rust/core/src/datasource.rs | 6 +++--- ballista/rust/core/src/serde/logical_plan/to_proto.rs | 4 ++-- ballista/rust/scheduler/src/planner.rs | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 400f6b6183ec..a4cca7a0996c 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -30,7 +30,7 @@ use ballista_core::serde::protobuf::{ }; use ballista_core::{ client::BallistaClient, - datasource::DFTableAdapter, + datasource::DfTableAdapter, error::{BallistaError, Result}, memory_stream::MemoryStream, utils::create_datafusion_context, @@ -151,7 +151,7 @@ impl BallistaContext { let execution_plan = ctx.create_physical_plan(&plan)?; ctx.register_table( TableReference::Bare { table: name }, - Arc::new(DFTableAdapter::new(plan, execution_plan)), + Arc::new(DfTableAdapter::new(plan, execution_plan)), )?; } let df = ctx.sql(sql)?; diff --git a/ballista/rust/core/src/datasource.rs b/ballista/rust/core/src/datasource.rs index 8ff0df44e4be..5b1540ac5037 100644 --- a/ballista/rust/core/src/datasource.rs +++ b/ballista/rust/core/src/datasource.rs @@ -30,20 +30,20 @@ use datafusion::{ /// TableProvider which is effectively a wrapper around a physical plan. We need to be able to /// register tables so that we can create logical plans from SQL statements that reference these /// tables. -pub struct DFTableAdapter { +pub struct DfTableAdapter { /// DataFusion logical plan pub logical_plan: LogicalPlan, /// DataFusion execution plan plan: Arc, } -impl DFTableAdapter { +impl DfTableAdapter { pub fn new(logical_plan: LogicalPlan, plan: Arc) -> Self { Self { logical_plan, plan } } } -impl TableProvider for DFTableAdapter { +impl TableProvider for DfTableAdapter { fn as_any(&self) -> &dyn Any { self } diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index a181f98b6eb6..3f678a76c610 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -24,7 +24,7 @@ use std::{ convert::{TryFrom, TryInto}, }; -use crate::datasource::DFTableAdapter; +use crate::datasource::DfTableAdapter; use crate::serde::{protobuf, BallistaError}; use arrow::datatypes::{DataType, Schema}; @@ -679,7 +679,7 @@ impl TryInto for &LogicalPlan { // unwrap the DFTableAdapter to get to the real TableProvider let source = if let Some(adapter) = - source.as_any().downcast_ref::() + source.as_any().downcast_ref::() { match &adapter.logical_plan { LogicalPlan::TableScan { source, .. } => Ok(source.as_any()), diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index e9f668a7d5f8..e791fa8b5459 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -25,7 +25,7 @@ use std::time::Instant; use std::{collections::HashMap, future::Future}; use ballista_core::client::BallistaClient; -use ballista_core::datasource::DFTableAdapter; +use ballista_core::datasource::DfTableAdapter; use ballista_core::error::{BallistaError, Result}; use ballista_core::serde::scheduler::ExecutorMeta; use ballista_core::serde::scheduler::PartitionId; @@ -138,7 +138,7 @@ impl DistributedPlanner { stages.append(&mut child_stages); } - if let Some(adapter) = execution_plan.as_any().downcast_ref::() { + if let Some(adapter) = execution_plan.as_any().downcast_ref::() { // remove Repartition rule because that isn't supported yet let rules: Vec> = vec![ Arc::new(CoalesceBatches::new()), From f961577b057174e592fe7f1a30fb50cdf310e2a2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Apr 2021 10:57:28 -0600 Subject: [PATCH 6/6] clippy --- ballista/rust/core/src/error.rs | 1 + ballista/rust/core/src/serde/logical_plan/from_proto.rs | 2 ++ ballista/rust/core/src/serde/logical_plan/to_proto.rs | 3 ++- ballista/rust/core/src/serde/scheduler/from_proto.rs | 1 + ballista/rust/core/src/serde/scheduler/mod.rs | 1 + ballista/rust/core/src/serde/scheduler/to_proto.rs | 2 ++ 6 files changed, 9 insertions(+), 1 deletion(-) diff --git a/ballista/rust/core/src/error.rs b/ballista/rust/core/src/error.rs index d0155ce4b78f..e16920e04744 100644 --- a/ballista/rust/core/src/error.rs +++ b/ballista/rust/core/src/error.rs @@ -49,6 +49,7 @@ pub enum BallistaError { TokioError(tokio::task::JoinError), } +#[allow(clippy::from_over_into)] impl Into> for BallistaError { fn into(self) -> Result { Err(self) diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 93084260662f..18a85d2796cf 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -436,6 +436,7 @@ impl TryInto for &protobuf::arrow_type::ArrowTypeEnu } } +#[allow(clippy::from_over_into)] impl Into for protobuf::PrimitiveScalarType { fn into(self) -> arrow::datatypes::DataType { use arrow::datatypes::DataType; @@ -1170,6 +1171,7 @@ impl TryFrom for protobuf::FileType { } } +#[allow(clippy::from_over_into)] impl Into for protobuf::FileType { fn into(self) -> datafusion::sql::parser::FileType { use datafusion::sql::parser::FileType; diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index 3f678a76c610..7f4b67e7efaa 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -1020,7 +1020,7 @@ impl TryInto for &Expr { let fun: protobuf::ScalarFunction = fun.try_into()?; let expr: Vec = args .iter() - .map(|e| Ok(e.try_into()?)) + .map(|e| e.try_into()) .collect::, BallistaError>>()?; Ok(protobuf::LogicalExprNode { expr_type: Some( @@ -1163,6 +1163,7 @@ impl TryInto for &Expr { } } +#[allow(clippy::from_over_into)] impl Into for &Schema { fn into(self) -> protobuf::Schema { protobuf::Schema { diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index fb1e4f812d0d..4631b2e4d863 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -72,6 +72,7 @@ impl TryInto for protobuf::PartitionId { } } +#[allow(clippy::from_over_into)] impl Into for protobuf::PartitionStats { fn into(self) -> PartitionStats { PartitionStats::new( diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 81d8722d7f46..bbbd48b74a1f 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -75,6 +75,7 @@ pub struct ExecutorMeta { pub port: u16, } +#[allow(clippy::from_over_into)] impl Into for ExecutorMeta { fn into(self) -> protobuf::ExecutorMetadata { protobuf::ExecutorMetadata { diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index f581becdea17..40ca907a8a71 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -55,6 +55,7 @@ impl TryInto for ExecutePartition { } } +#[allow(clippy::from_over_into)] impl Into for PartitionId { fn into(self) -> protobuf::PartitionId { protobuf::PartitionId { @@ -77,6 +78,7 @@ impl TryInto for PartitionLocation { } } +#[allow(clippy::from_over_into)] impl Into for PartitionStats { fn into(self) -> protobuf::PartitionStats { let none_value = -1_i64;