Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 109 additions & 40 deletions src/flb_log_event_encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,27 @@

#include <fluent-bit/flb_log_event_encoder.h>
#include <fluent-bit/flb_log_event_encoder_primitives.h>
#include <fluent-bit/flb_byteswap.h>
#include <stdarg.h>
#include <string.h>

/*
* Encoder hot-path optimization notes (emit_record / commit_record / metadata):
*
* 1. emit_record: For FLUENT_BIT_V2 we use a direct-emit path that writes
* the record string [ [timestamp, metadata], body ] directly into the
* main buffer (no root buffer build/copy), matching the original format.
*
* 2. Possible future improvements:
* - Batch metadata: add append_metadata_kv_batch() or allow multiple
* key-value pairs in one call to cut va_list and call overhead.
* - Inline / fast path: move begin_record/commit_record hot branches
* into a header so the compiler can inline for the common case.
* - Buffer growth: reserve space for the next record (e.g. typical
* metadata + body size) to reduce reallocs in msgpack_sbuffer.
* - dynamic_field_flush: when there is only one scope per field, avoid
* the scope loop (single commit then set data/size).
*/

void static inline flb_log_event_encoder_update_internal_state(
struct flb_log_event_encoder *context)
Expand Down Expand Up @@ -144,71 +164,120 @@ int flb_log_event_encoder_emit_raw_record(struct flb_log_event_encoder *context,
return result;
}

/*
* Optimized emit_record: for FLUENT_BIT_V2 appends the record as raw
* msgpack array bytes [ [timestamp, metadata], body ] to the main buffer
* (no root buffer build/copy), matching the slow path format.
*/
int flb_log_event_encoder_emit_record(struct flb_log_event_encoder *context)
{
int result;
int result;
char ts_buf[8];
char rec_header[5];
uint32_t sec;
uint32_t nsec;

if (context == NULL) {
return FLB_EVENT_ENCODER_ERROR_INVALID_CONTEXT;
}

result = FLB_EVENT_ENCODER_SUCCESS;

/* This function needs to be improved and optimized to avoid excessive
* memory copying operations.
*/

/* This conditional accounts for external raw record emission as
* performed by some filters using either
* flb_log_event_encoder_set_root_from_raw_msgpack
* or
* flb_log_event_encoder_set_root_from_msgpack_object
*/
if (context->root.size == 0) {
result = flb_log_event_encoder_root_begin_array(context);
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2 &&
context->metadata.data != NULL && context->body.data != NULL) {
/*
* Direct-emit: append raw array bytes [ [timestamp, metadata], body ]
* (fixarray 2, fixarray 2, fixext 8 type 0, 8b ts, metadata, body).
* Do not wrap in a msgpack string; match slow path which uses
* msgpack_pack_str_body(root.data, root.size) with no str header.
*/
rec_header[0] = (char) 0x92;
rec_header[1] = (char) 0x92;
rec_header[2] = (char) 0xd7;
rec_header[3] = (char) 0x00;

result = msgpack_pack_str_body(&context->packer,
rec_header, (size_t) 4);
if (result == 0) {
sec = FLB_UINT32_TO_NETWORK_BYTE_ORDER((uint32_t) context->timestamp.tm.tv_sec);
nsec = FLB_UINT32_TO_NETWORK_BYTE_ORDER((uint32_t) context->timestamp.tm.tv_nsec);
memcpy(&ts_buf[0], &sec, 4);
memcpy(&ts_buf[4], &nsec, 4);
result = msgpack_pack_str_body(&context->packer, ts_buf, 8);
}
if (result == 0) {
result = msgpack_pack_str_body(&context->packer,
context->metadata.data,
context->metadata.size);
}
if (result == 0) {
result = msgpack_pack_str_body(&context->packer,
context->body.data,
context->body.size);
}
if (result != 0) {
result = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
result = FLB_EVENT_ENCODER_SUCCESS;
}
}
else {
result = flb_log_event_encoder_root_begin_array(context);

if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_root_begin_array(context);
}
}

if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_root_begin_array(context);
result = flb_log_event_encoder_append_root_timestamp(
context, &context->timestamp);
}
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_append_root_timestamp(
context, &context->timestamp);
}
if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_append_root_raw_msgpack(
context,
context->metadata.data,
context->metadata.size);
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_root_commit_array(context);
}
}

if (context->format == FLB_LOG_EVENT_FORMAT_FLUENT_BIT_V2) {
if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_append_root_raw_msgpack(
context,
context->metadata.data,
context->metadata.size);
context->body.data,
context->body.size);
}

/* We need to explicitly commit the current array (which
* holds the timestamp and metadata elements so we leave
* that scope and go back to the root scope where we can
* append the body element.
*/
if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_root_commit_array(context);
result = flb_log_event_encoder_dynamic_field_flush(&context->root);
}
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_append_root_raw_msgpack(
context,
context->body.data,
context->body.size);
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
result = flb_log_event_encoder_dynamic_field_flush(&context->root);
if (result == FLB_EVENT_ENCODER_SUCCESS &&
context->root.data != NULL && context->root.size > 0) {
result = msgpack_pack_str_body(&context->packer,
context->root.data,
context->root.size);

if (result != 0) {
result = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
result = FLB_EVENT_ENCODER_SUCCESS;
}
}
}
}

if (result == FLB_EVENT_ENCODER_SUCCESS) {
else if (result == FLB_EVENT_ENCODER_SUCCESS &&
context->root.data != NULL && context->root.size > 0) {
result = msgpack_pack_str_body(&context->packer,
context->root.data,
context->root.size);
Expand Down
Loading