Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@


#include "server.h"
#include "connection.h"
#include "bio.h"
#include <stdatomic.h>

static char *bio_worker_title[] = {
"bio_close_file",
"bio_aof",
"bio_lazy_free",
"bio_rdb_save",
};

#define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title))
Expand All @@ -77,13 +79,15 @@ static unsigned int bio_job_to_worker[] = {
[BIO_AOF_FSYNC] = 1,
[BIO_CLOSE_AOF] = 1,
[BIO_LAZY_FREE] = 2,
[BIO_RDB_SAVE] = 3,
};

static pthread_t bio_threads[BIO_WORKER_NUM];
static pthread_mutex_t bio_mutex[BIO_WORKER_NUM];
static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
static list *bio_jobs[BIO_WORKER_NUM];
static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};
static __thread unsigned long bio_thread_id = 0;

/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
Expand All @@ -108,6 +112,12 @@ typedef union bio_job {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;

struct {
int type;
connection *conn; /* Connection to download the RDB from */
int is_dual_channel; /* Single vs dual channel */
} save_to_disk_args;
} bio_job;

void *bioProcessBackgroundJobs(void *arg);
Expand Down Expand Up @@ -203,6 +213,13 @@ void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) {
bioSubmitJob(BIO_AOF_FSYNC, job);
}

void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel) {
bio_job *job = zmalloc(sizeof(*job));
job->save_to_disk_args.conn = conn;
job->save_to_disk_args.is_dual_channel = is_dual_channel;
bioSubmitJob(BIO_RDB_SAVE, job);
}

void *bioProcessBackgroundJobs(void *arg) {
bio_job *job;
unsigned long worker = (unsigned long)arg;
Expand All @@ -225,6 +242,8 @@ void *bioProcessBackgroundJobs(void *arg) {
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

bio_thread_id = worker;

while (1) {
listNode *ln;

Expand Down Expand Up @@ -278,6 +297,8 @@ void *bioProcessBackgroundJobs(void *arg) {
if (job_type == BIO_CLOSE_AOF) close(job->fd_args.fd);
} else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else if (job_type == BIO_RDB_SAVE) {
replicaReceiveRDBFromPrimaryToDisk(job->save_to_disk_args.conn, job->save_to_disk_args.is_dual_channel);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
Expand Down Expand Up @@ -333,3 +354,7 @@ void bioKillThreads(void) {
}
}
}

int inBioThread(void) {
return bio_thread_id != 0;
}
3 changes: 3 additions & 0 deletions src/bio.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateSaveRDBToDiskJob(connection *conn, int is_dual_channel);
int inBioThread(void);

/* Background job opcodes */
enum {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_RDB_SAVE, /* Deferred save RDB to disk on replica */
BIO_NUM_OPS
};

Expand Down
Loading
Loading