Skip to content

Commit 88cd208

Browse files
authored
Split SLOT_EXPORT_AUTHENTICATING into SEND and READ to avoid synchronous reading of auth response (#2494)
The old SLOT_EXPORT_AUTHENTICATING added in #1949, when processed by the source node, we will send the AUTH command and then reads the response. If the target node is blocked during this process, the source node will also be blocked. We should use a read handler to handle this. We split SLOT_EXPORT_AUTHENTICATING into SLOT_EXPORT_SEND_AUTH and SLOT_EXPORT_READ_AUTH_RESPONSE to avoid this issue. Signed-off-by: Binbin <[email protected]>
1 parent cf90acb commit 88cd208

File tree

1 file changed

+59
-42
lines changed

1 file changed

+59
-42
lines changed

src/cluster_migrateslots.c

Lines changed: 59 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ typedef enum slotMigrationJobState {
2020

2121
/* Exporting states */
2222
SLOT_EXPORT_CONNECTING,
23-
SLOT_EXPORT_AUTHENTICATING,
23+
SLOT_EXPORT_SEND_AUTH,
24+
SLOT_EXPORT_READ_AUTH_RESPONSE,
2425
SLOT_EXPORT_SEND_ESTABLISH,
2526
SLOT_EXPORT_READ_ESTABLISH_RESPONSE,
2627
SLOT_EXPORT_WAITING_TO_SNAPSHOT,
@@ -734,9 +735,13 @@ void clusterHandleFlushDuringSlotMigration(void) {
734735
* │SLOT_EXPORT_CONNECTING├─────────┐
735736
* └───────────┬──────────┘ │
736737
* Connected│ │
737-
* ┌─────────────▼────────────┐ │
738-
* │SLOT_EXPORT_AUTHENTICATING┼───────┤
739-
* └─────────────┬────────────┘ │
738+
* ┌───────────▼─────────┐ │
739+
* │SLOT_EXPORT_SEND_AUTH┼──────────┤
740+
* └───────────┬─────────┘ │
741+
* AUTH command written│ │
742+
* ┌─────────────▼────────────────┐ │
743+
* │SLOT_EXPORT_READ_AUTH_RESPONSE┼───┤
744+
* └─────────────┬────────────────┘ │
740745
* Authenticated│ │
741746
* ┌─────────────▼────────────┐ │
742747
* │SLOT_EXPORT_SEND_ESTABLISH┼───────┤
@@ -1020,41 +1025,48 @@ int proceedWithSlotExportJobConnecting(slotMigrationJob *job, bool *completed) {
10201025
}
10211026
}
10221027

1023-
/* Perform the authentication steps needed to authenticate a slot migration
1024-
* job's connection. Return C_ERR if an error is encountered, or C_OK if the
1025-
* authentication is done. */
1026-
int performSlotExportJobAuthentication(slotMigrationJob *job) {
1027-
serverAssert(job->type == SLOT_MIGRATION_EXPORT);
1028-
if (!server.primary_auth) {
1029-
return C_OK;
1030-
}
1031-
sds err = replicationSendAuth(job->conn);
1032-
if (err) {
1033-
serverLog(LL_WARNING,
1034-
"Failed to send AUTH command for slot migration %s: %s",
1035-
job->description, err);
1036-
sdsfree(err);
1037-
return C_ERR;
1038-
}
1039-
err = receiveSynchronousResponse(job->conn);
1028+
/* Read a response to the AUTH command, moving to the next stage of the migration
1029+
* if the response is a success. If there is an error, fail the migration with the
1030+
* error message. */
1031+
void slotMigrationJobReadAuthResponse(connection *conn) {
1032+
slotMigrationJob *job = (slotMigrationJob *)connGetPrivateData(conn);
1033+
1034+
sds err = receiveSynchronousResponse(job->conn);
10401035
if (err == NULL) {
1041-
serverLog(LL_WARNING,
1042-
"Received no response to AUTH command for slot migration %s",
1043-
job->description);
1044-
return C_ERR;
1036+
finishSlotMigrationJob(job, SLOT_MIGRATION_JOB_FAILED, "Target node did not respond to AUTH command");
1037+
return;
10451038
}
10461039
if (err[0] == '-') {
1047-
serverLog(LL_WARNING,
1048-
"Failed to AUTH for slot migration %s: %s",
1049-
job->description, err);
1040+
sds status_msg = sdscatfmt(sdsempty(), "Failed to AUTH to target node: %s", err);
1041+
finishSlotMigrationJob(job, SLOT_MIGRATION_JOB_FAILED, status_msg);
10501042
sdsfree(err);
1051-
return C_ERR;
1043+
sdsfree(status_msg);
1044+
return;
10521045
}
1053-
serverLog(LL_NOTICE,
1054-
"Successfully authenticated slot migration %s",
1055-
job->description);
1046+
10561047
sdsfree(err);
1057-
return C_OK;
1048+
serverLog(LL_NOTICE, "Successfully authenticated slot migration %s", job->description);
1049+
updateSlotMigrationJobState(job, SLOT_EXPORT_SEND_ESTABLISH);
1050+
proceedWithSlotMigration(job);
1051+
}
1052+
1053+
/* Perform the authentication steps needed to authenticate a slot migration
1054+
* job's connection. */
1055+
void slotMigrationJobSendAuth(slotMigrationJob *job) {
1056+
serverAssert(job->type == SLOT_MIGRATION_EXPORT);
1057+
serverAssert(server.primary_auth);
1058+
1059+
sds err = replicationSendAuth(job->conn);
1060+
if (err) {
1061+
sds status_msg = sdscatfmt(sdsempty(), "Failed to send AUTH command to target node: %s", err);
1062+
finishSlotMigrationJob(job, SLOT_MIGRATION_JOB_FAILED, status_msg);
1063+
sdsfree(err);
1064+
sdsfree(status_msg);
1065+
return;
1066+
}
1067+
1068+
connSetReadHandler(job->conn, slotMigrationJobReadAuthResponse);
1069+
updateSlotMigrationJobState(job, SLOT_EXPORT_READ_AUTH_RESPONSE);
10581070
}
10591071

10601072
/* Initialize the client for the slot migration job, which should already be
@@ -1590,17 +1602,19 @@ void proceedWithSlotMigration(slotMigrationJob *job) {
15901602
if (!completed) return;
15911603
serverLog(LL_NOTICE, "Slot migration %s connection established.",
15921604
job->description);
1593-
updateSlotMigrationJobState(job, SLOT_EXPORT_AUTHENTICATING);
1605+
if (server.primary_auth) {
1606+
updateSlotMigrationJobState(job, SLOT_EXPORT_SEND_AUTH);
1607+
} else {
1608+
updateSlotMigrationJobState(job, SLOT_EXPORT_SEND_ESTABLISH);
1609+
}
15941610
continue;
15951611
}
1596-
case SLOT_EXPORT_AUTHENTICATING:
1597-
if (performSlotExportJobAuthentication(job) == C_ERR) {
1598-
finishSlotMigrationJob(job, SLOT_MIGRATION_JOB_FAILED,
1599-
"Failed to AUTH to target node");
1600-
return;
1601-
}
1602-
updateSlotMigrationJobState(job, SLOT_EXPORT_SEND_ESTABLISH);
1612+
case SLOT_EXPORT_SEND_AUTH:
1613+
slotMigrationJobSendAuth(job);
16031614
continue;
1615+
case SLOT_EXPORT_READ_AUTH_RESPONSE:
1616+
/* We are still reading back the response, nothing to do in cron */
1617+
return;
16041618
case SLOT_EXPORT_SEND_ESTABLISH:
16051619
initSlotExportJobClient(job);
16061620
addReplySds(job->client, generateSyncSlotsEstablishCommand(job));
@@ -1768,7 +1782,8 @@ const char *slotMigrationJobStateToString(slotMigrationJobState state) {
17681782
case SLOT_IMPORT_FINISHED_WAITING_TO_CLEANUP: return "cleaning-up";
17691783

17701784
case SLOT_EXPORT_CONNECTING: return "connecting";
1771-
case SLOT_EXPORT_AUTHENTICATING: return "authenticating";
1785+
case SLOT_EXPORT_SEND_AUTH: return "sending-auth-command";
1786+
case SLOT_EXPORT_READ_AUTH_RESPONSE: return "reading-auth-response";
17721787
case SLOT_EXPORT_SEND_ESTABLISH: return "sending-establish-command";
17731788
case SLOT_EXPORT_READ_ESTABLISH_RESPONSE:
17741789
return "reading-establish-response";
@@ -2007,6 +2022,8 @@ bool canSlotMigrationJobSendAck(slotMigrationJob *job) {
20072022
return job->state != SLOT_EXPORT_SNAPSHOTTING &&
20082023
job->state != SLOT_IMPORT_WAIT_ACK &&
20092024
job->state != SLOT_EXPORT_CONNECTING &&
2025+
job->state != SLOT_EXPORT_SEND_AUTH &&
2026+
job->state != SLOT_EXPORT_READ_AUTH_RESPONSE &&
20102027
job->state != SLOT_EXPORT_SEND_ESTABLISH &&
20112028
job->state != SLOT_EXPORT_READ_ESTABLISH_RESPONSE;
20122029
}

0 commit comments

Comments
 (0)