Skip to content

Commit 9f70576

Browse files
committed
ExternalSortExec v1
1 parent 1c26cd0 commit 9f70576

12 files changed

Lines changed: 435 additions & 309 deletions

File tree

datafusion/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ path = "src/lib.rs"
4040
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
4141
simd = ["arrow/simd"]
4242
crypto_expressions = ["md-5", "sha2"]
43-
regex_expressions = ["regex", "lazy_static"]
43+
regex_expressions = ["regex"]
4444
unicode_expressions = ["unicode-segmentation"]
4545
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
4646
force_hash_collisions = []
@@ -67,7 +67,7 @@ sha2 = { version = "^0.9.1", optional = true }
6767
ordered-float = "2.0"
6868
unicode-segmentation = { version = "^1.7.1", optional = true }
6969
regex = { version = "^1.4.3", optional = true }
70-
lazy_static = { version = "^1.4.0", optional = true }
70+
lazy_static = { version = "^1.4.0"}
7171
smallvec = { version = "1.6", features = ["union"] }
7272
rand = "0.8"
7373
avro-rs = { version = "0.13", features = ["snappy"], optional = true }

datafusion/src/execution/disk_manager.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::collections::hash_map::DefaultHasher;
2020
use std::fs;
2121
use std::fs::File;
2222
use std::hash::{Hash, Hasher};
23-
use std::path::{Path, PathBuf};
23+
use std::path::{PathBuf, Path};
2424
use uuid::Uuid;
2525

2626
pub struct DiskManager {
@@ -78,7 +78,7 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
7878
let mut hasher = DefaultHasher::new();
7979
file_name.hash(&mut hasher);
8080
let hash = hasher.finish();
81-
let dir = local_dirs[hash.rem_euclid(local_dirs.len() as u64)];
81+
let dir = &local_dirs[hash.rem_euclid(local_dirs.len() as u64) as usize];
8282
let mut path = PathBuf::new();
8383
path.push(dir);
8484
path.push(file_name);
@@ -88,9 +88,9 @@ fn get_file(file_name: &str, local_dirs: &Vec<String>) -> String {
8888
fn create_tmp_file(local_dirs: &Vec<String>) -> Result<String> {
8989
let name = Uuid::new_v4().to_string();
9090
let mut path = get_file(&*name, local_dirs);
91-
while path.exists() {
91+
while Path::new(path.as_str()).exists() {
9292
path = get_file(&*Uuid::new_v4().to_string(), local_dirs);
9393
}
94-
File::create(&path).map_err(|e| e.into())?;
94+
File::create(&path)?;
9595
Ok(path)
9696
}

datafusion/src/execution/memory_management/memory_pool.rs

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,19 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::execution::memory_management::{MemoryConsumer, MemoryConsumerId};
19-
use crate::physical_plan::aggregates::return_type;
18+
use crate::execution::memory_management::MemoryConsumer;
2019
use hashbrown::HashMap;
2120
use log::{info, warn};
22-
use std::cmp::{max, min};
23-
use std::sync::{Arc, Condvar, Mutex};
21+
use std::cmp::min;
22+
use std::sync::{Condvar, Mutex, Arc};
23+
use std::fmt::{Debug, Formatter};
24+
use std::fmt;
2425

25-
pub(crate) trait ExecutionMemoryPool {
26+
pub(crate) trait ExecutionMemoryPool: Sync + Send + Debug {
2627
fn memory_available(&self) -> usize;
2728
fn memory_used(&self) -> usize;
2829
fn memory_used_partition(&self, partition_id: usize) -> usize;
29-
fn acquire_memory(&self, required: usize, consumer: &dyn MemoryConsumer) -> usize;
30+
fn acquire_memory(&self, required: usize, consumer: &Arc<dyn MemoryConsumer>) -> usize;
3031
fn update_usage(
3132
&self,
3233
granted_size: usize,
@@ -49,6 +50,14 @@ impl DummyExecutionMemoryPool {
4950
}
5051
}
5152

53+
impl Debug for DummyExecutionMemoryPool {
54+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
55+
f.debug_struct("DummyExecutionMemoryPool")
56+
.field("total", &self.pool_size)
57+
.finish()
58+
}
59+
}
60+
5261
impl ExecutionMemoryPool for DummyExecutionMemoryPool {
5362
fn memory_available(&self) -> usize {
5463
usize::MAX
@@ -62,7 +71,7 @@ impl ExecutionMemoryPool for DummyExecutionMemoryPool {
6271
0
6372
}
6473

65-
fn acquire_memory(&self, required: usize, _consumer: &dyn MemoryConsumer) -> usize {
74+
fn acquire_memory(&self, required: usize, _consumer: &Arc<dyn MemoryConsumer>) -> usize {
6675
required
6776
}
6877

@@ -98,6 +107,15 @@ impl ConstraintExecutionMemoryPool {
98107
}
99108
}
100109

110+
impl Debug for ConstraintExecutionMemoryPool {
111+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
112+
f.debug_struct("ConstraintExecutionMemoryPool")
113+
.field("total", &self.pool_size)
114+
.field("used", &self.memory_used())
115+
.finish()
116+
}
117+
}
118+
101119
impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
102120
fn memory_available(&self) -> usize {
103121
self.pool_size - self.memory_used()
@@ -110,10 +128,13 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
110128

111129
fn memory_used_partition(&self, partition_id: usize) -> usize {
112130
let partition_usage = self.memory_usage.lock().unwrap();
113-
partition_usage[partition_id].unwrap_or(0)
131+
match partition_usage.get(&partition_id) {
132+
None => 0,
133+
Some(v) => *v
134+
}
114135
}
115136

116-
fn acquire_memory(&self, required: usize, consumer: &dyn MemoryConsumer) -> usize {
137+
fn acquire_memory(&self, required: usize, consumer: &Arc<dyn MemoryConsumer>) -> usize {
117138
assert!(required > 0);
118139
let partition_id = consumer.partition_id();
119140
let mut partition_usage = self.memory_usage.lock().unwrap();
@@ -138,7 +159,7 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
138159
Some(max_available) => min(required, max_available),
139160
};
140161

141-
let total_used = partition_usage.values().sum();
162+
let total_used: usize = partition_usage.values().sum();
142163
let total_available = self.pool_size - total_used;
143164
// Only give it as much memory as is free, which might be none if it reached 1 / num_active_partition
144165
let to_grant = min(max_grant, total_available);
@@ -147,8 +168,8 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
147168
// if we can't give it this much now, wait for other tasks to free up memory
148169
// (this happens if older tasks allocated lots of memory before N grew)
149170
if to_grant < required && current_mem + to_grant < min_memory_per_partition {
150-
info!("{} waiting for at least 1/2N of pool to be free", consumer);
151-
self.condvar.wait(&mut partition_usage);
171+
info!("{:?} waiting for at least 1/2N of pool to be free", consumer);
172+
self.condvar.wait(partition_usage);
152173
} else {
153174
*partition_usage.entry(partition_id).or_insert(0) += to_grant;
154175
return to_grant;
@@ -169,20 +190,24 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
169190
} else {
170191
let mut partition_usage = self.memory_usage.lock().unwrap();
171192
if granted_size > real_size {
172-
partition_usage.entry(consumer.partition_id()) -=
193+
*partition_usage.entry(consumer.partition_id()).or_insert(0) -=
173194
granted_size - real_size;
174195
} else {
175196
// TODO: this would have caused OOM already if size estimation ahead is much smaller than
176197
// that of actual allocation
177-
partition_usage.entry(consumer.partition_id()) +=
198+
*partition_usage.entry(consumer.partition_id()).or_insert(0) +=
178199
real_size - granted_size;
179200
}
180201
}
181202
}
182203

183204
fn release_memory(&self, release_size: usize, partition_id: usize) {
184205
let mut partition_usage = self.memory_usage.lock().unwrap();
185-
let current_mem = partition_usage[partition_id].unwrap_or(0);
206+
let current_mem = match partition_usage.get(&partition_id) {
207+
None => 0,
208+
Some(v) => *v,
209+
};
210+
186211
let to_free = if current_mem < release_size {
187212
warn!(
188213
"Release called to free {} but partition only holds {} from the pool",
@@ -193,8 +218,9 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
193218
release_size
194219
};
195220
if partition_usage.contains_key(&partition_id) {
196-
partition_usage.entry(partition_id) -= to_free;
197-
if partition_usage[partition_id].unwrap() == 0 {
221+
let entry = partition_usage.entry(partition_id).or_insert(0);
222+
*entry -= to_free;
223+
if *entry == 0 {
198224
partition_usage.remove(&partition_id);
199225
}
200226
}
@@ -203,10 +229,12 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
203229

204230
fn release_all(&self, partition_id: usize) -> usize {
205231
let mut partition_usage = self.memory_usage.lock().unwrap();
206-
let current_mem = partition_usage[partition_id].unwrap_or(0);
207-
if current_mem == 0 {
208-
return 0;
232+
let mut current_mem = 0;
233+
match partition_usage.get(&partition_id) {
234+
None => return 0,
235+
Some(v) => current_mem = *v,
209236
}
237+
210238
partition_usage.remove(&partition_id);
211239
self.condvar.notify_all();
212240
return current_mem;

datafusion/src/execution/memory_management/mod.rs

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@
1818
pub mod memory_pool;
1919

2020
use crate::error::DataFusionError::OutOfMemory;
21-
use crate::error::{DataFusionError, Result};
22-
use crate::execution::disk_manager::DiskManager;
21+
use crate::error::Result;
2322
use crate::execution::memory_management::memory_pool::{
2423
ConstraintExecutionMemoryPool, DummyExecutionMemoryPool, ExecutionMemoryPool,
2524
};
2625
use async_trait::async_trait;
2726
use hashbrown::{HashMap, HashSet};
2827
use log::{debug, info};
2928
use std::fmt;
30-
use std::fmt::{Display, Formatter};
29+
use std::fmt::{Display, Formatter, Debug};
3130
use std::sync::atomic::{AtomicUsize, Ordering};
3231
use std::sync::{Arc, Mutex};
32+
use std::borrow::BorrowMut;
3333

3434
static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
3535

@@ -41,21 +41,21 @@ pub struct MemoryManager {
4141

4242
impl MemoryManager {
4343
pub fn new(exec_pool_size: usize) -> Self {
44-
let pool: dyn ExecutionMemoryPool = if exec_pool_size == usize::MAX {
45-
DummyExecutionMemoryPool::new()
44+
let execution_pool = if exec_pool_size == usize::MAX {
45+
Arc::new(DummyExecutionMemoryPool::new() as dyn ExecutionMemoryPool)
4646
} else {
47-
ConstraintExecutionMemoryPool::new(exec_pool_size)
47+
Arc::new(ConstraintExecutionMemoryPool::new(exec_pool_size))
4848
};
4949
Self {
50-
execution_pool: Arc::new(pool),
50+
execution_pool,
5151
partition_memory_manager: Arc::new(Mutex::new(HashMap::new())),
5252
}
5353
}
5454

5555
pub fn acquire_exec_memory(
5656
self: Arc<Self>,
5757
required: usize,
58-
consumer: &dyn MemoryConsumer,
58+
consumer: Arc<dyn MemoryConsumer>,
5959
) -> Result<usize> {
6060
let partition_id = consumer.partition_id();
6161
let partition_manager = {
@@ -70,7 +70,7 @@ impl MemoryManager {
7070
pub fn acquire_exec_pool_memory(
7171
&self,
7272
required: usize,
73-
consumer: &dyn MemoryConsumer,
73+
consumer: &Arc<dyn MemoryConsumer>,
7474
) -> usize {
7575
self.execution_pool.acquire_memory(required, consumer)
7676
}
@@ -110,7 +110,7 @@ fn next_id() -> usize {
110110
pub struct PartitionMemoryManager {
111111
memory_manager: Arc<MemoryManager>,
112112
partition_id: usize,
113-
consumers: Arc<Mutex<HashSet<dyn MemoryConsumer>>>,
113+
consumers: Arc<Mutex<HashSet<Arc<dyn MemoryConsumer>>>>,
114114
}
115115

116116
impl PartitionMemoryManager {
@@ -125,12 +125,12 @@ impl PartitionMemoryManager {
125125
pub fn acquire_exec_memory(
126126
&mut self,
127127
required: usize,
128-
consumer: &dyn MemoryConsumer,
128+
consumer: Arc<dyn MemoryConsumer>,
129129
) -> Result<usize> {
130-
let mut consumers = self.consumers.lock().unwrap();
130+
let mut consumers = self.consumers.lock().unwrap().borrow_mut();
131131
let mut got = self
132132
.memory_manager
133-
.acquire_exec_pool_memory(required, consumer);
133+
.acquire_exec_pool_memory(required, &consumer);
134134
if got < required {
135135
// spill others first
136136
}
@@ -140,8 +140,7 @@ impl PartitionMemoryManager {
140140
}
141141

142142
if got < required {
143-
return Err(OutOfMemory(format!(
144-
"Unable to acquire {} bytes of memory, got {}",
143+
return Err(OutOfMemory(format!("Unable to acquire {} bytes of memory, got {}",
145144
required, got
146145
)));
147146
}
@@ -162,14 +161,14 @@ impl PartitionMemoryManager {
162161
info!(
163162
"Consumer {} acquired {}",
164163
c.str_repr(),
165-
human_readable_size(cur_used)
164+
human_readable_size(cur_used as usize)
166165
)
167166
}
168167
}
169168
let no_consumer_size = self
170169
.memory_manager
171170
.exec_memory_used_for_partition(self.partition_id)
172-
- used;
171+
- (used as usize);
173172
info!(
174173
"{} bytes of memory were used for partition {} without specific consumer",
175174
human_readable_size(no_consumer_size),
@@ -178,10 +177,10 @@ impl PartitionMemoryManager {
178177
}
179178
}
180179

181-
#[derive(Debug, Clone)]
180+
#[derive(Clone, Debug)]
182181
pub struct MemoryConsumerId {
183-
partition_id: usize,
184-
id: usize,
182+
pub partition_id: usize,
183+
pub id: usize,
185184
}
186185

187186
impl MemoryConsumerId {
@@ -198,20 +197,20 @@ impl Display for MemoryConsumerId {
198197
}
199198

200199
#[async_trait]
201-
pub trait MemoryConsumer {
200+
pub trait MemoryConsumer: Send + Sync + Debug {
202201
/// Display name of the consumer
203202
fn name(&self) -> String;
204203
/// Unique id of the consumer
205204
fn id(&self) -> &MemoryConsumerId;
206205

207206
fn memory_manager(&self) -> Arc<MemoryManager>;
208207
/// partition that the consumer belongs to
209-
fn partition_id(&self) -> uszie {
208+
fn partition_id(&self) -> usize {
210209
self.id().partition_id
211210
}
212211
/// Try allocate `required` bytes as needed
213-
fn allocate(&self, required: usize) -> Result<()> {
214-
let got = self.memory_manager().acquire_exec_memory(required, self)?;
212+
fn allocate(self: Arc<Self>, required: usize) -> Result<()> {
213+
let got = self.memory_manager().acquire_exec_memory(required, self.clone())?;
215214
self.update_used(got as isize);
216215
Ok(())
217216
}
@@ -250,15 +249,15 @@ fn human_readable_size(size: usize) -> String {
250249
let size = size as u64;
251250
let (value, unit) = {
252251
if size >= 2 * TB {
253-
(size as f64 / TB, "TB")
252+
(size as f64 / TB as f64, "TB")
254253
} else if size >= 2 * GB {
255-
(size as f64 / GB, "GB")
254+
(size as f64 / GB as f64, "GB")
256255
} else if size >= 2 * MB {
257-
(size as f64 / MB, "MB")
256+
(size as f64 / MB as f64, "MB")
258257
} else if size >= 2 * KB {
259-
(size as f64 / KB, "KB")
258+
(size as f64 / KB as f64, "KB")
260259
} else {
261-
(size, "B")
260+
(size as f64, "B")
262261
}
263262
};
264263
format!("{:.1} {}", value, unit)

0 commit comments

Comments
 (0)