Skip to content

Commit 6ae762b

Browse files
homejimjim02.he
andauthored
feat(plugin_manager): optimize local plugin startup with concurrency (#375)
* feat(plugin_manager): optimize local plugin startup with concurrent control - Add semaphore-based concurrency control for plugin launches - Implement parallel plugin startup using goroutines - Optimize error handling to prevent goroutine blocking - Add concurrency metrics logging Note: handleNewLocalPlugins now accepts config parameter with default concurrency limit * feat(plugin_manager): make local plugin launching concurrency configurable * fix(plugin_manager): optimize comment and error handling - Updated comments to clarify the concurrent plugin launching configuration. - Added a nil check for the error channel during plugin startup to improve code robustness. * refactor(plugin_manager): refactor plugin startup logic - Remove the semaphore mechanism and switch to using routine.Submit for concurrency management * fix(plugin_manager): Optimize plugin startup logs and concurrency control - Added log output for maximum concurrency when starting local plugins - Implemented a channel-based concurrency control mechanism to ensure limits are not exceeded - Fixed closure variable capture issue to prevent incorrect plugin information - Improved error handling to avoid deadlocks during startup * fix(plugin_manager): simplify error channel handling and semaphore release logic --------- Co-authored-by: jim02.he <[email protected]>
1 parent b97cce7 commit 6ae762b

File tree

2 files changed

+49
-22
lines changed

2 files changed

+49
-22
lines changed

internal/core/plugin_manager/manager.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,9 @@ func InitGlobalManager(oss oss.OSS, configuration *app.Config) *PluginManager {
7474
configuration.PluginInstalledPath,
7575
),
7676
localPluginLaunchingLock: lock.NewGranularityLock(),
77-
maxLaunchingLock: make(chan bool, 2), // by default, we allow 2 plugins launching at the same time
78-
config: configuration,
77+
// By default, we allow up to configuration.PluginLocalLaunchingConcurrent plugins to be launched concurrently; if not configured, the default is 2.
78+
maxLaunchingLock: make(chan bool, configuration.PluginLocalLaunchingConcurrent),
79+
config: configuration,
7980
}
8081

8182
return manager
@@ -156,7 +157,7 @@ func (p *PluginManager) Launch(configuration *app.Config) {
156157

157158
// start local watcher
158159
if configuration.Platform == app.PLATFORM_LOCAL {
159-
p.startLocalWatcher()
160+
p.startLocalWatcher(configuration)
160161
}
161162

162163
// launch serverless connector

internal/core/plugin_manager/watcher.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package plugin_manager
22

33
import (
4+
"sync"
45
"time"
56

67
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/debugging_runtime"
@@ -11,12 +12,13 @@ import (
1112
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
1213
)
1314

14-
func (p *PluginManager) startLocalWatcher() {
15+
func (p *PluginManager) startLocalWatcher(config *app.Config) {
1516
go func() {
1617
log.Info("start to handle new plugins in path: %s", p.config.PluginInstalledPath)
17-
p.handleNewLocalPlugins()
18+
log.Info("Launching plugins with max concurrency: %d", p.config.PluginLocalLaunchingConcurrent)
19+
p.handleNewLocalPlugins(config)
1820
for range time.NewTicker(time.Second * 30).C {
19-
p.handleNewLocalPlugins()
21+
p.handleNewLocalPlugins(config)
2022
p.removeUninstalledLocalPlugins()
2123
}
2224
}()
@@ -66,34 +68,58 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
6668
}
6769
}
6870

69-
func (p *PluginManager) handleNewLocalPlugins() {
71+
func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
7072
// walk through all plugins
7173
plugins, err := p.installedBucket.List()
7274
if err != nil {
7375
log.Error("list installed plugins failed: %s", err.Error())
7476
return
7577
}
7678

79+
var wg sync.WaitGroup
80+
maxConcurrency := config.PluginLocalLaunchingConcurrent
81+
sem := make(chan struct{}, maxConcurrency)
82+
7783
for _, plugin := range plugins {
78-
_, launchedChan, errChan, err := p.launchLocal(plugin)
79-
if err != nil {
80-
log.Error("launch local plugin failed: %s", err.Error())
81-
}
84+
wg.Add(1)
85+
// Fix closure issue: create local variable copy
86+
currentPlugin := plugin
87+
routine.Submit(map[string]string{
88+
"module": "plugin_manager",
89+
"function": "handleNewLocalPlugins",
90+
}, func() {
91+
// Acquire sem inside goroutine
92+
sem <- struct{}{}
93+
defer func() {
94+
if err := recover(); err != nil {
95+
log.Error("plugin launch runtime error: %v", err)
96+
}
97+
<-sem
98+
wg.Done()
99+
}()
82100

83-
// avoid receiving nil channel
84-
if errChan != nil {
85-
// consume error, avoid deadlock
86-
for err := range errChan {
87-
log.Error("plugin launch error: %s", err.Error())
101+
_, launchedChan, errChan, err := p.launchLocal(currentPlugin)
102+
if err != nil {
103+
log.Error("launch local plugin failed: %s", err.Error())
104+
return
88105
}
89-
}
90106

91-
// avoid receiving nil channel
92-
if launchedChan != nil {
93-
// wait for plugin launched
94-
<-launchedChan
95-
}
107+
// Handle error channel
108+
if errChan != nil {
109+
for err := range errChan {
110+
log.Error("plugin launch error: %s", err.Error())
111+
}
112+
}
113+
114+
// Wait for plugin to complete startup
115+
if launchedChan != nil {
116+
<-launchedChan
117+
}
118+
})
96119
}
120+
121+
// wait for all plugins to be launched
122+
wg.Wait()
97123
}
98124

99125
// an async function to remove uninstalled local plugins

0 commit comments

Comments
 (0)