Skip to content

Commit 7f68c4f

Browse files
committed
Support greedy_task_shared
1 parent 33f0424 commit 7f68c4f

File tree

3 files changed

+22
-8
lines changed

3 files changed

+22
-8
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,9 @@ object CometConf extends ShimCometConf {
434434
val COMET_EXEC_MEMORY_POOL_TYPE: ConfigEntry[String] = conf("spark.comet.exec.memoryPool")
435435
.doc(
436436
"The type of memory pool to be used for Comet native execution. " +
437-
"Available memory pool types are 'greedy', 'fair_spill', 'fair_spill_task_shared', " +
438-
"'greedy_global' and 'fair_spill_global', By default, this config is 'greedy'.")
437+
"Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', " +
438+
"'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, " +
439+
"this config is 'greedy'.")
439440
.stringConf
440441
.createWithDefault("greedy")
441442

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ Comet provides the following configuration settings.
4949
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
5050
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
5151
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
52-
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy'. | greedy |
52+
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global'. By default, this config is 'greedy'. | greedy |
5353
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
5454
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
5555
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |

native/core/src/execution/jni_api.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ enum MemoryPoolType {
103103
Unified,
104104
Greedy,
105105
FairSpill,
106+
GreedyTaskShared,
106107
FairSpillTaskShared,
107108
GreedyGlobal,
108109
FairSpillGlobal,
@@ -317,6 +318,9 @@ fn parse_memory_pool_config(conf: &HashMap<String, String>) -> CometResult<Memor
317318
"fair_spill_task_shared" => {
318319
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task?)
319320
}
321+
"greedy_task_shared" => {
322+
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task?)
323+
}
320324
"fair_spill_global" => {
321325
MemoryPoolConfig::new(MemoryPoolType::FairSpillGlobal, pool_size)
322326
}
@@ -378,14 +382,23 @@ fn create_memory_pool(
378382
});
379383
Some(Arc::clone(memory_pool))
380384
}
381-
MemoryPoolType::FairSpillTaskShared => {
385+
MemoryPoolType::GreedyTaskShared | MemoryPoolType::FairSpillTaskShared => {
382386
let mut memory_pool_map = TASK_SHARED_MEMORY_POOLS.lock().unwrap();
383387
let per_task_memory_pool =
384388
memory_pool_map.entry(task_attempt_id).or_insert_with(|| {
385-
PerTaskMemoryPool::new(Arc::new(TrackConsumersPool::new(
386-
FairSpillPool::new(memory_pool_config.pool_size),
387-
NonZeroUsize::new(10).unwrap(),
388-
)))
389+
let pool: Arc<dyn MemoryPool> =
390+
if memory_pool_config.pool_type == MemoryPoolType::GreedyTaskShared {
391+
Arc::new(TrackConsumersPool::new(
392+
GreedyMemoryPool::new(memory_pool_config.pool_size),
393+
NonZeroUsize::new(10).unwrap(),
394+
))
395+
} else {
396+
Arc::new(TrackConsumersPool::new(
397+
FairSpillPool::new(memory_pool_config.pool_size),
398+
NonZeroUsize::new(10).unwrap(),
399+
))
400+
};
401+
PerTaskMemoryPool::new(pool)
389402
});
390403
per_task_memory_pool.num_plans += 1;
391404
Some(Arc::clone(&per_task_memory_pool.memory_pool))

0 commit comments

Comments
 (0)