-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19304] [Streaming] [Kinesis] fix kinesis slow checkpoint recovery #16842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19304] [Streaming] [Kinesis] fix kinesis slow checkpoint recovery #16842
Conversation
…rd loaded on aws getRecords call
| case class SequenceNumberRange( | ||
| streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) | ||
| streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String, | ||
| recordCount: Int) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a property of a range -- or when would it not equal (from - to + 1)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure of a better place to put.
from - to != count. Kinesis seqNumber are in order but are not sequential
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, but is it an 'input' or 'output'? the usage below makes it look like the caller dictates how many records are in the range, but it doesn't know that ahead of time? I probably misunderstand this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its an input to spark checkpoint metadata. On streaming KinesisReceiver receives records creates blocks & knows about seqNumber, count. When recovering from checkpoint we read back this information from checkpoint and make aws kinesis getRecords call with fromSeqNumber & limit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm worried this change will break checkpoint recovery, because we use Java serialization, and be a barrier to users from upgrading.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure on upgrading, since for code upgrade we need to delete the checkpoint directory and start afresh. I did run this patch and was able to serialize the limit into checkpoint, ( not a scala pro though)
|
OK I think I see. MIght still be good for @brkyvz to review. |
|
Test build #3561 has finished for PR 16842 at commit
|
|
will work on testcases today |
|
Jenkins, retest this please |
|
@brkyvz can I do something to take it forward ? |
| val getRecordsRequest = new GetRecordsRequest | ||
| getRecordsRequest.setRequestCredentials(credentials) | ||
| getRecordsRequest.setShardIterator(shardIterator) | ||
| getRecordsRequest.setLimit(recordCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this value is greater than 10000, this will throw an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
@srowen Do you know if we make the field of a case class an |
|
Yes it would certainly change the format of the default Java serialization. It wouldn't be compatible. The fields would have different types. |
|
@srowen I assumed that you cannot update code if you want to recover from checkpoint. |
|
@Gauravshah Can you please comment on how much faster this PR improved your recovery time? |
| getRecordsRequest.setRequestCredentials(credentials) | ||
| getRecordsRequest.setShardIterator(shardIterator) | ||
| getRecordsRequest.setLimit(recordCount) | ||
| getRecordsRequest.setLimit(Math.max(recordCount, this.maxGetRecordsLimit)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be a min not a max
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
brkyvz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Talked with @tdas offline. We can't guarantee updatability across Spark versions for Spark Streaming, therefore this change is okay. Left two comments on style, then it LGTM.
| private[kinesis] | ||
| case class SequenceNumberRange( | ||
| streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String) | ||
| streamName: String, shardId: String, fromSeqNumber: String, toSeqNumber: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one parameter per line:
streamName: String,
shardId: String,
...| */ | ||
| private def getRecordsAndNextKinesisIterator( | ||
| shardIterator: String): (Iterator[Record], String) = { | ||
| shardIterator: String, recordCount: Int): (Iterator[Record], String) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto, one param per line
|
@brkyvz Thank you for taking this forward. We have batch interval of 2 minutes & takes ~1.1 minutes to process. With older code it takes 10-12 minutes to recover and with limit fix it recovers in 2.5-3 minutes. |
| * Get records starting from or after the given sequence number. | ||
| */ | ||
| private def getRecords(iteratorType: ShardIteratorType, seqNum: String): Iterator[Record] = { | ||
| private def getRecords(iteratorType: ShardIteratorType, seqNum: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you forgot here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
retest this please |
|
okay to test |
|
@srowen Do you know why this hasn't kicked off any tests? |
|
Test build #3595 has finished for PR 16842 at commit
|
|
LGTM! Merging to master! Thanks. |
What changes were proposed in this pull request?
added a limit to getRecords api call call in KinesisBackedBlockRdd. This helps reduce the amount of data returned by kinesis api call making the recovery considerably faster
As we are storing the
fromSeqNum&toSeqNumin checkpoint metadata, we can also store the number of records. Which can later be used for api call.How was this patch tested?
The patch was manually tested
Apologies for any silly mistakes, opening first pull request