-
Notifications
You must be signed in to change notification settings - Fork 29k
support for Kinesis #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support for Kinesis #223
Changes from 1 commit
58210f2
035b976
88c9844
e6c0550
c282a31
294e5c2
d56d894
66a244d
e3ca337
b3aa4be
b2158a3
a0d07eb
f1c7811
8c631a0
0c62787
cf00662
c01fd48
bde5ff5
369b6dc
be53829
8b15ccd
5f8c829
e26687c
ebcfe8e
3fe7d72
39fd10a
d770a90
fb94cfb
0d21059
342cce6
1b04476
96fdfbb
0325c8d
3ab4055
d4a04a5
8696d9f
08a7929
ee975a8
8b37805
a36d925
7b8d2c2
ffc4cf1
e5cd817
e84aa22
a5602da
ce3bf45
e3b62b7
5219429
4a51dd8
fc8e738
34af7ed
186879f
6544239
4c42c45
b1ec902
c17bea0
05fa288
d890b93
146665f
40ceb3d
e7d0f3c
19edca2
177ef5d
ae55c2c
7c8a1d0
5be3319
349560c
52cfc18
2cc0bd3
d8d2e93
285cac4
4671126
01f4c13
aaee35f
654e63e
15d8bf6
08e04ed
2ea7ce7
f052dff
f9ab450
229c135
4b1266c
ab928c9
8f65c03
a761f16
684a102
e6bc51f
4100154
da48c74
6c505e6
4ea7110
e6e2963
3a73aa7
7cb4cc0
20d5f4b
c018887
fb64844
b111815
12c198d
93ddcbc
8045f49
ed5dd10
5f9b236
a034761
cfb22da
d00cae8
34a0b15
f59bead
a37e7a4
cfa8c4e
93c0e0e
a32746f
576e2d0
9645119
eaccf3b
d78cc81
be2ec07
487153b
f1fe720
764ffcf
974bf33
1acf918
7e8809d
2dc0b65
ce0886f
e991148
5b4ac43
6b02002
c90169a
375476c
9ec0b85
19dbc8c
ae3cf5c
4f602e5
aad4110
ca8a046
fa21377
35f81c1
cb96cd2
b4dc414
166ec29
319f578
e031c2f
c875a03
425f85c
912e9f6
697b7ad
fa9a017
bb0ddc2
2b9b907
51577ba
6e69569
bd56556
79ef2d8
d1de0aa
b19966d
7658285
9176e53
5ce095b
52b2ed5
7133ad9
692a679
4877e05
3d795ee
1b2dc5a
8df44f1
8d65d44
a2ef7f2
723bdf7
f396726
8b77c36
b847fe5
5b7ab96
c6ebbda
f81fc0f
1c3a0ab
a8694be
d5e008f
0126cea
d562dec
97f9315
422e642
9869bea
b11c599
21522a3
4a0c6b5
85babcb
814827b
b30b28c
60c613e
cc40c06
38df2e3
10d23c4
64db752
adeca4f
af22207
05255de
aac5872
32c30ca
39693e1
a62c682
47a6428
8b9b33d
38008be
3060c81
bb2a4e2
9a56b58
713c103
8505024
50b9f6a
1e6c770
602d4f8
1460b75
c829cab
11d8332
1d5cbf3
bb36494
db255ae
f6defcd
8ffa264
8c7c21b
50016ca
a50e69d
b622e01
c433491
fb18938
8fc1b52
7f84c3c
74c2a35
153cf30
27bec31
4cb80a0
f41dd6a
bd2fa02
52ac8a4
e8530db
7385516
a18f843
b36f603
dfd4a7f
0b46168
386a0a2
e7de368
b6ac76a
fc9c55b
cdff363
84b641b
02ab309
07ea156
faa1743
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| package org.apache.spark.streaming.examples | ||
|
|
||
| import org.apache.spark.streaming.{Seconds, StreamingContext} | ||
| import org.apache.spark.streaming.kinesis.KinesisUtils | ||
| import org.apache.spark.streaming.StreamingContext._ | ||
|
|
||
|
|
||
| object KinesisWordCount { | ||
|
|
||
| def main(args: Array[String]): Unit = { | ||
|
|
||
|
|
||
| if (args.length < 1) { | ||
| System.err.println("Usage: KinesisWordCount <master> <streamname>" + " [accesskey] [accessSecretKey]") | ||
| System.exit(1) | ||
| } | ||
|
|
||
| val master=args(0) | ||
| val kinesisStream=args(1) | ||
| val accesskey=args(2) | ||
| val accessSecretKey=args(3) | ||
|
|
||
|
|
||
| val ssc = new StreamingContext(master, "KinesisWordCOunt", Seconds(2), | ||
| System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) | ||
|
|
||
| val lines = KinesisUtils.createStream(ssc, accesskey, accessSecretKey, kinesisStream) | ||
|
|
||
| val words = lines.flatMap(_.split(" ")) | ||
| val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) | ||
| wordCounts.print | ||
| ssc.start | ||
|
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <!-- | ||
| ~ Licensed to the Apache Software Foundation (ASF) under one or more | ||
| ~ contributor license agreements. See the NOTICE file distributed with | ||
| ~ this work for additional information regarding copyright ownership. | ||
| ~ The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| ~ (the "License"); you may not use this file except in compliance with | ||
| ~ the License. You may obtain a copy of the License at | ||
| ~ | ||
| ~ http://www.apache.org/licenses/LICENSE-2.0 | ||
| ~ | ||
| ~ Unless required by applicable law or agreed to in writing, software | ||
| ~ distributed under the License is distributed on an "AS IS" BASIS, | ||
| ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| ~ See the License for the specific language governing permissions and | ||
| ~ limitations under the License. | ||
| --> | ||
|
|
||
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
| <modelVersion>4.0.0</modelVersion> | ||
| <parent> | ||
| <groupId>org.apache.spark</groupId> | ||
| <artifactId>spark-parent</artifactId> | ||
| <version>1.0.0-incubating-SNAPSHOT</version> | ||
| <relativePath>../../pom.xml</relativePath> | ||
| </parent> | ||
|
|
||
| <groupId>org.apache.spark</groupId> | ||
| <artifactId>spark-streaming-amazonkinesis</artifactId> | ||
| <packaging>jar</packaging> | ||
| <name>Spark Project External Amazon Kinesis</name> | ||
| <url>http://spark.incubator.apache.org/</url> | ||
|
|
||
| <dependencies> | ||
| <dependency> | ||
| <groupId>org.apache.spark</groupId> | ||
| <artifactId>spark-streaming_${scala.binary.version}</artifactId> | ||
| <version>${project.version}</version> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.apache.spark</groupId> | ||
| <artifactId>spark-streaming_${scala.binary.version}</artifactId> | ||
| <version>${project.version}</version> | ||
| <type>test-jar</type> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.scalatest</groupId> | ||
| <artifactId>scalatest_${scala.binary.version}</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.scalacheck</groupId> | ||
| <artifactId>scalacheck_${scala.binary.version}</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>com.novocode</groupId> | ||
| <artifactId>junit-interface</artifactId> | ||
| <scope>test</scope> | ||
| </dependency> | ||
| </dependencies> | ||
| <build> | ||
| <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> | ||
| <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> | ||
| <plugins> | ||
| <plugin> | ||
| <groupId>org.scalatest</groupId> | ||
| <artifactId>scalatest-maven-plugin</artifactId> | ||
| </plugin> | ||
| </plugins> | ||
| </build> | ||
| </project> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.streaming.kinesis | ||
|
|
||
| import org.apache.spark.streaming.StreamingContext | ||
| import scala.reflect.ClassTag | ||
| import org.apache.spark.storage.StorageLevel | ||
| import org.apache.spark.streaming.dstream.NetworkReceiver | ||
| import com.amazonaws.auth.AWSCredentialsProvider | ||
| import java.util.UUID | ||
| import com.amazonaws.auth.InstanceProfileCredentialsProvider | ||
| import com.amazonaws.auth.AWSCredentials | ||
| import com.amazonaws.auth.BasicAWSCredentials | ||
| import java.net.UnknownHostException | ||
| import java.net.InetAddress | ||
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration | ||
| import java.nio.charset.Charset | ||
| import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory | ||
| import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException | ||
| import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException | ||
| import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException | ||
| import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer | ||
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker | ||
| import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason | ||
| import com.amazonaws.services.kinesis.model.Record | ||
| import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor | ||
| import org.apache.spark.streaming.dstream.NetworkInputDStream | ||
| import scala.collection.JavaConversions._ | ||
| import java.util.List | ||
|
|
||
|
|
||
| private[streaming] | ||
| class KinesisInputDStream[T: ClassTag]( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please provide docs on these classes, especially on the KinesisReceiver. The documentation must be sufficient that any other developer is able to look at the code, understand the control/data flow and debug stuff when required. |
||
| @transient ssc_ : StreamingContext, | ||
| accesskey:String, | ||
| accessSecretKey:String, | ||
| kinesisStream:String, | ||
| kinesisEndpoint:String, | ||
| storageLevel: StorageLevel | ||
| ) extends NetworkInputDStream[String](ssc_) { | ||
|
|
||
|
|
||
| override def getReceiver(): NetworkReceiver[String] = { | ||
| new KinesisReceiver(accesskey,accessSecretKey,kinesisStream,kinesisEndpoint,storageLevel) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| object AllDone extends Exception { } | ||
|
|
||
| private[streaming] | ||
| class KinesisReceiver[T: ClassTag]( | ||
| accesskey:String, | ||
| accessSecretKey:String, | ||
| kinesisStream:String, | ||
| kinesisEndpoint:String, | ||
| storageLevel: StorageLevel | ||
| ) extends NetworkReceiver[String] { | ||
|
|
||
| val NUM_RETRIES =5 | ||
| val BACKOFF_TIME_IN_MILLIS =2000 | ||
| var workerId = UUID.randomUUID().toString() | ||
|
|
||
| lazy val credentialsProvider = new AWSCredentialsProvider { | ||
|
|
||
| def getCredentials():AWSCredentials = { | ||
| if (accesskey.isEmpty()||accessSecretKey.isEmpty) { | ||
| new InstanceProfileCredentialsProvider().getCredentials() | ||
| }else{ | ||
| new BasicAWSCredentials(accesskey,accessSecretKey) | ||
| } | ||
| } | ||
|
|
||
| def refresh() {} | ||
| } | ||
|
|
||
| try { | ||
| workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID() | ||
| } catch { | ||
| case e:UnknownHostException => e.printStackTrace() | ||
| } | ||
|
|
||
| private lazy val decoder = Charset.forName("UTF-8").newDecoder(); | ||
| private lazy val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(kinesisStream, kinesisStream, credentialsProvider,workerId).withKinesisEndpoint(kinesisEndpoint) | ||
| private lazy val blockGenerator = new BlockGenerator(storageLevel) | ||
|
|
||
| protected override def onStart() { | ||
|
|
||
| blockGenerator.start() | ||
| lazy val recordProcessorFactory:IRecordProcessorFactory = new IRecordProcessorFactory{ | ||
| def createProcessor():IRecordProcessor= new IRecordProcessor { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please fix indenting. Refer to Spark style guide |
||
|
|
||
| def initialize(shardId:String){ | ||
| logInfo("starting with shardId: "+shardId) | ||
| } | ||
|
|
||
| def processRecords(records: List[Record], checkpointer : IRecordProcessorCheckpointer) { | ||
| records.toList.foreach(record=>{ | ||
| blockGenerator+=decoder.decode(record.getData()).toString(); | ||
| }) | ||
| checkpoint(checkpointer); | ||
| } | ||
|
|
||
| def shutdown(checkpointer : IRecordProcessorCheckpointer, reason : ShutdownReason){ | ||
| logInfo("Shutting Down Kinesis Receiver: "+reason) | ||
| } | ||
| } | ||
| } | ||
| val worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration); | ||
| worker.run() | ||
| } | ||
|
|
||
| private def checkpoint(checkpointer : IRecordProcessorCheckpointer) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please document what these functions are doing. |
||
|
|
||
| for (i<-1 to NUM_RETRIES) { | ||
| try { | ||
| checkpointer.checkpoint(); | ||
| throw AllDone; | ||
| } catch { | ||
| case se:ShutdownException =>logInfo("Caught shutdown exception, skipping checkpoint.", se) | ||
| case e:ThrottlingException => { | ||
| // Backoff and re-attempt checkpoint upon transient failures | ||
| if (i >= (NUM_RETRIES - 1)) { | ||
| logInfo("Checkpoint failed after " + (i + 1) + "attempts.", e) | ||
| throw AllDone; | ||
| } else { | ||
| logInfo("Transient issue when checkpointing - attempt " | ||
| + (i + 1) + " of "+ NUM_RETRIES, e) | ||
| } | ||
| } | ||
| case e:InvalidStateException => { | ||
| logInfo("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e) | ||
| throw AllDone | ||
| } | ||
| case AllDone=> | ||
| } | ||
| try { | ||
| Thread.sleep(BACKOFF_TIME_IN_MILLIS) | ||
| } catch { | ||
| case e:InterruptedException => logInfo("Interrupted sleep", e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| protected override def onStop() { | ||
| blockGenerator.stop() | ||
| logInfo("Amazon Kinesis receiver stopped") | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.streaming.kinesis | ||
|
|
||
| import org.apache.spark.streaming.StreamingContext | ||
| import org.apache.spark.streaming.dstream.DStream | ||
| import org.apache.spark.storage.StorageLevel | ||
| import org.apache.spark.streaming.api.java.JavaStreamingContext | ||
| import org.apache.spark.streaming.api.java.JavaDStream | ||
|
|
||
| object KinesisUtils { | ||
|
|
||
| def createStream( | ||
| ssc: StreamingContext, | ||
| accesskey:String="", | ||
| accessSecretKey:String="", | ||
| kinesisStream:String, | ||
| kinesisEndpoint:String="https://kinesis.us-east-1.amazonaws.com", | ||
| storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 | ||
| ): DStream[String] = { | ||
| new KinesisInputDStream(ssc, accesskey,accessSecretKey,kinesisStream,kinesisEndpoint, storageLevel) | ||
| } | ||
| def createStream( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please provide complete Java documentation on how these functions are used. Refer to other XYZUtils to get an idea. |
||
| jssc: JavaStreamingContext, | ||
| accesskey:String, | ||
| accessSecretKey:String, | ||
| kinesisStream:String, | ||
| kinesisEndpoint:String, | ||
| storageLevel: StorageLevel | ||
| ): JavaDStream[String] = { | ||
| new KinesisInputDStream(jssc.ssc, accesskey,accessSecretKey,kinesisStream,kinesisEndpoint, storageLevel) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Scala style issue. |
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.streaming.kinesis; | ||
|
|
||
|
|
||
| import org.junit.Test; | ||
| import org.apache.spark.storage.StorageLevel; | ||
| import org.apache.spark.streaming.LocalJavaStreamingContext; | ||
| import org.apache.spark.streaming.api.java.JavaDStream; | ||
|
|
||
| public class JavaKinesisStreamSuite extends LocalJavaStreamingContext { | ||
| @Test | ||
| public void testKinesisStream() { | ||
|
|
||
| JavaDStream<String> test1 = KinesisUtils.createStream(ssc, | ||
| "x", "y", "z","1",StorageLevel.MEMORY_AND_DISK_SER_2()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as the Scala unit test. |
||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.streaming.kinesis | ||
|
|
||
| import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
| class KinesisStreamSuite extends TestSuiteBase { | ||
|
|
||
| test("Kinesis input stream") { | ||
| val ssc = new StreamingContext(master, framework, batchDuration) | ||
| val test1 = KinesisUtils.createStream(ssc, accesskey="x",accessSecretKey="y",kinesisStream="z") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This unit test does not really test anything. Is it possible to add a unit test that actually tests receiving data. Without proper unit tests, we have a lot of trouble understanding and analyzing when things have failed.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unless i include AWS credentials in this test, there's no way to receive data from Kinesis. I'll take another looks and see if i can come up with something more comprehensive but a very little can be done without credentials. |
||
| ssc.stop() | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please provide full instructions on how to run this example. See other examples for more details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, please add a Java Kinesis example. Many users really ask for Java examples and can be a significant barrier for trying out the Kinesis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pdeyhim - per our offline convo this wknd, please add a note about running this demo with a minimum of master=local 2 threads.
otherwise it appears that the KinesisNetworkReceiver thread does not startup - breaking the demo.