57 changes: 51 additions & 6 deletions docs/source/user-guide/tuning.md
@@ -23,11 +23,52 @@ Comet provides some tuning options to help you get the best performance from you

## Memory Tuning

- Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
- If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
### Unified Memory Management with Off-Heap Memory

The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. This allows
Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`.
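
As a minimal sketch (the pool size here is an arbitrary illustration, not a recommendation), these settings might be passed to `spark-submit` as follows:

```shell
spark-submit \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=16g
```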

### Dedicated Comet Memory Pools

If `spark.memory.offHeap.enabled` is not set, Comet will use its own dedicated memory pools that are not
shared with Spark. This requires additional configuration settings to specify the size and type of the
memory pool to use.

The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. If this setting is not specified,
the memory overhead will be calculated by multiplying the executor memory by `spark.comet.memory.overhead.factor`
(which defaults to `0.2`).
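
For illustration, an executor with 16 GB of memory and the default factor would receive a roughly 3.2 GB Comet pool. A sketch of both approaches (the sizes are hypothetical):

```shell
# Explicit pool size (overrides the factor-based calculation)
--conf spark.comet.memoryOverhead=4g

# Or rely on the factor: 16g executor memory * 0.2 = ~3.2g pool
--conf spark.executor.memory=16g
--conf spark.comet.memory.overhead.factor=0.2
```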

The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.

The valid pool types are:

- `greedy`
- `greedy_global`
- `greedy_task_shared`
- `fair_spill`
- `fair_spill_global`
- `fair_spill_task_shared`

Pool types ending with `_global` share a single global memory pool across all tasks.

Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task.

Other pool types create a dedicated pool per task, sized as a fraction of the available pool based on the number of
cores and the number of cores per task.
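
As a hypothetical example, a per-task pool type on a 16-core executor with `spark.task.cpus=1` and a 16 GB pool would give each task a dedicated pool of 16 GB × 1 / 16 = 1 GB:

```shell
--conf spark.comet.exec.memoryPool=greedy
--conf spark.comet.memoryOverhead=16g
```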

The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This
pool works well for queries that do not need to spill or have a single spillable operator.

The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more
than an even fraction of the available memory sans any unspillable reservations
(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`). This pool works best when you know beforehand
that the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even
when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in
a first-come, first-serve fashion.
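
As a hypothetical illustration: in a 9 GB pool where unspillable reservations hold 3 GB and three spillable reservations are active, each spillable reservation would be limited to `(9 GB - 3 GB) / 3 = 2 GB`.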

[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html

When the unified memory manager is used, each executor has a single memory pool that is shared by all native plans
executing within that process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.

### Determining How Much Memory to Allocate

@@ -106,15 +147,19 @@ then any shuffle operations that cannot be supported in this mode will fall back
### Shuffle Compression

By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.
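
For example, a sketch of disabling shuffle compression (whether this helps is workload- and hardware-dependent):

```shell
--conf spark.shuffle.compress=false
```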

## Explain Plan

### Extended Explain

With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently, this lists
the reasons why Comet may not have been enabled for specific operations.
To enable this, set the following in the Spark configuration:

```shell
-c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
```

This will add a section to the detailed plan displayed in the Spark SQL UI page.
7 changes: 2 additions & 5 deletions native/core/src/execution/jni_api.rs
@@ -162,7 +162,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_pool_type: jstring,
memory_limit: jlong,
memory_limit_per_task: jlong,
- memory_fraction: jdouble,
task_attempt_id: jlong,
debug_native: jboolean,
explain_native: jboolean,
@@ -208,7 +207,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_pool_type,
memory_limit,
memory_limit_per_task,
- memory_fraction,
)?;
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
@@ -281,14 +279,13 @@ fn parse_memory_pool_config(
memory_pool_type: String,
memory_limit: i64,
memory_limit_per_task: i64,
- memory_fraction: f64,
) -> CometResult<MemoryPoolConfig> {
let memory_pool_config = if use_unified_memory_manager {
MemoryPoolConfig::new(MemoryPoolType::Unified, 0)
} else {
// Use the memory pool from DF
- let pool_size = (memory_limit as f64 * memory_fraction) as usize;
- let pool_size_per_task = (memory_limit_per_task as f64 * memory_fraction) as usize;
Comment on lines -290 to -291

**Member (author):** The memory_fraction is already applied on the JVM side.

**Member:** If memory_fraction is not used here, we can remove the configuration option COMET_EXEC_MEMORY_FRACTION (spark.comet.exec.memoryFraction) since it is not used in the code base.

My understanding is that spark.comet.exec.memoryFraction is still useful. As per the documentation, it keeps DataFusion from using up all the reserved native memory, since some of the allocations are untracked, and it also compensates for inaccurate memory usage estimations.

+ let pool_size = memory_limit as usize;
+ let pool_size_per_task = memory_limit_per_task as usize;
match memory_pool_type.as_str() {
"fair_spill_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
@@ -52,7 +52,7 @@ public static CometShuffleMemoryAllocatorTrait getInstance(
(boolean)
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();

- if (isSparkTesting && !useUnifiedMemAllocator) {
+ if (!useUnifiedMemAllocator) {
synchronized (CometShuffleMemoryAllocator.class) {
if (INSTANCE == null) {
// CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
12 changes: 8 additions & 4 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -20,10 +20,11 @@
package org.apache.comet

import org.apache.spark._
+ import org.apache.spark.internal.Logging
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

- import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+ import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
@@ -52,7 +53,8 @@ class CometExecIterator(
nativeMetrics: CometMetricNode,
numParts: Int,
partitionIndex: Int)
- extends Iterator[ColumnarBatch] {
+ extends Iterator[ColumnarBatch]
+ with Logging {

private val nativeLib = new Native()
private val nativeUtil = new NativeUtil()
@@ -75,7 +77,6 @@
memory_pool_type = COMET_EXEC_MEMORY_POOL_TYPE.get(),
memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
memory_limit_per_task = getMemoryLimitPerTask(conf),
- memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
task_attempt_id = TaskContext.get().taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
@@ -93,7 +94,10 @@
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
- (maxMemory.toFloat * coresPerTask / numCores).toLong
+ val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
+ logInfo(
+   s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
+ limit
}

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}

import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
- import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
+ import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -919,14 +919,6 @@ class CometSparkSessionExtensions
}

override def apply(plan: SparkPlan): SparkPlan = {

- // Comet required off-heap memory to be enabled
- if (!isOffHeapEnabled(conf) && !isTesting) {
-   logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
-   withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
-   return plan
- }

// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
@@ -1187,8 +1179,7 @@ object CometSparkSessionExtensions extends Logging {
// 2. `spark.shuffle.manager` is set to `CometShuffleManager`
// 3. Off-heap memory is enabled || Spark/Comet unit testing
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
- COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
-   (isOffHeapEnabled(conf) || isTesting)
+ COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
1 change: 0 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
@@ -55,7 +55,6 @@ class Native extends NativeBase {
memory_pool_type: String,
memory_limit: Long,
memory_limit_per_task: Long,
- memory_fraction: Double,
task_attempt_id: Long,
debug: Boolean,
explain: Boolean,