
Conversation

@eatoncys
Contributor

What changes were proposed in this pull request?

If spark.streaming.concurrentJobs is greater than one and spark.executor.cores is greater than one, two or more tasks in the same executor may use the same Kafka consumer at the same time, which throws the exception "KafkaConsumer is not safe for multi-threaded access".
For example, with:
spark.streaming.concurrentJobs=2
spark.executor.cores=2
spark.cores.max=2
and only one topic with a single partition ('topic1', 0) to consume, two jobs will run at the same time. Both will use the same cache key ('groupid', 'topic1', 0) to look up the consumer in 'private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]]', so both tasks get the same CachedKafkaConsumer instance, as shown in the sketch below.
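For illustration, here is a minimal, simplified sketch of the collision. The class and member names are stand-ins for the real CachedKafkaConsumer internals, not the exact Spark code:

```scala
import java.{util => ju}

object CacheCollisionSketch {
  // Cache key as it is before this PR: group id + topic + partition only.
  case class CacheKey(groupId: String, topic: String, partition: Int)

  // Stand-in for the real KafkaConsumer, which is not thread-safe.
  class FakeConsumer

  // Stand-in for CachedKafkaConsumer's internal cache.
  private val cache = new ju.LinkedHashMap[CacheKey, FakeConsumer]()

  def getOrCreate(key: CacheKey): FakeConsumer = cache.synchronized {
    if (!cache.containsKey(key)) cache.put(key, new FakeConsumer)
    cache.get(key)
  }

  def main(args: Array[String]): Unit = {
    // Two concurrent jobs each schedule a task for ('groupid', 'topic1', 0):
    val key = CacheKey("groupid", "topic1", 0)
    val consumerOfJob1 = getOrCreate(key) // task thread of job 1
    val consumerOfJob2 = getOrCreate(key) // task thread of job 2, same executor
    // Both tasks receive the identical consumer and poll it from different
    // threads, which triggers "KafkaConsumer is not safe for multi-threaded access".
    println(consumerOfJob1 eq consumerOfJob2) // true
  }
}
```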

This PR adds the thread ID to the CachedKafkaConsumer cache key to prevent two threads from using the same consumer at the same time.
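A rough sketch of the idea (illustrative only; the names below are hypothetical and not the exact identifiers in the patch):

```scala
object ThreadAwareKeySketch {
  // Hypothetical thread-aware cache key: the current thread's id is part of the key.
  case class CacheKeyWithThread(threadId: Long, groupId: String, topic: String, partition: Int)

  def currentKey(groupId: String, topic: String, partition: Int): CacheKeyWithThread =
    CacheKeyWithThread(Thread.currentThread().getId, groupId, topic, partition)

  def main(args: Array[String]): Unit = {
    // Two task threads asking for the same ('groupid', 'topic1', 0) now build
    // different keys, so each thread resolves to its own cached consumer.
    val other = new Thread(() => println(currentKey("groupid", "topic1", 0)))
    other.start()
    other.join()
    println(currentKey("groupid", "topic1", 0)) // different threadId, different key
  }
}
```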

How was this patch tested?

Existing unit tests.

@SparkQA

SparkQA commented Nov 25, 2017

Test build #84183 has finished for PR 19819 at commit aa02d89.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lvdongr
Contributor

lvdongr commented Nov 27, 2017

Won't the number of cached consumers for the same partition keep increasing when different tasks consume that partition, since there is no place to remove them?

@gaborgsomogyi
Contributor

It will create a new consumer for each thread. This could be quite resource consuming when several topics are consumed by shared thread pools.
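To make the concern concrete, a back-of-the-envelope sketch with assumed numbers (8 task threads, 50 partitions; both hypothetical):

```scala
object ConsumerCountSketch {
  def main(args: Array[String]): Unit = {
    val taskThreadsPerExecutor = 8   // assumed size of the executor's task thread pool
    val partitionsPerExecutor  = 50  // assumed partitions read by this executor
    // Keyed by (group, topic, partition): at most one consumer per partition.
    println(s"partition-keyed cache: up to $partitionsPerExecutor consumers")
    // Keyed additionally by thread id: every thread that ever read a partition
    // keeps its own consumer alive in the cache.
    println(s"thread-keyed cache:    up to ${taskThreadsPerExecutor * partitionsPerExecutor} consumers")
  }
}
```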

@gaborgsomogyi
Contributor

gaborgsomogyi commented Jul 17, 2018

@lvdongr I think this can be closed as the problem is solved.

@lvdongr
Contributor

lvdongr commented Jul 18, 2018

I've seen your PR #20997; it's a good solution, @gaborgsomogyi.

@eatoncys closed this Jul 18, 2018