66from kafka .client_async import KafkaClient , selectors
77import kafka .errors as Errors
88from kafka .errors import (
9- IncompatibleBrokerVersion , KafkaConfigurationError , KafkaConnectionError ,
10- NodeNotReadyError , NotControllerError )
9+ IncompatibleBrokerVersion , KafkaConfigurationError , NotControllerError ,
10+ UnrecognizedBrokerVersion )
1111from kafka .metrics import MetricConfig , Metrics
1212from kafka .protocol .admin import (
1313 CreateTopicsRequest , DeleteTopicsRequest , DescribeConfigsRequest , AlterConfigsRequest , CreatePartitionsRequest ,
@@ -232,17 +232,22 @@ def _validate_timeout(self, timeout_ms):
232232 return timeout_ms or self .config ['request_timeout_ms' ]
233233
234234 def _refresh_controller_id (self ):
235- """Determine the kafka cluster controller
236- """
237- response = self ._send_request_to_node (
238- self ._client .least_loaded_node (),
239- MetadataRequest [1 ]([])
240- )
241- self ._controller_id = response .controller_id
242- version = self ._client .check_version (self ._controller_id )
243- if version < (0 , 10 , 0 ):
244- raise IncompatibleBrokerVersion (
245- "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
235+ """Determine the kafka cluster controller."""
236+ version = self ._matching_api_version (MetadataRequest )
237+ if 1 <= version <= 6 :
238+ request = MetadataRequest [version ]()
239+ response = self ._send_request_to_node (self ._client .least_loaded_node (), request )
240+ controller_id = response .controller_id
241+ # verify the controller is new enough to support our requests
242+ controller_version = self ._client .check_version (controller_id )
243+ if controller_version < (0 , 10 , 0 ):
244+ raise IncompatibleBrokerVersion (
245+ "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
246+ .format (controller_version ))
247+ self ._controller_id = controller_id
248+ else :
249+ raise UnrecognizedBrokerVersion (
250+ "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
246251 .format (version ))
247252
248253 def _find_group_coordinator_id (self , group_id ):
@@ -301,22 +306,34 @@ def _send_request_to_node(self, node, request):
301306 else :
302307 raise future .exception # pylint: disable-msg=raising-bad-type
303308
304- def _send (self , request ):
305- """Send a kafka protocol message to the cluster controller. Will block until the message result is received.
309+ def _send_request_to_controller (self , request ):
310+ """Send a kafka protocol message to the cluster controller.
311+
312+ Will block until the message result is received.
306313
307314 :param request: The message to send
308- :return The kafka protocol response for the message
309- :exception NodeNotReadyError: If the controller connection can't be established
315+ :return: The kafka protocol response for the message
310316 """
311- remaining_tries = 2
312- while remaining_tries > 0 :
313- remaining_tries = remaining_tries - 1
314- try :
315- return self ._send_request_to_node (self ._controller_id , request )
316- except (NotControllerError , KafkaConnectionError ) as e :
317- # controller changed? refresh it
318- self ._refresh_controller_id ()
319- raise NodeNotReadyError (self ._controller_id )
317+ tries = 2 # in case our cached self._controller_id is outdated
318+ while tries :
319+ tries -= 1
320+ response = self ._send_request_to_node (self ._controller_id , request )
321+ # DeleteTopicsResponse returns topic_error_codes rather than topic_errors
322+ for topic , error_code in getattr (response , "topic_errors" , response .topic_error_codes ):
323+ error_type = Errors .for_code (error_code )
324+ if tries and isinstance (error_type , NotControllerError ):
325+ # No need to inspect the rest of the errors for
326+ # non-retriable errors because NotControllerError should
327+ # either be thrown for all errors or no errors.
328+ self ._refresh_controller_id ()
329+ break
330+ elif error_type is not Errors .NoError :
331+ raise error_type (
332+ "Request '{}' failed with response '{}'."
333+ .format (request , response ))
334+ else :
335+ return response
336+ raise RuntimeError ("This should never happen, please file a bug with full stacktrace if encountered" )
320337
321338 @staticmethod
322339 def _convert_new_topic_request (new_topic ):
@@ -362,7 +379,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
362379 raise NotImplementedError (
363380 "Support for CreateTopics v{} has not yet been added to KafkaAdmin."
364381 .format (version ))
365- return self ._send (request )
382+ return self ._send_request_to_controller (request )
366383
367384 def delete_topics (self , topics , timeout_ms = None ):
368385 """Delete topics from the cluster
@@ -382,19 +399,25 @@ def delete_topics(self, topics, timeout_ms=None):
382399 raise NotImplementedError (
383400 "Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
384401 .format (version ))
385- return self ._send (request )
402+ return self ._send_request_to_controller (request )
386403
387404 # list topics functionality is in ClusterMetadata
405+ # Note: if implemented here, send the request to the least_loaded_node()
388406
389407 # describe topics functionality is in ClusterMetadata
408+ # Note: if implemented here, send the request to the controller
390409
391410 # describe cluster functionality is in ClusterMetadata
411+ # Note: if implemented here, send the request to the least_loaded_node()
392412
393- # describe_acls protocol not implemented
413+ # describe_acls protocol not yet implemented
414+ # Note: send the request to the least_loaded_node()
394415
395- # create_acls protocol not implemented
416+ # create_acls protocol not yet implemented
417+ # Note: send the request to the least_loaded_node()
396418
397- # delete_acls protocol not implemented
419+ # delete_acls protocol not yet implemented
420+ # Note: send the request to the least_loaded_node()
398421
399422 @staticmethod
400423 def _convert_describe_config_resource_request (config_resource ):
@@ -434,7 +457,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
434457 raise NotImplementedError (
435458 "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
436459 .format (version ))
437- return self ._send ( request )
460+ return self ._send_request_to_node ( self . _client . least_loaded_node (), request )
438461
439462 @staticmethod
440463 def _convert_alter_config_resource_request (config_resource ):
@@ -449,6 +472,12 @@ def _convert_alter_config_resource_request(config_resource):
449472 def alter_configs (self , config_resources ):
450473 """Alter configuration parameters of one or more kafka resources.
451474
475+ Warning:
476+ This is currently broken for BROKER resources because those must be
477+ sent to that specific broker, versus this always picks the
478+ least-loaded node. See the comment in the source code for details.
479+ We would happily accept a PR fixing this.
480+
452481 :param config_resources: An array of ConfigResource objects.
453482 :return: Appropriate version of AlterConfigsResponse class
454483 """
@@ -461,11 +490,19 @@ def alter_configs(self, config_resources):
461490 raise NotImplementedError (
462491 "Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
463492 .format (version ))
464- return self ._send (request )
493+ # TODO the Java client has the note:
494+ # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
495+ # // and send the request to that specific broker. Other resources are grouped together into
496+ # // a single request that may be sent to any broker.
497+ #
498+ # So this is currently broken as it always sends to the least_loaded_node()
499+ return self ._send_request_to_node (self ._client .least_loaded_node (), request )
465500
466- # alter replica logs dir protocol not implemented
501+ # alter replica logs dir protocol not yet implemented
502+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
467503
468- # describe log dirs protocol not implemented
504+ # describe log dirs protocol not yet implemented
505+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
469506
470507 @staticmethod
471508 def _convert_create_partitions_request (topic_name , new_partitions ):
@@ -498,17 +535,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
498535 raise NotImplementedError (
499536 "Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
500537 .format (version ))
501- return self ._send (request )
538+ return self ._send_request_to_controller (request )
502539
503- # delete records protocol not implemented
540+ # delete records protocol not yet implemented
541+ # Note: send the request to the partition leaders
504542
505- # create delegation token protocol not implemented
543+ # create delegation token protocol not yet implemented
544+ # Note: send the request to the least_loaded_node()
506545
507- # renew delegation token protocol not implemented
546+ # renew delegation token protocol not yet implemented
547+ # Note: send the request to the least_loaded_node()
508548
509- # expire delegation_token protocol not implemented
549+ # expire delegation_token protocol not yet implemented
550+ # Note: send the request to the least_loaded_node()
510551
511- # describe delegation_token protocol not implemented
552+ # describe delegation_token protocol not yet implemented
553+ # Note: send the request to the least_loaded_node()
512554
513555 def describe_consumer_groups (self , group_ids ):
514556 """Describe a set of consumer groups.
@@ -525,7 +567,8 @@ def describe_consumer_groups(self, group_ids):
525567 raise NotImplementedError (
526568 "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
527569 .format (version ))
528- return self ._send (request )
570+ # TODO this is completely broken, as it needs to send to the group coordinator
571+ # return self._send(request)
529572
530573 def list_consumer_groups (self ):
531574 """List all consumer groups known to the cluster.
@@ -539,6 +582,8 @@ def list_consumer_groups(self):
539582 raise NotImplementedError (
540583 "Support for ListGroups v{} has not yet been added to KafkaAdmin."
541584 .format (version ))
542- return self ._send (request )
585+ # TODO this is completely broken, as it needs to send to the group coordinator
586+ # return self._send(request)
543587
544- # delete groups protocol not implemented
588+ # delete groups protocol not yet implemented
589+ # Note: send the request to the group's coordinator.
0 commit comments