 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.utils import subprocess_server
-from apache_beam.options.pipeline_options import (
-    PipelineOptions,
-    GoogleCloudOptions
-)
 
 NUM_RECORDS = 1000
 
@@ -72,7 +68,7 @@ def process(
 
 class CrossLanguageKafkaIO(object):
   def __init__(
-      self, bootstrap_servers=None, topic=None, null_key=None, expansion_service=None):
+      self, bootstrap_servers, topic, null_key, expansion_service=None):
     self.bootstrap_servers = bootstrap_servers
     self.topic = topic
     self.null_key = null_key
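With this signature change, `bootstrap_servers`, `topic`, and `null_key` no longer default to `None` and must be supplied. A hypothetical construction, mirroring how the tests further down in this diff instantiate the class (the broker value is illustrative; `EXPANSION_PORT` is the environment variable those tests already use):

    import os
    import uuid

    # Hypothetical values shown for illustration only.
    pipeline_creator = CrossLanguageKafkaIO(
        'localhost:9092',                                   # bootstrap_servers
        'xlang_kafkaio_test_{}'.format(uuid.uuid4()),       # topic
        True,                                               # null_key
        'localhost:%s' % os.environ.get('EXPANSION_PORT'))  # expansion_service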
@@ -111,34 +107,6 @@ def build_read_pipeline(self, pipeline, max_num_records=None):
         | 'CalculateSum' >> beam.ParDo(CollectingFn())
         | 'SetSumCounter' >> beam.Map(self.sum_counter.inc))
 
-  def build_read_pipeline_with_kerberos(self, p, max_num_records=None):
-
-    jaas_config = (
-        f'com.sun.security.auth.module.Krb5LoginModule required '
-        f'useKeyTab=true storeKey=true '
-        f'keyTab="secretValue:projects/dataflow-testing-311516/secrets/kafka-client-keytab/versions/latest" '
-
-    )
-
-    kafka_records = (
-        p
-        | 'ReadFromKafka' >> ReadFromKafka(
-            consumer_config={
-                'bootstrap.servers': 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092',
-                'auto.offset.reset': 'earliest',
-                'max_num_records': max_num_records,
-                'security.protocol': 'SASL_PLAINTEXT',
-                'sasl.mechanism': 'GSSAPI',
-                'sasl.kerberos.service.name': 'kafka',
-                'sasl.jaas.config': jaas_config
-            },
-            topics=['fozzie_test_kerberos_topic'],
-            key_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
-            value_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
-            consumer_factory_fn_class='org.apache.beam.sdk.extensions.kafka.factories.KerberosConsumerFactoryFn',
-            consumer_factory_fn_params={'krb5Location': 'gs://fozzie_testing_bucket/kerberos/krb5.conf'}))
-    return kafka_records
-
   def run_xlang_kafkaio(self, pipeline):
     self.build_write_pipeline(pipeline)
     self.build_read_pipeline(pipeline)
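The deleted `build_read_pipeline_with_kerberos` above configured SASL/GSSAPI directly in the consumer config, plus a `consumer_factory_fn_class` hook that is not part of the stock `ReadFromKafka` API (it belonged to the feature being removed here). Below is a minimal sketch of the same read pattern using only standard `ReadFromKafka` parameters; the keytab path, principal, and helper name are placeholder assumptions, and note that stock `ReadFromKafka` takes `max_num_records` as a top-level argument rather than inside `consumer_config`:

    from apache_beam.io.kafka import ReadFromKafka

    # JAAS login config for Kerberos. Keytab and principal are placeholders;
    # the trailing ';' is required by the JAAS parser.
    JAAS_CONFIG = (
        'com.sun.security.auth.module.Krb5LoginModule required '
        'useKeyTab=true storeKey=true '
        'keyTab="/path/to/client.keytab" '
        'principal="kafka-client@EXAMPLE.COM";')


    def build_kerberized_read(p, bootstrap_servers, topic, max_num_records=None):
      """Reads a SASL_PLAINTEXT/GSSAPI-secured topic.

      consumer_config entries are passed through to the Java KafkaConsumer by
      the expansion service, so standard Kafka client security keys apply.
      """
      return (
          p
          | 'ReadFromKafka' >> ReadFromKafka(
              consumer_config={
                  'bootstrap.servers': bootstrap_servers,
                  'auto.offset.reset': 'earliest',
                  'security.protocol': 'SASL_PLAINTEXT',
                  'sasl.mechanism': 'GSSAPI',
                  'sasl.kerberos.service.name': 'kafka',
                  'sasl.jaas.config': JAAS_CONFIG,
              },
              topics=[topic],
              max_num_records=max_num_records))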
@@ -214,17 +182,6 @@ def test_hosted_kafkaio_null_key(self):
     self.run_kafka_write(pipeline_creator)
     self.run_kafka_read(pipeline_creator, None)
 
-  def test_hosted_kafkaio_null_key_kerberos(self):
-    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
-    bootstrap_servers = 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092'
-    pipeline_creator = CrossLanguageKafkaIO(
-        bootstrap_servers,
-        kafka_topic,
-        True,
-        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
-
-    self.run_kafka_read_with_kerberos(pipeline_creator)
-
   def run_kafka_write(self, pipeline_creator):
     with TestPipeline() as pipeline:
       pipeline.not_use_test_runner_api = True
@@ -239,23 +196,6 @@ def run_kafka_read(self, pipeline_creator, expected_key):
           equal_to([(expected_key, str(i).encode())
                     for i in range(NUM_RECORDS)]))
 
-  def run_kafka_read_with_kerberos(self, pipeline_creator):
-    options_dict = {
-        'runner': 'DataflowRunner',
-        'project': 'dataflow-testing-311516',
-        'region': 'us-central1',
-        'streaming': False
-    }
-    options = PipelineOptions.from_dictionary(options_dict)
-    expected_records = [f'test{i}' for i in range(1, 12)]
-    with beam.Pipeline(options=options) as p:
-      p.not_use_test_runner_api = True
-      result = pipeline_creator.build_read_pipeline_with_kerberos(p, max_num_records=11)
-      assert_that(
-          result,
-          equal_to(expected_records)
-      )
-
   def get_platform_localhost(self):
     if sys.platform == 'darwin':
       return 'host.docker.internal'
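For reference, the deleted `run_kafka_read_with_kerberos` drove the Kerberos read on Dataflow through `PipelineOptions.from_dictionary`. A sketch of that driver shape follows, reusing the hypothetical `build_kerberized_read` helper from the earlier sketch; the project, broker, and topic are placeholders, and it keeps the deleted test's assumption that the topic contains the strings 'test1' through 'test11':

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.testing.util import assert_that, equal_to

    options = PipelineOptions.from_dictionary({
        'runner': 'DataflowRunner',
        'project': 'my-gcp-project',  # placeholder project ID
        'region': 'us-central1',
        'streaming': False,
    })
    expected_records = [f'test{i}' for i in range(1, 12)]
    with beam.Pipeline(options=options) as p:
      records = build_kerberized_read(
          p, 'broker.example.com:9092', 'kerberos_topic', max_num_records=11)
      # ReadFromKafka emits (key, value) pairs, so a faithful comparison would
      # decode values first; the deleted test compared records directly.
      assert_that(records, equal_to(expected_records))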