Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ func InitGlobalManager(oss oss.OSS, configuration *app.Config) *PluginManager {
configuration.PluginInstalledPath,
),
localPluginLaunchingLock: lock.NewGranularityLock(),
maxLaunchingLock: make(chan bool, 2), // by default, we allow 2 plugins launching at the same time
config: configuration,
// By default, we allow up to configuration.PluginLocalLaunchingConcurrent plugins to be launched concurrently; if not configured, the default is 2.
maxLaunchingLock: make(chan bool, configuration.PluginLocalLaunchingConcurrent),
config: configuration,
}

return manager
Expand Down Expand Up @@ -156,7 +157,7 @@ func (p *PluginManager) Launch(configuration *app.Config) {

// start local watcher
if configuration.Platform == app.PLATFORM_LOCAL {
p.startLocalWatcher()
p.startLocalWatcher(configuration)
}

// launch serverless connector
Expand Down
57 changes: 39 additions & 18 deletions internal/core/plugin_manager/watcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plugin_manager

import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/debugging_runtime"
Expand All @@ -11,12 +12,12 @@ import (
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

func (p *PluginManager) startLocalWatcher() {
func (p *PluginManager) startLocalWatcher(config *app.Config) {
go func() {
log.Info("start to handle new plugins in path: %s", p.config.PluginInstalledPath)
p.handleNewLocalPlugins()
p.handleNewLocalPlugins(config)
for range time.NewTicker(time.Second * 30).C {
p.handleNewLocalPlugins()
p.handleNewLocalPlugins(config)
p.removeUninstalledLocalPlugins()
}
}()
Expand Down Expand Up @@ -66,34 +67,54 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
}
}

func (p *PluginManager) handleNewLocalPlugins() {
func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
// walk through all plugins
plugins, err := p.installedBucket.List()
if err != nil {
log.Error("list installed plugins failed: %s", err.Error())
return
}

var wg sync.WaitGroup
maxConcurrency := config.PluginLocalLaunchingConcurrent

log.Info("Launching %d plugins with max concurrency: %d", len(plugins), maxConcurrency)

for _, plugin := range plugins {
_, launchedChan, errChan, err := p.launchLocal(plugin)
if err != nil {
log.Error("launch local plugin failed: %s", err.Error())
}
wg.Add(1)
routine.Submit(map[string]string{
"module": "plugin_manager",
"function": "handleNewLocalPlugins",
}, func() {
defer func() {
if err := recover(); err != nil {
log.Error("plugin launch runtime error: %v", err)
}
wg.Done()
}()

_, launchedChan, errChan, err := p.launchLocal(plugin)
if err != nil {
log.Error("launch local plugin failed: %s", err.Error())
return
}

// avoid receiving nil channel
if errChan != nil {
// consume error, avoid deadlock
for err := range errChan {
log.Error("plugin launch error: %s", err.Error())
if errChan != nil {
for err := range errChan {
log.Error("plugin launch error: %s", err.Error())
}
}
}

// avoid receiving nil channel
if launchedChan != nil {
// wait for plugin launched
<-launchedChan
}
// Wait for plugin to complete startup
if launchedChan != nil {
<-launchedChan
}
})
}

// wait for all plugins to be launched
wg.Wait()
}

// an async function to remove uninstalled local plugins
Expand Down