Skip to content

Commit 43a39ec

Browse files
committed
clientv3: PS: Replace balancer with upstream grpc solution
Addresses comments from: #12671 (review)
1 parent 8d1a8ce commit 43a39ec

File tree

11 files changed

+214
-73
lines changed

11 files changed

+214
-73
lines changed

CHANGELOG-3.5.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ The minimum recommended etcd versions to run in **production** are 3.2.28+, 3.3.
99
<hr>
1010

1111

12-
## v3.5.0 (2020 TBD)
12+
## v3.5.0 (2021 TBD)
1313

1414
See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and [v3.5 upgrade guide](https://github.com/etcd-io/etcd/blob/master/Documentation/upgrades/upgrade_3_5.md) for any breaking changes.
1515

@@ -63,6 +63,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
6363
- Changed `pkg/flags` function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11616).
6464
- Previously, `SetFlagsFromEnv(prefix string, fs *flag.FlagSet) error`, now `SetFlagsFromEnv(lg *zap.Logger, prefix string, fs *flag.FlagSet) error`.
6565
- Previously, `SetPflagsFromEnv(prefix string, fs *pflag.FlagSet) error`, now `SetPflagsFromEnv(lg *zap.Logger, prefix string, fs *pflag.FlagSet) error`.
66+
- ClientV3 supports [grpc resolver API](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go).
67+
- Endpoints can be managed using [endpoints.Manager](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/endpoints/endpoints.go)
68+
- Previously supported [GRPCResolver was decomissioned](https://github.com/etcd-io/etcd/pull/12675). Use [resolver](https://github.com/etcd-io/etcd/blob/master/client/v3/naming/resolver/resolver.go) instead.
69+
6670

6771
### `etcdctl`
6872

@@ -174,6 +178,8 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.
174178
- Make sure [save snapshot downloads checksum for integrity checks](https://github.com/etcd-io/etcd/pull/11896).
175179
- Fix [auth token invalid after watch reconnects](https://github.com/etcd-io/etcd/pull/12264). Get AuthToken automatically when clientConn is ready.
176180
- Improve [clientv3:get AuthToken gracefully without extra connection](https://github.com/etcd-io/etcd/pull/12165).
181+
- Changed [clientv3 dialing code]() to use grpc resolver API instead of custom balancer.
182+
- Endpoints self identify now as `etcd-endpoints://{id}/#initially={list of endpoints}` e.g. `etcd-endpoints://0xc0009d8540/#initially=[localhost:2079]`
177183

178184
### Package `lease`
179185

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ test:
151151
$(TEST_OPTS) ./test.sh 2>&1 | tee test-$(TEST_SUFFIX).log
152152
! egrep "(--- FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test-$(TEST_SUFFIX).log
153153

154+
test-small:
155+
$(info log-file: test-$(TEST_SUFFIX).log)
156+
PASSES="fmt build unit" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
157+
158+
test-full:
159+
$(info log-file: test-$(TEST_SUFFIX).log)
160+
PASSES="fmt build unit integration functional e2e grpcproxy" ./test.sh 2<&1 | tee test-$(TEST_SUFFIX).log
161+
154162
docker-test:
155163
$(info GO_VERSION: $(GO_VERSION))
156164
$(info ETCD_VERSION: $(ETCD_VERSION))

client/v3/client.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
206206
} else {
207207
opts = append(opts, grpc.WithInsecure())
208208
}
209-
grpc.WithDisableRetry()
210209

211210
// Interceptor retry and backoff.
212-
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
213-
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
214-
// once it is available.
211+
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
212+
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
215213
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
216214
opts = append(opts,
217215
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
@@ -252,8 +250,8 @@ func (c *Client) getToken(ctx context.Context) error {
252250

253251
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
254252
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
255-
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
256-
creds := c.credentialsForEndpoint(ep)
253+
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
254+
creds := c.credentialsForEndpoint(c.Endpoints()[0])
257255
opts := append(dopts, grpc.WithResolvers(c.resolver))
258256
return c.dial(creds, opts...)
259257
}
@@ -278,21 +276,28 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
278276
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
279277
}
280278

281-
conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
279+
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
280+
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
281+
conn, err := grpc.DialContext(dctx, target, opts...)
282282
if err != nil {
283283
return nil, err
284284
}
285285
return conn, nil
286286
}
287287

288288
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
289-
if c.creds != nil {
289+
switch endpoint.RequiresCredentials(ep) {
290+
case endpoint.CREDS_DROP:
291+
return nil
292+
case endpoint.CREDS_OPTIONAL:
290293
return c.creds
291-
}
292-
if endpoint.RequiresCredentials(ep) {
294+
case endpoint.CREDS_REQUIRE:
295+
if c.creds != nil {
296+
return c.creds
297+
}
293298
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
294299
}
295-
return nil
300+
return c.creds
296301
}
297302

298303
func newClient(cfg *Config) (*Client, error) {
@@ -360,18 +365,15 @@ func newClient(cfg *Config) (*Client, error) {
360365
client.cancel()
361366
return nil, fmt.Errorf("at least one Endpoint is required in client config")
362367
}
363-
dialEndpoint := cfg.Endpoints[0]
364-
365368
// Use a provided endpoint target so that for https:// without any tls config given, then
366369
// grpc will assume the certificate server name is the endpoint host.
367-
conn, err := client.dialWithBalancer(dialEndpoint)
370+
conn, err := client.dialWithBalancer()
368371
if err != nil {
369372
client.cancel()
370373
client.resolver.Close()
374+
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
371375
return nil, err
372376
}
373-
// TODO: With the old grpc balancer interface, we waited until the dial timeout
374-
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
375377
client.conn = conn
376378

377379
client.Cluster = NewCluster(client)
@@ -390,6 +392,7 @@ func newClient(cfg *Config) (*Client, error) {
390392
if err != nil {
391393
client.Close()
392394
cancel()
395+
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
393396
return nil, err
394397
}
395398
cancel()

client/v3/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ func TestDialCancel(t *testing.T) {
8282
func TestDialTimeout(t *testing.T) {
8383
defer testutil.AfterTest(t)
8484

85+
wantError := context.DeadlineExceeded
86+
8587
// grpc.WithBlock to block until connection up or timeout
8688
testCfgs := []Config{
8789
{
@@ -121,8 +123,8 @@ func TestDialTimeout(t *testing.T) {
121123
case <-time.After(5 * time.Second):
122124
t.Errorf("#%d: failed to timeout dial on time", i)
123125
case err := <-donec:
124-
if err != context.DeadlineExceeded {
125-
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
126+
if err.Error() != wantError.Error() {
127+
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
126128
}
127129
}
128130
}

client/v3/internal/endpoint/endpoint.go

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,115 @@
1515
package endpoint
1616

1717
import (
18+
"net"
1819
"net/url"
19-
"regexp"
20+
"path"
21+
"strings"
2022
)
2123

22-
var (
23-
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
24+
type CredsRequirement int
25+
26+
const (
27+
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
28+
CREDS_REQUIRE CredsRequirement = iota
29+
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
30+
CREDS_DROP
31+
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
32+
CREDS_OPTIONAL
2433
)
2534

26-
func stripPort(ep string) string {
27-
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
35+
func extractHostFromHostPort(ep string) string {
36+
host, _, err := net.SplitHostPort(ep)
37+
if err != nil {
38+
return ep
39+
}
40+
return host
2841
}
2942

30-
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
31-
url, err := url.Parse(ep)
32-
if err != nil {
33-
return ep, stripPort(ep), false
43+
func extractHostFromPath(pathStr string) string {
44+
return extractHostFromHostPort(path.Base(pathStr))
45+
}
46+
47+
//split2 returns the values from strings.SplitN(s, sep, 2).
48+
//If sep is not found, it returns ("", "", false) instead.
49+
func split2(s, sep string) (string, string, bool) {
50+
spl := strings.SplitN(s, sep, 2)
51+
if len(spl) < 2 {
52+
return "", "", false
3453
}
35-
switch url.Scheme {
36-
case "http", "https":
37-
return url.Host, url.Hostname(), url.Scheme == "https"
38-
case "unix", "unixs":
39-
requireCreds = url.Scheme == "unixs"
40-
if url.Opaque != "" {
41-
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
42-
} else if url.Path != "" {
43-
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
44-
} else {
45-
return "unix:" + url.Host, url.Hostname(), requireCreds
46-
}
54+
return spl[0], spl[1], true
55+
}
56+
57+
func schemeToCredsRequirement(schema string) CredsRequirement {
58+
switch schema {
59+
case "https":
60+
return CREDS_REQUIRE
61+
case "unixs":
62+
return CREDS_REQUIRE
63+
case "http":
64+
return CREDS_DROP
65+
case "unix":
66+
// Preserving previous behavior from:
67+
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
68+
// that likely was a bug due to missing 'fallthrough'.
69+
// At the same time it seems legit to let the users decide whether they
70+
// want credential control or not (and 'unixs' schema is not a standard thing).
71+
return CREDS_OPTIONAL
4772
case "":
48-
return url.Host + url.Path, url.Host + url.Path, false
73+
return CREDS_OPTIONAL
4974
default:
50-
return ep, stripPort(ep), false
75+
return CREDS_OPTIONAL
76+
}
77+
}
78+
79+
// This function translates endpoints names supported by etcd server into
80+
// endpoints as supported by grpc with additional information
81+
// (server_name for cert validation, requireCreds - whether certs are needed).
82+
// The main differences:
83+
// - etcd supports unixs & https names as opposed to unix & http to
84+
// distinguish need to configure certificates.
85+
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
86+
// - etcd supports unix(s)://local-file naming schema
87+
// (as opposed to unit:local-file canonical name).
88+
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
89+
// is considered serverName - to allow local testing of cert-protected communication.
90+
// See more:
91+
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
92+
// - https://golang.org/pkg/net/#Dial
93+
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
94+
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
95+
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
96+
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
97+
// absolute path case
98+
schema, absolutePath, _ := split2(ep, "://")
99+
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
100+
}
101+
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
102+
// legacy etcd local path
103+
schema, localPath, _ := split2(ep, "://")
104+
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
105+
}
106+
schema, localPath, _ := split2(ep, ":")
107+
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
108+
}
109+
110+
if strings.Contains(ep, "://") {
111+
url, err := url.Parse(ep)
112+
if err != nil {
113+
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
114+
}
115+
if url.Scheme == "http" || url.Scheme == "https" {
116+
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
117+
}
118+
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
51119
}
120+
// Handles plain addresses like 10.0.0.44:437
121+
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
52122
}
53123

54124
// RequiresCredentials returns whether given endpoint requires
55125
// credentials/certificates for connection.
56-
func RequiresCredentials(ep string) bool {
126+
func RequiresCredentials(ep string) CredsRequirement {
57127
_, _, requireCreds := translateEndpoint(ep)
58128
return requireCreds
59129
}

0 commit comments

Comments
 (0)