
Conversation

@gaborgsomogyi
Contributor

@gaborgsomogyi gaborgsomogyi commented Apr 6, 2018

What changes were proposed in this pull request?

CachedKafkaConsumer in the project streaming-kafka-0-10 is designed to maintain a pool of KafkaConsumers that can be reused. However, it was built on the assumption that only one thread will try to read the same Kafka TopicPartition at a time. This assumption does not always hold, and violating it can inadvertently lead to ConcurrentModificationException.

Here is a better way to design this. The consumer pool should be smart enough to avoid concurrent use of a cached consumer. If there is another request for the same TopicPartition as a currently in-use consumer, the pool should automatically return a fresh consumer.

  • There are effectively two kinds of consumer that may be generated
    • Cached consumer - this should be returned to the pool at task end
    • Non-cached consumer - this should be closed at task end
  • A trait called KafkaDataConsumer is introduced to hide this difference from the users of the consumer so that the client code does not have to reason about whether to stop and release. They simply call val consumer = KafkaDataConsumer.acquire and then consumer.release (see the sketch after this list).
  • If there is a request for a consumer that is in use, a new consumer is generated.
  • If there is a request for a consumer from a task reattempt, the already existing cached consumer is invalidated and a new consumer is generated. This could fix potential issues if the source of the reattempt is a malfunctioning consumer.
  • In addition, I renamed the CachedKafkaConsumer class to KafkaDataConsumer, because the old name is a misnomer given that what it returns may or may not be cached.
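
A rough sketch of the wrapper hierarchy described above (names follow the snippets quoted later in this review; the record-fetching methods that delegate to InternalKafkaConsumer, and the imports used by the real file, are omitted, so treat this as an illustration rather than the patch itself):

```scala
private[kafka010] sealed trait KafkaDataConsumer[K, V] {
  /** Return this consumer to the pool if it is cached, or close it otherwise. */
  def release(): Unit

  /** Reference to the internal implementation that this wrapper delegates to. */
  def internalConsumer: InternalKafkaConsumer[K, V]
}

private[kafka010] object KafkaDataConsumer {

  private case class CachedKafkaDataConsumer[K, V](
      internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] {
    // A cached consumer is handed back to the pool at task end.
    override def release(): Unit = KafkaDataConsumer.release(internalConsumer)
  }

  private case class NonCachedKafkaDataConsumer[K, V](
      internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] {
    // A non-cached consumer is simply closed at task end.
    override def release(): Unit = internalConsumer.close()
  }

  // acquire(...) decides which of the two wrappers to hand out, and the private
  // release(...) manages the cache bookkeeping; stubs only in this sketch.
  def acquire[K, V](
      groupId: String,
      topicPartition: TopicPartition,
      kafkaParams: ju.Map[String, Object],
      context: TaskContext,
      useCache: Boolean): KafkaDataConsumer[K, V] = ???

  private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = ???
}
```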

How was this patch tested?

A new stress test that verifies it is safe to concurrently get consumers for the same TopicPartition from the consumer pool.
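
In pseudo-form, the test does something like the following (a sketch only: thread count, iteration count, and the groupId/kafkaParams setup are assumed; only the acquire/release signature matches the patch). Many threads acquire and release consumers for the same TopicPartition and must never hit ConcurrentModificationException:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import org.apache.kafka.common.TopicPartition

val topicPartition = new TopicPartition("topic", 0)
val threadPool = Executors.newFixedThreadPool(16)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
try {
  val futures = (1 to 100).map { _ =>
    Future {
      // useCache = true exercises the pooling path; a null task context means this
      // is treated as a non-reattempt request.
      val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
        groupId, topicPartition, kafkaParams, null, useCache = true)
      try {
        // read some records here ...
      } finally {
        consumer.release()
      }
    }
  }
  futures.foreach(Await.result(_, 1.minute))
} finally {
  threadPool.shutdown()
}
```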

@SparkQA

SparkQA commented Apr 6, 2018

Test build #88990 has finished for PR 20997 at commit 0fe456b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

cc @tdas @zsxwing @koeninger

Contributor

@vanzin vanzin left a comment

I think the caching code could use some cleanup. In particular, it seems odd to not reuse consumers across attempts, if you're already tracking whether the consumer is in use.

If there's a reason for that, there needs to be a comment in the code with a little more info.

*/
private[kafka010]
class InternalKafkaConsumer[K, V](
val groupId: String,
Contributor

nit: follow coding style for multi-line declarations.

Contributor Author

Leftover from old code, fixing.

val topicPartition: TopicPartition,
val kafkaParams: ju.Map[String, Object]) extends Logging {

require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
Contributor

Given this, is there any advantage in passing the group ID as a parameter?

Contributor

This is mostly vestigial (there used to be a remove method that took a groupId, but no kafkaParams, so there was symmetry).

I don't see a reason it can't be changed to match the SQL version at this point, i.e. assign groupId from the params.

Contributor Author

Changed to assign the groupId from the params.
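
Roughly like this (a sketch of the change, not the exact diff; `ju` is the java.util alias used elsewhere in this file):

```scala
// Derive groupId from kafkaParams instead of passing it as a separate constructor
// parameter, matching the SQL connector.
private[kafka010] class InternalKafkaConsumer[K, V](
    val topicPartition: TopicPartition,
    val kafkaParams: ju.Map[String, Object]) extends Logging {

  val groupId: String =
    kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

  // ... rest of the consumer wrapper unchanged ...
}
```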

/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[K, V] = {
val c = new KafkaConsumer[K, V](kafkaParams)
val tps = new ju.ArrayList[TopicPartition]()
Contributor

val topics = ju.Arrays.asList(topicPartition)

Contributor Author

Changed.
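
I.e., roughly (a sketch; the assign call is assumed from the surrounding code, which subscribes the consumer to the single partition):

```scala
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[K, V] = {
  val c = new KafkaConsumer[K, V](kafkaParams)
  // Single-element list built directly instead of a mutable ArrayList.
  val topics = ju.Arrays.asList(topicPartition)
  c.assign(topics)
  c
}
```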

poll(timeout)
}

if (!buffer.hasNext()) { poll(timeout) }
Contributor

nit: break into multiple lines.

Contributor Author

Leftover from old code, fixing.


/**
* A wrapper around Kafka's KafkaConsumer.
* This is not for direct use outside this file.
Contributor

This generally means the class should be private, not private[blah].

Contributor Author

Changed.

logDebug("Reattempt detected, invalidating cached consumers")
val closedExistingInternalConsumers = new ju.LinkedList[InternalKafkaConsumer[_, _]]()
existingInternalConsumers.asScala.foreach { existingInternalConsumer =>
// Consumer exists in cache. If its in use, mark it for closing later, or close it now.
Contributor

it's

Contributor Author

Fixed.

newInternalConsumer.inUse = true
existingInternalConsumers.add(newInternalConsumer)
CachedKafkaDataConsumer(newInternalConsumer)

Contributor

All these blank lines before the closing brace are unnecessary.

Contributor Author

Removed them.

lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](
groupId, topicPartition, kafkaParams)

if (context != null && context.attemptNumber >= 1) {
Contributor

I'm a little confused why this needs special treatment.

If this is the first attempt, won't the list just be empty? And then you could execute the same code and it would basically be a no-op?

Or, from a different angle, why can't you reuse the consumers? Isn't the problem just concurrent use? So if the consumer is not in use, it should be fair game, right?

Contributor

I agree, the comment in the old code was more clear: "just in case the prior attempt failures were cache related"

Contributor Author

If this is the first attempt, the list can still contain elements, because other tasks may have created other consumers. Imran is right: the "why" was written in KafkaRDD.scala and is missing here. Additional explanation added.

In short, if the possibly problematic consumer is not removed from the list, it could infect tasks that run later.
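
Roughly, the reattempt handling being discussed looks like this (a sketch: field names such as markedForClose follow the "mark it for closing later" comment in the snippet above, and the branch shape reflects the single-cached-consumer design the PR later converges on):

```scala
if (context != null && context.attemptNumber >= 1) {
  // A prior attempt may have failed because of a malfunctioning consumer, so do not
  // reuse the cached one for this key.
  if (existingInternalConsumer != null) {
    if (existingInternalConsumer.inUse) {
      // Another task still holds it: close it later, when that task releases it.
      existingInternalConsumer.markedForClose = true
    } else {
      existingInternalConsumer.close()
      // Remove from the cache only when actually closed; marked-for-close consumers
      // are removed in release().
      cache.remove(key)
    }
  }
  // Hand the reattempt a fresh, non-cached consumer.
  NonCachedKafkaDataConsumer(newInternalConsumer)
}
```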

context: TaskContext,
useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
val key = new CacheKey(groupId, topicPartition)
val existingInternalConsumers = Option(cache.get(key))
Contributor

Shouldn't you just skip messing with the cache if useCache = false?

Contributor Author

One could save three O(1) operations, but on the other hand it would make the code less readable.

require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
"groupId used for cache key must match the groupId in kafkaParams")

@volatile private var consumer = createConsumer
Contributor

Given that all these variables are read / updated inside synchronized blocks, @volatile is unnecessary and a little misleading.

Contributor

This is an example of the cut & paste I was referring to.

In this case, I don't believe consumer is ever reassigned, so it doesn't even need to be a var.

It was reassigned in the SQL version of the code.

Contributor Author

Yeah, it's overkill. Removed volatile and switched to val.

useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
val key = new CacheKey(groupId, topicPartition)
val existingInternalConsumers = Option(cache.get(key))
.getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]])
Contributor

any particular reason you want to use a java.util.LinkedList? If you instead used a scala collection, eg. scala.collection.mutable.ArrayBuffer, you would avoid the asScalas. Or ListBuffer if you actually want a LinkedList

Contributor

Here's another potential problem with using a linked list instead of a pool that has more smarts.

Let's say I have two consumers on the same topicpartition in the same JVM. Consumer A last read offset 1000 and pre-fetched the next N messages, consumer B last read offset 2000 and pre-fetched the next N messages.

If the client code that was using consumer A last time gets consumer B out of the cache this time, the prefetch is wasted and it will have to seek. All we're saving by caching at that point is connection time.

Contributor Author

In general I'm happy to hear a better approach.

@squito The code started with Java collections and I didn't want to break that. A couple of conversions were the price.

@koeninger True. The last info I've seen in the SQL area is that this approach is used there and a discussion has started on how to make it better. Is there an outcome that can be applied to both areas? If the situation is different, please share it with me.

Even if only the connection setup is saved, that could be significant in the case of SSL/TLS.

Contributor

The SQL code, at least last time I looked at it, isn't keeping a linked list pool, it's just making a fresh consumer that's marked for close, right? I actually think that's better as a stopgap solution, because it's less likely to leak in some unforeseen way, and it's consistent across both codebases.

Contributor Author

That's correct, the SQL part isn't keeping a linked list pool but a single cached consumer. I was considering your suggestion and came to the same conclusion:

Can you clarify why you want to allow only 1 cached consumer per topicpartition, closing any others at task end?

It seems like opening and closing consumers would be less efficient than allowing a pool of more than one consumer per topicpartition.

Though limiting the number of cached consumers per groupId/TopicPartition is a must, as you've pointed out. On the other hand, going the SQL way is definitely less risky. Do you think we should switch back to the one-cached-consumer approach?


val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
groupId, topicPartition, kafkaParams.asJava, taskContext, useCache)
try {
val rcvd = 0 until data.length map { offset =>
Contributor

style: just by convention, ranges are an exception to the usual rule; they are wrapped in parens, i.e. (x until y).map

Contributor Author

Changed.
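
For illustration, the convention amounts to this (tiny sketch only):

```scala
// Wrap the range in parentheses before calling map.
val data = Array(1, 2, 3)
val indices = (0 until data.length).map(identity)
```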

super.afterAll()
}

test("concurrent use of KafkaDataConsumer") {
Contributor

this is good, but it would be nice to have a test which checks that cached consumers are re-used when possible. Eg this could pass just by never caching anything.

Contributor Author

@gaborgsomogyi gaborgsomogyi Apr 12, 2018

Reuse test will be added.

Contributor Author

Reuse test added.
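
A sketch of what such a reuse check can look like (groupId/kafkaParams setup omitted; the internalConsumer comparison is an assumption based on the wrapper trait quoted in this review, not the exact test):

```scala
val consumer1 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
  groupId, topicPartition, kafkaParams, null, useCache = true)
consumer1.release()

// After a clean release, acquiring again should hand back the same cached instance.
val consumer2 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
  groupId, topicPartition, kafkaParams, null, useCache = true)
assert(consumer1.internalConsumer eq consumer2.internalConsumer)
consumer2.release()
```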

@koeninger
Contributor

In general, 2 things about this make me uncomfortable:

threadPool.shutdown()
}
}
}
Contributor

If this PR is intended to fix a problem with silent reading of incorrect data, can you add a test reproducing that?

Contributor Author

That's a bad cut-and-paste issue. This PR intends to solve the ConcurrentModificationException.

Contributor Author

Removed from the PR description.



cache.putIfAbsent(key, existingInternalConsumers)

lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](
Contributor

I'm not convinced this lazy val is clearer than simply constructing a consumer when you need it, but that's the way the SQL code is...

Contributor Author

Is it better to copy/paste the same thing on every occasion?

Contributor

Copy-pasting a one-line constructor call only in the cases where a new consumer actually needs to be constructed? Yes, I think that's clearer than a lazy val. But again, it's probably not worth the divergence from the SQL code.

Contributor Author

OK, leaving it how it is now.


logDebug("Reattempt detected, new cached consumer will be allocated " +
s"$newInternalConsumer")
newInternalConsumer.inUse = true
Contributor

For all of these lines that set newInternalConsumer.inUse, is there any way it wouldn't already have been true?

Contributor Author

Ooooh, nice catch :) Removed them.

// start with a new one.
logDebug("Reattempt detected, invalidating cached consumers")
val closedExistingInternalConsumers = new ju.LinkedList[InternalKafkaConsumer[_, _]]()
existingInternalConsumers.asScala.foreach { existingInternalConsumer =>
Contributor

Is invalidating all consumers for a topicpartition on a re-attempt unnecessarily pessimistic? Seems like if you have N consumers for the same topicpartition and they're all involved in a re-attempt, the last one to go through this section is going to invalidate all the fresh consumers the others just made.

I think all you need is to make sure you get a fresh consumer on a re-attempt.

Contributor Author

I was thinking about this and maybe came to the wrong conclusion.

I was thinking that executors are really long-lived in streaming. If that's true, then a problematic consumer would stay there for a long time, poisoning all further tasks running there. Simply handing back a new consumer would lead to this situation. What have I missed?

Contributor

The problem isn't that you're invalidating all N consumers, it's potentially invalidating N + 1 + 2 + ... + (N - 1), right?

I'm saying I think you can solve this by, if it's a retry, and you would normally grab 1 consumer from the cache, invalidate just that one and make a fresh one.

Contributor Author

Yeah, makes sense. That way the problematic consumers can still be thrown away, just a bit more slowly. Being as pessimistic as the current implementation ends up slowing down execution overall.

I've just seen your comment about having only one consumer in the cache, just like in the SQL code. If we go that way, this question will no longer be relevant.


@gaborgsomogyi
Contributor Author

@koeninger

I don't see an upper bound on the number of consumers per key, nor a way of reaping idle consumers. If the SQL equivalent code is likely to be modified to use pooling of some kind, seems better to make a consistent decision.

When do you think the decision will happen?

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89274 has finished for PR 20997 at commit d776289.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 12, 2018

Test build #89275 has finished for PR 20997 at commit 250ad92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89344 has finished for PR 20997 at commit 215339d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 13, 2018

Test build #89359 has finished for PR 20997 at commit 7aa3257.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll

@koeninger
Contributor

koeninger commented Apr 17, 2018 via email

@gaborgsomogyi
Contributor Author

gaborgsomogyi commented Apr 20, 2018

I've taken a look at the pool options and I have the feeling it requires more time to come up with a proper solution. Switching back to the one-cached-consumer approach provided by the SQL code...
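
For illustration, the one-cached-consumer acquire path (inside the KafkaDataConsumer companion object) could look roughly like this; CacheKey, inUse and the wrapper types come from the snippets in this review, the exact branching in the final patch may differ:

```scala
def acquire[K, V](
    groupId: String,
    topicPartition: TopicPartition,
    kafkaParams: ju.Map[String, Object],
    context: TaskContext,
    useCache: Boolean): KafkaDataConsumer[K, V] = synchronized {
  val key = new CacheKey(groupId, topicPartition)
  val existingInternalConsumer = cache.get(key)

  lazy val newInternalConsumer =
    new InternalKafkaConsumer[K, V](topicPartition, kafkaParams)

  if (context != null && context.attemptNumber >= 1) {
    // Reattempt: invalidate the cached consumer (close it now, or mark it for close
    // if another task still uses it, as sketched earlier) and hand out a fresh one.
    if (existingInternalConsumer != null) {
      if (existingInternalConsumer.inUse) existingInternalConsumer.markedForClose = true
      else { existingInternalConsumer.close(); cache.remove(key) }
    }
    NonCachedKafkaDataConsumer(newInternalConsumer)
  } else if (!useCache) {
    // Caller asked for a dedicated consumer; it is closed at task end.
    NonCachedKafkaDataConsumer(newInternalConsumer)
  } else if (existingInternalConsumer == null) {
    // Nothing cached yet for this groupId / TopicPartition: cache the new one.
    newInternalConsumer.inUse = true
    cache.put(key, newInternalConsumer)
    CachedKafkaDataConsumer(newInternalConsumer)
  } else if (existingInternalConsumer.inUse) {
    // The cached consumer is busy in another task: hand out a temporary one.
    NonCachedKafkaDataConsumer(newInternalConsumer)
  } else {
    // Idle cached consumer: reuse it.
    existingInternalConsumer.inUse = true
    CachedKafkaDataConsumer(existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]])
  }
}
```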

@gaborgsomogyi
Contributor Author

In the meantime I found a small glitch in the SQL part. Namely, if a reattempt happens, this line

removes the consumer from the cache, which ends up in this log message:

13:27:07.556 INFO org.apache.spark.sql.kafka010.KafkaDataConsumer: Released a supposedly cached consumer that was not found in the cache

I've solved this here by removing only the closed consumer. The one marked for close will be removed in release.

@SparkQA

SparkQA commented Apr 21, 2018

Test build #89676 has finished for PR 20997 at commit 2c45388.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

A few minor things but in general I'll defer to Cody, who knows a lot more about this than I do.

def release(): Unit

/** Reference to the internal implementation that this wrapper delegates to */
private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V]
Contributor

nit: private[kafka010] is redundant since the class already has that.

Contributor Author

Removed.

/**
* Must be called before acquire, once per JVM, to configure the cache.
* Further calls are ignored.
* */
Contributor

nit: */

Contributor Author

Fixed.

// likely running on a beefy machine that can handle a large number of simultaneously
// active consumers.

if (entry.getValue.inUse == false && this.size > maxCapacity) {
Contributor

nit: !entry.getValue.inUse

Contributor Author

Fixed.

logWarning(
s"KafkaConsumer cache hitting max capacity of $maxCapacity, " +
s"removing consumer for ${entry.getKey}")
try {
Contributor

nit: indent

Contributor Author

Fixed.
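
For reference, a rough reconstruction of the eviction logic around this hunk (a sketch based on the quoted fragments: maxCapacity and the warning come from the diff, while initialCapacity and the NonFatal handling are assumptions):

```scala
import scala.util.control.NonFatal

private lazy val cache =
  new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer[_, _]](initialCapacity, 0.75f, true) {
    override def removeEldestEntry(
        entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer[_, _]]): Boolean = {
      // Evict only consumers that are not in use, and only once the cache has grown
      // past its configured capacity.
      if (!entry.getValue.inUse && this.size > maxCapacity) {
        logWarning(
          s"KafkaConsumer cache hitting max capacity of $maxCapacity, " +
            s"removing consumer for ${entry.getKey}")
        try {
          entry.getValue.close()
        } catch {
          case NonFatal(e) => logWarning("Error closing oldest Kafka consumer", e)
        }
        true
      } else {
        false
      }
    }
  }
```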

}
}

private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = synchronized {
Contributor

After reading this code and also the acquire method, is there a useful difference between the CachedKafkaDataConsumer and NonCachedKafkaDataConsumer types?

It seems like the code doesn't really care about those types, but just about whether the consumer is in the cache?

Contributor

I think that's a good observation. But I'm not sure it's worth deviating from the same design being used in the SQL code.

@SparkQA

SparkQA commented May 2, 2018

Test build #90044 has finished for PR 20997 at commit 6cd67c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor Author

Do I need to make any further changes?

@vanzin
Contributor

vanzin commented May 22, 2018

I'm fine with it. Unless Cody beats me to it or has more comments, I'll push this after the long weekend.

@vanzin
Contributor

vanzin commented May 22, 2018

retest this please

@koeninger
Contributor

I'm fine as well.

@SparkQA

SparkQA commented May 22, 2018

Test build #90989 has finished for PR 20997 at commit 6cd67c6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented May 22, 2018

That being the case, merging to master.
