Skip to content

Commit 2ef1717

Browse files
Commit message: "try new fix" (commit 2ef1717, 1 parent: 66fade4)

6 files changed

Lines changed: 8 additions & 16 deletions

File tree

extras/kinesis-asl/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,6 @@
7070
<version>${aws.kinesis.producer.version}</version>
7171
<scope>test</scope>
7272
</dependency>
73-
<dependency>
74-
<groupId>com.google.protobuf</groupId>
75-
<artifactId>protobuf-java</artifactId>
76-
<version>2.6.1</version>
77-
<scope>test</scope>
78-
</dependency>
7973
<dependency>
8074
<groupId>org.mockito</groupId>
8175
<artifactId>mockito-core</artifactId>

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class KinesisBackedBlockRDD[T: ClassTag](
9999
val blockId = partition.blockId
100100

101101
def getBlockFromBlockManager(): Option[Iterator[T]] = {
102-
logInfo(s"Read partition data of $this from block manager, block $blockId")
102+
logDebug(s"Read partition data of $this from block manager, block $blockId")
103103
blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
104104
}
105105

@@ -213,8 +213,8 @@ class KinesisSequenceRangeIterator(
213213
}
214214
// De-aggregate records, if KPL was used in producing the records. The KCL automatically
215215
// handles de-aggregation during regular operation. This code path is used during recovery
216-
// val recordIterator = UserRecord.deaggregate()
217-
(getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator)
216+
val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords)
217+
(recordIterator.asScala.iterator, getRecordsResult.getNextShardIterator)
218218
}
219219

220220
/**

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
5454
val seqNumRanges = blockInfos.map {
5555
_.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
5656
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
57-
logInfo(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
57+
logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
5858
s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
5959
new KinesisBackedBlockRDD(
6060
context.sc, regionName, endpointUrl, blockIds, seqNumRanges,

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,6 @@ private[kinesis] class KinesisReceiver[T](
206206
val dataIterator = records.iterator().asScala.map(messageHandler)
207207
val metadata = SequenceNumberRange(streamName, shardId,
208208
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
209-
logInfo(s"blockGenerator.multipleDataWithCallback: iterator: $dataIterator, meta: $metadata")
210209
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
211210
}
212211
}
@@ -231,7 +230,7 @@ private[kinesis] class KinesisReceiver[T](
231230
private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
232231
blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
233232
seqNumRangesInCurrentBlock.clear()
234-
logInfo(s"Generated block $blockId has $blockIdToSeqNumRanges")
233+
logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
235234
}
236235

237236
/** Store the block along with its associated ranges */

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,10 @@ private[kinesis] class KinesisRecordProcessor[T](
7070
* in the DStream
7171
*/
7272
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
73-
logInfo(s"Received batch: $batch. Is receiver alive: ${!receiver.isStopped()}")
7473
if (!receiver.isStopped()) {
7574
try {
7675
receiver.addRecords(shardId, batch)
77-
logInfo(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
76+
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
7877

7978
/*
8079
*

extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
7878
if (testUtils != null) {
7979
// Delete the Kinesis stream as well as the DynamoDB table generated by
8080
// Kinesis Client Library when consuming the stream
81-
// testUtils.deleteStream()
82-
// testUtils.deleteDynamoDBTable(appName)
81+
testUtils.deleteStream()
82+
testUtils.deleteDynamoDBTable(appName)
8383
}
8484
}
8585

0 commit comments

Comments (0)