Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/core/plugin_manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func InitGlobalManager(oss oss.OSS, configuration *app.Config) *PluginManager {
configuration.PluginInstalledPath,
),
localPluginLaunchingLock: lock.NewGranularityLock(),
maxLaunchingLock: make(chan bool, 2), // by default, we allow 2 plugins launching at the same time
maxLaunchingLock: make(chan bool, configuration.PluginLocalLaunchingConcurrent), // by default, we allow 2 plugins launching at the same time
config: configuration,
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func (p *PluginManager) Launch(configuration *app.Config) {

// start local watcher
if configuration.Platform == app.PLATFORM_LOCAL {
p.startLocalWatcher()
p.startLocalWatcher(configuration)
}

// launch serverless connector
Expand Down
50 changes: 32 additions & 18 deletions internal/core/plugin_manager/watcher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plugin_manager

import (
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/debugging_runtime"
Expand All @@ -11,12 +12,12 @@ import (
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
)

func (p *PluginManager) startLocalWatcher() {
func (p *PluginManager) startLocalWatcher(config *app.Config) {
go func() {
log.Info("start to handle new plugins in path: %s", p.config.PluginInstalledPath)
p.handleNewLocalPlugins()
p.handleNewLocalPlugins(config)
for range time.NewTicker(time.Second * 30).C {
p.handleNewLocalPlugins()
p.handleNewLocalPlugins(config)
p.removeUninstalledLocalPlugins()
}
}()
Expand Down Expand Up @@ -66,34 +67,47 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
}
}

func (p *PluginManager) handleNewLocalPlugins() {
func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
// walk through all plugins
plugins, err := p.installedBucket.List()
if err != nil {
log.Error("list installed plugins failed: %s", err.Error())
return
}

var wg sync.WaitGroup
maxConcurrency := config.PluginLocalLaunchingConcurrent
sem := make(chan struct{}, maxConcurrency)

log.Info("Launching %d plugins with max concurrency: %d", len(plugins), maxConcurrency)

for _, plugin := range plugins {
_, launchedChan, errChan, err := p.launchLocal(plugin)
if err != nil {
log.Error("launch local plugin failed: %s", err.Error())
}
wg.Add(1)
go func(plugin plugin_entities.PluginUniqueIdentifier) {
defer wg.Done()

// Acquire semaphore in child goroutine to avoid blocking main goroutine
sem <- struct{}{}
defer func() { <-sem }()

// avoid receiving nil channel
if errChan != nil {
// consume error, avoid deadlock
for err := range errChan {
log.Error("plugin launch error: %s", err.Error())
_, launchedChan, errChan, err := p.launchLocal(plugin)
if err != nil {
log.Error("launch local plugin failed: %s", err.Error())
return
}
}

// avoid receiving nil channel
if launchedChan != nil {
// wait for plugin launched
// Consume errors asynchronously to avoid blocking
go func() {
for err := range errChan {
log.Error("plugin launch error: %s", err.Error())
}
}()

// Wait for plugin to complete startup
<-launchedChan
}
}(plugin)
}
wg.Wait()
}

// an async function to remove uninstalled local plugins
Expand Down