Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions storage/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,21 +1132,27 @@ func (s *mySQLSequencer) publishTree(ctx context.Context, minAge time.Duration,
if err := pRow.Scan(&pubAt); err != nil {
return fmt.Errorf("failed to parse publishedAt: %v", err)
}
if time.Since(time.Unix(pubAt, 0)) > minAge {
row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0)
var fromSeq uint64
var rootHash []byte
if err := row.Scan(&fromSeq, &rootHash); err != nil {
return fmt.Errorf("failed to read IntCoord: %v", err)
}
cpAge := time.Since(time.Unix(pubAt, 0))
if cpAge < minAge {
klog.V(1).Infof("publishTree: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minAge)
return nil
}

if err := f(ctx, fromSeq, rootHash); err != nil {
return err
}
klog.V(1).Infof("publishTree: updating checkpoint (replacing %s old checkpoint)", cpAge)

if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=? WHERE id=?", time.Now().Unix(), 0); err != nil {
return err
}
row := tx.QueryRowContext(ctx, "SELECT seq, rootHash FROM IntCoord WHERE id = ?", 0)
var fromSeq uint64
var rootHash []byte
if err := row.Scan(&fromSeq, &rootHash); err != nil {
return fmt.Errorf("failed to read IntCoord: %v", err)
}

if err := f(ctx, fromSeq, rootHash); err != nil {
return err
}

if _, err := tx.ExecContext(ctx, "UPDATE PubCoord SET publishedAt=? WHERE id=?", time.Now().Unix(), 0); err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
Expand Down
67 changes: 38 additions & 29 deletions storage/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,33 +375,43 @@ func TestPublishTree(t *testing.T) {
klog.Warningf("MySQL not available, skipping %s", t.Name())
t.Skip("MySQL not available, skipping test")
}
// Clean tables in case there's already something in there.
mustDropTables(t, ctx)

s, err := newMySQLSequencer(ctx, *mySQLURI, 1000, 0, 0)
if err != nil {
t.Fatalf("newMySQLSequencer: %v", err)
}

for _, test := range []struct {
name string
publishInterval time.Duration
wait time.Duration
wantUpdate bool
attempts []time.Duration
wantUpdates int
}{
{
name: "works ok",
publishInterval: 10 * time.Millisecond,
wait: 1 * time.Second,
wantUpdate: true,
publishInterval: 100 * time.Millisecond,
attempts: []time.Duration{1 * time.Second},
wantUpdates: 1,
}, {
name: "too soon, skip update",
publishInterval: 10 * time.Second,
wait: 100 * time.Millisecond,
wantUpdate: false,
attempts: []time.Duration{100 * time.Millisecond},
wantUpdates: 0,
}, {
name: "too soon, skip update, but recovers",
publishInterval: 2 * time.Second,
attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second},
wantUpdates: 1,
}, {
name: "many attempts, eventually one succeeds",
publishInterval: 1 * time.Second,
attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond},
wantUpdates: 1,
},
} {
t.Run(test.name, func(t *testing.T) {
// Clean tables in case there's already something in there.
mustDropTables(t, ctx)

s, err := newMySQLSequencer(ctx, *mySQLURI, 1000, 0, 0)
if err != nil {
t.Fatalf("newMySQLSequencer: %v", err)
}
m := newMemObjStore()
storage := &Appender{
logStore: &logResourceStore{
Expand All @@ -424,24 +434,23 @@ func TestPublishTree(t *testing.T) {
if err := m.setObject(ctx, layout.CheckpointPath, cpOld, "", ""); err != nil {
t.Fatalf("setObject(bananas): %v", err)
}

time.Sleep(test.wait)

if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
cpNew, err := m.getObject(ctx, layout.CheckpointPath)
cpUpdated := !bytes.Equal(cpOld, cpNew)
if err != nil {
// Do not use errors.Is. Keep errors.As to compare by type and not by value.
var nske *types.NoSuchKey
if !errors.As(err, &nske) {
updatesSeen := 0
for _, d := range test.attempts {
time.Sleep(d)
if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
cpNew, err := m.getObject(ctx, layout.CheckpointPath)
if err != nil {
t.Fatalf("getObject: %v", err)
}
cpUpdated = false
if !bytes.Equal(cpOld, cpNew) {
updatesSeen++
cpOld = cpNew
}
}
if test.wantUpdate != cpUpdated {
t.Fatalf("got cpUpdated=%t, want %t", cpUpdated, test.wantUpdate)
if updatesSeen != test.wantUpdates {
t.Fatalf("Saw %d updates, want %d", updatesSeen, test.wantUpdates)
}
})
}
Expand Down
35 changes: 21 additions & 14 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,20 +915,27 @@ func (s *spannerCoordinator) publishTree(ctx context.Context, minAge time.Durati
if err := pRow.Column(0, &pubAt); err != nil {
return fmt.Errorf("failed to parse publishedAt: %v", err)
}
if time.Since(pubAt) > minAge {
// Can't just use currentTree() here as the spanner emulator doesn't do nested transactions, so do it manually:
row, err := txn.ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"})
if err != nil {
return fmt.Errorf("failed to read IntCoord: %w", err)
}
var fromSeq int64 // Spanner doesn't support uint64
var rootHash []byte
if err := row.Columns(&fromSeq, &rootHash); err != nil {
return fmt.Errorf("failed to parse integration coordination info: %v", err)
}
if err := f(ctx, uint64(fromSeq), rootHash); err != nil {
return err
}

cpAge := time.Since(pubAt)
if cpAge < minAge {
klog.V(1).Infof("publishTree: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minAge)
return nil
}

klog.V(1).Infof("publishTree: updating checkpoint (replacing %s old checkpoint)", cpAge)

// Can't just use currentTree() here as the spanner emulator doesn't do nested transactions, so do it manually:
row, err := txn.ReadRow(ctx, "IntCoord", spanner.Key{0}, []string{"seq", "rootHash"})
if err != nil {
return fmt.Errorf("failed to read IntCoord: %w", err)
}
var fromSeq int64 // Spanner doesn't support uint64
var rootHash []byte
if err := row.Columns(&fromSeq, &rootHash); err != nil {
return fmt.Errorf("failed to parse integration coordination info: %v", err)
}
if err := f(ctx, uint64(fromSeq), rootHash); err != nil {
return err
}
if err := txn.BufferWrite([]*spanner.Mutation{spanner.Update("PubCoord", []string{"id", "publishedAt"}, []any{0, time.Now()})}); err != nil {
return err
Expand Down
68 changes: 39 additions & 29 deletions storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,34 +421,43 @@ func TestStreamEntries(t *testing.T) {

func TestPublishTree(t *testing.T) {
ctx := context.Background()

close := newSpannerDB(t)
defer close()

s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
if err != nil {
t.Fatalf("newSpannerCoordinator: %v", err)
}

for _, test := range []struct {
name string
publishInterval time.Duration
wait time.Duration
wantUpdate bool
attempts []time.Duration
wantUpdates int
}{
{
name: "works ok",
publishInterval: 10 * time.Millisecond,
wait: 1 * time.Second,
wantUpdate: true,
publishInterval: 100 * time.Millisecond,
attempts: []time.Duration{1 * time.Second},
wantUpdates: 1,
}, {
name: "too soon, skip update",
publishInterval: 10 * time.Second,
wait: 100 * time.Millisecond,
wantUpdate: false,
attempts: []time.Duration{100 * time.Millisecond},
wantUpdates: 0,
}, {
name: "too soon, skip update, but recovers",
publishInterval: 2 * time.Second,
attempts: []time.Duration{100 * time.Millisecond, 2 * time.Second},
wantUpdates: 1,
}, {
name: "many attempts, eventually one succeeds",
publishInterval: 1 * time.Second,
attempts: []time.Duration{300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond, 300 * time.Millisecond},
wantUpdates: 1,
},
} {
t.Run(test.name, func(t *testing.T) {
closeDB := newSpannerDB(t)
defer closeDB()
s, err := newSpannerCoordinator(ctx, "projects/p/instances/i/databases/d", 1000)
if err != nil {
t.Fatalf("newSpannerCoordinator: %v", err)
}
defer s.dbPool.Close()

m := newMemObjStore()
storage := &Appender{
logStore: &logResourceStore{
Expand All @@ -471,22 +480,23 @@ func TestPublishTree(t *testing.T) {
if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, "", ""); err != nil {
t.Fatalf("setObject(bananas): %v", err)
}

time.Sleep(test.wait)

if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
cpNew, _, err := m.getObject(ctx, layout.CheckpointPath)
cpUpdated := !bytes.Equal(cpOld, cpNew)
if err != nil {
if !errors.Is(err, gcs.ErrObjectNotExist) {
updatesSeen := 0
for _, d := range test.attempts {
time.Sleep(d)
if err := s.publishTree(ctx, test.publishInterval, storage.publishCheckpoint); err != nil {
t.Fatalf("publishTree: %v", err)
}
cpNew, _, err := m.getObject(ctx, layout.CheckpointPath)
if err != nil {
t.Fatalf("getObject: %v", err)
}
cpUpdated = false
if !bytes.Equal(cpOld, cpNew) {
updatesSeen++
cpOld = cpNew
}
}
if test.wantUpdate != cpUpdated {
t.Fatalf("got cpUpdated=%t, want %t", cpUpdated, test.wantUpdate)
if updatesSeen != test.wantUpdates {
t.Fatalf("Saw %d updates, want %d", updatesSeen, test.wantUpdates)
}
})
}
Expand Down
Loading