From 87d102d2f2330503c3f3c450eebef7f40408e0b1 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 5 Feb 2021 17:28:58 -0500 Subject: [PATCH 1/2] tracing: s/ForkCtxSpan/ForkSpan, s/LogStructured/RecordStructured And improve some documentation while here. Release note: None --- pkg/jobs/registry.go | 4 +- pkg/kv/kvclient/rangecache/range_cache.go | 2 +- .../kvserver/concurrency/lock_table_waiter.go | 4 +- .../kvserver/replica_application_decoder.go | 4 +- pkg/kv/kvserver/replica_raft.go | 4 +- pkg/sql/flowinfra/flow_registry.go | 2 +- pkg/sql/stats/stats_cache.go | 2 +- pkg/util/stop/stopper.go | 4 +- pkg/util/tracing/crdbspan.go | 2 +- pkg/util/tracing/doc.go | 2 +- pkg/util/tracing/span.go | 35 +++++++----------- pkg/util/tracing/span_test.go | 6 +-- pkg/util/tracing/tracer.go | 37 +++++++++++-------- pkg/util/tracing/tracingpb/recorded_span.go | 2 +- 14 files changed, 54 insertions(+), 56 deletions(-) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index fbf34efa3b78..78663dc484ac 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -517,9 +517,9 @@ func (r *Registry) CreateStartableJobWithTxn( // Construct a context which contains a tracing span that follows from the // span in the parent context. We don't directly use the parent span because // we want independent lifetimes and cancellation. For the same reason, we - // don't use the Context returned by ForkCtxSpan. + // don't use the Context returned by ForkSpan. resumerCtx, cancel := r.makeCtx() - _, span := tracing.ForkCtxSpan(ctx, "job") + _, span := tracing.ForkSpan(ctx, "job") if span != nil { resumerCtx = tracing.ContextWithSpan(resumerCtx, span) } diff --git a/pkg/kv/kvclient/rangecache/range_cache.go b/pkg/kv/kvclient/rangecache/range_cache.go index ecc390d4dc12..30272a00edb8 100644 --- a/pkg/kv/kvclient/rangecache/range_cache.go +++ b/pkg/kv/kvclient/rangecache/range_cache.go @@ -632,7 +632,7 @@ func (rc *RangeCache) tryLookup( resC, leader := rc.lookupRequests.DoChan(requestKey, func() (interface{}, error) { var lookupRes EvictionToken if err := rc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error { - ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup") + ctx, reqSpan := tracing.ForkSpan(ctx, "range lookup") defer reqSpan.Finish() // Clear the context's cancelation. This request services potentially many // callers waiting for its result, and using the flight's leader's diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 16e37d6d1a1f..d51929e5bed5 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -713,11 +713,11 @@ func (h *contentionEventHelper) emit() { } h.ev.Duration = timeutil.Since(h.tBegin) if h.onEvent != nil { - // NB: this is intentionally above the call to LogStructured so that + // NB: this is intentionally above the call to RecordStructured so that // this interceptor gets to mutate the event (used for test determinism). h.onEvent(h.ev) } - h.sp.LogStructured(h.ev) + h.sp.RecordStructured(h.ev) h.ev = nil } diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index c8d299691b07..9dbcf042226f 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -140,7 +140,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() if cmd.IsLocal() { - cmd.ctx, cmd.sp = tracing.ForkCtxSpan(cmd.proposal.ctx, opName) + cmd.ctx, cmd.sp = tracing.ForkSpan(cmd.proposal.ctx, opName) } else if cmd.raftCmd.TraceData != nil { // The proposal isn't local, and trace data is available. Extract // the remote span and start a server-side span that follows from it. @@ -159,7 +159,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { ) } } else { - cmd.ctx, cmd.sp = tracing.ForkCtxSpan(ctx, opName) + cmd.ctx, cmd.sp = tracing.ForkSpan(ctx, opName) } } } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index bfa8ea29ae1d..c4e52a2c6809 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -124,7 +124,7 @@ func (r *Replica) evalAndPropose( // Fork the proposal's context span so that the proposal's context // can outlive the original proposer's context. - proposal.ctx, proposal.sp = tracing.ForkCtxSpan(ctx, "async consensus") + proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus") // Signal the proposal's response channel immediately. reply := *proposal.Local.Reply @@ -205,7 +205,7 @@ func (r *Replica) evalAndPropose( defer r.raftMu.Unlock() r.mu.Lock() defer r.mu.Unlock() - // TODO(radu): Should this context be created via tracer.ForkCtxSpan? + // TODO(radu): Should this context be created via tracer.ForkSpan? // We'd need to make sure the span is finished eventually. proposal.ctx = r.AnnotateCtx(context.TODO()) } diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index 9c1a3f50a089..5096dda017ea 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -245,7 +245,7 @@ func (fr *FlowRegistry) RegisterFlow( fr.Unlock() if len(timedOutReceivers) != 0 { // The span in the context might be finished by the time this runs. In - // principle, we could ForkCtxSpan() beforehand, but we don't want to + // principle, we could ForkSpan() beforehand, but we don't want to // create the extra span every time. timeoutCtx := tracing.ContextWithSpan(ctx, nil) log.Errorf( diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 1c00447e3073..f23d5bb304dd 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -315,7 +315,7 @@ func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID d // by fetching the new stats from the database. func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID descpb.ID) { log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID) - ctx, span := tracing.ForkCtxSpan(ctx, "refresh-table-stats") + ctx, span := tracing.ForkSpan(ctx, "refresh-table-stats") // Perform an asynchronous refresh of the cache. go func() { defer span.Finish() diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index ecdfefa83c13..b09ba5d3a901 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -341,7 +341,7 @@ func (s *Stopper) RunAsyncTask( return ErrUnavailable } - ctx, span := tracing.ForkCtxSpan(ctx, taskName) + ctx, span := tracing.ForkSpan(ctx, taskName) // Call f. go func() { @@ -400,7 +400,7 @@ func (s *Stopper) RunLimitedAsyncTask( return ErrUnavailable } - ctx, span := tracing.ForkCtxSpan(ctx, taskName) + ctx, span := tracing.ForkSpan(ctx, taskName) go func() { defer s.Recover(ctx) diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index c7abdd8a0d3d..674b32308a2f 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -206,7 +206,7 @@ func (s *crdbSpan) record(msg string) { } } -func (s *crdbSpan) logStructured(item Structured) { +func (s *crdbSpan) recordStructured(item Structured) { s.mu.Lock() defer s.mu.Unlock() s.mu.structured = append(s.mu.structured, item) diff --git a/pkg/util/tracing/doc.go b/pkg/util/tracing/doc.go index 99f8c24919ad..b4deacfda26b 100644 --- a/pkg/util/tracing/doc.go +++ b/pkg/util/tracing/doc.go @@ -90,7 +90,7 @@ // [2]: https://opentracing.io/specification/ // [3]: `Recording.String` // [4]: `ChildSpan` -// [5]: `ForkCtxSpan`. "forking" a Span is the same as creating a new one +// [5]: `ForkSpan`. "forking" a Span is the same as creating a new one // with a "follows from" relation. // [6]: `crdbSpan` // [7]: `Span.SetVerbose`. To understand the specifics of what exactly is diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go index 6c3e399a20df..bfbb4f99a64c 100644 --- a/pkg/util/tracing/span.go +++ b/pkg/util/tracing/span.go @@ -193,14 +193,14 @@ type SpanStats interface { // SetSpanStats sets the stats on a Span. stats.Stats() will also be added to // the Span tags. // -// This is deprecated. Use LogStructured instead. +// This is deprecated. Use RecordStructured instead. // // TODO(tbg): remove this in the 21.2 cycle. func (s *Span) SetSpanStats(stats SpanStats) { if s.isNoop() { return } - s.LogStructured(stats) + s.RecordStructured(stats) s.crdb.mu.Lock() s.crdb.mu.stats = stats for name, value := range stats.StatsTags() { @@ -320,25 +320,25 @@ func (s *Span) setTagInner(key string, value interface{}, locked bool) *Span { } // Structured is an opaque protobuf that can be attached to a trace via -// `Span.LogStructured`. This is the only kind of data a Span carries when +// `Span.RecordStructured`. This is the only kind of data a Span carries when // `trace.mode = background`. type Structured interface { protoutil.Message } -// LogStructured adds a Structured payload to the Span. It will be added to the -// recording even if the Span is not verbose; however it will be discarded if -// the underlying Span has been optimized out (i.e. is a noop span). +// RecordStructured adds a Structured payload to the Span. It will be added to +// the recording even if the Span is not verbose; however it will be discarded +// if the underlying Span has been optimized out (i.e. is a noop span). // -// The caller must not mutate the item once LogStructured has been called. -func (s *Span) LogStructured(item Structured) { +// The caller must not mutate the item once RecordStructured has been called. +func (s *Span) RecordStructured(item Structured) { if s.isNoop() { return } - s.crdb.logStructured(item) + s.crdb.recordStructured(item) if s.hasVerboseSink() { - // NB: TrimSpace avoids the trailing whitespace - // generated by the protobuf stringers. + // NB: TrimSpace avoids the trailing whitespace generated by the + // protobuf stringers. s.Record(strings.TrimSpace(item.String())) } } @@ -346,19 +346,10 @@ func (s *Span) LogStructured(item Structured) { // Record provides a way to record free-form text into verbose spans. // // TODO(irfansharif): We don't currently have redactability with trace -// recordings (both here, and using LogStructured above). We'll want to do this +// recordings (both here, and using RecordStructured above). We'll want to do this // soon. func (s *Span) Record(msg string) { - if !s.hasVerboseSink() { - return - } - if s.ot.shadowSpan != nil { - s.ot.shadowSpan.LogFields(otlog.String(tracingpb.LogMessageField, msg)) - } - if s.netTr != nil { - s.netTr.LazyPrintf("%s", msg) - } - s.crdb.record(msg) + s.Recordf("%s", msg) } // Recordf is like Record, but accepts a format specifier. diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index 8ab66d94e847..8d49e51f7ba5 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -182,13 +182,13 @@ Span grandchild: require.Equal(t, exp, recToStrippedString(childRec)) } -func TestSpan_LogStructured(t *testing.T) { +func TestSpanRecordStructured(t *testing.T) { tr := NewTracer() tr._mode = int32(modeBackground) sp := tr.StartSpan("root", WithForceRealSpan()) defer sp.Finish() - sp.LogStructured(&types.Int32Value{Value: 4}) + sp.RecordStructured(&types.Int32Value{Value: 4}) rec := sp.GetRecording() require.Len(t, rec, 1) require.Len(t, rec[0].InternalStructured, 1) @@ -207,7 +207,7 @@ func TestNonVerboseChildSpanRegisteredWithParent(t *testing.T) { defer ch.Finish() require.Len(t, sp.crdb.mu.recording.children, 1) require.Equal(t, ch.crdb, sp.crdb.mu.recording.children[0]) - ch.LogStructured(&types.Int32Value{Value: 5}) + ch.RecordStructured(&types.Int32Value{Value: 5}) // Check that the child span (incl its payload) is in the recording. rec := sp.GetRecording() require.Len(t, rec, 2) diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 8cd6ce567d31..50e131fcd6ec 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -629,15 +629,20 @@ func (t *Tracer) VisitSpans(visitor func(*Span) error) error { return nil } -// ForkCtxSpan checks if ctx has a Span open; if it does, it creates a new Span -// that "follows from" the original Span. This allows the resulting context to be -// used in an async task that might outlive the original operation. +// ForkSpan forks the current span, if any[1]. Forked spans "follow from" the +// original, and are typically used to trace operations that may outlive the +// parent (think async tasks). See the package-level documentation for more +// details. // -// Returns the new context and the new Span (if any). The Span should be -// closed via FinishSpan. +// The recordings from these spans will be automatically propagated to the +// parent span. Also see `ChildSpan`, for the other kind of derived span +// relation. // -// See also ChildSpan() for a "parent-child relationship". -func ForkCtxSpan(ctx context.Context, opName string) (context.Context, *Span) { +// A context wrapping the newly created span is returned, along with the span +// itself. If non-nil, the caller is responsible for eventually Finish()ing it. +// +// [1]: Looking towards the provided context to see if one exists. +func ForkSpan(ctx context.Context, opName string) (context.Context, *Span) { sp := SpanFromContext(ctx) if sp == nil { return ctx, nil @@ -647,12 +652,13 @@ func ForkCtxSpan(ctx context.Context, opName string) (context.Context, *Span) { ) } -// ChildSpan opens a Span as a child of the current Span in the context (if -// there is one), via the WithParentAndAutoCollection option. -// The Span's tags are inherited from the ctx's log tags automatically. +// ChildSpan creates a child span of the current one, if any. Recordings from +// child spans are automatically propagated to the parent span, and the tags are +// inherited from the context's log tags automatically. Also see `ForkSpan`, +// for the other kind of derived span relation. // -// Returns the new context and the new Span (if any). If a non-nil Span is -// returned, it is the caller's duty to eventually call Finish() on it. +// A context wrapping the newly created span is returned, along with the span +// itself. If non-nil, the caller is responsible for eventually Finish()ing it. func ChildSpan(ctx context.Context, opName string) (context.Context, *Span) { sp := SpanFromContext(ctx) if sp == nil { @@ -661,9 +667,10 @@ func ChildSpan(ctx context.Context, opName string) (context.Context, *Span) { return sp.Tracer().StartSpanCtx(ctx, opName, WithParentAndAutoCollection(sp)) } -// ChildSpanRemote is like ChildSpan but the new Span is created using WithParentAndManualCollection -// instead of WithParentAndAutoCollection. When this is used, it's the caller's duty to collect this span's -// recording and return it to the root span of the trace. +// ChildSpanRemote is like ChildSpan but the new Span is created using +// WithParentAndManualCollection instead of WithParentAndAutoCollection. When +// this is used, it's the caller's duty to collect this span's recording and +// return it to the root span of the trace. func ChildSpanRemote(ctx context.Context, opName string) (context.Context, *Span) { sp := SpanFromContext(ctx) if sp == nil { diff --git a/pkg/util/tracing/tracingpb/recorded_span.go b/pkg/util/tracing/tracingpb/recorded_span.go index 32640cbb38c8..04db86974e6d 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.go +++ b/pkg/util/tracing/tracingpb/recorded_span.go @@ -31,7 +31,7 @@ func (s *RecordedSpan) String() string { return sb.String() } -// Structured visits the data passed to LogStructured for the Span from which +// Structured visits the data passed to RecordStructured for the Span from which // the RecordedSpan was created. func (s *RecordedSpan) Structured(visit func(*types.Any)) { if s.DeprecatedStats != nil { From 9687953962d421e911b3571f18ad3f57f83098b2 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 5 Feb 2021 17:34:07 -0500 Subject: [PATCH 2/2] tracing: improve interfaces around process boundaries We can rid ourselves of the opentracing cruft just a bit more, and add some type-safety while doing so. While here, we'll also add some text explaining how tracing data gets propagated across process boundaries. Release note: None --- pkg/kv/kvserver/BUILD.bazel | 1 - pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go | 35 ++--- pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 7 +- .../kvserver/replica_application_decoder.go | 8 +- pkg/kv/kvserver/replica_proposal.go | 14 +- pkg/util/tracing/BUILD.bazel | 1 + pkg/util/tracing/doc.go | 2 +- pkg/util/tracing/grpc_interceptor.go | 47 +++--- pkg/util/tracing/span_test.go | 17 ++- pkg/util/tracing/tracer.go | 141 +++++++++++++----- pkg/util/tracing/tracer_test.go | 34 ++--- 11 files changed, 188 insertions(+), 119 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index a50dbc205222..c59b7b5af089 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -183,7 +183,6 @@ go_library( "@com_github_cockroachdb_redact//:redact", "@com_github_google_btree//:btree", "@com_github_kr_pretty//:pretty", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@io_etcd_go_etcd_raft_v3//:raft", "@io_etcd_go_etcd_raft_v3//raftpb", "@io_etcd_go_etcd_raft_v3//tracker", diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go index 270c5b2de75c..8b43941018d9 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go @@ -48,7 +48,7 @@ func (m *Split) Reset() { *m = Split{} } func (m *Split) String() string { return proto.CompactTextString(m) } func (*Split) ProtoMessage() {} func (*Split) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{0} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{0} } func (m *Split) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -83,7 +83,7 @@ func (m *Merge) Reset() { *m = Merge{} } func (m *Merge) String() string { return proto.CompactTextString(m) } func (*Merge) ProtoMessage() {} func (*Merge) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{1} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{1} } func (m *Merge) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -117,7 +117,7 @@ type ChangeReplicas struct { func (m *ChangeReplicas) Reset() { *m = ChangeReplicas{} } func (*ChangeReplicas) ProtoMessage() {} func (*ChangeReplicas) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{2} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{2} } func (m *ChangeReplicas) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -169,7 +169,7 @@ func (m *ComputeChecksum) Reset() { *m = ComputeChecksum{} } func (m *ComputeChecksum) String() string { return proto.CompactTextString(m) } func (*ComputeChecksum) ProtoMessage() {} func (*ComputeChecksum) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{3} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{3} } func (m *ComputeChecksum) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -206,7 +206,7 @@ func (m *Compaction) Reset() { *m = Compaction{} } func (m *Compaction) String() string { return proto.CompactTextString(m) } func (*Compaction) ProtoMessage() {} func (*Compaction) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{4} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{4} } func (m *Compaction) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -243,7 +243,7 @@ func (m *SuggestedCompaction) Reset() { *m = SuggestedCompaction{} } func (m *SuggestedCompaction) String() string { return proto.CompactTextString(m) } func (*SuggestedCompaction) ProtoMessage() {} func (*SuggestedCompaction) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{5} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{5} } func (m *SuggestedCompaction) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -305,7 +305,7 @@ func (m *ReplicatedEvalResult) Reset() { *m = ReplicatedEvalResult{} } func (m *ReplicatedEvalResult) String() string { return proto.CompactTextString(m) } func (*ReplicatedEvalResult) ProtoMessage() {} func (*ReplicatedEvalResult) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{6} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{6} } func (m *ReplicatedEvalResult) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -349,7 +349,7 @@ func (m *ReplicatedEvalResult_AddSSTable) Reset() { *m = ReplicatedEvalR func (m *ReplicatedEvalResult_AddSSTable) String() string { return proto.CompactTextString(m) } func (*ReplicatedEvalResult_AddSSTable) ProtoMessage() {} func (*ReplicatedEvalResult_AddSSTable) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{6, 0} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{6, 0} } func (m *ReplicatedEvalResult_AddSSTable) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -386,7 +386,7 @@ func (m *WriteBatch) Reset() { *m = WriteBatch{} } func (m *WriteBatch) String() string { return proto.CompactTextString(m) } func (*WriteBatch) ProtoMessage() {} func (*WriteBatch) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{7} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{7} } func (m *WriteBatch) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -423,7 +423,7 @@ func (m *LogicalOpLog) Reset() { *m = LogicalOpLog{} } func (m *LogicalOpLog) String() string { return proto.CompactTextString(m) } func (*LogicalOpLog) ProtoMessage() {} func (*LogicalOpLog) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{8} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{8} } func (m *LogicalOpLog) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -518,9 +518,10 @@ type RaftCommand struct { // logical_op_log contains a series of logical MVCC operations that correspond // to the physical operations being made in the write_batch. LogicalOpLog *LogicalOpLog `protobuf:"bytes,15,opt,name=logical_op_log,json=logicalOpLog,proto3" json:"logical_op_log,omitempty"` - // trace_data, if not empty, contains details of proposer's trace as returned by - // Tracer.Inject(opentracing.TextMap). Used to create span for command - // application on all the replicas that "follow from" the proposer. + // trace_data, if not empty, contains details of the proposer's trace as + // returned by Tracer.InjectMetaInto(sp.Meta(), ...). This is used to create + // spans for the command application process on all the replicas that "follow + // from" the proposer. TraceData map[string]string `protobuf:"bytes,16,rep,name=trace_data,json=traceData,proto3" json:"trace_data,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } @@ -528,7 +529,7 @@ func (m *RaftCommand) Reset() { *m = RaftCommand{} } func (m *RaftCommand) String() string { return proto.CompactTextString(m) } func (*RaftCommand) ProtoMessage() {} func (*RaftCommand) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{9} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{9} } func (m *RaftCommand) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -567,7 +568,7 @@ func (m *RaftCommandFooter) Reset() { *m = RaftCommandFooter{} } func (m *RaftCommandFooter) String() string { return proto.CompactTextString(m) } func (*RaftCommandFooter) ProtoMessage() {} func (*RaftCommandFooter) Descriptor() ([]byte, []int) { - return fileDescriptor_proposer_kv_367b1f11f61ba339, []int{10} + return fileDescriptor_proposer_kv_0c8837b323bf7b92, []int{10} } func (m *RaftCommandFooter) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -3477,10 +3478,10 @@ var ( ) func init() { - proto.RegisterFile("kv/kvserver/kvserverpb/proposer_kv.proto", fileDescriptor_proposer_kv_367b1f11f61ba339) + proto.RegisterFile("kv/kvserver/kvserverpb/proposer_kv.proto", fileDescriptor_proposer_kv_0c8837b323bf7b92) } -var fileDescriptor_proposer_kv_367b1f11f61ba339 = []byte{ +var fileDescriptor_proposer_kv_0c8837b323bf7b92 = []byte{ // 1424 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x57, 0x4f, 0x6f, 0xdb, 0x46, 0x16, 0xb7, 0x2c, 0xc9, 0xa6, 0x9e, 0x6c, 0x89, 0x9e, 0x38, 0x09, 0xd7, 0xbb, 0x2b, 0x19, 0xda, diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index 5bc6530d459a..3267b2a7b4a8 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -259,9 +259,10 @@ message RaftCommand { // to the physical operations being made in the write_batch. LogicalOpLog logical_op_log = 15; - // trace_data, if not empty, contains details of proposer's trace as returned by - // Tracer.Inject(opentracing.TextMap). Used to create span for command - // application on all the replicas that "follow from" the proposer. + // trace_data, if not empty, contains details of the proposer's trace as + // returned by Tracer.InjectMetaInto(sp.Meta(), ...). This is used to create + // spans for the command application process on all the replicas that "follow + // from" the proposer. map trace_data = 16; reserved 1, 2, 10001 to 10014; diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index 9dbcf042226f..35f55d3fddf8 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/tracing" - "github.com/opentracing/opentracing-go" "go.etcd.io/etcd/raft/v3/raftpb" ) @@ -144,8 +143,9 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { } else if cmd.raftCmd.TraceData != nil { // The proposal isn't local, and trace data is available. Extract // the remote span and start a server-side span that follows from it. - spanCtx, err := d.r.AmbientContext.Tracer.Extract( - opentracing.TextMap, opentracing.TextMapCarrier(cmd.raftCmd.TraceData)) + spanMeta, err := d.r.AmbientContext.Tracer.ExtractMetaFrom(tracing.MapCarrier{ + Map: cmd.raftCmd.TraceData, + }) if err != nil { log.Errorf(ctx, "unable to extract trace data from raft command: %s", err) } else { @@ -154,7 +154,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { "raft application", // NB: we are lying here - we are not actually going to propagate // the recording towards the root. That seems ok. - tracing.WithParentAndManualCollection(spanCtx), + tracing.WithParentAndManualCollection(spanMeta), tracing.WithFollowsFrom(), ) } diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index 74cfeb89e7a1..b50a20e62957 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -38,7 +38,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/kr/pretty" - opentracing "github.com/opentracing/opentracing-go" "golang.org/x/time/rate" ) @@ -842,7 +841,7 @@ func (r *Replica) requestToProposal( } // getTraceData extracts the SpanMeta of the current span. -func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier { +func (r *Replica) getTraceData(ctx context.Context) map[string]string { sp := tracing.SpanFromContext(ctx) if sp == nil { return nil @@ -850,12 +849,13 @@ func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier { if sp.IsBlackHole() { return nil } - traceData := opentracing.TextMapCarrier{} - if err := r.AmbientContext.Tracer.Inject( - sp.Meta(), opentracing.TextMap, traceData, - ); err != nil { + + traceCarrier := tracing.MapCarrier{ + Map: make(map[string]string), + } + if err := r.AmbientContext.Tracer.InjectMetaInto(sp.Meta(), traceCarrier); err != nil { log.Errorf(ctx, "failed to inject sp context (%+v) as trace data: %s", sp.Meta(), err) return nil } - return traceData + return traceCarrier.Map } diff --git a/pkg/util/tracing/BUILD.bazel b/pkg/util/tracing/BUILD.bazel index 588dbb14f20c..31ade7e64afa 100644 --- a/pkg/util/tracing/BUILD.bazel +++ b/pkg/util/tracing/BUILD.bazel @@ -67,5 +67,6 @@ go_test( "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_opentracing_opentracing_go//log", "@com_github_stretchr_testify//require", + "@org_golang_google_grpc//metadata", ], ) diff --git a/pkg/util/tracing/doc.go b/pkg/util/tracing/doc.go index b4deacfda26b..6db26cc4b5ae 100644 --- a/pkg/util/tracing/doc.go +++ b/pkg/util/tracing/doc.go @@ -96,7 +96,7 @@ // [7]: `Span.SetVerbose`. To understand the specifics of what exactly is // captured in Span recording, when Spans have children that may be either // local or remote, look towards `WithParentAnd{Auto,Manual}Collection` -// [8]: `Tracer.{Inject,Extract}` +// [8]: `Tracer.{InjectMetaInto,ExtractMetaFrom}` // [9]: `SpanMeta` // [10]: `{Client,Server}Interceptor` // [11]: `SpanFromContext` diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index 3e11dcd1ee6e..e25393b59e74 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -27,15 +27,14 @@ import ( "google.golang.org/grpc/status" ) -var gRPCComponentTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"} - -// metadataReaderWriter satisfies both the opentracing.TextMapReader and -// opentracing.TextMapWriter interfaces. -type metadataReaderWriter struct { +// metadataCarrier is an implementation of the Carrier interface for gRPC +// metadata. +type metadataCarrier struct { metadata.MD } -func (w metadataReaderWriter) Set(key, val string) { +// Set implements the Carrier interface. +func (w metadataCarrier) Set(key, val string) { // The GRPC HPACK implementation rejects any uppercase keys here. // // As such, since the HTTP_HEADERS format is case-insensitive anyway, we @@ -45,10 +44,11 @@ func (w metadataReaderWriter) Set(key, val string) { w.MD[key] = append(w.MD[key], val) } -func (w metadataReaderWriter) ForeachKey(handler func(key, val string) error) error { +// ForEach implements the Carrier interface. +func (w metadataCarrier) ForEach(fn func(key, val string) error) error { for k, vals := range w.MD { for _, v := range vals { - if err := handler(k, v); err != nil { + if err := fn(k, v); err != nil { return err } } @@ -62,7 +62,7 @@ func extractSpanMeta(ctx context.Context, tracer *Tracer) (*SpanMeta, error) { if !ok { md = metadata.New(nil) } - return tracer.Extract(opentracing.HTTPHeaders, metadataReaderWriter{md}) + return tracer.ExtractMetaFrom(metadataCarrier{md}) } // spanInclusionFuncForServer is used as a SpanInclusionFunc for the server-side @@ -76,9 +76,9 @@ func spanInclusionFuncForServer(t *Tracer, spanMeta *SpanMeta) bool { return !spanMeta.isNilOrNoop() || t.AlwaysTrace() } -// SetSpanTags sets one or more tags on the given span according to the +// setSpanTags sets one or more tags on the given span according to the // error. -func SetSpanTags(sp *Span, err error, client bool) { +func setSpanTags(sp *Span, err error, client bool) { c := otgrpc.ErrorClass(err) code := codes.Unknown if s, ok := status.FromError(err); ok { @@ -94,6 +94,8 @@ func SetSpanTags(sp *Span, err error, client bool) { } } +var gRPCComponentTag = opentracing.Tag{Key: string(ext.Component), Value: "gRPC"} + // ServerInterceptor returns a grpc.UnaryServerInterceptor suitable // for use in a grpc.NewServer call. // @@ -134,7 +136,7 @@ func ServerInterceptor(tracer *Tracer) grpc.UnaryServerInterceptor { resp, err = handler(ctx, req) if err != nil { - SetSpanTags(serverSpan, err, false) + setSpanTags(serverSpan, err, false) serverSpan.Recordf("error: %s", err) } return resp, err @@ -180,7 +182,7 @@ func StreamServerInterceptor(tracer *Tracer) grpc.StreamServerInterceptor { } err = handler(srv, ss) if err != nil { - SetSpanTags(serverSpan, err, false) + setSpanTags(serverSpan, err, false) serverSpan.Recordf("error: %s", err) } return err @@ -209,17 +211,16 @@ func spanInclusionFuncForClient(parent *Span) bool { return parent != nil && !parent.isNoop() } -func injectSpanContext(ctx context.Context, tracer *Tracer, clientSpan *Span) context.Context { +func injectSpanMeta(ctx context.Context, tracer *Tracer, clientSpan *Span) context.Context { md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.New(nil) } else { md = md.Copy() } - mdWriter := metadataReaderWriter{md} - err := tracer.Inject(clientSpan.Meta(), opentracing.HTTPHeaders, mdWriter) - // We have no better place to record an error than the Span itself :-/ - if err != nil { + + if err := tracer.InjectMetaInto(clientSpan.Meta(), metadataCarrier{md}); err != nil { + // We have no better place to record an error than the Span itself. clientSpan.Recordf("error: %s", err) } return metadata.NewOutgoingContext(ctx, md) @@ -262,10 +263,10 @@ func ClientInterceptor(tracer *Tracer, init func(*Span)) grpc.UnaryClientInterce ) init(clientSpan) defer clientSpan.Finish() - ctx = injectSpanContext(ctx, tracer, clientSpan) + ctx = injectSpanMeta(ctx, tracer, clientSpan) err := invoker(ctx, method, req, resp, cc, opts...) if err != nil { - SetSpanTags(clientSpan, err, true) + setSpanTags(clientSpan, err, true) clientSpan.Recordf("error: %s", err) } return err @@ -310,11 +311,11 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient WithTags(gRPCComponentTag, ext.SpanKindRPCClient), ) init(clientSpan) - ctx = injectSpanContext(ctx, tracer, clientSpan) + ctx = injectSpanMeta(ctx, tracer, clientSpan) cs, err := streamer(ctx, desc, cc, method, opts...) if err != nil { clientSpan.Recordf("error: %s", err) - SetSpanTags(clientSpan, err, true) + setSpanTags(clientSpan, err, true) clientSpan.Finish() return cs, err } @@ -341,7 +342,7 @@ func newTracingClientStream( defer clientSpan.Finish() if err != nil { clientSpan.Recordf("error: %s", err) - SetSpanTags(clientSpan, err, true) + setSpanTags(clientSpan, err, true) } } go func() { diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index 8d49e51f7ba5..0866092f8f24 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -19,8 +19,8 @@ import ( "github.com/cockroachdb/errors" "github.com/gogo/protobuf/types" - "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" ) func TestRecordingString(t *testing.T) { @@ -36,18 +36,19 @@ func TestRecordingString(t *testing.T) { // than the one we just assigned. Otherwise the sorting will be screwed up. time.Sleep(10 * time.Millisecond) - carrier := make(opentracing.HTTPHeadersCarrier) - err := tr.Inject(root.Meta(), opentracing.HTTPHeaders, carrier) + carrier := metadataCarrier{MD: metadata.MD{}} + require.NoError(t, tr.InjectMetaInto(root.Meta(), carrier)) + + wireSpanMeta, err := tr2.ExtractMetaFrom(carrier) require.NoError(t, err) - wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier) - remoteChild := tr2.StartSpan("remote child", WithParentAndManualCollection(wireContext)) + + remoteChild := tr2.StartSpan("remote child", WithParentAndManualCollection(wireSpanMeta)) root.Record("root 2") remoteChild.Record("remote child 1") - require.NoError(t, err) remoteChild.Finish() + remoteRec := remoteChild.GetRecording() - err = root.ImportRemoteSpans(remoteRec) - require.NoError(t, err) + require.NoError(t, root.ImportRemoteSpans(remoteRec)) root.Finish() root.Record("root 3") diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go index 50e131fcd6ec..9a8492f7a8c3 100644 --- a/pkg/util/tracing/tracer.go +++ b/pkg/util/tracing/tracer.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" opentracing "github.com/opentracing/opentracing-go" "github.com/petermattis/goid" @@ -453,6 +454,68 @@ func (t *Tracer) startSpanGeneric( return maybeWrapCtx(ctx, &helper.octx, s) } +// serializationFormat is the format used by the Tracer to {de,}serialize span +// metadata across process boundaries. This takes place within +// Tracer.{InjectMetaInto,ExtractMetaFrom}. Each format is inextricably linked +// to a corresponding Carrier, which is the thing that actually captures the +// serialized data and crosses process boundaries. +// +// The usage pattern is as follows: +// +// // One end of the RPC. +// carrier := MapCarrier{...} +// tracer.InjectMetaInto(sp.Meta(), carrier) +// +// // carrier crosses RPC boundary. +// +// // Other end of the RPC. +// spMeta, _ := Tracer.ExtractMetaFrom(carrier) +// ctx, sp := tracer.StartSpanCtx(..., spMeta) +// +type serializationFormat = opentracing.BuiltinFormat + +const ( + _ serializationFormat = iota + + // metadataFormat is used to {de,}serialize data as HTTP header string + // pairs. It's used with gRPC (the carrier must be metadataCarrier), for + // when operations straddle RPC boundaries. + metadataFormat = opentracing.HTTPHeaders + + // mapFormat is used to serialize data as a map of string pairs. The carrier + // must be MapCarrier. + mapFormat = opentracing.TextMap +) + +// Carrier is what's used to capture the serialized data. Each carrier is +// inextricably linked to a corresponding format. See serializationFormat for +// more details. +type Carrier interface { + Set(key, val string) + ForEach(fn func(key, val string) error) error +} + +// MapCarrier is an implementation of the Carrier interface for a map of string +// pairs. +type MapCarrier struct { + Map map[string]string +} + +// Set implements the Carrier interface. +func (c MapCarrier) Set(key, val string) { + c.Map[key] = val +} + +// ForEach implements the Carrier interface. +func (c MapCarrier) ForEach(fn func(key, val string) error) error { + for k, v := range c.Map { + if err := fn(k, v); err != nil { + return err + } + } + return nil +} + type textMapWriterFn func(key, val string) var _ opentracing.TextMapWriter = textMapWriterFn(nil) @@ -462,29 +525,31 @@ func (fn textMapWriterFn) Set(key, val string) { fn(key, val) } -// Inject is part of the opentracing.Tracer interface. -func (t *Tracer) Inject(sc *SpanMeta, format interface{}, carrier interface{}) error { - if sc.isNilOrNoop() { - // Fast path when tracing is disabled. Extract will accept an empty map as a - // noop context. +// InjectMetaInto is used to serialize the given span metadata into the given +// Carrier. This, alongside ExtractMetaFrom, can be used to carry span metadata +// across process boundaries. See serializationFormat for more details. +func (t *Tracer) InjectMetaInto(sm *SpanMeta, carrier Carrier) error { + if sm.isNilOrNoop() { + // Fast path when tracing is disabled. ExtractMetaFrom will accept an + // empty map as a noop context. return nil } - // We only support the HTTPHeaders/TextMap format. - if format != opentracing.HTTPHeaders && format != opentracing.TextMap { - return opentracing.ErrUnsupportedFormat - } - - mapWriter, ok := carrier.(opentracing.TextMapWriter) - if !ok { - return opentracing.ErrInvalidCarrier + var format serializationFormat + switch carrier.(type) { + case MapCarrier: + format = mapFormat + case metadataCarrier: + format = metadataFormat + default: + return errors.New("unsupported carrier") } - mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.traceID, 16)) - mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.spanID, 16)) + carrier.Set(fieldNameTraceID, strconv.FormatUint(sm.traceID, 16)) + carrier.Set(fieldNameSpanID, strconv.FormatUint(sm.spanID, 16)) - for k, v := range sc.Baggage { - mapWriter.Set(prefixBaggage+k, v) + for k, v := range sm.Baggage { + carrier.Set(prefixBaggage+k, v) } shadowTr := t.getShadowTracer() @@ -493,11 +558,11 @@ func (t *Tracer) Inject(sc *SpanMeta, format interface{}, carrier interface{}) e // to put information on the wire. If something changes out from under us, forget // about shadow tracing. curTyp, _ := shadowTr.Type() - if typ := sc.shadowTracerType; typ == curTyp { - mapWriter.Set(fieldNameShadowType, sc.shadowTracerType) + if typ := sm.shadowTracerType; typ == curTyp { + carrier.Set(fieldNameShadowType, sm.shadowTracerType) // Encapsulate the shadow text map, prepending a prefix to the keys. - if err := shadowTr.Inject(sc.shadowCtx, format, textMapWriterFn(func(key, val string) { - mapWriter.Set(prefixShadow+key, val) + if err := shadowTr.Inject(sm.shadowCtx, format, textMapWriterFn(func(key, val string) { + carrier.Set(prefixShadow+key, val) })); err != nil { return err } @@ -507,20 +572,20 @@ func (t *Tracer) Inject(sc *SpanMeta, format interface{}, carrier interface{}) e return nil } -var noopSpanContext = &SpanMeta{} - -// Extract is part of the opentracing.Tracer interface. -// It always returns a valid context, even in error cases (this is assumed by the -// grpc-opentracing interceptor). -func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanMeta, error) { - // We only support the HTTPHeaders/TextMap format. - if format != opentracing.HTTPHeaders && format != opentracing.TextMap { - return noopSpanContext, opentracing.ErrUnsupportedFormat - } +var noopSpanMeta = &SpanMeta{} - mapReader, ok := carrier.(opentracing.TextMapReader) - if !ok { - return noopSpanContext, opentracing.ErrInvalidCarrier +// ExtractMetaFrom is used to deserialize a span metadata (if any) from the +// given Carrier. This, alongside InjectMetaFrom, can be used to carry span +// metadata across process boundaries. See serializationFormat for more details. +func (t *Tracer) ExtractMetaFrom(carrier Carrier) (*SpanMeta, error) { + var format serializationFormat + switch carrier.(type) { + case MapCarrier: + format = mapFormat + case metadataCarrier: + format = metadataFormat + default: + return noopSpanMeta, errors.New("unsupported carrier") } var shadowType string @@ -532,7 +597,7 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanMeta, er // TODO(tbg): ForeachKey forces things on the heap. We can do better // by using an explicit carrier. - err := mapReader.ForeachKey(func(k, v string) error { + err := carrier.ForEach(func(k, v string) error { switch k = strings.ToLower(k); k { case fieldNameTraceID: var err error @@ -565,10 +630,10 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanMeta, er return nil }) if err != nil { - return noopSpanContext, err + return noopSpanMeta, err } if traceID == 0 && spanID == 0 { - return noopSpanContext, nil + return noopSpanMeta, nil } var recordingType RecordingType @@ -593,7 +658,7 @@ func (t *Tracer) Extract(format interface{}, carrier interface{}) (*SpanMeta, er // Extract the shadow context using the un-encapsulated textmap. shadowCtx, err = shadowTr.Extract(format, shadowCarrier) if err != nil { - return noopSpanContext, err + return noopSpanMeta, err } } } diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index 4e78e8f8abe0..8bc59728aeb2 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -17,8 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/logtags" lightstep "github.com/lightstep/lightstep-tracer-go" - opentracing "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" ) func TestStartSpanAlwaysTrace(t *testing.T) { @@ -211,22 +211,22 @@ func TestTracerInjectExtract(t *testing.T) { if !noop1.isNoop() { t.Fatalf("expected noop Span: %+v", noop1) } - carrier := make(opentracing.HTTPHeadersCarrier) - if err := tr.Inject(noop1.Meta(), opentracing.HTTPHeaders, carrier); err != nil { + carrier := metadataCarrier{metadata.MD{}} + if err := tr.InjectMetaInto(noop1.Meta(), carrier); err != nil { t.Fatal(err) } - if len(carrier) != 0 { + if len(carrier.MD) != 0 { t.Errorf("noop Span has carrier: %+v", carrier) } - wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier) + wireSpanMeta, err := tr2.ExtractMetaFrom(carrier) if err != nil { t.Fatal(err) } - if !wireContext.isNilOrNoop() { - t.Errorf("expected noop context: %v", wireContext) + if !wireSpanMeta.isNilOrNoop() { + t.Errorf("expected noop context: %v", wireSpanMeta) } - noop2 := tr2.StartSpan("remote op", WithParentAndManualCollection(wireContext)) + noop2 := tr2.StartSpan("remote op", WithParentAndManualCollection(wireSpanMeta)) if !noop2.isNoop() { t.Fatalf("expected noop Span: %+v", noop2) } @@ -239,16 +239,16 @@ func TestTracerInjectExtract(t *testing.T) { s1 := tr.StartSpan("a", WithForceRealSpan()) s1.SetVerbose(true) - carrier = make(opentracing.HTTPHeadersCarrier) - if err := tr.Inject(s1.Meta(), opentracing.HTTPHeaders, carrier); err != nil { + carrier = metadataCarrier{metadata.MD{}} + if err := tr.InjectMetaInto(s1.Meta(), carrier); err != nil { t.Fatal(err) } - wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier) + wireSpanMeta, err = tr2.ExtractMetaFrom(carrier) if err != nil { t.Fatal(err) } - s2 := tr2.StartSpan("remote op", WithParentAndManualCollection(wireContext)) + s2 := tr2.StartSpan("remote op", WithParentAndManualCollection(wireSpanMeta)) // Compare TraceIDs trace1 := s1.Meta().traceID @@ -311,18 +311,18 @@ func TestLightstepContext(t *testing.T) { const testBaggageVal = "test-val" s.SetBaggageItem(testBaggageKey, testBaggageVal) - carrier := make(opentracing.HTTPHeadersCarrier) - if err := tr.Inject(s.Meta(), opentracing.HTTPHeaders, carrier); err != nil { + carrier := metadataCarrier{metadata.MD{}} + if err := tr.InjectMetaInto(s.Meta(), carrier); err != nil { t.Fatal(err) } - // Extract also extracts the embedded lightstep context. - wireContext, err := tr.Extract(opentracing.HTTPHeaders, carrier) + // ExtractMetaFrom also extracts the embedded lightstep context. + wireSpanMeta, err := tr.ExtractMetaFrom(carrier) if err != nil { t.Fatal(err) } - s2 := tr.StartSpan("child", WithParentAndManualCollection(wireContext)) + s2 := tr.StartSpan("child", WithParentAndManualCollection(wireSpanMeta)) s2Ctx := s2.ot.shadowSpan.Context() // Verify that the baggage is correct in both the tracer context and in the