diff --git a/internal/fsck/fsck.go b/internal/fsck/fsck.go index 1cba2b470..a78237ccd 100644 --- a/internal/fsck/fsck.go +++ b/internal/fsck/fsck.go @@ -70,9 +70,6 @@ func Check(ctx context.Context, origin string, verifier note.Verifier, f Fetcher } // Set up a stream of entry bundles from the log to be checked. - getSize := func(_ context.Context) (uint64, error) { return cp.Size, nil } - next, cancel := stream.StreamAdaptor(ctx, N, getSize, f.ReadEntryBundle, 0) - defer cancel() eg := errgroup.Group{} @@ -81,16 +78,17 @@ func Check(ctx context.Context, origin string, verifier note.Verifier, f Fetcher eg.Go(fTree.resourceCheckWorker(ctx)) } + getSize := func(_ context.Context) (uint64, error) { return cp.Size, nil } // Consume the stream of bundles to re-derive the other log resources. // TODO(al): consider chunking the log and doing each in parallel. - for fTree.tree.End() < cp.Size { - ri, b, err := next() + for b, err := range stream.EntryBundles(ctx, N, getSize, f.ReadEntryBundle, 0, cp.Size) { if err != nil { - klog.Warningf("next: %v", err) - break + return fmt.Errorf("error while streaming bundles: %v", err) + } + if err := fTree.AppendBundle(b.RangeInfo, b.Data); err != nil { + return fmt.Errorf("failure calling AppendBundle(%v): %v", b.RangeInfo, err) } - if err := fTree.AppendBundle(ri, b); err != nil { - klog.Warningf("AppendBundle(%v): %v", ri, err) + if fTree.tree.End() >= cp.Size { break } } diff --git a/internal/stream/follower.go b/internal/stream/follower.go index 99103bed4..7c4426f09 100644 --- a/internal/stream/follower.go +++ b/internal/stream/follower.go @@ -16,8 +16,7 @@ package stream import ( "context" - - "github.com/transparency-dev/tessera/api/layout" + "iter" ) // Follower describes the contract of something which is required to track the contents of the local log. @@ -56,19 +55,13 @@ type Streamer interface { // entries into account. NextIndex(ctx context.Context) (uint64, error) - // StreamEntries() returns functions `next` and `stop` which act like a pull iterator for - // consecutive entry bundles, starting with the entry bundle which contains the requested entry - // index. - // - // Each call to `next` will return raw entry bundle bytes along with a RangeInfo struct which - // contains information on which entries within that bundle are to be considered valid. + // StreamEntries returns an iterator over the range of requested entries [startEntryIdx, startEntryIdx+N). // - // next will hang if it has reached the extent of the current tree, and return once either - // the tree has grown and more entries are available, or cancel was called. + // The iterator will yield either a Bundle struct or an error. If an error is returned the caller should + // stop consuming from the iterator as it's unlikely that a partial stream of entries from a transparency log + // is useful. // - // next will cease iterating if either: - // - it produces an error (e.g. via the underlying calls to the log storage) - // - the returned cancel function is called - // and will continue to return an error if called again after either of these cases. - StreamEntries(ctx context.Context, fromEntryIdx uint64) (next func() (layout.RangeInfo, []byte, error), cancel func()) + // The returned Bundle contains the raw serialised form of the entry bundle, along with a layout.RangeInfo + // struct that describes which of the entries in the entry bundle are part of the requested range. 
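For illustration, here is roughly how a caller might consume the iterator contract described above (an editor's sketch, not part of this change; it is written as if it lived alongside the Streamer interface in the stream package, `lr` is any Streamer implementation, and the body just counts how many entries fall within the requested range):

```go
// countEntries is a sketch of a StreamEntries consumer: it assumes the usual
// context/fmt imports and stops on the first error, per the contract above.
func countEntries(ctx context.Context, lr Streamer, from, n uint64) (uint64, error) {
	seen := uint64(0)
	for b, err := range lr.StreamEntries(ctx, from, n) {
		if err != nil {
			// Stop consuming as soon as an error is yielded.
			return seen, fmt.Errorf("StreamEntries: %v", err)
		}
		// b.RangeInfo narrows b.Data down to just the entries within [from, from+n).
		seen += uint64(b.RangeInfo.N)
	}
	return seen, nil
}
```

Breaking out of the range loop early is also fine; implementations such as EntryBundles below tear down their read-ahead workers when the iterator's cleanup runs.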
+ StreamEntries(ctx context.Context, startEntryIdx, N uint64) iter.Seq2[Bundle, error] } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 65c16b487..0ea4b4a22 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -17,46 +17,45 @@ package stream import ( "context" - "errors" "fmt" - "time" + "iter" "github.com/transparency-dev/tessera/api/layout" "k8s.io/klog/v2" ) -// NoMoreEntries is a sentinel error returned by StreamEntries when no more entries will be returned by calls to the next function. -var ErrNoMoreEntries = errors.New("no more entries") - // GetBundleFn is a function which knows how to fetch a single entry bundle from the specified address. type GetBundleFn func(ctx context.Context, bundleIdx uint64, partial uint8) ([]byte, error) // GetTreeSizeFn is a function which knows how to return a tree size. type GetTreeSizeFn func(ctx context.Context) (uint64, error) -// StreamAdaptor uses the provided function to produce a stream of entry bundles accesible via the returned functions. -// -// Entry bundles are retuned strictly in order via consecutive calls to the returned next func. -// If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned by the corresponding call to next, -// and the stream will be stopped - further calls to next will continue to return errors. +// Bundle represents an entry bundle in a log, along with some metadata about which parts of the bundle +// are relevant. +type Bundle struct { + // RangeInfo describes which of the entries in this bundle are relevant. + RangeInfo layout.RangeInfo + // Data is the raw serialised bundle, as fetched from the log. + Data []byte +} + +// EntryBundles produces an iterator which returns a stream of Bundle structs which cover the requested range of entries in their natural order in the log. // -// When the caller has finished consuming entry bundles (either because of an error being returned via next, or having consumed all the bundles it needs), -// it MUST call the returned cancel function to release resources. +// If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned via the iterator. // // This adaptor is optimised for the case where calling getBundle has some appreciable latency, and works // around that by maintaining a read-ahead cache of subsequent bundles which is populated a number of parallel // requests to getBundle. The request parallelism is set by the value of the numWorkers paramemter, which can be tuned // to balance throughput against consumption of resources, but such balancing needs to be mindful of the nature of the // source infrastructure, and how concurrent requests affect performance (e.g. GCS buckets vs. files on a single disk). -func StreamAdaptor(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, getBundle GetBundleFn, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) { +func EntryBundles(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, getBundle GetBundleFn, fromEntry uint64, N uint64) iter.Seq2[Bundle, error] { ctx, span := tracer.Start(ctx, "tessera.storage.StreamAdaptor") defer span.End() // bundleOrErr represents a fetched entry bundle and its params, or an error if we couldn't fetch it for // some reason.
type bundleOrErr struct { - ri layout.RangeInfo - b []byte + b Bundle err error } @@ -76,136 +75,105 @@ func StreamAdaptor(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, defer close(bundles) + treeSize, err := getSize(ctx) + if err != nil { + bundles <- func() bundleOrErr { return bundleOrErr{err: err} } + return + } + // We'll limit ourselves to numWorkers worth of on-going work using these tokens: tokens := make(chan struct{}, numWorkers) for range numWorkers { tokens <- struct{}{} } - // We'll keep looping around until told to exit. - for { - // Check afresh what size the tree is so we can keep streaming entries as the tree grows. - treeSize, err := getSize(ctx) - if err != nil { - klog.Warningf("StreamAdaptor: failed to get current tree size: %v", err) - continue - } - klog.V(1).Infof("StreamAdaptor: streaming from %d to %d", fromEntry, treeSize) - - // For each bundle, pop a future into the bundles channel and kick off an async request - // to resolve it. - rangeLoop: - for ri := range layout.Range(fromEntry, treeSize, treeSize) { - select { - case <-exit: - break rangeLoop - case <-tokens: - // We'll return a token below, once the bundle is fetched _and_ is being yielded. - } - - c := make(chan bundleOrErr, 1) - go func(ri layout.RangeInfo) { - b, err := getBundle(ctx, ri.Index, ri.Partial) - c <- bundleOrErr{ri: ri, b: b, err: err} - }(ri) - - f := func() bundleOrErr { - b := <-c - // We're about to yield a value, so we can now return the token and unblock another fetch. - tokens <- struct{}{} - return b - } - - bundles <- f - } - - // Next loop, carry on from where we got to. - fromEntry = treeSize + klog.V(1).Infof("stream.EntryBundles: streaming [%d, %d)", fromEntry, fromEntry+N) + // For each bundle, pop a future into the bundles channel and kick off an async request + // to resolve it. + for ri := range layout.Range(fromEntry, fromEntry+N, treeSize) { select { case <-exit: - klog.Infof("StreamAdaptor: exiting") return - case <-time.After(time.Second): - // We've caught up with and hit the end of the tree, so wait a bit before looping to avoid busy waiting. - // TODO(al): could consider a shallow channel of sizes here. + case <-tokens: + // We'll return a token below, once the bundle is fetched _and_ is being yielded. } - } - }() - cancel = func() { - close(exit) - } + c := make(chan bundleOrErr, 1) + go func(ri layout.RangeInfo) { + b, err := getBundle(ctx, ri.Index, ri.Partial) + c <- bundleOrErr{b: Bundle{RangeInfo: ri, Data: b}, err: err} + }(ri) + + f := func() bundleOrErr { + b := <-c + // We're about to yield a value, so we can now return the token and unblock another fetch. + tokens <- struct{}{} + return b + } - var streamErr error - next = func() (layout.RangeInfo, []byte, error) { - if streamErr != nil { - return layout.RangeInfo{}, nil, streamErr + bundles <- f } - f, ok := <-bundles - if !ok { - streamErr = ErrNoMoreEntries - return layout.RangeInfo{}, nil, streamErr - } - b := f() - if b.err != nil { - streamErr = b.err + klog.V(1).Infof("stream.EntryBundles: exiting") + }() + + return func(yield func(Bundle, error) bool) { + defer close(exit) + + for f := range bundles { + b := f() + if !yield(b.b, b.err) { + return + } + // For now, force the iterator to stop if we've just returned an error. + // If there's a good reason to allow it to continue we can change this. 
+ if b.err != nil { + return + } } - return b.ri, b.b, b.err + klog.V(1).Infof("stream.EntryBundles: iter done") } - return next, cancel } -// EntryStreamReader converts a stream of {RangeInfo, EntryBundle} into a stream of individually processed entries. -// -// This is mostly useful to Follower implementations which need to parse and consume individual entries being streamed -// from a LogReader. -type EntryStreamReader[T any] struct { - bundleFn func([]byte) ([]T, error) - next func() (layout.RangeInfo, []byte, error) - - curData []T - curRI layout.RangeInfo - i uint64 +// Entry represents a single leaf in a log. +type Entry[T any] struct { + // Index is the index of the entry in the log. + Index uint64 + // Entry is the entry from the log. + Entry T } -// NewEntryStreamReader creates a new stream reader which uses the provided bundleFn to process bundles into processed entries of type T. +// Entries returns an iterator which uses the provided bundleFn to process bundles into individual entries of type T. // // Different bundleFn implementations can be provided to return raw entry bytes, parsed entry structs, or derivations of entries (e.g. hashes) as needed. -func NewEntryStreamReader[T any](next func() (layout.RangeInfo, []byte, error), bundleFn func([]byte) ([]T, error)) *EntryStreamReader[T] { - return &EntryStreamReader[T]{ - bundleFn: bundleFn, - next: next, - i: 0, - } -} - -// Next processes and returns the next available entry in the stream along with its index in the log. -func (e *EntryStreamReader[T]) Next() (uint64, T, error) { - var t T - if len(e.curData) == 0 { - var err error - var b []byte - e.curRI, b, err = e.next() - if err != nil { - return 0, t, fmt.Errorf("next: %v", err) - } - e.curData, err = e.bundleFn(b) - if err != nil { - return 0, t, fmt.Errorf("bundleFn(bundleEntry @%d): %v", e.curRI.Index, err) +func Entries[T any](bundles iter.Seq2[Bundle, error], bundleFn func([]byte) ([]T, error)) iter.Seq2[Entry[T], error] { + return func(yield func(Entry[T], error) bool) { + for b, err := range bundles { + if err != nil { + yield(Entry[T]{}, err) + return + } + es, err := bundleFn(b.Data) + if err != nil { + yield(Entry[T]{}, err) + return + } + if len(es) <= int(b.RangeInfo.First) { + yield(Entry[T]{}, fmt.Errorf("logic error: First is %d but only %d entries", b.RangeInfo.First, len(es))) + return + } + es = es[b.RangeInfo.First:] + if len(es) > int(b.RangeInfo.N) { + es = es[:b.RangeInfo.N] + } + rIdx := b.RangeInfo.Index*layout.EntryBundleWidth + uint64(b.RangeInfo.First) + for i, e := range es { + if !yield(Entry[T]{Index: rIdx + uint64(i), Entry: e}, nil) { + return + } + } } - if e.curRI.First > 0 { - e.curData = e.curData[e.curRI.First:] - } - if len(e.curData) > int(e.curRI.N) { - e.curData = e.curData[:e.curRI.N] - } - e.i = 0 } - t, e.curData = e.curData[0], e.curData[1:] - rIdx := e.curRI.Index*layout.EntryBundleWidth + uint64(e.curRI.First) + e.i - e.i++ - return rIdx, t, nil } diff --git a/lifecycle.go b/lifecycle.go index f85724387..6f03fa9bf 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -21,13 +21,9 @@ import ( "github.com/transparency-dev/merkle/rfc6962" "github.com/transparency-dev/tessera/api" - "github.com/transparency-dev/tessera/api/layout" "github.com/transparency-dev/tessera/internal/stream" ) -// NoMoreEntries is a sentinel error returned by StreamEntries when no more entries will be returned by calls to the next function.
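As an aside, here is a rough sketch (an editor's illustration, not part of this change) of how the new Entries adaptor composes with EntryBundles to walk individual leaves. It is written as if inside the stream package; parseBundle is a hypothetical func([]byte) ([][]byte, error) which splits a serialised bundle into its leaves, and the worker count is arbitrary:

```go
// walkLeaves is a sketch only: it streams bundles for [from, from+n), converts
// them to individual leaves via parseBundle, and logs each leaf's index.
func walkLeaves(ctx context.Context, getSize GetTreeSizeFn, getBundle GetBundleFn, from, n uint64, parseBundle func([]byte) ([][]byte, error)) error {
	bundles := EntryBundles(ctx, 4, getSize, getBundle, from, n)
	for e, err := range Entries(bundles, parseBundle) {
		if err != nil {
			return fmt.Errorf("Entries: %v", err)
		}
		// e.Index is the absolute index of this leaf in the log; e.Entry is whatever parseBundle produced.
		klog.V(2).Infof("leaf %d: %d bytes", e.Index, len(e.Entry))
	}
	return nil
}
```

The antispam followers updated below use exactly this composition, but drive it through iter.Pull2 so they can fill fixed-size batches.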
-var ErrNoMoreEntries = stream.ErrNoMoreEntries - // LogReader provides read-only access to the log. type LogReader interface { // ReadCheckpoint returns the latest checkpoint available. @@ -53,39 +49,7 @@ type LogReader interface { // The expected usage and corresponding behaviours are similar to ReadTile. ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error) - // IntegratedSize returns the current size of the integrated tree. - // - // This tree will have in place all the static resources the returned size implies, but - // there may not yet be a checkpoint for this size signed, witnessed, or published. - // - // It's ONLY safe to use this value for processes internal to the operation of the log (e.g. - // populating antispam data structures); it MUST NOT not be used as a substitute for - // reading the checkpoint when only data which has been publicly committed to by the - // log should be used. If in doubt, use ReadCheckpoint instead. - IntegratedSize(ctx context.Context) (uint64, error) - - // NextIndex returns the first as-yet unassigned index. - // - // In a quiescent log, this will be the same as the checkpoint size. In a log with entries actively - // being added, this number will be higher since it will take sequenced but not-yet-integrated/not-yet-published - // entries into account. - NextIndex(ctx context.Context) (uint64, error) - - // StreamEntries() returns functions `next` and `stop` which act like a pull iterator for - // consecutive entry bundles, starting with the entry bundle which contains the requested entry - // index. - // - // Each call to `next` will return raw entry bundle bytes along with a RangeInfo struct which - // contains information on which entries within that bundle are to be considered valid. - // - // next will hang if it has reached the extent of the current tree, and return once either - // the tree has grown and more entries are available, or cancel was called. - // - // next will cease iterating if either: - // - it produces an error (e.g. via the underlying calls to the log storage) - // - the returned cancel function is called - // and will continue to return an error if called again after either of these cases. - StreamEntries(ctx context.Context, fromEntryIdx uint64) (next func() (layout.RangeInfo, []byte, error), cancel func()) + stream.Streamer } // Antispam describes the contract that an antispam implementation must meet in order to be used via the diff --git a/storage/aws/antispam/aws.go b/storage/aws/antispam/aws.go index f64b719a1..8b447273f 100644 --- a/storage/aws/antispam/aws.go +++ b/storage/aws/antispam/aws.go @@ -23,6 +23,7 @@ import ( "database/sql" "errors" "fmt" + "iter" "strings" "sync/atomic" "time" @@ -256,11 +257,8 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { t := time.NewTicker(time.Second) var ( - entryReader *stream.EntryStreamReader[[]byte] - stop func() - - curEntries [][]byte - curIndex uint64 + next func() (stream.Entry[[]byte], error, bool) + stop func() ) for { select { @@ -275,7 +273,7 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { } // Busy loop while there's work to be done - for workDone := true; workDone; { + for streamDone := false; !streamDone; { select { case <-ctx.Done(): return @@ -304,7 +302,7 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { if followFrom >= size { // Our view of the log is out of date, exit the busy loop and refresh it. 
- workDone = false + streamDone = true return nil } @@ -312,30 +310,36 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { // If this is the first time around the loop we need to start the stream of entries now that we know where we want to // start reading from: - if entryReader == nil { - next, st := lr.StreamEntries(ctx, followFrom) - stop = st - entryReader = stream.NewEntryStreamReader(next, f.bundleHasher) + if next == nil { + next, stop = iter.Pull2(stream.Entries(lr.StreamEntries(ctx, followFrom, size-followFrom), f.bundleHasher)) } bs := uint64(f.as.opts.MaxBatchSize) if r := size - followFrom; r < bs { bs = r } - batch := make([][]byte, 0, bs) + curEntries := make([][]byte, 0, bs) for i := range int(bs) { - idx, c, err := entryReader.Next() + e, err, ok := next() + if !ok { + // The entry stream has ended so we'll need to start a new stream next time around the loop: + stop() + next = nil + break + } if err != nil { return fmt.Errorf("entryReader.next: %v", err) } - if wantIdx := followFrom + uint64(i); idx != wantIdx { + if wantIdx := followFrom + uint64(i); e.Index != wantIdx { // We're out of sync return errOutOfSync } - batch = append(batch, c) + curEntries = append(curEntries, e.Entry) + } + + if len(curEntries) == 0 { + return nil } - curEntries = batch - curIndex = followFrom klog.V(1).Infof("Inserting %d entries into antispam database (follow from %d of size %d)", len(curEntries), followFrom, size) @@ -343,7 +347,7 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { vals := make([]any, 0, 2*len(curEntries)) for i, e := range curEntries { args = append(args, "(?, ?)") - vals = append(vals, e, curIndex+uint64(i)) + vals = append(vals, e, followFrom+uint64(i)) } sqlStr := fmt.Sprintf("INSERT IGNORE INTO AntispamIDSeq (h, idx) VALUES %s", strings.Join(args, ",")) @@ -370,14 +374,14 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { if err != errOutOfSync { klog.Errorf("Failed to commit antispam population tx: %v", err) } - if entryReader != nil { + if next != nil { stop() - entryReader = nil + next = nil stop = nil } + streamDone = true continue } - curEntries = nil } } } diff --git a/storage/aws/antispam/aws_test.go b/storage/aws/antispam/aws_test.go index 4d268e9f0..052837ce2 100644 --- a/storage/aws/antispam/aws_test.go +++ b/storage/aws/antispam/aws_test.go @@ -71,9 +71,18 @@ func TestAntispam(t *testing.T) { t.Error("expected initial position to be 0") } + a := tessera.NewPublicationAwaiter(t.Context(), fl.LogReader.ReadCheckpoint, time.Second) var idx1 tessera.Index idxf1 := addFn(ctx, tessera.NewEntry([]byte("one"))) + if _, _, err := a.Await(t.Context(), idxf1); err != nil { + t.Fatalf("Await(1): %v", err) + } + idxf2 := addFn(ctx, tessera.NewEntry([]byte("two"))) + if _, _, err := a.Await(t.Context(), idxf2); err != nil { + t.Fatalf("Await(2): %v", err) + } + if idx1, err = idxf1(); err != nil { t.Fatal(err) } diff --git a/storage/aws/aws.go b/storage/aws/aws.go index ee2d1ff00..1b3e82ea8 100644 --- a/storage/aws/aws.go +++ b/storage/aws/aws.go @@ -38,6 +38,7 @@ import ( "errors" "fmt" "io" + "iter" "net/http" "os" "path/filepath" @@ -634,14 +635,14 @@ func (lr *logResourceStore) NextIndex(ctx context.Context) (uint64, error) { return lr.nextIndex(ctx) } -func (lr *logResourceStore) StreamEntries(ctx context.Context, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) { - klog.Infof("StreamEntries from %d", fromEntry) +func (lr *logResourceStore) StreamEntries(ctx 
context.Context, startEntry, N uint64) iter.Seq2[stream.Bundle, error] { + klog.Infof("StreamEntries [%d, %d)", startEntry, startEntry+N) // TODO(al): Consider making this configurable. // Reads to S3 should be able to go highly concurrent without issue, but some performance testing should probably be undertaken. // 10 works well for GCP, so start with that as a default. numWorkers := uint(10) - return stream.StreamAdaptor(ctx, numWorkers, lr.IntegratedSize, lr.ReadEntryBundle, fromEntry) + return stream.EntryBundles(ctx, numWorkers, lr.IntegratedSize, lr.ReadEntryBundle, startEntry, N) } // get returns the requested object. diff --git a/storage/aws/aws_test.go b/storage/aws/aws_test.go index dd10e35d3..211b23193 100644 --- a/storage/aws/aws_test.go +++ b/storage/aws/aws_test.go @@ -498,21 +498,16 @@ func TestStreamEntries(t *testing.T) { // We'll first try to stream up to logSize1, then when we reach it we'll // make the tree appear to grow to logSize2 to test resuming. seenEntries := uint64(0) - next, stop := s.StreamEntries(ctx, 0) - for { - gotRI, _, gotErr := next() + for gotEntry, gotErr := range s.StreamEntries(ctx, 0, uint64(logSize2)) { if gotErr != nil { - if errors.Is(gotErr, tessera.ErrNoMoreEntries) { - break - } t.Fatalf("gotErr after %d: %v", seenEntries, gotErr) } - if e := gotRI.Index*layout.EntryBundleWidth + uint64(gotRI.First); e != seenEntries { + if e := gotEntry.RangeInfo.Index*layout.EntryBundleWidth + uint64(gotEntry.RangeInfo.First); e != seenEntries { t.Fatalf("got idx %d, want %d", e, seenEntries) } - seenEntries += uint64(gotRI.N) - t.Logf("got RI %d / %d", gotRI.Index, seenEntries) + seenEntries += uint64(gotEntry.RangeInfo.N) + t.Logf("got RI %d / %d", gotEntry.RangeInfo.Index, seenEntries) switch seenEntries { case uint64(logSize1): @@ -522,9 +517,6 @@ func TestStreamEntries(t *testing.T) { t.Log("Reached logSize, growing tree") logSize.Store(uint64(logSize2)) time.Sleep(time.Second) - case uint64(logSize2): - // We've seen all the entries we created, stop the iterator - stop() } } } diff --git a/storage/gcp/antispam/gcp.go b/storage/gcp/antispam/gcp.go index 2ec9be254..2fefcc7c1 100644 --- a/storage/gcp/antispam/gcp.go +++ b/storage/gcp/antispam/gcp.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "iter" "sync/atomic" "time" @@ -221,8 +222,8 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { t := time.NewTicker(time.Second) var ( - entryReader *stream.EntryStreamReader[[]byte] - stop func() + next func() (stream.Entry[[]byte], error, bool) + stop func() curEntries [][]byte curIndex uint64 @@ -239,8 +240,8 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { continue } - // Busy loop while there's work to be done - for workDone := true; workDone; { + // Busy loop while there are entries to be consumed from the stream + for streamDone := false; !streamDone; { _, err = f.as.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { ctx, span := tracer.Start(ctx, "tessera.antispam.gcp.FollowTxn") defer span.End() @@ -260,7 +261,7 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { followFrom := uint64(nextIdx) if followFrom >= size { // Our view of the log is out of date, exit the busy loop and refresh it. 
- workDone = false + streamDone = true return nil } @@ -270,11 +271,9 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { // If this is the first time around the loop we need to start the stream of entries now that we know where we want to // start reading from: - if entryReader == nil { + if next == nil { span.AddEvent("Start streaming entries") - next, st := lr.StreamEntries(ctx, followFrom) - stop = st - entryReader = stream.NewEntryStreamReader(next, f.bundleHasher) + next, stop = iter.Pull2(stream.Entries(lr.StreamEntries(ctx, followFrom, size-followFrom), f.bundleHasher)) } if curIndex == followFrom && curEntries != nil { @@ -289,20 +288,30 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { } batch := make([][]byte, 0, bs) for i := range int(bs) { - idx, c, err := entryReader.Next() + e, err, ok := next() + if !ok { + // The entry stream has ended so we'll need to start a new stream next time around the loop: + stop() + next = nil + break + } if err != nil { return fmt.Errorf("entryReader.next: %v", err) } - if wantIdx := followFrom + uint64(i); idx != wantIdx { + if wantIdx := followFrom + uint64(i); e.Index != wantIdx { // We're out of sync return errOutOfSync } - batch = append(batch, c) + batch = append(batch, e.Entry) } curEntries = batch curIndex = followFrom } + if len(curEntries) == 0 { + return nil + } + // Now update the index. { ms := make([]*spanner.Mutation, 0, len(curEntries)) @@ -328,7 +337,8 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { klog.Errorf("Failed to commit antispam population tx: %v", err) } stop() - entryReader = nil + next = nil + streamDone = true continue } curEntries = nil diff --git a/storage/gcp/antispam/gcp_test.go b/storage/gcp/antispam/gcp_test.go index 2800e9316..2123d61f0 100644 --- a/storage/gcp/antispam/gcp_test.go +++ b/storage/gcp/antispam/gcp_test.go @@ -83,17 +83,21 @@ func TestAntispamStorage(t *testing.T) { // Hack in a workaround for spannertest not supporting BatchWrites f.(*follower).updateIndex = updateIndexTx + go f.Follow(ctx, fl.LogReader) + entryIndex := make(map[string]uint64) + a := tessera.NewPublicationAwaiter(t.Context(), fl.LogReader.ReadCheckpoint, 100*time.Millisecond) for i, e := range test.logEntries { - idx, err := fl.Appender.Add(t.Context(), tessera.NewEntry(e))() + entry := tessera.NewEntry(e) + f := fl.Appender.Add(t.Context(), entry) + idx, _, err := a.Await(t.Context(), f) if err != nil { - t.Fatalf("Add(%d): %v", i, err) + t.Fatalf("Await(%d): %v", i, err) } + klog.Infof("%d == %x", i, entry.Identity()) entryIndex[string(testIDHash(e))] = idx.Index } - go f.Follow(ctx, fl.LogReader) - for { time.Sleep(time.Second) pos, err := f.EntriesProcessed(ctx) diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index cd900cb47..acaa3391a 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -35,6 +35,7 @@ import ( "errors" "fmt" "io" + "iter" "net/http" "os" "path/filepath" @@ -183,16 +184,16 @@ func (lr *LogReader) NextIndex(ctx context.Context) (uint64, error) { return lr.nextIndex(ctx) } -func (lr *LogReader) StreamEntries(ctx context.Context, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) { +func (lr *LogReader) StreamEntries(ctx context.Context, startEntry, N uint64) iter.Seq2[stream.Bundle, error] { ctx, span := tracer.Start(ctx, "tessera.storage.gcp.StreamEntries") defer span.End() - klog.Infof("StreamEntries from %d", fromEntry) + klog.Infof("StreamEntries from %d", startEntry) // TODO(al): 
Consider making this configurable. // Requests to GCS can go super parallel without too much issue, but even just 10 concurrent requests seems to provide pretty good throughput. numWorkers := uint(10) - return stream.StreamAdaptor(ctx, numWorkers, lr.integratedSize, lr.lrs.getEntryBundle, fromEntry) + return stream.EntryBundles(ctx, numWorkers, lr.integratedSize, lr.lrs.getEntryBundle, startEntry, N) } func (s *Storage) Appender(ctx context.Context, opts *tessera.AppendOptions) (*tessera.Appender, tessera.LogReader, error) { diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 78d033dce..df7b6a365 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -388,21 +388,16 @@ func TestStreamEntries(t *testing.T) { // We'll first try to stream up to logSize1, then when we reach it we'll // make the tree appear to grow to logSize2 to test resuming. seenEntries := uint64(0) - next, stop := s.StreamEntries(ctx, 0) - for { - gotRI, _, gotErr := next() + for gotEntry, gotErr := range s.StreamEntries(ctx, 0, uint64(logSize2)) { if gotErr != nil { - if errors.Is(gotErr, tessera.ErrNoMoreEntries) { - break - } t.Fatalf("gotErr after %d: %v", seenEntries, gotErr) } - if e := gotRI.Index*layout.EntryBundleWidth + uint64(gotRI.First); e != seenEntries { + if e := gotEntry.RangeInfo.Index*layout.EntryBundleWidth + uint64(gotEntry.RangeInfo.First); e != seenEntries { t.Fatalf("got idx %d, want %d", e, seenEntries) } - seenEntries += uint64(gotRI.N) - t.Logf("got RI %d / %d", gotRI.Index, seenEntries) + seenEntries += uint64(gotEntry.RangeInfo.N) + t.Logf("got RI %d / %d", gotEntry.RangeInfo.Index, seenEntries) switch seenEntries { case uint64(logSize1): @@ -412,9 +407,6 @@ func TestStreamEntries(t *testing.T) { t.Log("Reached logSize, growing tree") logSize.Store(uint64(logSize2)) time.Sleep(time.Second) - case uint64(logSize2): - // We've seen all the entries we created, stop the iterator - stop() } } } diff --git a/storage/mysql/mysql.go b/storage/mysql/mysql.go index 9395e85fe..328061235 100644 --- a/storage/mysql/mysql.go +++ b/storage/mysql/mysql.go @@ -35,6 +35,7 @@ import ( "github.com/transparency-dev/tessera/api" "github.com/transparency-dev/tessera/api/layout" "github.com/transparency-dev/tessera/internal/migrate" + "github.com/transparency-dev/tessera/internal/stream" storage "github.com/transparency-dev/tessera/storage/internal" "k8s.io/klog/v2" ) @@ -345,7 +346,7 @@ func (s *Storage) NextIndex(ctx context.Context) (uint64, error) { // index. // // This is part of the tessera LogReader contract. -func (s *Storage) StreamEntries(ctx context.Context, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) { +func (s *Storage) StreamEntries(ctx context.Context, startEntry, N uint64) iter.Seq2[stream.Bundle, error] { type riBundle struct { ri layout.RangeInfo b []byte @@ -358,120 +359,73 @@ func (s *Storage) StreamEntries(ctx context.Context, fromEntry uint64) (next fun // This happens when the returned cancel func is called. done := make(chan struct{}) - // Kick off a background goroutine which fills c. go func() { - var rangeInfoNext func() (layout.RangeInfo, bool) - var rangeInfoCancel func() - var rows *sql.Rows - nextEntry := fromEntry - - // reset should be called if we detect that something has gone wrong and/or we need to re-start our streaming. 
- reset := func() { - if rows != nil { - _ = rows.Close() - rows = nil - } - if rangeInfoCancel != nil { - rangeInfoCancel() - rangeInfoCancel = nil - rangeInfoNext = nil - } + defer close(c) + ts, err := s.readTreeState(ctx) + if err != nil { + klog.Warningf("Failed to read tree state: %v", err) + c <- riBundle{err: err} + return } - sleep := time.Duration(0) - tryAgain: - for { - // We'll keep going until the context is done, but don't want to hammer the DB when we've - // streamed all the current entries and are waiting for the tree to grow. + rows, err := s.db.QueryContext(ctx, streamTiledLeavesSQL, startEntry/layout.EntryBundleWidth) + if err != nil { + klog.Warningf("Failed to read entry bundle @%d: %v", startEntry/layout.EntryBundleWidth, err) + c <- riBundle{err: err} + return + } + defer func() { + if err := rows.Close(); err != nil { + klog.Warningf("Failed to Close rows: %v", err) + } + }() + + nextRange, stopRange := iter.Pull(layout.Range(startEntry, N, ts.size)) + defer stopRange() + + for rows.Next() { select { - case <-ctx.Done(): - return case <-done: - close(c) return - case <-time.After(sleep): - // We avoid pausing unnecessarily the first time we enter the loop by initialising sleep to zero, but - // subsequent iterations around the loop _should_ sleep to avoid hammering the DB when we've caught up with - // all the entries it contains. - sleep = time.Second + default: } - // Check if we need to (re-) setup the data stream, and do it if so. - if rangeInfoNext == nil { - // We need to know what the current local tree size is. - ts, err := s.readTreeState(ctx) - if err != nil { - klog.Warningf("Failed to read tree state: %v", err) - reset() - continue - } - klog.Infof("StreamEntries scanning %d -> %d", fromEntry, ts.size) - // And we need the corresponding range info which tell us the "shape" of the entry bundles. - rangeInfoNext, rangeInfoCancel = iter.Pull(layout.Range(nextEntry, ts.size, ts.size)) - nextBundle := nextEntry / layout.EntryBundleWidth - // Finally, we need the actual raw entry bundles themselves. - rows, err = s.db.QueryContext(ctx, streamTiledLeavesSQL, nextBundle) - if err != nil { - klog.Warningf("Failed to read entry bundle @%d: %v", nextBundle, err) - reset() - continue - } + ri, ok := nextRange() + if !ok { + return } // Now we can iterate over the streams we've set up above, and turn the data into the right form // for sending over c, to be returned to the caller via the next func. var idx, size uint64 var data []byte - for rows.Next() { - // Parse a bundle from the DB. - if err := rows.Scan(&idx, &size, &data); err != nil { - reset() - c <- riBundle{err: err} - continue tryAgain - } - // And grab the corresponding range info which describes it. - ri, ok := rangeInfoNext() - if !ok { - reset() - continue tryAgain - } - // The bundle data and the range info MUST refer to the same entry bundle index, so assert that they do. - if idx != ri.Index { - // Something's gone wonky - our rangeinfo and entry bundle streams are no longer lined up. - // Bail and set up the streams again. - klog.Infof("Out of sync, got entrybundle index %d, but rangeinfo for index %d", idx, ri.Index) - reset() - continue tryAgain - } - // All good, so queue up the data to be returned via calls to next. - klog.V(1).Infof("Sending %v", ri) - c <- riBundle{ri: ri, b: data} - nextEntry += uint64(ri.N) + // Parse a bundle from the DB. 
+ if err := rows.Scan(&idx, &size, &data); err != nil { + c <- riBundle{err: err} + return + } + // The bundle data and the range info MUST refer to the same entry bundle index, so assert that they do. + if idx != ri.Index { + // Something's gone wonky - our rangeinfo and entry bundle streams are no longer lined up. + // Bail and set up the streams again. + klog.Infof("Out of sync, got entrybundle index %d, but rangeinfo for index %d", idx, ri.Index) + return } - klog.V(1).Infof("StreamEntries: no more entry bundle rows, will retry") - // We have no more rows coming from the entrybundle table of the DB, so go around again and re-check - // the tree size in case it's grown since we started the query. - reset() + // All good, so queue up the data to be returned via calls to next. + klog.V(1).Infof("Sending %v", ri) + c <- riBundle{ri: ri, b: data} } + klog.V(1).Infof("StreamEntries: no more entry bundle rows, exiting") }() - // This is the implementation of the next function we'll return to the caller. - // They'll call this repeatedly to consume entries from c. - next = func() (layout.RangeInfo, []byte, error) { - select { - case <-ctx.Done(): - return layout.RangeInfo{}, nil, ctx.Err() - case r, ok := <-c: - if !ok { - return layout.RangeInfo{}, nil, errors.New("no more entries") + return func(yield func(stream.Bundle, error) bool) { + defer close(done) + for r := range c { + if !yield(stream.Bundle{RangeInfo: r.ri, Data: r.b}, r.err) { + return } - return r.ri, r.b, r.err } } - - return next, func() { - close(done) - } } // dbExecContext describes something which can support the sql ExecContext function. diff --git a/storage/mysql/mysql_test.go b/storage/mysql/mysql_test.go index 30579ee5f..6db351072 100644 --- a/storage/mysql/mysql_test.go +++ b/storage/mysql/mysql_test.go @@ -510,20 +510,18 @@ func TestStreamEntries(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { populateEntries(t, test.treeSize, s) - next, cancel := s.StreamEntries(context.Background(), test.streamFrom) wantBundle := uint64(test.streamFrom / layout.EntryBundleWidth) nBundles := test.treeSize / layout.EntryBundleWidth - for i := 0; ; i++ { - ri, b, err := next() + for b, err := range s.StreamEntries(context.Background(), test.streamFrom, test.treeSize-test.streamFrom) { if err != nil { t.Fatalf("expecting bundle %d, next: %v", wantBundle, err) } - if ri.Index != wantBundle { - t.Fatalf("got bundle %d, want %d", ri.Index, wantBundle) + if b.RangeInfo.Index != wantBundle { + t.Fatalf("got bundle %d, want %d", b.RangeInfo.Index, wantBundle) } - verifyBundle(t, wantBundle, b) + verifyBundle(t, wantBundle, b.Data) - if i == int(test.growAfterBundle) && test.growTreeSize > 0 { + if b.RangeInfo.Index == test.growAfterBundle && test.growTreeSize > 0 { populateEntries(t, test.growTreeSize, s) nBundles = test.growTreeSize / layout.EntryBundleWidth } @@ -532,7 +530,6 @@ func TestStreamEntries(t *testing.T) { t.Logf("want %d N %d", wantBundle, nBundles) if wantBundle >= nBundles { - cancel() break } } diff --git a/storage/posix/antispam/badger.go b/storage/posix/antispam/badger.go index 0bb7a9ae9..018c740e4 100644 --- a/storage/posix/antispam/badger.go +++ b/storage/posix/antispam/badger.go @@ -22,6 +22,7 @@ import ( "encoding/binary" "errors" "fmt" + "iter" "sync/atomic" "time" @@ -213,8 +214,8 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { t := time.NewTicker(time.Second) var ( - entryReader *stream.EntryStreamReader[[]byte] - stop func() + next func() (stream.Entry[[]byte], error, bool) + stop 
func() curEntries [][]byte curIndex uint64 @@ -253,7 +254,6 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { return fmt.Errorf("failed to get nextIdx value: %v", err) } } - klog.Infof("Following from %d", followFrom) span.SetAttributes(followFromKey.Int64(otel.Clamp64(followFrom))) @@ -269,11 +269,9 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { // If this is the first time around the loop we need to start the stream of entries now that we know where we want to // start reading from: - if entryReader == nil { + if next == nil { span.AddEvent("Start streaming entries") - next, st := lr.StreamEntries(ctx, followFrom) - stop = st - entryReader = stream.NewEntryStreamReader(next, f.bundleHasher) + next, stop = iter.Pull2(stream.Entries(lr.StreamEntries(ctx, followFrom, size-followFrom), f.bundleHasher)) } if curIndex == followFrom && curEntries != nil { @@ -288,16 +286,21 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { } batch := make([][]byte, 0, bs) for i := range int(bs) { - idx, c, err := entryReader.Next() + e, err, ok := next() + if !ok { + // The entry stream has ended so we'll need to start a new stream next time around the loop: + next = nil + break + } if err != nil { return fmt.Errorf("entryReader.next: %v", err) } - if wantIdx := followFrom + uint64(i); idx != wantIdx { - klog.Infof("at %d, expected %d - out of sync", idx, wantIdx) + if wantIdx := followFrom + uint64(i); e.Index != wantIdx { + klog.Infof("at %d, expected %d - out of sync", e.Index, wantIdx) // We're out of sync return errOutOfSync } - batch = append(batch, c) + batch = append(batch, e.Entry) } curEntries = batch curIndex = followFrom @@ -333,7 +336,7 @@ func (f *follower) Follow(ctx context.Context, lr stream.Streamer) { klog.Errorf("Failed to commit antispam population tx: %v", err) } stop() - entryReader = nil + next = nil continue } curEntries = nil diff --git a/storage/posix/files.go b/storage/posix/files.go index b742231a3..a77a04b37 100644 --- a/storage/posix/files.go +++ b/storage/posix/files.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "io/fs" + "iter" "net/http" "os" "path/filepath" @@ -204,13 +205,13 @@ func (l *logResourceStorage) NextIndex(ctx context.Context) (uint64, error) { return l.IntegratedSize(ctx) } -func (l *logResourceStorage) StreamEntries(ctx context.Context, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) { +func (l *logResourceStorage) StreamEntries(ctx context.Context, startEntry, N uint64) iter.Seq2[stream.Bundle, error] { // TODO(al): Consider making this configurable. // The performance of different levels of concurrency here will depend very much on the nature of the underlying storage infra, // e.g. NVME will likely respond well to some concurrency, HDD less so. // For now, we'll just stick to a safe default. numWorkers := uint(1) - return stream.StreamAdaptor(ctx, numWorkers, l.IntegratedSize, l.ReadEntryBundle, fromEntry) + return stream.EntryBundles(ctx, numWorkers, l.IntegratedSize, l.ReadEntryBundle, startEntry, N) } // sequenceBatch writes the entries from the provided batch into the entry bundle files of the log.
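To round this off, here is a sketch (not part of this change) of the pull-style pattern the antispam followers above use to drain entries in fixed-size batches; `lr` is any stream.Streamer, bundleHasher is a hypothetical func([]byte) ([][]byte, error) along the lines of the followers' bundle hashers, and the usual context/fmt/iter imports are assumed:

```go
// hashBatch is a sketch only: it pulls up to batchSize per-entry values from a
// stream starting at `from`, returning early if the stream ends.
func hashBatch(ctx context.Context, lr stream.Streamer, from, batchSize uint64, bundleHasher func([]byte) ([][]byte, error)) ([][]byte, error) {
	next, stop := iter.Pull2(stream.Entries(lr.StreamEntries(ctx, from, batchSize), bundleHasher))
	defer stop()

	batch := make([][]byte, 0, batchSize)
	for uint64(len(batch)) < batchSize {
		e, err, ok := next()
		if !ok {
			// The stream has ended; a follower would start a fresh stream on its next pass.
			break
		}
		if err != nil {
			return nil, fmt.Errorf("next: %v", err)
		}
		batch = append(batch, e.Entry)
	}
	return batch, nil
}
```

iter.Pull2 turns the push-style iterator into next/stop functions, which makes it straightforward to stop mid-stream and resume with a fresh StreamEntries call on the next pass, as the followers do when they detect they are out of sync.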