Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,6 @@ type simpleHTTPClient struct {

func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
req := act.HTTPRequest(c.endpoint)

if err := printcURL(req); err != nil {
return nil, nil, err
}
Expand Down
24 changes: 24 additions & 0 deletions clientv3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package clientv3
import (
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type (
Expand Down Expand Up @@ -90,21 +93,35 @@ func NewKV(c *Client) KV {
}

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Put")
defer sp.Finish()
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, rpctypes.Error(err)
}

func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Get")
defer sp.Finish()
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, rpctypes.Error(err)
}

func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Delete")
defer sp.Finish()
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, rpctypes.Error(err)
}

func (kv *kv) Compact(ctx context.Context, rev int64) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/kv/Compact")
defer sp.Finish()
md := metadata.New(nil)
if err := sp.Tracer().Inject(sp, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}); err != nil {
return err
}
ctx = metadata.NewContext(ctx, md)

remote, err := kv.getRemote(ctx)
if err != nil {
return rpctypes.Error(err)
Expand All @@ -129,6 +146,13 @@ func (kv *kv) Txn(ctx context.Context) Txn {
}

func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Do")
defer sp.Finish()
md := metadata.New(nil)
if err := sp.Tracer().Inject(sp, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}); err != nil {
return OpResponse{}, err
}
ctx = metadata.NewContext(ctx, md)
for {
resp, err := kv.do(ctx, op)
if err == nil {
Expand Down
16 changes: 16 additions & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ type config struct {
ProxyReadTimeoutMs uint `json:"proxy-read-timeout"`
ProxyCfgFile string `json:"proxy"`

// tracing
TracingLightstepToken string `json:"tracing-lightstep-token"`
TracingLightstepHost string `json:"tracing-lighstep-host"`
TracingLightstepCollector string `json:"tracing-lighstep-collector"`
TracingAppdashHost string `json:"tracing-appdash-host"`
TracingAppdashVerbose bool `json:"tracing-appdash-verbose"`
TracingAppdashSamplingRate uint64 `json:"tracing-appdash-sampling-rate"`

// security
clientTLSInfo, peerTLSInfo transport.TLSInfo
ClientAutoTLS bool
Expand Down Expand Up @@ -250,6 +258,14 @@ func NewConfig() *config {
fs.BoolVar(&cfg.Debug, "debug", false, "Enable debug-level logging for etcd.")
fs.StringVar(&cfg.LogPkgLevels, "log-package-levels", "", "Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').")

// tracing
fs.StringVar(&cfg.TracingLightstepToken, "tracing-lightstep-token", "", "Lightstep access token")
fs.StringVar(&cfg.TracingLightstepHost, "tracing-lightstep-host", "", "Lightstep API host")
fs.StringVar(&cfg.TracingLightstepCollector, "tracing-lightstep-collector", "", "Lightstep collector host")
fs.StringVar(&cfg.TracingAppdashHost, "tracing-appdash-host", "", "Appdash remote collector address (host:port)")
fs.Uint64Var(&cfg.TracingAppdashSamplingRate, "tracing-appdash-sampling-rate", 128, "Trace sampling rate for Appdash")
fs.BoolVar(&cfg.TracingAppdashVerbose, "tracing-appdash-verbose", false, "Enable verbose logging for Appdash")

// unsafe
fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")

Expand Down
26 changes: 26 additions & 0 deletions etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ import (
"github.com/coreos/go-systemd/daemon"
systemdutil "github.com/coreos/go-systemd/util"
"github.com/coreos/pkg/capnslog"
"github.com/lightstep/lightstep-tracer-go"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"

"sourcegraph.com/sourcegraph/appdash"
appdashot "sourcegraph.com/sourcegraph/appdash/opentracing"
)

type dirType string
Expand Down Expand Up @@ -108,6 +113,27 @@ func startEtcdOrProxyV2() {
plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.Dir)
}

// The global tracer is noop by default.
if cfg.TracingLightstepToken != "" {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in my opinion one of the most important parts about opentracing: You don't have to modify the entire codebase to use a different tracing backend. It's a quick change in the configuration and everything else stays the same.

plog.Warningf("Using lightstep %s %s", cfg.TracingLightstepToken, cfg.TracingLightstepCollector)
opentracing.InitGlobalTracer(lightstep.NewTracer(lightstep.Options{
AccessToken: cfg.TracingLightstepToken,
Collector: lightstep.Endpoint{
Host: cfg.TracingLightstepCollector,
},
Tags: opentracing.Tags{"component": "etcd"},
}))
} else if cfg.TracingAppdashHost != "" {
collector := appdash.NewRemoteCollector(cfg.TracingAppdashHost)
tracer := appdashot.NewTracerWithOptions(collector, appdashot.Options{
ShouldSample: func(traceID uint64) bool {
return traceID%cfg.TracingAppdashSamplingRate == 0
},
Verbose: cfg.TracingAppdashVerbose,
})
opentracing.InitGlobalTracer(tracer)
}

which := identifyDataDirOrDie(cfg.Dir)
if which != dirEmpty {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
Expand Down
3 changes: 3 additions & 0 deletions etcdserver/api/v2http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/store"
Expand Down Expand Up @@ -208,6 +209,8 @@ type membersHandler struct {
}

func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
sp := httputil.JoinOrCreateSpanFromHeader("membershandler/ServeHTTP", r.Header)
defer sp.Finish()
if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
return
}
Expand Down
18 changes: 18 additions & 0 deletions etcdserver/api/v3rpc/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func NewAuthServer(s *etcdserver.EtcdServer) *AuthServer {
}

func (as *AuthServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/AuthEnable")
defer sp.Finish()
resp, err := as.authenticator.AuthEnable(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -37,6 +39,8 @@ func (as *AuthServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (
}

func (as *AuthServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/AuthDisable")
defer sp.Finish()
resp, err := as.authenticator.AuthDisable(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -45,6 +49,8 @@ func (as *AuthServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest)
}

func (as *AuthServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/Authenticate")
defer sp.Finish()
resp, err := as.authenticator.Authenticate(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -53,6 +59,8 @@ func (as *AuthServer) Authenticate(ctx context.Context, r *pb.AuthenticateReques
}

func (as *AuthServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/RoleAdd")
defer sp.Finish()
resp, err := as.authenticator.RoleAdd(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -76,6 +84,8 @@ func (as *AuthServer) RoleRevoke(ctx context.Context, r *pb.AuthRoleRevokeReques
}

func (as *AuthServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/RoleGrant")
defer sp.Finish()
resp, err := as.authenticator.RoleGrant(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -84,6 +94,8 @@ func (as *AuthServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest)
}

func (as *AuthServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/UserAdd")
defer sp.Finish()
resp, err := as.authenticator.UserAdd(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -92,6 +104,8 @@ func (as *AuthServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*p
}

func (as *AuthServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/UserDelete")
defer sp.Finish()
resp, err := as.authenticator.UserDelete(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -105,6 +119,8 @@ func (as *AuthServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*p
}

func (as *AuthServer) UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) (*pb.AuthUserGrantResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/UserGrant")
defer sp.Finish()
resp, err := as.authenticator.UserGrant(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand All @@ -118,6 +134,8 @@ func (as *AuthServer) UserRevoke(ctx context.Context, r *pb.AuthUserRevokeReques
}

func (as *AuthServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "authserver/UserChangePassword")
defer sp.Finish()
resp, err := as.authenticator.UserChangePassword(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand Down
30 changes: 30 additions & 0 deletions etcdserver/api/v3rpc/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package v3rpc
import (
"sort"

"google.golang.org/grpc/metadata"

"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/pkg/capnslog"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
)

Expand All @@ -43,6 +47,9 @@ func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
}

func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
var sp opentracing.Span
var err error
defer sp.Finish()
if err := checkRangeRequest(r); err != nil {
return nil, err
}
Expand All @@ -60,6 +67,8 @@ func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResp
}

func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Range")
defer sp.Finish()
if err := checkPutRequest(r); err != nil {
return nil, err
}
Expand All @@ -77,6 +86,8 @@ func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse,
}

func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Range")
defer sp.Finish()
if err := checkDeleteRequest(r); err != nil {
return nil, err
}
Expand All @@ -94,6 +105,8 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*
}

func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Range")
defer sp.Finish()
if err := checkTxnRequest(r); err != nil {
return nil, err
}
Expand All @@ -111,6 +124,8 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse,
}

func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Range")
defer sp.Finish()
resp, err := s.kv.Compact(ctx, r)
if err != nil {
return nil, togRPCError(err)
Expand Down Expand Up @@ -251,3 +266,18 @@ func checkRequestUnion(u *pb.RequestUnion) error {
}
return nil
}

func startSpanFromMetadata(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
md, ok := metadata.FromContext(ctx)
if ok {
sp, err := opentracing.GlobalTracer().Join(operationName, opentracing.TextMap, pbutil.MetadataReaderWriter{&md})
if err != nil {
plog.Warningf("Couldn't join trace %v", err)
return opentracing.StartSpanFromContext(ctx, operationName)
} else {
return sp, opentracing.ContextWithSpan(ctx, sp)
}
}
plog.Infof("Couldn't find metadata")
return opentracing.StartSpanFromContext(ctx, operationName)
}
13 changes: 13 additions & 0 deletions etcdserver/api/v3rpc/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/membership"
"github.com/coreos/etcd/pkg/types"
"github.com/opentracing/opentracing-go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -43,6 +44,9 @@ func NewClusterServer(s *etcdserver.EtcdServer) *ClusterServer {
}

func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberAdd")
defer sp.Finish()

urls, err := types.NewURLs(r.PeerURLs)
if err != nil {
return nil, rpctypes.ErrGRPCMemberBadURLs
Expand All @@ -67,6 +71,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest)
}

func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberRemove")
defer sp.Finish()

err := cs.server.RemoveMember(ctx, r.ID)
switch {
case err == membership.ErrIDRemoved:
Expand All @@ -81,6 +88,9 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq
}

func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberUpdate")
defer sp.Finish()

m := membership.Member{
ID: types.ID(r.ID),
RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs},
Expand All @@ -99,6 +109,9 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq
}

func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberList")
defer sp.Finish()

membs := cs.cluster.Members()

protoMembs := make([]*pb.Member, len(membs))
Expand Down
Loading