diff --git a/cmd/experimental/mirror/internal/mirror.go b/cmd/experimental/mirror/internal/mirror.go index 9458a82cc..a3109d773 100644 --- a/cmd/experimental/mirror/internal/mirror.go +++ b/cmd/experimental/mirror/internal/mirror.go @@ -17,8 +17,11 @@ package mirror import ( "context" + "errors" "fmt" + "iter" "log" + "os" "sync/atomic" "github.com/avast/retry-go/v4" @@ -27,16 +30,17 @@ import ( "golang.org/x/sync/errgroup" ) -// Store describes a type which can store log static resources. -type Store interface { +// Target describes a type which can store log static resources. +type Target interface { + ReadCheckpoint(ctx context.Context) ([]byte, error) WriteCheckpoint(ctx context.Context, data []byte) error WriteTile(ctx context.Context, l, i uint64, p uint8, data []byte) error WriteEntryBundle(ctx context.Context, i uint64, p uint8, data []byte) error } -// Fetcher describes a type which can fetch static resources from a source log, like +// Source describes a type which can fetch static resources from a source log, like // the .*Fetcher implementations in the client package. -type Fetcher interface { +type Source interface { ReadCheckpoint(ctx context.Context) ([]byte, error) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) ReadEntryBundle(_ context.Context, i uint64, p uint8) ([]byte, error) @@ -52,8 +56,8 @@ type Fetcher interface { // the copied tiles/entries/checkpoint is undertaken. type Mirror struct { NumWorkers uint - Src Fetcher - Store Store + Source Source + Target Target totalResources uint64 resourcesFetched atomic.Uint64 @@ -64,17 +68,28 @@ type Mirror struct { // This is a long-lived operation, returning only once ctx becomes Done, the copy is completed, // or an error occurs during the operation. func (m *Mirror) Run(ctx context.Context) error { - sourceCP, err := m.Src.ReadCheckpoint(ctx) + sourceCP, sourceSize, err := fetchAndParseCP(ctx, m.Source.ReadCheckpoint) if err != nil { - return fmt.Errorf("failed to fetch initial source checkpoint: %v", err) + return fmt.Errorf("failed to fetch source checkpoint size: %v", err) } - _, srcSize, _, err := parse.CheckpointUnsafe(sourceCP) - if err != nil { - return fmt.Errorf("invalid CP: %v", err) + _, targetSize, err := fetchAndParseCP(ctx, m.Target.ReadCheckpoint) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("failed to read checkpoint in target: %v", err) } - log.Printf("Source log size: %d", srcSize) - m.totalResources = (srcSize/layout.TileWidth)*3 - 1 + delta := sourceSize - targetSize + stride := delta / uint64(m.NumWorkers) + if r := stride % layout.TileWidth; r != 0 { + stride += (layout.TileWidth - r) + } + + log.Printf("Source log size: %d, target log size: %d, ∆ %d, stride: %d", sourceSize, targetSize, delta, stride) + + if delta == 0 { + return nil + } + + m.totalResources = calcNumResources(sourceSize, targetSize, stride) m.resourcesFetched = atomic.Uint64{} work := make(chan job, m.NumWorkers) @@ -82,22 +97,12 @@ func (m *Mirror) Run(ctx context.Context) error { go func() { defer close(work) - stride := srcSize / uint64(m.NumWorkers) - if r := stride % layout.TileWidth; r != 0 { - stride += (layout.TileWidth - r) - } - log.Printf("Stride: %d", stride) - - for ext, l := srcSize-1, uint64(0); ext > 0; ext, l = uint64(ext>>layout.TileHeight), l+1 { - for from := uint64(0); from < ext; { - N := min(stride, ext-from) - select { - case <-ctx.Done(): - return - case work <- job{level: l, from: from, N: N}: - log.Printf("Level %d, [%d, %d) ", l, from, from+N) - } - from = from + N + for j := range jobs(sourceSize, targetSize, stride) { + select { + case <-ctx.Done(): + return + case work <- j: + log.Printf("Job: %s", j) } } log.Println("No more work") @@ -108,7 +113,7 @@ func (m *Mirror) Run(ctx context.Context) error { g.Go(func() error { for j := range work { log.Printf("Worker %d: working on %s", i, j) - for ri := range layout.Range(j.from, j.N, srcSize>>(j.level*layout.TileHeight)) { + for ri := range layout.Range(j.from, j.N, sourceSize>>(j.level*layout.TileHeight)) { if err := retry.Do(m.copyTile(ctx, j.level, ri.Index, ri.Partial)); err != nil { log.Println(err.Error()) return err @@ -128,7 +133,7 @@ func (m *Mirror) Run(ctx context.Context) error { if err := g.Wait(); err != nil { return fmt.Errorf("failed to migrate static resources: %v", err) } - return m.Store.WriteCheckpoint(ctx, sourceCP) + return m.Target.WriteCheckpoint(ctx, sourceCP) } // Progress returns the total number of resources present in the source log, and the number of resources @@ -140,34 +145,32 @@ func (m *Mirror) Progress() (uint64, uint64) { // copyTile reads a tile from the source log and stores it into the same location in the destination log. func (m *Mirror) copyTile(ctx context.Context, l, i uint64, p uint8) func() error { return func() error { - d, err := m.Src.ReadTile(ctx, l, i, p) + d, err := m.Source.ReadTile(ctx, l, i, p) if err != nil { log.Println(err.Error()) return err } - if err := m.Store.WriteTile(ctx, l, i, p, d); err != nil { + if err := m.Target.WriteTile(ctx, l, i, p, d); err != nil { return err } m.resourcesFetched.Add(1) return nil - } } // copyBundle reads an entry bundle from the source log and stores it into the same location in the destination log. func (m *Mirror) copyBundle(ctx context.Context, i uint64, p uint8) func() error { return func() error { - d, err := m.Src.ReadEntryBundle(ctx, i, p) + d, err := m.Source.ReadEntryBundle(ctx, i, p) if err != nil { log.Println(err.Error()) return err } - if err := m.Store.WriteEntryBundle(ctx, i, p, d); err != nil { + if err := m.Target.WriteEntryBundle(ctx, i, p, d); err != nil { return err } m.resourcesFetched.Add(1) return nil - } } @@ -178,3 +181,49 @@ type job struct { func (j job) String() string { return fmt.Sprintf("Level: %d, Range: [%d, %d)", j.level, j.from, j.from+j.N) } + +func jobs(srcSize, targetSize, stride uint64) iter.Seq[job] { + return func(yield func(job) bool) { + for start, ext, l := targetSize, srcSize, uint64(0); ext > 0; start, ext, l = start>>layout.TileHeight, uint64(ext>>layout.TileHeight), l+1 { + for from := start; from < ext; { + N := stride + // If we're starting from a partial tile, then just fetch the remainder first so we're tile-aligned from then on. + if r := from % layout.TileWidth; r != 0 { + N = layout.TileWidth - r + } + N = min(N, ext-from) + if !yield(job{level: l, from: from, N: N}) { + return + } + from = from + N + } + } + + } +} + +// calcNumResources calculates the number of new static resources which need to be mirrored, given the +// size of the source and target. +func calcNumResources(srcSize, targetSize, stride uint64) uint64 { + leafBundles := uint64(0) + tiles := uint64(0) + + for j := range jobs(srcSize, targetSize, stride) { + nTiles := (j.N + layout.TileWidth - 1) / layout.TileWidth + tiles += nTiles + if j.level == 0 { + leafBundles += nTiles + } + } + + return leafBundles + tiles +} + +func fetchAndParseCP(ctx context.Context, f func(context.Context) ([]byte, error)) ([]byte, uint64, error) { + cp, err := f(ctx) + if err != nil { + return nil, 0, err + } + _, size, _, err := parse.CheckpointUnsafe(cp) + return cp, size, err +} diff --git a/cmd/experimental/mirror/posix/main.go b/cmd/experimental/mirror/posix/main.go index 98911ea14..c06e69fe9 100644 --- a/cmd/experimental/mirror/posix/main.go +++ b/cmd/experimental/mirror/posix/main.go @@ -38,6 +38,7 @@ var ( func main() { klog.InitFlags(nil) + klog.CopyStandardLogTo("INFO") flag.Parse() ctx := context.Background() @@ -52,8 +53,8 @@ func main() { m := &mirror.Mirror{ NumWorkers: *numWorkers, - Src: src, - Store: &posixStore{root: *storageDir}, + Source: src, + Target: &posixTarget{root: *storageDir}, } // Print out stats. @@ -64,9 +65,7 @@ func main() { case <-ctx.Done(): return case <-t.C: - total, done := m.Progress() - p := float64(done*100) / float64(total) - klog.Infof("Progress %d of %d resources (%0.2f%%)", done, total, p) + printProgress(m.Progress) } } }() @@ -75,26 +74,41 @@ func main() { klog.Exitf("Failed to mirror log: %v", err) } + printProgress(m.Progress) klog.Info("Log mirrored successfully.") } -type posixStore struct { +func printProgress(f func() (uint64, uint64)) { + total, done := f() + p := float64(done*100) / float64(total) + // Let's just say we're 100% done if we've completed no work when nothing needed doing. + if total == done && done == 0 { + p = 100.0 + } + klog.Infof("Progress: %d of %d resources (%0.2f%%)", done, total, p) +} + +type posixTarget struct { root string } -func (s *posixStore) WriteCheckpoint(_ context.Context, d []byte) error { +func (s *posixTarget) ReadCheckpoint(_ context.Context) ([]byte, error) { + return os.ReadFile(filepath.Join(s.root, layout.CheckpointPath)) +} + +func (s *posixTarget) WriteCheckpoint(_ context.Context, d []byte) error { return s.store(layout.CheckpointPath, d) } -func (s *posixStore) WriteTile(_ context.Context, l, i uint64, p uint8, d []byte) error { +func (s *posixTarget) WriteTile(_ context.Context, l, i uint64, p uint8, d []byte) error { return s.store(layout.TilePath(l, i, p), d) } -func (s *posixStore) WriteEntryBundle(_ context.Context, i uint64, p uint8, d []byte) error { +func (s *posixTarget) WriteEntryBundle(_ context.Context, i uint64, p uint8, d []byte) error { return s.store(layout.EntriesPath(i, p), d) } -func (s *posixStore) store(p string, d []byte) (err error) { +func (s *posixTarget) store(p string, d []byte) (err error) { fp := filepath.Join(s.root, p) if err := os.MkdirAll(filepath.Dir(fp), 0o755); err != nil { return err