diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 991a93218d00..3313212c9b36 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -249,7 +249,9 @@ func (r *Registry) ID() base.SQLInstanceID { // makeCtx returns a new context from r's ambient context and an associated // cancel func. func (r *Registry) makeCtx() (context.Context, func()) { - return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx)) + ctx := r.ac.AnnotateCtx(context.Background()) + ctx = logtags.WithTags(ctx, logtags.FromContext(r.serverCtx)) + return context.WithCancel(ctx) } // MakeJobID generates a new job ID. diff --git a/pkg/server/drain.go b/pkg/server/drain.go index dd733acc1ffc..80755261e607 100644 --- a/pkg/server/drain.go +++ b/pkg/server/drain.go @@ -207,7 +207,7 @@ func (s *Server) drainClients(ctx context.Context, reporter func(int, redact.Saf // Drain the SQL leases. This must be done after the pgServer has // given sessions a chance to finish ongoing work. - s.sqlServer.leaseMgr.SetDraining(true /* drain */, reporter) + s.sqlServer.leaseMgr.SetDraining(ctx, true /* drain */, reporter) // Done. This executes the defers set above to drain SQL leases. return nil diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 1b18d725cc2d..ece54e1ef863 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -112,6 +112,7 @@ import ( // standalone SQLServer instances per tenant (the KV layer is shared across all // tenants). type SQLServer struct { + ambientCtx log.AmbientContext stopper *stop.Stopper sqlIDContainer *base.SQLIDContainer pgServer *pgwire.Server @@ -906,6 +907,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { } return &SQLServer{ + ambientCtx: cfg.BaseConfig.AmbientCtx, stopper: cfg.stopper, sqlIDContainer: cfg.nodeIDContainer, pgServer: pgServer, @@ -1120,7 +1122,7 @@ func (s *SQLServer) preStart( // Delete all orphaned table leases created by a prior instance of this // node. This also uses SQL. - s.leaseMgr.DeleteOrphanedLeases(orphanedLeasesTimeThresholdNanos) + s.leaseMgr.DeleteOrphanedLeases(ctx, orphanedLeasesTimeThresholdNanos) // Start scheduled jobs daemon. jobs.StartJobSchedulerDaemon( @@ -1164,3 +1166,8 @@ func (s *SQLServer) SQLInstanceID() base.SQLInstanceID { func (s *SQLServer) StartDiagnostics(ctx context.Context) { s.diagnosticsReporter.PeriodicallyReportDiagnostics(ctx, s.stopper) } + +// AnnotateCtx annotates the given context with the server tracer and tags. +func (s *SQLServer) AnnotateCtx(ctx context.Context) context.Context { + return s.ambientCtx.AnnotateCtx(ctx) +} diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 707ed1ca12d9..87511009c87c 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -67,7 +67,7 @@ func StartTenant( return nil, "", "", err } - args, err := makeTenantSQLServerArgs(stopper, kvClusterName, baseCfg, sqlCfg) + args, err := makeTenantSQLServerArgs(ctx, stopper, kvClusterName, baseCfg, sqlCfg) if err != nil { return nil, "", "", err } @@ -96,7 +96,18 @@ func StartTenant( // TODO(davidh): Do we need to force this to be false? baseCfg.SplitListenSQL = false - background := baseCfg.AmbientCtx.AnnotateCtx(context.Background()) + // Add the server tags to the startup context. + // + // We use args.BaseConfig here instead of baseCfg directly because + // makeTenantSQLArgs defines its own AmbientCtx instance and it's + // defined by-value. + ctx = args.BaseConfig.AmbientCtx.AnnotateCtx(ctx) + + // Add the server tags to a generic background context for use + // by async goroutines. + // We can only annotate the context after makeTenantSQLServerArgs + // has defined the instance ID container in the AmbientCtx. + background := args.BaseConfig.AmbientCtx.AnnotateCtx(context.Background()) // StartListenRPCAndSQL will replace the SQLAddr fields if we choose // to share the SQL and gRPC port so here, since the tenant config @@ -164,6 +175,16 @@ func StartTenant( args.sqlStatusServer = tenantStatusServer s, err := newSQLServer(ctx, args) tenantStatusServer.sqlServer = s + // Also add the SQL instance tag to the tenant status server's + // ambient context. + // + // We use the tag "sqli" instead of just "sql" because the latter is + // too generic and would be hard to search if someone was looking at + // a log message and wondering what it stands for. + // + // TODO(knz): find a way to share common logging tags between + // multiple AmbientContext instances. + tenantStatusServer.AmbientContext.AddLogTag("sqli", s.sqlIDContainer) if err != nil { return nil, "", "", err @@ -355,7 +376,11 @@ func loadVarsHandler( } func makeTenantSQLServerArgs( - stopper *stop.Stopper, kvClusterName string, baseCfg BaseConfig, sqlCfg SQLConfig, + startupCtx context.Context, + stopper *stop.Stopper, + kvClusterName string, + baseCfg BaseConfig, + sqlCfg SQLConfig, ) (sqlServerArgs, error) { st := baseCfg.Settings @@ -366,6 +391,7 @@ func makeTenantSQLServerArgs( // too generic and would be hard to search if someone was looking at // a log message and wondering what it stands for. baseCfg.AmbientCtx.AddLogTag("sqli", instanceIDContainer) + startupCtx = baseCfg.AmbientCtx.AnnotateCtx(startupCtx) // TODO(tbg): this is needed so that the RPC heartbeats between the testcluster // and this tenant work. @@ -480,7 +506,7 @@ func makeTenantSQLServerArgs( recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st) - runtime := status.NewRuntimeStatSampler(context.Background(), clock) + runtime := status.NewRuntimeStatSampler(startupCtx, clock) registry.AddMetricStruct(runtime) esb := &externalStorageBuilder{} diff --git a/pkg/sql/catalog/lease/descriptor_state.go b/pkg/sql/catalog/lease/descriptor_state.go index fe432bb6bdec..a6c75aa7b7c3 100644 --- a/pkg/sql/catalog/lease/descriptor_state.go +++ b/pkg/sql/catalog/lease/descriptor_state.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/cockroachdb/redact" ) @@ -259,7 +260,7 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState return nil } if l := maybeRemoveLease(); l != nil { - releaseLease(l, t.m) + releaseLease(ctx, l, t.m) } } @@ -273,7 +274,9 @@ func (t *descriptorState) maybeQueueLeaseRenewal( } // Start the renewal. When it finishes, it will reset t.renewalInProgress. - return t.stopper.RunAsyncTask(context.Background(), + newCtx := m.ambientCtx.AnnotateCtx(context.Background()) + newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) + return t.stopper.RunAsyncTask(newCtx, "lease renewal", func(ctx context.Context) { t.startLeaseRenewal(ctx, m, id, name) }) diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 7c5594fa9c5b..31134e1c45d4 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -430,7 +430,9 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro // of the first context cancels other callers to the `acquireNodeLease()` method, // because of its use of `singleflight.Group`. See issue #41780 for how this has // happened. - newCtx, cancel := m.stopper.WithCancelOnQuiesce(logtags.WithTags(context.Background(), logtags.FromContext(ctx))) + baseCtx := m.ambientCtx.AnnotateCtx(context.Background()) + baseCtx = logtags.WithTags(baseCtx, logtags.FromContext(ctx)) + newCtx, cancel := m.stopper.WithCancelOnQuiesce(baseCtx) defer cancel() if m.isDraining() { return nil, errors.New("cannot acquire lease when draining") @@ -457,7 +459,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro m.names.insert(newDescVersionState) } if toRelease != nil { - releaseLease(toRelease, m) + releaseLease(ctx, toRelease, m) } return true, nil }) @@ -473,8 +475,7 @@ func acquireNodeLease(ctx context.Context, m *Manager, id descpb.ID) (bool, erro } // releaseLease from store. -func releaseLease(lease *storedLease, m *Manager) { - ctx := context.TODO() +func releaseLease(ctx context.Context, lease *storedLease, m *Manager) { if m.isDraining() { // Release synchronously to guarantee release before exiting. m.storage.release(ctx, m.stopper, lease) @@ -482,8 +483,10 @@ func releaseLease(lease *storedLease, m *Manager) { } // Release to the store asynchronously, without the descriptorState lock. + newCtx := m.ambientCtx.AnnotateCtx(context.Background()) + newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) if err := m.stopper.RunAsyncTask( - ctx, "sql.descriptorState: releasing descriptor lease", + newCtx, "sql.descriptorState: releasing descriptor lease", func(ctx context.Context) { m.storage.release(ctx, m.stopper, lease) }); err != nil { @@ -528,7 +531,7 @@ func purgeOldVersions( leases := t.removeInactiveVersions() t.mu.Unlock() for _, l := range leases { - releaseLease(l, m) + releaseLease(ctx, l, m) } } @@ -950,7 +953,9 @@ func (m *Manager) isDraining() bool { // to report work that needed to be done and which may or may not have // been done by the time this call returns. See the explanation in // pkg/server/drain.go for details. -func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) { +func (m *Manager) SetDraining( + ctx context.Context, drain bool, reporter func(int, redact.SafeString), +) { m.draining.Store(drain) if !drain { return @@ -963,7 +968,7 @@ func (m *Manager) SetDraining(drain bool, reporter func(int, redact.SafeString)) leases := t.removeInactiveVersions() t.mu.Unlock() for _, l := range leases { - releaseLease(l, m) + releaseLease(ctx, l, m) } if reporter != nil { // Report progress through the Drain RPC. @@ -1166,7 +1171,7 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) { // DeleteOrphanedLeases releases all orphaned leases created by a prior // instance of this node. timeThreshold is a walltime lower than the // lowest hlc timestamp that the current instance of the node can use. -func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) { +func (m *Manager) DeleteOrphanedLeases(ctx context.Context, timeThreshold int64) { if m.testingKnobs.DisableDeleteOrphanedLeases { return } @@ -1179,7 +1184,9 @@ func (m *Manager) DeleteOrphanedLeases(timeThreshold int64) { // Run as async worker to prevent blocking the main server Start method. // Exit after releasing all the orphaned leases. - _ = m.stopper.RunAsyncTask(context.Background(), "del-orphaned-leases", func(ctx context.Context) { + newCtx := m.ambientCtx.AnnotateCtx(context.Background()) + newCtx = logtags.WithTags(newCtx, logtags.FromContext(ctx)) + _ = m.stopper.RunAsyncTask(newCtx, "del-orphaned-leases", func(ctx context.Context) { // This could have been implemented using DELETE WHERE, but DELETE WHERE // doesn't implement AS OF SYSTEM TIME. diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 6dffb79f1614..128145126a64 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -487,6 +487,7 @@ func TestLeaseManagerDrain(testingT *testing.T) { t := newLeaseTest(testingT, params) defer t.cleanup() + ctx := context.Background() const descID = keys.LeaseTableID { @@ -499,8 +500,8 @@ func TestLeaseManagerDrain(testingT *testing.T) { // starts draining. l1RemovalTracker := leaseRemovalTracker.TrackRemoval(l1.Underlying()) - t.nodes[1].SetDraining(true, nil /* reporter */) - t.nodes[2].SetDraining(true, nil /* reporter */) + t.nodes[1].SetDraining(ctx, true, nil /* reporter */) + t.nodes[2].SetDraining(ctx, true, nil /* reporter */) // Leases cannot be acquired when in draining mode. if _, err := t.acquire(1, descID); !testutils.IsError(err, "cannot acquire lease when draining") { @@ -523,7 +524,7 @@ func TestLeaseManagerDrain(testingT *testing.T) { { // Check that leases with a refcount of 0 are correctly kept in the // store once the drain mode has been exited. - t.nodes[1].SetDraining(false, nil /* reporter */) + t.nodes[1].SetDraining(ctx, false, nil /* reporter */) l1 := t.mustAcquire(1, descID) t.mustRelease(1, l1, nil) t.expectLeases(descID, "/1/1") @@ -2007,7 +2008,7 @@ CREATE TABLE t.after (k CHAR PRIMARY KEY, v CHAR); t.expectLeases(afterDesc.GetID(), "/1/1") // Call DeleteOrphanedLeases() with the server startup time. - t.node(1).DeleteOrphanedLeases(now) + t.node(1).DeleteOrphanedLeases(ctx, now) // Orphaned lease is gone. t.expectLeases(beforeDesc.GetID(), "") t.expectLeases(afterDesc.GetID(), "/1/1") diff --git a/pkg/testutils/serverutils/test_tenant_shim.go b/pkg/testutils/serverutils/test_tenant_shim.go index f3ad498ebe13..86b28a6b3a95 100644 --- a/pkg/testutils/serverutils/test_tenant_shim.go +++ b/pkg/testutils/serverutils/test_tenant_shim.go @@ -14,6 +14,8 @@ package serverutils import ( + "context" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/rpc" ) @@ -56,4 +58,7 @@ type TestTenantInterface interface { // RPCContext returns the *rpc.Context RPCContext() *rpc.Context + + // AnnotateCtx annotates a context. + AnnotateCtx(context.Context) context.Context }