Commit ac95dd7

*: fix GracefulStop issue when using cmux for TLS
The gRPC server supports GracefulStop to drain all the inflight RPCs, including streaming RPCs.

When we start the gRPC server in non-cmux mode (non-TLS, or TLS with gRPC only), we always invoke GracefulStop to drain requests. In cmux mode (gRPC.ServeHTTP), the connection is maintained by the http server, so the gRPC server is unable to send the GOAWAY control frame to the client. By default, it therefore force-closes all the connections and doesn't drain requests.

gRPC v1.61.0 introduces the experimental `WaitForHandlers` option to make grpc.Stop() block until all the RPC handlers finish. This patch uses `WaitForHandlers` for cmux mode's graceful shutdown.

This patch also introduces the `v3rpcBeforeSnapshot` failpoint, which is used to verify cmux mode's graceful shutdown behaviour.

For the TestAuthGracefulDisable case (tests/common), the timeout is increased from 10s to 15s because we now attempt graceful shutdown after the connection is closed, which takes more time than before.

Signed-off-by: Wei Fu <[email protected]>
1 parent a7f5d4b commit ac95dd7
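
For context on the two shutdown paths described above: grpc-go's GracefulStop stops accepting new RPCs and blocks until inflight handlers (including streams) return, while Stop closes listeners and connections immediately. The experimental WaitForHandlers server option (grpc-go v1.61.0) makes Stop also wait for handlers to finish. A minimal sketch using plain grpc-go, not etcd code:

package main

import (
	"net"

	"google.golang.org/grpc"
)

func main() {
	// WaitForHandlers is an experimental grpc-go option (v1.61.0): it makes
	// Stop() block until all method handlers have returned, instead of
	// leaving them running after the connections are closed.
	gs := grpc.NewServer(grpc.WaitForHandlers(true))

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	go gs.Serve(lis)

	// GracefulStop: sends GOAWAY, rejects new RPCs, drains inflight ones.
	// Stop: closes connections immediately; with WaitForHandlers(true) it
	// still waits for inflight handlers to return before unblocking.
	gs.Stop()
}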

File tree

5 files changed: +202 -3 lines changed

server/embed/serve.go

Lines changed: 25 additions & 2 deletions
@@ -159,7 +159,7 @@ func (sctx *serveCtx) serve(
 	defer func(gs *grpc.Server) {
 		if err != nil {
 			sctx.lg.Warn("stopping insecure grpc server due to error", zap.Error(err))
-			gs.Stop()
+			gs.GracefulStop()
 			sctx.lg.Warn("stopped insecure grpc server due to error", zap.Error(err))
 		}
 	}(gs)
@@ -202,16 +202,39 @@ func (sctx *serveCtx) serve(
 	}

 	if grpcEnabled {
+		// TODO(XXX):
+		//
+		// WaitForHandlers is experimental function to drain
+		// all the inflight handlers, including stream RPCs.
+		// For cmux mode, we can't call GracefulStop because of
+		// [1].
+		//
+		// Actually, we do call http.Shutdown first in stopServers.
+		// We still need to drain all the inflight handlers to
+		// make sure that there is no leaky goroutines to
+		// use closed backend and panic. Add WaitForHandlers
+		// to force gs.Stop to drain. We can remove this option
+		// when we remove cmux [2].
+		//
+		// [1]: https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
+		// [2]: https://github.com/etcd-io/etcd/issues/15402
+		gopts = append(gopts, grpc.WaitForHandlers(true))
+
 		gs = v3rpc.Server(s, tlscfg, nil, gopts...)
 		v3electionpb.RegisterElectionServer(gs, servElection)
 		v3lockpb.RegisterLockServer(gs, servLock)
 		if sctx.serviceRegister != nil {
 			sctx.serviceRegister(gs)
 		}
+
 		defer func(gs *grpc.Server) {
 			if err != nil {
 				sctx.lg.Warn("stopping secure grpc server due to error", zap.Error(err))
-				gs.Stop()
+				if httpEnabled {
+					gs.Stop()
+				} else {
+					gs.GracefulStop()
+				}
 				sctx.lg.Warn("stopped secure grpc server due to error", zap.Error(err))
 			}
 		}(gs)
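
To illustrate the TODO comment above: when etcd multiplexes TLS traffic with cmux, gRPC requests are served through the http.Server via grpc.Server's ServeHTTP, so the gRPC server does not own the connections and cannot send GOAWAY. The sketch below (generic grpc-go and net/http, not etcd's actual wiring) shows the resulting shutdown order: drain the HTTP side first, then Stop the gRPC server, which with WaitForHandlers(true) blocks until inflight handlers return.

package main

import (
	"context"
	"net/http"
	"time"

	"google.golang.org/grpc"
)

func shutdownSharedListener(hs *http.Server, gs *grpc.Server) {
	// The http.Server owns the connections, so GracefulStop on the gRPC
	// server cannot send GOAWAY here. Drain the HTTP side first.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	_ = hs.Shutdown(ctx)

	// With grpc.WaitForHandlers(true), Stop blocks until inflight RPC
	// handlers return, so they never touch an already-closed backend.
	gs.Stop()
}

func main() {
	// grpc.Server implements http.Handler; cmux-style single-port serving
	// hands gRPC-over-HTTP/2 requests to it through the http.Server.
	gs := grpc.NewServer(grpc.WaitForHandlers(true))
	hs := &http.Server{Handler: gs}

	// In a real server, hs.ListenAndServeTLS(certFile, keyFile) would run
	// in a goroutine here; this sketch only shows the shutdown ordering.
	shutdownSharedListener(hs, gs)
}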

server/etcdserver/api/v3rpc/maintenance.go

Lines changed: 1 addition & 0 deletions
@@ -103,6 +103,7 @@ func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRe
 const snapshotSendBufferSize = 32 * 1024

 func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
+	// gofail: var v3rpcBeforeSnapshot struct{}
 	ver := schema.ReadStorageVersion(ms.bg.Backend().ReadTx())
 	storageVersion := ""
 	if ver != nil {
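
The `// gofail: var v3rpcBeforeSnapshot struct{}` magic comment is a marker for go.etcd.io/gofail: when the binary is built with failpoints enabled, gofail rewrites the comment into a runtime check, and the failpoint can then be activated by name, for example through the GOFAIL_FAILPOINTS environment variable as the new e2e test below does. A hypothetical sketch of starting a failpoint-enabled etcd binary with this failpoint set (the binary path is a placeholder):

package main

import (
	"os"
	"os/exec"
)

func main() {
	// Hypothetical launcher: the v3rpcBeforeSnapshot failpoint sleeps for
	// 8 seconds at the start of the Snapshot RPC handler, keeping a
	// streaming RPC inflight while the server is asked to shut down.
	cmd := exec.Command("./bin/etcd") // placeholder path to a gofail-enabled build
	cmd.Env = append(os.Environ(),
		`GOFAIL_FAILPOINTS=v3rpcBeforeSnapshot=sleep("8s")`)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		panic(err)
	}
}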

tests/common/auth_test.go

Lines changed: 1 addition & 1 deletion
@@ -118,7 +118,7 @@ func TestAuthGracefulDisable(t *testing.T) {

 	watchCh := rootAuthClient.Watch(wCtx, "key", config.WatchOptions{Revision: 1})
 	wantedLen := 1
-	watchTimeout := 10 * time.Second
+	watchTimeout := 15 * time.Second
 	wanted := []testutils.KV{{Key: "key", Val: "value"}}
 	kvs, err := testutils.KeyValuesFromWatchChan(watchCh, wantedLen, watchTimeout)
 	require.NoErrorf(t, err, "failed to get key-values from watch channel %s", err)

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !cluster_proxy

package e2e

import (
	"context"
	"io"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestShouldDrainRequestDuringShutdown(t *testing.T) {
	e2e.BeforeTest(t)

	// defaultBuildSnapshotConn is to setup a database with 10 MiB and a
	// inflight snapshot streaming RPC.
	defaultBuildSnapshotConn := func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser {
		t.Helper()

		require.NoError(t, fillEtcdWithData(ctx, cli, 10*1024*1024))

		rc, err := cli.Snapshot(ctx)
		require.NoError(t, err)
		t.Cleanup(func() { rc.Close() })

		// make sure that streaming RPC is in progress
		buf := make([]byte, 1)
		n, err := rc.Read(buf)
		assert.NoError(t, err)
		assert.Equal(t, 1, n)

		return rc
	}

	// defaultVerifySnapshotConn is to make sure that connection is still
	// working even if the server is in shutdown state.
	defaultVerifySnapshotConn := func(t *testing.T, rc io.ReadCloser) {
		t.Helper()

		_, err := io.Copy(io.Discard, rc)
		require.NoError(t, err)
	}

	tcs := []struct {
		name    string
		options []e2e.EPClusterOption
		cliOpt  e2e.ClientConfig

		buildSnapshotConn  func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser
		verifySnapshotConn func(t *testing.T, rc io.ReadCloser)
	}{
		{
			name: "no-tls",
			options: []e2e.EPClusterOption{
				e2e.WithClusterSize(1),
				e2e.WithClientAutoTLS(false),
			},
			cliOpt: e2e.ClientConfig{ConnectionType: e2e.ClientNonTLS},

			buildSnapshotConn:  defaultBuildSnapshotConn,
			verifySnapshotConn: defaultVerifySnapshotConn,
		},
		{
			name: "auto-tls_http_separated",
			options: []e2e.EPClusterOption{
				e2e.WithClusterSize(1),
				e2e.WithClientAutoTLS(true),
				e2e.WithClientConnType(e2e.ClientTLS),
				e2e.WithClientHTTPSeparate(true),
			},
			cliOpt: e2e.ClientConfig{
				ConnectionType: e2e.ClientTLS,
				AutoTLS:        true,
			},
			buildSnapshotConn:  defaultBuildSnapshotConn,
			verifySnapshotConn: defaultVerifySnapshotConn,
		},
		{
			name: "auto-tls_cmux",
			options: []e2e.EPClusterOption{
				e2e.WithClusterSize(1),
				e2e.WithClientAutoTLS(true),
				e2e.WithClientConnType(e2e.ClientTLS),
				e2e.WithClientHTTPSeparate(false),
				e2e.WithGoFailEnabled(true),
				// NOTE: Using failpoint is to make sure that
				// the RPC handler won't exit because of closed
				// connection.
				e2e.WithEnvVars(map[string]string{
					"GOFAIL_FAILPOINTS": `v3rpcBeforeSnapshot=sleep("8s")`,
				}),
			},
			cliOpt: e2e.ClientConfig{
				ConnectionType: e2e.ClientTLS,
				AutoTLS:        true,
			},
			buildSnapshotConn: func(ctx context.Context, t *testing.T, cli *clientv3.Client) io.ReadCloser {
				t.Helper()

				rc, err := cli.Snapshot(ctx)
				require.NoError(t, err)
				t.Cleanup(func() { rc.Close() })

				// make sure server receives the RPC.
				time.Sleep(2 * time.Second)
				return rc
			},
			verifySnapshotConn: func(t *testing.T, rc io.ReadCloser) {
				t.Helper()

				_, err := io.Copy(io.Discard, rc)
				require.Error(t, err) // connection will be closed forcely
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			ctx := context.Background()

			epc, err := e2e.NewEtcdProcessCluster(ctx, t, tc.options...)
			require.NoError(t, err)
			t.Cleanup(func() { epc.Close() })

			grpcEndpoint := epc.Procs[0].EndpointsGRPC()[0]
			if tc.cliOpt.ConnectionType == e2e.ClientTLS {
				grpcEndpoint = e2e.ToTLS(grpcEndpoint)
			}

			cli := newClient(t, []string{grpcEndpoint}, tc.cliOpt)

			rc := tc.buildSnapshotConn(ctx, t, cli)

			errCh := make(chan error, 1)
			go func() {
				defer close(errCh)
				errCh <- epc.Stop()
			}()

			select {
			case <-time.After(4 * time.Second):
			case err := <-errCh:
				t.Fatalf("should drain request but got error from cluster stop: %v", err)
			}

			tc.verifySnapshotConn(t, rc)

			require.NoError(t, <-errCh)
		})
	}
}

tests/framework/e2e/cluster.go

Lines changed: 4 additions & 0 deletions
@@ -259,6 +259,10 @@ func WithClientAutoTLS(isClientAutoTLS bool) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.Client.AutoTLS = isClientAutoTLS }
 }

+func WithClientHTTPSeparate(separate bool) EPClusterOption {
+	return func(c *EtcdProcessClusterConfig) { c.ClientHTTPSeparate = separate }
+}
+
 func WithClientRevokeCerts(isClientCRL bool) EPClusterOption {
 	return func(c *EtcdProcessClusterConfig) { c.Client.RevokeCerts = isClientCRL }
 }
