-
Notifications
You must be signed in to change notification settings - Fork 4.6k
grpc: Fix cardinality violations in non-server streaming RPCs #8278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f06ff56
1546427
ca4860a
c3ca09b
bbd8366
1a23bb7
f37ea78
9436092
11da0dc
7808f8a
ad4ce90
a21fcc4
8107444
6deac72
643998d
f85707b
4e5075b
b2ab207
85ab6ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3589,9 +3589,6 @@ func testClientStreamingError(t *testing.T, e env) { | |
| // Tests that a client receives a cardinality violation error for client-streaming | ||
| // RPCs if the server doesn't send a message before returning status OK. | ||
| func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) { | ||
| // TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()` | ||
| // after this is fixed. | ||
| t.Skip() | ||
| ss := &stubserver.StubServer{ | ||
| StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error { | ||
| // Returning status OK without sending a response message.This is a | ||
|
|
@@ -3740,8 +3737,113 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| // Tests that a client receives a cardinality violation error for unary | ||
| // RPCs if the server doesn't send a message before returning status OK. | ||
| func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { | ||
| lis, err := testutils.LocalTCPListener() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer lis.Close() | ||
|
|
||
| ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { | ||
| return nil | ||
| }) | ||
|
|
||
| s := grpc.NewServer(ss) | ||
| go s.Serve(lis) | ||
| defer s.Stop() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| if err != nil { | ||
| t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) | ||
| } | ||
| defer cc.Close() | ||
|
|
||
| client := testgrpc.NewTestServiceClient(cc) | ||
| if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { | ||
| t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||
| } | ||
| } | ||
|
|
||
| // Tests that client will receive cardinality violations when calling | ||
| // RecvMsg() multiple times for non-streaming response streams. | ||
| func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) { | ||
| e := tcpTLSEnv | ||
| te := newTest(t, e) | ||
| defer te.tearDown() | ||
|
|
||
| te.startServer(&testServer{security: e.security}) | ||
|
|
||
| cc := te.clientConn() | ||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
|
|
||
| desc := &grpc.StreamDesc{ | ||
| StreamName: "UnaryCall", | ||
| ServerStreams: false, | ||
| ClientStreams: false, | ||
| } | ||
| stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") | ||
| if err != nil { | ||
| t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||
| } | ||
|
|
||
| if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil { | ||
| t.Fatalf("stream.SendMsg(_) = %v, want <nil>", err) | ||
| } | ||
|
|
||
| resp := &testpb.SimpleResponse{} | ||
| if err := stream.RecvMsg(resp); err != nil { | ||
| t.Fatalf("stream.RecvMsg() = %v , want <nil>", err) | ||
| } | ||
|
|
||
| if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { | ||
| t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||
| } | ||
| } | ||
|
|
||
| // Tests that client will receive cardinality violations when calling | ||
| // RecvMsg() multiple times for non-streaming response streams. | ||
| func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what way is this different from the previous test case? The server side is different, but actually does the same thing.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both tests does the same thing, only difference is
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR is actually a fix for for "non-server streaming" RPCs instead of "client streaming RPCs". I've updated the PR title to reflect the same. So the value of I noticed that none of the tests send 2 response messages from the server. This shows up as missing coverage reported by Codecov around line 1486. Can you please have the server send back two response messages here? You should use the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline: line 1486 is in addrConnStream, not csAttempt. I had initially thought the missing coverage was in csAttempt. addrConnStream is largely similar to csAttempt but is only used for establishing health streams. There is a plan to unify it with csAttempt in the future. Adding test coverage for addrConnStream may be difficult and is not necessary at this point. |
||
| ss := stubserver.StubServer{ | ||
| StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { | ||
| if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil { | ||
| t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err) | ||
| } | ||
| return nil | ||
| }, | ||
| } | ||
| if err := ss.Start(nil); err != nil { | ||
| t.Fatal("Error starting server:", err) | ||
| } | ||
| defer ss.Stop() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
| stream, err := ss.Client.StreamingInputCall(ctx) | ||
| if err != nil { | ||
| t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err) | ||
| } | ||
| if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil { | ||
| t.Fatalf("stream.Send(_) = %v, want <nil>", err) | ||
| } | ||
| if err := stream.CloseSend(); err != nil { | ||
| t.Fatalf("stream.CloseSend() = %v, want <nil>", err) | ||
| } | ||
| resp := new(testpb.StreamingInputCallResponse) | ||
| if err := stream.RecvMsg(resp); err != nil { | ||
| t.Fatalf("stream.RecvMsg() = %v , want <nil>", err) | ||
| } | ||
| if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal { | ||
| t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||
| } | ||
| } | ||
|
|
||
| // Tests that a client receives a cardinality violation error for client-streaming | ||
| // RPCs if the server call SendMsg multiple times. | ||
| // RPCs if the server call SendMsg() multiple times. | ||
| func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { | ||
| ss := stubserver.StubServer{ | ||
| StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.