Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions solver/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ func (t edgeStatusType) String() string {

// newEdge constructs an edge for the given Edge/op pair, initializing all of
// the map fields so callers can insert into them without nil checks.
// cacheRecordsLoaded tracks records that have already been loaded (or have
// failed to load) so they can be invalidated if a cache load errors out.
func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
	// NOTE(review): the diff view duplicated the pre-change field list here;
	// this is the merged (post-change) initializer only.
	e := &edge{
		edge:               ed,
		op:                 op,
		depRequests:        map[pipe.Receiver]*dep{},
		keyMap:             map[string]struct{}{},
		cacheRecords:       map[string]*CacheRecord{},
		cacheRecordsLoaded: map[string]struct{}{},
		index:              index,
	}
	return e
}
Expand All @@ -44,14 +45,16 @@ type edge struct {
depRequests map[pipe.Receiver]*dep
deps []*dep

cacheMapReq pipe.Receiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
err error
cacheRecords map[string]*CacheRecord
keyMap map[string]struct{}
cacheMapReq pipe.Receiver
cacheMapDone bool
cacheMapIndex int
cacheMapDigests []digest.Digest
execReq pipe.Receiver
execCacheLoad bool
err error
cacheRecords map[string]*CacheRecord
cacheRecordsLoaded map[string]struct{}
keyMap map[string]struct{}

noCacheMatchPossible bool
allDepsCompletedCacheFast bool
Expand Down Expand Up @@ -425,7 +428,11 @@ func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
if upt == e.execReq && upt.Status().Completed {
if err := upt.Status().Err; err != nil {
e.execReq = nil
if !upt.Status().Canceled && e.err == nil {
if e.execCacheLoad {
for k := range e.cacheRecordsLoaded {
delete(e.cacheRecords, k)
}
} else if !upt.Status().Canceled && e.err == nil {
e.err = err
}
} else {
Expand Down Expand Up @@ -561,7 +568,9 @@ func (e *edge) recalcCurrentState() {
}

for _, r := range records {
e.cacheRecords[r.ID] = r
if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
e.cacheRecords[r.ID] = r
}
}

e.keys = append(e.keys, e.makeExportable(mergedKey, records))
Expand Down Expand Up @@ -821,6 +830,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
return true
}
e.execReq = f.NewFuncRequest(e.loadCache)
e.execCacheLoad = true
for req := range e.depRequests {
req.Cancel()
}
Expand All @@ -831,6 +841,7 @@ func (e *edge) execIfPossible(f *pipeFactory) bool {
return true
}
e.execReq = f.NewFuncRequest(e.execOp)
e.execCacheLoad = false
return true
}
return false
Expand All @@ -851,6 +862,7 @@ func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
}

rec := getBestResult(recs)
e.cacheRecordsLoaded[rec.ID] = struct{}{}

logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
res, err := e.op.LoadCache(ctx, rec)
Expand Down
104 changes: 104 additions & 0 deletions solver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3102,6 +3102,106 @@ func TestMergedEdgesLookup(t *testing.T) {
}
}

// TestCacheLoadError verifies that a failing cache load does not fail the
// build: the solver must fall back to re-executing the operations instead.
// It runs the same graph three times: cold (everything executes), warm
// (served from cache), and warm with Load forced to fail (executes again).
func TestCacheLoadError(t *testing.T) {
	t.Parallel()

	rand.Seed(time.Now().UnixNano())

	ctx := context.TODO()

	cacheManager := newTrackingCacheManager(NewInMemoryCacheManager())

	l := NewSolver(SolverOpt{
		ResolveOpFunc: testOpResolver,
		DefaultCache:  cacheManager,
	})
	defer l.Close()

	j0, err := l.NewJob("j0")
	require.NoError(t, err)

	defer func() {
		if j0 != nil {
			j0.Discard()
		}
	}()

	g := Edge{
		Vertex: vtxSum(3, vtxOpt{inputs: []Edge{
			{Vertex: vtxSum(0, vtxOpt{inputs: []Edge{
				{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
					{Vertex: vtxConst(2, vtxOpt{})},
				}})},
				{Vertex: vtxConst(0, vtxOpt{})},
			}})},
			{Vertex: vtxSum(2, vtxOpt{inputs: []Edge{
				{Vertex: vtxConst(2, vtxOpt{})},
			}})},
		}}),
	}
	g.Vertex.(*vertexSum).setupCallCounters()

	// Cold build: no cache hits, every vertex executes.
	res, err := j0.Build(ctx, g)
	require.NoError(t, err)
	require.Equal(t, 11, unwrapInt(res)) // testify order is (expected, actual)
	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
	require.Equal(t, int64(0), cacheManager.loadCounter)

	require.NoError(t, j0.Discard())
	j0 = nil

	// repeat with cache
	j1, err := l.NewJob("j1")
	require.NoError(t, err)

	defer func() {
		if j1 != nil {
			j1.Discard()
		}
	}()

	g1 := g

	g1.Vertex.(*vertexSum).setupCallCounters()

	// Warm build: nothing executes, a single merged result is loaded.
	res, err = j1.Build(ctx, g1)
	require.NoError(t, err)
	require.Equal(t, 11, unwrapInt(res))
	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
	require.Equal(t, int64(0), *g.Vertex.(*vertexSum).execCallCount)
	require.Equal(t, int64(1), cacheManager.loadCounter)

	require.NoError(t, j1.Discard())
	j1 = nil

	// repeat with cache but loading will now fail
	j2, err := l.NewJob("j2")
	require.NoError(t, err)

	defer func() {
		if j2 != nil {
			j2.Discard()
		}
	}()

	g2 := g

	g2.Vertex.(*vertexSum).setupCallCounters()

	cacheManager.forceFail = true

	// Every load attempt fails, so the solver must recover by executing;
	// loadCounter grows by 5 (one failed attempt per re-executed vertex).
	res, err = j2.Build(ctx, g2)
	require.NoError(t, err)
	require.Equal(t, 11, unwrapInt(res))
	require.Equal(t, int64(7), *g.Vertex.(*vertexSum).cacheCallCount)
	require.Equal(t, int64(5), *g.Vertex.(*vertexSum).execCallCount)
	require.Equal(t, int64(6), cacheManager.loadCounter)

	require.NoError(t, j2.Discard())
	j2 = nil
}

func TestInputRequestDeadlock(t *testing.T) {
t.Parallel()
ctx := context.TODO()
Expand Down Expand Up @@ -3584,10 +3684,14 @@ func newTrackingCacheManager(cm CacheManager) *trackingCacheManager {
// trackingCacheManager wraps a CacheManager for tests, counting Load calls
// and optionally forcing every Load to fail.
type trackingCacheManager struct {
	CacheManager
	loadCounter int64 // incremented atomically on each Load call
	forceFail bool // when true, Load returns an error instead of delegating
}

// Load records the load attempt, then either fails on purpose (forceFail)
// or delegates to the wrapped CacheManager.
func (cm *trackingCacheManager) Load(ctx context.Context, rec *CacheRecord) (Result, error) {
	atomic.AddInt64(&cm.loadCounter, 1)
	if !cm.forceFail {
		return cm.CacheManager.Load(ctx, rec)
	}
	return nil, errors.Errorf("force fail")
}

Expand Down