Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion util/contentutil/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,34 @@ package contentutil

import (
"context"
"runtime"
"sync"
"time"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)

func FromPusher(p remotes.Pusher) content.Ingester {
var mu sync.Mutex
c := sync.NewCond(&mu)
return &pushingIngester{
p: p,
mu: &mu,
c: c,
p: p,
active: map[digest.Digest]struct{}{},
}
}

type pushingIngester struct {
p remotes.Pusher

mu *sync.Mutex
c *sync.Cond
active map[digest.Digest]struct{}
}

// Writer implements content.Ingester. desc.MediaType must be set for manifest blobs.
Expand All @@ -30,20 +43,55 @@ func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt)
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}

st := time.Now()

i.mu.Lock()
for {
if time.Since(st) > time.Hour {
i.mu.Unlock()
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest)
}
if _, ok := i.active[wOpts.Desc.Digest]; ok {
i.c.Wait()
} else {
break
}
}

i.active[wOpts.Desc.Digest] = struct{}{}
i.mu.Unlock()

var once sync.Once
release := func() {
once.Do(func() {
i.mu.Lock()
delete(i.active, wOpts.Desc.Digest)
i.c.Broadcast()
i.mu.Unlock()
})
}

// pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs.
contentWriter, err := i.p.Push(ctx, wOpts.Desc)
if err != nil {
release()
return nil, err
}
runtime.SetFinalizer(contentWriter, func(_ content.Writer) {
release()
})
return &writer{
Writer: contentWriter,
contentWriterRef: wOpts.Ref,
release: release,
}, nil
}

type writer struct {
content.Writer // returned from pusher.Push
contentWriterRef string // ref passed for Writer()
release func()
}

func (w *writer) Status() (content.Status, error) {
Expand All @@ -56,3 +104,19 @@ func (w *writer) Status() (content.Status, error) {
}
return st, nil
}

func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := w.Writer.Commit(ctx, size, expected, opts...)
if w.release != nil {
w.release()
}
return err
}

func (w *writer) Close() error {
err := w.Writer.Close()
if w.release != nil {
w.release()
}
return err
}
37 changes: 36 additions & 1 deletion util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/flightcontrol"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/resolver"
Expand Down Expand Up @@ -73,7 +74,7 @@ func Push(ctx context.Context, sm *session.Manager, cs content.Store, dgst diges
handlers := append([]images.Handler{},
images.HandlerFunc(annotateDistributionSourceHandler(cs, childrenHandler(cs))),
filterHandler,
pushUpdateSourceHandler,
dedupeHandler(pushUpdateSourceHandler),
)

ra, err := cs.ReaderAt(ctx, desc)
Expand Down Expand Up @@ -248,3 +249,37 @@ func updateDistributionSourceHandler(cs content.Store, pushF images.HandlerFunc,
return children, nil
}), nil
}

func dedupeHandler(h images.HandlerFunc) images.HandlerFunc {
var g flightcontrol.Group
res := map[digest.Digest][]ocispec.Descriptor{}
var mu sync.Mutex

return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
res, err := g.Do(ctx, desc.Digest.String(), func(ctx context.Context) (interface{}, error) {
mu.Lock()
if r, ok := res[desc.Digest]; ok {
mu.Unlock()
return r, nil
}
mu.Unlock()

children, err := h(ctx, desc)
if err != nil {
return nil, err
}

mu.Lock()
res[desc.Digest] = children
mu.Unlock()
return children, nil
})
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return res.([]ocispec.Descriptor), nil
})
}