104104import org .apache .ratis .server .RaftServerRpc ;
105105import org .apache .ratis .server .protocol .TermIndex ;
106106import org .apache .ratis .server .storage .RaftStorage ;
107+ import org .apache .ratis .util .Preconditions ;
107108import org .apache .ratis .util .SizeInBytes ;
108109import org .apache .ratis .util .TimeDuration ;
109110import org .apache .ratis .util .TraditionalBinaryPrefix ;
@@ -161,19 +162,18 @@ private static long nextCallId() {
161162 private int clientPort ;
162163 private int dataStreamPort ;
163164 private final RaftServer server ;
165+ private final String name ;
164166 private final List <ThreadPoolExecutor > chunkExecutors ;
165167 private final ContainerDispatcher dispatcher ;
166168 private final ContainerController containerController ;
167169 private final ClientId clientId = ClientId .randomId ();
168170 private final StateContext context ;
169- private final long nodeFailureTimeoutMs ;
170171 private boolean isStarted = false ;
171172 private final DatanodeDetails datanodeDetails ;
172173 private final ConfigurationSource conf ;
173174 // TODO: Remove the gids set when Ratis supports an api to query active
174175 // pipelines
175176 private final ConcurrentMap <RaftGroupId , ActivePipelineContext > activePipelines = new ConcurrentHashMap <>();
176- private final RaftPeerId raftPeerId ;
177177 // Timeout used while calling submitRequest directly.
178178 private final long requestTimeout ;
179179 private final boolean shouldDeleteRatisLogDirectory ;
@@ -197,14 +197,14 @@ private XceiverServerRatis(HddsDatanodeService hddsDatanodeService, DatanodeDeta
197197 this .context = context ;
198198 this .dispatcher = dispatcher ;
199199 this .containerController = containerController ;
200- this .raftPeerId = RatisHelper .toRaftPeerId (dd );
201200 String threadNamePrefix = datanodeDetails .threadNamePrefix ();
202201 chunkExecutors = createChunkExecutors (conf , threadNamePrefix );
203- nodeFailureTimeoutMs = ratisServerConfig .getFollowerSlownessTimeout ();
204202 shouldDeleteRatisLogDirectory =
205203 ratisServerConfig .shouldDeleteRatisLogDirectory ();
206204
207205 RaftProperties serverProperties = newRaftProperties ();
206+ final RaftPeerId raftPeerId = RatisHelper .toRaftPeerId (dd );
207+ this .name = getClass ().getSimpleName () + "(" + raftPeerId + ")" ;
208208 this .server =
209209 RaftServer .newBuilder ().setServerId (raftPeerId )
210210 .setProperties (serverProperties )
@@ -474,7 +474,7 @@ private void setStateMachineDataConfigurations(RaftProperties properties) {
474474
475475 // NOTE : the default value for the retry count in ratis is -1,
476476 // which means retry indefinitely.
477- int syncTimeoutRetryDefault = (int ) nodeFailureTimeoutMs /
477+ final int syncTimeoutRetryDefault = (int ) ratisServerConfig . getFollowerSlownessTimeout () /
478478 dataSyncTimeout .toIntExact (TimeUnit .MILLISECONDS );
479479 int numSyncRetries = conf .getInt (
480480 OzoneConfigKeys .HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES ,
@@ -558,7 +558,7 @@ private static Parameters createTlsParameters(SecurityConfig conf,
558558 @ Override
559559 public void start () throws IOException {
560560 if (!isStarted ) {
561- LOG .info ("Starting {} {} " , getClass (). getSimpleName (), server . getId () );
561+ LOG .info ("Starting {}" , name );
562562 for (ThreadPoolExecutor executor : chunkExecutors ) {
563563 executor .prestartAllCoreThreads ();
564564 }
@@ -581,19 +581,19 @@ public void start() throws IOException {
581581 }
582582 }
583583
584- private int getRealPort (InetSocketAddress address , Port .Name name ) {
584+ private int getRealPort (InetSocketAddress address , Port .Name portName ) {
585585 int realPort = address .getPort ();
586- datanodeDetails . setPort ( DatanodeDetails .newPort (name , realPort ) );
587- LOG . info ( "{} {} is started using port {} for {}" ,
588- getClass (). getSimpleName (), server . getId (), realPort , name );
586+ final Port port = DatanodeDetails .newPort (portName , realPort );
587+ datanodeDetails . setPort ( port );
588+ LOG . info ( "{} is started using port {}" , name , port );
589589 return realPort ;
590590 }
591591
592592 @ Override
593593 public void stop () {
594594 if (isStarted ) {
595595 try {
596- LOG .info ("Stopping {} {} " , getClass (). getSimpleName (), server . getId () );
596+ LOG .info ("Closing {}" , name );
597597 // shutdown server before the executors as while shutting down,
598598 // some of the tasks would be executed using the executors.
599599 server .close ();
@@ -602,7 +602,7 @@ public void stop() {
602602 }
603603 isStarted = false ;
604604 } catch (IOException e ) {
605- LOG .error ("XceiverServerRatis Could not be stopped gracefully." , e );
605+ LOG .error ("Failed to close {}." , name , e );
606606 }
607607 }
608608 }
@@ -706,45 +706,40 @@ private GroupInfoRequest createGroupInfoRequest(
706706 nextCallId ());
707707 }
708708
709- private void handlePipelineFailure (RaftGroupId groupId ,
710- RoleInfoProto roleInfoProto ) {
711- String msg ;
712- UUID datanode = RatisHelper .toDatanodeId (roleInfoProto .getSelf ());
713- RaftPeerId id = RaftPeerId .valueOf (roleInfoProto .getSelf ().getId ());
709+ private void handlePipelineFailure (RaftGroupId groupId , RoleInfoProto roleInfoProto , String reason ) {
710+ final RaftPeerId raftPeerId = RaftPeerId .valueOf (roleInfoProto .getSelf ().getId ());
711+ Preconditions .assertEquals (getServer ().getId (), raftPeerId , "raftPeerId" );
712+ final StringBuilder b = new StringBuilder ()
713+ .append (name ).append (" with datanodeId " ).append (RatisHelper .toDatanodeId (raftPeerId ))
714+ .append ("handlePipelineFailure " ).append (" for " ).append (reason )
715+ .append (": " ).append (roleInfoProto .getRole ())
716+ .append (" elapsed time=" ).append (roleInfoProto .getRoleElapsedTimeMs ()).append ("ms" );
717+
714718 switch (roleInfoProto .getRole ()) {
715719 case CANDIDATE :
716- msg = datanode + " is in candidate state for " +
717- roleInfoProto . getCandidateInfo (). getLastLeaderElapsedTimeMs () + "ms" ;
720+ final long lastLeaderElapsedTime = roleInfoProto . getCandidateInfo (). getLastLeaderElapsedTimeMs ();
721+ b . append ( ", lastLeaderElapsedTime=" ). append ( lastLeaderElapsedTime ). append ( "ms" ) ;
718722 break ;
719723 case FOLLOWER :
720- msg = datanode + " closes pipeline when installSnapshot from leader " +
721- "because leader snapshot doesn't contain any data to replay, " +
722- "all the log entries prior to the snapshot might have been purged." +
723- "So follower should not try to install snapshot from leader but" +
724- "can close the pipeline here. It's in follower state for " +
725- roleInfoProto .getRoleElapsedTimeMs () + "ms" ;
724+ b .append (", outstandingOp=" ).append (roleInfoProto .getFollowerInfo ().getOutstandingOp ());
726725 break ;
727726 case LEADER :
728- StringBuilder sb = new StringBuilder ();
729- sb . append ( datanode ). append ( " has not seen follower/s" );
730- for ( RaftProtos . ServerRpcProto follower : roleInfoProto . getLeaderInfo ()
731- . getFollowerInfoList ()) {
732- if (follower .getLastRpcElapsedTimeMs () > nodeFailureTimeoutMs ) {
733- sb .append (" " ).append (RatisHelper . toDatanodeId ( follower . getId ()) )
734- .append (" for " ).append (follower . getLastRpcElapsedTimeMs ( ))
735- .append ("ms" );
736- }
727+ final long followerSlownessTimeoutMs = ratisServerConfig . getFollowerSlownessTimeout ();
728+ for ( RaftProtos . ServerRpcProto follower : roleInfoProto . getLeaderInfo (). getFollowerInfoList ()) {
729+ final long lastRpcElapsedTimeMs = follower . getLastRpcElapsedTimeMs ();
730+ final boolean slow = lastRpcElapsedTimeMs > followerSlownessTimeoutMs ;
731+ final RaftPeerId followerId = RaftPeerId . valueOf (follower .getId (). getId ());
732+ b .append ("\n Follower " ).append (followerId )
733+ .append (" with datanodeId " ).append (RatisHelper . toDatanodeId ( followerId ))
734+ . append ( " is " ) .append (slow ? "slow" : " responding" )
735+ . append ( " with lastRpcElapsedTime=" ). append ( lastRpcElapsedTimeMs ). append ( "ms" );
737736 }
738- msg = sb .toString ();
739737 break ;
740738 default :
741- LOG .error ("unknown state: {}" , roleInfoProto .getRole ());
742- throw new IllegalStateException ("node" + id + " is in illegal role "
743- + roleInfoProto .getRole ());
739+ throw new IllegalStateException ("Unexpected role " + roleInfoProto .getRole ());
744740 }
745741
746- triggerPipelineClose (groupId , msg ,
747- ClosePipelineInfo .Reason .PIPELINE_FAILED );
742+ triggerPipelineClose (groupId , b .toString (), ClosePipelineInfo .Reason .PIPELINE_FAILED );
748743 }
749744
750745 private void triggerPipelineClose (RaftGroupId groupId , String detail ,
@@ -869,12 +864,12 @@ public void removeGroup(HddsProtos.PipelineID pipelineId)
869864 processReply (reply );
870865 }
871866
872- void handleNodeSlowness (RaftGroupId groupId , RoleInfoProto roleInfoProto ) {
873- handlePipelineFailure (groupId , roleInfoProto );
867+ void handleFollowerSlowness (RaftGroupId groupId , RoleInfoProto roleInfoProto , RaftPeer follower ) {
868+ handlePipelineFailure (groupId , roleInfoProto , "slow follower " + follower . getId () );
874869 }
875870
876871 void handleNoLeader (RaftGroupId groupId , RoleInfoProto roleInfoProto ) {
877- handlePipelineFailure (groupId , roleInfoProto );
872+ handlePipelineFailure (groupId , roleInfoProto , "no leader" );
878873 }
879874
880875 void handleApplyTransactionFailure (RaftGroupId groupId ,
@@ -901,10 +896,9 @@ void handleApplyTransactionFailure(RaftGroupId groupId,
901896 void handleInstallSnapshotFromLeader (RaftGroupId groupId ,
902897 RoleInfoProto roleInfoProto ,
903898 TermIndex firstTermIndexInLog ) {
904- LOG .warn ("Install snapshot notification received from Leader with " +
905- "termIndex: {}, terminating pipeline: {}" ,
899+ LOG .warn ("handleInstallSnapshotFromLeader for firstTermIndexInLog={}, terminating pipeline: {}" ,
906900 firstTermIndexInLog , groupId );
907- handlePipelineFailure (groupId , roleInfoProto );
901+ handlePipelineFailure (groupId , roleInfoProto , "install snapshot notification" );
908902 }
909903
910904 /**
@@ -950,7 +944,7 @@ void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
950944 LOG .info ("Leader change notification received for group: {} with new " +
951945 "leaderId: {}" , groupMemberId .getGroupId (), raftPeerId1 );
952946 // Save the reported leader to be sent with the report to SCM
953- boolean leaderForGroup = this . raftPeerId .equals (raftPeerId1 );
947+ final boolean leaderForGroup = server . getId () .equals (raftPeerId1 );
954948 activePipelines .compute (groupMemberId .getGroupId (),
955949 (key , value ) -> value == null ? new ActivePipelineContext (leaderForGroup , false ) :
956950 new ActivePipelineContext (leaderForGroup , value .isPendingClose ()));
0 commit comments