Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2806,7 +2806,7 @@ standardConfig configs[] = {
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 0, NULL, NULL),
createBoolConfig("use-fork", NULL, IMMUTABLE_CONFIG, cserver.fForkBgSave, 1, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL),
createBoolConfig("time-thread-priority", NULL, IMMUTABLE_CONFIG, cserver.time_thread_priority, 0, NULL, NULL),
createBoolConfig("prefetch-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->prefetch_enabled, 1, NULL, NULL),
Expand Down
169 changes: 121 additions & 48 deletions src/rdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;

for (j = 0; j < cserver.dbnum; j++) {
const redisDbPersistentDataSnapshot *db = rgpdb[j];
const redisDbPersistentDataSnapshot *db = rgpdb != nullptr ? rgpdb[j] : g_pserver->db[j];
if (db->size() == 0) continue;

/* Write the SELECT DB opcode */
Expand Down Expand Up @@ -1715,7 +1715,7 @@ void getTempFileName(char tmpfile[], int tmpfileNum) {
char tmpfileNumString[214];

/* Generate temp rdb file name using aync-signal safe functions. */
int pid_len = ll2string(pid, sizeof(pid), getpid());
int pid_len = ll2string(pid, sizeof(pid), g_pserver->in_fork_child ? getppid() : getpid());
int tmpfileNum_len = ll2string(tmpfileNumString, sizeof(tmpfileNumString), tmpfileNum);
strcpy(tmpfile, "temp-");
strncpy(tmpfile+5, pid, pid_len);
Expand Down Expand Up @@ -3832,57 +3832,130 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
}

/* Create the child process. */
openChildInfoPipe();

for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /*fOptional*/);

g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (cserver.fForkBgSave) {
pid_t childpid;
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;

rioInitWithFd(&rdb,args->rdb_pipe_write);

redisSetProcTitle("keydb-rdb-to-slaves");
redisSetCpuAffinity(g_pserver->bgsave_cpulist);

retval = rdbSaveRioWithEOFMark(&rdb,nullptr,nullptr,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)
retval = C_ERR;

if (retval == C_OK) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}

rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(args->rdb_pipe_write);
close(g_pserver->rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
dummy = read(args->safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
close(args->safe_to_exit_pipe);
if (childpid == -1) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
(long)childpid);
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
updateDictResizePolicy();
close(args->rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, nullptr) == AE_ERR) {
serverPanic("Unrecoverable error creating g_pserver->rdb_pipe_read file event.");
}
});
}
return (childpid == -1) ? C_ERR : C_OK;
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
close(args->safe_to_exit_pipe);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
closeChildInfoPipe();
return C_ERR;
}
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;
else {
openChildInfoPipe();

serverLog(LL_NOTICE,"Background RDB transfer started");
g_pserver->rdb_save_time_start = time(NULL);
serverAssert(!g_pserver->rdbThreadVars.fRdbThreadActive);
g_pserver->rdbThreadVars.rdb_child_thread = child;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
for (int idb = 0; idb < cserver.dbnum; ++idb)
args->rgpdb[idb] = g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false /*fOptional*/);

g_pserver->rdbThreadVars.tmpfileNum++;
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
if (pthread_create(&child, &tattr, rdbSaveToSlavesSocketsThread, args)) {
pthread_attr_destroy(&tattr);
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(g_pserver->slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
replica->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(args->rdb_pipe_write);
close(g_pserver->rdb_pipe_read);
zfree(g_pserver->rdb_pipe_conns);
close(args->safe_to_exit_pipe);
g_pserver->rdb_pipe_conns = NULL;
g_pserver->rdb_pipe_numconns = 0;
g_pserver->rdb_pipe_numconns_writing = 0;
args->rsi.~rdbSaveInfo();
zfree(args);
closeChildInfoPipe();
return C_ERR;
}
});
pthread_attr_destroy(&tattr);
g_pserver->child_type = CHILD_TYPE_RDB;

serverLog(LL_NOTICE,"Background RDB transfer started");
g_pserver->rdb_save_time_start = time(NULL);
serverAssert(!g_pserver->rdbThreadVars.fRdbThreadActive);
g_pserver->rdbThreadVars.rdb_child_thread = child;
g_pserver->rdbThreadVars.fRdbThreadActive = true;
g_pserver->rdb_child_type = RDB_CHILD_TYPE_SOCKET;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeCreateFileEvent(serverTL->el, g_pserver->rdb_pipe_read, AE_READABLE, rdbPipeReadHandler, nullptr) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
});
}

return C_OK; /* Unreached. */
}
Expand Down
2 changes: 1 addition & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5172,7 +5172,7 @@ int prepareForShutdown(int flags) {
* to unlink file actully) in background thread.
* The temp rdb file fd may won't be closed when redis exits quickly,
* but OS will close this fd when process exits. */
rdbRemoveTempFile(g_pserver->child_pid, 0);
rdbRemoveTempFile(g_pserver->rdbThreadVars.tmpfileNum, 0);
}

/* Kill module child if there is one. */
Expand Down