Skip to content

Commit 1c9f892

Browse files
committed
Using spark conf instead of System.getProperties.
1 parent 039d063 commit 1c9f892

2 files changed

Lines changed: 6 additions & 7 deletions

File tree

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,16 @@ import scala.collection.immutable.SortedMap
2828
import scala.collection.mutable
2929
import scala.util.control.NonFatal
3030

31+
import org.apache.spark.SparkEnv
3132
import org.apache.spark.internal.Logging
3233
import org.apache.spark.util.ShutdownHookManager
3334

3435
private[kafka010] object CachedKafkaProducer extends Logging {
3536

3637
private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
3738

38-
private val cacheExpireTimeout: Long =
39-
System.getProperty("spark.kafka.guava.cache.timeout.minutes", "10").toLong
39+
private lazy val cacheExpireTimeout: Long =
40+
SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
4041

4142
private val removalListener = new RemovalListener[String, Producer]() {
4243
override def onRemoval(notification: RemovalNotification[String, Producer]): Unit = {
@@ -47,9 +48,8 @@ private[kafka010] object CachedKafkaProducer extends Logging {
4748
}
4849
}
4950

50-
private val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
51-
.recordStats()
52-
.expireAfterAccess(cacheExpireTimeout, TimeUnit.MINUTES)
51+
private lazy val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
52+
.expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
5353
.removalListener(removalListener)
5454
.build[String, Producer]()
5555

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CanonicalizeKafkaParamsSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@ package org.apache.spark.sql.kafka010
2020
import java.{util => ju}
2121

2222
import org.apache.kafka.common.serialization.ByteArraySerializer
23-
import org.scalatest.PrivateMethodTester
2423

2524
import org.apache.spark.sql.test.SharedSQLContext
2625

27-
class CanonicalizeKafkaParamsSuite extends SharedSQLContext with PrivateMethodTester {
26+
class CanonicalizeKafkaParamsSuite extends SharedSQLContext {
2827

2928
test("Same unique id is returned for same set of kafka parameters") {
3029
CanonicalizeKafkaParams.clear()

0 commit comments

Comments (0)