@@ -54,6 +54,15 @@ class SSLWantReadError(Exception):
5454 class SSLWantWriteError (Exception ):
5555 pass
5656
57+ # needed for SASL_GSSAPI authentication:
58+ try :
59+ import gssapi
60+ from gssapi .raw .misc import GSSError
61+ except ImportError :
62+ #no gssapi available, will disable gssapi mechanism
63+ gssapi = None
64+ GSSError = None
65+
5766class ConnectionStates (object ):
5867 DISCONNECTING = '<disconnecting>'
5968 DISCONNECTED = '<disconnected>'
@@ -167,9 +176,13 @@ class BrokerConnection(object):
167176 'metric_group_prefix' : '' ,
168177 'sasl_mechanism' : 'PLAIN' ,
169178 'sasl_plain_username' : None ,
170- 'sasl_plain_password' : None
179+ 'sasl_plain_password' : None ,
180+ 'sasl_kerberos_service_name' :'kafka'
171181 }
172- SASL_MECHANISMS = ('PLAIN' ,)
182+ if gssapi is None :
183+ SASL_MECHANISMS = ('PLAIN' ,)
184+ else :
185+ SASL_MECHANISMS = ('PLAIN' , 'GSSAPI' )
173186
174187 def __init__ (self , host , port , afi , ** configs ):
175188 self .hostname = host
@@ -203,6 +216,9 @@ def __init__(self, host, port, afi, **configs):
203216 if self .config ['sasl_mechanism' ] == 'PLAIN' :
204217 assert self .config ['sasl_plain_username' ] is not None , 'sasl_plain_username required for PLAIN sasl'
205218 assert self .config ['sasl_plain_password' ] is not None , 'sasl_plain_password required for PLAIN sasl'
219+ if self .config ['sasl_mechanism' ] == 'GSSAPI' :
220+ assert gssapi is not None , 'GSSAPI lib not available'
221+ assert self .config ['sasl_kerberos_service_name' ] is not None , 'sasl_servicename_kafka required for GSSAPI sasl'
206222
207223 self .state = ConnectionStates .DISCONNECTED
208224 self ._reset_reconnect_backoff ()
@@ -433,6 +449,8 @@ def _handle_sasl_handshake_response(self, future, response):
433449
434450 if self .config ['sasl_mechanism' ] == 'PLAIN' :
435451 return self ._try_authenticate_plain (future )
452+ elif self .config ['sasl_mechanism' ] == 'GSSAPI' :
453+ return self ._try_authenticate_gssapi (future )
436454 else :
437455 return future .failure (
438456 Errors .UnsupportedSaslMechanismError (
@@ -477,6 +495,61 @@ def _try_authenticate_plain(self, future):
477495
478496 return future .success (True )
479497
498+ def _try_authenticate_gssapi (self , future ):
499+
500+ data = b''
501+ gssname = self .config ['sasl_kerberos_service_name' ] + '@' + self .hostname
502+ ctx_Name = gssapi .Name (gssname , name_type = gssapi .NameType .hostbased_service )
503+ ctx_CanonName = ctx_Name .canonicalize (gssapi .MechType .kerberos )
504+ log .debug ('%s: canonical Servicename: %s' , self , ctx_CanonName )
505+ ctx_Context = gssapi .SecurityContext (name = ctx_CanonName , usage = 'initiate' )
506+ #Exchange tokens until authentication either suceeded or failed:
507+ received_token = None
508+ try :
509+ while not ctx_Context .complete :
510+ #calculate the output token
511+ try :
512+ output_token = ctx_Context .step (received_token )
513+ except GSSError as e :
514+ log .exception ("%s: Error invalid token received from server" , self )
515+ error = Errors .ConnectionError ("%s: %s" % (self , e ))
516+
517+ if not output_token :
518+ if ctx_Context .complete :
519+ log .debug ("%s: Security Context complete " , self )
520+ log .debug ("%s: Successful GSSAPI handshake for %s" , self , ctx_Context .initiator_name )
521+ break
522+ try :
523+ self ._sock .setblocking (True )
524+ # Send output token
525+ msg = output_token
526+ size = Int32 .encode (len (msg ))
527+ self ._sock .sendall (size + msg )
528+
529+ # The server will send a token back. processing of this token either
530+ # establishes a security context, or needs further token exchange
531+ # the gssapi will be able to identify the needed next step
532+ # The connection is closed on failure
533+ response = self ._sock .recv (2000 )
534+ self ._sock .setblocking (False )
535+
536+ except (AssertionError , ConnectionError ) as e :
537+ log .exception ("%s: Error receiving reply from server" , self )
538+ error = Errors .ConnectionError ("%s: %s" % (self , e ))
539+ future .failure (error )
540+ self .close (error = error )
541+
542+ #pass the received token back to gssapi, strip the first 4 bytes
543+ received_token = response [4 :]
544+
545+ except Exception as e :
546+ log .exception ("%s: GSSAPI handshake error" , self )
547+ error = Errors .ConnectionError ("%s: %s" % (self , e ))
548+ future .failure (error )
549+ self .close (error = error )
550+
551+ return future .success (True )
552+
480553 def blacked_out (self ):
481554 """
482555 Return true if we are disconnected from the given node and can't
0 commit comments