Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 86 additions & 37 deletions cmd/experimental/mirror/internal/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package mirror

import (
"context"
"errors"
"fmt"
"iter"
"log"
"os"
"sync/atomic"

"github.com/avast/retry-go/v4"
Expand All @@ -27,16 +30,17 @@ import (
"golang.org/x/sync/errgroup"
)

// Store describes a type which can store log static resources.
type Store interface {
// Target describes a type which can store log static resources.
type Target interface {
ReadCheckpoint(ctx context.Context) ([]byte, error)
WriteCheckpoint(ctx context.Context, data []byte) error
WriteTile(ctx context.Context, l, i uint64, p uint8, data []byte) error
WriteEntryBundle(ctx context.Context, i uint64, p uint8, data []byte) error
}

// Fetcher describes a type which can fetch static resources from a source log, like
// Source describes a type which can fetch static resources from a source log, like
// the .*Fetcher implementations in the client package.
type Fetcher interface {
type Source interface {
ReadCheckpoint(ctx context.Context) ([]byte, error)
ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error)
ReadEntryBundle(_ context.Context, i uint64, p uint8) ([]byte, error)
Expand All @@ -52,8 +56,8 @@ type Fetcher interface {
// the copied tiles/entries/checkpoint is undertaken.
type Mirror struct {
NumWorkers uint
Src Fetcher
Store Store
Source Source
Target Target

totalResources uint64
resourcesFetched atomic.Uint64
Expand All @@ -64,40 +68,41 @@ type Mirror struct {
// This is a long-lived operation, returning only once ctx becomes Done, the copy is completed,
// or an error occurs during the operation.
func (m *Mirror) Run(ctx context.Context) error {
sourceCP, err := m.Src.ReadCheckpoint(ctx)
sourceCP, sourceSize, err := fetchAndParseCP(ctx, m.Source.ReadCheckpoint)
if err != nil {
return fmt.Errorf("failed to fetch initial source checkpoint: %v", err)
return fmt.Errorf("failed to fetch source checkpoint size: %v", err)
}
_, srcSize, _, err := parse.CheckpointUnsafe(sourceCP)
if err != nil {
return fmt.Errorf("invalid CP: %v", err)
_, targetSize, err := fetchAndParseCP(ctx, m.Target.ReadCheckpoint)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to read checkpoint in target: %v", err)
}
log.Printf("Source log size: %d", srcSize)

m.totalResources = (srcSize/layout.TileWidth)*3 - 1
delta := sourceSize - targetSize
stride := delta / uint64(m.NumWorkers)
if r := stride % layout.TileWidth; r != 0 {
stride += (layout.TileWidth - r)
}

log.Printf("Source log size: %d, target log size: %d, ∆ %d, stride: %d", sourceSize, targetSize, delta, stride)

if delta == 0 {
return nil
}

m.totalResources = calcNumResources(sourceSize, targetSize, stride)
m.resourcesFetched = atomic.Uint64{}

work := make(chan job, m.NumWorkers)

go func() {
defer close(work)

stride := srcSize / uint64(m.NumWorkers)
if r := stride % layout.TileWidth; r != 0 {
stride += (layout.TileWidth - r)
}
log.Printf("Stride: %d", stride)

for ext, l := srcSize-1, uint64(0); ext > 0; ext, l = uint64(ext>>layout.TileHeight), l+1 {
for from := uint64(0); from < ext; {
N := min(stride, ext-from)
select {
case <-ctx.Done():
return
case work <- job{level: l, from: from, N: N}:
log.Printf("Level %d, [%d, %d) ", l, from, from+N)
}
from = from + N
for j := range jobs(sourceSize, targetSize, stride) {
select {
case <-ctx.Done():
return
case work <- j:
log.Printf("Job: %s", j)
}
}
log.Println("No more work")
Expand All @@ -108,7 +113,7 @@ func (m *Mirror) Run(ctx context.Context) error {
g.Go(func() error {
for j := range work {
log.Printf("Worker %d: working on %s", i, j)
for ri := range layout.Range(j.from, j.N, srcSize>>(j.level*layout.TileHeight)) {
for ri := range layout.Range(j.from, j.N, sourceSize>>(j.level*layout.TileHeight)) {
if err := retry.Do(m.copyTile(ctx, j.level, ri.Index, ri.Partial)); err != nil {
log.Println(err.Error())
return err
Expand All @@ -128,7 +133,7 @@ func (m *Mirror) Run(ctx context.Context) error {
if err := g.Wait(); err != nil {
return fmt.Errorf("failed to migrate static resources: %v", err)
}
return m.Store.WriteCheckpoint(ctx, sourceCP)
return m.Target.WriteCheckpoint(ctx, sourceCP)
}

// Progress returns the total number of resources present in the source log, and the number of resources
Expand All @@ -140,34 +145,32 @@ func (m *Mirror) Progress() (uint64, uint64) {
// copyTile reads a tile from the source log and stores it into the same location in the destination log.
func (m *Mirror) copyTile(ctx context.Context, l, i uint64, p uint8) func() error {
return func() error {
d, err := m.Src.ReadTile(ctx, l, i, p)
d, err := m.Source.ReadTile(ctx, l, i, p)
if err != nil {
log.Println(err.Error())
return err
}
if err := m.Store.WriteTile(ctx, l, i, p, d); err != nil {
if err := m.Target.WriteTile(ctx, l, i, p, d); err != nil {
return err
}
m.resourcesFetched.Add(1)
return nil

}
}

// copyBundle reads an entry bundle from the source log and stores it into the same location in the destination log.
func (m *Mirror) copyBundle(ctx context.Context, i uint64, p uint8) func() error {
return func() error {
d, err := m.Src.ReadEntryBundle(ctx, i, p)
d, err := m.Source.ReadEntryBundle(ctx, i, p)
if err != nil {
log.Println(err.Error())
return err
}
if err := m.Store.WriteEntryBundle(ctx, i, p, d); err != nil {
if err := m.Target.WriteEntryBundle(ctx, i, p, d); err != nil {
return err
}
m.resourcesFetched.Add(1)
return nil

}
}

Expand All @@ -178,3 +181,49 @@ type job struct {
// String renders the job as its tree level plus the half-open range
// [from, from+N) that it covers.
func (j job) String() string {
	end := j.from + j.N
	return fmt.Sprintf("Level: %d, Range: [%d, %d)", j.level, j.from, end)
}

// jobs returns an iterator over the units of work needed to bring a target of
// size targetSize up to date with a source of size srcSize.
//
// Levels are visited from 0 upwards. At each level both bounds are shifted
// right by layout.TileHeight, and the range [start, ext) at that level is cut
// into jobs of at most stride entries. If a level's start falls inside a tile,
// the first job covers only the remainder of that tile so that all following
// jobs at that level begin on a tile boundary.
//
// A stride of zero is treated as a single tile width. Without that, a
// tile-aligned starting point would yield zero-length jobs forever, since the
// cursor would never advance.
//
// Iteration stops early if yield returns false.
func jobs(srcSize, targetSize, stride uint64) iter.Seq[job] {
	if stride == 0 {
		// Callers derive stride as delta/NumWorkers, which is zero whenever
		// there are fewer new entries than workers.
		stride = layout.TileWidth
	}
	return func(yield func(job) bool) {
		for start, ext, l := targetSize, srcSize, uint64(0); ext > 0; start, ext, l = start>>layout.TileHeight, ext>>layout.TileHeight, l+1 {
			for from := start; from < ext; {
				N := stride
				// If we're starting from a partial tile, then just fetch the remainder first so we're tile-aligned from then on.
				if r := from % layout.TileWidth; r != 0 {
					N = layout.TileWidth - r
				}
				// Never run past the end of this level.
				N = min(N, ext-from)
				if !yield(job{level: l, from: from, N: N}) {
					return
				}
				from += N
			}
		}
	}
}

// calcNumResources reports how many static resources must be copied to take a
// target of size targetSize up to a source of size srcSize, using the same job
// partitioning (and therefore the same stride) as the copy itself.
//
// Every job contributes one resource per tile it spans. Level-0 jobs count
// twice, because each leaf tile is counted once as a tile and once more as
// its entry bundle.
func calcNumResources(srcSize, targetSize, stride uint64) uint64 {
	var total uint64
	for work := range jobs(srcSize, targetSize, stride) {
		// Round up so that a job covering part of a tile still counts that tile.
		spanned := (work.N + layout.TileWidth - 1) / layout.TileWidth
		total += spanned
		if work.level == 0 {
			total += spanned
		}
	}
	return total
}

// fetchAndParseCP obtains a raw checkpoint by calling f and extracts the tree
// size from it using parse.CheckpointUnsafe.
//
// On success it returns the raw checkpoint bytes and the parsed size. On any
// failure it returns (nil, 0, err), so callers never see partially populated
// results. An error from f is returned unchanged, and a parse error is wrapped
// with %w, so errors.Is checks by callers (e.g. for os.ErrNotExist) keep working.
func fetchAndParseCP(ctx context.Context, f func(context.Context) ([]byte, error)) ([]byte, uint64, error) {
	cp, err := f(ctx)
	if err != nil {
		return nil, 0, err
	}
	_, size, _, err := parse.CheckpointUnsafe(cp)
	if err != nil {
		return nil, 0, fmt.Errorf("parsing checkpoint: %w", err)
	}
	return cp, size, nil
}
34 changes: 24 additions & 10 deletions cmd/experimental/mirror/posix/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (

func main() {
klog.InitFlags(nil)
klog.CopyStandardLogTo("INFO")
flag.Parse()
ctx := context.Background()

Expand All @@ -52,8 +53,8 @@ func main() {

m := &mirror.Mirror{
NumWorkers: *numWorkers,
Src: src,
Store: &posixStore{root: *storageDir},
Source: src,
Target: &posixTarget{root: *storageDir},
}

// Print out stats.
Expand All @@ -64,9 +65,7 @@ func main() {
case <-ctx.Done():
return
case <-t.C:
total, done := m.Progress()
p := float64(done*100) / float64(total)
klog.Infof("Progress %d of %d resources (%0.2f%%)", done, total, p)
printProgress(m.Progress)
}
}
}()
Expand All @@ -75,26 +74,41 @@ func main() {
klog.Exitf("Failed to mirror log: %v", err)
}

printProgress(m.Progress)
klog.Info("Log mirrored successfully.")
}

type posixStore struct {
// printProgress logs the progress reported by f, which returns the total
// number of resources followed by the number completed so far.
func printProgress(f func() (uint64, uint64)) {
	total, done := f()

	var pct float64
	if total == 0 && done == 0 {
		// Nothing needed doing and nothing was done: call that 100% complete.
		pct = 100.0
	} else {
		pct = float64(done*100) / float64(total)
	}
	klog.Infof("Progress: %d of %d resources (%0.2f%%)", done, total, pct)
}

// posixTarget reads and writes mirrored log resources as files on the local
// filesystem.
type posixTarget struct {
// root is the directory that all resource paths are joined onto.
root string
}

func (s *posixStore) WriteCheckpoint(_ context.Context, d []byte) error {
// ReadCheckpoint returns the contents of the checkpoint file found at
// layout.CheckpointPath beneath the target's root directory. The error from
// os.ReadFile is returned unmodified.
func (s *posixTarget) ReadCheckpoint(_ context.Context) ([]byte, error) {
	cpPath := filepath.Join(s.root, layout.CheckpointPath)
	return os.ReadFile(cpPath)
}

func (s *posixTarget) WriteCheckpoint(_ context.Context, d []byte) error {
return s.store(layout.CheckpointPath, d)
}

func (s *posixStore) WriteTile(_ context.Context, l, i uint64, p uint8, d []byte) error {
func (s *posixTarget) WriteTile(_ context.Context, l, i uint64, p uint8, d []byte) error {
return s.store(layout.TilePath(l, i, p), d)
}

func (s *posixStore) WriteEntryBundle(_ context.Context, i uint64, p uint8, d []byte) error {
func (s *posixTarget) WriteEntryBundle(_ context.Context, i uint64, p uint8, d []byte) error {
return s.store(layout.EntriesPath(i, p), d)
}

func (s *posixStore) store(p string, d []byte) (err error) {
func (s *posixTarget) store(p string, d []byte) (err error) {
fp := filepath.Join(s.root, p)
if err := os.MkdirAll(filepath.Dir(fp), 0o755); err != nil {
return err
Expand Down
Loading