Skip to content

Commit 6fbe59a

Browse files
committed
in_storage_backlog: Preserve DLQ on restarting
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
1 parent c22185e commit 6fbe59a

File tree

1 file changed

+25
-0
lines changed
  • plugins/in_storage_backlog

1 file changed

+25
-0
lines changed

plugins/in_storage_backlog/sb.c

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,25 @@ static int sb_append_chunk_to_segregated_backlogs(struct cio_chunk *target_chun
331331
return 0;
332332
}
333333

334+
static inline int sb_is_rejected_stream(struct flb_config *config,
335+
struct cio_stream *stream)
336+
{
337+
const char *rp;
338+
339+
if (!config || !stream || !stream->name) {
340+
return FLB_FALSE;
341+
}
342+
343+
if (config->storage_keep_rejected != FLB_TRUE) {
344+
return FLB_FALSE;
345+
}
346+
347+
rp = config->storage_rejected_path ?
348+
config->storage_rejected_path : "rejected";
349+
350+
return strcmp(stream->name, rp) == 0;
351+
}
352+
334353
int sb_segregate_chunks(struct flb_config *config)
335354
{
336355
int ret;
@@ -357,6 +376,12 @@ int sb_segregate_chunks(struct flb_config *config)
357376
mk_list_foreach(stream_iterator, &context->cio->streams) {
358377
stream = mk_list_entry(stream_iterator, struct cio_stream, _head);
359378

379+
/* DLQ stream is not part of backlog. Just skip. */
380+
if (sb_is_rejected_stream(config, stream)) {
381+
flb_debug("[storage backlog] skipping DLQ stream '%s'", stream->name);
382+
continue;
383+
}
384+
360385
mk_list_foreach_safe(chunk_iterator, tmp, &stream->chunks) {
361386
chunk = mk_list_entry(chunk_iterator, struct cio_chunk, _head);
362387

0 commit comments

Comments
 (0)