Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2400,6 +2400,18 @@ grpc_cc_library(
],
)

# This is an EXPERIMENTAL target subject to change.
grpc_cc_library(
name = "grpcpp_otel_plugin",
hdrs = [
"include/grpcpp/ext/otel_plugin.h",
],
language = "c++",
deps = [
"//src/cpp/ext/otel:otel_plugin",
],
)

grpc_cc_library(
name = "work_serializer",
srcs = [
Expand Down
11 changes: 8 additions & 3 deletions include/grpcpp/ext/csm_observability.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
#include "absl/strings/string_view.h"
#include "opentelemetry/sdk/metrics/meter_provider.h"

#include "src/cpp/ext/otel/otel_plugin.h"

namespace grpc {

namespace internal {
class OpenTelemetryPluginBuilderImpl;
} // namespace internal

namespace experimental {

// This is a no-op at present, but in the future, this object would be useful
Expand All @@ -41,6 +44,8 @@ class CsmObservability {};
// for a binary running on CSM.
class CsmObservabilityBuilder {
public:
CsmObservabilityBuilder();
~CsmObservabilityBuilder();
CsmObservabilityBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider);
Expand Down Expand Up @@ -80,7 +85,7 @@ class CsmObservabilityBuilder {
absl::StatusOr<CsmObservability> BuildAndRegister();

private:
internal::OpenTelemetryPluginBuilder builder_;
std::unique_ptr<grpc::internal::OpenTelemetryPluginBuilderImpl> builder_;
};

} // namespace experimental
Expand Down
108 changes: 108 additions & 0 deletions include/grpcpp/ext/otel_plugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//
//
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#ifndef GRPCPP_EXT_OTEL_PLUGIN_H
#define GRPCPP_EXT_OTEL_PLUGIN_H

#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <memory>

#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "opentelemetry/metrics/meter_provider.h"

namespace grpc {

namespace internal {
class OpenTelemetryPluginBuilderImpl;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that once we have the plugin option API in place, there will no longer be any need for having two different layers here (the private API wrapped inside of the public API)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think once the experimental CsmObservability API goes away, we won't need to do this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we need the CSM plugin option implemented before we can remove the CsmObservability API. And once we have the CSM plugin option implemented, the CsmObservability code can use that plugin option in the public API, so it won't need to reach inside the internal API anymore. Or am I missing something here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. I wasn't originally planning on changing the implementation of CsmObservability to use the plugin option, but I like that idea. I will be able to remove the internal API earlier. Will do that. Thanks!

} // namespace internal

namespace experimental {

/// The most common way to use this API is -
///
/// OpenTelemetryPluginBuilder().SetMeterProvider(provider).BuildAndRegister();
///
/// The set of instruments available are -
/// grpc.client.attempt.started
/// grpc.client.attempt.duration
/// grpc.client.attempt.sent_total_compressed_message_size
/// grpc.client.attempt.rcvd_total_compressed_message_size
/// grpc.server.call.started
/// grpc.server.call.duration
/// grpc.server.call.sent_total_compressed_message_size
/// grpc.server.call.rcvd_total_compressed_message_size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably no need to list the metrics in the comment now that the constants are in the header file directly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

class OpenTelemetryPluginBuilder {
public:
/// Metrics
static constexpr absl::string_view kClientAttemptStartedInstrumentName =
"grpc.client.attempt.started";
static constexpr absl::string_view kClientAttemptDurationInstrumentName =
"grpc.client.attempt.duration";
static constexpr absl::string_view
kClientAttemptSentTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.sent_total_compressed_message_size";
static constexpr absl::string_view
kClientAttemptRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.client.attempt.rcvd_total_compressed_message_size";
static constexpr absl::string_view kServerCallStartedInstrumentName =
"grpc.server.call.started";
static constexpr absl::string_view kServerCallDurationInstrumentName =
"grpc.server.call.duration";
static constexpr absl::string_view
kServerCallSentTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.sent_total_compressed_message_size";
static constexpr absl::string_view
kServerCallRcvdTotalCompressedMessageSizeInstrumentName =
"grpc.server.call.rcvd_total_compressed_message_size";

OpenTelemetryPluginBuilder();
/// If `SetMeterProvider()` is not called, no metrics are collected.
OpenTelemetryPluginBuilder& SetMeterProvider(
std::shared_ptr<opentelemetry::metrics::MeterProvider> meter_provider);
/// If set, \a target_attribute_filter is called per channel to decide whether
/// to record the target attribute on client or to replace it with "other".
/// This helps reduce the cardinality on metrics in cases where many channels
/// are created with different targets in the same binary (which might happen
/// for example, if the channel target string uses IP addresses directly).
OpenTelemetryPluginBuilder& SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter);
/// If set, \a generic_method_attribute_filter is called per call with a
/// generic method type to decide whether to record the method name or to
/// replace it with "other". Non-generic or pre-registered methods remain
/// unaffected. If not set, by default, generic method names are replaced with
/// "other" when recording metrics.
OpenTelemetryPluginBuilder& SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter);
/// Registers a global plugin that acts on all channels and servers running on
/// the process.
void BuildAndRegisterGlobal();

private:
std::unique_ptr<internal::OpenTelemetryPluginBuilderImpl> impl_;
};
} // namespace experimental
} // namespace grpc

#endif // GRPCPP_EXT_OTEL_PLUGIN_H
20 changes: 13 additions & 7 deletions src/cpp/ext/csm/csm_observability.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,40 +48,46 @@ namespace experimental {
// CsmObservabilityBuilder
//

CsmObservabilityBuilder::CsmObservabilityBuilder()
: builder_(
std::make_unique<grpc::internal::OpenTelemetryPluginBuilderImpl>()) {}

CsmObservabilityBuilder::~CsmObservabilityBuilder() = default;

CsmObservabilityBuilder& CsmObservabilityBuilder::SetMeterProvider(
std::shared_ptr<opentelemetry::sdk::metrics::MeterProvider>
meter_provider) {
builder_.SetMeterProvider(meter_provider);
builder_->SetMeterProvider(meter_provider);
return *this;
}

CsmObservabilityBuilder& CsmObservabilityBuilder::SetTargetAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*target*/) const>
target_attribute_filter) {
builder_.SetTargetAttributeFilter(std::move(target_attribute_filter));
builder_->SetTargetAttributeFilter(std::move(target_attribute_filter));
return *this;
}

CsmObservabilityBuilder&
CsmObservabilityBuilder::SetGenericMethodAttributeFilter(
absl::AnyInvocable<bool(absl::string_view /*generic_method*/) const>
generic_method_attribute_filter) {
builder_.SetGenericMethodAttributeFilter(
builder_->SetGenericMethodAttributeFilter(
std::move(generic_method_attribute_filter));
return *this;
}

absl::StatusOr<CsmObservability> CsmObservabilityBuilder::BuildAndRegister() {
builder_.SetServerSelector([](const grpc_core::ChannelArgs& args) {
builder_->SetServerSelector([](const grpc_core::ChannelArgs& args) {
return args.GetBool(GRPC_ARG_XDS_ENABLED_SERVER).value_or(false);
});
builder_.SetTargetSelector(internal::CsmChannelTargetSelector);
builder_.SetLabelsInjector(
builder_->SetTargetSelector(internal::CsmChannelTargetSelector);
builder_->SetLabelsInjector(
std::make_unique<internal::ServiceMeshLabelsInjector>(
google::cloud::otel::MakeResourceDetector()
->Detect()
.GetAttributes()));
builder_.BuildAndRegisterGlobal();
builder_->BuildAndRegisterGlobal();
return CsmObservability();
}

Expand Down
1 change: 1 addition & 0 deletions src/cpp/ext/otel/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ grpc_cc_library(
"otel_client_filter.h",
"otel_plugin.h",
"otel_server_call_tracer.h",
"//:include/grpcpp/ext/otel_plugin.h",
],
external_deps = [
"absl/base:core_headers",
Expand Down
10 changes: 5 additions & 5 deletions src/cpp/ext/otel/key_value_iterable.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
namespace grpc {
namespace internal {

inline opentelemetry::nostd::string_view AbslStrViewToOTelStrView(
inline opentelemetry::nostd::string_view AbslStrViewToOpenTelemetryStrView(
absl::string_view str) {
return opentelemetry::nostd::string_view(str.data(), str.size());
}
Expand All @@ -62,15 +62,15 @@ class KeyValueIterable : public opentelemetry::common::KeyValueIterable {
if (injected_labels_iterable_ != nullptr) {
injected_labels_iterable_->ResetIteratorPosition();
while (const auto& pair = injected_labels_iterable_->Next()) {
if (!callback(AbslStrViewToOTelStrView(pair->first),
AbslStrViewToOTelStrView(pair->second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair->first),
AbslStrViewToOpenTelemetryStrView(pair->second))) {
return false;
}
}
}
for (const auto& pair : additional_labels_) {
if (!callback(AbslStrViewToOTelStrView(pair.first),
AbslStrViewToOTelStrView(pair.second))) {
if (!callback(AbslStrViewToOpenTelemetryStrView(pair.first),
AbslStrViewToOpenTelemetryStrView(pair.second))) {
return false;
}
}
Expand Down
74 changes: 39 additions & 35 deletions src/cpp/ext/otel/otel_client_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ absl::StatusOr<OpenTelemetryClientFilter> OpenTelemetryClientFilter::Create(
std::string target = args.GetOwnedString(GRPC_ARG_SERVER_URI).value_or("");
// Use the original target string only if a filter on the attribute is not
// registered or if the filter returns true, otherwise use "other".
if (OTelPluginState().target_attribute_filter == nullptr ||
OTelPluginState().target_attribute_filter(target)) {
if (OpenTelemetryPluginState().target_attribute_filter == nullptr ||
OpenTelemetryPluginState().target_attribute_filter(target)) {
return OpenTelemetryClientFilter(std::move(target));
}
return OpenTelemetryClientFilter("other");
Expand Down Expand Up @@ -116,31 +116,32 @@ OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
: parent_(parent),
arena_allocated_(arena_allocated),
start_time_(absl::Now()) {
if (OTelPluginState().client.attempt.started != nullptr) {
if (OpenTelemetryPluginState().client.attempt.started != nullptr) {
std::array<std::pair<absl::string_view, absl::string_view>, 2>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()}}};
// We might not have all the injected labels that we want at this point, so
// avoid recording a subset of injected labels here.
OTelPluginState().client.attempt.started->Add(
OpenTelemetryPluginState().client.attempt.started->Add(
1, KeyValueIterable(/*injected_labels_iterable=*/nullptr,
additional_labels));
}
}

void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
injected_labels_ =
OTelPluginState().labels_injector->GetLabels(recv_initial_metadata);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
injected_labels_ = OpenTelemetryPluginState().labels_injector->GetLabels(
recv_initial_metadata);
}
}

void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
RecordSendInitialMetadata(grpc_metadata_batch* send_initial_metadata) {
if (OTelPluginState().labels_injector != nullptr) {
OTelPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
if (OpenTelemetryPluginState().labels_injector != nullptr) {
OpenTelemetryPluginState().labels_injector->AddLabels(send_initial_metadata,
nullptr);
}
}

Expand Down Expand Up @@ -175,32 +176,35 @@ void OpenTelemetryCallTracer::OpenTelemetryCallAttemptTracer::
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
const grpc_transport_stream_stats* transport_stream_stats) {
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {{{OTelMethodKey(), parent_->MethodForStats()},
{OTelTargetKey(), parent_->parent_->target()},
{OTelStatusKey(), grpc_status_code_to_string(
static_cast<grpc_status_code>(
status.code()))}}};
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
{OpenTelemetryTargetKey(), parent_->parent_->target()},
{OpenTelemetryStatusKey(),
grpc_status_code_to_string(
static_cast<grpc_status_code>(status.code()))}}};
KeyValueIterable labels(injected_labels_.get(), additional_labels);
if (OTelPluginState().client.attempt.duration != nullptr) {
OTelPluginState().client.attempt.duration->Record(
if (OpenTelemetryPluginState().client.attempt.duration != nullptr) {
OpenTelemetryPluginState().client.attempt.duration->Record(
absl::ToDoubleSeconds(absl::Now() - start_time_), labels,
opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.sent_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.sent_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->outgoing.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
if (OTelPluginState().client.attempt.rcvd_total_compressed_message_size !=
nullptr) {
OTelPluginState().client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
if (OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size != nullptr) {
OpenTelemetryPluginState()
.client.attempt.rcvd_total_compressed_message_size->Record(
transport_stream_stats != nullptr
? transport_stream_stats->incoming.data_bytes
: 0,
labels, opentelemetry::context::Context{});
}
}

Expand Down Expand Up @@ -273,8 +277,8 @@ OpenTelemetryCallTracer::StartNewAttempt(bool is_transparent_retry) {
absl::string_view OpenTelemetryCallTracer::MethodForStats() const {
absl::string_view method = absl::StripPrefix(path_.as_string_view(), "/");
if (registered_method_ ||
(OTelPluginState().generic_method_attribute_filter != nullptr &&
OTelPluginState().generic_method_attribute_filter(method))) {
(OpenTelemetryPluginState().generic_method_attribute_filter != nullptr &&
OpenTelemetryPluginState().generic_method_attribute_filter(method))) {
return method;
}
return "other";
Expand Down
Loading