Skip to content

Commit 36acc6e

Browse files
cursoragentarsalann
andcommitted
fix: serialize ingestr installation to prevent file lock conflicts on Windows
When running Bruin with multiple workers on Windows, parallel workers would simultaneously try to install ingestr via 'uv tool install --force', causing file lock conflicts on ingestr.exe (OS error 32). This commit: - Adds a package-level mutex to serialize ingestr installation calls - Tracks installed package combinations to avoid redundant reinstalls - Only uses --force flag when actually needed (first install or local path) - Adds comprehensive tests for the synchronization mechanism The fix ensures that ingestr is installed only once per unique package combination during a Bruin run, preventing the race condition that caused file locks on Windows. Fixes: ingestr.exe file lock error when running with multiple workers Co-authored-by: Arsalan <arsalann@users.noreply.github.com>
1 parent e604389 commit 36acc6e

2 files changed

Lines changed: 220 additions & 12 deletions

File tree

pkg/python/uv.go

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ import (
2525
"github.com/spf13/afero"
2626
)
2727

28+
var (
29+
ingestrInstallMutex sync.Mutex
30+
ingestrInstalledPackages = make(map[string]bool)
31+
)
32+
2833
var AvailablePythonVersions = map[string]bool{
2934
"3.8": true,
3035
"3.9": true,
@@ -126,12 +131,9 @@ func (u *UvPythonRunner) RunIngestr(ctx context.Context, args, extraPackages []s
126131
}
127132
u.binaryFullPath = binaryFullPath
128133

129-
err = u.Cmd.Run(ctx, repo, &CommandInstance{
130-
Name: u.binaryFullPath,
131-
Args: u.ingestrInstallCmd(ctx, extraPackages),
132-
})
134+
err = u.ensureIngestrInstalled(ctx, extraPackages, repo)
133135
if err != nil {
134-
return errors.Wrap(err, "failed to install ingestr")
136+
return err
135137
}
136138

137139
flags := []string{"tool", "run", "--no-config", "--prerelease", "allow", "--python", pythonVersionForIngestr, "ingestr"}
@@ -381,12 +383,9 @@ func (u *UvPythonRunner) runWithMaterialization(ctx context.Context, execCtx *ex
381383
}
382384
}
383385

384-
err = u.Cmd.Run(ctx, execCtx.repo, &CommandInstance{
385-
Name: u.binaryFullPath,
386-
Args: u.ingestrInstallCmd(ctx, extraPackages),
387-
})
386+
err = u.ensureIngestrInstalled(ctx, extraPackages, execCtx.repo)
388387
if err != nil {
389-
return errors.Wrap(err, "failed to install ingestr")
388+
return err
390389
}
391390

392391
runArgs := slices.Concat([]string{"tool", "run", "--no-config", "--prerelease", "allow", "--python", pythonVersionForIngestr, "ingestr"}, cmdArgs)
@@ -425,19 +424,21 @@ func (u *UvPythonRunner) ingestrPackage(ctx context.Context) (string, bool) {
425424

426425
// ingestrInstallCmd returns the uv tool commandline
427426
// args necessary for installing ingestr.
428-
func (u *UvPythonRunner) ingestrInstallCmd(ctx context.Context, pkgs []string) []string {
427+
func (u *UvPythonRunner) ingestrInstallCmd(ctx context.Context, pkgs []string, forceReinstall bool) []string {
429428
ingestrPackageName, isLocal := u.ingestrPackage(ctx)
430429
cmdline := []string{
431430
"tool",
432431
"install",
433432
"--no-config",
434-
"--force",
435433
"--quiet",
436434
"--prerelease",
437435
"allow",
438436
"--python",
439437
pythonVersionForIngestr,
440438
}
439+
if forceReinstall {
440+
cmdline = append(cmdline, "--force")
441+
}
441442
for _, pkg := range pkgs {
442443
cmdline = append(cmdline, "--with", pkg)
443444
}
@@ -448,6 +449,52 @@ func (u *UvPythonRunner) ingestrInstallCmd(ctx context.Context, pkgs []string) [
448449
return cmdline
449450
}
450451

452+
// buildIngestrPackageKey creates a unique key for the combination of ingestr version and extra packages.
453+
func (u *UvPythonRunner) buildIngestrPackageKey(ctx context.Context, extraPackages []string) string {
454+
ingestrPackageName, _ := u.ingestrPackage(ctx)
455+
sortedPkgs := make([]string, len(extraPackages))
456+
copy(sortedPkgs, extraPackages)
457+
sort.Strings(sortedPkgs)
458+
return ingestrPackageName + ":" + strings.Join(sortedPkgs, ",")
459+
}
460+
461+
// ensureIngestrInstalled installs ingestr with the specified extra packages in a thread-safe manner.
462+
// It prevents parallel workers from installing ingestr simultaneously, which causes file lock
463+
// conflicts on Windows (OS error 32).
464+
func (u *UvPythonRunner) ensureIngestrInstalled(ctx context.Context, extraPackages []string, repo *git.Repo) error {
465+
packageKey := u.buildIngestrPackageKey(ctx, extraPackages)
466+
467+
ingestrInstallMutex.Lock()
468+
defer ingestrInstallMutex.Unlock()
469+
470+
_, isLocal := u.ingestrPackage(ctx)
471+
if !isLocal {
472+
if ingestrInstalledPackages[packageKey] {
473+
return nil
474+
}
475+
}
476+
477+
forceReinstall := isLocal || !ingestrInstalledPackages[packageKey]
478+
err := u.Cmd.Run(ctx, repo, &CommandInstance{
479+
Name: u.binaryFullPath,
480+
Args: u.ingestrInstallCmd(ctx, extraPackages, forceReinstall),
481+
})
482+
if err != nil {
483+
return errors.Wrap(err, "failed to install ingestr")
484+
}
485+
486+
ingestrInstalledPackages[packageKey] = true
487+
return nil
488+
}
489+
490+
// ResetIngestrInstallCache resets the ingestr installation cache.
491+
// This is primarily useful for testing.
492+
func ResetIngestrInstallCache() {
493+
ingestrInstallMutex.Lock()
494+
defer ingestrInstallMutex.Unlock()
495+
ingestrInstalledPackages = make(map[string]bool)
496+
}
497+
451498
const PythonArrowTemplate = `
452499
# /// script
453500
# dependencies = [

pkg/python/uv_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package python
22

33
import (
44
"context"
5+
"sync"
6+
"sync/atomic"
57
"testing"
68

79
"github.com/bruin-data/bruin/pkg/git"
@@ -131,3 +133,162 @@ func Test_uvPythonRunner_Run(t *testing.T) {
131133
})
132134
}
133135
}
136+
137+
func Test_buildIngestrPackageKey(t *testing.T) {
138+
t.Parallel()
139+
140+
tests := []struct {
141+
name string
142+
extraPackages []string
143+
expected string
144+
}{
145+
{
146+
name: "no extra packages",
147+
extraPackages: []string{},
148+
expected: "ingestr@" + ingestrVersion + ":",
149+
},
150+
{
151+
name: "single package",
152+
extraPackages: []string{"pyodbc"},
153+
expected: "ingestr@" + ingestrVersion + ":pyodbc",
154+
},
155+
{
156+
name: "multiple packages sorted",
157+
extraPackages: []string{"pyodbc", "duckdb"},
158+
expected: "ingestr@" + ingestrVersion + ":duckdb,pyodbc",
159+
},
160+
{
161+
name: "packages in different order produce same key",
162+
extraPackages: []string{"duckdb", "pyodbc"},
163+
expected: "ingestr@" + ingestrVersion + ":duckdb,pyodbc",
164+
},
165+
}
166+
167+
for _, tt := range tests {
168+
t.Run(tt.name, func(t *testing.T) {
169+
t.Parallel()
170+
171+
u := &UvPythonRunner{}
172+
key := u.buildIngestrPackageKey(context.Background(), tt.extraPackages)
173+
assert.Equal(t, tt.expected, key)
174+
})
175+
}
176+
}
177+
178+
func Test_ensureIngestrInstalled_OnlyInstallsOnce(t *testing.T) {
179+
ResetIngestrInstallCache()
180+
defer ResetIngestrInstallCache()
181+
182+
repo := &git.Repo{}
183+
var installCount atomic.Int32
184+
185+
cmd := new(mockCmd)
186+
cmd.On("Run", mock.Anything, repo, mock.MatchedBy(func(c *CommandInstance) bool {
187+
return len(c.Args) > 0 && c.Args[0] == "tool" && c.Args[1] == "install"
188+
})).Run(func(args mock.Arguments) {
189+
installCount.Add(1)
190+
}).Return(nil)
191+
192+
inst := new(mockUvInstaller)
193+
inst.On("EnsureUvInstalled", mock.Anything).Return("~/.bruin/uv", nil)
194+
195+
u := &UvPythonRunner{
196+
Cmd: cmd,
197+
UvInstaller: inst,
198+
binaryFullPath: "~/.bruin/uv",
199+
}
200+
201+
ctx := context.Background()
202+
extraPackages := []string{"pyodbc"}
203+
204+
err := u.ensureIngestrInstalled(ctx, extraPackages, repo)
205+
assert.NoError(t, err)
206+
207+
err = u.ensureIngestrInstalled(ctx, extraPackages, repo)
208+
assert.NoError(t, err)
209+
210+
err = u.ensureIngestrInstalled(ctx, extraPackages, repo)
211+
assert.NoError(t, err)
212+
213+
assert.Equal(t, int32(1), installCount.Load(), "ingestr should only be installed once for the same package combination")
214+
}
215+
216+
func Test_ensureIngestrInstalled_InstallsForDifferentPackages(t *testing.T) {
217+
ResetIngestrInstallCache()
218+
defer ResetIngestrInstallCache()
219+
220+
repo := &git.Repo{}
221+
var installCount atomic.Int32
222+
223+
cmd := new(mockCmd)
224+
cmd.On("Run", mock.Anything, repo, mock.MatchedBy(func(c *CommandInstance) bool {
225+
return len(c.Args) > 0 && c.Args[0] == "tool" && c.Args[1] == "install"
226+
})).Run(func(args mock.Arguments) {
227+
installCount.Add(1)
228+
}).Return(nil)
229+
230+
inst := new(mockUvInstaller)
231+
inst.On("EnsureUvInstalled", mock.Anything).Return("~/.bruin/uv", nil)
232+
233+
u := &UvPythonRunner{
234+
Cmd: cmd,
235+
UvInstaller: inst,
236+
binaryFullPath: "~/.bruin/uv",
237+
}
238+
239+
ctx := context.Background()
240+
241+
err := u.ensureIngestrInstalled(ctx, []string{"pyodbc"}, repo)
242+
assert.NoError(t, err)
243+
244+
err = u.ensureIngestrInstalled(ctx, []string{"duckdb"}, repo)
245+
assert.NoError(t, err)
246+
247+
err = u.ensureIngestrInstalled(ctx, []string{"pyodbc", "duckdb"}, repo)
248+
assert.NoError(t, err)
249+
250+
assert.Equal(t, int32(3), installCount.Load(), "ingestr should be installed once for each unique package combination")
251+
}
252+
253+
func Test_ensureIngestrInstalled_ConcurrentCalls(t *testing.T) {
254+
ResetIngestrInstallCache()
255+
defer ResetIngestrInstallCache()
256+
257+
repo := &git.Repo{}
258+
var installCount atomic.Int32
259+
260+
cmd := new(mockCmd)
261+
cmd.On("Run", mock.Anything, repo, mock.MatchedBy(func(c *CommandInstance) bool {
262+
return len(c.Args) > 0 && c.Args[0] == "tool" && c.Args[1] == "install"
263+
})).Run(func(args mock.Arguments) {
264+
installCount.Add(1)
265+
}).Return(nil)
266+
267+
inst := new(mockUvInstaller)
268+
inst.On("EnsureUvInstalled", mock.Anything).Return("~/.bruin/uv", nil)
269+
270+
u := &UvPythonRunner{
271+
Cmd: cmd,
272+
UvInstaller: inst,
273+
binaryFullPath: "~/.bruin/uv",
274+
}
275+
276+
ctx := context.Background()
277+
extraPackages := []string{"pyodbc"}
278+
279+
var wg sync.WaitGroup
280+
numGoroutines := 10
281+
282+
for range numGoroutines {
283+
wg.Add(1)
284+
go func() {
285+
defer wg.Done()
286+
err := u.ensureIngestrInstalled(ctx, extraPackages, repo)
287+
assert.NoError(t, err)
288+
}()
289+
}
290+
291+
wg.Wait()
292+
293+
assert.Equal(t, int32(1), installCount.Load(), "ingestr should only be installed once even with concurrent calls")
294+
}

0 commit comments

Comments
 (0)