@@ -1336,6 +1336,10 @@ clusterLink *createClusterLink(clusterNode *node) {
13361336 * with this link will have the 'link' field set to NULL. */
13371337void freeClusterLink (clusterLink * link ) {
13381338 serverAssert (link != NULL );
1339+ serverLog (LL_DEBUG , "Freeing cluster link for node: %.40s:%s" ,
1340+ link -> node ? link -> node -> name : "<unknown>" ,
1341+ link -> inbound ? "inbound" : "outbound" );
1342+
13391343 if (link -> conn ) {
13401344 connClose (link -> conn );
13411345 link -> conn = NULL ;
@@ -1351,6 +1355,7 @@ void freeClusterLink(clusterLink *link) {
13511355 } else if (link -> node -> inbound_link == link ) {
13521356 serverAssert (link -> inbound );
13531357 link -> node -> inbound_link = NULL ;
1358+ link -> node -> inbound_link_freed_time = mstime ();
13541359 }
13551360 }
13561361 zfree (link );
@@ -1490,6 +1495,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
14901495 node -> fail_time = 0 ;
14911496 node -> link = NULL ;
14921497 node -> inbound_link = NULL ;
1498+ node -> inbound_link_freed_time = node -> ctime ;
14931499 memset (node -> ip , 0 , sizeof (node -> ip ));
14941500 node -> announce_client_ipv4 = sdsempty ();
14951501 node -> announce_client_ipv6 = sdsempty ();
@@ -1696,6 +1702,9 @@ void clusterAddNode(clusterNode *node) {
16961702 * it is a replica node.
16971703 */
16981704void clusterDelNode (clusterNode * delnode ) {
1705+ serverAssert (delnode != NULL );
1706+ serverLog (LL_DEBUG , "Deleting node %.40s from cluster view" , delnode -> name );
1707+
16991708 int j ;
17001709 dictIterator * di ;
17011710 dictEntry * de ;
@@ -2078,7 +2087,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
20782087/* Return 1 if we already have a node in HANDSHAKE state matching the
20792088 * specified ip address and port number. This function is used in order to
20802089 * avoid adding a new handshake node for the same address multiple times. */
2081- int clusterHandshakeInProgress (char * ip , int port , int cport ) {
2090+ static int clusterHandshakeInProgress (char * ip , int port , int cport ) {
20822091 dictIterator * di ;
20832092 dictEntry * de ;
20842093
@@ -2100,7 +2109,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) {
21002109 *
21012110 * EAGAIN - There is already a handshake in progress for this address.
21022111 * EINVAL - IP or port are not valid. */
2103- int clusterStartHandshake (char * ip , int port , int cport ) {
2112+ static int clusterStartHandshake (char * ip , int port , int cport ) {
21042113 clusterNode * n ;
21052114 char norm_ip [NET_IP_STR_LEN ];
21062115 struct sockaddr_storage sa ;
@@ -3207,33 +3216,48 @@ int clusterProcessPacket(clusterLink *link) {
32073216 }
32083217 }
32093218
3210- /* Add this node if it is new for us and the msg type is MEET.
3211- * In this stage we don't try to add the node with the right
3212- * flags, replicaof pointer, and so forth, as this details will be
3213- * resolved when we'll receive PONGs from the node. The exception
3214- * to this is the flag that indicates extensions are supported, as
3215- * we want to send extensions right away in the return PONG in order
3216- * to reduce the amount of time needed to stabilize the shard ID. */
3217- if (!sender && type == CLUSTERMSG_TYPE_MEET ) {
3218- clusterNode * node ;
3219-
3220- node = createClusterNode (NULL , CLUSTER_NODE_HANDSHAKE );
3221- serverAssert (nodeIp2String (node -> ip , link , hdr -> myip ) == C_OK );
3222- getClientPortFromClusterMsg (hdr , & node -> tls_port , & node -> tcp_port );
3223- node -> cport = ntohs (hdr -> cport );
3224- if (hdr -> mflags [0 ] & CLUSTERMSG_FLAG0_EXT_DATA ) {
3225- node -> flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED ;
3219+ if (type == CLUSTERMSG_TYPE_MEET ) {
3220+ if (!sender ) {
3221+ /* Add this node if it is new for us and the msg type is MEET.
3222+ * In this stage we don't try to add the node with the right
3223+ * flags, replicaof pointer, and so forth, as this details will be
3224+ * resolved when we'll receive PONGs from the node. The exception
3225+ * to this is the flag that indicates extensions are supported, as
3226+ * we want to send extensions right away in the return PONG in order
3227+ * to reduce the amount of time needed to stabilize the shard ID. */
3228+ clusterNode * node ;
3229+
3230+ node = createClusterNode (NULL , CLUSTER_NODE_HANDSHAKE );
3231+ serverAssert (nodeIp2String (node -> ip , link , hdr -> myip ) == C_OK );
3232+ getClientPortFromClusterMsg (hdr , & node -> tls_port , & node -> tcp_port );
3233+ node -> cport = ntohs (hdr -> cport );
3234+ if (hdr -> mflags [0 ] & CLUSTERMSG_FLAG0_EXT_DATA ) {
3235+ node -> flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED ;
3236+ }
3237+ setClusterNodeToInboundClusterLink (node , link );
3238+ clusterAddNode (node );
3239+ clusterDoBeforeSleep (CLUSTER_TODO_SAVE_CONFIG );
3240+
3241+ /* If this is a MEET packet from an unknown node, we still process
3242+ * the gossip section here since we have to trust the sender because
3243+ * of the message type. */
3244+ clusterProcessGossipSection (hdr , link );
3245+ } else if (sender -> link && now - sender -> ctime > server .cluster_node_timeout ) {
3246+ /* The MEET packet is from a known node, after the handshake timeout, so the sender thinks that I do not
3247+ * know it.
3248+ * Freeing my outbound link to that node, to force a reconnect and sending a PING.
3249+ * Once that node receives our PING, it should recognize the new connection as an inbound link from me.
3250+ * We should only free the outbound link if the node is known for more time than the handshake timeout,
3251+ * since during this time, the other side might still be trying to complete the handshake. */
3252+
3253+ /* We should always receive a MEET packet on an inbound link. */
3254+ serverAssert (link != sender -> link );
3255+ serverLog (LL_NOTICE , "Freeing outbound link to node %.40s after receiving a MEET packet from this known node" ,
3256+ sender -> name );
3257+ freeClusterLink (sender -> link );
32263258 }
3227- setClusterNodeToInboundClusterLink (node , link );
3228- clusterAddNode (node );
3229- clusterDoBeforeSleep (CLUSTER_TODO_SAVE_CONFIG );
32303259 }
32313260
3232- /* If this is a MEET packet from an unknown node, we still process
3233- * the gossip section here since we have to trust the sender because
3234- * of the message type. */
3235- if (!sender && type == CLUSTERMSG_TYPE_MEET ) clusterProcessGossipSection (hdr , link );
3236-
32373261 /* Anyway reply with a PONG */
32383262 clusterSendPing (link , CLUSTERMSG_TYPE_PONG );
32393263 }
@@ -3243,7 +3267,7 @@ int clusterProcessPacket(clusterLink *link) {
32433267 serverLog (LL_DEBUG , "%s packet received: %.40s" , clusterGetMessageTypeString (type ),
32443268 link -> node ? link -> node -> name : "NULL" );
32453269
3246- if (sender && (sender -> flags & CLUSTER_NODE_MEET )) {
3270+ if (sender && nodeInMeetState (sender )) {
32473271 /* Once we get a response for MEET from the sender, we can stop sending more MEET. */
32483272 sender -> flags &= ~CLUSTER_NODE_MEET ;
32493273 serverLog (LL_NOTICE , "Successfully completed handshake with %.40s (%s)" , sender -> name ,
@@ -3668,7 +3692,7 @@ void clusterLinkConnectHandler(connection *conn) {
36683692 * of a PING one, to force the receiver to add us in its node
36693693 * table. */
36703694 mstime_t old_ping_sent = node -> ping_sent ;
3671- clusterSendPing (link , node -> flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING );
3695+ clusterSendPing (link , nodeInMeetState ( node ) ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING );
36723696 if (old_ping_sent ) {
36733697 /* If there was an active ping before the link was
36743698 * disconnected, we want to restore the ping time, otherwise
@@ -3747,7 +3771,9 @@ void clusterReadHandler(connection *conn) {
37473771
37483772 if (nread <= 0 ) {
37493773 /* I/O error... */
3750- serverLog (LL_DEBUG , "I/O error reading from node link: %s" ,
3774+ serverLog (LL_DEBUG , "I/O error reading from node link (%.40s:%s): %s" ,
3775+ link -> node ? link -> node -> name : "<unknown>" ,
3776+ link -> inbound ? "inbound" : "outbound" ,
37513777 (nread == 0 ) ? "connection closed" : connGetLastError (conn ));
37523778 handleLinkIOError (link );
37533779 return ;
@@ -3928,6 +3954,12 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
39283954/* Send a PING or PONG packet to the specified node, making sure to add enough
39293955 * gossip information. */
39303956void clusterSendPing (clusterLink * link , int type ) {
3957+ serverLog (LL_DEBUG , "Sending %s packet to node %.40s (%s) on %s link" ,
3958+ clusterGetMessageTypeString (type ),
3959+ link -> node ? link -> node -> name : "<unknown>" ,
3960+ link -> node ? link -> node -> human_nodename : "<unknown>" ,
3961+ link -> inbound ? "inbound" : "outbound" );
3962+
39313963 static unsigned long long cluster_pings_sent = 0 ;
39323964 cluster_pings_sent ++ ;
39333965 int gossipcount = 0 ; /* Number of gossip sections added so far. */
@@ -4943,6 +4975,15 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_
49434975 clusterDelNode (node );
49444976 return 1 ;
49454977 }
4978+ if (node -> link != NULL && node -> inbound_link == NULL && nodeInNormalState (node ) &&
4979+ now - node -> inbound_link_freed_time > handshake_timeout ) {
4980+ /* Node has an outbound link, but no inbound link for more than the handshake timeout.
4981+ * This probably means this node does not know us yet, whereas we know it.
4982+ * So we send it a MEET packet to do a handshake with it and correct the inconsistent cluster view. */
4983+ node -> flags |= CLUSTER_NODE_MEET ;
4984+ serverLog (LL_NOTICE , "Sending MEET packet to node %.40s because there is no inbound link for it" , node -> name );
4985+ clusterSendPing (node -> link , CLUSTERMSG_TYPE_MEET );
4986+ }
49464987
49474988 if (node -> link == NULL ) {
49484989 clusterLink * link = createClusterLink (node );
0 commit comments