Skip to content

Commit d17ca6d

Browse files
committed
per TD's feedback: updated docs, simplified the KinesisUtils api
1 parent 912640c commit d17ca6d

4 files changed

Lines changed: 38 additions & 67 deletions

File tree

extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCount.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,14 @@ public static void main(String[] args) {
165165
/** Create the same number of Kinesis Receivers/DStreams as stream shards, then union them all */
166166
JavaDStream<byte[]> allStreams = KinesisUtils
167167
.createStream(jssc, appName, stream, endpoint, checkpointInterval.milliseconds(),
168-
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
168+
InitialPositionInStream.LATEST);
169169
/** Set the checkpoint interval */
170170
allStreams.checkpoint(checkpointInterval);
171171
for (int i = 1; i < numStreams; i++) {
172172
/** Create a new Receiver/DStream for each stream shard */
173173
JavaDStream<byte[]> dStream = KinesisUtils
174174
.createStream(jssc, appName, stream, endpoint, checkpointInterval.milliseconds(),
175-
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2());
175+
InitialPositionInStream.LATEST);
176176
/** Set the Spark checkpoint interval */
177177
dStream.checkpoint(checkpointInterval);
178178

extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCount.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,13 @@ object KinesisWordCount extends Logging {
159159
* them all.
160160
*/
161161
var allStreams: DStream[Array[Byte]] = KinesisUtils.createStream(ssc, appName, stream,
162-
endpoint, checkpointInterval.milliseconds, InitialPositionInStream.LATEST,
163-
StorageLevel.MEMORY_AND_DISK_2)
162+
endpoint, checkpointInterval.milliseconds, InitialPositionInStream.LATEST)
164163
/** Set the checkpoint interval */
165164
allStreams.checkpoint(checkpointInterval)
166165
for (i <- 1 until numStreams) {
167166
/** Create a new Receiver/DStream for each stream shard */
168167
val dStream = KinesisUtils.createStream(ssc, appName, stream, endpoint,
169-
checkpointInterval.milliseconds,
170-
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
168+
checkpointInterval.milliseconds, InitialPositionInStream.LATEST)
171169
/** Set the Spark checkpoint interval */
172170
dStream.checkpoint(checkpointInterval)
173171

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,29 +41,23 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
4141
* Instances of this class will get shipped to the Spark Streaming Workers
4242
* to run within a Spark Executor.
4343
*
44-
* @param appName Kinesis Application Name. Kinesis apps are mapped to Kinesis streams
45-
* by the Kinesis Client Library. If you change the app name or stream name,
46-
* the KCL will throw errors.
44+
* @param appName unique name for your Kinesis app. Multiple instances of the app pull from
45+
* the same stream. The Kinesis Client Library coordinates all load-balancing and
46+
* failure-recovery.
4747
* @param stream Kinesis stream name
48-
* @param endpoint url of Kinesis service
49-
* @param checkpointIntervalMillis for Kinesis checkpointing (not Spark checkpointing).
50-
* See the Kinesis Spark Streaming documentation for more details on the different types
51-
* of checkpoints.
52-
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the worker's initial
53-
* starting position in the stream.
54-
* The values are either the beginning of the stream per Kinesis' limit of 24 hours
55-
* (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
56-
* (InitialPositionInStream.LATEST).
57-
* @param persistence strategy for RDDs and DStreams.
48+
* @param endpoint url of Kinesis service (e.g. https://kinesis.us-east-1.amazonaws.com)
49+
* Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
50+
* @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing
51+
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
52+
* worker's initial starting position in the stream.
5853
*/
5954
private[kinesis] class KinesisReceiver(
6055
appName: String,
6156
stream: String,
6257
endpoint: String,
6358
checkpointIntervalMillis: Long,
64-
initialPositionInStream: InitialPositionInStream,
65-
storageLevel: StorageLevel)
66-
extends Receiver[Array[Byte]](storageLevel) with Logging { receiver =>
59+
initialPositionInStream: InitialPositionInStream)
60+
extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Logging { receiver =>
6761

6862
/**
6963
* The following vars are built in the onStart() method which executes in the Spark Worker after

extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala

Lines changed: 24 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,34 +28,24 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
2828

2929

3030
/**
31-
* Facade to create the Scala-based or Java-based streams.
32-
* Also, contains a reusable utility methods.
31+
* Helper class to create Amazon Kinesis Input Stream
3332
* :: Experimental ::
3433
*/
3534
@Experimental
3635
object KinesisUtils extends Logging {
3736
/**
3837
* Create an InputDStream that pulls messages from a Kinesis stream.
3938
*
40-
* @param StreamingContext object
41-
* @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams
42-
* by the Kinesis Client Library. If you change the App name or Stream name,
43-
* the KCL will throw errors.
44-
* @param stream Kinesis Stream Name
45-
* @param endpoint url of Kinesis service
46-
* @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
47-
* See the Kinesis Spark Streaming documentation for more details on the different types
48-
* of checkpoints.
49-
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
39+
* @param ssc StreamingContext
40+
* @param appName unique name for your Kinesis app. Multiple instances of the app pull from
41+
* the same stream. The Kinesis Client Library coordinates all load-balancing and
42+
* failure-recovery.
43+
* @param stream Kinesis stream name
44+
* @param endpoint url of Kinesis service (e.g. https://kinesis.us-east-1.amazonaws.com)
45+
* Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
46+
* @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing
47+
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
5048
* worker's initial starting position in the stream.
51-
* The values are either the beginning of the stream per Kinesis' limit of 24 hours
52-
* (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
53-
* (InitialPositionInStream.LATEST).
54-
* The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk
55-
* of processing records more than once.
56-
* @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory
57-
* and on-disk to 2 nodes total (primary and secondary)
58-
*
5949
* @return ReceiverInputDStream[Array[Byte]]
6050
*/
6151
def createStream(
@@ -64,45 +54,34 @@ object KinesisUtils extends Logging {
6454
stream: String,
6555
endpoint: String,
6656
checkpointIntervalMillis: Long,
67-
initialPositionInStream: InitialPositionInStream,
68-
storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]] = {
57+
initialPositionInStream: InitialPositionInStream): ReceiverInputDStream[Array[Byte]] = {
6958
ssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis,
70-
initialPositionInStream, storageLevel))
59+
initialPositionInStream ))
7160
}
7261

7362
/**
7463
* Create a Java-friendly InputDStream that pulls messages from a Kinesis stream.
7564
*
76-
* @param JavaStreamingContext object
77-
* @param appName Kinesis Application Name. Kinesis Apps are mapped to Kinesis Streams
78-
* by the Kinesis Client Library. If you change the App name or Stream name,
79-
* the KCL will throw errors.
80-
* @param stream Kinesis Stream Name
81-
* @param endpoint url of Kinesis service
82-
* @param checkpoint interval (millis) for Kinesis checkpointing (not Spark checkpointing).
83-
* See the Kinesis Spark Streaming documentation for more details on the different types
84-
* of checkpoints.
85-
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
65+
* @param jssc Java StreamingContext object
66+
* @param appName unique name for your Kinesis app. Multiple instances of the app pull from
67+
* the same stream. The Kinesis Client Library coordinates all load-balancing and
68+
* failure-recovery.
69+
* @param stream Kinesis stream name
70+
* @param endpoint url of Kinesis service (e.g. https://kinesis.us-east-1.amazonaws.com)
71+
* Available endpoints: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
72+
* @param checkpointIntervalMillis interval (millis) for Kinesis checkpointing
73+
* @param initialPositionInStream in the absence of Kinesis checkpoint info, this is the
8674
* worker's initial starting position in the stream.
87-
* The values are either the beginning of the stream per Kinesis' limit of 24 hours
88-
* (InitialPositionInStream.TRIM_HORIZON) or the tip of the stream
89-
* (InitialPositionInStream.LATEST).
90-
* The default is TRIM_HORIZON to avoid potential data loss. However, this presents the risk
91-
* of processing records more than once.
92-
* @param storageLevel The default is StorageLevel.MEMORY_AND_DISK_2 which replicates in-memory
93-
* and on-disk to 2 nodes total (primary and secondary)
94-
*
9575
* @return JavaReceiverInputDStream[Array[Byte]]
9676
*/
9777
def createStream(
9878
jssc: JavaStreamingContext,
9979
appName: String,
10080
stream: String,
10181
endpoint: String,
102-
checkpointIntervalMillis: Long,
103-
initialPositionInStream: InitialPositionInStream,
104-
storageLevel: StorageLevel): JavaReceiverInputDStream[Array[Byte]] = {
82+
checkpointIntervalMillis: Long,
83+
initialPositionInStream: InitialPositionInStream): JavaReceiverInputDStream[Array[Byte]] = {
10584
jssc.receiverStream(new KinesisReceiver(appName, stream, endpoint, checkpointIntervalMillis,
106-
initialPositionInStream, storageLevel))
85+
initialPositionInStream))
10786
}
10887
}

0 commit comments

Comments
 (0)