Skip to content

Commit bbf385b

Browse files
committed
out_kafka: avoid duplicate OTLP partition retries
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent b5ae018 commit bbf385b

1 file changed

Lines changed: 35 additions & 9 deletions

File tree

plugins/out_kafka/kafka.c

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,12 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
520520
return FLB_OK;
521521
}
522522

523-
static int produce_raw_payload_with_key(const void *payload, size_t payload_size,
524-
char *key, size_t key_len,
525-
struct flb_out_kafka *ctx)
523+
static int produce_raw_payload_with_key_retry_control(const void *payload,
524+
size_t payload_size,
525+
char *key,
526+
size_t key_len,
527+
int allow_engine_retry,
528+
struct flb_out_kafka *ctx)
526529
{
527530
int ret;
528531
int queue_full_retries;
@@ -551,7 +554,8 @@ static int produce_raw_payload_with_key(const void *payload, size_t payload_size
551554
}
552555

553556
retry:
554-
if (ctx->queue_full_retries > 0 &&
557+
if (allow_engine_retry == FLB_TRUE &&
558+
ctx->queue_full_retries > 0 &&
555559
queue_full_retries >= ctx->queue_full_retries) {
556560
ctx->blocked = FLB_FALSE;
557561
return FLB_RETRY;
@@ -588,6 +592,18 @@ static int produce_raw_payload_with_key(const void *payload, size_t payload_size
588592
return FLB_OK;
589593
}
590594

595+
static int produce_raw_payload_with_key(const void *payload, size_t payload_size,
596+
char *key, size_t key_len,
597+
struct flb_out_kafka *ctx)
598+
{
599+
return produce_raw_payload_with_key_retry_control(payload,
600+
payload_size,
601+
key,
602+
key_len,
603+
FLB_TRUE,
604+
ctx);
605+
}
606+
591607
static int produce_raw_payload(const void *payload, size_t payload_size,
592608
struct flb_out_kafka *ctx)
593609
{
@@ -903,6 +919,7 @@ static int get_otlp_group_resource(msgpack_object *group_metadata,
903919
if (ret != 0) {
904920
return -1;
905921
}
922+
(void) scope_id;
906923

907924
return 0;
908925
}
@@ -931,11 +948,13 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
931948
struct otlp_logs_resource_partition *current_partition;
932949
struct otlp_logs_resource_partition *partitions;
933950
struct flb_opentelemetry_otlp_logs_options options;
951+
size_t produced_count;
934952
static const char *default_logs_body_keys[] = {"log", "message"};
935953

936954
partitions = NULL;
937955
partition_count = 0;
938956
current_partition = NULL;
957+
produced_count = 0;
939958

940959
ret = flb_log_event_decoder_init(&decoder,
941960
(char *) event_chunk->data,
@@ -1080,11 +1099,17 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
10801099
key_len = 0;
10811100
}
10821101

1083-
ret = produce_raw_payload_with_key(payload,
1084-
flb_sds_len(payload),
1085-
key,
1086-
key_len,
1087-
ctx);
1102+
/*
1103+
* Once a partition has been accepted by librdkafka, returning FLB_RETRY
1104+
* for a later queue-full condition would replay the original chunk and
1105+
* duplicate the partitions already enqueued.
1106+
*/
1107+
ret = produce_raw_payload_with_key_retry_control(payload,
1108+
flb_sds_len(payload),
1109+
key,
1110+
key_len,
1111+
produced_count == 0,
1112+
ctx);
10881113

10891114
if (format == FLB_KAFKA_FMT_OTLP_JSON) {
10901115
flb_sds_destroy(payload);
@@ -1096,6 +1121,7 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
10961121
if (ret != FLB_OK) {
10971122
goto cleanup;
10981123
}
1124+
produced_count++;
10991125
}
11001126

11011127
ret = FLB_OK;

0 commit comments

Comments
 (0)