feat(grpc): add server-side stream validators#2602
Conversation
507787e to
e2f4bd0
Compare
c5c0e67 to
038f365
Compare
e2f4bd0 to
90ae261
Compare
038f365 to
89b8525
Compare
89b8525 to
355c9cd
Compare
dfawley
left a comment
There was a problem hiding this comment.
I'm trying to figure out how exactly this is going to be used.
Most likely it will be a server-side interceptor? In which case, when an error occurs, I would think we'd want to terminate the RPC with a status that indicates the cause of the problem.
355c9cd to
de7ee29
Compare
Yes on the server side interceptor. RecvStreamVal.. is somewhat straight forward. This'll be used closer to the protobuf API to ensure that we transition our generic stream request receiver to the protobuf API stream object. SendStreamVal.. is closer to the transport to validate the outgoing data after all other interceptors. I somewhat agree with the need for status. We need to update the Now that we are returning trailers instead of sending them, we need more details from SendStream to propagate to trailers . Does it sound okay to add |
I was expecting we'd just need an interceptor that can terminate the RPC underneath the handler when either the SendStream or RecvStream has any problem. I.e. we can't do stream validation via something that simply wraps each stream independently. |
I mean we are definitely gonna need an interceptor for this. But I was planning to separate it into two responsibilities , one for lifecycle for stream and another for lifecycle of handler. Something along the lines of But we might need some |
Sure but they will need to be linked, so it would make more sense to me to make them all together.
So you're thinking two different interceptors might be needed?
Yes we should make sure the RPC error includes an explanation of the protocol violation. I expect the interceptor would basically select on the handler future and a receive from a oneshot channel that the validator writes to, and then terminates with whichever status comes first, dropping the other future. |
|
So, we discussed offline. It'd make sense for both stream and handler validation to be a part of the same PR. I initially thought we'd need different handlers, but based on my progress so far, I believe a single one suffices.
Since , we don't expose error status from sendstream or recvstream right now, we cannot propagate anything beyond an arbitrary "internal error: stream broken" . Would you want to tweak our stream APIs to be able to propagate errors Err(Status) or Err(String) instead of Err() |
de7ee29 to
a2a44a2
Compare
|
The validation implementation has been updated to be interceptor based and the stream validation utilities are now private. There's still a question about if we should introduce "status" or "error messages" into the sendstream and recvstream to be propagated to trailer or will an arbitrary "internal error" suffice. Also, this now is stacked on top of #2634 |
Enforces strict gRPC stream state transitions and sequence validation on the server side to prevent malformed responses and improper polling. Key Changes: - **Send Stream Validation (`ServerSendStreamValidator`)**: Tracks state transitions (`Init` -> `HeadersSent` -> `MessagesSent` -> `Done`) to guarantee headers are sent exactly once and always precede messages, rejecting invalid operations. - **Receive Stream Validation (`ServerRecvStreamValidator`)**: Prevents erroneous polling by returning stable errors once the receive stream reaches EOF or a terminal error state. - **Preemptive Error Interception (`StreamValidationInterceptor`)**: Wraps streams with channel-aware validators to immediately preempt handler execution via `tokio::select!` upon detecting any protocol violation or underlying transport error, automatically returning an `Internal` status.
a2a44a2 to
e930953
Compare
56323f1 to
51903a9
Compare
| } | ||
|
|
||
| impl<S: SendStream> ServerSendStreamValidator<S> { | ||
| pub fn new(inner: S) -> Self { |
There was a problem hiding this comment.
I think this needs to not be public since the type isn't public? (And same for the other types below)
|
|
||
| #[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
| enum RecvStreamState { | ||
| Init, |
There was a problem hiding this comment.
Nit: Init isn't quite right here, because you stay in init even after getting some messages. Maybe Active? Or you could replace this enum with a bool, maybe.
|
|
||
| struct ChannelAwareSendStreamValidator<S> { | ||
| inner: ServerSendStreamValidator<S>, | ||
| error_tx: tokio::sync::mpsc::Sender<()>, |
There was a problem hiding this comment.
Would an Option<Oneshot> be preferable here? It might make the access simpler, but I'm not sure.
| } | ||
| } | ||
|
|
||
| struct ChannelAwareRecvStreamValidator<R> { |
There was a problem hiding this comment.
Maybe we don't need this? It seems OK if the application keeps receiving after getting an error. We want our wrapper to avoid propagating that read down further, since it would be illegal-per-the-API to do so, but I don't think that needs to error the whole call?
| tokio::select! { | ||
| res = next.handle(headers, options, &mut wrapped_tx, wrapped_rx) => { | ||
| if error_rx.try_recv().is_ok() { | ||
| Trailers::new(Err(StatusError::new(StatusCodeError::Internal, "Stream validation error"))) |
There was a problem hiding this comment.
On the one hand, it's unfortunate that we aren't being any more specific than this. "Server attempted to send multiple headers / messages before headers / etc"
On the other hand, maybe it's for the best, since the client maybe shouldn't get details about server bugs?
Introduces
ServerSendStreamValidatorandServerRecvStreamValidatorto enforce strict gRPC semantics on server streams. These are intended to wrap raw transport streams before passing them to application-level handlers, ensuring that user-provided service logic cannot violate protocol rules.ServerSendStreamValidatorensures proper response sequencing (Headers -> Messages -> Trailers) and prevents invalid state transitions, while fully supporting trailers-only responses.ServerRecvStreamValidatorsafely manages terminal states, ensuring that any subsequent polls after stream completion consistently return an error to prevent undefined behavior.This is recreation of #2595 to allow using a stacked PR workflow