Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ func (r *Registry) ID() base.SQLInstanceID {
// makeCtx returns a new context from r's ambient context and an associated
// cancel func.
func (r *Registry) makeCtx() (context.Context, func()) {
return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx))
ctx := r.ac.AnnotateCtx(context.Background())
ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx))
return context.WithCancel(ctx)
}

// MakeJobID generates a new job ID.
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf

// Drain the SQL leases. This must be done after the pgServer has
// given sessions a chance to finish ongoing work.
s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter)
s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter)

// Done. This executes the defers set above to drain SQL leases.
return nil
Expand Down
9 changes: 8 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import (
// standalone SQLServer instances per tenant (the KV layer is shared across all
// tenants).
type SQLServer struct {
ambientCtx log.AmbientContext
stopper *stop.Stopper
sqlIDContainer *base.SQLIDContainer
pgServer *pgwire.Server
Expand Down Expand Up @@ -906,6 +907,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
stopper: cfg.stopper,
sqlIDContainer: cfg.nodeIDContainer,
pgServer: pgServer,
Expand Down Expand Up @@ -1120,7 +1122,7 @@ func (s *SQLServer) preStart(

// Delete all orphaned table leases created by a prior instance of this
// node. This also uses SQL.
s.leaseMgr.DeleteOrphanedLeases(orphanedLeasesTimeThresholdNanos)
s.leaseMgr.DeleteOrphanedLeases(ctx, orphanedLeasesTimeThresholdNanos)

// Start scheduled jobs daemon.
jobs.StartJobSchedulerDaemon(
Expand Down Expand Up @@ -1164,3 +1166,8 @@ func (s *SQLServer) SQLInstanceID() base.SQLInstanceID {
func (s *SQLServer) StartDiagnostics(ctx context.Context) {
s.diagnosticsReporter.PeriodicallyReportDiagnostics(ctx, s.stopper)
}

// AnnotateCtx annotates the given context with the server tracer and tags.
func (s *SQLServer) AnnotateCtx(ctx context.Context) context.Context {
return s.ambientCtx.AnnotateCtx(ctx)
}
34 changes: 30 additions & 4 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func StartTenant(
return nil, "", "", err
}

args, err := makeTenantSQLServerArgs(stopper, kvClusterName, baseCfg, sqlCfg)
args, err := makeTenantSQLServerArgs(ctx, stopper, kvClusterName, baseCfg, sqlCfg)
if err != nil {
return nil, "", "", err
}
Expand Down Expand Up @@ -96,7 +96,18 @@ func StartTenant(
// TODO(davidh): Do we need to force this to be false?
baseCfg.SplitListenSQL = false

background := baseCfg.AmbientCtx.AnnotateCtx(context.Background())
// Add the server tags to the startup context.
//
// We use args.BaseConfig here instead of baseCfg directly because
// makeTenantSQLArgs defines its own AmbientCtx instance and it's
// defined by-value.
ctx = args.BaseConfig.AmbientCtx.AnnotateCtx(ctx)

// Add the server tags to a generic background context for use
// by async goroutines.
// We can only annotate the context after makeTenantSQLServerArgs
// has defined the instance ID container in the AmbientCtx.
background := args.BaseConfig.AmbientCtx.AnnotateCtx(context.Background())

// StartListenRPCAndSQL will replace the SQLAddr fields if we choose
// to share the SQL and gRPC port so here, since the tenant config
Expand Down Expand Up @@ -164,6 +175,16 @@ func StartTenant(
args.sqlStatusServer = tenantStatusServer
s, err := newSQLServer(ctx, args)
tenantStatusServer.sqlServer = s
// Also add the SQL instance tag to the tenant status server's
// ambient context.
//
// We use the tag "sqli" instead of just "sql" because the latter is
// too generic and would be hard to search if someone was looking at
// a log message and wondering what it stands for.
//
// TODO(knz): find a way to share common logging tags between
// multiple AmbientContext instances.
tenantStatusServer.AmbientContext.AddLogTag("sqli", s.sqlIDContainer)

if err != nil {
return nil, "", "", err
Expand Down Expand Up @@ -355,7 +376,11 @@ func loadVarsHandler(
}

func makeTenantSQLServerArgs(
stopper *stop.Stopper, kvClusterName string, baseCfg BaseConfig, sqlCfg SQLConfig,
startupCtx context.Context,
stopper *stop.Stopper,
kvClusterName string,
baseCfg BaseConfig,
sqlCfg SQLConfig,
) (sqlServerArgs, error) {
st := baseCfg.Settings

Expand All @@ -366,6 +391,7 @@ func makeTenantSQLServerArgs(
// too generic and would be hard to search if someone was looking at
// a log message and wondering what it stands for.
baseCfg.AmbientCtx.AddLogTag("sqli", instanceIDContainer)
startupCtx = baseCfg.AmbientCtx.AnnotateCtx(startupCtx)

// TODO(tbg): this is needed so that the RPC heartbeats between the testcluster
// and this tenant work.
Expand Down Expand Up @@ -480,7 +506,7 @@ func makeTenantSQLServerArgs(

recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st)

runtime := status.NewRuntimeStatSampler(context.Background(), clock)
runtime := status.NewRuntimeStatSampler(startupCtx, clock)
registry.AddMetricStruct(runtime)

esb := &externalStorageBuilder{}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -259,7 +260,7 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
return nil
}
if l := maybeRemoveLease(); l != nil {
releaseLease(l, t.m)
releaseLease(ctx, l, t.m)
}
}

Expand All @@ -273,7 +274,9 @@ func (t *descriptorState) maybeQueueLeaseRenewal(
}

// Start the renewal. When it finishes, it will reset t.renewalInProgress.
return t.stopper.RunAsyncTask(context.Background(),
newCtx := m.ambientCtx.AnnotateCtx(context.Background())
newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
return t.stopper.RunAsyncTask(newCtx,
"lease renewal", func(ctx context.Context) {
t.startLeaseRenewal(ctx, m, id, name)
})
Expand Down
27 changes: 17 additions & 10 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
// of the first context cancels other callers to the `acquireNodeLease()` method,
// because of its use of `singleflight.Group`. See issue #41780 for how this has
// happened.
newCtx, cancel := m.stopper.WithCancelOnQuiesce(logtags.WithTags(context.Background(), logtags.FromContext(ctx)))
baseCtx := m.ambientCtx.AnnotateCtx(context.Background())
baseCtx = logtags.WithTags(baseCtx, logtags.FromContext(ctx))
newCtx, cancel := m.stopper.WithCancelOnQuiesce(baseCtx)
defer cancel()
if m.isDraining() {
return nil, errors.New("cannot acquire lease when draining")
Expand All @@ -457,7 +459,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
m.names.insert(newDescVersionState)
}
if toRelease != nil {
releaseLease(toRelease, m)
releaseLease(ctx, toRelease, m)
}
return true, nil
})
Expand All @@ -473,17 +475,18 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro
}

// releaseLease from store.
func releaseLease(lease *storedLease, m *Manager) {
ctx := context.TODO()
func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
if m.isDraining() {
// Release synchronously to guarantee release before exiting.
m.storage.release(ctx, m.stopper, lease)
return
}

// Release to the store asynchronously, without the descriptorState lock.
newCtx := m.ambientCtx.AnnotateCtx(context.Background())
newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
if err := m.stopper.RunAsyncTask(
ctx, "sql.descriptorState: releasing descriptor lease",
newCtx, "sql.descriptorState: releasing descriptor lease",
func(ctx context.Context) {
m.storage.release(ctx, m.stopper, lease)
}); err != nil {
Expand Down Expand Up @@ -528,7 +531,7 @@ func purgeOldVersions(
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
releaseLease(ctx, l, m)
}
}

Expand Down Expand Up @@ -950,7 +953,9 @@ func (m *Manager) isDraining() bool {
// to report work that needed to be done and which may or may not have
// been done by the time this call returns. See the explanation in
// pkg/server/drain.go for details.
func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
func (m *Manager) SetDraining(
ctx context.Context, drain bool, reporter func(int, redact.SafeString),
) {
m.draining.Store(drain)
if !drain {
return
Expand All @@ -963,7 +968,7 @@ func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString))
leases := t.removeInactiveVersions()
t.mu.Unlock()
for _, l := range leases {
releaseLease(l, m)
releaseLease(ctx, l, m)
}
if reporter != nil {
// Report progress through the Drain RPC.
Expand Down Expand Up @@ -1166,7 +1171,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
// DeleteOrphanedLeases releases all orphaned leases created by a prior
// instance of this node. timeThreshold is a walltime lower than the
// lowest hlc timestamp that the current instance of the node can use.
func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) {
func (m *Manager) DeleteOrphanedLeases(ctx context.Context, timeThreshold int64) {
if m.testingKnobs.DisableDeleteOrphanedLeases {
return
}
Expand All @@ -1179,7 +1184,9 @@ func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) {

// Run as async worker to prevent blocking the main server Start method.
// Exit after releasing all the orphaned leases.
_ = m.stopper.RunAsyncTask(context.Background(), "del-orphaned-leases", func(ctx context.Context) {
newCtx := m.ambientCtx.AnnotateCtx(context.Background())
newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx))
_ = m.stopper.RunAsyncTask(newCtx, "del-orphaned-leases", func(ctx context.Context) {
// This could have been implemented using DELETE WHERE, but DELETE WHERE
// doesn't implement AS OF SYSTEM TIME.

Expand Down
9 changes: 5 additions & 4 deletions pkg/sql/catalog/lease/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
t := newLeaseTest(testingT, params)
defer t.cleanup()

ctx := context.Background()
const descID = keys.LeaseTableID

{
Expand All @@ -499,8 +500,8 @@ func TestLeaseManagerDrain(testingT *testing.T) {
// starts draining.
l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1.Underlying())

t.nodes[1].SetDraining(true, nil /* reporter */)
t.nodes[2].SetDraining(true, nil /* reporter */)
t.nodes[1].SetDraining(ctx, true, nil /* reporter */)
t.nodes[2].SetDraining(ctx, true, nil /* reporter */)

// Leases cannot be acquired when in draining mode.
if _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") {
Expand All @@ -523,7 +524,7 @@ func TestLeaseManagerDrain(testingT *testing.T) {
{
// Check that leases with a refcount of 0 are correctly kept in the
// store once the drain mode has been exited.
t.nodes[1].SetDraining(false, nil /* reporter */)
t.nodes[1].SetDraining(ctx, false, nil /* reporter */)
l1 := t.mustAcquire(1, descID)
t.mustRelease(1, l1, nil)
t.expectLeases(descID, "/1/1")
Expand Down Expand Up @@ -2007,7 +2008,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR);
t.expectLeases(afterDesc.GetID(), "/1/1")

// Call DeleteOrphanedLeases() with the server startup time.
t.node(1).DeleteOrphanedLeases(now)
t.node(1).DeleteOrphanedLeases(ctx, now)
// Orphaned lease is gone.
t.expectLeases(beforeDesc.GetID(), "")
t.expectLeases(afterDesc.GetID(), "/1/1")
Expand Down
5 changes: 5 additions & 0 deletions pkg/testutils/serverutils/test_tenant_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package serverutils

import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/rpc"
)
Expand Down Expand Up @@ -56,4 +58,7 @@ type TestTenantInterface interface {

// RPCContext returns the *rpc.Context
RPCContext() *rpc.Context

// AnnotateCtx annotates a context.
AnnotateCtx(context.Context) context.Context
}