Skip to content

Commit 8fcbb5e

Browse files
committed
[AWS] Garbage collection support
1 parent 8ee4e93 commit 8fcbb5e

4 files changed

Lines changed: 388 additions & 17 deletions

File tree

storage/aws/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ This table is used to coordinate integration of sequenced batches in the `Seq` t
3838
This table is used to coordinate publication of new checkpoints, ensuring that checkpoints are not published
3939
more frequently than configured.
4040

41+
### `GCCoord`
42+
This table is used to coordinate garbage collection of partial tiles and entry bundles which have been
43+
made obsolete by the continued growth of the log.
44+
4145
## Life of a leaf
4246

4347
1. Leaves are submitted by the binary built using Tessera via a call to the storage's `Add` func.

storage/aws/aws.go

Lines changed: 195 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"github.com/transparency-dev/tessera/api"
5858
"github.com/transparency-dev/tessera/api/layout"
5959
"github.com/transparency-dev/tessera/internal/migrate"
60+
"github.com/transparency-dev/tessera/internal/parse"
6061
"github.com/transparency-dev/tessera/internal/stream"
6162
storage "github.com/transparency-dev/tessera/storage/internal"
6263
"golang.org/x/sync/errgroup"
@@ -95,6 +96,7 @@ type objStore interface {
9596
getObject(ctx context.Context, obj string) ([]byte, error)
9697
setObject(ctx context.Context, obj string, data []byte, contType string, cacheControl string) error
9798
setObjectIfNoneMatch(ctx context.Context, obj string, data []byte, contType string, cacheControl string) error
99+
deleteObjectsWithPrefix(ctx context.Context, prefix string) error
98100
}
99101

100102
// sequencer describes a type which knows how to sequence entries.
@@ -117,6 +119,9 @@ type sequencer interface {
117119

118120
// publishCheckpoint coordinates the publication of new checkpoints based on the current integrated tree.
119121
publishCheckpoint(ctx context.Context, minAge time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
122+
123+
// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
124+
garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
120125
}
121126

122127
// consumeFunc is the signature of a function which can consume entries from the sequencer.
@@ -173,26 +178,36 @@ func New(ctx context.Context, cfg Config) (tessera.Driver, error) {
173178
}, nil
174179
}
175180

181+
// Appender creates a new tessera.Appender lifecycle object.
176182
func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) {
177-
pb := uint64(opts.PushbackMaxOutstanding())
178-
if pb == 0 {
179-
pb = DefaultPushbackMaxOutstanding
183+
seq, err := newMySQLSequencer(ctx, s.cfg.DSN, uint64(opts.PushbackMaxOutstanding()), s.cfg.MaxOpenConns, s.cfg.MaxIdleConns)
184+
if err != nil {
185+
return nil, nil, fmt.Errorf("failed to create MySQL sequencer: %v", err)
180186
}
181-
if opts.CheckpointInterval() < minCheckpointInterval {
182-
return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
187+
188+
s3Store := &s3Storage{
189+
s3Client: s3.NewFromConfig(*s.cfg.SDKConfig, s.cfg.S3Options),
190+
bucket: s.cfg.Bucket,
191+
bucketPrefix: s.cfg.BucketPrefix,
183192
}
184193

185-
seq, err := newMySQLSequencer(ctx, s.cfg.DSN, pb, s.cfg.MaxOpenConns, s.cfg.MaxIdleConns)
194+
a, lr, err := s.newAppender(ctx, s3Store, seq, opts)
186195
if err != nil {
187-
return nil, nil, fmt.Errorf("failed to create MySQL sequencer: %v", err)
196+
return nil, nil, err
197+
}
198+
return &tessera.Appender{
199+
Add: a.Add,
200+
}, lr, nil
201+
}
202+
203+
// newAppender creates and initialises a tessera.Appender struct with the provided underlying storage implementations.
204+
func (s *Storage) newAppender(ctx context.Context, o objStore, seq sequencer, opts *tessera.AppendOptions) (*Appender, tessera.LogReader, error) {
205+
if opts.CheckpointInterval() < minCheckpointInterval {
206+
return nil, nil, fmt.Errorf("requested CheckpointInterval (%v) is less than minimum permitted %v", opts.CheckpointInterval(), minCheckpointInterval)
188207
}
189208

190209
logStore := &logResourceStore{
191-
objStore: &s3Storage{
192-
s3Client: s3.NewFromConfig(*s.cfg.SDKConfig, s.cfg.S3Options),
193-
bucket: s.cfg.Bucket,
194-
bucketPrefix: s.cfg.BucketPrefix,
195-
},
210+
objStore: o,
196211
entriesPath: opts.EntriesPath(),
197212
integratedSize: func(context.Context) (uint64, error) {
198213
s, _, err := seq.currentTree(ctx)
@@ -202,13 +217,14 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t
202217
return seq.nextIndex(ctx)
203218
},
204219
}
220+
205221
r := &Appender{
206222
logStore: logStore,
207223
sequencer: seq,
224+
queue: storage.NewQueue(ctx, opts.BatchMaxAge(), opts.BatchMaxSize(), seq.assignEntries),
208225
newCP: opts.CheckpointPublisher(logStore, http.DefaultClient),
209226
treeUpdated: make(chan struct{}),
210227
}
211-
r.queue = storage.NewQueue(ctx, opts.BatchMaxAge(), opts.BatchMaxSize(), r.sequencer.assignEntries)
212228

213229
if err := r.init(ctx); err != nil {
214230
return nil, nil, fmt.Errorf("failed to initialise log storage: %v", err)
@@ -220,9 +236,11 @@ func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*t
220236
// Kick off go-routine which handles the publication of checkpoints.
221237
go r.publishCheckpointJob(ctx, opts.CheckpointInterval())
222238

223-
return &tessera.Appender{
224-
Add: r.Add,
225-
}, r.logStore, nil
239+
if i := opts.GarbageCollectionInterval(); i > 0 {
240+
go r.garbageCollectorJob(ctx, i)
241+
}
242+
243+
return r, r.logStore, nil
226244
}
227245

228246
// Appender is an implementation of the Tessera appender lifecycle contract.
@@ -287,6 +305,46 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, interval time.Durat
287305
}
288306
}
289307

308+
// garbageCollectorJob is a long-running function which handles the removal of obsolete partial tiles
309+
// and entry bundles.
310+
// Blocks until ctx is done.
311+
func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
312+
t := time.NewTicker(i)
313+
defer t.Stop()
314+
315+
// Entirely arbitrary number.
316+
maxBundlesPerRun := uint(100)
317+
318+
for {
319+
select {
320+
case <-ctx.Done():
321+
return
322+
case <-t.C:
323+
}
324+
func() {
325+
ctx, span := tracer.Start(ctx, "tessera.storage.aws.garbageCollectTask")
326+
defer span.End()
327+
328+
// Figure out the size of the latest published checkpoint - we can't be removing partial tiles implied by
329+
// that checkpoint just because we've done an integration and know about a larger (but as yet unpublished)
330+
// checkpoint!
331+
cp, err := a.logStore.ReadCheckpoint(ctx)
332+
if err != nil {
333+
klog.Warningf("Failed to get published checkpoint: %v", err)
334+
}
335+
_, pubSize, _, err := parse.CheckpointUnsafe(cp)
336+
if err != nil {
337+
klog.Warningf("Failed to parse published checkpoint: %v", err)
338+
}
339+
340+
if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
341+
klog.Warningf("GarbageCollect failed: %v", err)
342+
}
343+
}()
344+
}
345+
346+
}
347+
290348
// Add is the entrypoint for adding entries to a sequencing log.
291349
func (a *Appender) Add(ctx context.Context, e *tessera.Entry) tessera.IndexFuture {
292350
return a.queue.Add(ctx, e)
@@ -889,6 +947,15 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
889947
return err
890948
}
891949

950+
if _, err := s.dbPool.ExecContext(ctx,
951+
`CREATE TABLE IF NOT EXISTS GCCoord(
952+
id INT UNSIGNED NOT NULL,
953+
fromSize BIGINT NOT NULL,
954+
PRIMARY KEY (id)
955+
)`); err != nil {
956+
return err
957+
}
958+
892959
// Set default values for a newly initialised schema - these rows being present are a precondition for
893960
// sequencing and integration to occur.
894961
// Note that this will only succeed if no row exists, so there's no danger
@@ -909,6 +976,10 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
909976
`INSERT IGNORE INTO PubCoord (id, publishedAt) VALUES (0, 0)`); err != nil {
910977
return err
911978
}
979+
if _, err := s.dbPool.ExecContext(ctx,
980+
`INSERT IGNORE INTO GCCoord (id, fromSize) VALUES (0, 0)`); err != nil {
981+
return err
982+
}
912983
return nil
913984
}
914985

@@ -1163,6 +1234,81 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minAge time.Dura
11631234
return nil
11641235
}
11651236

1237+
// garbageCollect will identify up to maxBundles unneeded partial entry bundles (and any unneeded partial tiles which sit above them in the tree) and
1238+
// call the provided function to remove them.
1239+
//
1240+
// Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
1241+
// needlessly attempt to GC over regions which have already been cleaned.
1242+
func (s *mySQLSequencer) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
1243+
tx, err := s.dbPool.Begin()
1244+
if err != nil {
1245+
return err
1246+
}
1247+
defer func() {
1248+
if tx != nil {
1249+
_ = tx.Rollback()
1250+
}
1251+
}()
1252+
pRow := tx.QueryRowContext(ctx, "SELECT fromSize FROM GCCoord WHERE id = ? FOR UPDATE", 0)
1253+
var fromSize uint64
1254+
if err := pRow.Scan(&fromSize); err != nil {
1255+
return fmt.Errorf("failed to parse publishedAt: %v", err)
1256+
}
1257+
1258+
if fromSize == treeSize {
1259+
return nil
1260+
}
1261+
1262+
d := uint(0)
1263+
eg := errgroup.Group{}
1264+
// GC the tree in "vertical" chunks defined by entry bundles.
1265+
for ri := range layout.Range(fromSize, treeSize-fromSize, treeSize) {
1266+
// Only known-full bundles are in-scope for for GC, so exit if the current bundle is partial or
1267+
// we've reached our limit of chunks.
1268+
if ri.Partial > 0 || d > maxBundles {
1269+
break
1270+
}
1271+
1272+
// GC any partial versions of the entry bundle itself and the tile which sits immediately above it.
1273+
eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
1274+
eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(0, ri.Index, 0)+".p/") })
1275+
fromSize += uint64(ri.N)
1276+
d++
1277+
1278+
// Now consider (only) the part of the tree which sits above the bundle.
1279+
// We'll walk up the parent tiles for as a long as we're tracing the right-hand
1280+
// edge of a perfect subtree.
1281+
// This gives the property we'll only visit each parent tile once, rather than up to 256 times.
1282+
pL, pIdx := uint64(0), ri.Index
1283+
for isLastLeafInParent(pIdx) {
1284+
// Move our coordinates up to the parent
1285+
pL, pIdx = pL+1, pIdx>>layout.TileHeight
1286+
// GC any partial versions of the parent tile.
1287+
eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(pL, pIdx, 0)+".p/") })
1288+
1289+
}
1290+
}
1291+
if err := eg.Wait(); err != nil {
1292+
return fmt.Errorf("failed to delete one or more objects: %v", err)
1293+
}
1294+
1295+
if _, err := tx.ExecContext(ctx, "UPDATE GCCoord SET fromSize=? WHERE id=?", fromSize, 0); err != nil {
1296+
return err
1297+
}
1298+
if err := tx.Commit(); err != nil {
1299+
return err
1300+
}
1301+
tx = nil
1302+
1303+
return nil
1304+
}
1305+
1306+
// isLastLeafInParent returns true if a tile with the provided index is the final child node of a
1307+
// (hypothetical) full parent tile.
1308+
func isLastLeafInParent(i uint64) bool {
1309+
return i%layout.TileWidth == layout.TileWidth-1
1310+
}
1311+
11661312
func placeholder(n int) string {
11671313
places := make([]string, n)
11681314
for i := range n {
@@ -1265,6 +1411,39 @@ func (s *s3Storage) setObjectIfNoneMatch(ctx context.Context, objName string, da
12651411
return nil
12661412
}
12671413

1414+
// deleteObjectsWithPrefix removes any objects with the provided prefix from S3.
1415+
func (s *s3Storage) deleteObjectsWithPrefix(ctx context.Context, objPrefix string) error {
1416+
ctx, span := tracer.Start(ctx, "tessera.storage.aws.deleteObject")
1417+
defer span.End()
1418+
1419+
if s.bucketPrefix != "" {
1420+
objPrefix = filepath.Join(s.bucketPrefix, objPrefix)
1421+
}
1422+
span.SetAttributes(objectPathKey.String(objPrefix))
1423+
1424+
l, err := s.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
1425+
Bucket: aws.String(s.bucket),
1426+
Prefix: aws.String(objPrefix),
1427+
})
1428+
if err != nil {
1429+
return fmt.Errorf("failed to list objects with prefix %q: %v", objPrefix, err)
1430+
}
1431+
di := &s3.DeleteObjectsInput{
1432+
Bucket: aws.String(s.bucket),
1433+
Delete: &types.Delete{
1434+
Objects: make([]types.ObjectIdentifier, 0, len(l.Contents)),
1435+
},
1436+
}
1437+
for _, k := range l.Contents {
1438+
klog.V(2).Infof("Deleting object %s", *k.Key)
1439+
di.Delete.Objects = append(di.Delete.Objects, types.ObjectIdentifier{Key: k.Key})
1440+
}
1441+
if _, err := s.s3Client.DeleteObjects(ctx, di); err != nil {
1442+
return fmt.Errorf("failed to delete objects: %v", err)
1443+
}
1444+
return nil
1445+
}
1446+
12681447
func printDragonsWarning() {
12691448
d := `H4sIAFZYZGcAA01QMQ7EIAzbeYXV5UCqkq1bf2IFtpNuPalj334hFQdkwLGNAwBzyXnKitOiqTYj
12701449
B7ZGplWEwZhZqxZ1aKuswcD0AA4GXPUhI0MEpSd5Ow09vJ+m6rVtF6m0GDccYXDZEdp9N/g1H9Pf

0 commit comments

Comments
 (0)