From df4d2aee3d040296794281c6f7fa33d83a5c443b Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Tue, 14 Oct 2025 17:47:29 -0700
Subject: [PATCH 1/2] solver: add ResolverCache support

The new ResolverCache interface in JobContext allows build jobs to
memoize and synchronize accesses to mutable remote resources. This
ensures that when multiple parts of the same build job, or a build job
and the source metadata resolver, access the same remote resource, it
remains the same for the duration of a single build request, even if
the data changes on the remote side.

Fix one such possible case in the HTTP source: once a URL has been
resolved for an ongoing build, its initial contents are always used
until the build completes, even if the server starts returning
completely different data.

Signed-off-by: Tonis Tiigi
---
 client/client_test.go             | 143 +++++++++++++++++
 solver/jobs.go                    |  21 +++
 solver/llbsolver/ops/exec_test.go |   4 +
 solver/resolvercache.go           | 156 ++++++++++++++++++
 solver/resolvercache_test.go      | 253 ++++++++++++++++++++++++++++++
 solver/types.go                   |  11 ++
 source/http/source.go             | 126 ++++++++++++---
 source/http/source_test.go        |   5 +
 8 files changed, 701 insertions(+), 18 deletions(-)
 create mode 100644 solver/resolvercache.go
 create mode 100644 solver/resolvercache_test.go

diff --git a/client/client_test.go b/client/client_test.go
index edfd29e5b8cd..e31bbc4a145e 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -246,6 +246,8 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
 	testHTTPResolveSourceMetadata,
 	testHTTPPruneAfterCacheKey,
 	testHTTPPruneAfterResolveMeta,
+	testHTTPResolveMetaReuse,
+	testHTTPResolveMultiBuild,
 }
 
 func TestIntegration(t *testing.T) {
@@ -12270,6 +12272,147 @@ func testHTTPPruneAfterResolveMeta(t *testing.T, sb integration.Sandbox) {
 	checkAllReleasable(t, c, sb, false)
 }
 
+func testHTTPResolveMetaReuse(t *testing.T, sb integration.Sandbox) {
+	// the difference from testHTTPPruneAfterResolveMeta is that here we change the content and the etag on the
+	// server, but because the URL was already resolved once, the new content should not be seen
+	ctx := sb.Context()
+	c, err := New(ctx, sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	resp := &httpserver.Response{
+		Etag:    identity.NewID(),
+		Content: []byte("content1"),
+	}
+	server := httpserver.NewTestServer(map[string]*httpserver.Response{
+		"/foo": resp,
+	})
+	defer server.Close()
+
+	dest := t.TempDir()
+	_, err = c.Build(ctx, SolveOpt{
+		Exports: []ExportEntry{
+			{
+				Type:      ExporterLocal,
+				OutputDir: dest,
+			},
+		},
+	}, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) {
+		id := server.URL + "/foo"
+		md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{
+			Identifier: id,
+		}, sourceresolver.Opt{})
+		if err != nil {
+			return nil, err
+		}
+		require.NotNil(t, md.HTTP)
+
+		resp.Etag = identity.NewID()
+		resp.Content = []byte("content2") // etag changed so new content would be returned if re-resolving
+
+		st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar"))
+		def, err := st.Marshal(sb.Context())
+		if err != nil {
+			return nil, err
+		}
+		return gc.Solve(ctx, gateway.SolveRequest{
+			Definition: def.ToPB(),
+		})
+	}, nil)
+	require.NoError(t, err)
+
+	dt, err := os.ReadFile(filepath.Join(dest, "bar"))
+	require.NoError(t, err)
+	require.Equal(t, "content1", string(dt))
+}
+
+// testHTTPResolveMultiBuild is a negative test for testHTTPResolveMetaReuse to ensure that
+// URLs are re-resolved between separate builds
+func 
testHTTPResolveMultiBuild(t *testing.T, sb integration.Sandbox) { + ctx := sb.Context() + c, err := New(ctx, sb.Address()) + require.NoError(t, err) + defer c.Close() + + resp := &httpserver.Response{ + Etag: identity.NewID(), + Content: []byte("content1"), + } + server := httpserver.NewTestServer(map[string]*httpserver.Response{ + "/foo": resp, + }) + defer server.Close() + + dest := t.TempDir() + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) { + id := server.URL + "/foo" + md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.HTTP) + require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest) + + st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return gc.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + dt, err := os.ReadFile(filepath.Join(dest, "bar")) + require.NoError(t, err) + require.Equal(t, "content1", string(dt)) + + resp.Etag = identity.NewID() + resp.Content = []byte("content2") + + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, gc gateway.Client) (*gateway.Result, error) { + id := server.URL + "/foo" + md, err := gc.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.HTTP) + require.Equal(t, digest.FromBytes(resp.Content), md.HTTP.Digest) + st := llb.Scratch().File(llb.Copy(llb.HTTP(id), "foo", "bar")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return gc.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + dt, err = os.ReadFile(filepath.Join(dest, "bar")) + require.NoError(t, err) + require.Equal(t, "content2", string(dt)) +} + func runInDir(dir string, cmds ...string) error { for _, args := range cmds { var cmd *exec.Cmd diff --git a/solver/jobs.go b/solver/jobs.go index a49983a360e9..b9f719becef1 100644 --- a/solver/jobs.go +++ b/solver/jobs.go @@ -85,6 +85,21 @@ func (s *state) Cleanup(fn func() error) error { return nil } +func (s *state) ResolverCache() ResolverCache { + return s +} + +func (s *state) Lock(key any) (values []any, release func(any) error, err error) { + var rcs []ResolverCache + s.mu.Lock() + for j := range s.jobs { + rcs = append(rcs, j.resolverCache) + } + s.mu.Unlock() + + return combinedResolverCache(rcs).Lock(key) +} + func (s *state) SessionIterator() session.Iterator { return s.sessionIterator() } @@ -329,6 +344,7 @@ type Job struct { startedTime time.Time completedTime time.Time releasers []func() error + resolverCache *resolverCache progressCloser func(error) SessionID string @@ -645,6 +661,7 @@ func (jl *Solver) NewJob(id string) (*Job, error) { id: id, startedTime: time.Now(), uniqueID: identity.NewID(), + resolverCache: newResolverCache(), } jl.jobs[id] = j @@ -862,6 +879,10 @@ func (j *Job) Cleanup(fn func() error) error { return nil } +func (j *Job) ResolverCache() ResolverCache { + return j.resolverCache +} + func (j *Job) SetValue(key string, v any) { j.values.Store(key, v) } diff --git a/solver/llbsolver/ops/exec_test.go 
b/solver/llbsolver/ops/exec_test.go
index ef9def366901..5271fdb700f0 100644
--- a/solver/llbsolver/ops/exec_test.go
+++ b/solver/llbsolver/ops/exec_test.go
@@ -307,3 +307,7 @@ func (j *jobCtx) Session() session.Group {
 func (j *jobCtx) Cleanup(f func() error) error {
 	return errors.Errorf("cleanup not implemented for %T", j)
 }
+
+func (j *jobCtx) ResolverCache() solver.ResolverCache {
+	return nil
+}
diff --git a/solver/resolvercache.go b/solver/resolvercache.go
new file mode 100644
index 000000000000..54fc4f9555df
--- /dev/null
+++ b/solver/resolvercache.go
@@ -0,0 +1,156 @@
+package solver
+
+import (
+	"slices"
+	"sync"
+)
+
+type resolverCache struct {
+	mu    sync.Mutex
+	locks map[any]*entry
+}
+
+var _ ResolverCache = &resolverCache{}
+
+type entry struct {
+	waiting []chan struct{}
+	values  []any
+	locked  bool
+}
+
+func newResolverCache() *resolverCache {
+	return &resolverCache{locks: make(map[any]*entry)}
+}
+
+func (r *resolverCache) Lock(key any) (values []any, release func(any) error, err error) {
+	r.mu.Lock()
+	e, ok := r.locks[key]
+	if !ok {
+		e = &entry{}
+		r.locks[key] = e
+	}
+	if !e.locked {
+		e.locked = true
+		values = slices.Clone(e.values)
+		r.mu.Unlock()
+		return values, func(v any) error {
+			r.mu.Lock()
+			defer r.mu.Unlock()
+			if v != nil {
+				e.values = append(e.values, v)
+			}
+			for _, ch := range e.waiting {
+				close(ch)
+			}
+			e.waiting = nil
+			e.locked = false
+			if len(e.values) == 0 {
+				delete(r.locks, key)
+			}
+			return nil
+		}, nil
+	}
+
+	ch := make(chan struct{})
+	e.waiting = append(e.waiting, ch)
+	r.mu.Unlock()
+
+	<-ch // wait for unlock
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	e2, ok := r.locks[key]
+	if !ok {
+		return nil, nil, nil // key deleted
+	}
+	values = slices.Clone(e2.values)
+	if e2.locked {
+		// another woken waiter re-acquired the lock first; return current values with a no-op release
+		return values, func(any) error { return nil }, nil
+	}
+	e2.locked = true
+	return values, func(v any) error {
+		r.mu.Lock()
+		defer r.mu.Unlock()
+		if v != nil {
+			e2.values = append(e2.values, v)
+		}
+		for _, ch := range e2.waiting {
+			close(ch)
+		}
+		e2.waiting = nil
+		e2.locked = false
+		if len(e2.values) == 0 {
+			delete(r.locks, key)
+		}
+		return nil
+	}, nil
+}
+
+// combinedResolverCache returns a ResolverCache that wraps multiple caches.
+// Lock() calls each underlying cache in parallel, merges their values, and
+// returns a combined release that releases all sublocks.
+func combinedResolverCache(rcs []ResolverCache) ResolverCache {
+	return &combinedCache{rcs: rcs}
+}
+
+type combinedCache struct {
+	rcs []ResolverCache
+}
+
+func (c *combinedCache) Lock(key any) (values []any, release func(any) error, err error) {
+	if len(c.rcs) == 0 {
+		return nil, func(any) error { return nil }, nil
+	}
+
+	var (
+		mu        sync.Mutex
+		wg        sync.WaitGroup
+		valuesAll []any
+		releasers []func(any) error
+		firstErr  error
+	)
+
+	wg.Add(len(c.rcs))
+	for _, rc := range c.rcs {
+		go func(rc ResolverCache) {
+			defer wg.Done()
+			vals, rel, e := rc.Lock(key)
+			if e != nil {
+				mu.Lock()
+				if firstErr == nil {
+					firstErr = e
+				}
+				mu.Unlock()
+				return
+			}
+
+			mu.Lock()
+			valuesAll = append(valuesAll, vals...)
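+			// collect every release func; the combined release must unlock all sub-caches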
+ releasers = append(releasers, rel) + mu.Unlock() + }(rc) + } + + wg.Wait() + + if firstErr != nil { + // rollback all acquired locks + for _, r := range releasers { + _ = r(nil) + } + return nil, nil, firstErr + } + + release = func(v any) error { + var errOnce error + for _, r := range releasers { + if e := r(v); e != nil && errOnce == nil { + errOnce = e + } + } + return errOnce + } + + return valuesAll, release, nil +} diff --git a/solver/resolvercache_test.go b/solver/resolvercache_test.go new file mode 100644 index 000000000000..b8e3864bafc2 --- /dev/null +++ b/solver/resolvercache_test.go @@ -0,0 +1,253 @@ +package solver + +import ( + "sync" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestResolverCache_SerialAccess(t *testing.T) { + rc := newResolverCache() + + // First lock should succeed immediately + values, release, err := rc.Lock("key1") + require.NoError(t, err) + assert.Empty(t, values) + + // Add a value and release + err = release("val1") + require.NoError(t, err) + + // Next lock should see accumulated value + values, release, err = rc.Lock("key1") + require.NoError(t, err) + assert.Equal(t, []any{"val1"}, values) + + // Add another value + _ = release("val2") + + // Lock again should return both values + values, release, err = rc.Lock("key1") + require.NoError(t, err) + assert.Equal(t, []any{"val1", "val2"}, values) + _ = release(nil) +} + +func TestResolverCache_ConcurrentWaiters(t *testing.T) { + rc := newResolverCache() + + var wg sync.WaitGroup + results := make(chan []any, 2) + + // First goroutine acquires lock + values, release, err := rc.Lock("shared") + require.NoError(t, err) + assert.Empty(t, values) + + // Two goroutines that will wait + for range 2 { + wg.Add(1) + go func() { + defer wg.Done() + v, _, err := rc.Lock("shared") + results <- v + assert.NoError(t, err) + }() + } + + select { + case <-results: + t.Fatal("expected goroutines to be waiting, but got result") + case <-time.After(100 * time.Millisecond): + } + + // Release with a value + err = release("done") + require.NoError(t, err) + + // Wait for both goroutines + wg.Wait() + close(results) + + for v := range results { + assert.Equal(t, []any{"done"}, v) + } +} + +func TestResolverCache_MultipleIndependentKeys(t *testing.T) { + rc := newResolverCache() + + v1, r1, err := rc.Lock("a") + require.NoError(t, err) + v2, r2, err := rc.Lock("b") + require.NoError(t, err) + + assert.Empty(t, v1) + assert.Empty(t, v2) + + require.NoError(t, r1("x")) + require.NoError(t, r2("y")) + + v1, _, err = rc.Lock("a") + require.NoError(t, err) + assert.Equal(t, []any{"x"}, v1) + + v2, _, err = rc.Lock("b") + require.NoError(t, err) + assert.Equal(t, []any{"y"}, v2) +} + +func TestResolverCache_ReleaseNilDoesNotAdd(t *testing.T) { + rc := newResolverCache() + + v, r, err := rc.Lock("niltest") + require.NoError(t, err) + assert.Empty(t, v) + + require.NoError(t, r(nil)) + + v, _, err = rc.Lock("niltest") + require.NoError(t, err) + assert.Empty(t, v) +} + +func TestResolverCache_SequentialLocks(t *testing.T) { + rc := newResolverCache() + + for i := range 3 { + v, r, err := rc.Lock("seq") + require.NoError(t, err) + assert.Len(t, v, i) + require.NoError(t, r(i)) + } + + v, _, err := rc.Lock("seq") + require.NoError(t, err) + assert.ElementsMatch(t, []any{0, 1, 2}, v) +} + +// mockResolverCache implements ResolverCache for testing. 
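+// If lockFn is nil, Lock records the call and returns no values with a no-op release.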
+type mockResolverCache struct {
+	lockFn    func(key any) ([]any, func(any) error, error)
+	lockCalls int
+	mu        sync.Mutex
+}
+
+func (m *mockResolverCache) Lock(key any) ([]any, func(any) error, error) {
+	m.mu.Lock()
+	m.lockCalls++
+	m.mu.Unlock()
+	if m.lockFn != nil {
+		return m.lockFn(key)
+	}
+	return nil, func(any) error { return nil }, nil
+}
+
+func TestCombinedResolverCache_BasicMerge(t *testing.T) {
+	rc1 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return []any{"a1", "a2"}, func(v any) error {
+				if v != nil {
+					assert.Equal(t, "merged", v)
+				}
+				return nil
+			}, nil
+		},
+	}
+	rc2 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return []any{"b1"}, func(v any) error {
+				if v != nil {
+					assert.Equal(t, "merged", v)
+				}
+				return nil
+			}, nil
+		},
+	}
+
+	combined := combinedResolverCache([]ResolverCache{rc1, rc2})
+	values, release, err := combined.Lock("key")
+
+	require.NoError(t, err)
+	assert.ElementsMatch(t, []any{"a1", "a2", "b1"}, values)
+
+	err = release("merged")
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, rc1.lockCalls)
+	assert.Equal(t, 1, rc2.lockCalls)
+}
+
+func TestCombinedResolverCache_EmptyInput(t *testing.T) {
+	combined := combinedResolverCache(nil)
+	values, release, err := combined.Lock("any")
+	require.NoError(t, err)
+	assert.Nil(t, values)
+	require.NoError(t, release("whatever"))
+}
+
+func TestCombinedResolverCache_ErrorHandlingAndRollback(t *testing.T) {
+	var released []string
+	var mu sync.Mutex
+
+	rc1 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return []any{"x"}, func(v any) error {
+				mu.Lock()
+				released = append(released, "rc1")
+				mu.Unlock()
+				return nil
+			}, nil
+		},
+	}
+
+	rc2 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return nil, nil, errors.New("rc2 failed")
+		},
+	}
+
+	combined := combinedResolverCache([]ResolverCache{rc1, rc2})
+	values, release, err := combined.Lock("key")
+
+	assert.Nil(t, values)
+	assert.Nil(t, release)
+	require.EqualError(t, err, "rc2 failed")
+
+	mu.Lock()
+	assert.Contains(t, released, "rc1", "should rollback acquired locks")
+	mu.Unlock()
+}
+
+func TestCombinedResolverCache_ParallelReleaseErrorPropagation(t *testing.T) {
+	var count int
+	rc1 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return []any{"v1"}, func(v any) error {
+				count++
+				return errors.New("rc1 release failed")
+			}, nil
+		},
+	}
+	rc2 := &mockResolverCache{
+		lockFn: func(key any) ([]any, func(any) error, error) {
+			return []any{"v2"}, func(v any) error {
+				count++
+				return nil
+			}, nil
+		},
+	}
+
+	combined := combinedResolverCache([]ResolverCache{rc1, rc2})
+	values, release, err := combined.Lock("k")
+	require.NoError(t, err)
+	assert.ElementsMatch(t, []any{"v1", "v2"}, values)
+
+	e := release("data")
+	require.EqualError(t, e, "rc1 release failed")
+	assert.Equal(t, 2, count, "both releases must be called")
+}
diff --git a/solver/types.go b/solver/types.go
index 5f9ae71dde48..3fe7bea8a75f 100644
--- a/solver/types.go
+++ b/solver/types.go
@@ -181,6 +181,17 @@ type JobContext interface {
 	// Cleanup adds a function that is called when the job is done. This can be used to associate
 	// resources with the job and keep them from being released until the job is done.
 	Cleanup(func() error) error
+	// ResolverCache returns an object for memoizing/synchronizing remote resolving decisions during the job.
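+	// Implementations may return nil if no resolver cache is available.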
+	// Steps from the same build job will share the same resolver cache.
+	ResolverCache() ResolverCache
+}
+
+type ResolverCache interface {
+	// Lock locks a key until the returned release function is called.
+	// Release function can return value that will be returned to next callers.
+	// Lock can return multiple values because two steps can be merged together after
+	// they both already completed resolve independently.
+	Lock(key any) (values []any, release func(any) error, err error)
 }
 
 type ProvenanceProvider interface {
diff --git a/source/http/source.go b/source/http/source.go
index efacee966230..2a5e319cd037 100644
--- a/source/http/source.go
+++ b/source/http/source.go
@@ -139,11 +139,15 @@ type Metadata struct {
 type httpSourceHandler struct {
 	*Source
 	src      HTTPIdentifier
-	refID    string
-	cacheKey digest.Digest
+	resolved *metadataWithRef
 	sm       *session.Manager
 }
 
+type metadataWithRef struct {
+	Metadata
+	refID string
+}
+
 func (hs *Source) ResolveMetadata(ctx context.Context, id *HTTPIdentifier, sm *session.Manager, jobCtx solver.JobContext) (*Metadata, error) {
 	hsh := &httpSourceHandler{
 		src: *id,
@@ -226,7 +230,7 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest,
 	return dgst
 }
 
-func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (*Metadata, error) {
+func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (md *Metadata, retErr error) {
 	if hs.src.Checksum != "" {
 		return &Metadata{
 			Digest: hs.src.Checksum,
@@ -244,6 +248,43 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
 		return nil, err
 	}
 
+	if hs.resolved != nil {
+		return &hs.resolved.Metadata, nil
+	}
+	if jobCtx != nil {
+		if rc := jobCtx.ResolverCache(); rc != nil {
+			vals, release, err := rc.Lock(uh)
+			if err != nil {
+				return nil, err
+			}
+			saveResolved := true
+			defer func() {
+				ret := hs.resolved
+				if retErr != nil || !saveResolved {
+					ret = nil
+				}
+				if err := release(ret); err != nil {
+					bklog.G(ctx).WithError(err).Warn("failed to release resolver cache lock")
+				}
+			}()
+			for _, v := range vals {
+				v2, ok := v.(*metadataWithRef)
+				if !ok {
+					return nil, errors.Errorf("invalid HTTP resolver cache value: %T", v)
+				}
+				if hs.src.Checksum != "" && v2.Digest != hs.src.Checksum {
+					continue
+				}
+				hs.resolved = v2
+				saveResolved = false
+				return &hs.resolved.Metadata, nil
+			}
+			if hs.src.Checksum != "" && len(vals) > 0 {
+				return nil, errors.Errorf("digest mismatch for %s: %s (expected: %s)", hs.src.URL, vals[0], hs.src.Checksum)
+			}
+		}
+	}
+
 	// look up metadata(previously stored headers) for that URL
 	mds, err := searchHTTPURLDigest(ctx, hs.cache, uh)
 	if err != nil {
@@ -311,7 +352,6 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
 	}
 	md, ok := m[respETag]
 	if ok {
-		hs.refID = md.ID()
 		dgst := md.getHTTPChecksum()
 		if dgst != "" {
 			var modTime *time.Time
@@ -321,11 +361,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
 			}
 		}
 		resp.Body.Close()
-		return &Metadata{
+		m := &Metadata{
 			Digest:       dgst,
 			Filename:     getFileName(hs.src.URL, hs.src.Filename, resp),
 			LastModified: modTime,
-		}, nil
+		}
+		hs.resolved = &metadataWithRef{
+			Metadata: *m,
+			refID:    md.ID(),
+		}
+		return m, nil
 		}
 	}
 }
@@ -358,7 +403,6 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
if !ok {
 			return nil, errors.Errorf("invalid not-modified ETag: %v", respETag)
 		}
-		hs.refID = md.ID()
 		dgst := md.getHTTPChecksum()
 		if dgst == "" {
 			return nil, errors.Errorf("invalid metadata change")
 		}
@@ -371,11 +415,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
 				modTime = &t
 			}
 		}
-		return &Metadata{
+		m := &Metadata{
 			Digest:       dgst,
 			Filename:     getFileName(hs.src.URL, hs.src.Filename, resp),
 			LastModified: modTime,
-		}, nil
+		}
+		hs.resolved = &metadataWithRef{
+			Metadata: *m,
+			refID:    md.ID(),
+		}
+		return m, nil
 	}
 
 	ref, dgst, err := hs.save(ctx, resp, g)
@@ -401,11 +450,16 @@ func (hs *httpSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.
 		}
 	}
 
-	return &Metadata{
+	out := &Metadata{
 		Digest:       dgst,
 		Filename:     getFileName(hs.src.URL, hs.src.Filename, resp),
 		LastModified: modTime,
-	}, nil
+	}
+	hs.resolved = &metadataWithRef{
+		Metadata: *out,
+		refID:    ref.ID(),
+	}
+	return out, nil
 }
 
 func (hs *httpSourceHandler) CacheKey(ctx context.Context, jobCtx solver.JobContext, index int) (string, string, solver.CacheOpts, bool, error) {
 	if err != nil {
 		return "", "", nil, false, err
 	}
-	hs.cacheKey = md.Digest
+	if hs.resolved == nil {
+		hs.resolved = &metadataWithRef{
+			Metadata: *md,
+		}
+	}
 	return hs.formatCacheKey(md.Filename, md.Digest, md.LastModified).String(), md.Digest.String(), nil, true, nil
 }
@@ -513,7 +571,6 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s se
 	newRef = nil
 	md := cacheRefMetadata{ref}
-	hs.refID = ref.ID()
 	dgst = digest.NewDigest(digest.SHA256, h)
 
 	if respETag := resp.Header.Get("ETag"); respETag != "" {
@@ -540,10 +597,43 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s se
 }
 
 func (hs *httpSourceHandler) Snapshot(ctx context.Context, jobCtx solver.JobContext) (cache.ImmutableRef, error) {
-	if hs.refID != "" {
-		ref, err := hs.cache.Get(ctx, hs.refID, nil)
+	refID := ""
+	if hs.resolved != nil && hs.resolved.refID != "" {
+		refID = hs.resolved.refID
+	} else if jobCtx != nil {
+		if rc := jobCtx.ResolverCache(); rc != nil {
+			uh, err := hs.urlHash()
+			if err != nil {
+				return nil, err
+			}
+			vals, release, err := rc.Lock(uh)
+			if err != nil {
+				return nil, err
+			}
+			for _, v := range vals {
+				v2, ok := v.(*metadataWithRef)
+				if !ok {
+					return nil, errors.Errorf("invalid HTTP resolver cache value: %T", v)
+				}
+				if hs.src.Checksum != "" && v2.Digest != hs.src.Checksum {
+					continue
+				}
+				if v2.refID != "" {
+					hs.resolved = v2
+					refID = v2.refID
+				}
+			}
+			release(nil)
+			if hs.src.Checksum != "" && len(vals) > 0 && refID == "" {
+				return nil, errors.Errorf("digest mismatch for %s: %s (expected: %s)", hs.src.URL, vals[0], hs.src.Checksum)
+			}
+		}
+	}
+
+	if refID != "" {
+		ref, err := hs.cache.Get(ctx, refID, nil)
 		if err != nil {
-			bklog.G(ctx).WithError(err).Warnf("failed to get HTTP snapshot for ref %s (%s)", hs.refID, hs.src.URL)
+			bklog.G(ctx).WithError(err).Warnf("failed to get HTTP snapshot for ref %s (%s)", refID, hs.src.URL)
 		} else {
 			return ref, nil
 		}
@@ -573,9 +663,9 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context, jobCtx solver.JobCont
 	if err != nil {
 		return nil, err
 	}
-	if dgst != hs.cacheKey {
+	if hs.resolved != nil && dgst != hs.resolved.Digest {
 		ref.Release(context.TODO())
-		return nil, errors.Errorf("digest mismatch %s: %s", dgst, hs.cacheKey)
+		return nil, errors.Errorf("digest mismatch %s: %s", dgst, 
hs.resolved.Digest)
 	}
 
 	return ref, nil
diff --git a/source/http/source_test.go b/source/http/source_test.go
index 21865d4a633e..06873745d89d 100644
--- a/source/http/source_test.go
+++ b/source/http/source_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/moby/buildkit/session"
 	"github.com/moby/buildkit/snapshot"
 	containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
+	"github.com/moby/buildkit/solver"
 	"github.com/moby/buildkit/source"
 	"github.com/moby/buildkit/util/leaseutil"
 	"github.com/moby/buildkit/util/testutil/httpserver"
@@ -506,3 +507,7 @@ func (s *simpleJobContext) Release() error {
 	s.releasers = nil
 	return firstErr
 }
+
+func (s *simpleJobContext) ResolverCache() solver.ResolverCache {
+	return nil
+}

From 1cff633ba53fbe559f01be121a0c193f0c94ebde Mon Sep 17 00:00:00 2001
From: Tonis Tiigi
Date: Tue, 14 Oct 2025 23:06:14 -0700
Subject: [PATCH 2/2] git: add resolvercache support to git source

Make sure a remote ref does not resolve to a different commit if the
git repository changes in the middle of the build.

Signed-off-by: Tonis Tiigi
---
 client/client_test.go | 119 ++++++++++++++++++++++++++++++++++++++++++
 solver/types.go       |   4 +-
 source/git/source.go  |  40 +++++++++++++-
 3 files changed, 159 insertions(+), 4 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index e31bbc4a145e..8dc0ef0cb706 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -248,6 +248,7 @@ var allTests = []func(t *testing.T, sb integration.Sandbox){
 	testHTTPPruneAfterResolveMeta,
 	testHTTPResolveMetaReuse,
 	testHTTPResolveMultiBuild,
+	testGitResolveMutatedSource,
 }
 
 func TestIntegration(t *testing.T) {
@@ -12413,6 +12414,124 @@ func testHTTPResolveMultiBuild(t *testing.T, sb integration.Sandbox) {
 	require.Equal(t, "content2", string(dt))
 }
 
+func testGitResolveMutatedSource(t *testing.T, sb integration.Sandbox) {
+	integration.SkipOnPlatform(t, "windows")
+	ctx := sb.Context()
+	c, err := New(ctx, sb.Address())
+	require.NoError(t, err)
+	defer c.Close()
+
+	gitDir := t.TempDir()
+	gitCommands := []string{
+		"git init",
+		"git config --local user.email test",
+		"git config --local user.name test",
+		"echo a > a",
+		"git add a",
+		"git commit -m a",
+		"git tag -a v0.1 -m v0.1",
+		"echo b > b",
+		"git add b",
+		"git commit -m b",
+		"git checkout -B v2",
+		"git update-server-info",
+	}
+	err = runInDir(gitDir, gitCommands...)
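+	// v0.1 is an annotated tag: `git rev-parse v0.1` below yields the tag
+	// object, while `v0.1^{commit}` yields the commit it points to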
+ require.NoError(t, err) + + // cmd := exec.Command("git", "rev-parse", "HEAD") + // cmd.Dir = gitDir + // out, err := cmd.Output() + // require.NoError(t, err) + // commitHEAD := strings.TrimSpace(string(out)) + + cmd := exec.Command("git", "rev-parse", "v0.1") + cmd.Dir = gitDir + out, err := cmd.Output() + require.NoError(t, err) + commitTag := strings.TrimSpace(string(out)) + + cmd = exec.Command("git", "rev-parse", "v0.1^{commit}") + cmd.Dir = gitDir + out, err = cmd.Output() + require.NoError(t, err) + commitTagCommit := strings.TrimSpace(string(out)) + + server := httptest.NewServer(http.FileServer(http.Dir(filepath.Clean(gitDir)))) + defer server.Close() + + dest := t.TempDir() + + _, err = c.Build(ctx, SolveOpt{ + Exports: []ExportEntry{ + { + Type: ExporterLocal, + OutputDir: dest, + }, + }, + }, "test", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) { + id := "git://" + strings.TrimPrefix(server.URL, "http://") + "/.git#v0.1" + md, err := c.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + Attrs: map[string]string{ + "git.fullurl": server.URL + "/.git", + }, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.Git) + require.Equal(t, "refs/tags/v0.1", md.Git.Ref) + require.Equal(t, commitTag, md.Git.Checksum) + require.Equal(t, commitTagCommit, md.Git.CommitChecksum) + require.Equal(t, id, md.Op.Identifier) + require.Equal(t, server.URL+"/.git", md.Op.Attrs["git.fullurl"]) + + // update the tag to point to a different commit + err = runInDir(gitDir, []string{ + "git tag -f v0.1", + "git update-server-info", + }...) + require.NoError(t, err) + + md, err = c.ResolveSourceMetadata(ctx, &pb.SourceOp{ + Identifier: id, + Attrs: map[string]string{ + "git.fullurl": server.URL + "/.git", + }, + }, sourceresolver.Opt{}) + if err != nil { + return nil, err + } + require.NotNil(t, md.Git) + require.Equal(t, "refs/tags/v0.1", md.Git.Ref) + require.Equal(t, commitTag, md.Git.Checksum) + require.Equal(t, commitTagCommit, md.Git.CommitChecksum) + require.Equal(t, id, md.Op.Identifier) + require.Equal(t, server.URL+"/.git", md.Op.Attrs["git.fullurl"]) + + st := llb.Git(server.URL+"/.git", "", llb.GitRef("v0.1")) + def, err := st.Marshal(sb.Context()) + if err != nil { + return nil, err + } + return c.Solve(ctx, gateway.SolveRequest{ + Definition: def.ToPB(), + }) + }, nil) + require.NoError(t, err) + + _, err = os.ReadFile(filepath.Join(dest, "b")) + require.Error(t, err) + require.True(t, os.IsNotExist(err), "expected file b to not exist") + + dt, err := os.ReadFile(filepath.Join(dest, "a")) + require.NoError(t, err) + require.Equal(t, "a\n", string(dt)) + + checkAllReleasable(t, c, sb, false) +} + func runInDir(dir string, cmds ...string) error { for _, args := range cmds { var cmd *exec.Cmd diff --git a/solver/types.go b/solver/types.go index 3fe7bea8a75f..4f1d0cd2fdfa 100644 --- a/solver/types.go +++ b/solver/types.go @@ -189,8 +189,8 @@ type JobContext interface { type ResolverCache interface { // Lock locks a key until the returned release function is called. // Release function can return value that will be returned to next callers. - // Lock can return multiple values because two steps can be merged together after - // they both already completed resolve independently. + // Lock can return multiple values because two steps can be merged once + // both have independently completed their resolution. 
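+	//
+	// A typical first caller resolves the resource itself and publishes the
+	// result via release (sketch; resolve and use are caller-defined):
+	//
+	//	vals, release, err := rc.Lock(key)
+	//	if err != nil {
+	//		return err
+	//	}
+	//	if len(vals) > 0 {
+	//		release(nil) // nothing new to publish
+	//		return use(vals[0])
+	//	}
+	//	res, err := resolve()
+	//	if err != nil {
+	//		release(nil)
+	//		return err
+	//	}
+	//	release(res) // later Lock calls for key will see res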
Lock(key any) (values []any, release func(any) error, err error) } diff --git a/source/git/source.go b/source/git/source.go index 6c95e09b847b..6bd4a662e693 100644 --- a/source/git/source.go +++ b/source/git/source.go @@ -384,7 +384,11 @@ func (gs *gitSourceHandler) mountKnownHosts() (string, func() error, error) { return knownHosts.Name(), cleanup, nil } -func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (*Metadata, error) { +func (gs *gitSourceHandler) remoteKey() string { + return gs.src.Remote + "#" + gs.src.Ref +} + +func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.JobContext) (md *Metadata, retErr error) { remote := gs.src.Remote gs.locker.Lock(remote) defer gs.locker.Unlock(remote) @@ -397,6 +401,9 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J } if gitutil.IsCommitSHA(gs.src.Ref) { + if gs.src.Checksum != "" && !strings.HasPrefix(gs.src.Ref, gs.src.Checksum) { + return nil, errors.Errorf("expected checksum to match %s, got %s", gs.src.Checksum, gs.src.Ref) + } return &Metadata{ Ref: gs.src.Ref, Checksum: gs.src.Ref, @@ -406,6 +413,35 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J var g session.Group if jobCtx != nil { g = jobCtx.Session() + + if rc := jobCtx.ResolverCache(); rc != nil { + values, release, err := rc.Lock(gs.remoteKey()) + if err != nil { + return nil, err + } + saveResolved := true + defer func() { + v := md + if retErr != nil || !saveResolved { + v = nil + } + if err := release(v); err != nil { + bklog.G(ctx).Warnf("failed to release resolver cache lock for %s: %v", gs.remoteKey(), err) + } + }() + for _, v := range values { + v2, ok := v.(*Metadata) + if !ok { + return nil, errors.Errorf("invalid resolver cache value for %s: %T", gs.remoteKey(), v) + } + if gs.src.Checksum != "" && !strings.HasPrefix(v2.Checksum, gs.src.Checksum) { + continue + } + saveResolved = false + clone := *v2 + return &clone, nil + } + } } gs.getAuthToken(ctx, g) @@ -483,7 +519,7 @@ func (gs *gitSourceHandler) resolveMetadata(ctx context.Context, jobCtx solver.J return nil, errors.Errorf("expected checksum to match %s, got %s", gs.src.Checksum, exp) } } - md := &Metadata{ + md = &Metadata{ Ref: usedRef, Checksum: sha, }