Skip to content

Commit a73c573

Browse files
committed
[3.4] backport etcd-io#12706 clientv3: PS: Replace balancer with upstream grpc solution
Signed-off-by: Chao Chen <[email protected]>
1 parent b239cf6 commit a73c573

File tree

5 files changed

+184
-68
lines changed

5 files changed

+184
-68
lines changed

clientv3/client.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -211,10 +211,8 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts
211211
}
212212
grpc.WithDisableRetry()
213213

214-
// Interceptor retry and backoff.
215-
// TODO: Replace all of clientv3/retry.go with interceptor based retry, or with
216-
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy
217-
// once it is available.
214+
// TODO: Replace all of clientv3/retry.go with RetryPolicy:
215+
// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
218216
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))
219217
opts = append(opts,
220218
// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
@@ -255,8 +253,8 @@ func (c *Client) getToken(ctx context.Context) error {
255253

256254
// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host
257255
// of the provided endpoint determines the scheme used for all endpoints of the client connection.
258-
func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
259-
creds := c.credentialsForEndpoint(ep)
256+
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
257+
creds := c.credentialsForEndpoint(c.Endpoints()[0])
260258
opts := append(dopts, grpc.WithResolvers(c.resolver))
261259
return c.dial(creds, opts...)
262260
}
@@ -281,21 +279,30 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
281279
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
282280
}
283281

284-
conn, err := grpc.DialContext(dctx, c.resolver.Scheme()+":///", opts...)
282+
initialEndpoints := strings.Join(c.cfg.Endpoints, ";")
283+
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
284+
conn, err := grpc.DialContext(dctx, target, opts...)
285285
if err != nil {
286286
return nil, err
287287
}
288288
return conn, nil
289289
}
290290

291291
func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
292-
if c.creds != nil {
292+
r := endpoint.RequiresCredentials(ep)
293+
switch r {
294+
case endpoint.CREDS_DROP:
295+
return nil
296+
case endpoint.CREDS_OPTIONAL:
293297
return c.creds
294-
}
295-
if endpoint.RequiresCredentials(ep) {
298+
case endpoint.CREDS_REQUIRE:
299+
if c.creds != nil {
300+
return c.creds
301+
}
296302
return credentials.NewBundle(credentials.Config{}).TransportCredentials()
303+
default:
304+
panic(fmt.Errorf("Unsupported CredsRequirement: %v", r))
297305
}
298-
return nil
299306
}
300307

301308
func newClient(cfg *Config) (*Client, error) {
@@ -362,18 +369,16 @@ func newClient(cfg *Config) (*Client, error) {
362369
if len(cfg.Endpoints) < 1 {
363370
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
364371
}
365-
dialEndpoint := cfg.Endpoints[0]
366372

367373
// Use a provided endpoint target so that for https:// without any tls config given, then
368374
// grpc will assume the certificate server name is the endpoint host.
369-
conn, err := client.dialWithBalancer(dialEndpoint)
375+
conn, err := client.dialWithBalancer()
370376
if err != nil {
371377
client.cancel()
372378
client.resolver.Close()
379+
// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
373380
return nil, err
374381
}
375-
// TODO: With the old grpc balancer interface, we waited until the dial timeout
376-
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
377382
client.conn = conn
378383

379384
client.Cluster = NewCluster(client)
@@ -392,6 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
392397
if err != nil {
393398
client.Close()
394399
cancel()
400+
//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
395401
return nil, err
396402
}
397403
cancel()

clientv3/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ func TestDialCancel(t *testing.T) {
8383
func TestDialTimeout(t *testing.T) {
8484
defer testutil.AfterTest(t)
8585

86+
wantError := context.DeadlineExceeded
87+
8688
// grpc.WithBlock to block until connection up or timeout
8789
testCfgs := []Config{
8890
{
@@ -122,8 +124,8 @@ func TestDialTimeout(t *testing.T) {
122124
case <-time.After(5 * time.Second):
123125
t.Errorf("#%d: failed to timeout dial on time", i)
124126
case err := <-donec:
125-
if err != context.DeadlineExceeded {
126-
t.Errorf("#%d: unexpected error %v, want %v", i, err, context.DeadlineExceeded)
127+
if err.Error() != wantError.Error() {
128+
t.Errorf("#%d: unexpected error '%v', want '%v'", i, err, wantError)
127129
}
128130
}
129131
}

clientv3/internal/endpoint/endpoint.go

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,115 @@
1515
package endpoint
1616

1717
import (
18+
"fmt"
19+
"net"
1820
"net/url"
19-
"regexp"
21+
"path"
22+
"strings"
2023
)
2124

22-
var (
23-
STRIP_PORT_REGEXP = regexp.MustCompile("(.*):([0-9]+)")
25+
type CredsRequirement int
26+
27+
const (
28+
// CREDS_REQUIRE - Credentials/certificate required for thi type of connection.
29+
CREDS_REQUIRE CredsRequirement = iota
30+
// CREDS_DROP - Credentials/certificate not needed and should get ignored.
31+
CREDS_DROP
32+
// CREDS_OPTIONAL - Credentials/certificate might be used if supplied
33+
CREDS_OPTIONAL
2434
)
2535

26-
func stripPort(ep string) string {
27-
return STRIP_PORT_REGEXP.ReplaceAllString(ep, "$1")
36+
func extractHostFromHostPort(ep string) string {
37+
host, _, err := net.SplitHostPort(ep)
38+
if err != nil {
39+
return ep
40+
}
41+
return host
2842
}
2943

30-
func translateEndpoint(ep string) (addr string, serverName string, requireCreds bool) {
31-
url, err := url.Parse(ep)
32-
if err != nil {
33-
return ep, stripPort(ep), false
44+
func extractHostFromPath(pathStr string) string {
45+
return extractHostFromHostPort(path.Base(pathStr))
46+
}
47+
48+
// mustSplit2 returns the values from strings.SplitN(s, sep, 2).
49+
// If sep is not found, it returns ("", "", false) instead.
50+
func mustSplit2(s, sep string) (string, string) {
51+
spl := strings.SplitN(s, sep, 2)
52+
if len(spl) < 2 {
53+
panic(fmt.Errorf("Token '%v' expected to have separator sep: `%v`", s, sep))
3454
}
35-
switch url.Scheme {
36-
case "http", "https":
37-
return url.Host, url.Hostname(), url.Scheme == "https"
38-
case "unix", "unixs":
39-
requireCreds = url.Scheme == "unixs"
40-
if url.Opaque != "" {
41-
return "unix:" + url.Opaque, stripPort(url.Opaque), requireCreds
42-
} else if url.Path != "" {
43-
return "unix://" + url.Host + url.Path, url.Host + url.Path, requireCreds
44-
} else {
45-
return "unix:" + url.Host, url.Hostname(), requireCreds
46-
}
55+
return spl[0], spl[1]
56+
}
57+
58+
func schemeToCredsRequirement(schema string) CredsRequirement {
59+
switch schema {
60+
case "https", "unixs":
61+
return CREDS_REQUIRE
62+
case "http":
63+
return CREDS_DROP
64+
case "unix":
65+
// Preserving previous behavior from:
66+
// https://github.com/etcd-io/etcd/blob/dae29bb719dd69dc119146fc297a0628fcc1ccf8/client/v3/client.go#L212
67+
// that likely was a bug due to missing 'fallthrough'.
68+
// At the same time it seems legit to let the users decide whether they
69+
// want credential control or not (and 'unixs' schema is not a standard thing).
70+
return CREDS_OPTIONAL
4771
case "":
48-
return url.Host + url.Path, url.Host + url.Path, false
72+
return CREDS_OPTIONAL
4973
default:
50-
return ep, stripPort(ep), false
74+
return CREDS_OPTIONAL
75+
}
76+
}
77+
78+
// This function translates endpoints names supported by etcd server into
79+
// endpoints as supported by grpc with additional information
80+
// (server_name for cert validation, requireCreds - whether certs are needed).
81+
// The main differences:
82+
// - etcd supports unixs & https names as opposed to unix & http to
83+
// distinguish need to configure certificates.
84+
// - etcd support http(s) names as opposed to tcp supported by grpc/dial method.
85+
// - etcd supports unix(s)://local-file naming schema
86+
// (as opposed to unix:local-file canonical name used by grpc for current dir files).
87+
// - Within the unix(s) schemas, the last segment (filename) without 'port' (content after colon)
88+
// is considered serverName - to allow local testing of cert-protected communication.
89+
//
90+
// See more:
91+
// - https://github.com/grpc/grpc-go/blob/26c143bd5f59344a4b8a1e491e0f5e18aa97abc7/internal/grpcutil/target.go#L47
92+
// - https://golang.org/pkg/net/#Dial
93+
// - https://github.com/grpc/grpc/blob/master/doc/naming.md
94+
func translateEndpoint(ep string) (addr string, serverName string, requireCreds CredsRequirement) {
95+
if strings.HasPrefix(ep, "unix:") || strings.HasPrefix(ep, "unixs:") {
96+
if strings.HasPrefix(ep, "unix:///") || strings.HasPrefix(ep, "unixs:///") {
97+
// absolute path case
98+
schema, absolutePath := mustSplit2(ep, "://")
99+
return "unix://" + absolutePath, extractHostFromPath(absolutePath), schemeToCredsRequirement(schema)
100+
}
101+
if strings.HasPrefix(ep, "unix://") || strings.HasPrefix(ep, "unixs://") {
102+
// legacy etcd local path
103+
schema, localPath := mustSplit2(ep, "://")
104+
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
105+
}
106+
schema, localPath := mustSplit2(ep, ":")
107+
return "unix:" + localPath, extractHostFromPath(localPath), schemeToCredsRequirement(schema)
108+
}
109+
110+
if strings.Contains(ep, "://") {
111+
url, err := url.Parse(ep)
112+
if err != nil {
113+
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
114+
}
115+
if url.Scheme == "http" || url.Scheme == "https" {
116+
return url.Host, url.Hostname(), schemeToCredsRequirement(url.Scheme)
117+
}
118+
return ep, url.Hostname(), schemeToCredsRequirement(url.Scheme)
51119
}
120+
// Handles plain addresses like 10.0.0.44:437.
121+
return ep, extractHostFromHostPort(ep), CREDS_OPTIONAL
52122
}
53123

54124
// RequiresCredentials returns whether given endpoint requires
55125
// credentials/certificates for connection.
56-
func RequiresCredentials(ep string) bool {
126+
func RequiresCredentials(ep string) CredsRequirement {
57127
_, _, requireCreds := translateEndpoint(ep)
58128
return requireCreds
59129
}

clientv3/internal/endpoint/endpoint_test.go

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,48 @@ import (
1818
"testing"
1919
)
2020

21-
func TestInterpret(t *testing.T) {
21+
func Test_interpret(t *testing.T) {
2222
tests := []struct {
23-
endpoint string
24-
wantAddress string
25-
wantServerName string
23+
endpoint string
24+
wantAddress string
25+
wantServerName string
26+
wantRequiresCreds CredsRequirement
2627
}{
27-
{"127.0.0.1", "127.0.0.1", "127.0.0.1"},
28-
{"localhost", "localhost", "localhost"},
29-
{"localhost:8080", "localhost:8080", "localhost"},
28+
{"127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
29+
{"localhost", "localhost", "localhost", CREDS_OPTIONAL},
30+
{"localhost:8080", "localhost:8080", "localhost", CREDS_OPTIONAL},
3031

31-
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
32-
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
32+
{"unix:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
33+
{"unix:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
3334

34-
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
35-
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
35+
{"unix://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_OPTIONAL},
36+
{"unix://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_OPTIONAL},
3637

37-
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
38-
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
39-
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1"},
40-
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1"},
38+
{"unixs:127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
39+
{"unixs:127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
40+
{"unixs://127.0.0.1", "unix:127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
41+
{"unixs://127.0.0.1:8080", "unix:127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
4142

42-
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1"},
43-
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
44-
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1"},
45-
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1"},
46-
{"https://localhost:20000", "localhost:20000", "localhost"},
43+
{"http://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_DROP},
44+
{"http://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_DROP},
45+
{"https://127.0.0.1", "127.0.0.1", "127.0.0.1", CREDS_REQUIRE},
46+
{"https://127.0.0.1:8080", "127.0.0.1:8080", "127.0.0.1", CREDS_REQUIRE},
47+
{"https://localhost:20000", "localhost:20000", "localhost", CREDS_REQUIRE},
4748

48-
{"unix:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
49-
{"unixs:///tmp/abc", "unix:///tmp/abc", "/tmp/abc"},
50-
{"etcd.io", "etcd.io", "etcd.io"},
51-
{"http://etcd.io/abc", "etcd.io", "etcd.io"},
52-
{"dns://something-other", "dns://something-other", "dns://something-other"},
49+
{"unix:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_OPTIONAL},
50+
{"unixs:///tmp/abc", "unix:///tmp/abc", "abc", CREDS_REQUIRE},
51+
{"unix:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_OPTIONAL},
52+
{"unixs:///tmp/abc:1234", "unix:///tmp/abc:1234", "abc", CREDS_REQUIRE},
53+
{"etcd.io", "etcd.io", "etcd.io", CREDS_OPTIONAL},
54+
{"http://etcd.io/abc", "etcd.io", "etcd.io", CREDS_DROP},
55+
{"dns://something-other", "dns://something-other", "something-other", CREDS_OPTIONAL},
56+
57+
{"http://[2001:db8:1f70::999:de8:7648:6e8]:100/", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_DROP},
58+
{"[2001:db8:1f70::999:de8:7648:6e8]:100", "[2001:db8:1f70::999:de8:7648:6e8]:100", "2001:db8:1f70::999:de8:7648:6e8", CREDS_OPTIONAL},
59+
{"unix:unexpected-file_name#123$456", "unix:unexpected-file_name#123$456", "unexpected-file_name#123$456", CREDS_OPTIONAL},
5360
}
5461
for _, tt := range tests {
55-
t.Run(tt.endpoint, func(t *testing.T) {
62+
t.Run("Interpret_"+tt.endpoint, func(t *testing.T) {
5663
gotAddress, gotServerName := Interpret(tt.endpoint)
5764
if gotAddress != tt.wantAddress {
5865
t.Errorf("Interpret() gotAddress = %v, want %v", gotAddress, tt.wantAddress)
@@ -61,5 +68,32 @@ func TestInterpret(t *testing.T) {
6168
t.Errorf("Interpret() gotServerName = %v, want %v", gotServerName, tt.wantServerName)
6269
}
6370
})
71+
t.Run("RequiresCredentials_"+tt.endpoint, func(t *testing.T) {
72+
requiresCreds := RequiresCredentials(tt.endpoint)
73+
if requiresCreds != tt.wantRequiresCreds {
74+
t.Errorf("RequiresCredentials() got = %v, want %v", requiresCreds, tt.wantRequiresCreds)
75+
}
76+
})
77+
}
78+
}
79+
80+
func Test_extractHostFromHostPort(t *testing.T) {
81+
tests := []struct {
82+
ep string
83+
want string
84+
}{
85+
{ep: "localhost", want: "localhost"},
86+
{ep: "localhost:8080", want: "localhost"},
87+
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
88+
{ep: "192.158.7.14:8080", want: "192.158.7.14"},
89+
{ep: "[2001:db8:1f70::999:de8:7648:6e8]", want: "[2001:db8:1f70::999:de8:7648:6e8]"},
90+
{ep: "[2001:db8:1f70::999:de8:7648:6e8]:100", want: "2001:db8:1f70::999:de8:7648:6e8"},
91+
}
92+
for _, tt := range tests {
93+
t.Run(tt.ep, func(t *testing.T) {
94+
if got := extractHostFromHostPort(tt.ep); got != tt.want {
95+
t.Errorf("extractHostFromHostPort() = %v, want %v", got, tt.want)
96+
}
97+
})
6498
}
6599
}

clientv3/internal/resolver/resolver.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ import (
2222
"google.golang.org/grpc/serviceconfig"
2323
)
2424

25+
const (
26+
Schema = "etcd-endpoints"
27+
)
28+
2529
// EtcdManualResolver is a Resolver (and resolver.Builder) that can be updated
2630
// using SetEndpoints.
2731
type EtcdManualResolver struct {
@@ -31,7 +35,7 @@ type EtcdManualResolver struct {
3135
}
3236

3337
func New(endpoints ...string) *EtcdManualResolver {
34-
r := manual.NewBuilderWithScheme("etcd-endpoints")
38+
r := manual.NewBuilderWithScheme(Schema)
3539
return &EtcdManualResolver{Resolver: r, endpoints: endpoints, serviceConfig: nil}
3640
}
3741

0 commit comments

Comments
 (0)