Skip to content

Commit 9d50bd9

Browse files
committed
Docs and example changes to update kinesis docs to newer version of API
1 parent be846db commit 9d50bd9

2 files changed

Lines changed: 80 additions & 43 deletions

File tree

docs/streaming-kinesis-integration.md

Lines changed: 69 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
2424
For Python applications, you will have to add the above library and its dependencies when deploying your application. See the *Deploying* subsection below.
2525
**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
2626

27-
2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows:
27+
2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:
2828

2929
<div class="codetabs">
3030
<div data-lang="scala" markdown="1">
31-
import org.apache.spark.streaming.Duration
32-
import org.apache.spark.streaming.kinesis._
33-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
34-
35-
val kinesisStream = KinesisUtils.createStream(
36-
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
37-
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
38-
39-
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
31+
import org.apache.spark.storage.StorageLevel
32+
import org.apache.spark.streaming.kinesis.KinesisInputDStream
33+
import org.apache.spark.streaming.{Seconds, StreamingContext}
34+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
35+
36+
val kinesisStream = KinesisInputDStream.builder
37+
.streamingContext(streamingContext)
38+
.endpointUrl([endpoint URL])
39+
.regionName([region name])
40+
.streamName([streamName])
41+
.initialPositionInStream([initial position])
42+
.checkpointAppName([Kinesis app name])
43+
.checkpointInterval([checkpoint interval])
44+
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
45+
.build()
46+
47+
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream)
4048
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
4149

4250
</div>
4351
<div data-lang="java" markdown="1">
44-
import org.apache.spark.streaming.Duration;
45-
import org.apache.spark.streaming.kinesis.*;
46-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
47-
48-
JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
49-
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
50-
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
52+
        import org.apache.spark.storage.StorageLevel;
53+
        import org.apache.spark.streaming.kinesis.KinesisInputDStream;
54+
        import org.apache.spark.streaming.Seconds;
55+
        import org.apache.spark.streaming.StreamingContext;
56+
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
57+
58+
KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder
59+
.streamingContext(streamingContext)
60+
.endpointUrl([endpoint URL])
61+
.regionName([region name])
62+
.streamName([streamName])
63+
.initialPositionInStream([initial position])
64+
.checkpointAppName([Kinesis app name])
65+
.checkpointInterval([checkpoint interval])
66+
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
67+
.build();
5168

5269
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
5370
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
5471

5572
</div>
5673
<div data-lang="python" markdown="1">
57-
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
74+
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
5875

59-
kinesisStream = KinesisUtils.createStream(
60-
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
61-
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
76+
kinesisStream = KinesisUtils.createStream(
77+
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
78+
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
6279

6380
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
6481
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
7087

7188
<div class="codetabs">
7289
<div data-lang="scala" markdown="1">
73-
74-
import org.apache.spark.streaming.Duration
75-
import org.apache.spark.streaming.kinesis._
76-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
77-
78-
val kinesisStream = KinesisUtils.createStream[T](
79-
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
80-
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
81-
[message handler])
90+
import org.apache.spark.storage.StorageLevel
91+
import org.apache.spark.streaming.kinesis.KinesisInputDStream
92+
import org.apache.spark.streaming.{Seconds, StreamingContext}
93+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
94+
95+
val kinesisStream = KinesisInputDStream.builder
96+
.streamingContext(streamingContext)
97+
.endpointUrl([endpoint URL])
98+
.regionName([region name])
99+
.streamName([streamName])
100+
.initialPositionInStream([initial position])
101+
.checkpointAppName([Kinesis app name])
102+
.checkpointInterval([checkpoint interval])
103+
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
104+
.buildWithMessageHandler([message handler])
82105

83106
</div>
84107
<div data-lang="java" markdown="1">
85-
86-
import org.apache.spark.streaming.Duration;
87-
import org.apache.spark.streaming.kinesis.*;
88-
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
89-
90-
JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
91-
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
92-
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
93-
[message handler], [class T]);
108+
        import org.apache.spark.storage.StorageLevel;
109+
        import org.apache.spark.streaming.kinesis.KinesisInputDStream;
110+
        import org.apache.spark.streaming.Seconds;
111+
        import org.apache.spark.streaming.StreamingContext;
112+
        import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
113+
114+
        KinesisInputDStream<T> kinesisStream = KinesisInputDStream.builder
115+
.streamingContext(streamingContext)
116+
.endpointUrl([endpoint URL])
117+
.regionName([region name])
118+
.streamName([streamName])
119+
.initialPositionInStream([initial position])
120+
.checkpointAppName([Kinesis app name])
121+
.checkpointInterval([checkpoint interval])
122+
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
123+
.buildWithMessageHandler([message handler]);
94124

95125
</div>
96126
</div>

external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
2323
import scala.util.Random
2424

2525
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
26-
import com.amazonaws.regions.RegionUtils
2726
import com.amazonaws.services.kinesis.AmazonKinesisClient
2827
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
2928
import com.amazonaws.services.kinesis.model.PutRecordRequest
@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
3433
import org.apache.spark.storage.StorageLevel
3534
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
3635
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
37-
import org.apache.spark.streaming.kinesis.KinesisUtils
36+
import org.apache.spark.streaming.kinesis.KinesisInputDStream
3837

3938

4039
/**
@@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging {
135134

136135
// Create the Kinesis DStreams
137136
val kinesisStreams = (0 until numStreams).map { i =>
138-
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
139-
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
137+
KinesisInputDStream.builder
138+
.streamingContext(ssc)
139+
.streamName(streamName)
140+
.endpointUrl(endpointUrl)
141+
.regionName(regionName)
142+
.initialPositionInStream(InitialPositionInStream.LATEST)
143+
.checkpointAppName(appName)
144+
.checkpointInterval(kinesisCheckpointInterval)
145+
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
146+
.build()
140147
}
141148

142149
// Union all the streams

0 commit comments

Comments
 (0)