Skip to content
16 changes: 7 additions & 9 deletions internal/fsck/fsck.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ func Check(ctx context.Context, origin string, verifier note.Verifier, f Fetcher
}

// Set up a stream of entry bundles from the log to be checked.
getSize := func(_ context.Context) (uint64, error) { return cp.Size, nil }
next, cancel := stream.StreamAdaptor(ctx, N, getSize, f.ReadEntryBundle, 0)
defer cancel()

eg := errgroup.Group{}

Expand All @@ -81,16 +78,17 @@ func Check(ctx context.Context, origin string, verifier note.Verifier, f Fetcher
eg.Go(fTree.resourceCheckWorker(ctx))
}

getSize := func(_ context.Context) (uint64, error) { return cp.Size, nil }
// Consume the stream of bundles to re-derive the other log resources.
// TODO(al): consider chunking the log and doing each in parallel.
for fTree.tree.End() < cp.Size {
ri, b, err := next()
for b, err := range stream.EntryBundles(ctx, N, getSize, f.ReadEntryBundle, 0, cp.Size) {
if err != nil {
klog.Warningf("next: %v", err)
break
return fmt.Errorf("error while streaming bundles: %v", err)
}
if err := fTree.AppendBundle(b.RangeInfo, b.Data); err != nil {
return fmt.Errorf("failure calling AppendBundle(%v): %v", b.RangeInfo, err)
}
if err := fTree.AppendBundle(ri, b); err != nil {
klog.Warningf("AppendBundle(%v): %v", ri, err)
if fTree.tree.End() >= cp.Size {
break
}
}
Expand Down
23 changes: 8 additions & 15 deletions internal/stream/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ package stream

import (
"context"

"github.com/transparency-dev/tessera/api/layout"
"iter"
)

// Follower describes the contract of something which is required to track the contents of the local log.
Expand Down Expand Up @@ -56,19 +55,13 @@ type Streamer interface {
// entries into account.
NextIndex(ctx context.Context) (uint64, error)

// StreamEntries() returns functions `next` and `stop` which act like a pull iterator for
// consecutive entry bundles, starting with the entry bundle which contains the requested entry
// index.
//
// Each call to `next` will return raw entry bundle bytes along with a RangeInfo struct which
// contains information on which entries within that bundle are to be considered valid.
// StreamEntries returns an iterator over the range of requested entries [startEntryIdx, startEntryIdx+N).
//
// next will hang if it has reached the extent of the current tree, and return once either
// the tree has grown and more entries are available, or cancel was called.
// The iterator will yield either a Bundle struct or an error. If an error is yielded, the caller should
// stop consuming from the iterator, as a partial stream of entries from a transparency log is unlikely
// to be useful.
//
// next will cease iterating if either:
// - it produces an error (e.g. via the underlying calls to the log storage)
// - the returned cancel function is called
// and will continue to return an error if called again after either of these cases.
StreamEntries(ctx context.Context, fromEntryIdx uint64) (next func() (layout.RangeInfo, []byte, error), cancel func())
// The returned Bundle contains the raw serialised form of the entry bundle, along with a layout.RangeInfo
// struct that describes which of the entries in the entry bundle are part of the requested range.
StreamEntries(ctx context.Context, startEntryIdx, N uint64) iter.Seq2[Bundle, error]
}
210 changes: 89 additions & 121 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,45 @@ package stream

import (
"context"
"errors"
"fmt"
"time"
"iter"

"github.com/transparency-dev/tessera/api/layout"
"k8s.io/klog/v2"
)

// NoMoreEntries is a sentinel error returned by StreamEntries when no more entries will be returned by calls to the next function.
var ErrNoMoreEntries = errors.New("no more entries")

// GetBundleFn is a function which knows how to fetch a single entry bundle from the specified address.
type GetBundleFn func(ctx context.Context, bundleIdx uint64, partial uint8) ([]byte, error)

// GetTreeSizeFn is a function which knows how to return a tree size.
type GetTreeSizeFn func(ctx context.Context) (uint64, error)

// StreamAdaptor uses the provided function to produce a stream of entry bundles accesible via the returned functions.
//
// Entry bundles are retuned strictly in order via consecutive calls to the returned next func.
// If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned by the corresponding call to next,
// and the stream will be stopped - further calls to next will continue to return errors.
// Bundle represents an entry bundle in a log, along with some metadata about which parts of the bundle
// are relevant.
type Bundle struct {
// RangeInfo describes which of the entries in this bundle are relevant.
RangeInfo layout.RangeInfo
// Data is the raw serialised bundle, as fetched from the log.
Data []byte
}

// EntryBundles produces an iterator which returns a stream of Bundle structs which cover the requested range of entries in their natural order in the log.
//
// When the caller has finished consuming entry bundles (either because of an error being returned via next, or having consumed all the bundles it needs),
// it MUST call the returned cancel function to release resources.
// If the adaptor encounters an error while reading an entry bundle, the encountered error will be returned via the iterator.
//
// This adaptor is optimised for the case where calling getBundle has some appreciable latency, and works
// around that by maintaining a read-ahead cache of subsequent bundles which is populated by a number of parallel
// requests to getBundle. The request parallelism is set by the value of the numWorkers parameter, which can be tuned
// to balance throughput against consumption of resources, but such balancing needs to be mindful of the nature of the
// source infrastructure, and how concurrent requests affect performance (e.g. GCS buckets vs. files on a single disk).
func StreamAdaptor(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, getBundle GetBundleFn, fromEntry uint64) (next func() (ri layout.RangeInfo, bundle []byte, err error), cancel func()) {
func EntryBundles(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn, getBundle GetBundleFn, fromEntry uint64, N uint64) iter.Seq2[Bundle, error] {
ctx, span := tracer.Start(ctx, "tessera.storage.StreamAdaptor")
defer span.End()

// bundleOrErr represents a fetched entry bundle and its params, or an error if we couldn't fetch it for
// some reason.
type bundleOrErr struct {
ri layout.RangeInfo
b []byte
b Bundle
err error
}

Expand All @@ -76,136 +75,105 @@ func StreamAdaptor(ctx context.Context, numWorkers uint, getSize GetTreeSizeFn,

defer close(bundles)

treeSize, err := getSize(ctx)
if err != nil {
bundles <- func() bundleOrErr { return bundleOrErr{err: err} }
return
}

// We'll limit ourselves to numWorkers worth of on-going work using these tokens:
tokens := make(chan struct{}, numWorkers)
for range numWorkers {
tokens <- struct{}{}
}

// We'll keep looping around until told to exit.
for {
// Check afresh what size the tree is so we can keep streaming entries as the tree grows.
treeSize, err := getSize(ctx)
if err != nil {
klog.Warningf("StreamAdaptor: failed to get current tree size: %v", err)
continue
}
klog.V(1).Infof("StreamAdaptor: streaming from %d to %d", fromEntry, treeSize)

// For each bundle, pop a future into the bundles channel and kick off an async request
// to resolve it.
rangeLoop:
for ri := range layout.Range(fromEntry, treeSize, treeSize) {
select {
case <-exit:
break rangeLoop
case <-tokens:
// We'll return a token below, once the bundle is fetched _and_ is being yielded.
}

c := make(chan bundleOrErr, 1)
go func(ri layout.RangeInfo) {
b, err := getBundle(ctx, ri.Index, ri.Partial)
c <- bundleOrErr{ri: ri, b: b, err: err}
}(ri)

f := func() bundleOrErr {
b := <-c
// We're about to yield a value, so we can now return the token and unblock another fetch.
tokens <- struct{}{}
return b
}

bundles <- f
}

// Next loop, carry on from where we got to.
fromEntry = treeSize
klog.V(1).Infof("stream.EntryBundles: streaming [%d, %d)", fromEntry, fromEntry+N)

// For each bundle, pop a future into the bundles channel and kick off an async request
// to resolve it.
for ri := range layout.Range(fromEntry, fromEntry+N, treeSize) {
select {
case <-exit:
klog.Infof("StreamAdaptor: exiting")
return
case <-time.After(time.Second):
// We've caught up with and hit the end of the tree, so wait a bit before looping to avoid busy waiting.
// TODO(al): could consider a shallow channel of sizes here.
case <-tokens:
// We'll return a token below, once the bundle is fetched _and_ is being yielded.
}
}
}()

cancel = func() {
close(exit)
}
c := make(chan bundleOrErr, 1)
go func(ri layout.RangeInfo) {
b, err := getBundle(ctx, ri.Index, ri.Partial)
c <- bundleOrErr{b: Bundle{RangeInfo: ri, Data: b}, err: err}
}(ri)

f := func() bundleOrErr {
b := <-c
// We're about to yield a value, so we can now return the token and unblock another fetch.
tokens <- struct{}{}
return b
}

var streamErr error
next = func() (layout.RangeInfo, []byte, error) {
if streamErr != nil {
return layout.RangeInfo{}, nil, streamErr
bundles <- f
}

f, ok := <-bundles
if !ok {
streamErr = ErrNoMoreEntries
return layout.RangeInfo{}, nil, streamErr
}
b := f()
if b.err != nil {
streamErr = b.err
klog.V(1).Infof("stream.EntryBundles: exiting")
}()

return func(yield func(Bundle, error) bool) {
defer close(exit)

for f := range bundles {
b := f()
if !yield(b.b, b.err) {
return
}
// For now, force the iterator to stop if we've just returned an error.
// If there's a good reason to allow it to continue we can change this.
if b.err != nil {
return
}
}
return b.ri, b.b, b.err
klog.V(1).Infof("stream.EntryBundles: iter done")
}
return next, cancel
}

// EntryStreamReader converts a stream of {RangeInfo, EntryBundle} into a stream of individually processed entries.
//
// This is mostly useful to Follower implementations which need to parse and consume individual entries being streamed
// from a LogReader.
type EntryStreamReader[T any] struct {
bundleFn func([]byte) ([]T, error)
next func() (layout.RangeInfo, []byte, error)

curData []T
curRI layout.RangeInfo
i uint64
// Entry represents a single leaf in a log, paired with its position in that log.
type Entry[T any] struct {
// Index is the index of the entry in the log.
Index uint64
// Entry is the processed form of the leaf, as produced by the bundleFn passed to Entries.
Entry T
}

// NewEntryStreamReader creates a new stream reader which uses the provided bundleFn to process bundles into processed entries of type T.
// Entries returns an iterator which uses the provided bundleFn to convert each bundle yielded by bundles into individual entries of type T.
//
// Different bundleFn implementations can be provided to return raw entry bytes, parsed entry structs, or derivations of entries (e.g. hashes) as needed.
func NewEntryStreamReader[T any](next func() (layout.RangeInfo, []byte, error), bundleFn func([]byte) ([]T, error)) *EntryStreamReader[T] {
return &EntryStreamReader[T]{
bundleFn: bundleFn,
next: next,
i: 0,
}
}

// Next processes and returns the next available entry in the stream along with its index in the log.
func (e *EntryStreamReader[T]) Next() (uint64, T, error) {
var t T
if len(e.curData) == 0 {
var err error
var b []byte
e.curRI, b, err = e.next()
if err != nil {
return 0, t, fmt.Errorf("next: %v", err)
}
e.curData, err = e.bundleFn(b)
if err != nil {
return 0, t, fmt.Errorf("bundleFn(bundleEntry @%d): %v", e.curRI.Index, err)
func Entries[T any](bundles iter.Seq2[Bundle, error], bundleFn func([]byte) ([]T, error)) iter.Seq2[Entry[T], error] {
return func(yield func(Entry[T], error) bool) {
for b, err := range bundles {
if err != nil {
yield(Entry[T]{}, err)
return
}
es, err := bundleFn(b.Data)
if err != nil {
yield(Entry[T]{}, err)
return
}
if len(es) <= int(b.RangeInfo.First) {
yield(Entry[T]{}, fmt.Errorf("logic error: First is %d but only %d entries", b.RangeInfo.First, len(es)))
return
}
es = es[b.RangeInfo.First:]
if len(es) > int(b.RangeInfo.N) {
es = es[:b.RangeInfo.N]
}

rIdx := b.RangeInfo.Index*layout.EntryBundleWidth + uint64(b.RangeInfo.First)
for i, e := range es {
if !yield(Entry[T]{Index: rIdx + uint64(i), Entry: e}, nil) {
return
}
}
}
if e.curRI.First > 0 {
e.curData = e.curData[e.curRI.First:]
}
if len(e.curData) > int(e.curRI.N) {
e.curData = e.curData[:e.curRI.N]
}
e.i = 0
}
t, e.curData = e.curData[0], e.curData[1:]
rIdx := e.curRI.Index*layout.EntryBundleWidth + uint64(e.curRI.First) + e.i
e.i++
return rIdx, t, nil
}
38 changes: 1 addition & 37 deletions lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@ import (

"github.com/transparency-dev/merkle/rfc6962"
"github.com/transparency-dev/tessera/api"
"github.com/transparency-dev/tessera/api/layout"
"github.com/transparency-dev/tessera/internal/stream"
)

// NoMoreEntries is a sentinel error returned by StreamEntries when no more entries will be returned by calls to the next function.
var ErrNoMoreEntries = stream.ErrNoMoreEntries

// LogReader provides read-only access to the log.
type LogReader interface {
// ReadCheckpoint returns the latest checkpoint available.
Expand All @@ -53,39 +49,7 @@ type LogReader interface {
// The expected usage and corresponding behaviours are similar to ReadTile.
ReadEntryBundle(ctx context.Context, index uint64, p uint8) ([]byte, error)

// IntegratedSize returns the current size of the integrated tree.
//
// This tree will have in place all the static resources the returned size implies, but
// there may not yet be a checkpoint for this size signed, witnessed, or published.
//
// It's ONLY safe to use this value for processes internal to the operation of the log (e.g.
// populating antispam data structures); it MUST NOT not be used as a substitute for
// reading the checkpoint when only data which has been publicly committed to by the
// log should be used. If in doubt, use ReadCheckpoint instead.
IntegratedSize(ctx context.Context) (uint64, error)

// NextIndex returns the first as-yet unassigned index.
//
// In a quiescent log, this will be the same as the checkpoint size. In a log with entries actively
// being added, this number will be higher since it will take sequenced but not-yet-integrated/not-yet-published
// entries into account.
NextIndex(ctx context.Context) (uint64, error)

// StreamEntries() returns functions `next` and `stop` which act like a pull iterator for
// consecutive entry bundles, starting with the entry bundle which contains the requested entry
// index.
//
// Each call to `next` will return raw entry bundle bytes along with a RangeInfo struct which
// contains information on which entries within that bundle are to be considered valid.
//
// next will hang if it has reached the extent of the current tree, and return once either
// the tree has grown and more entries are available, or cancel was called.
//
// next will cease iterating if either:
// - it produces an error (e.g. via the underlying calls to the log storage)
// - the returned cancel function is called
// and will continue to return an error if called again after either of these cases.
StreamEntries(ctx context.Context, fromEntryIdx uint64) (next func() (layout.RangeInfo, []byte, error), cancel func())
stream.Streamer
}

// Antispam describes the contract that an antispam implementation must meet in order to be used via the
Expand Down
Loading
Loading