Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 68 additions & 26 deletions pkg/runner/chunk_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,38 +713,80 @@ func (r *ChunkRunner) handleSha256AndFinalize(ctx context.Context, chunk *v1alph
}

func (r *ChunkRunner) waitForPartialChunk(ctx context.Context, chunk *v1alpha1.Chunk, s *state, swmr ioswmr.SWMR, etags []string) {
for {
if ctx.Err() != nil {
// Try to get the chunk first (it might already be complete)
pchunk, err := r.getChunk(chunk.Spec.Sha256PartialPreviousName)
if err == nil {
if pchunk.Status.Phase == v1alpha1.ChunkPhaseSucceeded {
if len(pchunk.Status.Sha256Partial) == 0 {
err := fmt.Errorf("partial chunk %q has no sha256 partial data", chunk.Spec.Sha256PartialPreviousName)
s.handleProcessError("MissingSha256PartialData", err)
return
}
s.Update(func(ss *v1alpha1.Chunk) (*v1alpha1.Chunk, error) {
var err error
ss.Status.Sha256, ss.Status.Sha256Partial, err = updateSha256(ss.Spec.Sha256, pchunk.Status.Sha256Partial, swmr.NewReader())
if err != nil {
return nil, err
}
ss.Status.Etags = etags
utils.SetChunkTerminalPhase(ss, v1alpha1.ChunkPhaseSucceeded)
return ss, nil
})
return
}
time.Sleep(time.Second)
pchunk, err := r.getChunk(chunk.Spec.Sha256PartialPreviousName)
if err != nil {
if !apierrors.IsNotFound(err) {
s.handleProcessError("", err)
} else if !apierrors.IsNotFound(err) {
s.handleProcessError("", err)
return
}

// Use Watch API to wait for the chunk to be updated
opts := metav1.ListOptions{
FieldSelector: "metadata.name=" + chunk.Spec.Sha256PartialPreviousName,
}

watcher, err := r.client.TaskV1alpha1().Chunks().Watch(ctx, opts)
if err != nil {
s.handleProcessError("", err)
return
}
defer watcher.Stop()

for {
select {
case <-ctx.Done():
return
case event, ok := <-watcher.ResultChan():
if !ok {
// Watch channel closed, restart watch
s.handleProcessError("", fmt.Errorf("watch channel closed unexpectedly"))
return
}
continue
}
if pchunk.Status.Phase != v1alpha1.ChunkPhaseSucceeded {
continue
}
if len(pchunk.Status.Sha256Partial) == 0 {
err := fmt.Errorf("partial chunk %q has no sha256 partial data", chunk.Spec.Sha256PartialPreviousName)
s.handleProcessError("MissingSha256PartialData", err)

pchunk, ok := event.Object.(*v1alpha1.Chunk)
if !ok {
continue
}

if pchunk.Status.Phase != v1alpha1.ChunkPhaseSucceeded {
continue
}
if len(pchunk.Status.Sha256Partial) == 0 {
err := fmt.Errorf("partial chunk %q has no sha256 partial data", chunk.Spec.Sha256PartialPreviousName)
s.handleProcessError("MissingSha256PartialData", err)
return
}
s.Update(func(ss *v1alpha1.Chunk) (*v1alpha1.Chunk, error) {
var err error
ss.Status.Sha256, ss.Status.Sha256Partial, err = updateSha256(ss.Spec.Sha256, pchunk.Status.Sha256Partial, swmr.NewReader())
if err != nil {
return nil, err
}
ss.Status.Etags = etags
utils.SetChunkTerminalPhase(ss, v1alpha1.ChunkPhaseSucceeded)
return ss, nil
})
return
}
s.Update(func(ss *v1alpha1.Chunk) (*v1alpha1.Chunk, error) {
var err error
ss.Status.Sha256, ss.Status.Sha256Partial, err = updateSha256(ss.Spec.Sha256, pchunk.Status.Sha256Partial, swmr.NewReader())
if err != nil {
return nil, err
}
ss.Status.Etags = etags
utils.SetChunkTerminalPhase(ss, v1alpha1.ChunkPhaseSucceeded)
return ss, nil
})
return
}
}

Expand Down