Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/app/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/internal/automanagement"
"github.com/grafana/grafana-plugin-sdk-go/internal/buildinfo"
"github.com/grafana/grafana-plugin-sdk-go/internal/busytracking"
)

// ManageOpts can modify Manage behavior.
Expand Down Expand Up @@ -43,7 +44,9 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt
if err := backend.SetupTracer(pluginID, opts.TracingOpts); err != nil {
return fmt.Errorf("setup tracer: %w", err)
}
handler := automanagement.NewManager(NewInstanceManager(instanceFactory))

instanceManager := busytracking.NewManager(NewInstanceManager(instanceFactory))
handler := automanagement.NewManager(instanceManager)
return backend.Manage(pluginID, backend.ServeOpts{
CheckHealthHandler: handler,
CallResourceHandler: handler,
Expand Down
5 changes: 4 additions & 1 deletion backend/datasource/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/internal/automanagement"
"github.com/grafana/grafana-plugin-sdk-go/internal/buildinfo"
"github.com/grafana/grafana-plugin-sdk-go/internal/busytracking"
)

// ManageOpts can modify Manage behavior.
Expand Down Expand Up @@ -46,7 +47,9 @@ func Manage(pluginID string, instanceFactory InstanceFactoryFunc, opts ManageOpt
if err := backend.SetupTracer(pluginID, opts.TracingOpts); err != nil {
return fmt.Errorf("setup tracer: %w", err)
}
handler := automanagement.NewManager(NewInstanceManager(instanceFactory))

instanceManager := busytracking.NewManager(NewInstanceManager(instanceFactory))
handler := automanagement.NewManager(instanceManager)
return backend.Manage(pluginID, backend.ServeOpts{
CheckHealthHandler: handler,
CallResourceHandler: handler,
Expand Down
5 changes: 5 additions & 0 deletions backend/instancemgmt/context_aware_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,8 @@ func (c *instanceManagerWrapper) Do(ctx context.Context, pluginContext backend.P
manager := c.selectManager(ctx, pluginContext)
return manager.Do(ctx, pluginContext, fn)
}

// Provider returns the provider from the selected manager.
func (c *instanceManagerWrapper) Provider() InstanceProvider {
return c.provider
}
24 changes: 20 additions & 4 deletions backend/instancemgmt/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type InstanceDisposer interface {
Dispose()
}

// InstanceBusyChecker can check if an instance is currently busy.
type InstanceBusyChecker interface {
// Busy returns true if the instance is currently busy
Busy() bool
}

// InstanceCallbackFunc defines the callback function of the InstanceManager.Do method.
// The argument provided will of type Instance.
type InstanceCallbackFunc interface{}
Expand All @@ -52,6 +58,9 @@ type InstanceManager interface {
// If Instance is cached and not updated provides as argument to fn. If Instance is not cached or
// updated, a new Instance is created and cached before provided as argument to fn.
Do(ctx context.Context, pluginContext backend.PluginContext, fn InstanceCallbackFunc) error

// Provider returns the underlying instance provider.
Provider() InstanceProvider
}

// CachedInstance a cached Instance.
Expand All @@ -60,6 +69,11 @@ type CachedInstance struct {
instance Instance
}

// Instance returns the cached instance.
func (ci CachedInstance) Instance() Instance {
return ci.instance
}

// InstanceProvider defines an instance provider, providing instances.
type InstanceProvider interface {
// GetKey returns a cache key to be used for caching an Instance.
Expand Down Expand Up @@ -133,12 +147,10 @@ func (im *instanceManager) Get(ctx context.Context, pluginContext backend.Plugin

if disposer, valid := ci.instance.(InstanceDisposer); valid {
time.AfterFunc(im.disposeTTL, func() {
disposer.Dispose()
activeInstances.Dec()
safeDispose(ci.instance, disposer)
})
} else {
activeInstances.Dec()
}
activeInstances.Dec()
}

instance, err := im.provider.NewInstance(ctx, pluginContext)
Expand Down Expand Up @@ -168,6 +180,10 @@ func (im *instanceManager) Do(ctx context.Context, pluginContext backend.PluginC
return nil
}

func (im *instanceManager) Provider() InstanceProvider {
return im.provider
}

func callInstanceHandlerFunc(fn InstanceCallbackFunc, instance interface{}) {
var params = []reflect.Value{}
params = append(params, reflect.ValueOf(instance))
Expand Down
47 changes: 45 additions & 2 deletions backend/instancemgmt/instance_manager_ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func newTTLInstanceManager(provider InstanceProvider, instanceTTL, instanceClean

cache := gocache.New(instanceTTL, instanceCleanupInterval)

// Set up the OnEvicted callback to dispose instances
// Set up the OnEvicted callback to dispose instances safely
cache.OnEvicted(func(key string, value interface{}) {
ci := value.(CachedInstance)
if disposer, valid := ci.instance.(InstanceDisposer); valid {
disposer.Dispose()
safeDispose(ci.instance, disposer)
}
backend.Logger.Debug("Evicted instance", "key", key)
activeInstances.Dec()
Expand Down Expand Up @@ -116,8 +116,51 @@ func (im *instanceManagerWithTTL) Do(ctx context.Context, pluginContext backend.
return nil
}

func (im *instanceManagerWithTTL) Provider() InstanceProvider {
return im.provider
}

// refreshTTL updates the TTL of the cached instance by resetting its expiration time.
func (im *instanceManagerWithTTL) refreshTTL(cacheKey string, ci CachedInstance) {
// SetDefault() technically creates a new cache entry with fresh TTL, effectively extending the instance's lifetime.
im.cache.SetDefault(cacheKey, ci)
}

// safeDispose checks if an instance is busy before disposing it
// If busy, it will retry with a timeout
func safeDispose(instance Instance, disposer InstanceDisposer) {
if busyChecker, ok := instance.(InstanceBusyChecker); ok && busyChecker.Busy() {
backend.Logger.Debug("TTL instance is busy, delaying disposal")
retryDispose(instance, disposer, 30*time.Second)
return
}

backend.Logger.Debug("Disposing TTL instance")
disposer.Dispose()
}

// retryDispose waits for a TTL instance to become idle before disposing it
func retryDispose(instance Instance, disposer InstanceDisposer, timeout time.Duration) {
go func() {
deadline := time.Now().Add(timeout)
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

for range ticker.C {
// Check if instance is still busy
if busyChecker, ok := instance.(InstanceBusyChecker); ok && busyChecker.Busy() {
// Still busy, check if we've exceeded timeout
if time.Now().After(deadline) {
backend.Logger.Warn("Timeout waiting for TTL instance to become idle, disposing anyway")
disposer.Dispose()
return
}
continue
}

backend.Logger.Debug("TTL instance became idle, proceeding with disposal")
disposer.Dispose()
return
}
}()
}
121 changes: 121 additions & 0 deletions internal/busytracking/busy_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package busytracking

import (
"context"
"sync/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
)

// NewManager wraps an InstanceManager with automatic busy tracking
// for all instances. This prevents disposal of instances while they're actively
// processing requests.
func NewManager(manager instancemgmt.InstanceManager) instancemgmt.InstanceManager {
return &busyTrackingInstanceManager{manager}
}

// busyTrackingInstanceManager automatically wraps all instances with busy tracking
type busyTrackingInstanceManager struct {
instancemgmt.InstanceManager
}

// Get wraps the returned instance with automatic busy tracking
func (w *busyTrackingInstanceManager) Get(ctx context.Context, pluginContext backend.PluginContext) (instancemgmt.Instance, error) {
instance, err := w.InstanceManager.Get(ctx, pluginContext)
if err != nil {
return nil, err
}

return &busyTrackingWrapper{Instance: instance}, nil
}

// busyTrackingWrapper automatically wraps any instance with busy tracking
type busyTrackingWrapper struct {
instancemgmt.Instance
isBusy int32 // Atomic busy flag
}

// SetBusy marks this instance as busy or idle
func (w *busyTrackingWrapper) SetBusy(busy bool) {
if busy {
atomic.StoreInt32(&w.isBusy, 1)
} else {
atomic.StoreInt32(&w.isBusy, 0)
}
}

// Busy returns true if this instance is currently busy
func (w *busyTrackingWrapper) Busy() bool {
return atomic.LoadInt32(&w.isBusy) == 1
}

// Dispose delegates to the wrapped instance if it supports disposal
func (w *busyTrackingWrapper) Dispose() {
if disposer, ok := w.Instance.(instancemgmt.InstanceDisposer); ok {
disposer.Dispose()
}
}

// QueryData delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if handler, ok := w.Instance.(backend.QueryDataHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this potentially overwrite the busy status for concurrent requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - great catch. Will patch that

return handler.QueryData(ctx, req)
}
return nil, status.Error(codes.Unimplemented, "QueryData not implemented")
}

// CheckHealth delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
if handler, ok := w.Instance.(backend.CheckHealthHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
return handler.CheckHealth(ctx, req)
}
return &backend.CheckHealthResult{Status: backend.HealthStatusUnknown, Message: "CheckHealth not implemented"}, nil
}

// CallResource delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
if handler, ok := w.Instance.(backend.CallResourceHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
return handler.CallResource(ctx, req, sender)
}
return status.Error(codes.Unimplemented, "CallResource not implemented")
}

// SubscribeStream delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
if handler, ok := w.Instance.(backend.StreamHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
return handler.SubscribeStream(ctx, req)
}
return nil, status.Error(codes.Unimplemented, "SubscribeStream not implemented")
}

// PublishStream delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
if handler, ok := w.Instance.(backend.StreamHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
return handler.PublishStream(ctx, req)
}
return nil, status.Error(codes.Unimplemented, "PublishStream not implemented")
}

// RunStream delegates to the wrapped instance with busy tracking
func (w *busyTrackingWrapper) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
if handler, ok := w.Instance.(backend.StreamHandler); ok {
w.SetBusy(true)
defer w.SetBusy(false)
return handler.RunStream(ctx, req, sender)
}
return status.Error(codes.Unimplemented, "RunStream not implemented")
}
Loading