From f5741426b2bdc05a8b49e51df019dc127bd071ae Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Thu, 16 Nov 2023 15:53:52 +0000 Subject: [PATCH 1/5] Add test for aborting grpc.aio server RPC with trailing_metadata This is basically a copy of the existing test_abort but passes trailing_metadata. This test fails because the `code()` and `details()` from the returned exception are masking the actual abort reason due to the bug. --- .../tests/test_aio_server_interceptor.py | 75 ++++++++++++++++++- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 52391124b7..abaf54c596 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -88,8 +88,11 @@ async def run_with_test_server( channel = grpc.aio.insecure_channel(f"localhost:{port:d}") await server.start() - resp = await runnable(channel) - await server.stop(1000) + + try: + resp = await runnable(channel) + finally: + await server.stop(1000) return resp @@ -514,9 +517,75 @@ async def request(channel): request = Request(client_id=1, request_data=failure_message) msg = request.SerializeToString() - with testcase.assertRaises(Exception): + with testcase.assertRaises(grpc.RpcError) as cm: await channel.unary_unary(rpc_call)(msg) + self.assertEqual(cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION) + self.assertEqual(cm.exception.details(), failure_message) + + await run_with_test_server(request, servicer=AbortServicer()) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + 
self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) + + async def test_abort_with_trailing_metadata(self): + """Check that we can catch an abort properly when trailing_metadata provided""" + rpc_call = "/GRPCTestServer/SimpleMethod" + failure_message = "failure message" + + class AbortServicer(GRPCTestServerServicer): + # pylint:disable=C0103 + async def SimpleMethod(self, request, context): + metadata = ( + ("meta", "data") + ) + await context.abort( + grpc.StatusCode.FAILED_PRECONDITION, failure_message, trailing_metadata=metadata + ) + + testcase = self + + async def request(channel): + request = Request(client_id=1, request_data=failure_message) + msg = request.SerializeToString() + + with testcase.assertRaises(grpc.RpcError) as cm: + await channel.unary_unary(rpc_call)(msg) + + self.assertEqual(cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION) + self.assertEqual(cm.exception.details(), failure_message) + await run_with_test_server(request, servicer=AbortServicer()) spans_list = self.memory_exporter.get_finished_spans() From 8c3471c9582154189242f7b59c49e8ab065ec698 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Thu, 16 Nov 2023 17:07:11 +0000 Subject: [PATCH 2/5] Switch grpc.aio interceptor to use correct context This adds a separate ServicerContext for the grpc.aio interceptor to use that inherits from 
the correct grpc.aio.ServicerContext. Previously it was ultimately using grpc.ServicerContext where the abort method has a different signature to what should have been in use. --- .../instrumentation/grpc/_aio_server.py | 125 +++++++++++++++++- .../tests/test_aio_server_interceptor.py | 2 +- 2 files changed, 123 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index d64dcf000b..125ac5bef4 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -12,14 +12,133 @@ # See the License for the specific language governing permissions and # limitations under the License. +import grpc import grpc.aio from ._server import ( OpenTelemetryServerInterceptor, - _OpenTelemetryServicerContext, _wrap_rpc_behavior, ) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace.status import Status, StatusCode + +# pylint:disable=abstract-method +class _OpenTelemetryAioServicerContext(grpc.aio.ServicerContext): + def __init__(self, servicer_context, active_span): + self._servicer_context = servicer_context + self._active_span = active_span + self._code = grpc.StatusCode.OK + self._details = None + super().__init__() + + def __getattr__(self, attr): + return getattr(self._servicer_context, attr) + + async def read(self): + return await self._servicer_context.read() + + async def write(self, message): + return await self._servicer_context.write(message) + + def is_active(self, *args, **kwargs): + return self._servicer_context.is_active(*args, **kwargs) + + def time_remaining(self, *args, **kwargs): + return self._servicer_context.time_remaining(*args, **kwargs) + + def cancel(self, *args, 
**kwargs): + return self._servicer_context.cancel(*args, **kwargs) + + def add_callback(self, *args, **kwargs): + return self._servicer_context.add_callback(*args, **kwargs) + + def disable_next_message_compression(self): + return self._service_context.disable_next_message_compression() + + def invocation_metadata(self, *args, **kwargs): + return self._servicer_context.invocation_metadata(*args, **kwargs) + + def peer(self): + return self._servicer_context.peer() + + def peer_identities(self): + return self._servicer_context.peer_identities() + + def peer_identity_key(self): + return self._servicer_context.peer_identity_key() + + def auth_context(self): + return self._servicer_context.auth_context() + + def set_compression(self, compression): + return self._servicer_context.set_compression(compression) + + async def send_initial_metadata(self, *args, **kwargs): + return await self._servicer_context.send_initial_metadata(*args, **kwargs) + + def set_trailing_metadata(self, *args, **kwargs): + return self._servicer_context.set_trailing_metadata(*args, **kwargs) + + def trailing_metadata(self): + return self._servicer_context.trailing_metadata() + + async def abort(self, code, details = "", trailing_metadata = tuple()): + self._code = code + self._details = details + self._active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}:{details}", + ) + ) + return await self._servicer_context.abort(code, details, trailing_metadata) + + def code(self): + if not hasattr(self._servicer_context, "code"): + raise RuntimeError( + "code() is not supported with the installed version of grpcio" + ) + return self._servicer_context.code() + + def details(self): + if not hasattr(self._servicer_context, "details"): + raise RuntimeError( + "details() is not supported with the installed version of " + "grpcio" + ) + return self._servicer_context.details() + + def 
set_code(self, code): + self._code = code + # use details if we already have it, otherwise the status description + details = self._details or code.value[1] + self._active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + if code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}:{details}", + ) + ) + return self._servicer_context.set_code(code) + + def set_details(self, details): + self._details = details + if self._code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{self._code}:{details}", + ) + ) + return self._servicer_context.set_details(details) + class OpenTelemetryAioServerInterceptor( grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor @@ -66,7 +185,7 @@ async def _unary_interceptor(request_or_iterator, context): set_status_on_exception=False, ) as span: # wrap the context - context = _OpenTelemetryServicerContext(context, span) + context = _OpenTelemetryAioServicerContext(context, span) # And now we run the actual RPC. 
try: @@ -91,7 +210,7 @@ async def _stream_interceptor(request_or_iterator, context): context, set_status_on_exception=False, ) as span: - context = _OpenTelemetryServicerContext(context, span) + context = _OpenTelemetryAioServicerContext(context, span) try: async for response in behavior( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index abaf54c596..09bab13e3b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -568,7 +568,7 @@ class AbortServicer(GRPCTestServerServicer): # pylint:disable=C0103 async def SimpleMethod(self, request, context): metadata = ( - ("meta", "data") + ("meta", "data"), ) await context.abort( grpc.StatusCode.FAILED_PRECONDITION, failure_message, trailing_metadata=metadata From b311422c41202c37036c7cc9061d1e226d39eba7 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Thu, 16 Nov 2023 17:28:02 +0000 Subject: [PATCH 3/5] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4f1db1959..ad2302aa54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation` Added Otel semantic convention opt-in mechanism ([#1987](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1987)) +- `opentelemetry-instrumentation-grpc` Fix arity of context.abort for AIO RPCs + ([#2066](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2066)) ## Version 1.21.0/0.42b0 (2023-11-01) From dda47a1b29ebf5d3fb9b2fa01d9f57b89c8003d9 Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Tue, 21 Nov 2023 11:06:07 +0000 Subject: [PATCH 4/5] Use wrapt.ObjectProxy for 
_OpenTelemetryAioServicerContext Using wrapt eliminates a bunch of the boilerplate associated with inheriting from grpc.aio.ServicerContext directly. It also means that if methods are added or their signatures change, there will only be a breaking change for abort, set_code, or set_details. --- .../instrumentation/grpc/_aio_server.py | 114 ++++-------------- 1 file changed, 23 insertions(+), 91 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 125ac5bef4..fe4355d19e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -14,6 +14,7 @@ import grpc import grpc.aio +import wrapt from ._server import ( OpenTelemetryServerInterceptor, @@ -23,121 +24,52 @@ from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode -# pylint:disable=abstract-method -class _OpenTelemetryAioServicerContext(grpc.aio.ServicerContext): +class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy): def __init__(self, servicer_context, active_span): - self._servicer_context = servicer_context - self._active_span = active_span - self._code = grpc.StatusCode.OK - self._details = None - super().__init__() - - def __getattr__(self, attr): - return getattr(self._servicer_context, attr) - - async def read(self): - return await self._servicer_context.read() - - async def write(self, message): - return await self._servicer_context.write(message) - - def is_active(self, *args, **kwargs): - return self._servicer_context.is_active(*args, **kwargs) - - def time_remaining(self, *args, **kwargs): - return self._servicer_context.time_remaining(*args, **kwargs) - - def cancel(self, *args, 
**kwargs): - return self._servicer_context.cancel(*args, **kwargs) - - def add_callback(self, *args, **kwargs): - return self._servicer_context.add_callback(*args, **kwargs) - - def disable_next_message_compression(self): - return self._service_context.disable_next_message_compression() - - def invocation_metadata(self, *args, **kwargs): - return self._servicer_context.invocation_metadata(*args, **kwargs) - - def peer(self): - return self._servicer_context.peer() - - def peer_identities(self): - return self._servicer_context.peer_identities() - - def peer_identity_key(self): - return self._servicer_context.peer_identity_key() - - def auth_context(self): - return self._servicer_context.auth_context() - - def set_compression(self, compression): - return self._servicer_context.set_compression(compression) - - async def send_initial_metadata(self, *args, **kwargs): - return await self._servicer_context.send_initial_metadata(*args, **kwargs) - - def set_trailing_metadata(self, *args, **kwargs): - return self._servicer_context.set_trailing_metadata(*args, **kwargs) - - def trailing_metadata(self): - return self._servicer_context.trailing_metadata() - - async def abort(self, code, details = "", trailing_metadata = tuple()): - self._code = code - self._details = details - self._active_span.set_attribute( + super().__init__(servicer_context) + self._self_active_span = active_span + self._self_code = grpc.StatusCode.OK + self._self_details = None + + async def abort(self, code, details="", trailing_metadata=tuple()): + self._self_code = code + self._self_details = details + self._self_active_span.set_attribute( SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] ) - self._active_span.set_status( + self._self_active_span.set_status( Status( status_code=StatusCode.ERROR, description=f"{code}:{details}", ) ) - return await self._servicer_context.abort(code, details, trailing_metadata) - - def code(self): - if not hasattr(self._servicer_context, "code"): - raise RuntimeError( - 
"code() is not supported with the installed version of grpcio" - ) - return self._servicer_context.code() - - def details(self): - if not hasattr(self._servicer_context, "details"): - raise RuntimeError( - "details() is not supported with the installed version of " - "grpcio" - ) - return self._servicer_context.details() + return await self.__wrapped__.abort(code, details, trailing_metadata) def set_code(self, code): - self._code = code - # use details if we already have it, otherwise the status description - details = self._details or code.value[1] - self._active_span.set_attribute( + self._self_code = code + details = self._self_details or code.value[1] + self._self_active_span.set_attribute( SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] ) if code != grpc.StatusCode.OK: - self._active_span.set_status( + self._self_active_span.set_status( Status( status_code=StatusCode.ERROR, description=f"{code}:{details}", ) ) - return self._servicer_context.set_code(code) + return self.__wrapped__.set_code(code) def set_details(self, details): - self._details = details - if self._code != grpc.StatusCode.OK: - self._active_span.set_status( + self._self_details = details + if self._self_code != grpc.StatusCode.OK: + self._self_active_span.set_status( Status( status_code=StatusCode.ERROR, - description=f"{self._code}:{details}", + description=f"{self._self_code}:{details}", ) ) - return self._servicer_context.set_details(details) + return self.__wrapped__.set_details(details) class OpenTelemetryAioServerInterceptor( From cce98362c4f1ee7c9c92a4cda200c806e2c2f0cf Mon Sep 17 00:00:00 2001 From: Sean Kenny Date: Tue, 21 Nov 2023 11:25:41 +0000 Subject: [PATCH 5/5] Address linting issues --- .../instrumentation/grpc/_aio_server.py | 9 ++++----- .../tests/test_aio_server_interceptor.py | 16 ++++++++++------ 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py 
b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index fe4355d19e..4d43049c78 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -16,14 +16,13 @@ import grpc.aio import wrapt -from ._server import ( - OpenTelemetryServerInterceptor, - _wrap_rpc_behavior, -) - from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from ._server import OpenTelemetryServerInterceptor, _wrap_rpc_behavior + + +# pylint:disable=abstract-method class _OpenTelemetryAioServicerContext(wrapt.ObjectProxy): def __init__(self, servicer_context, active_span): super().__init__(servicer_context) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py index 09bab13e3b..242295c08c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor.py @@ -520,7 +520,9 @@ async def request(channel): with testcase.assertRaises(grpc.RpcError) as cm: await channel.unary_unary(rpc_call)(msg) - self.assertEqual(cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION) + self.assertEqual( + cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION + ) self.assertEqual(cm.exception.details(), failure_message) await run_with_test_server(request, servicer=AbortServicer()) @@ -567,11 +569,11 @@ async def test_abort_with_trailing_metadata(self): class AbortServicer(GRPCTestServerServicer): # pylint:disable=C0103 async def SimpleMethod(self, request, context): - metadata = ( - ("meta", "data"), - ) + metadata = (("meta", "data"),) await context.abort( - 
grpc.StatusCode.FAILED_PRECONDITION, failure_message, trailing_metadata=metadata + grpc.StatusCode.FAILED_PRECONDITION, + failure_message, + trailing_metadata=metadata, ) testcase = self @@ -583,7 +585,9 @@ async def request(channel): with testcase.assertRaises(grpc.RpcError) as cm: await channel.unary_unary(rpc_call)(msg) - self.assertEqual(cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION) + self.assertEqual( + cm.exception.code(), grpc.StatusCode.FAILED_PRECONDITION + ) self.assertEqual(cm.exception.details(), failure_message) await run_with_test_server(request, servicer=AbortServicer())