99 "fmt"
1010 "io"
1111 "os"
12+ "slices"
1213 "strconv"
1314 "strings"
1415 "sync"
@@ -34,6 +35,7 @@ import (
3435 gateway "github.com/moby/buildkit/frontend/gateway/client"
3536 "github.com/moby/buildkit/identity"
3637 "github.com/moby/buildkit/session"
38+ "github.com/moby/buildkit/session/filesync"
3739 "github.com/moby/buildkit/solver/errdefs"
3840 "github.com/moby/buildkit/solver/pb"
3941 spb "github.com/moby/buildkit/sourcepolicy/pb"
@@ -44,6 +46,8 @@ import (
4446 specs "github.com/opencontainers/image-spec/specs-go/v1"
4547 "github.com/pkg/errors"
4648 "github.com/sirupsen/logrus"
49+ "github.com/tonistiigi/fsutil"
50+ fstypes "github.com/tonistiigi/fsutil/types"
4751 "go.opentelemetry.io/otel/trace"
4852 "golang.org/x/sync/errgroup"
4953)
@@ -291,6 +295,12 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
291295 }
292296 }
293297
298+ sharedSessions , err := detectSharedMounts (ctx , reqForNodes )
299+ if err != nil {
300+ return nil , err
301+ }
302+ sharedSessionsWG := map [string ]* sync.WaitGroup {}
303+
294304 resp = map [string ]* client.SolveResponse {}
295305 var respMu sync.Mutex
296306 results := waitmap .New ()
@@ -355,7 +365,37 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
355365 if err != nil {
356366 return err
357367 }
368+
369+ var done func ()
370+ if sessions , ok := sharedSessions [node .Name ]; ok {
371+ wg , ok := sharedSessionsWG [node .Name ]
372+ if ok {
373+ wg .Add (1 )
374+ } else {
375+ wg = & sync.WaitGroup {}
376+ wg .Add (1 )
377+ sharedSessionsWG [node .Name ] = wg
378+ for _ , s := range sessions {
379+ s := s
380+ eg .Go (func () error {
381+ return s .Run (baseCtx , c .Dialer ())
382+ })
383+ }
384+ go func () {
385+ wg .Wait ()
386+ for _ , s := range sessions {
387+ s .Close ()
388+ }
389+ }()
390+ }
391+ done = wg .Done
392+ }
393+
358394 eg2 .Go (func () error {
395+ if done != nil {
396+ defer done ()
397+ }
398+
359399 pw = progress .ResetTime (pw )
360400
361401 if err := waitContextDeps (ctx , dp .driverIndex , results , so ); err != nil {
@@ -786,6 +826,124 @@ func resultKey(index int, name string) string {
786826 return fmt .Sprintf ("%d-%s" , index , name )
787827}
788828
829+ // detectSharedMounts looks for same local mounts used by multiple requests to the same node
830+ // and creates a separate session that will be used by all detected requests.
831+ func detectSharedMounts (ctx context.Context , reqs map [string ][]* reqForNode ) (_ map [string ][]* session.Session , err error ) {
832+ type fsTracker struct {
833+ fs fsutil.FS
834+ so []* client.SolveOpt
835+ }
836+ type fsKey struct {
837+ name string
838+ dir string
839+ }
840+
841+ m := map [string ]map [fsKey ]* fsTracker {}
842+ for _ , reqs := range reqs {
843+ for _ , req := range reqs {
844+ nodeName := req .resolvedNode .Node ().Name
845+ if _ , ok := m [nodeName ]; ! ok {
846+ m [nodeName ] = map [fsKey ]* fsTracker {}
847+ }
848+ fsMap := m [nodeName ]
849+ for name , m := range req .so .LocalMounts {
850+ fs , ok := m .(* fs )
851+ if ! ok {
852+ continue
853+ }
854+ key := fsKey {name : name , dir : fs .dir }
855+ if _ , ok := fsMap [key ]; ! ok {
856+ fsMap [key ] = & fsTracker {fs : fs .FS }
857+ }
858+ fsMap [key ].so = append (fsMap [key ].so , req .so )
859+ }
860+ }
861+ }
862+
863+ type sharedSession struct {
864+ * session.Session
865+ fsMap map [string ]fsutil.FS
866+ }
867+
868+ sessionMap := map [string ][]* sharedSession {}
869+
870+ defer func () {
871+ if err != nil {
872+ for _ , sessions := range sessionMap {
873+ for _ , s := range sessions {
874+ s .Close ()
875+ }
876+ }
877+ }
878+ }()
879+
880+ for node , fsMap := range m {
881+ for key , fs := range fsMap {
882+ if len (fs .so ) <= 1 {
883+ continue
884+ }
885+
886+ sessions := sessionMap [node ]
887+
888+ // find session that doesn't have the fs name reserved
889+ idx := slices .IndexFunc (sessions , func (s * sharedSession ) bool {
890+ _ , ok := s .fsMap [key .name ]
891+ return ! ok
892+ })
893+
894+ var ss * sharedSession
895+ if idx == - 1 {
896+ s , err := session .NewSession (ctx , "" , fs .so [0 ].SharedKey )
897+ if err != nil {
898+ return nil , err
899+ }
900+ ss = & sharedSession {Session : s , fsMap : map [string ]fsutil.FS {}}
901+ sessions = append (sessions , ss )
902+ sessionMap [node ] = sessions
903+ } else {
904+ ss = sessions [idx ]
905+ }
906+
907+ ss .fsMap [key .name ] = fs .fs
908+ for _ , so := range fs .so {
909+ if so .FrontendAttrs == nil {
910+ so .FrontendAttrs = map [string ]string {}
911+ }
912+ so .FrontendAttrs ["local-sessionid:" + key .name ] = ss .ID ()
913+ }
914+ }
915+ }
916+
917+ resetUIDAndGID := func (p string , st * fstypes.Stat ) fsutil.MapResult {
918+ st .Uid = 0
919+ st .Gid = 0
920+ return fsutil .MapResultKeep
921+ }
922+
923+ // convert back to regular sessions
924+ sessions := map [string ][]* session.Session {}
925+ for n , ss := range sessionMap {
926+ arr := make ([]* session.Session , 0 , len (ss ))
927+ for _ , s := range ss {
928+ arr = append (arr , s .Session )
929+
930+ src := make (filesync.StaticDirSource , len (s .fsMap ))
931+ for name , fs := range s .fsMap {
932+ fs , err := fsutil .NewFilterFS (fs , & fsutil.FilterOpt {
933+ Map : resetUIDAndGID ,
934+ })
935+ if err != nil {
936+ return nil , err
937+ }
938+ src [name ] = fs
939+ }
940+ s .Allow (filesync .NewFSSyncProvider (src ))
941+ }
942+ sessions [n ] = arr
943+ }
944+ return sessions , nil
945+ }
946+
789947// calculateChildTargets returns all the targets that depend on current target for reverse index
790948func calculateChildTargets (reqs map [string ][]* reqForNode , opt map [string ]Options ) map [string ][]string {
791949 out := make (map [string ][]string )
0 commit comments