Skip to content

Commit eeba913

Browse files
committed
in_tail: Serialize DB operation for preventing resource collisions
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent 3ac8377 commit eeba913

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed

plugins/in_tail/tail_db.c

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,24 @@ struct query_status {
3131
int64_t offset;
3232
};
3333

34+
static inline int tail_db_lock(struct flb_tail_config *ctx)
35+
{
36+
if (ctx->db == NULL) {
37+
return 0;
38+
}
39+
40+
return flb_sqldb_lock(ctx->db);
41+
}
42+
43+
static inline int tail_db_unlock(struct flb_tail_config *ctx)
44+
{
45+
if (ctx->db == NULL) {
46+
return 0;
47+
}
48+
49+
return flb_sqldb_unlock(ctx->db);
50+
}
51+
3452
/* Open or create database required by tail plugin */
3553
struct flb_sqldb *flb_tail_db_open(const char *path,
3654
struct flb_input_instance *in,
@@ -261,11 +279,21 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
261279
off_t offset = 0;
262280
uint64_t inode = 0;
263281

282+
flb_plg_debug(ctx->ins, "db file set called for %s inode=%"PRIu64,
283+
file->name, file->inode);
284+
285+
ret = tail_db_lock(ctx);
286+
if (ret != 0) {
287+
flb_plg_error(ctx->ins, "db: could not acquire lock");
288+
return -1;
289+
}
290+
264291
/* Check if the file exists */
265292
ret = db_file_exists(file, ctx, &id, &inode, &offset);
266293
if (ret == -1) {
267294
flb_plg_error(ctx->ins, "cannot execute query to check inode: %" PRIu64,
268295
file->inode);
296+
tail_db_unlock(ctx);
269297
return -1;
270298
}
271299

@@ -283,6 +311,7 @@ int flb_tail_db_file_set(struct flb_tail_file *file,
283311
file->offset = offset;
284312
}
285313

314+
tail_db_unlock(ctx);
286315
return 0;
287316
}
288317

@@ -292,6 +321,12 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
292321
{
293322
int ret;
294323

324+
ret = tail_db_lock(ctx);
325+
if (ret != 0) {
326+
flb_plg_error(ctx->ins, "db: could not acquire lock");
327+
return -1;
328+
}
329+
295330
/* Bind parameters */
296331
sqlite3_bind_int64(ctx->stmt_offset, 1, file->offset);
297332
sqlite3_bind_int64(ctx->stmt_offset, 2, file->db_id);
@@ -301,6 +336,7 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
301336
if (ret != SQLITE_DONE) {
302337
sqlite3_clear_bindings(ctx->stmt_offset);
303338
sqlite3_reset(ctx->stmt_offset);
339+
tail_db_unlock(ctx);
304340
return -1;
305341
}
306342

@@ -317,6 +353,7 @@ int flb_tail_db_file_offset(struct flb_tail_file *file,
317353
sqlite3_clear_bindings(ctx->stmt_offset);
318354
sqlite3_reset(ctx->stmt_offset);
319355

356+
tail_db_unlock(ctx);
320357
return 0;
321358
}
322359

@@ -327,6 +364,12 @@ int flb_tail_db_file_rotate(const char *new_name,
327364
{
328365
int ret;
329366

367+
ret = tail_db_lock(ctx);
368+
if (ret != 0) {
369+
flb_plg_error(ctx->ins, "db: could not acquire lock");
370+
return -1;
371+
}
372+
330373
/* Bind parameters */
331374
sqlite3_bind_text(ctx->stmt_rotate_file, 1, new_name, -1, 0);
332375
sqlite3_bind_int64(ctx->stmt_rotate_file, 2, file->db_id);
@@ -337,9 +380,11 @@ int flb_tail_db_file_rotate(const char *new_name,
337380
sqlite3_reset(ctx->stmt_rotate_file);
338381

339382
if (ret != SQLITE_DONE) {
383+
tail_db_unlock(ctx);
340384
return -1;
341385
}
342386

387+
tail_db_unlock(ctx);
343388
return 0;
344389
}
345390

@@ -349,6 +394,12 @@ int flb_tail_db_file_delete(struct flb_tail_file *file,
349394
{
350395
int ret;
351396

397+
ret = tail_db_lock(ctx);
398+
if (ret != 0) {
399+
flb_plg_error(ctx->ins, "db: could not acquire lock");
400+
return -1;
401+
}
402+
352403
/* Bind parameters */
353404
sqlite3_bind_int64(ctx->stmt_delete_file, 1, file->db_id);
354405
ret = sqlite3_step(ctx->stmt_delete_file);
@@ -359,10 +410,12 @@ int flb_tail_db_file_delete(struct flb_tail_file *file,
359410
if (ret != SQLITE_DONE) {
360411
flb_plg_error(ctx->ins, "db: error deleting entry from database: %s",
361412
file->name);
413+
tail_db_unlock(ctx);
362414
return -1;
363415
}
364416

365417
flb_plg_debug(ctx->ins, "db: file deleted from database: %s", file->name);
418+
tail_db_unlock(ctx);
366419
return 0;
367420
}
368421

@@ -388,6 +441,12 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
388441
return 0;
389442
}
390443

444+
ret = tail_db_lock(ctx);
445+
if (ret != 0) {
446+
flb_plg_error(ctx->ins, "db: could not acquire lock");
447+
return -1;
448+
}
449+
391450
/* Create a stmt sql buffer */
392451
sql_size = SQL_DELETE_STALE_FILE_START_LEN;
393452
sql_size += SQL_DELETE_STALE_FILE_WHERE_LEN;
@@ -402,6 +461,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
402461
if (!stale_delete_sql) {
403462
flb_plg_error(ctx->ins, "cannot allocate buffer for stale_delete_sql:"
404463
" size: %zu", sql_size);
464+
tail_db_unlock(ctx);
405465
return -1;
406466
}
407467

@@ -412,6 +472,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
412472
flb_plg_error(ctx->ins,
413473
"error concatenating stale_delete_sql: start");
414474
flb_sds_destroy(stale_delete_sql);
475+
tail_db_unlock(ctx);
415476
return -1;
416477
}
417478
stale_delete_sql = sds_tmp;
@@ -423,6 +484,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
423484
flb_plg_error(ctx->ins,
424485
"error concatenating stale_delete_sql: where");
425486
flb_sds_destroy(stale_delete_sql);
487+
tail_db_unlock(ctx);
426488
return -1;
427489
}
428490
stale_delete_sql = sds_tmp;
@@ -432,6 +494,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
432494
flb_plg_error(ctx->ins,
433495
"error concatenating stale_delete_sql: param");
434496
flb_sds_destroy(stale_delete_sql);
497+
tail_db_unlock(ctx);
435498
return -1;
436499
}
437500
}
@@ -441,6 +504,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
441504
flb_plg_error(ctx->ins,
442505
"error concatenating stale_delete_sql: end");
443506
flb_sds_destroy(stale_delete_sql);
507+
tail_db_unlock(ctx);
444508
return -1;
445509
}
446510
stale_delete_sql = sds_tmp;
@@ -453,6 +517,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
453517
" stmt_delete_inodes sql:%s, ret=%d", stale_delete_sql,
454518
ret);
455519
flb_sds_destroy(stale_delete_sql);
520+
tail_db_unlock(ctx);
456521
return -1;
457522
}
458523

@@ -466,6 +531,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
466531
" inode=%" PRIu64 ", ret=%d", file->inode, ret);
467532
sqlite3_finalize(stmt_delete_inodes);
468533
flb_sds_destroy(stale_delete_sql);
534+
tail_db_unlock(ctx);
469535
return -1;
470536
}
471537
idx++;
@@ -478,6 +544,7 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
478544
flb_sds_destroy(stale_delete_sql);
479545
flb_plg_error(ctx->ins, "cannot execute delete stale inodes: ret=%d",
480546
ret);
547+
tail_db_unlock(ctx);
481548
return -1;
482549
}
483550

@@ -488,5 +555,6 @@ int flb_tail_db_stale_file_delete(struct flb_input_instance *ins,
488555
sqlite3_finalize(stmt_delete_inodes);
489556
flb_sds_destroy(stale_delete_sql);
490557

558+
tail_db_unlock(ctx);
491559
return 0;
492560
}

0 commit comments

Comments
 (0)