Skip to content

Commit cc80271

Browse files
(manual census reporting) add CLI subcommand and API endpoint for census utilization
1 parent 72918f9 commit cc80271

File tree

24 files changed

+824
-282
lines changed

24 files changed

+824
-282
lines changed

agent/agent.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,6 +1602,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
16021602
cfg.Cloud = runtimeCfg.Cloud
16031603

16041604
cfg.Reporting.License.Enabled = runtimeCfg.Reporting.License.Enabled
1605+
cfg.Reporting.SnapshotRetentionTime = runtimeCfg.Reporting.SnapshotRetentionTime
16051606

16061607
cfg.ServerRejoinAgeMax = runtimeCfg.ServerRejoinAgeMax
16071608
cfg.EnableXDSLoadBalancing = runtimeCfg.EnableXDSLoadBalancing

agent/config/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1000,5 +1000,6 @@ type License struct {
10001000
}
10011001

10021002
type Reporting struct {
1003-
License License `mapstructure:"license"`
1003+
License License `mapstructure:"license"`
1004+
SnapshotRetentionTime *string `mapstructure:"snapshot_retention_time" json:"snapshot_retention_time,omitempty"`
10041005
}

agent/config/runtime.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1541,7 +1541,8 @@ type LicenseConfig struct {
15411541
}
15421542

15431543
type ReportingConfig struct {
1544-
License LicenseConfig
1544+
License LicenseConfig
1545+
SnapshotRetentionTime time.Duration
15451546
}
15461547

15471548
type AutoConfig struct {

agent/consul/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/hashicorp/consul/agent/checks"
1919
consulrate "github.com/hashicorp/consul/agent/consul/rate"
20+
"github.com/hashicorp/consul/agent/consul/reporting"
2021
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
2122
"github.com/hashicorp/consul/agent/structs"
2223
"github.com/hashicorp/consul/internal/gossip/libserf"
@@ -597,6 +598,8 @@ func DefaultConfig() *Config {
597598
ServerRejoinAgeMax: 24 * 7 * time.Hour,
598599
}
599600

601+
conf.Reporting.SnapshotRetentionTime = reporting.DefaultSnapshotRetention
602+
600603
// Increase our reap interval to 3 days instead of 24h.
601604
conf.SerfLANConfig.ReconnectTimeout = 3 * 24 * time.Hour
602605
conf.SerfWANConfig.ReconnectTimeout = 3 * 24 * time.Hour
@@ -732,5 +735,6 @@ type License struct {
732735
}
733736

734737
type Reporting struct {
735-
License License
738+
License License
739+
SnapshotRetentionTime time.Duration
736740
}

agent/consul/leader.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,9 +340,8 @@ func (s *Server) establishLeadership(ctx context.Context) error {
340340
s.startLogVerification(ctx)
341341
}
342342

343-
if s.config.Reporting.License.Enabled && s.reportingManager != nil {
344-
s.reportingManager.StartReportingAgent()
345-
}
343+
// The reporting manager will start the metrics collection but will only report if config.reporting.license.enabled is true
344+
s.reportingManager.StartReportingAgent()
346345

347346
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
348347
return nil

agent/consul/reporting/reporting.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,45 +11,75 @@ import (
1111
"github.com/hashicorp/consul/agent/structs"
1212
"github.com/hashicorp/go-hclog"
1313
"github.com/hashicorp/go-memdb"
14+
"github.com/hashicorp/go-retryablehttp"
1415
)
1516

1617
type ReportingManager struct {
17-
logger hclog.Logger
18-
server ServerDelegate
19-
stateProvider StateDelegate
20-
tickerInterval time.Duration
18+
logger hclog.Logger
19+
clusterId string
20+
autoReporting bool
21+
server ServerDelegate
22+
stateProvider StateDelegate
23+
tickerInterval time.Duration
24+
manualSnapshotInterval time.Duration
25+
snapshotRetention time.Duration
26+
customerID string
2127
EntDeps
2228
sync.RWMutex
29+
manualHTTPClient *retryablehttp.Client
30+
manualServiceAddress string
2331
}
2432

2533
const (
2634
SystemMetadataReportingProcessID = "reporting-process-id"
2735
ReportingInterval = 1 * time.Hour
2836
)
2937

38+
const (
39+
// ManualSnapshotInterval controls how often we persist manual census snapshots.
40+
ManualSnapshotInterval = 24 * time.Hour
41+
// DefaultSnapshotRetention is the default retention period for manual census snapshots.
42+
DefaultSnapshotRetention = 9600 * time.Hour // 400 days
43+
)
44+
3045
//go:generate mockery --name ServerDelegate --inpackage
3146
type ServerDelegate interface {
3247
GetSystemMetadata(key string) (string, error)
3348
SetSystemMetadataKey(key, val string) error
3449
IsLeader() bool
50+
ApplyCensusRequest(req *structs.CensusRequest) error
3551
}
3652

3753
type StateDelegate interface {
54+
// Metrics methods
3855
NodeUsage() (uint64, state.NodeUsage, error)
3956
ServiceUsage(ws memdb.WatchSet, tenantUsage bool) (uint64, structs.ServiceUsage, error)
57+
// Census methods
58+
CensusPut(idx uint64, req *structs.CensusRequest) error
59+
CensusPrune(idx uint64, cutoff time.Time) (int, error)
60+
CensusListAll() (uint64, []*state.CensusSnapshot, error)
4061
}
4162

42-
func NewReportingManager(logger hclog.Logger, deps EntDeps, server ServerDelegate, stateProvider StateDelegate) *ReportingManager {
63+
func NewReportingManager(logger hclog.Logger, clusterId string, autoReporting bool, deps EntDeps, server ServerDelegate, stateProvider StateDelegate, snapshotRetention time.Duration) *ReportingManager {
64+
if snapshotRetention <= 0 {
65+
snapshotRetention = DefaultSnapshotRetention
66+
}
67+
4368
rm := &ReportingManager{
44-
logger: logger.Named("reporting"),
45-
server: server,
46-
stateProvider: stateProvider,
47-
tickerInterval: ReportingInterval,
69+
logger: logger.Named("reporting"),
70+
clusterId: clusterId,
71+
autoReporting: autoReporting,
72+
server: server,
73+
stateProvider: stateProvider,
74+
tickerInterval: ReportingInterval,
75+
manualSnapshotInterval: ManualSnapshotInterval,
76+
snapshotRetention: snapshotRetention,
4877
}
4978
err := rm.initEnterpriseReporting(deps)
5079
if err != nil {
5180
rm.logger.Error("Error initializing reporting manager", "error", err)
5281
return nil
5382
}
83+
rm.logger.Debug("Created reporting manager")
5484
return rm
55-
}
85+
}

agent/consul/reporting/reporting_ce.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ func (rm *ReportingManager) StopReportingAgent() error {
2626
return nil
2727
}
2828

29-
func (m *ReportingManager) Run(ctx context.Context) {
29+
func (rm *ReportingManager) RunMetricsWriter(ctx context.Context) {
30+
// no op
31+
}
32+
33+
func (rm *ReportingManager) RunManualSnapshotWriter(ctx context.Context) {
3034
// no op
3135
}

agent/consul/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,8 +817,9 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
817817
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
818818
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
819819

820-
s.reportingManager = reporting.NewReportingManager(s.logger, getEnterpriseReportingDeps(flat), s, s.fsm.State())
821-
go s.reportingManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
820+
s.reportingManager = reporting.NewReportingManager(s.logger, config.Datacenter, config.Reporting.License.Enabled, getEnterpriseReportingDeps(flat), s, s.fsm.State(), config.Reporting.SnapshotRetentionTime)
821+
go s.reportingManager.RunMetricsWriter(&lib.StopChannelContext{StopCh: s.shutdownCh})
822+
go s.reportingManager.RunManualSnapshotWriter(&lib.StopChannelContext{StopCh: s.shutdownCh})
822823

823824
// configure the server specific grpc interfaces (in-process + internal multiplexed grpc)
824825
if err := s.setupGRPCInterfaces(config, flat); err != nil {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package state
2+
3+
import (
4+
"time"
5+
6+
"github.com/hashicorp/consul/agent/structs"
7+
"github.com/hashicorp/go-memdb"
8+
)
9+
10+
const censusTable = "census_snapshots"
11+
12+
type CensusSnapshot struct {
13+
ID string
14+
TS time.Time
15+
TSUnix int64
16+
Data []byte
17+
CreateIndex uint64
18+
ModifyIndex uint64
19+
}
20+
21+
func censusTableSchema() *memdb.TableSchema {
22+
return &memdb.TableSchema{
23+
Name: censusTable,
24+
Indexes: map[string]*memdb.IndexSchema{
25+
"id": {Name: "id", Unique: true, Indexer: &memdb.StringFieldIndex{Field: "ID"}},
26+
"ts": {Name: "ts", Unique: false, Indexer: &memdb.IntFieldIndex{Field: "TSUnix"}},
27+
"create": {Name: "create", Unique: false, Indexer: &memdb.UintFieldIndex{Field: "CreateIndex"}},
28+
"modify": {Name: "modify", Unique: false, Indexer: &memdb.UintFieldIndex{Field: "ModifyIndex"}},
29+
},
30+
}
31+
}
32+
33+
// Register in the DB schema builder:
34+
// Tables[censusTable] = censusTableSchema()
35+
36+
// Write: insert one snapshot at raft index idx.
37+
func (s *Store) CensusPut(idx uint64, req *structs.CensusRequest) error {
38+
tx := s.db.WriteTxn(idx)
39+
defer tx.Abort()
40+
row := &CensusSnapshot{
41+
ID: req.ID,
42+
TS: req.TS.UTC(),
43+
TSUnix: req.TS.UTC().Unix(),
44+
Data: append([]byte(nil), req.Data...),
45+
CreateIndex: idx,
46+
ModifyIndex: idx,
47+
}
48+
49+
if err := tx.Insert(censusTable, row); err != nil {
50+
return err
51+
}
52+
if err := tx.Insert(tableIndex, &IndexEntry{censusTable, idx}); err != nil {
53+
return err
54+
}
55+
return tx.Commit()
56+
}
57+
58+
// Read: list all (already pruned, so this is “everything we keep”).
59+
func (s *Store) CensusListAll() (uint64, []*CensusSnapshot, error) {
60+
txn := s.db.ReadTxn()
61+
defer txn.Abort()
62+
it, err := txn.Get(censusTable, "ts")
63+
if err != nil {
64+
return 0, nil, err
65+
}
66+
var out []*CensusSnapshot
67+
for obj := it.Next(); obj != nil; obj = it.Next() {
68+
out = append(out, obj.(*CensusSnapshot))
69+
}
70+
return s.maxIndex(censusTable), out, nil
71+
}
72+
73+
// Write: prune anything with TS < cutoff. Returns count.
74+
func (s *Store) CensusPrune(idx uint64, cutoff time.Time) (int, error) {
75+
tx := s.db.WriteTxn(idx)
76+
defer tx.Abort()
77+
cut := cutoff.UTC().Unix()
78+
79+
it, err := tx.Get(censusTable, "ts")
80+
if err != nil {
81+
return 0, err
82+
}
83+
84+
c := 0
85+
for obj := it.Next(); obj != nil; obj = it.Next() {
86+
row := obj.(*CensusSnapshot)
87+
if row.TSUnix < cut {
88+
if err := tx.Delete(censusTable, row); err != nil {
89+
return c, err
90+
}
91+
c++
92+
} else {
93+
// rows are ordered by TS; once we reach >= cut we can bail if desired
94+
break
95+
}
96+
}
97+
98+
if err := tx.Insert(tableIndex, &IndexEntry{censusTable, idx}); err != nil {
99+
return c, err
100+
}
101+
return c, tx.Commit()
102+
}

agent/consul/state/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func newDBSchema() *memdb.DBSchema {
4848
tokensTableSchema,
4949
tombstonesTableSchema,
5050
usageTableSchema,
51+
censusTableSchema,
5152
)
5253
withEnterpriseSchema(db)
5354
return db

0 commit comments

Comments
 (0)