Skip to content

Commit cf10c0e

Browse files
author
Marcelo Vanzin
committed
SHS-NG M3: Add initial listener implementation, handle scheduler events.
The initial listener is based on the existing JobProgressListener, and tries to mimic its behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history provider code can be reused. Note the code here is not 100% correct. This is meant as a building ground for the UI integration in the next milestone. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior. I also added annotations to API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types.
1 parent fa9b1a8 commit cf10c0e

10 files changed

Lines changed: 2007 additions & 24 deletions

File tree

common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
6969
this.types = new ConcurrentHashMap<>();
7070

7171
Options options = new Options();
72-
options.createIfMissing(!path.exists());
72+
options.createIfMissing(true);
7373
this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
7474

7575
byte[] versionData = db().get(STORE_VERSION_KEY);

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.spark.internal.Logging
4343
import org.apache.spark.kvstore._
4444
import org.apache.spark.scheduler._
4545
import org.apache.spark.scheduler.ReplayListenerBus._
46+
import org.apache.spark.status.KVUtils._
4647
import org.apache.spark.status.api.v1
4748
import org.apache.spark.ui.SparkUI
4849
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -699,18 +700,6 @@ private[history] object FsHistoryProvider {
699700
private val CURRENT_VERSION = 1L
700701
}
701702

702-
/**
703-
* A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
704-
* the API serializer.
705-
*/
706-
private class KVStoreScalaSerializer extends KVStoreSerializer {
707-
708-
mapper.registerModule(DefaultScalaModule)
709-
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
710-
mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
711-
712-
}
713-
714703
case class KVStoreMetadata(
715704
val version: Long,
716705
val logDir: String)

core/src/main/scala/org/apache/spark/deploy/history/config.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,10 @@ package org.apache.spark.deploy.history
1919

2020
import java.util.concurrent.TimeUnit
2121

22-
import scala.annotation.meta.getter
23-
2422
import org.apache.spark.internal.config.ConfigBuilder
25-
import org.apache.spark.kvstore.KVIndex
2623

2724
private[spark] object config {
2825

29-
/** Use this to annotate constructor params to be used as KVStore indices. */
30-
type KVIndexParam = KVIndex @getter
31-
3226
val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
3327

3428
val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")

0 commit comments

Comments
 (0)