Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 73 additions & 50 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep
// clang-format on

#include "opentelemetry/common/macros.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

macros.h is not needed.

By convention, including version.h also provides macros.h, because it contains the magic line:

#include "opentelemetry/common/macros.h"  // IWYU pragma: export

This was done because every file depends on version.h anyway, so code can reliably make use of various defines like OPENTELEMETRY_HAVE_EXCEPTIONS without having to remember to include macros.h.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reported by include-what-you-use as:

/home/runner/work/opentelemetry-cpp/opentelemetry-cpp/exporters/otlp/src/otlp_file_client.cc should remove these lines:
- #include "opentelemetry/common/macros.h"  // lines 23-23

because of the export pragma in version.h.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it's removed now.

#include "opentelemetry/nostd/string_view.h"
#include "opentelemetry/nostd/variant.h"
#include "opentelemetry/sdk/common/base64.h"
Expand Down Expand Up @@ -47,6 +48,9 @@
#include <thread>
#include <utility>
#include <vector>
#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

#if !defined(__CYGWIN__) && defined(_WIN32)
# ifndef WIN32_LEAN_AND_MEAN
Expand Down Expand Up @@ -1424,71 +1428,90 @@ class OPENTELEMETRY_LOCAL_SYMBOL OtlpFileSystemBackend : public OtlpFileAppender
return;
}

std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
return;
}

std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;
#endif

while (true)
std::lock_guard<std::mutex> lock_guard_caller{file_->background_thread_lock};
if (file_->background_flush_thread)
{
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}
return;
}

if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
break;
}
std::shared_ptr<FileStats> concurrency_file = file_;
std::chrono::microseconds flush_interval = options_.flush_interval;
file_->background_flush_thread.reset(new std::thread([concurrency_file, flush_interval]() {
std::chrono::system_clock::time_point last_free_job_timepoint =
std::chrono::system_clock::now();
std::size_t last_record_count = 0;

while (true)
{
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
// Exit flush thread if there is not data to flush more than one minute.
if (now - last_free_job_timepoint > std::chrono::minutes{1})
{
break;
}

{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
if (concurrency_file->is_shutdown.load(std::memory_order_acquire))
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
break;
}

if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
std::unique_lock<std::mutex> lk(concurrency_file->background_thread_waker_lock);
concurrency_file->background_thread_waker_cv.wait_for(lk, flush_interval);
}

concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}
{
std::size_t current_record_count =
concurrency_file->record_count.load(std::memory_order_acquire);
std::lock_guard<std::mutex> lock_guard{concurrency_file->file_lock};
if (current_record_count != last_record_count)
{
last_record_count = current_record_count;
last_free_job_timepoint = std::chrono::system_clock::now();
}

concurrency_file->background_thread_waiter_cv.notify_all();
}
if (concurrency_file->current_file)
{
fflush(concurrency_file->current_file.get());
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
concurrency_file->flushed_record_count.store(current_record_count,
std::memory_order_release);
}

concurrency_file->background_thread_waiter_cv.notify_all();
}

// Detach running thread because it will exit soon
std::unique_ptr<std::thread> background_flush_thread;
{
std::lock_guard<std::mutex> lock_guard_inner{concurrency_file->background_thread_lock};
background_flush_thread.swap(concurrency_file->background_flush_thread);
}
if (background_flush_thread && background_flush_thread->joinable())
{
background_flush_thread->detach();
}
}));
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_WARN("[OTLP FILE Client] Try to spawn background but got a exception: "
<< e.what() << ".Data writing may experience some delays.");
}
catch (...)
{
OTEL_INTERNAL_LOG_WARN(
"[OTLP FILE Client] Try to spawn background but got a unknown exception.Data writing may "
"experience some delays.");
}
#endif
}

private:
Expand Down