Skip to content

Commit 7db77d7

Browse files
committed
fix(lambda): intercept raw JSON before deserialization to preserve trace context
lambda_runtime deserializes the event into E before wrap_handler sees it. If the user's type omits fields like headers, re-serializing with to_value loses them permanently. For API Gateway triggers this means trace context in event.headers is silently dropped. wrap_handler now receives LambdaEvent<Value> so lambda_runtime preserves the full event. The span inferrer runs on the complete JSON, then the library deserializes Value to E before calling the user handler. The E: Serialize bound is replaced with E: DeserializeOwned, which is the constraint lambda_runtime itself requires.
1 parent 8339227 commit 7db77d7

File tree

2 files changed

+37
-27
lines changed

2 files changed

+37
-27
lines changed

integrations/aws/datadog-lambda-rs/src/invocation.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use lambda_runtime::LambdaEvent;
88
use opentelemetry::trace::{FutureExt, SpanKind, TraceContextExt, Tracer, TracerProvider as _};
99
use opentelemetry::{Context, KeyValue};
1010
use opentelemetry_sdk::trace::{SdkTracer, SdkTracerProvider};
11-
use serde::Serialize;
11+
use serde_json::Value;
1212
use std::future::Future;
1313
use std::sync::atomic::{AtomicBool, Ordering};
1414

@@ -40,20 +40,16 @@ pub(crate) struct InvocationScope {
4040
pub invocation_cx: Context,
4141
}
4242

43-
pub(crate) fn start_invocation<E>(
44-
event: &LambdaEvent<E>,
43+
pub(crate) fn start_invocation(
44+
event: &LambdaEvent<Value>,
4545
provider: &SdkTracerProvider,
4646
_config: &Config,
47-
) -> InvocationScope
48-
where
49-
E: Serialize,
50-
{
47+
) -> InvocationScope {
5148
let is_cold = detect_cold_start();
5249
let tracer = provider.tracer("datadog-lambda-rs");
53-
let payload_value = serde_json::to_value(&event.payload).unwrap_or_default();
5450

5551
let inferrer = SpanInferrer::new(&tracer);
56-
let result = inferrer.infer(&payload_value);
52+
let result = inferrer.infer(&event.payload);
5753

5854
let lambda_span = LambdaSpan {
5955
function_name: event.context.env_config.function_name.clone(),

integrations/aws/datadog-lambda-rs/src/lib.rs

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use lambda_runtime::{
1010
LambdaEvent,
1111
};
1212
use opentelemetry_sdk::trace::SdkTracerProvider;
13-
use serde::Serialize;
13+
use serde::de::DeserializeOwned;
14+
use serde_json::Value;
15+
use std::marker::PhantomData;
1416
use std::sync::Arc;
1517
use std::task::{Context as TaskContext, Poll};
1618

@@ -25,11 +27,11 @@ pub struct Config {}
2527
pub fn wrap_handler<F, Fut, E, R>(
2628
handler: F,
2729
provider: SdkTracerProvider,
28-
) -> impl Fn(LambdaEvent<E>) -> BoxFuture<Result<R, HandlerError>>
30+
) -> impl Fn(LambdaEvent<Value>) -> BoxFuture<Result<R, HandlerError>>
2931
where
3032
F: Fn(LambdaEvent<E>) -> Fut + Send + Sync + 'static,
3133
Fut: std::future::Future<Output = Result<R, HandlerError>> + Send + 'static,
32-
E: Serialize + Send + Sync + 'static,
34+
E: DeserializeOwned + Send + Sync + 'static,
3335
R: Send + 'static,
3436
{
3537
wrap_handler_with_config(handler, provider, Config::default())
@@ -40,21 +42,26 @@ pub fn wrap_handler_with_config<F, Fut, E, R>(
4042
handler: F,
4143
provider: SdkTracerProvider,
4244
config: Config,
43-
) -> impl Fn(LambdaEvent<E>) -> BoxFuture<Result<R, HandlerError>>
45+
) -> impl Fn(LambdaEvent<Value>) -> BoxFuture<Result<R, HandlerError>>
4446
where
4547
F: Fn(LambdaEvent<E>) -> Fut + Send + Sync + 'static,
4648
Fut: std::future::Future<Output = Result<R, HandlerError>> + Send + 'static,
47-
E: Serialize + Send + Sync + 'static,
49+
E: DeserializeOwned + Send + Sync + 'static,
4850
R: Send + 'static,
4951
{
5052
let handler = Arc::new(handler);
5153
let config = Arc::new(config);
52-
move |event: LambdaEvent<E>| {
54+
move |event: LambdaEvent<Value>| {
5355
let handler = Arc::clone(&handler);
5456
let provider = provider.clone();
5557
let config = Arc::clone(&config);
5658
let scope = start_invocation(&event, &provider, &config);
57-
let fut = handler(event);
59+
let typed_payload = match serde_json::from_value::<E>(event.payload) {
60+
Ok(p) => p,
61+
Err(e) => return Box::pin(async move { Err(e.into()) }),
62+
};
63+
let typed_event = LambdaEvent::new(typed_payload, event.context);
64+
let fut = handler(typed_event);
5865
Box::pin(async move { run_in_invocation_scope(scope, provider, fut).await })
5966
}
6067
}
@@ -89,15 +96,17 @@ impl<S> Layer<S> for DatadogLambdaLayer {
8996
inner,
9097
provider: self.provider.clone(),
9198
config: Arc::clone(&self.config),
99+
_phantom: PhantomData,
92100
}
93101
}
94102
}
95103

96104
#[doc(hidden)]
97-
pub struct DatadogLambdaService<S> {
105+
pub struct DatadogLambdaService<S, E = Value> {
98106
inner: S,
99107
provider: SdkTracerProvider,
100108
config: Arc<Config>,
109+
_phantom: PhantomData<fn(LambdaEvent<E>)>,
101110
}
102111

103112
#[cfg(test)]
@@ -119,27 +128,32 @@ mod tests {
119128
}
120129
}
121130

122-
impl<S, E, R, Err> Service<LambdaEvent<E>> for DatadogLambdaService<S>
131+
impl<S, E> Service<LambdaEvent<Value>> for DatadogLambdaService<S, E>
123132
where
124-
S: Service<LambdaEvent<E>, Response = R, Error = Err> + Send + 'static,
133+
S: Service<LambdaEvent<E>> + Send + 'static,
125134
S::Future: Send + 'static,
126-
E: Serialize + Send + Sync + 'static,
127-
R: Send + 'static,
128-
Err: std::fmt::Display + Send + 'static,
135+
S::Response: Send + 'static,
136+
S::Error: From<serde_json::Error> + std::fmt::Display + Send + 'static,
137+
E: DeserializeOwned + Send + Sync + 'static,
129138
{
130-
type Response = R;
131-
type Error = Err;
132-
type Future = BoxFuture<Result<R, Err>>;
139+
type Response = S::Response;
140+
type Error = S::Error;
141+
type Future = BoxFuture<Result<S::Response, S::Error>>;
133142

134143
fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll<Result<(), Self::Error>> {
135144
self.inner.poll_ready(cx)
136145
}
137146

138-
fn call(&mut self, event: LambdaEvent<E>) -> Self::Future {
147+
fn call(&mut self, event: LambdaEvent<Value>) -> Self::Future {
139148
let provider = self.provider.clone();
140149
let config = Arc::clone(&self.config);
141150
let scope = start_invocation(&event, &provider, &config);
142-
let fut = self.inner.call(event);
151+
let typed_payload = match serde_json::from_value::<E>(event.payload) {
152+
Ok(p) => p,
153+
Err(e) => return Box::pin(async move { Err(e.into()) }),
154+
};
155+
let typed_event = LambdaEvent::new(typed_payload, event.context);
156+
let fut = self.inner.call(typed_event);
143157
Box::pin(async move { run_in_invocation_scope(scope, provider, fut).await })
144158
}
145159
}

0 commit comments

Comments
 (0)