diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index ce44be2142d5a..7505f24118f43 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -201,7 +201,7 @@ fn stats_for_partitions( async fn fetch_partition( location: &PartitionLocation, -) -> Result>> { +) -> Result>> { let metadata = &location.executor_meta; let partition_id = &location.partition_id; let mut ballista_client = diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index b612ddb1798fb..560d459977ddd 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -59,7 +59,7 @@ use futures::{Stream, StreamExt}; /// Stream data to disk in Arrow IPC format pub async fn write_stream_to_disk( - stream: &mut Pin>, + stream: &mut Pin>, path: &str, disk_write_metric: &metrics::Time, ) -> Result { @@ -98,7 +98,7 @@ pub async fn write_stream_to_disk( } pub async fn collect_stream( - stream: &mut Pin>, + stream: &mut Pin>, ) -> Result> { let mut batches = vec![]; while let Some(batch) = stream.next().await { @@ -310,13 +310,13 @@ impl QueryPlanner for BallistaQueryPlanner { } pub struct WrappedStream { - stream: Pin> + Send + Sync>>, + stream: Pin> + Send>>, schema: SchemaRef, } impl WrappedStream { pub fn new( - stream: Pin> + Send + Sync>>, + stream: Pin> + Send>>, schema: SchemaRef, ) -> Self { Self { stream, schema } diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index e511df1dee90f..f2bf23bc630ad 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -51,7 +51,7 @@ pub trait RecordBatchStream: Stream> { } /// Trait for a stream of record batches. -pub type SendableRecordBatchStream = Pin>; +pub type SendableRecordBatchStream = Pin>; /// EmptyRecordBatchStream can be used to create a RecordBatchStream /// that will produce no results