@@ -67,7 +67,7 @@ public static void main(String[] args) throws Exception {
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

// Create an input stream with the custom receiver on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.receiverStream(
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
@@ -57,7 +57,7 @@ public static void main(String[] args) throws Exception {
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Create a JavaReceiverInputDStream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
@@ -126,7 +126,7 @@ private static JavaStreamingContext createContext(String ip,
ssc.checkpoint(checkpointDirectory);

// Create a socket stream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
@@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception {
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Create a JavaReceiverInputDStream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
2 changes: 1 addition & 1 deletion examples/src/main/python/ml/train_validation_split.py
@@ -17,7 +17,7 @@

"""
This example demonstrates applying TrainValidationSplit to split data
-and preform model selection.
+and perform model selection.
Run with:

bin/spark-submit examples/src/main/python/ml/train_validation_split.py
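
For readers more used to the Scala API, the model-selection idea this example demonstrates looks roughly like the sketch below. The estimator, grid values, and train ratio here are illustrative assumptions, not the contents of the Python file.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// Illustrative sketch: hold out part of the data, fit each parameter
// combination on the training split, and keep the best-scoring model.
val lr = new LinearRegression()
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val tvs = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8)  // 80% train, 20% validation

// val model = tvs.fit(training)  // 'training' is a placeholder DataFrame
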
@@ -68,7 +68,7 @@ def createContext(host, port, outputPath):
ssc = StreamingContext(sc, 1)

# Create a socket stream on target ip:port and count the
-# words in input stream of \n delimited text (eg. generated by 'nc')
+# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, port)
words = lines.flatMap(lambda line: line.split(" "))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
@@ -54,7 +54,7 @@ def getSparkSessionInstance(sparkConf):
ssc = StreamingContext(sc, 1)

# Create a socket stream on target ip:port and count the
-# words in input stream of \n delimited text (eg. generated by 'nc')
+# words in input stream of \n delimited text (e.g. generated by 'nc')
lines = ssc.socketTextStream(host, int(port))
words = lines.flatMap(lambda line: line.split(" "))

@@ -50,7 +50,7 @@ object CustomReceiver {
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create an input stream with the custom receiver on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
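
For context, a receiver of the kind these examples plug in usually looks like the minimal sketch below. It is written against the public Receiver API and is an illustration, not necessarily the exact class in this diff.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Minimal line-oriented socket receiver: connect, read '\n'-delimited text,
// hand each line to Spark with store(), and ask for a restart on failure.
class LineSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so onStart() returns immediately.
    new Thread("Line Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = ()  // nothing to clean up; the loop checks isStopped

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
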
@@ -47,7 +47,7 @@ object NetworkWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
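
A short note on the storage-level comment in this hunk: the non-replicated level is only acceptable because a local run keeps everything in one process, while the `_2` variants keep a second replica, which is what fault tolerance on a cluster relies on. An illustrative sketch of the two choices:

import org.apache.spark.storage.StorageLevel

// Single copy: fine for local runs, as the comment above says.
val localLevel = StorageLevel.MEMORY_AND_DISK_SER
// Two copies: what a distributed deployment needs to survive executor loss.
val replicatedLevel = StorageLevel.MEMORY_AND_DISK_SER_2
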
@@ -112,7 +112,7 @@ object RecoverableNetworkWordCount {
ssc.checkpoint(checkpointDirectory)

// Create a socket stream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
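
A factory like createContext above is typically paired with checkpoint-based recovery. A minimal sketch, assuming the standard getOrCreate pattern; the argument list of createContext is a placeholder, not the file's exact signature.

import org.apache.spark.streaming.StreamingContext

// Rebuild the StreamingContext from the checkpoint if one exists;
// otherwise call the factory to create it from scratch.
val context = StreamingContext.getOrCreate(
  checkpointDirectory,
  () => createContext(ip, port, outputPath, checkpointDirectory))
context.start()
context.awaitTermination()
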
@@ -51,7 +51,7 @@ object SqlNetworkWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create a socket stream on target ip:port and count the
-// words in input stream of \n delimited text (eg. generated by 'nc')
+// words in input stream of \n delimited text (e.g. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
@@ -52,7 +52,7 @@ object StatefulNetworkWordCount {
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

// Create a ReceiverInputDStream on target ip:port and count the
-// words in input stream of \n delimited test (eg. generated by 'nc')
+// words in input stream of \n delimited test (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
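
The initialRDD and wordDstream in this hunk usually feed a mapWithState step. A sketch of that continuation, assuming the StateSpec API; the file's actual code may differ in details.

import org.apache.spark.streaming.{State, StateSpec}

// Add the incoming count to whatever is already in state for the word,
// update the state, and emit the running total.
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum)
}

val stateDstream = wordDstream.mapWithState(
  StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
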
@@ -46,7 +46,7 @@ abstract class DatabaseOnDocker {
val env: Map[String, String]

/**
-* Wheather or not to use ipc mode for shared memory when starting docker image
+* Whether or not to use ipc mode for shared memory when starting docker image
*/
val usesIpc: Boolean

@@ -33,7 +33,7 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
withTable(table) {
val topic = newTopic()
testUtils.createTopic(topic)
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
val df = spark
.readStream
.format("kafka")
@@ -99,7 +99,7 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
withTable(table) {
val topic = newTopic()
testUtils.createTopic(topic)
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
val df = spark
.readStream
.format("kafka")
@@ -544,7 +544,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
val rows = spark.table("kafkaWatermark").collect()
assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
val row = rows(0)
-// We cannot check the exact window start time as it depands on the time that messages were
+// We cannot check the exact window start time as it depends on the time that messages were
// inserted by the producer. So here we just use a low bound to make sure the internal
// conversion works.
assert(
@@ -740,7 +740,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

val topicPartition = new TopicPartition(topic, 0)
// The message values are the same as their offsets to make the test easy to follow
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
testStream(mapped)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
@@ -863,7 +863,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

val topicPartition = new TopicPartition(topic, 0)
// The message values are the same as their offsets to make the test easy to follow
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
testStream(mapped)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
@@ -954,7 +954,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
.load()
.select($"value".as[String])

-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
producer.beginTransaction()
(0 to 3).foreach { i =>
producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
@@ -970,7 +970,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
// this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` or
// `FetchedData._offsetAfterPoll` (See SPARK-25495), the next batch will see incorrect
// values and return wrong results hence fail the test.
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
producer.beginTransaction()
(4 to 7).foreach { i =>
producer.send(new ProducerRecord[String, String](topic, i.toString)).get()
@@ -1472,7 +1472,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
withTable(table) {
val topic = newTopic()
testUtils.createTopic(topic)
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
val df = spark
.readStream
.format("kafka")
@@ -239,7 +239,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
test("read Kafka transactional messages: read_committed") {
val topic = newTopic()
testUtils.createTopic(topic)
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
val df = spark
.read
.format("kafka")
@@ -288,7 +288,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
test("read Kafka transactional messages: read_uncommitted") {
val topic = newTopic()
testUtils.createTopic(topic)
-testUtils.withTranscationalProducer { producer =>
+testUtils.withTransactionalProducer { producer =>
val df = spark
.read
.format("kafka")
@@ -348,7 +348,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}

/** Call `f` with a `KafkaProducer` that has initialized transactions. */
-def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): Unit = {
+def withTransactionalProducer(f: KafkaProducer[String, String] => Unit): Unit = {
val props = producerConfiguration
props.put("transactional.id", UUID.randomUUID().toString)
val producer = new KafkaProducer[String, String](props)
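
The hunk cuts off mid-body; a plausible completion of such a helper, assuming the usual transactional-producer lifecycle (the real implementation may differ):

// Initialize transactions once, hand the producer to the test body,
// and always close it afterwards.
producer.initTransactions()
try {
  f(producer)
} finally {
  producer.close()
}
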
@@ -390,7 +390,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
-s"topic $topic still exists in log mananger")
+s"topic $topic still exists in log manager")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
@@ -48,7 +48,7 @@
*
* Usage: JavaKinesisWordCountASL [app-name] [stream-name] [endpoint-url] [region-name]
* [app-name] is the name of the consumer app, used to track the read data in DynamoDB
-* [stream-name] name of the Kinesis stream (ie. mySparkStream)
+* [stream-name] name of the Kinesis stream (i.e. mySparkStream)
* [endpoint-url] endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
*
@@ -23,7 +23,7 @@

Usage: kinesis_wordcount_asl.py <app-name> <stream-name> <endpoint-url> <region-name>
<app-name> is the name of the consumer app, used to track the read data in DynamoDB
-<stream-name> name of the Kinesis stream (ie. mySparkStream)
+<stream-name> name of the Kinesis stream (i.e. mySparkStream)
<endpoint-url> endpoint of the Kinesis service
(e.g. https://kinesis.us-east-1.amazonaws.com)
<region-name> region name of the Kinesis endpoint (e.g. us-east-1)
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.kinesis.KinesisInputDStream
*
* Usage: KinesisWordCountASL <app-name> <stream-name> <endpoint-url> <region-name>
* <app-name> is the name of the consumer app, used to track the read data in DynamoDB
-* <stream-name> name of the Kinesis stream (ie. mySparkStream)
+* <stream-name> name of the Kinesis stream (i.e. mySparkStream)
* <endpoint-url> endpoint of the Kinesis service
* (e.g. https://kinesis.us-east-1.amazonaws.com)
*
@@ -167,9 +167,9 @@ object KinesisWordCountASL extends Logging {
* Usage: KinesisWordProducerASL <stream-name> <endpoint-url> \
* <records-per-sec> <words-per-record>
*
-* <stream-name> is the name of the Kinesis stream (ie. mySparkStream)
+* <stream-name> is the name of the Kinesis stream (i.e. mySparkStream)
* <endpoint-url> is the endpoint of the Kinesis service
-* (ie. https://kinesis.us-east-1.amazonaws.com)
+* (i.e. https://kinesis.us-east-1.amazonaws.com)
* <records-per-sec> is the rate of records per second to put onto the stream
* <words-per-record> is the number of words per record
*
@@ -597,7 +597,7 @@ private class KinesisUtilsPythonHelper {
// scalastyle:on
if (!(stsAssumeRoleArn != null && stsSessionName != null && stsExternalId != null)
&& !(stsAssumeRoleArn == null && stsSessionName == null && stsExternalId == null)) {
-throw new IllegalArgumentException("stsAssumeRoleArn, stsSessionName, and stsExtenalId " +
+throw new IllegalArgumentException("stsAssumeRoleArn, stsSessionName, and stsExternalId " +
"must all be defined or all be null")
}
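
The condition above reads as: either every STS parameter is supplied or none is. An equivalent, slightly more direct formulation, shown only for illustration:

// Equivalent check: the three STS parameters must be all null or all non-null.
val stsParams = Seq(stsAssumeRoleArn, stsSessionName, stsExternalId)
require(stsParams.forall(_ == null) || stsParams.forall(_ != null),
  "stsAssumeRoleArn, stsSessionName, and stsExternalId " +
    "must all be defined or all be null")
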

@@ -205,8 +205,8 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
withSpark { sc =>
// Check that implementation can handle large vertexIds, SPARK-25149
val vertexIdOffset = Int.MaxValue.toLong + 1
-val sourceOffest = 4
-val source = vertexIdOffset + sourceOffest
+val sourceOffset = 4
+val source = vertexIdOffset + sourceOffset
val numIter = 10
val vertices = vertexIdOffset until vertexIdOffset + numIter
val chain1 = vertices.zip(vertices.tail)
@@ -216,7 +216,7 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
val tol = 0.0001
val errorTol = 1.0e-1

-val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffest))
+val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffset))
// We expect the rank to decay as (1 - resetProb) ^ distance
val expectedRanks = sc.parallelize(vertices).map { vid =>
val rank = if (vid < source) {
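
One way to read the constant a in the hunk above: personalized PageRank mass on the chain decays geometrically with distance from the source, and a normalizes a finite geometric series so the ranks sum to one. A sketch of that arithmetic, writing rho for resetProb and n for numIter - sourceOffset (this is an interpretation of the test's formula, not wording from the suite itself):

\[
\sum_{d=0}^{n-1} a\,(1-\rho)^{d}
  = a \cdot \frac{1-(1-\rho)^{n}}{\rho} = 1
\quad\Longrightarrow\quad
a = \frac{\rho}{1-(1-\rho)^{n}}
\]

which matches val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffset)).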