From 83724b0527cb01d4dfafdd99cffa9c84407bbecc Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 21 May 2025 10:55:06 +0100 Subject: [PATCH 1/2] Fix bug in GCP checkpoint publishing --- storage/gcp/gcp.go | 35 ++++++++++++--------- storage/gcp/gcp_test.go | 68 +++++++++++++++++++++++------------------ 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 72280bc14..cd900cb47 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -915,20 +915,27 @@ func (s *spannerCoordinator) publishTree(ctx context.Context, minAge time.Durati if err := pRow.Column(0, &pubAt); err != nil { return fmt.Errorf("failed to parse publishedAt: %v", err) } - if time.Since(pubAt) > minAge { - // Can't just use currentTree() here as the spanner emulator doesn't do nested transactions, so do it manually: - row, err := txn.ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"}) - if err != nil { - return fmt.Errorf("failed to read IntCoord: %w", err) - } - var fromSeq int64 // Spanner doesn't support uint64 - var rootHash []byte - if err := row.Columns(&fromSeq, &rootHash); err != nil { - return fmt.Errorf("failed to parse integration coordination info: %v", err) - } - if err := f(ctx, uint64(fromSeq), rootHash); err != nil { - return err - } + + cpAge := time.Since(pubAt) + if cpAge < minAge { + klog.V(1).Infof("publishTree: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minAge) + return nil + } + + klog.V(1).Infof("publishTree: updating checkpoint (replacing %s old checkpoint)", cpAge) + + // Can't just use currentTree() here as the spanner emulator doesn't do nested transactions, so do it manually: + row, err := txn.ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"}) + if err != nil { + return fmt.Errorf("failed to read IntCoord: %w", err) + } + var fromSeq int64 // Spanner doesn't support uint64 + var rootHash []byte + if err := row.Columns(&fromSeq, &rootHash); err != nil { + return fmt.Errorf("failed to parse integration coordination info: %v", err) + } + if err := f(ctx, uint64(fromSeq), rootHash); err != nil { + return err } if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Now()})}); err != nil { return err diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index dea576ef0..78d033dce 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -421,34 +421,43 @@ func TestStreamEntries(t *testing.T) { func TestPublishTree(t *testing.T) { ctx := context.Background() - - close := newSpannerDB(t) - defer close() - - s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000) - if err != nil { - t.Fatalf("newSpannerCoordinator: %v", err) - } - for _, test := range []struct { name string publishInterval time.Duration - wait time.Duration - wantUpdate bool + attempts []time.Duration + wantUpdates int }{ { name: "works ok", - publishInterval: 10 * time.Millisecond, - wait: 1 * time.Second, - wantUpdate: true, + publishInterval: 100 * time.Millisecond, + attempts: []time.Duration{1 * time.Second}, + wantUpdates: 1, }, { name: "too soon, skip update", publishInterval: 10 * time.Second, - wait: 100 * time.Millisecond, - wantUpdate: false, + attempts: []time.Duration{100 * time.Millisecond}, + wantUpdates: 0, + }, { + name: "too soon, skip update, but recovers", + publishInterval: 2 * time.Second, + attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second}, + wantUpdates: 1, + }, { + name: "many attempts, eventually one succeeds", + publishInterval: 1 * time.Second, + attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond}, + wantUpdates: 1, }, } { t.Run(test.name, func(t *testing.T) { + closeDB := newSpannerDB(t) + defer closeDB() + s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000) + if err != nil { + t.Fatalf("newSpannerCoordinator: %v", err) + } + defer s.dbPool.Close() + m := newMemObjStore() storage := &Appender{ logStore: &logResourceStore{ @@ -471,22 +480,23 @@ func TestPublishTree(t *testing.T) { if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, "", ""); err != nil { t.Fatalf("setObject(bananas): %v", err) } - - time.Sleep(test.wait) - - if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil { - t.Fatalf("publishTree: %v", err) - } - cpNew, _, err := m.getObject(ctx, layout.CheckpointPath) - cpUpdated := !bytes.Equal(cpOld, cpNew) - if err != nil { - if !errors.Is(err, gcs.ErrObjectNotExist) { + updatesSeen := 0 + for _, d := range test.attempts { + time.Sleep(d) + if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil { + t.Fatalf("publishTree: %v", err) + } + cpNew, _, err := m.getObject(ctx, layout.CheckpointPath) + if err != nil { t.Fatalf("getObject: %v", err) } - cpUpdated = false + if !bytes.Equal(cpOld, cpNew) { + updatesSeen++ + cpOld = cpNew + } } - if test.wantUpdate != cpUpdated { - t.Fatalf("got cpUpdated=%t, want %t", cpUpdated, test.wantUpdate) + if updatesSeen != test.wantUpdates { + t.Fatalf("Saw %d updates, want %d", updatesSeen, test.wantUpdates) } }) } From 125211881b84ccfe469d25b283069f2b96d00a4a Mon Sep 17 00:00:00 2001 From: Al Cutter Date: Wed, 21 May 2025 10:55:06 +0100 Subject: [PATCH 2/2] Fix bug in AWS checkpoint publishing --- storage/aws/aws.go | 32 ++++++++++++-------- storage/aws/aws_test.go | 67 +++++++++++++++++++++++------------------ 2 files changed, 57 insertions(+), 42 deletions(-) diff --git a/storage/aws/aws.go b/storage/aws/aws.go index 6f1c75401..ee2d1ff00 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -1132,21 +1132,27 @@ func (s *mySQLSequencer) publishTree(ctx context.Context, minAge time.Duration, if err := pRow.Scan(&pubAt); err != nil { return fmt.Errorf("failed to parse publishedAt: %v", err) } - if time.Since(time.Unix(pubAt, 0)) > minAge { - row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0) - var fromSeq uint64 - var rootHash []byte - if err := row.Scan(&fromSeq, &rootHash); err != nil { - return fmt.Errorf("failed to read IntCoord: %v", err) - } + cpAge := time.Since(time.Unix(pubAt, 0)) + if cpAge < minAge { + klog.V(1).Infof("publishTree: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minAge) + return nil + } - if err := f(ctx, fromSeq, rootHash); err != nil { - return err - } + klog.V(1).Infof("publishTree: updating checkpoint (replacing %s old checkpoint)", cpAge) - if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=? WHERE id=?", time.Now().Unix(), 0); err != nil { - return err - } + row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0) + var fromSeq uint64 + var rootHash []byte + if err := row.Scan(&fromSeq, &rootHash); err != nil { + return fmt.Errorf("failed to read IntCoord: %v", err) + } + + if err := f(ctx, fromSeq, rootHash); err != nil { + return err + } + + if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=? WHERE id=?", time.Now().Unix(), 0); err != nil { + return err } if err := tx.Commit(); err != nil { return err diff --git a/storage/aws/aws_test.go b/storage/aws/aws_test.go index 75693b21f..dd10e35d3 100644 --- a/storage/aws/aws_test.go +++ b/storage/aws/aws_test.go @@ -375,33 +375,43 @@ func TestPublishTree(t *testing.T) { klog.Warningf("MySQL not available, skipping %s", t.Name()) t.Skip("MySQL not available, skipping test") } - // Clean tables in case there's already something in there. - mustDropTables(t, ctx) - - s, err := newMySQLSequencer(ctx, *mySQLURI, 1000, 0, 0) - if err != nil { - t.Fatalf("newMySQLSequencer: %v", err) - } for _, test := range []struct { name string publishInterval time.Duration - wait time.Duration - wantUpdate bool + attempts []time.Duration + wantUpdates int }{ { name: "works ok", - publishInterval: 10 * time.Millisecond, - wait: 1 * time.Second, - wantUpdate: true, + publishInterval: 100 * time.Millisecond, + attempts: []time.Duration{1 * time.Second}, + wantUpdates: 1, }, { name: "too soon, skip update", publishInterval: 10 * time.Second, - wait: 100 * time.Millisecond, - wantUpdate: false, + attempts: []time.Duration{100 * time.Millisecond}, + wantUpdates: 0, + }, { + name: "too soon, skip update, but recovers", + publishInterval: 2 * time.Second, + attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second}, + wantUpdates: 1, + }, { + name: "many attempts, eventually one succeeds", + publishInterval: 1 * time.Second, + attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond}, + wantUpdates: 1, }, } { t.Run(test.name, func(t *testing.T) { + // Clean tables in case there's already something in there. + mustDropTables(t, ctx) + + s, err := newMySQLSequencer(ctx, *mySQLURI, 1000, 0, 0) + if err != nil { + t.Fatalf("newMySQLSequencer: %v", err) + } m := newMemObjStore() storage := &Appender{ logStore: &logResourceStore{ @@ -424,24 +434,23 @@ func TestPublishTree(t *testing.T) { if err := m.setObject(ctx, layout.CheckpointPath, cpOld, "", ""); err != nil { t.Fatalf("setObject(bananas): %v", err) } - - time.Sleep(test.wait) - - if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil { - t.Fatalf("publishTree: %v", err) - } - cpNew, err := m.getObject(ctx, layout.CheckpointPath) - cpUpdated := !bytes.Equal(cpOld, cpNew) - if err != nil { - // Do not use errors.Is. Keep errors.As to compare by type and not by value. - var nske *types.NoSuchKey - if !errors.As(err, &nske) { + updatesSeen := 0 + for _, d := range test.attempts { + time.Sleep(d) + if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil { + t.Fatalf("publishTree: %v", err) + } + cpNew, err := m.getObject(ctx, layout.CheckpointPath) + if err != nil { t.Fatalf("getObject: %v", err) } - cpUpdated = false + if !bytes.Equal(cpOld, cpNew) { + updatesSeen++ + cpOld = cpNew + } } - if test.wantUpdate != cpUpdated { - t.Fatalf("got cpUpdated=%t, want %t", cpUpdated, test.wantUpdate) + if updatesSeen != test.wantUpdates { + t.Fatalf("Saw %d updates, want %d", updatesSeen, test.wantUpdates) } }) }