Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)
def workerJvmProfilerEnabled: Boolean = get(WORKER_JVM_PROFILER_ENABLED)
def workerJvmProfilerOptions: String = get(WORKER_JVM_PROFILER_OPTIONS)
def workerJvmProfilerLocalDir: String = get(WORKER_JVM_PROFILER_LOCAL_DIR)
def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
Expand Down Expand Up @@ -3119,6 +3122,31 @@ object CelebornConf extends Logging {
.longConf
.createOptional

val WORKER_JVM_PROFILER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmProfiler.enabled")
.categories("worker")
.version("0.5.0")
.doc("Turn on code profiling via async_profiler in workers.")
.booleanConf
.createWithDefault(false)

val WORKER_JVM_PROFILER_OPTIONS: ConfigEntry[String] =
buildConf("celeborn.worker.jvmProfiler.options")
.categories("worker")
.version("0.5.0")
.doc("Options to pass on to the async profiler.")
.stringConf
.createWithDefault("event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s")

val WORKER_JVM_PROFILER_LOCAL_DIR: ConfigEntry[String] =
buildConf("celeborn.worker.jvmProfiler.localDir")
.categories("worker")
.version("0.5.0")
.doc("Local file system path on worker where profiler output is saved. "
+ "Defaults to the working directory of the worker process.")
.stringConf
.createWithDefault(".")
Copy link
Member

@pan3793 pan3793 Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if possible, please choose a default value independent of the working dir, the user likely uses the absolute path of the script to start the process under an arbitrary dir, which would cause indeterminate results. one solution is use ${CELEBORN_HOME}/profiling and resolve the ${CELEBORN_HOME} in def workerJvmProfilerLocalDir: String


val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvmQuake.enabled")
.categories("worker")
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-server
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LICENSE/NOTICE are missed

commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
commons-io/2.13.0//commons-io-2.13.0.jar
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ license: |
| celeborn.worker.http.host | <localhost> | false | Worker's http host. | 0.4.0 | celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host |
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port |
| celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | |
| celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process. | 0.5.0 | |
| celeborn.worker.jvmProfiler.options | event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s | false | Options to pass on to the async profiler. | 0.5.0 | |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where can I find all the option candidates and valid combinations? A detailed description or a hyperlink is required.

| celeborn.worker.jvmQuake.check.interval | 1s | false | Interval of gc behavior checking for worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.enabled | true | false | Whether to heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
| celeborn.worker.jvmQuake.dump.path | <tmp>/jvm-quake/dump/<pid> | false | The path of heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | |
Expand Down
56 changes: 56 additions & 0 deletions docs/developers/jvmprofiler.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

# JVM Profiler
Since version 0.5.0, Celeborn supports JVM sampling profiler to capture CPU and memory profiles. This article provides a detailed guide of Celeborn `Worker`'s code profiling.

## Worker Code Profiling
The JVM profiler enables code profiling of workers based on the [async profiler](https://github.com/async-profiler/async-profiler/blob/v2.10/README.md), a low overhead sampling profiler.
This allows a `Worker` instance to capture CPU and memory profiles for `Worker` which is later analyzed for performance issues.
The profiler captures [Java Flight Recorder (jfr)](https://access.redhat.com/documentation/es-es/red_hat_build_of_openjdk/17/html/using_jdk_flight_recorder_with_red_hat_build_of_openjdk/openjdk-flight-recorded-overview) files for each worker that can be read by tools like Java Mission Control and Intellij etc.
The profiler writes the jfr files to the `Worker`'s working directory in the `Worker`'s local file system and the files can grow to be large,
so it is advisable that the `Worker` machines have adequate storage.

Code profiling is currently only supported for

* Linux (x64)
* Linux (arm 64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

arm64, don't split it out.

* Linux (musl, x64)
* MacOS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"macOS" - the case is matter, let's respect the official product name https://support.apple.com/macos


To get maximum profiling information set the following jvm options for the `Worker` :

```
-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints -XX:+PreserveFramePointer
```

For more information on async_profiler see the [Async Profiler Manual](https://krzysztofslusarski.github.io/2022/12/12/async-manual.html).

To enable code profiling, enable the code profiling in the configuration.

```properties
celeborn.worker.jvmProfiler.enabled true
```

For more configuration of code profiling refer to `celeborn.worker.jvmProfiler.*`.

### Profiling Configuration Example
```properties
celeborn.worker.jvmProfiler.enabled true
celeborn.worker.jvmProfiler.options event=wall,interval=10ms,alloc=2m,lock=10ms,chunktime=300s
```
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ nav:
- Overview: developers/worker.md
- Storage: developers/storage.md
- Traffic Control: developers/trafficcontrol.md
- JVM Profiler: developers/jvmprofiler.md
- Client:
- Overview: developers/client.md
- LifecycleManager: developers/lifecyclemanager.md
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
<rocksdbjni.version>8.11.3</rocksdbjni.version>
<jackson.version>2.15.3</jackson.version>
<snappy.version>1.1.10.5</snappy.version>
<ap.loader.version>3.0-8</ap.loader.version>

<!-- Db dependencies -->
<mybatis.version>3.5.15</mybatis.version>
Expand Down Expand Up @@ -430,6 +431,12 @@
</exclusion>
</exclusions>
</dependency>
<!-- async-profiler loader contains async_profiler binaries for multiple platforms -->
<dependency>
<groupId>me.bechberger</groupId>
<artifactId>ap-loader-all</artifactId>
<version>${ap.loader.version}</version>
</dependency>

<!-- Db dependencies -->
<dependency>
Expand Down
3 changes: 3 additions & 0 deletions project/CelebornBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object Dependencies {
val lz4JavaVersion = sparkClientProjects.map(_.lz4JavaVersion).getOrElse("1.8.0")

// Dependent library versions
val apLoaderVersion = "3.0-8"
val commonsCompressVersion = "1.4.1"
val commonsCryptoVersion = "1.0.0"
val commonsIoVersion = "2.13.0"
Expand Down Expand Up @@ -71,6 +72,7 @@ object Dependencies {
val protocVersion = "3.21.7"
val protoVersion = "3.21.7"

val apLoader = "me.bechberger" % "ap-loader-all" % apLoaderVersion
val commonsCompress = "org.apache.commons" % "commons-compress" % commonsCompressVersion
val commonsCrypto = "org.apache.commons" % "commons-crypto" % commonsCryptoVersion excludeAll(
ExclusionRule("net.java.dev.jna", "jna"))
Expand Down Expand Up @@ -495,6 +497,7 @@ object CelebornWorker {
ExclusionRule("org.apache.ratis", "ratis-client")
),
libraryDependencies ++= Seq(
Dependencies.apLoader,
Dependencies.guava,
Dependencies.commonsIo,
Dependencies.ioNetty,
Expand Down
4 changes: 4 additions & 0 deletions worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
</dependency>
<dependency>
<groupId>me.bechberger</groupId>
<artifactId>ap-loader-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.celeborn</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicIntegerArray}

import scala.collection.JavaConverters._
import scala.util.Random

import com.google.common.annotations.VisibleForTesting
import io.netty.util.HashedWheelTimer
Expand Down Expand Up @@ -57,6 +58,7 @@ import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionContro
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.profiler.JVMProfiler
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}

private[celeborn] class Worker(
Expand Down Expand Up @@ -326,6 +328,12 @@ private[celeborn] class Worker(
var cleaner: ExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")

private var jvmProfiler: JVMProfiler = _
if (conf.workerJvmProfilerEnabled) {
jvmProfiler = new JVMProfiler(conf)
jvmProfiler.start()
}

private var jvmQuake: JVMQuake = _
if (conf.workerJvmQuakeEnabled) {
jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-"))
Expand Down Expand Up @@ -500,6 +508,9 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")

if (jvmProfiler != null) {
jvmProfiler.stop()
}
if (jvmQuake != null) {
jvmQuake.stop()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker.profiler

import java.io.IOException

import one.profiler.{AsyncProfiler, AsyncProfilerLoader}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging

/**
* The JVM profiler provides code profiling of worker based on the the async profiler, a low overhead sampling profiler for Java.
* This allows a worker to capture CPU and memory profiles for worker which can later be analyzed for performance issues.
* The profiler captures Java Flight Recorder (jfr) files for each worker read by tools including Java Mission Control and Intellij.
*
* <p> Note: The profiler writes the jfr files to the worker's working directory in the worker's local file system and the files can grow to be large so it is advisable
* that the worker machines have adequate storage.
*
* <p>Note: code copied from Apache Spark.
*
* @param conf Celeborn configuration with jvm profiler config.
*/
class JVMProfiler(conf: CelebornConf) extends Logging {

private var running = false
private val enableProfiler = conf.workerJvmProfilerEnabled
private val profilerOptions = conf.workerJvmProfilerOptions
private val profilerLocalDir = conf.workerJvmProfilerLocalDir

private val startcmd = s"start,$profilerOptions,file=$profilerLocalDir/profile.jfr"
private val stopcmd = s"stop,$profilerOptions,file=$profilerLocalDir/profile.jfr"

val profiler: Option[AsyncProfiler] = {
Option(
if (enableProfiler && AsyncProfilerLoader.isSupported) AsyncProfilerLoader.load() else null)
}

def start(): Unit = {
if (!running) {
try {
profiler.foreach(p => {
p.execute(startcmd)
logInfo("JVM profiling started.")
running = true
})
} catch {
case e @ (_: IllegalArgumentException | _: IllegalStateException | _: IOException) =>
logError("JVM profiling aborted. Exception occurred in profiler native code: ", e)
case e: Exception => logWarning("JVM profiling aborted due to exception: ", e)
}
}
}

/** Stops the profiling and saves output to dfs location. */
def stop(): Unit = {
if (running) {
profiler.foreach(p => {
p.execute(stopcmd)
logInfo("JVM profiler stopped.")
running = false
})
}
}
}