-
Notifications
You must be signed in to change notification settings - Fork 4.6k
grpc: Fix cardinality violations in non-server streaming RPCs #8278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
f06ff56
1546427
ca4860a
c3ca09b
bbd8366
1a23bb7
f37ea78
9436092
11da0dc
7808f8a
ad4ce90
a21fcc4
8107444
6deac72
643998d
f85707b
4e5075b
b2ab207
85ab6ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -542,8 +542,6 @@ type clientStream struct { | |
|
|
||
| sentLast bool // sent an end stream | ||
|
|
||
| recvFirstMsg bool // received first msg from server | ||
|
|
||
| methodConfig *MethodConfig | ||
|
|
||
| ctx context.Context // the application's context, wrapped by stats/tracing | ||
|
|
@@ -1137,17 +1135,14 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) { | |
| return statusErr | ||
| } | ||
| // received no msg and status ok for non-server streaming rpcs. | ||
| if !cs.desc.ServerStreams && !cs.recvFirstMsg { | ||
| if !cs.desc.ServerStreams { | ||
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return status.Errorf(codes.Internal, "client streaming cardinality violation") | ||
|
||
| } | ||
| return io.EOF // indicates successful end of stream. | ||
| } | ||
|
|
||
| return toRPCErr(err) | ||
| } | ||
| if !cs.desc.ServerStreams { | ||
| cs.recvFirstMsg = true | ||
| } | ||
| if a.trInfo != nil { | ||
| a.mu.Lock() | ||
| if a.trInfo.tr != nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3736,6 +3736,87 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| // Tests that a client receives a cardinality violation error for unary | ||
| // RPCs if the server doesn't send a message before returning status OK. | ||
| func (s) TestUnaryRPC_ServerSendsOnlyTrailersWithOK(t *testing.T) { | ||
| lis, err := testutils.LocalTCPListener() | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| defer lis.Close() | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
| defer cancel() | ||
| cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
| if err != nil { | ||
| t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err) | ||
dfawley marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| defer cc.Close() | ||
|
|
||
| go func() { | ||
| conn, err := lis.Accept() | ||
| if err != nil { | ||
| t.Errorf("lis.Accept() = %v", err) | ||
dfawley marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| } | ||
| defer conn.Close() | ||
| framer := http2.NewFramer(conn, conn) | ||
|
||
|
|
||
| if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { | ||
| t.Errorf("Error reading client preface: %v", err) | ||
dfawley marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return | ||
| } | ||
| if err := framer.WriteSettings(); err != nil { | ||
| t.Errorf("Error writing server settings: %v", err) | ||
| return | ||
| } | ||
| if err := framer.WriteSettingsAck(); err != nil { | ||
| t.Errorf("Error writing settings ack: %v", err) | ||
| return | ||
| } | ||
|
|
||
| for ctx.Err() == nil { | ||
| frame, err := framer.ReadFrame() | ||
| if err != nil { | ||
| t.Errorf("Error reading frame: %v", err) | ||
| return | ||
| } | ||
|
|
||
| switch frame := frame.(type) { | ||
| case *http2.HeadersFrame: | ||
| if frame.Header().StreamID != 1 { | ||
| t.Errorf("Expected stream ID 1, got %d", frame.Header().StreamID) | ||
| return | ||
| } | ||
|
|
||
| var buf bytes.Buffer | ||
| enc := hpack.NewEncoder(&buf) | ||
| _ = enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | ||
| _ = enc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
| _ = enc.WriteField(hpack.HeaderField{Name: "grpc-status", Value: "0"}) | ||
dfawley marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if err := framer.WriteHeaders(http2.HeadersFrameParam{ | ||
| StreamID: 1, | ||
| BlockFragment: buf.Bytes(), | ||
| EndHeaders: true, | ||
| EndStream: true, | ||
| }); err != nil { | ||
| t.Errorf("Error while writing headers: %v", err) | ||
| return | ||
| } | ||
| time.Sleep(50 * time.Millisecond) | ||
| default: | ||
| t.Logf("Server received frame: %v", frame) | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| client := testgrpc.NewTestServiceClient(cc) | ||
| if _, err = client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Internal { | ||
| t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||
| } | ||
| } | ||
|
|
||
| func (s) TestExceedMaxStreamsLimit(t *testing.T) { | ||
| for _, e := range listTestEnv() { | ||
| testExceedMaxStreamsLimit(t, e) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.