3939import org .apache .spark .streaming .api .java .JavaReceiverInputDStream ;
4040import org .apache .spark .streaming .api .java .JavaStreamingContext ;
4141
42-
4342/**
4443 * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
4544 * second starting with initial value of word count.
@@ -65,17 +64,17 @@ public static void main(String[] args) {
6564 StreamingExamples .setStreamingLogLevels ();
6665
6766 // Update the cumulative count function
68- final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction = new
69- Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
70- @ Override
71- public Optional <Integer > call (List <Integer > values , Optional <Integer > state ) {
72- Integer newSum = state .or (0 );
73- for (Integer value : values ) {
74- newSum += value ;
75- }
76- return Optional .of (newSum );
77- }
78- };
67+ final Function2 <List <Integer >, Optional <Integer >, Optional <Integer >> updateFunction =
68+ new Function2 <List <Integer >, Optional <Integer >, Optional <Integer >>() {
69+ @ Override
70+ public Optional <Integer > call (List <Integer > values , Optional <Integer > state ) {
71+ Integer newSum = state .or (0 );
72+ for (Integer value : values ) {
73+ newSum += value ;
74+ }
75+ return Optional .of (newSum );
76+ }
77+ };
7978
8079 // Create the context with a 1 second batch size
8180 SparkConf sparkConf = new SparkConf ().setAppName ("JavaStatefulNetworkWordCount" );
@@ -97,12 +96,13 @@ public Iterable<String> call(String x) {
9796 }
9897 });
9998
100- JavaPairDStream <String , Integer > wordsDstream = words .mapToPair (new PairFunction <String , String , Integer >() {
101- @ Override
102- public Tuple2 <String , Integer > call (String s ) {
103- return new Tuple2 <String , Integer >(s , 1 );
104- }
105- });
99+ JavaPairDStream <String , Integer > wordsDstream = words .mapToPair (
100+ new PairFunction <String , String , Integer >() {
101+ @ Override
102+ public Tuple2 <String , Integer > call (String s ) {
103+ return new Tuple2 <String , Integer >(s , 1 );
104+ }
105+ });
106106
107107 // This will give a Dstream made of state (which is the cumulative count of the words)
108108 JavaPairDStream <String , Integer > stateDstream = wordsDstream .updateStateByKey (updateFunction ,
0 commit comments