Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
298edba
handled cardinality violation and added tests
Pranjali-2501 Jun 6, 2025
4c3a18a
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 6, 2025
fd5d614
remove vet errors
Pranjali-2501 Jun 6, 2025
7e4206c
modified tests
Pranjali-2501 Jun 6, 2025
324566b
modified tests
Pranjali-2501 Jun 6, 2025
9fbf6b7
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 19, 2025
9b9cddf
change server.recvmsg() to catch cardinality violation
Pranjali-2501 Jun 23, 2025
63565f2
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 23, 2025
67549e7
replace srv with _
Pranjali-2501 Jun 23, 2025
6030b90
resolving comments
Pranjali-2501 Jul 1, 2025
dff08b3
resolving vets
Pranjali-2501 Jul 1, 2025
f4f1b61
addressed comments
Pranjali-2501 Jul 8, 2025
83e8664
resolving vets
Pranjali-2501 Jul 8, 2025
5acd9ab
resolving vets
Pranjali-2501 Jul 8, 2025
8f4f39b
minor change
Pranjali-2501 Jul 15, 2025
6de922d
minor change
Pranjali-2501 Jul 15, 2025
5f6c715
minor change
Pranjali-2501 Jul 22, 2025
990efe1
resolving comments
Pranjali-2501 Jul 24, 2025
7f6d31f
resolving nits
Pranjali-2501 Jul 24, 2025
41d8328
vet changes
Pranjali-2501 Jul 24, 2025
59ad122
added comment
Pranjali-2501 Jul 24, 2025
0f5248a
added comment
Pranjali-2501 Jul 25, 2025
1ff8868
resolving comments
Pranjali-2501 Jul 28, 2025
68fd0f8
update comment
Pranjali-2501 Jul 28, 2025
19e9e71
nits
Pranjali-2501 Jul 29, 2025
0d30b18
modifying test
Pranjali-2501 Jul 30, 2025
83fd598
resolving vet
Pranjali-2501 Jul 30, 2025
efa8e5a
remove comment
Pranjali-2501 Jul 30, 2025
29f6657
modifying tests
Pranjali-2501 Jul 31, 2025
183b1da
resolving comments
Pranjali-2501 Aug 3, 2025
749a52c
resolving comments
Pranjali-2501 Aug 4, 2025
1b1800d
resolving comments
Pranjali-2501 Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3741,13 +3741,9 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
}
}

// Tests the behavior for server-side streaming RPCs when client calls SendMsg twice.
// The first client.SendMsg() sends EOF along with the message. When client calls a
// second SendMsg, it triggers a RST_STREAM which cancels the stream context on the
// server. There would be a race, the RST_STREAM could cause the server’s first RecvMsg
// to fail, even if the request message was already delivered. By synchronizing, we
// ensure that the server has read the first message before the client triggers RST_STREAM
// and validating expected error codes.
// Tests the behavior for server-side streaming when client calls SendMsg twice.
// Second call to SendMsg should fail with Internal error and result in closing
// the connection with a RST_STREAM.
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
// To ensure server.recvMsg() is successfully completed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually needed? Why? If the client were to attempt to send its second message immediately, and that caused a RST_STREAM, that would all happen after the server processed the headers, and the handler should get invoked regardless, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first call to server.RecvMsg is made by the generated handler before the test's server handler is invoked. If this call fails, the server handler in the test will not be executed at all.

To handle this, I synchronise the calls in the following way:

  • Client.SendMsg()
  • Server.RecvMsg()
  • Client.SendMsg() – Expected to fail with an Internal error and close connection with RST_STREAM.
  • Server.SendMsg() – Expected to fail with a Canceled error

An alternative approach would be to use a fake server implementation, which would allow us to directly observe and assert the error returned from server.RecvMsg().

Copy link
Member

@dfawley dfawley Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does that help? The first Server.RecvMsg should be hung waiting for the client to send an END_STREAM, shouldn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will not hung.
For non-client-streaming RPCs, initial call to client.SendMsg will close the client stream with EOF.

grpc-go/stream.go

Line 1102 in ac13172

if err := a.transportStream.Write(hdr, payld, &transport.WriteOptions{Last: !cs.desc.ClientStreams}); err != nil {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, since the client knows it's not client-streaming, too, then it will do END_STREAM along with the first call to SendMsg.

So then the reason for the synchronization here is that, if the client does a RST_STREAM, that might propagate to the server and cause its first recv to fail -- even though the first request message was received perfectly normally -- because we cancel the stream's context upon RST_STREAM receipt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, correct.

So, should we keep the test as it is or change it with alternative approach?

An alternative approach would be to use a fake server implementation, which would allow us to directly observe and assert the error returned from server.RecvMsg().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but can you explain more like how I did, to say why the synchronization is needed? Because there would be a race between client cancellation and the server reading the first request message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please leave the function-level comment to just a description of the test, and put the explanation of the various steps in intra-function comments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I was hoping for was more like this:

Suggested change
// To ensure server.recvMsg() is successfully completed.
// To ensure server.recvMsg() is successfully completed. Otherwise, if the client application
// attempts to send a second request message, that will trigger a RST_STREAM from the
// client due to the application violating the RPC's protocol. The RST_STREAM will prevent
// the method handler from being called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

recvDoneOnServer := make(chan struct{})
Expand All @@ -3756,8 +3752,9 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
handlerDone := make(chan struct{})
ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
// The initial call to recvMsg is made by the generated code.
// The initial call to recvMsg is made by the generated code. Signal test when done.
close(recvDoneOnServer)
// Block until the stream’s context is done (cancelled by client).
<-stream.Context().Done()
if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled {
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled)
Expand Down Expand Up @@ -3790,18 +3787,20 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

// First SendMsg sends EOF along with the message.
if err := stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

// To ensure that the server has read the first message before client triggers RST_STREAM.
<-recvDoneOnServer
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal)
}
<-handlerDone
}

// TODO : https://github.com/grpc/grpc-go/issues/7286 - Add tests to check
// server-side behavior for Unary RPC.
// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC.
// Tests the behavior for unary RPC when client calls SendMsg twice. Second call
// to SendMsg should fail with Internal error.
func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) {
Expand Down Expand Up @@ -3947,9 +3946,7 @@ func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) {

// Tests the behavior of client for server-side streaming RPC when client sends zero request messages.
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
// TODO : https://github.com/grpc/grpc-go/issues/7286 - remove `t.Skip()`
// after this is fixed.
t.Skip()
t.Skip("blocked on i/7286")
// The initial call to recvMsg made by the generated code, will return the error.
ss := stubserver.StubServer{}
if err := ss.Start(nil); err != nil {
Expand Down