Skip to content

Commit a933093

Browse files
committed
[SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer
1 parent 3bf43fb commit a933093

12 files changed

Lines changed: 551 additions & 314 deletions

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala

Lines changed: 50 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,111 +18,97 @@
1818
package org.apache.spark.sql.kafka010
1919

2020
import java.{util => ju}
21-
import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
21+
import java.io.Closeable
22+
import java.util.concurrent.ExecutionException
2223

23-
import com.google.common.cache._
24-
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
25-
import org.apache.kafka.clients.producer.KafkaProducer
2624
import scala.collection.JavaConverters._
2725
import scala.util.control.NonFatal
2826

27+
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
28+
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}
29+
2930
import org.apache.spark.SparkEnv
3031
import org.apache.spark.internal.Logging
3132
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
33+
import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
34+
import org.apache.spark.util.ShutdownHookManager
3235

/**
 * A thin, pool-friendly wrapper around a byte-array [[KafkaProducer]].
 *
 * Instances are created and handed out by [[InternalKafkaProducerPool]]; the wrapped
 * producer is created eagerly from `kafkaParams` and closed via [[close]] when the pool
 * destroys the object.
 *
 * @param kafkaParams the Kafka producer configuration this instance was created with;
 *                    also used to derive the pool cache key.
 */
private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, Object])
  extends Closeable with Logging {

  private type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // Created eagerly so a broken configuration fails fast, at construction time.
  private val producer = createProducer()

  private def createProducer(): Producer = {
    val producer: Producer = new Producer(kafkaParams)
    if (log.isDebugEnabled()) {
      // Redact credentials/secrets before logging the configuration.
      val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
      logDebug(s"Created a new instance of kafka producer for $redactedParamsSeq.")
    }
    producer
  }

  /**
   * Closes the underlying Kafka producer. Failures are logged and swallowed (NonFatal only)
   * so that pool eviction/shutdown never propagates a close error.
   */
  override def close(): Unit = {
    try {
      if (log.isInfoEnabled()) {
        val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
        logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
      }
      producer.close()
    } catch {
      case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
    }
  }

  /** Sends a record asynchronously, delegating to the wrapped producer. */
  def send(record: ProducerRecord[Array[Byte], Array[Byte]], callback: Callback): Unit = {
    producer.send(record, callback)
  }

  /** Blocks until all previously sent records are acknowledged. */
  def flush(): Unit = {
    producer.flush()
  }
}
/**
 * Companion object managing a process-wide pool of [[CachedKafkaProducer]] instances,
 * keyed by (updated) Kafka parameters. Callers must pair every [[acquire]] with a
 * [[release]] so the producer returns to the pool.
 */
private[kafka010] object CachedKafkaProducer extends Logging {

  private val sparkConf = SparkEnv.get.conf
  private val producerPool = new InternalKafkaProducerPool(sparkConf)

  // Best-effort cleanup of all pooled producers on JVM shutdown; errors are only logged
  // because throwing from a shutdown hook would mask the real exit path.
  ShutdownHookManager.addShutdownHook { () =>
    try {
      producerPool.close()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring exception while shutting down pool from shutdown hook", e)
    }
  }

  /**
   * Get a cached KafkaProducer for a given configuration. If matching KafkaProducer doesn't
   * exist, a new KafkaProducer will be created. KafkaProducer is thread safe, it is best to keep
   * one instance per specified kafkaParams.
   */
  def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = {
    // Apply executor-side auth configuration (e.g. delegation tokens) before keying the pool.
    val updatedKafkaParams =
      KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
        .setAuthenticationConfigIfNeeded()
        .build()
    val key = toCacheKey(updatedKafkaParams)
    try {
      producerPool.borrowObject(key, updatedKafkaParams)
    } catch {
      // Unwrap pool/cache wrapper exceptions so callers see the real cause.
      case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
          if e.getCause != null =>
        throw e.getCause
    }
  }

  /** Returns a previously acquired producer to the pool. */
  def release(producer: CachedKafkaProducer): Unit = {
    producerPool.returnObject(producer)
  }

  /** Clears all idle producers from the pool (best-effort; intended for tests). */
  private[kafka010] def clear(): Unit = {
    producerPool.reset()
  }
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.kafka010
19+
20+
import java.{util => ju}
21+
import java.util.concurrent.ConcurrentHashMap
22+
23+
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener}
24+
import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
25+
26+
import org.apache.spark.internal.Logging
27+
28+
/**
29+
* Provides object pool for objects which is grouped by a key.
30+
*
31+
* This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on
32+
* the class, and same contract applies: after using the borrowed object, you must either call
33+
* returnObject() if the object is healthy to return to pool, or invalidateObject() if the object
34+
* should be destroyed.
35+
*
36+
* The soft capacity of pool is determined by "poolConfig.capacity" config value,
37+
* and the pool will have reasonable default value if the value is not provided.
38+
* (The instance will do its best effort to respect soft capacity but it can exceed when there's
39+
* a borrowing request and there's neither free space nor idle object to clear.)
40+
*
41+
* This class guarantees that no caller will get pooled object once the object is borrowed and
42+
* not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller
43+
* shares the object to multiple threads.
44+
*/
/**
 * Provides object pool for objects which is grouped by a key.
 *
 * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on
 * the class, and same contract applies: after using the borrowed object, you must either call
 * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object
 * should be destroyed.
 *
 * The soft capacity of pool is determined by "poolConfig.capacity" config value,
 * and the pool will have reasonable default value if the value is not provided.
 * (The instance will do its best effort to respect soft capacity but it can exceed when there's
 * a borrowing request and there's neither free space nor idle object to clear.)
 *
 * This class guarantees that no caller will get pooled object once the object is borrowed and
 * not yet returned, hence provide thread-safety usage of non-thread-safe objects unless caller
 * shares the object to multiple threads.
 *
 * @tparam K the cache-key type
 * @tparam V the pooled connector type
 */
private[kafka010] abstract class InternalKafkaConnectorPool[K, V](
    objectFactory: ObjectFactory[K, V],
    poolConfig: PoolConfig[V],
    swallowedExceptionListener: SwallowedExceptionListener) extends Logging {

  // the class is intended to have only soft capacity
  assert(poolConfig.getMaxTotal < 0)

  private val pool = {
    val internalPool = new GenericKeyedObjectPool[K, V](objectFactory, poolConfig)
    internalPool.setSwallowedExceptionListener(swallowedExceptionListener)
    internalPool
  }

  /**
   * Borrows object from the pool. If there's no idle object for the key,
   * the pool will create the object.
   *
   * If the pool doesn't have idle object for the key and also exceeds the soft capacity,
   * pool will try to clear some of idle objects.
   *
   * Borrowed object must be returned by either calling returnObject or invalidateObject,
   * otherwise the object will be kept in pool as active object.
   */
  def borrowObject(key: K, kafkaParams: ju.Map[String, Object]): V = {
    // Record the params for the key so the factory can build the object lazily.
    updateKafkaParamForKey(key, kafkaParams)

    if (size >= poolConfig.softMaxSize) {
      logWarning("Pool exceeds its soft max size, cleaning up idle objects...")
      pool.clearOldest()
    }

    pool.borrowObject(key)
  }

  /** Returns borrowed object to the pool. */
  def returnObject(connector: V): Unit = {
    pool.returnObject(createKey(connector), connector)
  }

  /** Invalidates (destroy) borrowed object to the pool. */
  def invalidateObject(connector: V): Unit = {
    pool.invalidateObject(createKey(connector), connector)
  }

  /** Invalidates all idle values for the key. */
  def invalidateKey(key: K): Unit = {
    pool.clear(key)
  }

  /**
   * Closes the keyed object pool. Once the pool is closed,
   * borrowObject will fail with [[IllegalStateException]], but returnObject and invalidateObject
   * will continue to work, with returned objects destroyed on return.
   *
   * Also destroys idle instances in the pool.
   */
  def close(): Unit = {
    pool.close()
  }

  def reset(): Unit = {
    // this is the best-effort of clearing up. otherwise we should close the pool and create again
    // but we don't want to make it "var" only because of tests.
    pool.clear()
  }

  def numIdle: Int = pool.getNumIdle

  def numIdle(key: K): Int = pool.getNumIdle(key)

  def numActive: Int = pool.getNumActive

  def numActive(key: K): Int = pool.getNumActive(key)

  def size: Int = numIdle + numActive

  def size(key: K): Int = numIdle(key) + numActive(key)

  private def updateKafkaParamForKey(key: K, kafkaParams: ju.Map[String, Object]): Unit = {
    // We can assume that kafkaParam should not be different for same cache key,
    // otherwise we can't reuse the cached object and cache key should contain kafkaParam.
    // So it should be safe to put the key/value pair only when the key doesn't exist.
    val oldKafkaParams = objectFactory.keyToKafkaParams.putIfAbsent(key, kafkaParams)
    require(oldKafkaParams == null || kafkaParams == oldKafkaParams, "Kafka parameters for same " +
      s"cache key should be equal. old parameters: $oldKafkaParams new parameters: $kafkaParams")
  }

  /** Derives the cache key back from a pooled connector, for return/invalidate calls. */
  protected def createKey(connector: V): K
}
/**
 * Base configuration for [[InternalKafkaConnectorPool]]. Concrete subclasses supply the
 * tunable values (soft max size, JMX settings, eviction timings); [[init]] fixes the
 * structural pool behavior that the pool class relies on.
 */
private[kafka010] abstract class PoolConfig[V] extends GenericKeyedObjectPoolConfig[V] {

  init()

  /** Soft upper bound on total pooled objects; exceeding it triggers idle cleanup. */
  def softMaxSize: Int

  def jmxEnabled: Boolean

  def minEvictableIdleTimeMillis: Long

  def evictorThreadRunIntervalMillis: Long

  def jmxNamePrefix: String

  def init(): Unit = {
    // NOTE: Below lines define the behavior, so do not modify unless you know what you are
    // doing, and update the class doc accordingly if necessary when you modify.

    // 1. Set min idle objects per key to 0 to avoid creating unnecessary object.
    // 2. Set max idle objects per key to 3 but set total objects per key to infinite
    // which ensures borrowing per key is not restricted.
    // 3. Set max total objects to infinite which ensures all objects are managed in this pool.
    setMinIdlePerKey(0)
    setMaxIdlePerKey(3)
    setMaxTotalPerKey(-1)
    setMaxTotal(-1)

    // Set minimum evictable idle time which will be referred from evictor thread
    setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
    setSoftMinEvictableIdleTimeMillis(-1)

    // evictor thread will run test with ten idle objects
    setTimeBetweenEvictionRunsMillis(evictorThreadRunIntervalMillis)
    setNumTestsPerEvictionRun(10)
    setEvictionPolicy(new DefaultEvictionPolicy[V]())

    // Immediately fail on exhausted pool while borrowing
    setBlockWhenExhausted(false)

    setJmxEnabled(jmxEnabled)
    setJmxNamePrefix(jmxNamePrefix)
  }
}
/**
 * Keyed pooled-object factory that builds connector instances from Kafka parameters
 * previously registered for the key (see `InternalKafkaConnectorPool.borrowObject`).
 */
private[kafka010] abstract class ObjectFactory[K, V] extends BaseKeyedPooledObjectFactory[K, V] {
  // Populated by the pool before the first borrow for a key; read here on create.
  val keyToKafkaParams = new ConcurrentHashMap[K, ju.Map[String, Object]]()

  override def create(key: K): V = {
    Option(keyToKafkaParams.get(key)) match {
      case Some(kafkaParams) => createValue(key, kafkaParams)
      case None => throw new IllegalStateException("Kafka params should be set before " +
        "borrowing object.")
    }
  }

  override def wrap(value: V): PooledObject[V] = {
    new DefaultPooledObject[V](value)
  }

  /** Builds the actual connector instance for the key from its Kafka parameters. */
  protected def createValue(key: K, kafkaParams: ju.Map[String, Object]): V
}
/**
 * Logs exceptions that commons-pool swallows while destroying pooled objects,
 * tagging the message with the connector type (e.g. "producer", "consumer").
 */
private[kafka010] class CustomSwallowedExceptionListener(connectorType: String)
  extends SwallowedExceptionListener with Logging {
  override def onSwallowException(e: Exception): Unit = {
    logError(s"Error closing Kafka $connectorType", e)
  }
}

0 commit comments

Comments
 (0)