@@ -64,7 +64,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
6464 pyFiles = pyFiles , environment = environment , batchSize = batchSize ,
6565 serializer = serializer , conf = conf , gateway = gateway )
6666
67- # Start py4j callback server
67+ # Start py4j callback server.
68+ # The callback server is needed only by Spark Streaming; therefore the callback server
69+ # is started in StreamingContext.
6870 SparkContext ._gateway .restart_callback_server ()
6971 self ._clean_up_trigger ()
7072 self ._jvm = self ._sc ._jvm
@@ -78,6 +80,8 @@ def _clean_up_trigger(self):
7880 """Kill py4j callback server properly using signal lib"""
7981
8082 def clean_up_handler (* args ):
83+ # Make sure to stop the callback server.
84+ # TODO: improve how the callback server is terminated properly.
8185 SparkContext ._gateway ._shutdown_callback_server ()
8286 SparkContext ._gateway .shutdown ()
8387 sys .exit (0 )
@@ -100,7 +104,7 @@ def awaitTermination(self, timeout=None):
100104 else :
101105 self ._jssc .awaitTermination (timeout )
102106
103- # start from simple one. storageLevel is not passed for now.
107 # TODO: add storageLevel
104108 def socketTextStream (self , hostname , port ):
105109 """
106110 Create an input from TCP source hostname:port. Data is received using
@@ -134,7 +138,7 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
134138 def _testInputStream (self , test_inputs , numSlices = None ):
135139 """
136140 This function is only for test.
137- This implementation is inpired by QueStream implementation.
141 This implementation is inspired by the QueueStream implementation.
138142 Give list of RDD to generate DStream which contains the RDD.
139143 """
140144 test_rdds = list ()
@@ -144,9 +148,6 @@ def _testInputStream(self, test_inputs, numSlices=None):
144148 test_rdds .append (test_rdd ._jrdd )
145149 test_rdd_deserializers .append (test_rdd ._jrdd_deserializer )
146150
147- # if len(set(test_rdd_deserializers)) > 1:
148- # raise IOError("Deserializer should be one type to run test case. "
149- # "See the SparkContext.parallelize to understand how to decide deserializer")
150151 jtest_rdds = ListConverter ().convert (test_rdds , SparkContext ._gateway ._gateway_client )
151152 jinput_stream = self ._jvm .PythonTestInputStream (self ._jssc , jtest_rdds ).asJavaDStream ()
152153
0 commit comments