Skip to content

Conversation

@gaborgsomogyi
Copy link
Contributor

@gaborgsomogyi gaborgsomogyi commented Jun 25, 2019

What changes were proposed in this pull request?

There are "unsafe" conversions in the Kafka connector.
CaseInsensitiveStringMap comes in which is then converted the following way:

...
options.asScala.toMap
...

The main problem with this is that such case it looses its case insensitive nature
(case insensitive map is converting the key to lower case when get/contains called).

In this PR I'm using CaseInsensitiveMap to solve this problem.

How was this patch tested?

Existing + additional unit tests.

@gaborgsomogyi
Copy link
Contributor Author

gaborgsomogyi commented Jun 25, 2019

cc @HyukjinKwon and @HeartSaVioR since it's connected to SPARK-28142.

@SparkQA
Copy link

SparkQA commented Jun 25, 2019

Test build #106887 has finished for PR 24967 at commit 23c2d71.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a required change but thought it would be good to simplify.

gaborgsomogyi added a commit to gaborgsomogyi/spark that referenced this pull request Jun 25, 2019
@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Jun 25, 2019

I feel the reason of such confusion is not explicitly requiring type to CaseInsensitiveMap whenever needed. Map with lowercased key and CaseInsensitiveMap are not same, but KafkaSourceProvider just mixes up using both, especially referring Map with lowercase key as "case-insensitive" which is not strictly true.

If we require lowercase key, it shouldn't be referred as "case-insensitive". It's case-sensitive, and if we can't expect the same from Kafka, we will get into trouble with passing config to Kafka.

@gaborgsomogyi
Copy link
Contributor Author

@HeartSaVioR thanks for your comment!

This case sensitivity issue disturbs me for at least half a year so pretty open what less error prone solution can we come up. Since I've found another issue I've invested more time and analyzed it through.

Let me share my thinking and please do the same to find out how to proceed. First, start with the actual implementation. Majority of this code uses case-insensitive maps in some way (CaseInsensitiveMap or CaseInsensitiveStringMap instances arrive mainly).

Map with lowercased key and CaseInsensitiveMap are not same

This is true.

referring Map with lowercase key as "case-insensitive" which is not strictly true

I agree with your statement and we can discuss how to name it. Map with lowercase key is referred "case-insensitive" because the user doesn't have to care about case when the following operations used: put/get/contains. The following ops are the same:

val map: CaseInsensitive...
map.put("key1", "value1")
map.put("keY2", "value2")

// all is true
map.get("key1")==map.get("KEY1")
map.contains("kEy1")
map.get("key2")==map.get("KEY2")
map.contains("kEy2")

If the underlying conversion which is used on the keys consistently will change then the mentioned logic is still true. To find out whether user is provided a configuration I think it's the most robust solution.

The other use-case which exists is to extract the entries from these maps which start with kafka. and send it to Kakfa producer/consumer. This can't be called case-insensitive of course because Kafka requires lowercase keys, no question. This is the reason why the following function extracts the mentioned parameters (and force lower case conversion on keys):

  private def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] = {
    parameters
      .keySet
      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
      .map { k => k.drop(6).toString -> parameters(k) }
      .toMap
  }

Majority of this code uses the mentioned case insensitive maps but there are some parts which breaks this (for example the part which we're now modifying).

As a final conclusion from my perspective:

  • Case insensitive maps has to be used for the first use-case because less error prone (this is what I tend to address)
  • When any parameter passed to Kafka convertToSpecifiedParams must be used (this is already the situation)
  • Changing the following strings to lower case would has the same effect just like the actual stand of this PR but then we stay in the same parameter case hell
  private[kafka010] val FETCH_OFFSET_NUM_RETRY = "fetchoffset.numretries"
  private[kafka010] val FETCH_OFFSET_RETRY_INTERVAL_MS = "fetchoffset.retryintervalms"

That said everything can be discussed so waiting on thoughts...

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Jun 27, 2019

First of all, thanks for the detailed analysis, @gaborgsomogyi !

I mostly agree on your analysis - just to say we are mixing up Map with lowercased key and CaseInsensitiveMap, and require Map as a type of parameter if we need case insensitive map so in callee side it can't determine whether it is Map with lowercased key or CaseInsensitiveMap. That was the reason of root case of #24942. We are checking the type loosely even we have proper type which ensures the requirement. That's my point.

Copy link
Member

@gatorsmile gatorsmile left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR @gaborfeher @dongjoon-hyun We should not skip adding the unit test cases especially when this PR claims we are fixing a bug.. Please add one. Thanks!

@gaborgsomogyi
Copy link
Contributor Author

@HeartSaVioR I see your point and agree the implementation can be made more safe. Since DSv1 provides only map as input parameter this can be converted to be more on the safe side and internally pass around CaseInsensitiveMap instances.

@gatorsmile Thanks for your review, adding test...

@gaborgsomogyi
Copy link
Contributor Author

@HeartSaVioR I've converted Map to CaseInsensitiveMap where the mentioned bug required it to make the code less error prone. I know there are places which not yet touched but I didn't want to mix up 2 intentions, namely solve this bug and making parameter handling more robust on v1 side. Going to file another jira + PR for the second...

@gatorsmile Added tests which cover not only the problematic parts.

@SparkQA
Copy link

SparkQA commented Jul 4, 2019

Test build #107241 has finished for PR 24967 at commit a4790ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 5, 2019

Test build #107273 has finished for PR 24967 at commit d20b85a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Just a 2 cents: I'm not sure we prefer renaming fields without specific reason along with other changes (I'm seeing multiple places doing it), as well as having individual tests for each JIRA issue which actually do pretty similar things.

@gaborgsomogyi
Copy link
Contributor Author

I'm not sure we prefer renaming fields without specific reason along with other changes

The other option was to introduce 2 different methods in the test to get the same KafkaOffsetReader from 2 different places (one from micro-batch and another from continuous).

@SparkQA
Copy link

SparkQA commented Jul 8, 2019

Test build #107344 has finished for PR 24967 at commit 3723930.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
class KafkaContinuousStream(
offsetReader: KafkaOffsetReader,
private val offsetReader: KafkaOffsetReader,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is private val required for Use CaseInsensitiveMap for KafkaOffsetReader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise this code:

getField(stream, offsetReaderMethod)

throws exception something like:

java.lang.IllegalArgumentException: Can't find a private method named: offsetReader

The other possibility to add a getter explicitly which I thought is overkill.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another approach would be Scala reflection but definitely more verbose than explicit getter.

*/
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
private val offsetReader: KafkaOffsetReader,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Is private val required for Use CaseInsensitiveMap for KafkaOffsetReader?
  • Is the renaming from kafkaOffsetReader to offsetReader required for Use CaseInsensitiveMap for KafkaOffsetReader?

Copy link
Contributor Author

@gaborgsomogyi gaborgsomogyi Jul 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is private val required for Use CaseInsensitiveMap for KafkaOffsetReader?

Yeah, please see my previous comment. Same applies here.

Is the renaming from kafkaOffsetReader to offsetReader required for Use CaseInsensitiveMap for KafkaOffsetReader?

In short, no. A little bit detailed if not renamed then the test requires something like this:

  private val offsetReaderMicroBatchMethod = PrivateMethod[KafkaOffsetReader]('kafkaOffsetReader)
  private val offsetReaderContinuousMethod = PrivateMethod[KafkaOffsetReader]('offsetReader)
...
    verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => {
      val kafkaOffsetReader = getField(stream, offsetReaderMicroBatchMethod)
      assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod))
    })
...

General considerations why I've renamed couple of variables:

  • Exact same parameters are named sometimes 2-4 different ways which is hard to track
  • Not renaming them would cause similar situation in the test code
  • Sometimes a SparkConf config name and it's internal scala variable has different name
  • Renamed only Spark internal variables which shouldn't matter for users

We can roll back renamings, the code will work properly but I think these changes increase readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rename rolled back.
private val I think is essential.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gaborgsomogyi . I understand the intention and want to help this PR because this PR has been under reviews for 2 weeks already.

However, this PR distracts the focus inconsistently eagerly. :) In general, we should not put the refactoring and new behavior like CaseInsensitiveMap into a single PR. Could you revert all the irrelevant stuff? You can put them into another PR.

If the PRs become as concise as possible, we can do review and merge as fast as we can. In case of simple PRs, as you know, one or two days are enough until merging. So, which one do you want? Do you want to keep all of this in this PR? Or, do you think you can split this into the simple ones?

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, except still have my 2 cents for renaming fields.

@gaborgsomogyi
Copy link
Contributor Author

@dongjoon-hyun I think the clean and long term solution is to use CaseInsensitiveMap inside connectors where DSv1 is involved so at some point it should be reviewed. Since we're in the middle it would be good to spend time on it. I think there are limited number of questionable parts which are already pinpointed but as always if you suggest to create here a quick fix and apply the CaseInsensitiveMap in another PR we can turn to this direction.

@dongjoon-hyun
Copy link
Member

Okay. If this PR remains in this state, I'll leave this PR to other committers, @gaborgsomogyi and @HeartSaVioR .

@gaborgsomogyi
Copy link
Contributor Author

@dongjoon-hyun @HeartSaVioR rolled back all the renames.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@SparkQA
Copy link

SparkQA commented Jul 9, 2019

Test build #107403 has finished for PR 24967 at commit 5cd7b8e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

I think @dongjoon-hyun is busy with the 2.4 release so @vanzin can you have a look please?

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Jul 15, 2019

@gaborgsomogyi . I'm just leaving this PR to the other committers. I'm not busy on 2.4 release since it's still building consensus. :)

@gaborgsomogyi
Copy link
Contributor Author

Ah, ok 🙂

@HyukjinKwon
Copy link
Member

Looks fine.

@gaborgsomogyi
Copy link
Contributor Author

@HyukjinKwon thanks for investing your time!

@SparkQA
Copy link

SparkQA commented Jul 18, 2019

Test build #107855 has finished for PR 24967 at commit 06fe0fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Copy link
Contributor

@HyukjinKwon Could we revisit this? I guess it's just waiting for last call to merge.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. cc @zsxwing and @cloud-fan, I will get this in if there are no more comments in some days.

@cloud-fan
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Aug 5, 2019

Test build #108669 has finished for PR 24967 at commit 06fe0fc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 6, 2019

Test build #108718 has finished for PR 24967 at commit 9034894.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 8, 2019

Test build #108820 has finished for PR 24967 at commit e0e8a88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Copy link
Contributor Author

@HyukjinKwon may I ask for another round please?

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5663386 Aug 9, 2019
@gaborgsomogyi
Copy link
Contributor Author

Many thanks @cloud-fan !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants