[SPARK-19968][SS] Use a cached instance of KafkaProducer instead of creating one every batch.
#17308
Conversation
Test build #74644 has started for PR 17308 at commit
Ideally this should not have been changed; any implementation of java.util.AbstractMap has a correctly working hashCode(). Here they are changed to HashMap to avoid converting or casting them later. It could actually just be java.util.Map, but then we cannot guarantee the outcome of hashCode().
Test build #74646 has finished for PR 17308 at commit
Test build #74658 has finished for PR 17308 at commit
@tdas ping!
I can further confirm from the logs that a KafkaProducer instance is created almost every instant.
Just curious: is it a good idea to key the producer map by the hash code of a map whose values are Objects?
It is not a good idea to do it like that; I would like my understanding to be corrected. As far as I understand, in this particular case Spark does not let the user specify a key or value serializer/deserializer, so the Object can only be a String, an int, or a Long, and for those hashCode works correctly. I am also contemplating a better way to do it now.
True, my bad; I thought KafkaSink was a public API.
Yeah, I don't think this is a good key for the hashmap; there could be collisions. We should either assign a unique ID to the sink and thread that through, or come up with some way to canonicalize the set of parameters that create the sink. The latter might be better, since you could maybe reuse the same producer for more than one query.
Just to throw in my two cents, a change like this is definitely needed, as is made clear by the second sentence of the docs (http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html): "The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances."
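To make the point concrete, here is a minimal sketch of the caching idea under discussion (the object and key names are illustrative, not the PR's actual code): one producer per distinct configuration, shared across batches and threads.

```scala
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap

import org.apache.kafka.clients.producer.KafkaProducer

object SharedProducerSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // One producer per distinct configuration, reused across batches.
  private val producers = new TrieMap[Seq[(String, Object)], Producer]()

  def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
    // Sort entries so logically equal configs map to the same key.
    val key = kafkaParams.asScala.toSeq.sortBy(_._1)
    // KafkaProducer is thread safe, so sharing one instance across
    // writer threads is fine per the Kafka javadocs quoted above.
    producers.getOrElseUpdate(key, new KafkaProducer[Array[Byte], Array[Byte]](kafkaParams))
  }
}
```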
Taking a look.
marmbrus left a comment
I agree this is an important optimization we need to do. I have some concerns about the life cycle of the producer as it's implemented here.
This is only closing the producer on the driver, right? Do we even create one there?
That's correct. As I now understand it, close requires a bit of rethinking; I don't see a straightforward way of doing it. If you agree, the close-related implementation can be taken out of this PR and taken up in a separate JIRA and PR?
nit: Typically we wrap the arguments rather than the return type
uber nit: maybe we can define a type alias

```scala
type Producer = KafkaProducer[Array[Byte], Array[Byte]]
```

so that we don't have to write that whole thing over and over.
maybe put this in the create function instead
don't need the s and $ here, right?
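For context, this nit is about Scala string interpolation; a toy illustration (the messages here are made up, not the PR's log lines):

```scala
object InterpolationNit extends App {
  val paramsSeq = Seq("bootstrap.servers" -> "localhost:9092")
  println(s"Creating producer for $paramsSeq") // interpolates: s and $ needed
  println("Creating producer")                 // plain literal: no s or $ needed
}
```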
Force-pushed from 932d563 to 2a15afe.
Test build #76868 has finished for PR 17308 at commit
Test build #76869 has finished for PR 17308 at commit
Test build #76872 has finished for PR 17308 at commit
[SPARK-19968][SS] Use a cached instance of KafkaProducer instead of creating one every batch.
Force-pushed from ea9592a to 8224596.
Test build #76890 has finished for PR 17308 at commit
SPARK-20737 has been created to look into the cleanup mechanism in a separate JIRA.
Test build #76932 has finished for PR 17308 at commit
…inks on executor shutdown. Add a standard way of cleanup during shutdown of executors for structured streaming sinks in general and KafkaSink in particular.
```scala
def close(sc: SparkContext, kafkaParams: ju.Map[String, Object]): Unit = {
  sc.parallelize(1 to 10000).foreachPartition { iter =>
    CachedKafkaProducer.close(kafkaParams)
  }
}
```
This would cause CachedKafkaProducer.close to be executed on each executor. I am thinking of a better way here.
Any help would be appreciated.
AFAIK the KafkaSource also faces the same issue of not being able to close consumers. Can we use a guava cache with a (configurable) timeout? I guess that's the safest way to make sure that they'll eventually get closed.
Using a Guava cache, we can close producers that have not been used for a certain time. Shall we skip closing them during a shutdown?
In the particular case of the Kafka producer, I do not see a direct problem with that, since we do a producer.flush() on each batch. I was just wondering, for streaming sinks in general, what should our strategy be?
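A sketch of the Guava-cache approach being discussed (the key type, timeout value, and object name are assumptions): entries unused for the timeout are evicted, and a removal listener closes the evicted producer.

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.kafka.clients.producer.KafkaProducer

object ExpiringProducerCacheSketch {
  type Producer = KafkaProducer[Array[Byte], Array[Byte]]

  // Close producers as they are evicted; flush() per batch means
  // nothing should be pending when this runs.
  private val removalListener = new RemovalListener[Seq[(String, Object)], Producer] {
    override def onRemoval(n: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
      Option(n.getValue).foreach(_.close())
    }
  }

  private val cache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
    .expireAfterAccess(10, TimeUnit.MINUTES) // assumed default; would be configurable
    .removalListener(removalListener)
    .build[Seq[(String, Object)], Producer]()
}
```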
Test build #77293 has finished for PR 17308 at commit
```scala
private val guavaCache: Cache[String, Producer] = CacheBuilder.newBuilder()
  .recordStats()
```
Do we use the stats?
```scala
import java.{util => ju}

import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.PrivateMethodTester
```
Do we use this import?
Ah, an oversight. Thanks!
```diff
 private val cacheExpireTimeout: Long =
-  System.getProperty("spark.kafka.guava.cache.timeout", "10").toLong
+  System.getProperty("spark.kafka.guava.cache.timeout.minutes", "10").toLong
```
Don't we need to get this from SparkEnv, by the way? I don't know if the properties get populated properly.
Also, adding minutes to the conf name makes it kinda long, right? I think we can also replace guava with producer.
I think it may also be better to use this, so that we get rid of minutes and users can actually provide arbitrary durations (hours if they want). I think that's what we generally use for duration-type confs.
Thanks, you are right!
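If I read the suggestion right, the idea is to parse the value as a duration string rather than bake the unit into the conf name; a sketch using Spark's JavaUtils (the conf name here is an assumption):

```scala
import org.apache.spark.network.util.JavaUtils

object CacheTimeoutConfSketch {
  // "10m", "2h", "600s" all parse; the result is milliseconds.
  private val cacheExpireTimeoutMs: Long =
    JavaUtils.timeStringAsMs(System.getProperty("spark.kafka.producer.cache.timeout", "10m"))
}
```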
Test build #77297 has finished for PR 17308 at commit
```scala
  .build[String, Producer]()

ShutdownHookManager.addShutdownHook { () =>
  clear()
}
```
Do we really need to stop producers in a shutdown hook? I'm asking because stopping a producer is a blocking call and may prevent other shutdown hooks from running.
+1, this seems complicated. What exactly does shutdown do? Is it just cleaning up thread pools?
I think it will close connections as well. That's really not necessary since the process is being shut down.
marmbrus left a comment
Thanks for working on this. It's a huge latency improvement and I'll be using it next week at Spark Summit.
I just have one suggestion about the cache implementation.
```scala
private def createKafkaProducer(
    producerConfiguration: ju.Map[String, Object]): Producer = {
```
nit: indent 4 here
```scala
 * not exist. This is done to ensure we have only one set of kafka parameters associated with a
 * unique ID.
 */
private[kafka010] object CanonicalizeKafkaParams extends Logging {
```
This seems kind of complicated also, since we know these are always coming from Data[Stream/Frame]Writer, which will always give you a Map[String, String] (and we expect the number of options here to be small). Could we just make the key for the cache a sorted Seq[(String, String)] rather than invent another GUID?
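A sketch of that suggestion: derive the cache key by sorting the small options map, so two logically identical configurations hit the same entry without minting an ID (the helper name is illustrative):

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

object ParamsKeySketch extends App {
  // Sorting makes the key independent of the map's iteration order.
  def paramsToKey(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] =
    kafkaParams.asScala.toSeq.sortBy(_._1)

  val a = Map[String, Object]("acks" -> "1", "bootstrap.servers" -> "h:9092").asJava
  val b = Map[String, Object]("bootstrap.servers" -> "h:9092", "acks" -> "1").asJava
  assert(paramsToKey(a) == paramsToKey(b)) // structural equality, unlike a GUID per sink
}
```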
Test build #77357 has finished for PR 17308 at commit
Jenkins, retest this please!
Test build #77358 has finished for PR 17308 at commit
The build is failing due to "Our attempt to download sbt locally to build/sbt-launch-0.13.13.jar failed. Please install sbt manually from http://www.scala-sbt.org/".
Jenkins, retest this please!
Test build #77410 has finished for PR 17308 at commit
@marmbrus Thank you for taking a look again. Surely, a shutdown hook is not ideal for closing Kafka producers; in fact, for the Kafka sink it might be correct to skip the cleanup step. I have tried to address your comments.
```scala
  Option(guavaCache.getIfPresent(paramsSeq)).getOrElse(createKafkaProducer(kafkaParams))
}

def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
```
nit: seems this can be private?
```scala
CachedKafkaProducer.close(kafkaParams)
val map2 = CachedKafkaProducer.invokePrivate(cacheMap())
assert(map2.size == 1)
```
This assert only tells us there is one KafkaProducer left. Seems we should also verify that we closed the correct one?
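One way to tighten the test, as a sketch (kafkaParams1/kafkaParams2 are hypothetical param maps; cacheMap and invokePrivate are as used above): hold on to both producers and assert on identity rather than size alone.

```scala
import scala.collection.JavaConverters._

val producer1 = CachedKafkaProducer.getOrCreate(kafkaParams1)
val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams2)
CachedKafkaProducer.close(kafkaParams1)
val map2 = CachedKafkaProducer.invokePrivate(cacheMap())
assert(map2.size == 1)
assert(map2.values.asScala.toSeq == Seq(producer2)) // producer1 was the one closed
```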
```scala
def paramsToSeq(kafkaParams: ju.Map[String, Object]): Seq[(String, Object)] = {
  val paramsSeq: Seq[(String, Object)] =
    kafkaParams.asScala.toSeq.sortBy(x => (x._1, x._2.toString))
```
nit: as it is a map, seems we can just sort by x._1?
LGTM with a few minor comments.
zsxwing left a comment
I suggest using LoadingCache to simplify the code. Otherwise, looks good.
```scala
checkForErrors
producer.close()
checkForErrors
producer = null
```
nit: please keep producer = null for double-close
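The pattern being asked for, sketched as a fragment (Producer and checkForErrors come from the surrounding class in the snippet above):

```scala
private var producer: Producer = _

def close(): Unit = synchronized {
  if (producer != null) {
    checkForErrors
    producer.close()
    checkForErrors
    producer = null // guard: a second close() becomes a no-op
  }
}
```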
```scala
 */
private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = synchronized {
  val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
  Option(guavaCache.getIfPresent(paramsSeq)).getOrElse(createKafkaProducer(kafkaParams))
}
```
Remove synchronized and also throw the inner exception instead, after changing to use LoadingCache, such as:

```scala
private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
  val paramsSeq: Seq[(String, Object)] = paramsToSeq(kafkaParams)
  try {
    guavaCache.get(paramsSeq)
  } catch {
    case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
        if e.getCause != null =>
      throw e.getCause
  }
}
```
```scala
private lazy val guavaCache: Cache[Seq[(String, Object)], Producer] = CacheBuilder.newBuilder()
  .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
  .removalListener(removalListener)
  .build[Seq[(String, Object)], Producer]()
```
nit: Use build(CacheLoader<? super K1, V1> loader) to use LoadingCache, then getOrCreate will be very simple.
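A sketch of the LoadingCache variant (types follow the snippet above; createKafkaProducer stands in for the existing factory method): the loader builds the producer on a cache miss, so getOrCreate collapses to a single get.

```scala
import java.{util => ju}
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
  override def load(params: Seq[(String, Object)]): Producer = {
    // Rebuild the ju.Map that the producer factory expects from the key.
    val configMap = new ju.HashMap[String, Object]()
    params.foreach { case (k, v) => configMap.put(k, v) }
    createKafkaProducer(configMap)
  }
}

private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] =
  CacheBuilder.newBuilder()
    .expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
    .removalListener(removalListener)
    .build[Seq[(String, Object)], Producer](cacheLoader)
```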
Test build #77499 has finished for PR 17308 at commit
LGTM. Merging to master and 2.2. Thanks!
Author: Prashant Sharma <[email protected]>
Closes #17308 from ScrapCodes/cached-kafka-producer.
(cherry picked from commit 96a4d1d)
Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
In summary, the cost of recreating a KafkaProducer for writing every batch is high, as it starts a lot of threads, makes connections, and then closes them. A KafkaProducer instance is promised to be thread safe in the Kafka docs, and reuse of a KafkaProducer instance while writing via multiple threads is encouraged.
Furthermore, I have measured a performance improvement of 10x in latency with this patch.
Times that addBatch took (in ms) without applying this patch: [chart]
Times that addBatch took (in ms) after applying this patch: [chart]
How was this patch tested?
Running distributed benchmarks comparing runs with this patch and without it.
Added relevant unit tests.