Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1094,9 +1094,9 @@ clusterNode *getNodeByQuery(client *c, int *error_code) {
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (clusterNodeIsPrimary(myself) || c->flag.readonly) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlot(c->slot) != NULL) {
if (n == clusterNodeGetPrimary(myself) && getMigratingSlotDest(c->slot) != NULL) {
migrating_slot = 1;
} else if (getImportingSlot(c->slot) != NULL) {
} else if (getImportingSlotSource(c->slot) != NULL) {
importing_slot = 1;
}
}
Expand Down Expand Up @@ -1260,7 +1260,7 @@ clusterNode *getNodeByQuery(client *c, int *error_code) {
return NULL;
} else {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return getMigratingSlot(c->slot);
return getMigratingSlotDest(c->slot);
}
}

Expand Down Expand Up @@ -1374,7 +1374,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
/* We send an error and unblock the client if:
* 1) The slot is unassigned, emitting a cluster down error.
* 2) The slot is neither handled by this node, nor being imported. */
if (node != myself && getImportingSlot(slot) == NULL) {
if (node != myself && getImportingSlotSource(slot) == NULL) {
if (node == NULL) {
clusterRedirectClient(c, NULL, 0, CLUSTER_REDIR_DOWN_UNBOUND);
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ int clusterNodeIsNoFailover(clusterNode *node);
char *clusterNodeGetShardId(clusterNode *node);
int clusterNodeNumReplicas(clusterNode *node);
clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx);
clusterNode *getMigratingSlot(int slot);
clusterNode *getImportingSlot(int slot);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
int clusterNodeClientPort(clusterNode *n, int use_tls);
char *clusterNodeHostname(clusterNode *node);
Expand Down
125 changes: 59 additions & 66 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,23 +197,24 @@ dictType clusterSdsToListType = {
NULL /* allow to expand */
};

static uint64_t dictIntHash(const void *key) {
return dictGenHashFunction(key, sizeof(int));
static uint64_t dictPtrHash(const void *key) {
/* We hash the pointer value itself. */
return dictGenHashFunction(&key, sizeof(key));
}

static int dictIntKeyCompare(const void *key1, const void *key2) {
return *(int *)key1 == *(int *)key2;
static int dictPtrCompare(const void *key1, const void *key2) {
return key1 == key2;
}

/* Dictionary type for mapping hash slots to cluster nodes.
* Keys are allocated integers representing the slot number, values are clusterNode pointers. */
* Keys are slot numbers encoded directly as pointer values, values are clusterNode pointers. */
dictType clusterSlotDictType = {
dictIntHash, /* hash function */
NULL, /* key dup */
dictIntKeyCompare, /* key compare */
zfree, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
dictPtrHash, /* hash function */
NULL, /* key dup */
dictPtrCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};

typedef struct {
Expand Down Expand Up @@ -282,43 +283,39 @@ static void clusterNodeIterReset(ClusterNodeIterator *iter) {
}

/* Helpers to access the migrating/importing slot dictionaries. */
clusterNode *getMigratingSlot(int slot) {
dictEntry *de = dictFind(server.cluster->migrating_slots_to, &slot);
clusterNode *getMigratingSlotDest(int slot) {
dictEntry *de = dictFind(server.cluster->migrating_slots_to, (void *)(long)slot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For maximum portability, i guess we should ideally use intptr_t instead of long in these casts. There is no guarantee in the C standard that long is pointer-sized but intptr_t always is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL! I have made the change. thank you

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just fyi, I referenced it from here, I am not sure if this nit would fit there too!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to update that before someone can leave a comment :)

return de ? dictGetVal(de) : NULL;
}

static void setMigratingSlot(int slot, clusterNode *node) {
dictEntry *de = dictFind(server.cluster->migrating_slots_to, &slot);
static void setMigratingSlotDest(int slot, clusterNode *node) {
dictEntry *de = dictFind(server.cluster->migrating_slots_to, (void *)(long)slot);
if (node == NULL) {
if (de) dictDelete(server.cluster->migrating_slots_to, &slot);
if (de) dictDelete(server.cluster->migrating_slots_to, (void *)(long)slot);
return;
}
if (de) {
dictSetVal(server.cluster->migrating_slots_to, de, node);
} else {
int *k = zmalloc(sizeof(int));
*k = slot;
dictAdd(server.cluster->migrating_slots_to, k, node);
dictAdd(server.cluster->migrating_slots_to, (void *)(long)slot, node);
}
}

clusterNode *getImportingSlot(int slot) {
dictEntry *de = dictFind(server.cluster->importing_slots_from, &slot);
clusterNode *getImportingSlotSource(int slot) {
dictEntry *de = dictFind(server.cluster->importing_slots_from, (void *)(long)slot);
return de ? dictGetVal(de) : NULL;
}

static void setImportingSlot(int slot, clusterNode *node) {
dictEntry *de = dictFind(server.cluster->importing_slots_from, &slot);
static void setImportingSlotSource(int slot, clusterNode *node) {
dictEntry *de = dictFind(server.cluster->importing_slots_from, (void *)(long)slot);
if (node == NULL) {
if (de) dictDelete(server.cluster->importing_slots_from, &slot);
if (de) dictDelete(server.cluster->importing_slots_from, (void *)(long)slot);
return;
}
if (de) {
dictSetVal(server.cluster->importing_slots_from, de, node);
} else {
int *k = zmalloc(sizeof(int));
*k = slot;
dictAdd(server.cluster->importing_slots_from, k, node);
dictAdd(server.cluster->importing_slots_from, (void *)(long)slot, node);
}
}

Expand Down Expand Up @@ -850,9 +847,9 @@ int clusterLoadConfig(char *filename) {
clusterAddNode(cn);
}
if (direction == '>') {
setMigratingSlot(slot, cn);
setMigratingSlotDest(slot, cn);
} else {
setImportingSlot(slot, cn);
setImportingSlotSource(slot, cn);
}
continue;
} else if ((p = strchr(argv[j], '-')) != NULL) {
Expand Down Expand Up @@ -2002,8 +1999,8 @@ void clusterDelNode(clusterNode *delnode) {

/* 1) Mark slots as unassigned. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (getImportingSlot(j) == delnode) setImportingSlot(j, NULL);
if (getMigratingSlot(j) == delnode) setMigratingSlot(j, NULL);
if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL);
if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL);
if (server.cluster->slots[j] == delnode) clusterDelSlot(j);
}

Expand Down Expand Up @@ -2826,19 +2823,17 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* state for the slot. Otherwise, we are looking at a failover within
* the same shard and we should retain the migrating_slots_to state
* for the slot in question */
clusterNode *mn = getMigratingSlot(j);
clusterNode *mn = getMigratingSlotDest(j);
if (mn != NULL) {
if (!are_in_same_shard) {
serverLog(LL_NOTICE, "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.",
j, mn->name,
mn->human_nodename,
mn->shard_id);
setMigratingSlot(j, NULL);
j, mn->name, mn->human_nodename, mn->shard_id);
setMigratingSlotDest(j, NULL);
}
}

/* Handle the case where we are importing this slot and the ownership changes */
clusterNode *in = getImportingSlot(j);
clusterNode *in = getImportingSlotSource(j);
if (in != NULL &&
in != sender) {
/* Update importing_slots_from to point to the sender, if it is in the
Expand All @@ -2848,16 +2843,14 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
"Failover occurred in migration source. Update importing "
"source for slot %d to node %.40s (%s) in shard %.40s.",
j, sender->name, sender->human_nodename, sender->shard_id);
setImportingSlot(j, sender);
setImportingSlotSource(j, sender);
} else {
/* If the sender is from a different shard, it must be a result
* of deliberate operator actions. We should clear the importing
* state to conform to the operator's will. */
serverLog(LL_NOTICE, "Slot %d is no longer being imported from node %.40s (%s) in shard %.40s.",
j, in->name,
in->human_nodename,
in->shard_id);
setImportingSlot(j, NULL);
j, in->name, in->human_nodename, in->shard_id);
setImportingSlotSource(j, NULL);
}
}

Expand All @@ -2882,7 +2875,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* any slot to its shard and if there is a primaryship change in
* the shard. Update the migrating_slots_to state to point to the
* sender if it has just taken over the primary role. */
clusterNode *mn = getMigratingSlot(j);
clusterNode *mn = getMigratingSlotDest(j);
if (mn != NULL && mn != sender &&
(mn->configEpoch < senderConfigEpoch ||
nodeIsReplica(mn)) &&
Expand All @@ -2891,7 +2884,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
"Failover occurred in migration target."
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.",
j, sender->name, sender->human_nodename, sender->shard_id);
setMigratingSlot(j, sender);
setMigratingSlotDest(j, sender);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

Expand All @@ -2911,12 +2904,12 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* 1. Remove the importing state for the specific slot.
* 2. Finalize the slot's ownership, if I am not already the owner of
* the slot. */
if (nodeIsPrimary(myself) && getImportingSlot(j) == sender) {
if (nodeIsPrimary(myself) && getImportingSlotSource(j) == sender) {
serverLog(LL_NOTICE,
"Slot %d is no longer being imported from node %.40s (%s) in shard %.40s;"
" Clear my importing source for the slot.",
j, sender->name, sender->human_nodename, sender->shard_id);
setImportingSlot(j, NULL);
setImportingSlotSource(j, NULL);
/* Take over the slot ownership if I am not the owner yet*/
if (server.cluster->slots[j] != myself) {
/* A primary reason why we are here is likely due to my primary crashing during the
Expand Down Expand Up @@ -5981,21 +5974,21 @@ void clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node, int *slo
processed++;
}

if (getImportingSlot(j) == from_node) {
if (getImportingSlotSource(j) == from_node) {
serverLog(LL_VERBOSE,
"Failover occurred in migration source. Update importing "
"source for slot %d to node %.40s (%s) in shard %.40s.",
j, to_node->name, to_node->human_nodename, to_node->shard_id);
setImportingSlot(j, to_node);
setImportingSlotSource(j, to_node);
importing_processed++;
}

if (getMigratingSlot(j) == from_node) {
if (getMigratingSlotDest(j) == from_node) {
serverLog(LL_VERBOSE,
"Failover occurred in migration target."
" Slot %d is now being migrated to node %.40s (%s) in shard %.40s.",
j, to_node->name, to_node->human_nodename, to_node->shard_id);
setMigratingSlot(j, to_node);
setMigratingSlotDest(j, to_node);
migrating_processed++;
}
}
Expand Down Expand Up @@ -6182,15 +6175,15 @@ int verifyClusterConfigWithData(void) {
/* Check if we are assigned to this slot or if we are importing it.
* In both cases check the next slot as the configuration makes
* sense. */
if (server.cluster->slots[j] == myself || getImportingSlot(j) != NULL) continue;
if (server.cluster->slots[j] == myself || getImportingSlotSource(j) != NULL) continue;

/* If we are here data and cluster config don't agree, and we have
* slot 'j' populated even if we are not importing it, nor we are
* assigned to this slot. Fix this condition. */

update_config++;
/* slot is unassigned. Take responsibility for it. */
clusterNode *in = getImportingSlot(j);
clusterNode *in = getImportingSlotSource(j);
if (server.cluster->slots[j] == NULL) {
serverLog(LL_NOTICE,
"I have keys for unassigned slot %d. "
Expand Down Expand Up @@ -6393,8 +6386,8 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
* instances. */
if (node->flags & CLUSTER_NODE_MYSELF) {
for (j = 0; j < CLUSTER_SLOTS; j++) {
clusterNode *mn = getMigratingSlot(j);
clusterNode *in = getImportingSlot(j);
clusterNode *mn = getMigratingSlotDest(j);
clusterNode *in = getImportingSlotSource(j);
if (mn) {
ci = sdscatfmt(ci, " [%i->-", j);
ci = sdscatlen(ci, mn->name, CLUSTER_NAMELEN);
Expand Down Expand Up @@ -6613,7 +6606,7 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {

/* If this slot was set as importing we can clear this
* state as now we are the real owner of the slot. */
if (getImportingSlot(j)) setImportingSlot(j, NULL);
if (getImportingSlotSource(j)) setImportingSlotSource(j, NULL);

retval = del ? clusterDelSlot(j) : clusterAddSlot(myself, j);
serverAssertWithInfo(c, NULL, retval == C_OK);
Expand Down Expand Up @@ -7217,15 +7210,15 @@ void clusterCommandSetSlot(client *c) {
* Now execute the command on the primary. */
if (!strcasecmp(c->argv[3]->ptr, "migrating")) {
serverLog(LL_NOTICE, "Migrating slot %d to node %.40s (%s)", slot, n->name, n->human_nodename);
setMigratingSlot(slot, n);
setMigratingSlotDest(slot, n);
} else if (!strcasecmp(c->argv[3]->ptr, "importing")) {
serverLog(LL_NOTICE, "Importing slot %d from node %.40s (%s)", slot, n->name, n->human_nodename);
setImportingSlot(slot, n);
setImportingSlotSource(slot, n);
} else if (!strcasecmp(c->argv[3]->ptr, "stable")) {
/* CLUSTER SETSLOT <SLOT> STABLE */
serverLog(LL_NOTICE, "Marking slot %d stable", slot);
setImportingSlot(slot, NULL);
setMigratingSlot(slot, NULL);
setImportingSlotSource(slot, NULL);
setMigratingSlotDest(slot, NULL);
} else if (!strcasecmp(c->argv[3]->ptr, "node")) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
serverLog(LL_NOTICE, "Assigning slot %d to node %.40s (%s) in shard %.40s", slot, n->name, n->human_nodename,
Expand All @@ -7234,8 +7227,8 @@ void clusterCommandSetSlot(client *c) {
/* If this slot is in migrating status but we have no keys
* for it assigning the slot to another node will clear
* the migrating status. */
if (countKeysInSlot(slot) == 0 && getMigratingSlot(slot)) {
setMigratingSlot(slot, NULL);
if (countKeysInSlot(slot) == 0 && getMigratingSlotDest(slot)) {
setMigratingSlotDest(slot, NULL);
}

clusterNode *my_primary = clusterNodeGetPrimary(myself);
Expand Down Expand Up @@ -7276,8 +7269,8 @@ void clusterCommandSetSlot(client *c) {

/* If this node or this node's primary was importing this slot,
* assigning the slot to itself also clears the importing status. */
if ((n == myself || n == myself->replicaof) && getImportingSlot(slot)) {
setImportingSlot(slot, NULL);
if ((n == myself || n == myself->replicaof) && getImportingSlotSource(slot)) {
setImportingSlotSource(slot, NULL);

/* Only primary broadcasts the updates */
if (n == myself) {
Expand Down Expand Up @@ -7804,7 +7797,7 @@ sds clusterEncodeOpenSlotsAuxField(int rdbflags) {
dictIterator *di = dictGetIterator(d);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
int slot = *(int *)dictGetKey(de);
int slot = (int)(uintptr_t)dictGetKey(de);
clusterNode *node = dictGetVal(de);
if (s == NULL) s = sdsempty();
s = sdscatfmt(s, "%i%s", slot, (i == 0) ? "<" : ">");
Expand Down Expand Up @@ -7859,9 +7852,9 @@ int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s) {

/* Set the slot state */
if (is_importing) {
setImportingSlot(slot, node);
setImportingSlotSource(slot, node);
} else {
setMigratingSlot(slot, node);
setMigratingSlotDest(slot, node);
}
}
return C_OK;
Expand Down
Loading