diff --git a/client/client.go b/client/client.go index fb9c4599caa7..d53c9a7a17fb 100644 --- a/client/client.go +++ b/client/client.go @@ -442,7 +442,6 @@ type simpleHTTPClient struct { func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) { req := act.HTTPRequest(c.endpoint) - if err := printcURL(req); err != nil { return nil, nil, err } diff --git a/clientv3/kv.go b/clientv3/kv.go index 7b4082a08bc3..543fbadb452c 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -17,8 +17,11 @@ package clientv3 import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type ( @@ -90,21 +93,35 @@ func NewKV(c *Client) KV { } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Put") + defer sp.Finish() r, err := kv.Do(ctx, OpPut(key, val, opts...)) return r.put, rpctypes.Error(err) } func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Get") + defer sp.Finish() r, err := kv.Do(ctx, OpGet(key, opts...)) return r.get, rpctypes.Error(err) } func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Delete") + defer sp.Finish() r, err := kv.Do(ctx, OpDelete(key, opts...)) return r.del, rpctypes.Error(err) } func (kv *kv) Compact(ctx context.Context, rev int64) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/kv/Compact") + defer sp.Finish() + md := metadata.New(nil) + if err := sp.Tracer().Inject(sp, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}); err != nil { + return err + } + ctx = metadata.NewContext(ctx, md) + remote, err := kv.getRemote(ctx) if err != nil { return rpctypes.Error(err) @@ -129,6 +146,13 @@ func (kv *kv) Txn(ctx context.Context) Txn { } func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clientv3/Do") + defer sp.Finish() + md := metadata.New(nil) + if err := sp.Tracer().Inject(sp, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}); err != nil { + return OpResponse{}, err + } + ctx = metadata.NewContext(ctx, md) for { resp, err := kv.do(ctx, op) if err == nil { diff --git a/etcdmain/config.go b/etcdmain/config.go index 66023c4dcb7a..94ec2f7da786 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -125,6 +125,14 @@ type config struct { ProxyReadTimeoutMs uint `json:"proxy-read-timeout"` ProxyCfgFile string `json:"proxy"` + // tracing + TracingLightstepToken string `json:"tracing-lightstep-token"` + TracingLightstepHost string `json:"tracing-lighstep-host"` + TracingLightstepCollector string `json:"tracing-lighstep-collector"` + TracingAppdashHost string `json:"tracing-appdash-host"` + TracingAppdashVerbose bool `json:"tracing-appdash-verbose"` + TracingAppdashSamplingRate uint64 `json:"tracing-appdash-sampling-rate"` + // security clientTLSInfo, peerTLSInfo transport.TLSInfo ClientAutoTLS bool @@ -250,6 +258,14 @@ func NewConfig() *config { fs.BoolVar(&cfg.Debug, "debug", false, "Enable debug-level logging for etcd.") fs.StringVar(&cfg.LogPkgLevels, "log-package-levels", "", "Specify a particular log level for each etcd package (eg: 'etcdmain=CRITICAL,etcdserver=DEBUG').") + // tracing + fs.StringVar(&cfg.TracingLightstepToken, "tracing-lightstep-token", "", "Lightstep access token") + fs.StringVar(&cfg.TracingLightstepHost, "tracing-lightstep-host", "", "Lightstep API host") + fs.StringVar(&cfg.TracingLightstepCollector, "tracing-lightstep-collector", "", "Lightstep collector host") + fs.StringVar(&cfg.TracingAppdashHost, "tracing-appdash-host", "", "Appdash remote collector address (host:port)") + fs.Uint64Var(&cfg.TracingAppdashSamplingRate, "tracing-appdash-sampling-rate", 128, "Trace sampling rate for Appdash") + fs.BoolVar(&cfg.TracingAppdashVerbose, "tracing-appdash-verbose", false, "Enable verbose logging for Appdash") + // unsafe fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index fd09b81bbb60..4e98024b313f 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -45,7 +45,12 @@ import ( "github.com/coreos/go-systemd/daemon" systemdutil "github.com/coreos/go-systemd/util" "github.com/coreos/pkg/capnslog" + "github.com/lightstep/lightstep-tracer-go" + "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + + "sourcegraph.com/sourcegraph/appdash" + appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" ) type dirType string @@ -108,6 +113,27 @@ func startEtcdOrProxyV2() { plog.Warningf("no data-dir provided, using default data-dir ./%s", cfg.Dir) } + // The global tracer is noop by default. + if cfg.TracingLightstepToken != "" { + plog.Warningf("Using lightstep %s %s", cfg.TracingLightstepToken, cfg.TracingLightstepCollector) + opentracing.InitGlobalTracer(lightstep.NewTracer(lightstep.Options{ + AccessToken: cfg.TracingLightstepToken, + Collector: lightstep.Endpoint{ + Host: cfg.TracingLightstepCollector, + }, + Tags: opentracing.Tags{"component": "etcd"}, + })) + } else if cfg.TracingAppdashHost != "" { + collector := appdash.NewRemoteCollector(cfg.TracingAppdashHost) + tracer := appdashot.NewTracerWithOptions(collector, appdashot.Options{ + ShouldSample: func(traceID uint64) bool { + return traceID%cfg.TracingAppdashSamplingRate == 0 + }, + Verbose: cfg.TracingAppdashVerbose, + }) + opentracing.InitGlobalTracer(tracer) + } + which := identifyDataDirOrDie(cfg.Dir) if which != dirEmpty { plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which) diff --git a/etcdserver/api/v2http/client.go b/etcdserver/api/v2http/client.go index 0bcf0636a08b..72303973cf2c 100644 --- a/etcdserver/api/v2http/client.go +++ b/etcdserver/api/v2http/client.go @@ -36,6 +36,7 @@ import ( "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/store" @@ -208,6 +209,8 @@ type membersHandler struct { } func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + sp := httputil.JoinOrCreateSpanFromHeader("membershandler/ServeHTTP", r.Header) + defer sp.Finish() if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") { return } diff --git a/etcdserver/api/v3rpc/auth.go b/etcdserver/api/v3rpc/auth.go index 07a84db29643..a30ff465e63b 100644 --- a/etcdserver/api/v3rpc/auth.go +++ b/etcdserver/api/v3rpc/auth.go @@ -29,6 +29,8 @@ func NewAuthServer(s *etcdserver.EtcdServer) *AuthServer { } func (as *AuthServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/AuthEnable") + defer sp.Finish() resp, err := as.authenticator.AuthEnable(ctx, r) if err != nil { return nil, togRPCError(err) @@ -37,6 +39,8 @@ func (as *AuthServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) ( } func (as *AuthServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/AuthDisable") + defer sp.Finish() resp, err := as.authenticator.AuthDisable(ctx, r) if err != nil { return nil, togRPCError(err) @@ -45,6 +49,8 @@ func (as *AuthServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) } func (as *AuthServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/Authenticate") + defer sp.Finish() resp, err := as.authenticator.Authenticate(ctx, r) if err != nil { return nil, togRPCError(err) @@ -53,6 +59,8 @@ func (as *AuthServer) Authenticate(ctx context.Context, r *pb.AuthenticateReques } func (as *AuthServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/RoleAdd") + defer sp.Finish() resp, err := as.authenticator.RoleAdd(ctx, r) if err != nil { return nil, togRPCError(err) @@ -76,6 +84,8 @@ func (as *AuthServer) RoleRevoke(ctx context.Context, r *pb.AuthRoleRevokeReques } func (as *AuthServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/RoleGrant") + defer sp.Finish() resp, err := as.authenticator.RoleGrant(ctx, r) if err != nil { return nil, togRPCError(err) @@ -84,6 +94,8 @@ func (as *AuthServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) } func (as *AuthServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/UserAdd") + defer sp.Finish() resp, err := as.authenticator.UserAdd(ctx, r) if err != nil { return nil, togRPCError(err) @@ -92,6 +104,8 @@ func (as *AuthServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*p } func (as *AuthServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/UserDelete") + defer sp.Finish() resp, err := as.authenticator.UserDelete(ctx, r) if err != nil { return nil, togRPCError(err) @@ -105,6 +119,8 @@ func (as *AuthServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*p } func (as *AuthServer) UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) (*pb.AuthUserGrantResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/UserGrant") + defer sp.Finish() resp, err := as.authenticator.UserGrant(ctx, r) if err != nil { return nil, togRPCError(err) @@ -118,6 +134,8 @@ func (as *AuthServer) UserRevoke(ctx context.Context, r *pb.AuthUserRevokeReques } func (as *AuthServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "authserver/UserChangePassword") + defer sp.Finish() resp, err := as.authenticator.UserChangePassword(ctx, r) if err != nil { return nil, togRPCError(err) diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index 3ef980486aa4..a573301cc74b 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -19,6 +19,7 @@ import ( "github.com/coreos/etcd/etcdserver" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/opentracing/opentracing-go" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -32,7 +33,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s))) grpcServer := grpc.NewServer(opts...) - pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) + pb.RegisterKVServer(grpcServer, NewTracedKVServer(opentracing.GlobalTracer(), NewQuotaKVServer(s))) pb.RegisterWatchServer(grpcServer, NewWatchServer(s)) pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s)) pb.RegisterClusterServer(grpcServer, NewClusterServer(s)) diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index e5e76923680e..3947207dad19 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -23,6 +23,7 @@ import ( pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/etcdserver/membership" "github.com/coreos/etcd/pkg/types" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -43,6 +44,9 @@ func NewClusterServer(s *etcdserver.EtcdServer) *ClusterServer { } func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) (*pb.MemberAddResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberAdd") + defer sp.Finish() + urls, err := types.NewURLs(r.PeerURLs) if err != nil { return nil, rpctypes.ErrGRPCMemberBadURLs @@ -67,6 +71,9 @@ func (cs *ClusterServer) MemberAdd(ctx context.Context, r *pb.MemberAddRequest) } func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberRemove") + defer sp.Finish() + err := cs.server.RemoveMember(ctx, r.ID) switch { case err == membership.ErrIDRemoved: @@ -81,6 +88,9 @@ func (cs *ClusterServer) MemberRemove(ctx context.Context, r *pb.MemberRemoveReq } func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberUpdate") + defer sp.Finish() + m := membership.Member{ ID: types.ID(r.ID), RaftAttributes: membership.RaftAttributes{PeerURLs: r.PeerURLs}, @@ -99,6 +109,9 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq } func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "clusterserver/MemberList") + defer sp.Finish() + membs := cs.cluster.Members() protoMembs := make([]*pb.Member, len(membs)) diff --git a/etcdserver/api/v3rpc/tracing.go b/etcdserver/api/v3rpc/tracing.go new file mode 100644 index 000000000000..44d6f3c0f5f6 --- /dev/null +++ b/etcdserver/api/v3rpc/tracing.go @@ -0,0 +1,86 @@ +package v3rpc + +import ( + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" + "google.golang.org/grpc/metadata" +) + +type tracedKVServer struct { + pb.KVServer + tracer opentracing.Tracer +} + +func NewTracedKVServer(tracer opentracing.Tracer, kv pb.KVServer) pb.KVServer { + return &tracedKVServer{ + KVServer: kv, + tracer: tracer, + } +} + +func (t *tracedKVServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Range") + defer sp.Finish() + return t.KVServer.Range(ctx, r) +} + +func (t *tracedKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Put") + defer sp.Finish() + + return t.KVServer.Put(ctx, r) +} + +func (t *tracedKVServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/DeleteRange") + defer sp.Finish() + + return t.KVServer.DeleteRange(ctx, r) +} + +func (t *tracedKVServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/DeleteRange") + defer sp.Finish() + + return t.KVServer.Txn(ctx, r) +} + +// Compact compacts the event history in the etcd key-value store. The key-value +// store should be periodically compacted or the event history will continue to grow +// indefinitely. +func (t *tracedKVServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + sp, ctx := startSpanFromMetadata(ctx, "kvServer/v3/Compact") + defer sp.Finish() + + return t.KVServer.Compact(ctx, r) +} + +func (t *tracedKVServer) startSpanFromMetadata(ctx context.Context, operationName string) (opentracing.Span, context.Context) { + md, ok := metadata.FromContext(ctx) + if ok { + sp, err := t.tracer.Join(operationName, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}) + if err != nil { + plog.Warningf("Couldn't join trace %v", err) + sp = t.tracer.StartSpan(operationName) + } + return sp, opentracing.ContextWithSpan(ctx, sp) + } + sp := t.tracer.StartSpan(operationName) + return sp, opentracing.ContextWithSpan(ctx, sp) +} + +func startSpanFromMetadata(ctx context.Context, operationName string) (opentracing.Span, context.Context) { + md, ok := metadata.FromContext(ctx) + if ok { + sp, err := opentracing.GlobalTracer().Join(operationName, opentracing.TextMap, pbutil.MetadataReaderWriter{&md}) + if err != nil { + plog.Warningf("Couldn't join trace %v", err) + sp = opentracing.StartSpan(operationName) + } + return sp, opentracing.ContextWithSpan(ctx, sp) + } + sp := opentracing.StartSpan(operationName) + return sp, opentracing.ContextWithSpan(ctx, sp) +} diff --git a/etcdserver/server.go b/etcdserver/server.go index 7dff816974a5..28ee48018859 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -23,6 +23,7 @@ import ( "os" "path" "regexp" + "strconv" "sync" "sync/atomic" "time" @@ -54,6 +55,7 @@ import ( "github.com/coreos/etcd/wal" "github.com/coreos/go-semver/semver" "github.com/coreos/pkg/capnslog" + opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -498,6 +500,9 @@ func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() func (s *EtcdServer) Lessor() lease.Lessor { return s.lessor } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Process") + defer sp.Finish() + if s.cluster.IsIDRemoved(types.ID(m.From)) { plog.Warningf("reject message from removed member %s", types.ID(m.From).String()) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") @@ -765,6 +770,9 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/AddMember") + defer sp.Finish() + if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case adding a new member is allowed unconditionally @@ -785,6 +793,9 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) erro } func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/RemoveMember") + defer sp.Finish() + if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case removing a member is allowed unconditionally @@ -799,6 +810,9 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { } func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/UpdateMember") + defer sp.Finish() + b, err := json.Marshal(memb) if err != nil { return err @@ -830,6 +844,8 @@ func (s *EtcdServer) IsPprofEnabled() bool { return s.Cfg.EnablePprof } // then waits for it to be applied to the server. It // will block until the change is performed or there is an error. func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/configure") + defer sp.Finish() cc.ID = s.reqIDGen.Next() ch := s.w.Register(cc.ID) start := time.Now() @@ -859,16 +875,20 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error // The request will be canceled after the given timeout. func (s *EtcdServer) sync(timeout time.Duration) { ctx, cancel := context.WithTimeout(context.Background(), timeout) + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Sync") + req := pb.Request{ Method: "SYNC", ID: s.reqIDGen.Next(), Time: time.Now().UnixNano(), } + sp.SetBaggageItem("request-id", strconv.FormatUint(req.ID, 10)) data := pbutil.MustMarshal(&req) // There is no promise that node has leader when do SYNC request, // so it uses goroutine to propose. go func() { s.r.Propose(ctx, data) + defer sp.Finish() cancel() }() } @@ -879,6 +899,9 @@ func (s *EtcdServer) sync(timeout time.Duration) { // The function keeps attempting to register until it succeeds, // or its server is stopped. func (s *EtcdServer) publish(timeout time.Duration) { + sp, ctx := opentracing.StartSpanFromContext(context.Background(), "etcdserver/publish") + defer sp.Finish() + b, err := json.Marshal(s.attributes) if err != nil { plog.Panicf("json marshal error: %v", err) @@ -891,7 +914,7 @@ func (s *EtcdServer) publish(timeout time.Duration) { } for { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) _, err := s.Do(ctx, req) cancel() switch err { @@ -989,6 +1012,7 @@ func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (uint pbutil.MustUnmarshal(&cc, e.Data) removedSelf, err := s.applyConfChange(cc, confState) shouldstop = shouldstop || removedSelf + // TODO(bg): how do we model this? s.w.Trigger(cc.ID, err) default: plog.Panicf("entry type should be either EntryNormal or EntryConfChange") @@ -1016,6 +1040,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible var r pb.Request pbutil.MustUnmarshal(&r, e.Data) + // TODO(bg): how do we model triggers? s.w.Trigger(r.ID, s.applyV2Request(&r)) return } @@ -1223,6 +1248,10 @@ func (s *EtcdServer) updateClusterVersion(ver string) { Val: ver, } ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/updateClusterVersion") + defer sp.Finish() + _, err := s.Do(ctx, req) cancel() switch err { diff --git a/etcdserver/v2_server.go b/etcdserver/v2_server.go index a60119d79a5e..cc674ced8e55 100644 --- a/etcdserver/v2_server.go +++ b/etcdserver/v2_server.go @@ -18,6 +18,7 @@ import ( "time" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -33,22 +34,32 @@ type v2API interface { type v2apiStore struct{ s *EtcdServer } func (a *v2apiStore) Post(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/Post") + defer sp.Finish() return a.processRaftRequest(ctx, r) } func (a *v2apiStore) Put(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/Put") + defer sp.Finish() return a.processRaftRequest(ctx, r) } func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/Delete") + defer sp.Finish() return a.processRaftRequest(ctx, r) } func (a *v2apiStore) QGet(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/QGet") + defer sp.Finish() return a.processRaftRequest(ctx, r) } func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/processRaftRequest") + defer sp.Finish() data, err := r.Marshal() if err != nil { return Response{}, err @@ -78,6 +89,8 @@ func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Res } func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/Get") + defer sp.Finish() if r.Wait { wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) if err != nil { @@ -93,6 +106,9 @@ func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) { } func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "v2apiStore/Post") + defer sp.Finish() + ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted) if err != nil { return Response{}, err @@ -106,6 +122,8 @@ func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) // respective operation. Do will block until an action is performed or there is // an error. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Do") + defer sp.Finish() r.ID = s.reqIDGen.Next() if r.Method == "GET" && r.Quorum { r.Method = "QGET" diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index 3b7bf1eb3dd8..a43aecbc83d0 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/lease/leasehttp" "github.com/coreos/etcd/mvcc" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -67,6 +68,8 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Range") + defer sp.Finish() if r.Serializable { return s.applyV3.Range(noTxn, r) } @@ -79,6 +82,9 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Put") + defer sp.Finish() + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -87,6 +93,9 @@ func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse } func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/DeleteRange") + defer sp.Finish() + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r}) if err != nil { return nil, err @@ -95,6 +104,9 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) } func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Txn") + defer sp.Finish() + if isTxnSerializable(r) { return s.applyV3.Txn(r) } @@ -121,6 +133,9 @@ func isTxnSerializable(r *pb.TxnRequest) bool { } func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Compact") + defer sp.Finish() + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Compaction: r}) if r.Physical && result != nil && result.physc != nil { <-result.physc @@ -146,6 +161,8 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. } func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/LeaseGrant") + defer sp.Finish() // no id given? choose one for r.ID == int64(lease.NoLease) { // only use positive int64 id's @@ -159,6 +176,8 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* } func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/LeaseRevoke") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) if err != nil { return nil, err @@ -202,6 +221,8 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { } func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Alarm") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Alarm: r}) if err != nil { return nil, err @@ -210,6 +231,8 @@ func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmRe } func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/AuthEnable") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthEnable: r}) if err != nil { return nil, err @@ -218,6 +241,8 @@ func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (* } func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Authenticate") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r}) if err != nil { return nil, err @@ -226,6 +251,8 @@ func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) } func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/Authenticate") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{Authenticate: r}) if err != nil { return nil, err @@ -234,6 +261,8 @@ func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest } func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/UserAdd") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r}) if err != nil { return nil, err @@ -242,6 +271,8 @@ func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb } func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/UserDelete") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r}) if err != nil { return nil, err @@ -250,6 +281,8 @@ func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest } func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/UserChangePassword") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r}) if err != nil { return nil, err @@ -258,6 +291,8 @@ func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChang } func (s *EtcdServer) UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) (*pb.AuthUserGrantResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/UserGrant") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthUserGrant: r}) if err != nil { return nil, err @@ -266,6 +301,8 @@ func (s *EtcdServer) UserGrant(ctx context.Context, r *pb.AuthUserGrantRequest) } func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/RoleAdd") + defer sp.Finish() result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r}) if err != nil { return nil, err @@ -274,6 +311,9 @@ func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb } func (s *EtcdServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) (*pb.AuthRoleGrantResponse, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/RoleGrant") + defer sp.Finish() + result, err := s.processInternalRaftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrant: r}) if err != nil { return nil, err @@ -282,6 +322,8 @@ func (s *EtcdServer) RoleGrant(ctx context.Context, r *pb.AuthRoleGrantRequest) } func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "etcdserver/processInternalRaftRequest") + defer sp.Finish() r.ID = s.reqIDGen.Next() data, err := r.Marshal() diff --git a/pkg/httputil/httputil.go b/pkg/httputil/httputil.go index daf43bd8f6c9..d2170a2f6148 100644 --- a/pkg/httputil/httputil.go +++ b/pkg/httputil/httputil.go @@ -11,6 +11,8 @@ import ( "io" "io/ioutil" "net/http" + + opentracing "github.com/opentracing/opentracing-go" ) func RequestCanceler(rt http.RoundTripper, req *http.Request) func() { @@ -29,3 +31,12 @@ func GracefulClose(resp *http.Response) { io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() } + +func JoinOrCreateSpanFromHeader(op string, h http.Header) opentracing.Span { + t := opentracing.GlobalTracer() + sp, err := t.Join(op, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(h)) + if err != nil { + return t.StartSpan(op) + } + return sp +} diff --git a/pkg/pbutil/pbutil.go b/pkg/pbutil/pbutil.go index 8f96b4d54905..54191061c65f 100644 --- a/pkg/pbutil/pbutil.go +++ b/pkg/pbutil/pbutil.go @@ -15,7 +15,15 @@ // Package pbutil defines interfaces for handling Protocol Buffer objects. package pbutil -import "github.com/coreos/pkg/capnslog" +import ( + "bytes" + + "google.golang.org/grpc/metadata" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/pkg/capnslog" + "github.com/opentracing/opentracing-go" +) var ( plog = capnslog.NewPackageLogger("github.com/coreos/etcd/pkg", "flags") @@ -58,3 +66,43 @@ func GetBool(v *bool) (vv bool, set bool) { } func Boolp(b bool) *bool { return &b } + +func InjectSpanIntoMessage(sp opentracing.Span, msg *raftpb.Message) error { + buf := bytes.NewBuffer(nil) + err := sp.Tracer().Inject(sp, opentracing.Binary, buf) + if err != nil { + return err + } + msg.TraceContext = buf.Bytes() + return err +} + +func StartSpanFromMessage(opName string, msg raftpb.Message) opentracing.Span { + buf := bytes.NewReader(msg.TraceContext) + sp, err := opentracing.GlobalTracer().Join(opName, opentracing.Binary, buf) + if err != nil { + return opentracing.StartSpan(opName) + } + return sp +} + +// MetadataReaderWriter implements opentracing.TextMapReader +// and opentracing.TextMapWriter. Used for gRPC metadata. +type MetadataReaderWriter struct { + *metadata.MD +} + +func (w MetadataReaderWriter) Set(key, val string) { + (*w.MD)[key] = append((*w.MD)[key], val) +} + +func (w MetadataReaderWriter) ForeachKey(handler func(key, val string) error) error { + for k, vals := range *w.MD { + for _, v := range vals { + if err := handler(k, v); err != nil { + return err + } + } + } + return nil +} diff --git a/proxy/httpproxy/reverse.go b/proxy/httpproxy/reverse.go index 8a94a748aa3e..e9b35ca24e75 100644 --- a/proxy/httpproxy/reverse.go +++ b/proxy/httpproxy/reverse.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v2http/httptypes" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/pkg/capnslog" + opentracing "github.com/opentracing/opentracing-go" ) var ( @@ -63,6 +64,8 @@ type reverseProxy struct { } func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) { + sp := httputil.JoinOrCreateSpanFromHeader("reverse-proxy", clientreq.Header) + defer sp.Finish() reportIncomingRequest(clientreq) proxyreq := new(http.Request) *proxyreq = *clientreq @@ -89,6 +92,10 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request proxyreq.Header = make(http.Header) copyHeader(proxyreq.Header, clientreq.Header) + err = sp.Tracer().Inject(sp, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(proxyreq.Header)) + if err != nil { + plog.Debugf("Error injecting header") + } normalizeRequest(proxyreq) removeSingleHopHeaders(&proxyreq.Header) maybeSetForwardedFor(proxyreq) diff --git a/raft/node.go b/raft/node.go index 0ddcfd5f6988..262e19513f72 100644 --- a/raft/node.go +++ b/raft/node.go @@ -16,8 +16,11 @@ package raft import ( "errors" + "fmt" + "github.com/coreos/etcd/pkg/pbutil" pb "github.com/coreos/etcd/raft/raftpb" + "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -384,13 +387,26 @@ func (n *node) Tick() { } } -func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } +func (n *node) Campaign(ctx context.Context) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raftnode/Campaign") + defer sp.Finish() + + return n.step(ctx, pb.Message{Type: pb.MsgHup}) +} func (n *node) Propose(ctx context.Context, data []byte) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raftnode/Propose") + defer sp.Finish() + sp.SetTag("data", string(data)) + return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } func (n *node) Step(ctx context.Context, m pb.Message) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raftnode/Step") + defer sp.Finish() + + sp.SetTag("payload", fmt.Sprintf("%v", m)) // ignore unexpected local messages receiving over network if IsLocalMsg(m.Type) { // TODO: return an error? @@ -400,6 +416,9 @@ func (n *node) Step(ctx context.Context, m pb.Message) error { } func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raftnode/ProposeConfChange") + defer sp.Finish() + data, err := cc.Marshal() if err != nil { return err @@ -410,6 +429,11 @@ func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { // Step advances the state machine using msgs. The ctx.Err() will be returned, // if any. func (n *node) step(ctx context.Context, m pb.Message) error { + // Only inject the span if it exists. + if sp := opentracing.SpanFromContext(ctx); sp != nil { + pbutil.InjectSpanIntoMessage(sp, &m) + } + ch := n.recvc if m.Type == pb.MsgProp { ch = n.propc @@ -454,8 +478,12 @@ func (n *node) Status() Status { } func (n *node) ReportUnreachable(id uint64) { + sp := opentracing.StartSpan("raftnode/ReportUnreachable") + defer sp.Finish() + msg := pb.Message{Type: pb.MsgUnreachable, From: id} + pbutil.InjectSpanIntoMessage(sp, &msg) select { - case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: + case n.recvc <- msg: case <-n.done: } } diff --git a/raft/raft.go b/raft/raft.go index f8d727d6f8f3..07ea37e496b9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -22,7 +22,11 @@ import ( "sort" "strings" + "golang.org/x/net/context" + + "github.com/coreos/etcd/pkg/pbutil" pb "github.com/coreos/etcd/raft/raftpb" + "github.com/opentracing/opentracing-go" ) // None is a placeholder node ID used when there is no leader. @@ -275,13 +279,17 @@ func (r *raft) send(m pb.Message) { } // sendAppend sends RPC, with entries to the given peer. -func (r *raft) sendAppend(to uint64) { +func (r *raft) sendAppend(ctx context.Context, to uint64) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/sendAppend") + defer sp.Finish() + pr := r.prs[to] if pr.isPaused() { return } m := pb.Message{} m.To = to + pbutil.InjectSpanIntoMessage(sp, &m) term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) @@ -341,23 +349,26 @@ func (r *raft) sendHeartbeat(to uint64) { // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. + sp := opentracing.StartSpan("raft/sendHeartbeat") + defer sp.Finish() commit := min(r.prs[to].Match, r.raftLog.committed) m := pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, } + pbutil.InjectSpanIntoMessage(sp, &m) r.send(m) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.prs. -func (r *raft) bcastAppend() { +func (r *raft) bcastAppend(ctx context.Context) { for id := range r.prs { if id == r.id { continue } - r.sendAppend(id) + r.sendAppend(ctx, id) } } @@ -423,26 +434,37 @@ func (r *raft) appendEntry(es ...pb.Entry) { // tickElection is run by followers and candidates after r.electionTimeout. func (r *raft) tickElection() { + sp := opentracing.StartSpan("raft/tickElection") + defer sp.Finish() + if !r.promotable() { r.electionElapsed = 0 return } r.electionElapsed++ if r.pastElectionTimeout() { + r.electionElapsed = 0 - r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) + msg := pb.Message{From: r.id, Type: pb.MsgHup} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.Step(msg) } } // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. func (r *raft) tickHeartbeat() { + sp := opentracing.StartSpan("raft/tickHeartBeat") + defer sp.Finish() + r.heartbeatElapsed++ r.electionElapsed++ if r.electionElapsed >= r.electionTimeout { r.electionElapsed = 0 if r.checkQuorum { - r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) + msg := pb.Message{From: r.id, Type: pb.MsgCheckQuorum} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.Step(msg) } // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. if r.state == StateLeader && r.leadTransferee != None { @@ -456,7 +478,9 @@ func (r *raft) tickHeartbeat() { if r.heartbeatElapsed >= r.heartbeatTimeout { r.heartbeatElapsed = 0 - r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) + msg := pb.Message{From: r.id, Type: pb.MsgBeat} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.Step(msg) } } @@ -482,7 +506,9 @@ func (r *raft) becomeCandidate() { r.logger.Infof("%x became candidate at term %d", r.id, r.Term) } -func (r *raft) becomeLeader() { +func (r *raft) becomeLeader(ctx context.Context) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/becomeLeader") + defer sp.Finish() // TODO(xiangli) remove the panic when the raft implementation is stable if r.state == StateFollower { panic("invalid transition [follower -> leader]") @@ -510,19 +536,27 @@ func (r *raft) becomeLeader() { r.logger.Infof("%x became leader at term %d", r.id, r.Term) } -func (r *raft) campaign() { +func (r *raft) campaign(ctx context.Context) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/raftCampaign") + defer sp.Finish() + r.becomeCandidate() if r.quorum() == r.poll(r.id, true) { - r.becomeLeader() + r.becomeLeader(ctx) + sp.LogEvent("Became leader") return } + for id := range r.prs { if id == r.id { continue } r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), id, r.Term) - r.send(pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()}) + + msg := pb.Message{To: id, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } } @@ -544,10 +578,13 @@ func (r *raft) poll(id uint64, v bool) (granted int) { } func (r *raft) Step(m pb.Message) error { + sp := pbutil.StartSpanFromMessage("raft/Step", m) + ctx := opentracing.BackgroundContextWithSpan(sp) + defer sp.Finish() if m.Type == pb.MsgHup { if r.state != StateLeader { r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) - r.campaign() + r.campaign(ctx) } else { r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) } @@ -576,13 +613,16 @@ func (r *raft) Step(m pb.Message) error { r.id, r.Term, m.Type, m.From, m.Term) return nil } - r.step(r, m) + r.step(ctx, r, m) return nil } -type stepFunc func(r *raft, m pb.Message) +type stepFunc func(ctx context.Context, r *raft, m pb.Message) -func stepLeader(r *raft, m pb.Message) { +func stepLeader(ctx context.Context, r *raft, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/stepLeader") + defer sp.Finish() + pbutil.InjectSpanIntoMessage(sp, &m) // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: @@ -618,12 +658,15 @@ func stepLeader(r *raft, m pb.Message) { } } r.appendEntry(m.Entries...) - r.bcastAppend() + r.bcastAppend(ctx) return case pb.MsgVote: r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) + + msg := pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) return } @@ -645,7 +688,7 @@ func stepLeader(r *raft, m pb.Message) { if pr.State == ProgressStateReplicate { pr.becomeProbe() } - r.sendAppend(m.From) + r.sendAppend(ctx, m.From) } } else { oldPaused := pr.isPaused() @@ -661,11 +704,11 @@ func stepLeader(r *raft, m pb.Message) { } if r.maybeCommit() { - r.bcastAppend() + r.bcastAppend(ctx) } else if oldPaused { // update() reset the wait state on this node. If we had delayed sending // an update before, send it now. - r.sendAppend(m.From) + r.sendAppend(ctx, m.From) } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { @@ -682,7 +725,7 @@ func stepLeader(r *raft, m pb.Message) { pr.ins.freeFirstOne() } if pr.Match < r.raftLog.lastIndex() { - r.sendAppend(m.From) + r.sendAppend(ctx, m.From) } case pb.MsgSnapStatus: if pr.State != ProgressStateSnapshot { @@ -732,36 +775,42 @@ func stepLeader(r *raft, m pb.Message) { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + r.sendAppend(ctx, leadTransferee) } } } -func stepCandidate(r *raft, m pb.Message) { +func stepCandidate(ctx context.Context, r *raft, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/stepCandidate") + defer sp.Finish() + switch m.Type { case pb.MsgProp: r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return case pb.MsgApp: r.becomeFollower(r.Term, m.From) - r.handleAppendEntries(m) + r.handleAppendEntries(ctx, m) case pb.MsgHeartbeat: r.becomeFollower(r.Term, m.From) - r.handleHeartbeat(m) + r.handleHeartbeat(ctx, m) case pb.MsgSnap: r.becomeFollower(m.Term, m.From) - r.handleSnapshot(m) + r.handleSnapshot(ctx, m) case pb.MsgVote: r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) + + msg := pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) case pb.MsgVoteResp: gr := r.poll(m.From, !m.Reject) r.logger.Infof("%x [quorum:%d] has received %d votes and %d vote rejections", r.id, r.quorum(), gr, len(r.votes)-gr) switch r.quorum() { case gr: - r.becomeLeader() - r.bcastAppend() + r.becomeLeader(ctx) + r.bcastAppend(ctx) case len(r.votes) - gr: r.becomeFollower(r.Term, None) } @@ -770,7 +819,11 @@ func stepCandidate(r *raft, m pb.Message) { } } -func stepFollower(r *raft, m pb.Message) { +func stepFollower(ctx context.Context, r *raft, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/stepFollower") + pbutil.InjectSpanIntoMessage(sp, &m) + defer sp.Finish() + switch m.Type { case pb.MsgProp: if r.lead == None { @@ -782,62 +835,86 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgApp: r.electionElapsed = 0 r.lead = m.From - r.handleAppendEntries(m) + r.handleAppendEntries(ctx, m) case pb.MsgHeartbeat: r.electionElapsed = 0 r.lead = m.From - r.handleHeartbeat(m) + r.handleHeartbeat(ctx, m) case pb.MsgSnap: r.electionElapsed = 0 - r.handleSnapshot(m) + r.handleSnapshot(ctx, m) case pb.MsgVote: + sp.SetOperationName("MsgVote") if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.electionElapsed = 0 r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.Vote = m.From - r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp}) + msg := pb.Message{To: m.From, Type: pb.MsgVoteResp} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } else { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) - r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) + + msg := pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } case pb.MsgTimeoutNow: r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) - r.campaign() + r.campaign(ctx) } } -func (r *raft) handleAppendEntries(m pb.Message) { +func (r *raft) handleAppendEntries(ctx context.Context, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/handleAppendEntries") + defer sp.Finish() if m.Index < r.raftLog.committed { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) + msg := pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + msg := pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) + msg := pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } } -func (r *raft) handleHeartbeat(m pb.Message) { +func (r *raft) handleHeartbeat(ctx context.Context, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/handleHeartbeat") + defer sp.Finish() r.raftLog.commitTo(m.Commit) - r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp}) + msg := pb.Message{To: m.From, Type: pb.MsgHeartbeatResp} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } -func (r *raft) handleSnapshot(m pb.Message) { +func (r *raft) handleSnapshot(ctx context.Context, m pb.Message) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "raft/handleSnapshot") + defer sp.Finish() sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term if r.restore(m.Snapshot) { r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) + msg := pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } else { r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", r.id, r.raftLog.committed, sindex, sterm) - r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) + msg := pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed} + pbutil.InjectSpanIntoMessage(sp, &msg) + r.send(msg) } } @@ -902,7 +979,7 @@ func (r *raft) removeNode(id uint64) { // The quorum size is now smaller, so see if any pending entries can // be committed. if r.maybeCommit() { - r.bcastAppend() + r.bcastAppend(context.TODO()) } // If the removed node is the leadTransferee, then abort the leadership transferring. if r.state == StateLeader && r.leadTransferee == id { diff --git a/raft/raftpb/raft.pb.go b/raft/raftpb/raft.pb.go index ca5d3f8dbbd0..48ce643c6f0f 100644 --- a/raft/raftpb/raft.pb.go +++ b/raft/raftpb/raft.pb.go @@ -230,6 +230,7 @@ type Message struct { Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"` RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` + TraceContext []byte `protobuf:"bytes,12,opt,name=traceContext" json:"traceContext,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -458,6 +459,12 @@ func (m *Message) MarshalTo(data []byte) (int, error) { data[i] = 0x58 i++ i = encodeVarintRaft(data, i, uint64(m.RejectHint)) + if m.TraceContext != nil { + data[i] = 0x62 + i++ + i = encodeVarintRaft(data, i, uint64(len(m.TraceContext))) + i += copy(data[i:], m.TraceContext) + } if m.XXX_unrecognized != nil { i += copy(data[i:], m.XXX_unrecognized) } @@ -649,6 +656,10 @@ func (m *Message) Size() (n int) { n += 1 + l + sovRaft(uint64(l)) n += 2 n += 1 + sovRaft(uint64(m.RejectHint)) + if m.TraceContext != nil { + l = len(m.TraceContext) + n += 1 + l + sovRaft(uint64(l)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1342,6 +1353,37 @@ func (m *Message) Unmarshal(data []byte) error { break } } + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TraceContext", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthRaft + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.TraceContext = append(m.TraceContext[:0], data[iNdEx:postIndex]...) + if m.TraceContext == nil { + m.TraceContext = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRaft(data[iNdEx:]) diff --git a/raft/raftpb/raft.proto b/raft/raftpb/raft.proto index 42f10d269f09..3e80c3bdf51d 100644 --- a/raft/raftpb/raft.proto +++ b/raft/raftpb/raft.proto @@ -51,17 +51,18 @@ enum MessageType { } message Message { - optional MessageType type = 1 [(gogoproto.nullable) = false]; - optional uint64 to = 2 [(gogoproto.nullable) = false]; - optional uint64 from = 3 [(gogoproto.nullable) = false]; - optional uint64 term = 4 [(gogoproto.nullable) = false]; - optional uint64 logTerm = 5 [(gogoproto.nullable) = false]; - optional uint64 index = 6 [(gogoproto.nullable) = false]; - repeated Entry entries = 7 [(gogoproto.nullable) = false]; - optional uint64 commit = 8 [(gogoproto.nullable) = false]; - optional Snapshot snapshot = 9 [(gogoproto.nullable) = false]; - optional bool reject = 10 [(gogoproto.nullable) = false]; - optional uint64 rejectHint = 11 [(gogoproto.nullable) = false]; + optional MessageType type = 1 [(gogoproto.nullable) = false]; + optional uint64 to = 2 [(gogoproto.nullable) = false]; + optional uint64 from = 3 [(gogoproto.nullable) = false]; + optional uint64 term = 4 [(gogoproto.nullable) = false]; + optional uint64 logTerm = 5 [(gogoproto.nullable) = false]; + optional uint64 index = 6 [(gogoproto.nullable) = false]; + repeated Entry entries = 7 [(gogoproto.nullable) = false]; + optional uint64 commit = 8 [(gogoproto.nullable) = false]; + optional Snapshot snapshot = 9 [(gogoproto.nullable) = false]; + optional bool reject = 10 [(gogoproto.nullable) = false]; + optional uint64 rejectHint = 11 [(gogoproto.nullable) = false]; + optional bytes traceContext = 12; } message HardState { diff --git a/rafthttp/http.go b/rafthttp/http.go index 2829cb438785..1bde98a39b6f 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -22,12 +22,14 @@ import ( "path" "strings" + "github.com/coreos/etcd/pkg/httputil" pioutil "github.com/coreos/etcd/pkg/ioutil" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "github.com/coreos/etcd/version" - "golang.org/x/net/context" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -116,7 +118,11 @@ func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) - if err := h.r.Process(context.TODO(), m); err != nil { + sp := pbutil.StartSpanFromMessage("ServeHTTP", m) + defer sp.Finish() + sp.SetTag("component", "pipelineHandler") + sp.SetBaggageItem("Cluster-ID", h.cid.String()) + if err := h.r.Process(opentracing.BackgroundContextWithSpan(sp), m); err != nil { switch v := err.(type) { case writerToResponse: v.WriteTo(w) @@ -187,6 +193,10 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size())) + sp := pbutil.StartSpanFromMessage("ServeHTTP", m) + defer sp.Finish() + sp.SetTag("component", "snapshotHandler") + sp.SetBaggageItem("Cluster-ID", h.cid.String()) if m.Type != raftpb.MsgSnap { plog.Errorf("unexpected raft message type %s on snapshot path", m.Type) @@ -206,7 +216,7 @@ func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From)) } - if err := h.r.Process(context.TODO(), m); err != nil { + if err := h.r.Process(opentracing.BackgroundContextWithSpan(sp), m); err != nil { switch v := err.(type) { // Process may return writerToResponse error when doing some // additional checks before calling raft.Node.Step. @@ -243,6 +253,9 @@ func newStreamHandler(tr *Transport, pg peerGetter, r Raft, id, cid types.ID) ht } func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + sp := httputil.JoinOrCreateSpanFromHeader("streamHandler/http", r.Header) + defer sp.Finish() + if r.Method != "GET" { w.Header().Set("Allow", "GET") http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) @@ -251,6 +264,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Server-Version", version.Version) w.Header().Set("X-Etcd-Cluster-ID", h.cid.String()) + sp.SetBaggageItem("Cluster-ID", h.cid.String()) if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil { http.Error(w, err.Error(), http.StatusPreconditionFailed) diff --git a/rafthttp/msg_codec.go b/rafthttp/msg_codec.go index b6c97e362689..fc364a723bc1 100644 --- a/rafthttp/msg_codec.go +++ b/rafthttp/msg_codec.go @@ -15,11 +15,13 @@ package rafthttp import ( + "bytes" "encoding/binary" "io" "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft/raftpb" + "github.com/opentracing/opentracing-go" ) // messageEncoder is a encoder that can encode all kinds of messages. @@ -29,6 +31,16 @@ type messageEncoder struct { } func (enc *messageEncoder) encode(m raftpb.Message) error { + sp := pbutil.StartSpanFromMessage("msgAppEncoder/encode", m) + defer sp.Finish() + + // TODO(bg): Might not be necessary if we don't care about the transport layer. + traceBuf := bytes.NewBuffer(nil) + sp.Tracer().Inject(sp, opentracing.Binary, traceBuf) + if err := binary.Write(enc.w, binary.BigEndian, traceBuf.Bytes()); err != nil { + return err + } + if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { return err } @@ -44,6 +56,13 @@ type messageDecoder struct { func (dec *messageDecoder) decode() (raftpb.Message, error) { var m raftpb.Message var l uint64 + + sp, err := opentracing.GlobalTracer().Join("msgAppDecoder/decode", opentracing.Binary, dec.r) + if err != nil { + sp = opentracing.StartSpan("msgAppDecoder/decode_root") + } + defer sp.Finish() + if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil { return m, err } @@ -51,5 +70,10 @@ func (dec *messageDecoder) decode() (raftpb.Message, error) { if _, err := io.ReadFull(dec.r, buf); err != nil { return m, err } - return m, m.Unmarshal(buf) + // TODO(bg): ew + if err := m.Unmarshal(buf); err != nil { + return m, nil + } + pbutil.InjectSpanIntoMessage(sp, &m) + return m, nil } diff --git a/rafthttp/msgappv2_codec.go b/rafthttp/msgappv2_codec.go index e61bd98c245c..c751e5ebbc93 100644 --- a/rafthttp/msgappv2_codec.go +++ b/rafthttp/msgappv2_codec.go @@ -15,6 +15,7 @@ package rafthttp import ( + "bytes" "encoding/binary" "fmt" "io" @@ -24,6 +25,7 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" + opentracing "github.com/opentracing/opentracing-go" ) const ( @@ -70,6 +72,7 @@ type msgAppV2Encoder struct { buf []byte uint64buf []byte uint8buf []byte + tracebuf []byte } func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder { @@ -84,17 +87,33 @@ func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder { func (enc *msgAppV2Encoder) encode(m raftpb.Message) error { start := time.Now() + + // TODO(bg): This might not be necessary if we don't care about the encoding + // type. Instead, we can just use the embedded m.TraceContext. + sp := pbutil.StartSpanFromMessage("msgAppV2Encoder/encode", m) + defer sp.Finish() + + traceBuf := bytes.NewBuffer(nil) + sp.Tracer().Inject(sp, opentracing.Binary, traceBuf) + enc.tracebuf = traceBuf.Bytes() + switch { case isLinkHeartbeatMessage(m): enc.uint8buf[0] = byte(msgTypeLinkHeartbeat) if _, err := enc.w.Write(enc.uint8buf); err != nil { return err } + if _, err := enc.w.Write(enc.tracebuf); err != nil { + return err + } case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term: enc.uint8buf[0] = byte(msgTypeAppEntries) if _, err := enc.w.Write(enc.uint8buf); err != nil { return err } + if _, err := enc.w.Write(enc.tracebuf); err != nil { + return err + } // write length of entries binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries))) if _, err := enc.w.Write(enc.uint64buf); err != nil { @@ -130,6 +149,9 @@ func (enc *msgAppV2Encoder) encode(m raftpb.Message) error { if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil { return err } + if _, err := enc.w.Write(enc.tracebuf); err != nil { + return err + } // write size of message if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { return err @@ -179,6 +201,14 @@ func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil { return m, err } + + sp, err := opentracing.GlobalTracer().Join("msgAppV2Decoder/decode", opentracing.Binary, dec.r) + if err != nil { + sp = opentracing.StartSpan("msgAppV2Decoder/decode_root") + } + + defer sp.Finish() + typ = uint8(dec.uint8buf[0]) switch typ { case msgTypeLinkHeartbeat: @@ -244,5 +274,6 @@ func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { default: return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ) } + pbutil.InjectSpanIntoMessage(sp, &m) return m, nil } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 0ef79c69cb82..0f457420a1a3 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,6 +15,7 @@ package rafthttp import ( + "bytes" "sync" "time" @@ -23,6 +24,7 @@ import ( "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" + opentracing "github.com/opentracing/opentracing-go" "golang.org/x/net/context" ) @@ -141,9 +143,16 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r for { select { case mm := <-p.recvc: - if err := r.Process(ctx, mm); err != nil { + buf := bytes.NewReader(mm.TraceContext) + sp, err := opentracing.GlobalTracer().Join("peer/recvc", opentracing.Binary, buf) + if err != nil { + plog.Infof("Could not join trace on recvc: %v", err) + sp = opentracing.StartSpan("peer/recvc") + } + if err := r.Process(opentracing.ContextWithSpan(ctx, sp), mm); err != nil { plog.Warningf("failed to process raft message (%v)", err) } + sp.Finish() case <-p.stopc: return } @@ -157,9 +166,16 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r for { select { case mm := <-p.propc: - if err := r.Process(ctx, mm); err != nil { + buf := bytes.NewReader(mm.TraceContext) + sp, err := opentracing.GlobalTracer().Join("peer/propc", opentracing.Binary, buf) + if err != nil { + plog.Infof("Could not join trace on propc: %v", err) + sp = opentracing.StartSpan("peer/propc_root") + } + if err := r.Process(opentracing.ContextWithSpan(ctx, sp), mm); err != nil { plog.Warningf("failed to process raft message (%v)", err) } + sp.Finish() case <-p.stopc: return } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index a5bb32e336ae..cb5fd1f3f822 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -31,6 +31,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" + "github.com/opentracing/opentracing-go" ) const ( @@ -146,6 +147,8 @@ func (cw *streamWriter) run() { case <-heartbeatc: err := enc.encode(linkHeartbeatMessage) unflushed += linkHeartbeatMessage.Size() + // TODO(bg): Rather than constantly tracing encoding decoding, i should just trace the + // the process boundaries. if err == nil { flusher.Flush() batched = 0 @@ -380,6 +383,8 @@ func (cr *streamReader) stop() { } func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { + sp := opentracing.StartSpan("streamReader/dial") + defer sp.Finish() u := cr.picker.pick() uu := u uu.Path = path.Join(t.endpoint(), cr.local.String()) @@ -394,6 +399,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Raft-To", cr.remote.String()) + sp.Tracer().Inject(sp, opentracing.TextMap, opentracing.HTTPHeaderTextMapCarrier(req.Header)) setPeerURLsHeader(req, cr.tr.URLs)