Skip to content

Commit 549682b

Browse files
committed
chore(http/prom): upgrade to hyper 1.x
Signed-off-by: katelyn martin <[email protected]>
1 parent c4813dd commit 549682b

File tree

3 files changed

+34
-54
lines changed

3 files changed

+34
-54
lines changed

linkerd/http/prom/src/body_data/body.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use super::metrics::BodyDataMetrics;
2-
use http::HeaderMap;
3-
use http_body::SizeHint;
2+
use http_body::{Frame, SizeHint};
43
use pin_project::pin_project;
54
use std::{
65
pin::Pin,
@@ -35,34 +34,28 @@ where
3534
type Error = B::Error;
3635

3736
/// Attempt to pull out the next data buffer of this stream.
38-
fn poll_data(
37+
fn poll_frame(
3938
self: Pin<&mut Self>,
4039
cx: &mut Context<'_>,
41-
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
40+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
4241
let this = self.project();
4342
let inner = this.inner;
4443
let BodyDataMetrics { frame_size } = this.metrics;
4544

46-
let data = std::task::ready!(inner.poll_data(cx));
45+
let frame = std::task::ready!(inner.poll_frame(cx));
4746

48-
if let Some(Ok(data)) = data.as_ref() {
49-
// We've polled and yielded a new chunk! Increment our telemetry.
50-
//
51-
// NB: We're careful to call `remaining()` rather than `chunk()`, which
52-
// "can return a shorter slice (this allows non-continuous internal representation)."
53-
let bytes = bytes::Buf::remaining(data);
54-
frame_size.observe(linkerd_metrics::to_f64(bytes as u64));
47+
if let Some(Ok(frame)) = &frame {
48+
if let Some(data) = frame.data_ref() {
49+
// We've polled and yielded a new chunk! Increment our telemetry.
50+
//
51+
// NB: We're careful to call `remaining()` rather than `chunk()`, which
52+
// "can return a shorter slice (this allows non-continuous internal representation)."
53+
let bytes = bytes::Buf::remaining(data);
54+
frame_size.observe(linkerd_metrics::to_f64(bytes as u64));
55+
}
5556
}
5657

57-
Poll::Ready(data)
58-
}
59-
60-
#[inline]
61-
fn poll_trailers(
62-
self: Pin<&mut Self>,
63-
cx: &mut Context<'_>,
64-
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
65-
self.project().inner.poll_trailers(cx)
58+
Poll::Ready(frame)
6659
}
6760

6861
#[inline]

linkerd/http/prom/src/record_response.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -254,29 +254,27 @@ where
254254
type Data = <BoxBody as http_body::Body>::Data;
255255
type Error = Error;
256256

257-
fn poll_data(
257+
fn poll_frame(
258258
self: Pin<&mut Self>,
259259
cx: &mut Context<'_>,
260-
) -> Poll<Option<Result<Self::Data, Error>>> {
260+
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
261261
let mut this = self.project();
262-
let res =
263-
futures::ready!(this.inner.as_mut().poll_data(cx)).map(|res| res.map_err(Into::into));
264-
if let Some(Err(error)) = res.as_ref() {
265-
end_stream(this.state, Err(error));
266-
} else if (*this.inner).is_end_stream() {
267-
end_stream(this.state, Ok(None));
262+
263+
// Poll the inner body for the next frame.
264+
let poll = this.inner.as_mut().poll_frame(cx);
265+
let frame = futures::ready!(poll).map(|res| res.map_err(Error::from));
266+
267+
match &frame {
268+
Some(Ok(frame)) => {
269+
if let trls @ Some(_) = frame.trailers_ref() {
270+
end_stream(this.state, Ok(trls));
271+
}
272+
}
273+
Some(Err(error)) => end_stream(this.state, Err(error)),
274+
None => end_stream(this.state, Ok(None)),
268275
}
269-
Poll::Ready(res)
270-
}
271276

272-
fn poll_trailers(
273-
self: Pin<&mut Self>,
274-
cx: &mut Context<'_>,
275-
) -> Poll<Result<Option<http::HeaderMap>, Error>> {
276-
let this = self.project();
277-
let res = futures::ready!(this.inner.poll_trailers(cx)).map_err(Into::into);
278-
end_stream(this.state, res.as_ref().map(Option::as_ref));
279-
Poll::Ready(res)
277+
Poll::Ready(frame)
280278
}
281279

282280
fn is_end_stream(&self) -> bool {

linkerd/http/prom/src/record_response/response.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use http_body::Frame;
12
use linkerd_error::Error;
23
use linkerd_http_box::BoxBody;
34
use linkerd_metrics::prom::Counter;
@@ -150,12 +151,12 @@ where
150151
type Data = B::Data;
151152
type Error = B::Error;
152153

153-
fn poll_data(
154+
fn poll_frame(
154155
self: Pin<&mut Self>,
155156
cx: &mut Context<'_>,
156-
) -> Poll<Option<Result<Self::Data, B::Error>>> {
157+
) -> Poll<Option<Result<Frame<Self::Data>, B::Error>>> {
157158
let mut this = self.project();
158-
let res = futures::ready!(this.inner.as_mut().poll_data(cx));
159+
let res = futures::ready!(this.inner.as_mut().poll_frame(cx));
159160
if (*this.inner).is_end_stream() {
160161
if let Some(tx) = this.flushed.take() {
161162
let _ = tx.send(time::Instant::now());
@@ -164,18 +165,6 @@ where
164165
Poll::Ready(res)
165166
}
166167

167-
fn poll_trailers(
168-
self: Pin<&mut Self>,
169-
cx: &mut Context<'_>,
170-
) -> Poll<Result<Option<http::HeaderMap>, B::Error>> {
171-
let this = self.project();
172-
let res = futures::ready!(this.inner.poll_trailers(cx));
173-
if let Some(tx) = this.flushed.take() {
174-
let _ = tx.send(time::Instant::now());
175-
}
176-
Poll::Ready(res)
177-
}
178-
179168
fn is_end_stream(&self) -> bool {
180169
self.inner.is_end_stream()
181170
}

0 commit comments

Comments
 (0)