diff --git a/client/client_test.go b/client/client_test.go index edfd29e5b8cd..8dc0ef0cb706 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -246,6 +246,9 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){ testHTTPResolveSourceMetadata, testHTTPPruneAfterCacheKey, testHTTPPruneAfterResolveMeta, + testHTTPResolveMetaReuse, + testHTTPResolveMultiBuild, + testGitResolveMutatedSource, } func TestIntegration(t *testing.T) { @@ -12270,6 +12273,265 @@ func testHTTPPruneAfterResolveMeta(t *testing.T, sb integration.Sandbox) { checkAllReleasable(t, c, sb, false) } +func testHTTPResolveMetaReuse(t *testing.T, sb integration.Sandbox) { + // the difference with testHTTPPruneAfterResolveMeta is that here we change content with the etag on the server + // but because the URL was already resolved once, the new content should not be seen + ctx := sb.Context() + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + resp := &httpserver.Response{ + Etag: identity.NewID(), + Content: []byte("content1"), + } + server := httpserver.NewTestServer(map[string]*httpserver.Response{ + "/foo": resp, + }) + defer server.Close() + + dest := t.TempDir() + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) { + id := server.URL + "/foo" + md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.HTTP) + + resp.Etag = identity.NewID() + resp.Content = []byte("content2") // etag changed so new content would be returned if re-resolving + + st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return gc.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + dt, 
err := os.ReadFile(filepath.Join(dest, "bar")) + require.NoError(t, err) + require.Equal(t, "content1", string(dt)) +} + +// testHTTPResolveMultiBuild is a negative test for testHTTPResolveMetaReuse to ensure that +// URLs are resolved in between separate builds +func testHTTPResolveMultiBuild(t *testing.T, sb integration.Sandbox) { + ctx := sb.Context() + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + resp := &httpserver.Response{ + Etag: identity.NewID(), + Content: []byte("content1"), + } + server := httpserver.NewTestServer(map[string]*httpserver.Response{ + "/foo": resp, + }) + defer server.Close() + + dest := t.TempDir() + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) { + id := server.URL + "/foo" + md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.HTTP) + require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest) + + st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return gc.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + dt, err := os.ReadFile(filepath.Join(dest, "bar")) + require.NoError(t, err) + require.Equal(t, "content1", string(dt)) + + resp.Etag = identity.NewID() + resp.Content = []byte("content2") + + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) { + id := server.URL + "/foo" + md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.HTTP) + 
require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest) + st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return gc.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + dt, err = os.ReadFile(filepath.Join(dest, "bar")) + require.NoError(t, err) + require.Equal(t, "content2", string(dt)) +} + +func testGitResolveMutatedSource(t *testing.T, sb integration.Sandbox) { + integration.SkipOnPlatform(t, "windows") + ctx := sb.Context() + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + gitDir := t.TempDir() + gitCommands := []string{ + "git init", + "git config --local user.email test", + "git config --local user.name test", + "echo a > a", + "git add a", + "git commit -m a", + "git tag -a v0.1 -m v0.1", + "echo b > b", + "git add b", + "git commit -m b", + "git checkout -B v2", + "git update-server-info", + } + err = runInDir(gitDir, gitCommands...) 
+ require.NoError(t, err) + + // cmd := exec.Command("git", "rev-parse", "HEAD") + // cmd.Dir = gitDir + // out, err := cmd.Output() + // require.NoError(t, err) + // commitHEAD := strings.TrimSpace(string(out)) + + cmd := exec.Command("git", "rev-parse", "v0.1") + cmd.Dir = gitDir + out, err := cmd.Output() + require.NoError(t, err) + commitTag := strings.TrimSpace(string(out)) + + cmd = exec.Command("git", "rev-parse", "v0.1^{commit}") + cmd.Dir = gitDir + out, err = cmd.Output() + require.NoError(t, err) + commitTagCommit := strings.TrimSpace(string(out)) + + server := httptest.NewServer(http.FileServer(http.Dir(filepath.Clean(gitDir)))) + defer server.Close() + + dest := t.TempDir() + + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + id := "git://" + strings.TrimPrefix(server.URL, "http://") + "/.git#v0.1" + md, err := c.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + Attrs: map[string]string{ + "git.fullurl": server.URL + "/.git", + }, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.Git) + require.Equal(t, "refs/tags/v0.1", md.Git.Ref) + require.Equal(t, commitTag, md.Git.Checksum) + require.Equal(t, commitTagCommit, md.Git.CommitChecksum) + require.Equal(t, id, md.Op.Identifier) + require.Equal(t, server.URL+"/.git", md.Op.Attrs["git.fullurl"]) + + // update the tag to point to a different commit + err = runInDir(gitDir, []string{ + "git tag -f v0.1", + "git update-server-info", + }...) 
+ require.NoError(t, err) + + md, err = c.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + Attrs: map[string]string{ + "git.fullurl": server.URL + "/.git", + }, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.Git) + require.Equal(t, "refs/tags/v0.1", md.Git.Ref) + require.Equal(t, commitTag, md.Git.Checksum) + require.Equal(t, commitTagCommit, md.Git.CommitChecksum) + require.Equal(t, id, md.Op.Identifier) + require.Equal(t, server.URL+"/.git", md.Op.Attrs["git.fullurl"]) + + st := llb.Git(server.URL+"/.git", "", llb.GitRef("v0.1")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return c.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + _, err = os.ReadFile(filepath.Join(dest, "b")) + require.Error(t, err) + require.True(t, os.IsNotExist(err), "expected file b to not exist") + + dt, err := os.ReadFile(filepath.Join(dest, "a")) + require.NoError(t, err) + require.Equal(t, "a\n", string(dt)) + + checkAllReleasable(t, c, sb, false) +} + func runInDir(dir string, cmds ...string) error { for _, args := range cmds { var cmd *exec.Cmd diff --git a/solver/jobs.go b/solver/jobs.go index a49983a360e9..b9f719becef1 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -85,6 +85,21 @@ func (s *state) Cleanup(fn func() error) error { return nil } +func (s *state) ResolverCache() ResolverCache { + return s +} + +func (s *state) Lock(key any) (values []any, release func(any) error, err error) { + var rcs []ResolverCache + s.mu.Lock() + for j := range s.jobs { + rcs = append(rcs, j.resolverCache) + } + s.mu.Unlock() + + return combinedResolverCache(rcs).Lock(key) +} + func (s *state) SessionIterator() session.Iterator { return s.sessionIterator() } @@ -329,6 +344,7 @@ type Job struct { startedTime time.Time completedTime time.Time releasers []func() error + resolverCache *resolverCache progressCloser func(error) SessionID string @@ -645,6 
+661,7 @@ func (jl *Solver) NewJob(id string) (*Job, error) { id: id, startedTime: time.Now(), uniqueID: identity.NewID(), + resolverCache: newResolverCache(), } jl.jobs[id] = j @@ -862,6 +879,10 @@ func (j *Job) Cleanup(fn func() error) error { return nil } +func (j *Job) ResolverCache() ResolverCache { + return j.resolverCache +} + func (j *Job) SetValue(key string, v any) { j.values.Store(key, v) } diff --git a/solver/llbsolver/ops/exec_test.go b/solver/llbsolver/ops/exec_test.go index ef9def366901..5271fdb700f0 100644 --- a/solver/llbsolver/ops/exec_test.go +++ b/solver/llbsolver/ops/exec_test.go @@ -307,3 +307,7 @@ func (j *jobCtx) Session() session.Group { func (j *jobCtx) Cleanup(f func() error) error { return errors.Errorf("cleanup not implemented for %T", j) } + +func (j *jobCtx) ResolverCache() solver.ResolverCache { + return nil +} diff --git a/solver/resolvercache.go b/solver/resolvercache.go new file mode 100644 index 000000000000..54fc4f9555df --- /dev/null +++ b/solver/resolvercache.go @@ -0,0 +1,156 @@ +package solver + +import ( + "slices" + "sync" +) + +type resolverCache struct { + mu sync.Mutex + locks map[any]*entry +} + +var _ ResolverCache = &resolverCache{} + +type entry struct { + waiting []chan struct{} + values []any + locked bool +} + +func newResolverCache() *resolverCache { + return &resolverCache{locks: make(map[any]*entry)} +} + +func (r *resolverCache) Lock(key any) (values []any, release func(any) error, err error) { + r.mu.Lock() + e, ok := r.locks[key] + if !ok { + e = &entry{} + r.locks[key] = e + } + if !e.locked { + e.locked = true + values = slices.Clone(e.values) + r.mu.Unlock() + return values, func(v any) error { + r.mu.Lock() + defer r.mu.Unlock() + if v != nil { + e.values = append(e.values, v) + } + for _, ch := range e.waiting { + close(ch) + } + e.waiting = nil + e.locked = false + if len(e.values) == 0 { + delete(r.locks, key) + } + return nil + }, nil + } + + ch := make(chan struct{}) + e.waiting = append(e.waiting, 
ch) + r.mu.Unlock() + + <-ch // wait for unlock + + r.mu.Lock() + defer r.mu.Unlock() + e2, ok := r.locks[key] + if !ok { + return nil, func(any) error { return nil }, nil // key deleted + } + values = slices.Clone(e2.values) + if e2.locked { + // shouldn't happen, but protect against logic errors + return values, func(any) error { return nil }, nil + } + e2.locked = true + return values, func(v any) error { + r.mu.Lock() + defer r.mu.Unlock() + if v != nil { + e2.values = append(e2.values, v) + } + for _, ch := range e2.waiting { + close(ch) + } + e2.waiting = nil + e2.locked = false + if len(e2.values) == 0 { + delete(r.locks, key) + } + return nil + }, nil +} + +// combinedResolverCache returns a ResolverCache that wraps multiple caches. +// Lock() calls each underlying cache in parallel, merges their values, and +// returns a combined release that releases all sublocks. +func combinedResolverCache(rcs []ResolverCache) ResolverCache { + return &combinedCache{rcs: rcs} +} + +type combinedCache struct { + rcs []ResolverCache +} + +func (c *combinedCache) Lock(key any) (values []any, release func(any) error, err error) { + if len(c.rcs) == 0 { + return nil, func(any) error { return nil }, nil + } + + var ( + mu sync.Mutex + wg sync.WaitGroup + valuesAll []any + releasers []func(any) error + firstErr error + ) + + wg.Add(len(c.rcs)) + for _, rc := range c.rcs { + go func(rc ResolverCache) { + defer wg.Done() + vals, rel, e := rc.Lock(key) + if e != nil { + mu.Lock() + if firstErr == nil { + firstErr = e + } + mu.Unlock() + return + } + + mu.Lock() + valuesAll = append(valuesAll, vals...) 
+ releasers = append(releasers, rel) + mu.Unlock() + }(rc) + } + + wg.Wait() + + if firstErr != nil { + // rollback all acquired locks + for _, r := range releasers { + _ = r(nil) + } + return nil, nil, firstErr + } + + release = func(v any) error { + var errOnce error + for _, r := range releasers { + if e := r(v); e != nil && errOnce == nil { + errOnce = e + } + } + return errOnce + } + + return valuesAll, release, nil +} diff --git a/solver/resolvercache_test.go b/solver/resolvercache_test.go new file mode 100644 index 000000000000..b8e3864bafc2 --- /dev/null +++ b/solver/resolvercache_test.go @@ -0,0 +1,253 @@ +package solver + +import ( + "sync" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestResolverCache_SerialAccess(t *testing.T) { + rc := newResolverCache() + + // First lock should succeed immediately + values, release, err := rc.Lock("key1") + require.NoError(t, err) + assert.Empty(t, values) + + // Add a value and release + err = release("val1") + require.NoError(t, err) + + // Next lock should see accumulated value + values, release, err = rc.Lock("key1") + require.NoError(t, err) + assert.Equal(t, []any{"val1"}, values) + + // Add another value + _ = release("val2") + + // Lock again should return both values + values, release, err = rc.Lock("key1") + require.NoError(t, err) + assert.Equal(t, []any{"val1", "val2"}, values) + _ = release(nil) +} + +func TestResolverCache_ConcurrentWaiters(t *testing.T) { + rc := newResolverCache() + + var wg sync.WaitGroup + results := make(chan []any, 2) + + // First goroutine acquires lock + values, release, err := rc.Lock("shared") + require.NoError(t, err) + assert.Empty(t, values) + + // Two goroutines that will wait + for range 2 { + wg.Add(1) + go func() { + defer wg.Done() + v, _, err := rc.Lock("shared") + results <- v + assert.NoError(t, err) + }() + } + + select { + 
case <-results: + t.Fatal("expected goroutines to be waiting, but got result") + case <-time.After(100 * time.Millisecond): + } + + // Release with a value + err = release("done") + require.NoError(t, err) + + // Wait for both goroutines + wg.Wait() + close(results) + + for v := range results { + assert.Equal(t, []any{"done"}, v) + } +} + +func TestResolverCache_MultipleIndependentKeys(t *testing.T) { + rc := newResolverCache() + + v1, r1, err := rc.Lock("a") + require.NoError(t, err) + v2, r2, err := rc.Lock("b") + require.NoError(t, err) + + assert.Empty(t, v1) + assert.Empty(t, v2) + + require.NoError(t, r1("x")) + require.NoError(t, r2("y")) + + v1, _, err = rc.Lock("a") + require.NoError(t, err) + assert.Equal(t, []any{"x"}, v1) + + v2, _, err = rc.Lock("b") + require.NoError(t, err) + assert.Equal(t, []any{"y"}, v2) +} + +func TestResolverCache_ReleaseNilDoesNotAdd(t *testing.T) { + rc := newResolverCache() + + v, r, err := rc.Lock("niltest") + require.NoError(t, err) + assert.Empty(t, v) + + require.NoError(t, r(nil)) + + v, _, err = rc.Lock("niltest") + require.NoError(t, err) + assert.Empty(t, v) +} + +func TestResolverCache_SequentialLocks(t *testing.T) { + rc := newResolverCache() + + for i := range 3 { + v, r, err := rc.Lock("seq") + require.NoError(t, err) + assert.Len(t, v, i) + require.NoError(t, r(i)) + } + + v, _, err := rc.Lock("seq") + require.NoError(t, err) + assert.ElementsMatch(t, []any{0, 1, 2}, v) +} + +// mockResolverCache implements ResolverCache for testing. 
+type mockResolverCache struct { + lockFn func(key any) ([]any, func(any) error, error) + lockCalls int + mu sync.Mutex +} + +func (m *mockResolverCache) Lock(key any) ([]any, func(any) error, error) { + m.mu.Lock() + m.lockCalls++ + m.mu.Unlock() + if m.lockFn != nil { + return m.lockFn(key) + } + return nil, func(any) error { return nil }, nil +} + +func TestCombinedResolverCache_BasicMerge(t *testing.T) { + rc1 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return []any{"a1", "a2"}, func(v any) error { + if v != nil { + assert.Equal(t, "merged", v) + } + return nil + }, nil + }, + } + rc2 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return []any{"b1"}, func(v any) error { + if v != nil { + assert.Equal(t, "merged", v) + } + return nil + }, nil + }, + } + + combined := combinedResolverCache([]ResolverCache{rc1, rc2}) + values, release, err := combined.Lock("key") + + require.NoError(t, err) + assert.ElementsMatch(t, []any{"a1", "a2", "b1"}, values) + + err = release("merged") + require.NoError(t, err) + + assert.Equal(t, 1, rc1.lockCalls) + assert.Equal(t, 1, rc2.lockCalls) +} + +func TestCombinedResolverCache_EmptyInput(t *testing.T) { + combined := combinedResolverCache(nil) + values, release, err := combined.Lock("any") + require.NoError(t, err) + assert.Nil(t, values) + require.NoError(t, release("whatever")) +} + +func TestCombinedResolverCache_ErrorHandlingAndRollback(t *testing.T) { + var released []string + var mu sync.Mutex + + rc1 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return []any{"x"}, func(v any) error { + mu.Lock() + released = append(released, "rc1") + mu.Unlock() + return nil + }, nil + }, + } + + rc2 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return nil, nil, errors.New("rc2 failed") + }, + } + + combined := combinedResolverCache([]ResolverCache{rc1, rc2}) + values, release, err := 
combined.Lock("key") + + assert.Nil(t, values) + assert.Nil(t, release) + require.EqualError(t, err, "rc2 failed") + + mu.Lock() + assert.Contains(t, released, "rc1", "should rollback acquired locks") + mu.Unlock() +} + +func TestCombinedResolverCache_ParallelReleaseErrorPropagation(t *testing.T) { + var count int + rc1 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return []any{"v1"}, func(v any) error { + count++ + return errors.New("rc1 release failed") + }, nil + }, + } + rc2 := &mockResolverCache{ + lockFn: func(key any) ([]any, func(any) error, error) { + return []any{"v2"}, func(v any) error { + count++ + return nil + }, nil + }, + } + + combined := combinedResolverCache([]ResolverCache{rc1, rc2}) + values, release, err := combined.Lock("k") + require.NoError(t, err) + assert.ElementsMatch(t, []any{"v1", "v2"}, values) + + e := release("data") + require.EqualError(t, e, "rc1 release failed") + assert.Equal(t, 2, count, "both releases must be called") +} diff --git a/solver/types.go b/solver/types.go index 5f9ae71dde48..4f1d0cd2fdfa 100644 --- a/solver/types.go +++ b/solver/types.go @@ -181,6 +181,17 @@ type JobContext interface { // Cleanup adds a function that is called when the job is done. This can be used to associate // resources with the job and keep them from being released until the job is done. Cleanup(func() error) error + // ResolverCache returns object for memorizing/synchronizing remote resolving decisions during the job. + // Steps from same build job will share the same resolver cache. + ResolverCache() ResolverCache +} + +type ResolverCache interface { + // Lock locks a key until the returned release function is called. + // Release function can return value that will be returned to next callers. + // Lock can return multiple values because two steps can be merged once + // both have independently completed their resolution. 
+ Lock(key any) (values []any, release func(any) error, err error) } type ProvenanceProvider interface { diff --git a/source/git/source.go b/source/git/source.go index 6c95e09b847b..6bd4a662e693 100644 --- a/source/git/source.go +++ b/source/git/source.go @@ -384,7 +384,11 @@ func (gs *gitSourceHandler) mountKnownHosts() (string, func() error, error) { return knownHosts.Name(), cleanup, nil } -func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (*Metadata, error) { +func (gs *gitSourceHandler) remoteKey() string { + return gs.src.Remote + "#" + gs.src.Ref +} + +func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (md *Metadata, retErr error) { remote := gs.src.Remote gs.locker.Lock(remote) defer gs.locker.Unlock(remote) @@ -397,6 +401,9 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J } if gitutil.IsCommitSHA(gs.src.Ref) { + if gs.src.Checksum != "" && !strings.HasPrefix(gs.src.Ref, gs.src.Checksum) { + return nil, errors.Errorf("expected checksum to match %s, got %s", gs.src.Checksum, gs.src.Ref) + } return &Metadata{ Ref: gs.src.Ref, Checksum: gs.src.Ref, @@ -406,6 +413,35 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J var g session.Group if jobCtx != nil { g = jobCtx.Session() + + if rc := jobCtx.ResolverCache(); rc != nil { + values, release, err := rc.Lock(gs.remoteKey()) + if err != nil { + return nil, err + } + saveResolved := true + defer func() { + var v any + if retErr == nil && saveResolved && md != nil { + v = md + } + if err := release(v); err != nil { + bklog.G(ctx).Warnf("failed to release resolver cache lock for %s: %v", gs.remoteKey(), err) + } + }() + for _, v := range values { + v2, ok := v.(*Metadata) + if !ok { + return nil, errors.Errorf("invalid resolver cache value for %s: %T", gs.remoteKey(), v) + } + if gs.src.Checksum != "" && !strings.HasPrefix(v2.Checksum, gs.src.Checksum) { + continue + } + 
saveResolved = false + clone := *v2 + return &clone, nil + } + } } gs.getAuthToken(ctx, g) @@ -483,7 +519,7 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J return nil, errors.Errorf("expected checksum to match %s, got %s", gs.src.Checksum, exp) } } - md := &Metadata{ + md = &Metadata{ Ref: usedRef, Checksum: sha, } diff --git a/source/http/source.go b/source/http/source.go index efacee966230..2a5e319cd037 100644 --- a/source/http/source.go +++ b/source/http/source.go @@ -139,11 +139,15 @@ type Metadata struct { type httpSourceHandler struct { *Source src HTTPIdentifier - refID string - cacheKey digest.Digest + resolved *metadataWithRef sm *session.Manager } +type metadataWithRef struct { + Metadata + refID string +} + func (hs *Source) ResolveMetadata(ctx context.Context, id *HTTPIdentifier, sm *session.Manager, jobCtx solver.JobContext) (*Metadata, error) { hsh := &httpSourceHandler{ src: *id, @@ -226,7 +230,7 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest, return dgst } -func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (*Metadata, error) { +func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (md *Metadata, retErr error) { if hs.src.Checksum != "" { return &Metadata{ Digest: hs.src.Checksum, @@ -244,6 +248,43 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. 
return nil, err } + if hs.resolved != nil { + return &hs.resolved.Metadata, nil + } + if jobCtx != nil { + if rc := jobCtx.ResolverCache(); rc != nil { + vals, release, err := rc.Lock(uh) + if err != nil { + return nil, err + } + saveResolved := true + defer func() { + var ret any + if retErr == nil && saveResolved && hs.resolved != nil { + ret = hs.resolved + } + if err := release(ret); err != nil { + bklog.G(ctx).WithError(err).Warn("failed to release resolver cache lock") + } + }() + for _, v := range vals { + v2, ok := v.(*metadataWithRef) + if !ok { + return nil, errors.Errorf("invalid HTTP resolver cache value: %T", v) + } + if hs.src.Checksum != "" && v2.Digest != hs.src.Checksum { + continue + } + hs.resolved = v2 + saveResolved = false + return &hs.resolved.Metadata, nil + } + if hs.src.Checksum != "" && len(vals) > 0 { + return nil, errors.Errorf("digest mismatch for %s: %s (expected: %s)", hs.src.URL, vals[0], hs.src.Checksum) + } + } + } + + // look up metadata(previously stored headers) for that URL mds, err := searchHTTPURLDigest(ctx, hs.cache, uh) if err != nil { @@ -311,7 +352,6 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. } md, ok := m[respETag] if ok { - hs.refID = md.ID() dgst := md.getHTTPChecksum() if dgst != "" { var modTime *time.Time @@ -321,11 +361,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. } } resp.Body.Close() - return &Metadata{ + m := &Metadata{ Digest: dgst, Filename: getFileName(hs.src.URL, hs.src.Filename, resp), LastModified: modTime, - }, nil + } + hs.resolved = &metadataWithRef{ + Metadata: *m, + refID: md.ID(), + } + return m, nil } } } @@ -358,7 +403,6 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. 
if !ok { return nil, errors.Errorf("invalid not-modified ETag: %v", respETag) } - hs.refID = md.ID() dgst := md.getHTTPChecksum() if dgst == "" { return nil, errors.Errorf("invalid metadata change") @@ -371,11 +415,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. modTime = &t } } - return &Metadata{ + m := &Metadata{ Digest: dgst, Filename: getFileName(hs.src.URL, hs.src.Filename, resp), LastModified: modTime, - }, nil + } + hs.resolved = &metadataWithRef{ + Metadata: *m, + refID: md.ID(), + } + return m, nil } ref, dgst, err := hs.save(ctx, resp, g) @@ -401,11 +450,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver. } } - return &Metadata{ + out := &Metadata{ Digest: dgst, Filename: getFileName(hs.src.URL, hs.src.Filename, resp), LastModified: modTime, - }, nil + } + hs.resolved = &metadataWithRef{ + Metadata: *out, + refID: ref.ID(), + } + return out, nil } func (hs *httpSourceHandler) CacheKey(ctx context.Context, jobCtx solver.JobContext, index int) (string, string, solver.CacheOpts, bool, error) { @@ -413,7 +467,11 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, jobCtx solver.JobCont if err != nil { return "", "", nil, false, err } - hs.cacheKey = md.Digest + if hs.resolved == nil { + hs.resolved = &metadataWithRef{ + Metadata: *md, + } + } return hs.formatCacheKey(md.Filename, md.Digest, md.LastModified).String(), md.Digest.String(), nil, true, nil } @@ -513,7 +571,6 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s se newRef = nil md := cacheRefMetadata{ref} - hs.refID = ref.ID() dgst = digest.NewDigest(digest.SHA256, h) if respETag := resp.Header.Get("ETag"); respETag != "" { @@ -540,10 +597,43 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s se } func (hs *httpSourceHandler) Snapshot(ctx context.Context, jobCtx solver.JobContext) (cache.ImmutableRef, error) { - if hs.refID != "" { - ref, err := 
hs.cache.Get(ctx, hs.refID, nil) + refID := "" + if hs.resolved != nil && hs.resolved.refID != "" { + refID = hs.resolved.refID + } else if jobCtx != nil { + if rc := jobCtx.ResolverCache(); rc != nil { + uh, err := hs.urlHash() + if err != nil { + return nil, err + } + vals, release, err := rc.Lock(uh) + if err != nil { + return nil, err + } + for _, v := range vals { + v2, ok := v.(*metadataWithRef) + if !ok { + return nil, errors.Errorf("invalid HTTP resolver cache value: %T", vals[0]) + } + if hs.src.Checksum != "" && v2.Digest != hs.src.Checksum { + continue + } + if v2.refID != "" { + hs.resolved = v2 + refID = v2.refID + } + } + release(nil) + if hs.src.Checksum != "" && len(vals) > 0 && refID == "" { + return nil, errors.Errorf("digest mismatch for %s: %s (expected: %s)", hs.src.URL, vals[0], hs.src.Checksum) + } + } + } + + if refID != "" { + ref, err := hs.cache.Get(ctx, hs.resolved.refID, nil) if err != nil { - bklog.G(ctx).WithError(err).Warnf("failed to get HTTP snapshot for ref %s (%s)", hs.refID, hs.src.URL) + bklog.G(ctx).WithError(err).Warnf("failed to get HTTP snapshot for ref %s (%s)", hs.resolved.refID, hs.src.URL) } else { return ref, nil } @@ -573,9 +663,9 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context, jobCtx solver.JobCont if err != nil { return nil, err } - if dgst != hs.cacheKey { + if hs.resolved != nil && dgst != hs.resolved.Digest { ref.Release(context.TODO()) - return nil, errors.Errorf("digest mismatch %s: %s", dgst, hs.cacheKey) + return nil, errors.Errorf("digest mismatch %s: %s", dgst, hs.resolved.Digest) } return ref, nil diff --git a/source/http/source_test.go b/source/http/source_test.go index 21865d4a633e..06873745d89d 100644 --- a/source/http/source_test.go +++ b/source/http/source_test.go @@ -19,6 +19,7 @@ import ( "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" containerdsnapshot "github.com/moby/buildkit/snapshot/containerd" + 
"github.com/moby/buildkit/solver" "github.com/moby/buildkit/source" "github.com/moby/buildkit/util/leaseutil" "github.com/moby/buildkit/util/testutil/httpserver" @@ -506,3 +507,7 @@ func (s *simpleJobContext) Release() error { s.releasers = nil return firstErr } + +func (s *simpleJobContext) ResolverCache() solver.ResolverCache { + return nil +}