Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ func (r *Registry) CreateStartableJobWithTxn(
// Construct a context which contains a tracing span that follows from the
// span in the parent context. We don't directly use the parent span because
// we want independent lifetimes and cancellation. For the same reason, we
// don't use the Context returned by ForkCtxSpan.
// don't use the Context returned by ForkSpan.
resumerCtx, cancel := r.makeCtx()
_, span := tracing.ForkCtxSpan(ctx, "job")
_, span := tracing.ForkSpan(ctx, "job")
if span != nil {
resumerCtx = tracing.ContextWithSpan(resumerCtx, span)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (rc *RangeCache) tryLookup(
resC, leader := rc.lookupRequests.DoChan(requestKey, func() (interface{}, error) {
var lookupRes EvictionToken
if err := rc.stopper.RunTaskWithErr(ctx, "rangecache: range lookup", func(ctx context.Context) error {
ctx, reqSpan := tracing.ForkCtxSpan(ctx, "range lookup")
ctx, reqSpan := tracing.ForkSpan(ctx, "range lookup")
defer reqSpan.Finish()
// Clear the context's cancelation. This request services potentially many
// callers waiting for its result, and using the flight's leader's
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ go_library(
"@com_github_cockroachdb_redact//:redact",
"@com_github_google_btree//:btree",
"@com_github_kr_pretty//:pretty",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@io_etcd_go_etcd_raft_v3//:raft",
"@io_etcd_go_etcd_raft_v3//raftpb",
"@io_etcd_go_etcd_raft_v3//tracker",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,11 +713,11 @@ func (h *contentionEventHelper) emit() {
}
h.ev.Duration = timeutil.Since(h.tBegin)
if h.onEvent != nil {
// NB: this is intentionally above the call to LogStructured so that
// NB: this is intentionally above the call to RecordStructured so that
// this interceptor gets to mutate the event (used for test determinism).
h.onEvent(h.ev)
}
h.sp.LogStructured(h.ev)
h.sp.RecordStructured(h.ev)
h.ev = nil
}

Expand Down
35 changes: 18 additions & 17 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvserverpb/proposer_kv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ message RaftCommand {
// to the physical operations being made in the write_batch.
LogicalOpLog logical_op_log = 15;

// trace_data, if not empty, contains details of proposer's trace as returned by
// Tracer.Inject(opentracing.TextMap). Used to create span for command
// application on all the replicas that "follow from" the proposer.
// trace_data, if not empty, contains details of the proposer's trace as
// returned by Tracer.InjectMetaInto(sp.Meta(), ...). This is used to create
// spans for the command application process on all the replicas that "follow
// from" the proposer.
map<string, string> trace_data = 16;

reserved 1, 2, 10001 to 10014;
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/opentracing/opentracing-go"
"go.etcd.io/etcd/raft/v3/raftpb"
)

Expand Down Expand Up @@ -140,12 +139,13 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
if cmd.IsLocal() {
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(cmd.proposal.ctx, opName)
cmd.ctx, cmd.sp = tracing.ForkSpan(cmd.proposal.ctx, opName)
} else if cmd.raftCmd.TraceData != nil {
// The proposal isn't local, and trace data is available. Extract
// the remote span and start a server-side span that follows from it.
spanCtx, err := d.r.AmbientContext.Tracer.Extract(
opentracing.TextMap, opentracing.TextMapCarrier(cmd.raftCmd.TraceData))
spanMeta, err := d.r.AmbientContext.Tracer.ExtractMetaFrom(tracing.MapCarrier{
Map: cmd.raftCmd.TraceData,
})
if err != nil {
log.Errorf(ctx, "unable to extract trace data from raft command: %s", err)
} else {
Expand All @@ -154,12 +154,12 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
"raft application",
// NB: we are lying here - we are not actually going to propagate
// the recording towards the root. That seems ok.
tracing.WithParentAndManualCollection(spanCtx),
tracing.WithParentAndManualCollection(spanMeta),
tracing.WithFollowsFrom(),
)
}
} else {
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(ctx, opName)
cmd.ctx, cmd.sp = tracing.ForkSpan(ctx, opName)
}
}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/kr/pretty"
opentracing "github.com/opentracing/opentracing-go"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -842,20 +841,21 @@ func (r *Replica) requestToProposal(
}

// getTraceData extracts the SpanMeta of the current span.
func (r *Replica) getTraceData(ctx context.Context) opentracing.TextMapCarrier {
func (r *Replica) getTraceData(ctx context.Context) map[string]string {
sp := tracing.SpanFromContext(ctx)
if sp == nil {
return nil
}
if sp.IsBlackHole() {
return nil
}
traceData := opentracing.TextMapCarrier{}
if err := r.AmbientContext.Tracer.Inject(
sp.Meta(), opentracing.TextMap, traceData,
); err != nil {

traceCarrier := tracing.MapCarrier{
Map: make(map[string]string),
}
if err := r.AmbientContext.Tracer.InjectMetaInto(sp.Meta(), traceCarrier); err != nil {
log.Errorf(ctx, "failed to inject sp context (%+v) as trace data: %s", sp.Meta(), err)
return nil
}
return traceData
return traceCarrier.Map
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (r *Replica) evalAndPropose(

// Fork the proposal's context span so that the proposal's context
// can outlive the original proposer's context.
proposal.ctx, proposal.sp = tracing.ForkCtxSpan(ctx, "async consensus")
proposal.ctx, proposal.sp = tracing.ForkSpan(ctx, "async consensus")

// Signal the proposal's response channel immediately.
reply := *proposal.Local.Reply
Expand Down Expand Up @@ -205,7 +205,7 @@ func (r *Replica) evalAndPropose(
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
// TODO(radu): Should this context be created via tracer.ForkCtxSpan?
// TODO(radu): Should this context be created via tracer.ForkSpan?
// We'd need to make sure the span is finished eventually.
proposal.ctx = r.AnnotateCtx(context.TODO())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/flowinfra/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (fr *FlowRegistry) RegisterFlow(
fr.Unlock()
if len(timedOutReceivers) != 0 {
// The span in the context might be finished by the time this runs. In
// principle, we could ForkCtxSpan() beforehand, but we don't want to
// principle, we could ForkSpan() beforehand, but we don't want to
// create the extra span every time.
timeoutCtx := tracing.ContextWithSpan(ctx, nil)
log.Errorf(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/stats/stats_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (sc *TableStatisticsCache) refreshCacheEntry(ctx context.Context, tableID d
// by fetching the new stats from the database.
func (sc *TableStatisticsCache) RefreshTableStats(ctx context.Context, tableID descpb.ID) {
log.VEventf(ctx, 1, "refreshing statistics for table %d", tableID)
ctx, span := tracing.ForkCtxSpan(ctx, "refresh-table-stats")
ctx, span := tracing.ForkSpan(ctx, "refresh-table-stats")
// Perform an asynchronous refresh of the cache.
go func() {
defer span.Finish()
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/stop/stopper.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (s *Stopper) RunAsyncTask(
return ErrUnavailable
}

ctx, span := tracing.ForkCtxSpan(ctx, taskName)
ctx, span := tracing.ForkSpan(ctx, taskName)

// Call f.
go func() {
Expand Down Expand Up @@ -400,7 +400,7 @@ func (s *Stopper) RunLimitedAsyncTask(
return ErrUnavailable
}

ctx, span := tracing.ForkCtxSpan(ctx, taskName)
ctx, span := tracing.ForkSpan(ctx, taskName)

go func() {
defer s.Recover(ctx)
Expand Down
1 change: 1 addition & 0 deletions pkg/util/tracing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ go_test(
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_opentracing_opentracing_go//log",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//metadata",
],
)
2 changes: 1 addition & 1 deletion pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *crdbSpan) record(msg string) {
}
}

func (s *crdbSpan) logStructured(item Structured) {
func (s *crdbSpan) recordStructured(item Structured) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.structured = append(s.mu.structured, item)
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/tracing/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@
// [2]: https://opentracing.io/specification/
// [3]: `Recording.String`
// [4]: `ChildSpan`
// [5]: `ForkCtxSpan`. "forking" a Span is the same as creating a new one
// [5]: `ForkSpan`. "forking" a Span is the same as creating a new one
// with a "follows from" relation.
// [6]: `crdbSpan`
// [7]: `Span.SetVerbose`. To understand the specifics of what exactly is
// captured in Span recording, when Spans have children that may be either
// local or remote, look towards `WithParentAnd{Auto,Manual}Collection`
// [8]: `Tracer.{Inject,Extract}`
// [8]: `Tracer.{InjectMetaInto,ExtractMetaFrom}`
// [9]: `SpanMeta`
// [10]: `{Client,Server}Interceptor`
// [11]: `SpanFromContext`
Expand Down
Loading