Skip to content

Commit a64d933

Browse files
grpc: Fix cardinality violations in non-server streaming RPCs (#8278)
1 parent d2e8366 commit a64d933

File tree

2 files changed

+116
-6
lines changed

2 files changed

+116
-6
lines changed

stream.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11381138
if statusErr := a.transportStream.Status().Err(); statusErr != nil {
11391139
return statusErr
11401140
}
1141+
// Received no msg and status OK for non-server streaming rpcs.
1142+
if !cs.desc.ServerStreams {
1143+
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
1144+
}
11411145
return io.EOF // indicates successful end of stream.
11421146
}
11431147

@@ -1171,7 +1175,7 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
11711175
} else if err != nil {
11721176
return toRPCErr(err)
11731177
}
1174-
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1178+
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
11751179
}
11761180

11771181
func (a *csAttempt) finish(err error) {
@@ -1478,6 +1482,10 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
14781482
if statusErr := as.transportStream.Status().Err(); statusErr != nil {
14791483
return statusErr
14801484
}
1485+
// Received no msg and status OK for non-server streaming rpcs.
1486+
if !as.desc.ServerStreams {
1487+
return status.Error(codes.Internal, "cardinality violation: received no response message from non-streaming RPC")
1488+
}
14811489
return io.EOF // indicates successful end of stream.
14821490
}
14831491
return toRPCErr(err)
@@ -1495,7 +1503,7 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
14951503
} else if err != nil {
14961504
return toRPCErr(err)
14971505
}
1498-
return status.Errorf(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
1506+
return status.Error(codes.Internal, "cardinality violation: expected <EOF> for non server-streaming RPCs, but received another message")
14991507
}
15001508

15011509
func (as *addrConnStream) finish(err error) {

test/end2end_test.go

Lines changed: 106 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3589,9 +3589,6 @@ func testClientStreamingError(t *testing.T, e env) {
35893589
// Tests that a client receives a cardinality violation error for client-streaming
35903590
// RPCs if the server doesn't send a message before returning status OK.
35913591
func (s) TestClientStreamingCardinalityViolation_ServerHandlerMissingSendAndClose(t *testing.T) {
3592-
// TODO : https://github.com/grpc/grpc-go/issues/8119 - remove `t.Skip()`
3593-
// after this is fixed.
3594-
t.Skip()
35953592
ss := &stubserver.StubServer{
35963593
StreamingInputCallF: func(_ testgrpc.TestService_StreamingInputCallServer) error {
35973594
// Returning status OK without sending a response message.This is a
@@ -3740,8 +3737,113 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
37403737
}
37413738
}
37423739

3740+
// Tests that a client receives a cardinality violation error for unary
3741+
// RPCs if the server doesn't send a message before returning status OK.
3742+
func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) {
3743+
lis, err := testutils.LocalTCPListener()
3744+
if err != nil {
3745+
t.Fatal(err)
3746+
}
3747+
defer lis.Close()
3748+
3749+
ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error {
3750+
return nil
3751+
})
3752+
3753+
s := grpc.NewServer(ss)
3754+
go s.Serve(lis)
3755+
defer s.Stop()
3756+
3757+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3758+
defer cancel()
3759+
cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
3760+
if err != nil {
3761+
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err)
3762+
}
3763+
defer cc.Close()
3764+
3765+
client := testgrpc.NewTestServiceClient(cc)
3766+
if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal {
3767+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3768+
}
3769+
}
3770+
3771+
// Tests that client will receive cardinality violations when calling
3772+
// RecvMsg() multiple times for non-streaming response streams.
3773+
func (s) TestUnaryRPC_ClientCallRecvMsgTwice(t *testing.T) {
3774+
e := tcpTLSEnv
3775+
te := newTest(t, e)
3776+
defer te.tearDown()
3777+
3778+
te.startServer(&testServer{security: e.security})
3779+
3780+
cc := te.clientConn()
3781+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3782+
defer cancel()
3783+
3784+
desc := &grpc.StreamDesc{
3785+
StreamName: "UnaryCall",
3786+
ServerStreams: false,
3787+
ClientStreams: false,
3788+
}
3789+
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
3790+
if err != nil {
3791+
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
3792+
}
3793+
3794+
if err := stream.SendMsg(&testpb.SimpleRequest{}); err != nil {
3795+
t.Fatalf("stream.SendMsg(_) = %v, want <nil>", err)
3796+
}
3797+
3798+
resp := &testpb.SimpleResponse{}
3799+
if err := stream.RecvMsg(resp); err != nil {
3800+
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3801+
}
3802+
3803+
if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal {
3804+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3805+
}
3806+
}
3807+
3808+
// Tests that client will receive cardinality violations when calling
3809+
// RecvMsg() multiple times for non-streaming response streams.
3810+
func (s) TestClientStreaming_ClientCallRecvMsgTwice(t *testing.T) {
3811+
ss := stubserver.StubServer{
3812+
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {
3813+
if err := stream.SendAndClose(&testpb.StreamingInputCallResponse{}); err != nil {
3814+
t.Errorf("stream.SendAndClose(_) = %v, want <nil>", err)
3815+
}
3816+
return nil
3817+
},
3818+
}
3819+
if err := ss.Start(nil); err != nil {
3820+
t.Fatal("Error starting server:", err)
3821+
}
3822+
defer ss.Stop()
3823+
3824+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3825+
defer cancel()
3826+
stream, err := ss.Client.StreamingInputCall(ctx)
3827+
if err != nil {
3828+
t.Fatalf(".StreamingInputCall(_) = _, %v, want <nil>", err)
3829+
}
3830+
if err := stream.Send(&testpb.StreamingInputCallRequest{}); err != nil {
3831+
t.Fatalf("stream.Send(_) = %v, want <nil>", err)
3832+
}
3833+
if err := stream.CloseSend(); err != nil {
3834+
t.Fatalf("stream.CloseSend() = %v, want <nil>", err)
3835+
}
3836+
resp := new(testpb.StreamingInputCallResponse)
3837+
if err := stream.RecvMsg(resp); err != nil {
3838+
t.Fatalf("stream.RecvMsg() = %v , want <nil>", err)
3839+
}
3840+
if err = stream.RecvMsg(resp); status.Code(err) != codes.Internal {
3841+
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
3842+
}
3843+
}
3844+
37433845
// Tests that a client receives a cardinality violation error for client-streaming
3744-
// RPCs if the server call SendMsg multiple times.
3846+
// RPCs if the server call SendMsg() multiple times.
37453847
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
37463848
ss := stubserver.StubServer{
37473849
StreamingInputCallF: func(stream testgrpc.TestService_StreamingInputCallServer) error {

0 commit comments

Comments
 (0)