diff --git a/control-plane/config/agentfield.yaml b/control-plane/config/agentfield.yaml index 096b388d6..447113a96 100644 --- a/control-plane/config/agentfield.yaml +++ b/control-plane/config/agentfield.yaml @@ -128,6 +128,12 @@ features: value: 10000 action: "allow" priority: 90 + tracing: + enabled: false # Enable OpenTelemetry distributed tracing export + exporter: "otlp-http" # "otlp-http" or "otlp-grpc" + endpoint: "localhost:4318" + service_name: "agentfield" + insecure: false # WARNING: Set true only for local dev; production must use TLS connector: enabled: true token: "test-connector-token-123" diff --git a/control-plane/go.mod b/control-plane/go.mod index df8e52ac8..293c9b677 100644 --- a/control-plane/go.mod +++ b/control-plane/go.mod @@ -22,7 +22,11 @@ require ( github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.20.1 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 + go.opentelemetry.io/otel/sdk v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 golang.org/x/crypto v0.46.0 golang.org/x/term v0.38.0 google.golang.org/grpc v1.79.3 @@ -39,6 +43,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.13.2 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect github.com/charmbracelet/x/ansi v0.10.1 // indirect @@ -50,10 +55,13 @@ require ( github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.0.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -92,6 +100,10 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/arch v0.15.0 // indirect @@ -99,6 +111,7 @@ require ( golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/text v0.32.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect modernc.org/libc v1.41.0 // indirect diff --git a/control-plane/go.sum b/control-plane/go.sum index 1a304ff60..8ff859d6a 100644 --- a/control-plane/go.sum +++ b/control-plane/go.sum @@ -11,6 +11,8 @@ github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1 github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/charmbracelet/bubbletea v1.3.10 h1:otUDHWMMzQSB0Pkc87rm691KZ3SWa4KUlvF9nRvCICw= @@ -51,6 +53,7 @@ github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -81,6 +84,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -158,8 +163,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= @@ -190,8 +195,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= @@ -204,6 +210,10 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI= go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= @@ -212,8 +222,12 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw= @@ -242,6 +256,8 @@ golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= diff --git a/control-plane/internal/config/config.go b/control-plane/internal/config/config.go index 837f87b94..8e9f3e0a0 100644 --- a/control-plane/internal/config/config.go +++ b/control-plane/internal/config/config.go @@ -162,6 +162,16 @@ type LLMEndpoint struct { type FeatureConfig struct { DID DIDConfig `yaml:"did" mapstructure:"did"` Connector ConnectorConfig `yaml:"connector" mapstructure:"connector"` + Tracing TracingConfig `yaml:"tracing" mapstructure:"tracing"` +} + +// TracingConfig holds configuration for OpenTelemetry distributed tracing. +type TracingConfig struct { + Enabled bool `yaml:"enabled" mapstructure:"enabled"` // Enable OTel trace export (default: false) + Exporter string `yaml:"exporter" mapstructure:"exporter"` // "otlp-http" (default) or "otlp-grpc" + Endpoint string `yaml:"endpoint" mapstructure:"endpoint"` // OTLP endpoint (default: "localhost:4318") + ServiceName string `yaml:"service_name" mapstructure:"service_name"` // Service name for traces (default: "agentfield") + Insecure bool `yaml:"insecure" mapstructure:"insecure"` // Skip TLS verification } // ConnectorConfig holds configuration for the connector service integration. @@ -509,6 +519,21 @@ func ApplyEnvOverrides(cfg *Config) { } } + // OpenTelemetry tracing overrides (also supports standard OTEL_* env vars) + if val := os.Getenv("AGENTFIELD_TRACING_ENABLED"); val != "" { + cfg.Features.Tracing.Enabled = val == "true" || val == "1" + } + if val := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); val != "" { + cfg.Features.Tracing.Endpoint = val + cfg.Features.Tracing.Enabled = true + } + if val := os.Getenv("OTEL_SERVICE_NAME"); val != "" { + cfg.Features.Tracing.ServiceName = val + } + if val := os.Getenv("AGENTFIELD_TRACING_INSECURE"); val != "" { + cfg.Features.Tracing.Insecure = val == "true" || val == "1" + } + // Connector overrides if val := os.Getenv("AGENTFIELD_CONNECTOR_ENABLED"); val != "" { cfg.Features.Connector.Enabled = val == "true" || val == "1" diff --git a/control-plane/internal/observability/execution_tracer.go b/control-plane/internal/observability/execution_tracer.go new file mode 100644 index 000000000..b8a436e9e --- /dev/null +++ b/control-plane/internal/observability/execution_tracer.go @@ -0,0 +1,210 @@ +package observability + +import ( + "context" + "fmt" + "sync" + + "github.com/Agent-Field/agentfield/control-plane/internal/events" + "github.com/Agent-Field/agentfield/control-plane/internal/logger" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// ExecutionTracer subscribes to the execution and reasoner event buses +// and translates events into OpenTelemetry spans. +type ExecutionTracer struct { + tracer *Tracer + + mu sync.Mutex + spans map[string]trace.Span // keyed by execution_id + ctxs map[string]context.Context + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewExecutionTracer creates a new execution tracer that bridges events to OTel spans. +func NewExecutionTracer(tracer *Tracer) *ExecutionTracer { + return &ExecutionTracer{ + tracer: tracer, + spans: make(map[string]trace.Span), + ctxs: make(map[string]context.Context), + } +} + +// Start subscribes to event buses and begins translating events to spans. +func (et *ExecutionTracer) Start(ctx context.Context) { + et.ctx, et.cancel = context.WithCancel(ctx) + + et.wg.Add(2) + go et.subscribeExecutionEvents() + go et.subscribeReasonerEvents() + + logger.Logger.Info().Msg("execution tracer started") +} + +// Stop unsubscribes from event buses and ends any open spans. +func (et *ExecutionTracer) Stop() { + if et.cancel != nil { + et.cancel() + } + et.wg.Wait() + + // End any remaining open spans + et.mu.Lock() + for id, span := range et.spans { + span.SetAttributes(attribute.String("agentfield.execution.end_reason", "tracer_shutdown")) + span.End() + delete(et.spans, id) + delete(et.ctxs, id) + } + et.mu.Unlock() + + logger.Logger.Info().Msg("execution tracer stopped") +} + +func (et *ExecutionTracer) subscribeExecutionEvents() { + defer et.wg.Done() + + subscriberID := "otel-execution-tracer" + ch := events.GlobalExecutionEventBus.Subscribe(subscriberID) + defer events.GlobalExecutionEventBus.Unsubscribe(subscriberID) + + for { + select { + case <-et.ctx.Done(): + return + case event, ok := <-ch: + if !ok { + return + } + et.handleExecutionEvent(event) + } + } +} + +func (et *ExecutionTracer) subscribeReasonerEvents() { + defer et.wg.Done() + + subscriberID := "otel-reasoner-tracer" + ch := events.GlobalReasonerEventBus.Subscribe(subscriberID) + defer events.GlobalReasonerEventBus.Unsubscribe(subscriberID) + + for { + select { + case <-et.ctx.Done(): + return + case event, ok := <-ch: + if !ok { + return + } + et.handleReasonerEvent(event) + } + } +} + +func (et *ExecutionTracer) handleExecutionEvent(event events.ExecutionEvent) { + switch event.Type { + case events.ExecutionCreated, events.ExecutionStarted: + et.startExecution(event) + case events.ExecutionCompleted: + et.endExecution(event, false) + case events.ExecutionFailed, events.ExecutionCancelledEvent: + et.endExecution(event, true) + case events.ExecutionUpdated, events.ExecutionWaiting, events.ExecutionPaused, events.ExecutionResumed: + et.addExecutionEvent(event) + } +} + +func (et *ExecutionTracer) handleReasonerEvent(event events.ReasonerEvent) { + if event.Type == events.Heartbeat { + return + } + + et.mu.Lock() + parentCtx, exists := et.ctxs[event.NodeID] + et.mu.Unlock() + + if !exists { + // No parent execution span; create a standalone reasoner span + parentCtx = et.ctx + } + + _, span := et.tracer.StartStepSpan(parentCtx, "reasoner", event.ReasonerID, event.NodeID) + span.SetAttributes( + attribute.String("agentfield.reasoner.status", event.Status), + attribute.String("agentfield.event.type", string(event.Type)), + ) + + // Reasoner events are point-in-time; end the span immediately. + span.End() +} + +func (et *ExecutionTracer) startExecution(event events.ExecutionEvent) { + et.mu.Lock() + defer et.mu.Unlock() + + if _, exists := et.spans[event.ExecutionID]; exists { + return // already tracking + } + + ctx, span := et.tracer.StartExecutionSpan(et.ctx, event.ExecutionID, event.WorkflowID, event.AgentNodeID) + span.SetAttributes( + attribute.String("agentfield.execution.status", event.Status), + ) + + et.spans[event.ExecutionID] = span + et.ctxs[event.ExecutionID] = ctx + + logger.Logger.Debug(). + Str("execution_id", event.ExecutionID). + Msg("started OTel execution span") +} + +func (et *ExecutionTracer) endExecution(event events.ExecutionEvent, isError bool) { + et.mu.Lock() + span, exists := et.spans[event.ExecutionID] + if exists { + delete(et.spans, event.ExecutionID) + delete(et.ctxs, event.ExecutionID) + } + et.mu.Unlock() + + if !exists { + return + } + + span.SetAttributes( + attribute.String("agentfield.execution.final_status", event.Status), + ) + + if isError { + span.SetStatus(2, fmt.Sprintf("execution %s: %s", event.Type, event.Status)) + } + + span.End() + + logger.Logger.Debug(). + Str("execution_id", event.ExecutionID). + Bool("error", isError). + Msg("ended OTel execution span") +} + +func (et *ExecutionTracer) addExecutionEvent(event events.ExecutionEvent) { + et.mu.Lock() + span, exists := et.spans[event.ExecutionID] + et.mu.Unlock() + + if !exists { + return + } + + span.AddEvent(string(event.Type), + trace.WithAttributes( + attribute.String("agentfield.execution.status", event.Status), + ), + ) +} diff --git a/control-plane/internal/observability/execution_tracer_test.go b/control-plane/internal/observability/execution_tracer_test.go new file mode 100644 index 000000000..1a27c2e19 --- /dev/null +++ b/control-plane/internal/observability/execution_tracer_test.go @@ -0,0 +1,166 @@ +package observability + +import ( + "context" + "testing" + "time" + + "github.com/Agent-Field/agentfield/control-plane/internal/events" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func newTestTracer(t *testing.T) (*Tracer, func()) { + t.Helper() + cfg := TracerConfig{ + Enabled: true, + Endpoint: "localhost:4318", + ServiceName: "test-agentfield", + Insecure: true, + } + tracer, shutdown, err := InitTracer(context.Background(), cfg) + require.NoError(t, err) + require.NotNil(t, tracer) + return tracer, func() { shutdown(context.Background()) } //nolint:errcheck +} + +// waitForSubscription gives goroutines time to subscribe to event buses. +func waitForSubscription() { + time.Sleep(50 * time.Millisecond) +} + +// waitForEvent gives the subscriber goroutine time to process an event. +func waitForEvent() { + time.Sleep(50 * time.Millisecond) +} + +func TestExecutionTracer_StartStop(t *testing.T) { + tracer, cleanup := newTestTracer(t) + defer cleanup() + + et := NewExecutionTracer(tracer) + require.NotNil(t, et) + + et.Start(context.Background()) + waitForSubscription() + + // Verify it subscribed to event buses + assert.Greater(t, events.GlobalExecutionEventBus.GetSubscriberCount(), 0) + assert.Greater(t, events.GlobalReasonerEventBus.GetSubscriberCount(), 0) + + et.Stop() +} + +func TestExecutionTracer_HandlesExecutionLifecycle(t *testing.T) { + tracer, cleanup := newTestTracer(t) + defer cleanup() + + et := NewExecutionTracer(tracer) + et.Start(context.Background()) + waitForSubscription() + defer et.Stop() + + // Simulate execution lifecycle via event bus + events.PublishExecutionCreated("exec-1", "wf-1", "node-1", nil) + waitForEvent() + + // Verify span is tracked + et.mu.Lock() + _, exists := et.spans["exec-1"] + et.mu.Unlock() + assert.True(t, exists, "execution span should be tracked after creation") + + // Complete the execution + events.PublishExecutionCompleted("exec-1", "wf-1", "node-1", nil) + waitForEvent() + + // Verify span is ended and removed + et.mu.Lock() + _, exists = et.spans["exec-1"] + et.mu.Unlock() + assert.False(t, exists, "execution span should be removed after completion") +} + +func TestExecutionTracer_HandlesFailedExecution(t *testing.T) { + tracer, cleanup := newTestTracer(t) + defer cleanup() + + et := NewExecutionTracer(tracer) + et.Start(context.Background()) + waitForSubscription() + defer et.Stop() + + events.PublishExecutionStarted("exec-fail", "wf-1", "node-1", nil) + waitForEvent() + + et.mu.Lock() + _, exists := et.spans["exec-fail"] + et.mu.Unlock() + assert.True(t, exists) + + events.PublishExecutionFailed("exec-fail", "wf-1", "node-1", map[string]interface{}{"error": "timeout"}) + waitForEvent() + + et.mu.Lock() + _, exists = et.spans["exec-fail"] + et.mu.Unlock() + assert.False(t, exists, "failed execution span should be cleaned up") +} + +func TestExecutionTracer_DuplicateCreatedIgnored(t *testing.T) { + tracer, cleanup := newTestTracer(t) + defer cleanup() + + et := NewExecutionTracer(tracer) + et.Start(context.Background()) + waitForSubscription() + defer et.Stop() + + events.PublishExecutionCreated("exec-dup", "wf-1", "node-1", nil) + waitForEvent() + + et.mu.Lock() + span1 := et.spans["exec-dup"] + et.mu.Unlock() + require.NotNil(t, span1, "span should exist after first creation event") + + // Publish another created event for the same execution + events.PublishExecutionCreated("exec-dup", "wf-1", "node-1", nil) + waitForEvent() + + et.mu.Lock() + span2 := et.spans["exec-dup"] + et.mu.Unlock() + + assert.Equal(t, span1, span2, "duplicate creation should not replace existing span") + + // Cleanup + events.PublishExecutionCompleted("exec-dup", "wf-1", "node-1", nil) + waitForEvent() +} + +func TestExecutionTracer_StopEndsOpenSpans(t *testing.T) { + tracer, cleanup := newTestTracer(t) + defer cleanup() + + et := NewExecutionTracer(tracer) + et.Start(context.Background()) + waitForSubscription() + + events.PublishExecutionCreated("exec-open", "wf-1", "node-1", nil) + waitForEvent() + + et.mu.Lock() + _, exists := et.spans["exec-open"] + et.mu.Unlock() + assert.True(t, exists) + + // Stop should end all open spans + et.Stop() + + et.mu.Lock() + count := len(et.spans) + et.mu.Unlock() + assert.Equal(t, 0, count, "all spans should be cleaned up after Stop") +} diff --git a/control-plane/internal/observability/tracer.go b/control-plane/internal/observability/tracer.go new file mode 100644 index 000000000..2580abce4 --- /dev/null +++ b/control-plane/internal/observability/tracer.go @@ -0,0 +1,126 @@ +package observability + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" +) + +const ( + instrumentationName = "github.com/Agent-Field/agentfield/control-plane" +) + +// TracerConfig holds configuration for the OTel tracer. +type TracerConfig struct { + Enabled bool `yaml:"enabled" mapstructure:"enabled"` + Exporter string `yaml:"exporter" mapstructure:"exporter"` // "otlp-http" or "otlp-grpc" + Endpoint string `yaml:"endpoint" mapstructure:"endpoint"` // e.g. "localhost:4318" + ServiceName string `yaml:"service_name" mapstructure:"service_name"` // defaults to "agentfield" + Insecure bool `yaml:"insecure" mapstructure:"insecure"` // skip TLS verification +} + +// Tracer wraps the OTel tracer and provides AgentField-specific span helpers. +type Tracer struct { + tracer trace.Tracer + provider *sdktrace.TracerProvider +} + +// InitTracer creates and registers an OTel TracerProvider with an OTLP HTTP exporter. +// Returns a Tracer that can create spans, and a shutdown function to flush on exit. +func InitTracer(ctx context.Context, cfg TracerConfig) (*Tracer, func(context.Context) error, error) { + if !cfg.Enabled { + return nil, func(context.Context) error { return nil }, nil + } + + serviceName := cfg.ServiceName + if serviceName == "" { + serviceName = "agentfield" + } + + endpoint := cfg.Endpoint + if endpoint == "" { + endpoint = "localhost:4318" + } + + opts := []otlptracehttp.Option{ + otlptracehttp.WithEndpoint(endpoint), + } + if cfg.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } + + exporter, err := otlptracehttp.New(ctx, opts...) + if err != nil { + return nil, nil, fmt.Errorf("create OTLP trace exporter: %w", err) + } + + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceName(serviceName), + attribute.String("agentfield.component", "control-plane"), + ), + ) + if err != nil { + return nil, nil, fmt.Errorf("create OTel resource: %w", err) + } + + provider := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + ) + + otel.SetTracerProvider(provider) + + tracer := provider.Tracer(instrumentationName) + + t := &Tracer{ + tracer: tracer, + provider: provider, + } + + return t, provider.Shutdown, nil +} + +// StartExecutionSpan creates a root span for an execution workflow. +func (t *Tracer) StartExecutionSpan(ctx context.Context, executionID, workflowID, agentNodeID string) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, "execution", + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.String("agentfield.execution.id", executionID), + attribute.String("agentfield.workflow.id", workflowID), + attribute.String("agentfield.agent.node_id", agentNodeID), + ), + ) + return ctx, span +} + +// StartStepSpan creates a child span for an individual execution step (reasoner or skill invocation). +func (t *Tracer) StartStepSpan(ctx context.Context, stepType, stepID, agentNodeID string) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, fmt.Sprintf("step.%s", stepType), + trace.WithSpanKind(trace.SpanKindInternal), + trace.WithAttributes( + attribute.String("agentfield.step.type", stepType), + attribute.String("agentfield.step.id", stepID), + attribute.String("agentfield.agent.node_id", agentNodeID), + ), + ) + return ctx, span +} + +// RecordError adds an error event to the span and sets its status to error. +func (t *Tracer) RecordError(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(2, err.Error()) // codes.Error = 2 +} + +// Enabled returns true if tracing is initialized. +func (t *Tracer) Enabled() bool { + return t != nil && t.tracer != nil +} diff --git a/control-plane/internal/observability/tracer_test.go b/control-plane/internal/observability/tracer_test.go new file mode 100644 index 000000000..4ba6ba5be --- /dev/null +++ b/control-plane/internal/observability/tracer_test.go @@ -0,0 +1,103 @@ +package observability + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInitTracer_Disabled(t *testing.T) { + cfg := TracerConfig{Enabled: false} + tracer, shutdown, err := InitTracer(context.Background(), cfg) + + require.NoError(t, err) + assert.Nil(t, tracer) + assert.NotNil(t, shutdown) + + // Shutdown should be a no-op when disabled + err = shutdown(context.Background()) + assert.NoError(t, err) +} + +func TestTracer_Enabled(t *testing.T) { + var tracer *Tracer + assert.False(t, tracer.Enabled(), "nil tracer should not be enabled") + + tracer = &Tracer{} + assert.False(t, tracer.Enabled(), "tracer with nil inner tracer should not be enabled") +} + +func TestTracerConfig_Defaults(t *testing.T) { + cfg := TracerConfig{ + Enabled: true, + // All other fields left empty to test defaults + } + + // InitTracer will fail to connect since there's no OTLP endpoint, + // but it should not error on creation (connection is lazy). + tracer, shutdown, err := InitTracer(context.Background(), cfg) + require.NoError(t, err) + require.NotNil(t, tracer) + require.NotNil(t, shutdown) + + assert.True(t, tracer.Enabled()) + + // Clean up + err = shutdown(context.Background()) + assert.NoError(t, err) +} + +func TestStartExecutionSpan(t *testing.T) { + cfg := TracerConfig{ + Enabled: true, + Endpoint: "localhost:4318", + ServiceName: "test-agentfield", + Insecure: true, + } + + tracer, shutdown, err := InitTracer(context.Background(), cfg) + require.NoError(t, err) + require.NotNil(t, tracer) + defer shutdown(context.Background()) //nolint:errcheck + + ctx, span := tracer.StartExecutionSpan(context.Background(), "exec-123", "wf-456", "node-789") + assert.NotNil(t, ctx) + assert.NotNil(t, span) + assert.True(t, span.SpanContext().IsValid()) + assert.True(t, span.SpanContext().HasTraceID()) + assert.True(t, span.SpanContext().HasSpanID()) + span.End() +} + +func TestStartStepSpan(t *testing.T) { + cfg := TracerConfig{ + Enabled: true, + Endpoint: "localhost:4318", + ServiceName: "test-agentfield", + Insecure: true, + } + + tracer, shutdown, err := InitTracer(context.Background(), cfg) + require.NoError(t, err) + require.NotNil(t, tracer) + defer shutdown(context.Background()) //nolint:errcheck + + // Create a parent execution span first + ctx, parentSpan := tracer.StartExecutionSpan(context.Background(), "exec-123", "wf-456", "node-789") + defer parentSpan.End() + + // Create a child step span + _, childSpan := tracer.StartStepSpan(ctx, "reasoner", "reasoner-abc", "node-789") + assert.NotNil(t, childSpan) + assert.True(t, childSpan.SpanContext().IsValid()) + + // Verify parent-child relationship via trace ID + assert.Equal(t, parentSpan.SpanContext().TraceID(), childSpan.SpanContext().TraceID(), + "child span should share the same trace ID as parent") + assert.NotEqual(t, parentSpan.SpanContext().SpanID(), childSpan.SpanContext().SpanID(), + "child span should have a different span ID") + + childSpan.End() +} diff --git a/control-plane/internal/server/server.go b/control-plane/internal/server/server.go index 5d05d5ac4..6d9be9026 100644 --- a/control-plane/internal/server/server.go +++ b/control-plane/internal/server/server.go @@ -30,6 +30,7 @@ import ( "github.com/Agent-Field/agentfield/control-plane/internal/infrastructure/process" infrastorage "github.com/Agent-Field/agentfield/control-plane/internal/infrastructure/storage" "github.com/Agent-Field/agentfield/control-plane/internal/logger" + "github.com/Agent-Field/agentfield/control-plane/internal/observability" "github.com/Agent-Field/agentfield/control-plane/internal/server/apicatalog" // API catalog "github.com/Agent-Field/agentfield/control-plane/internal/server/knowledgebase" // Knowledge base "github.com/Agent-Field/agentfield/control-plane/internal/server/middleware" @@ -85,6 +86,8 @@ type AgentFieldServer struct { adminGRPCPort int webhookDispatcher services.WebhookDispatcher observabilityForwarder services.ObservabilityForwarder + executionTracer *observability.ExecutionTracer + tracerShutdown func(context.Context) error configMu sync.RWMutex // Agentic API apiCatalog *apicatalog.Catalog @@ -398,6 +401,29 @@ func NewAgentFieldServer(cfg *config.Config) (*AgentFieldServer, error) { logger.Logger.Warn().Err(err).Msg("failed to start observability forwarder") } + // Initialize OpenTelemetry distributed tracing + var executionTracer *observability.ExecutionTracer + var tracerShutdown func(context.Context) error + if cfg.Features.Tracing.Enabled { + tracer, shutdown, err := observability.InitTracer(context.Background(), observability.TracerConfig{ + Enabled: cfg.Features.Tracing.Enabled, + Exporter: cfg.Features.Tracing.Exporter, + Endpoint: cfg.Features.Tracing.Endpoint, + ServiceName: cfg.Features.Tracing.ServiceName, + Insecure: cfg.Features.Tracing.Insecure, + }) + if err != nil { + logger.Logger.Warn().Err(err).Msg("failed to initialize OTel tracer") + } else if tracer != nil { + executionTracer = observability.NewExecutionTracer(tracer) + tracerShutdown = shutdown + logger.Logger.Info(). + Str("endpoint", cfg.Features.Tracing.Endpoint). + Str("service_name", cfg.Features.Tracing.ServiceName). + Msg("OpenTelemetry tracing enabled") + } + } + // Initialize LLM health monitor var llmHealthMonitor *services.LLMHealthMonitor if cfg.AgentField.LLMHealth.Enabled && len(cfg.AgentField.LLMHealth.Endpoints) > 0 { @@ -449,6 +475,8 @@ func NewAgentFieldServer(cfg *config.Config) (*AgentFieldServer, error) { payloadStore: payloadStore, webhookDispatcher: webhookDispatcher, observabilityForwarder: observabilityForwarder, + executionTracer: executionTracer, + tracerShutdown: tracerShutdown, registryWatcherCancel: nil, adminGRPCPort: adminPort, apiCatalog: initAPICatalog(), @@ -527,6 +555,11 @@ func (s *AgentFieldServer) Start() error { // Don't fail server startup if cleanup service fails to start } + // Start OpenTelemetry execution tracer in background + if s.executionTracer != nil { + s.executionTracer.Start(ctx) + } + // Start reasoner event heartbeat (30 second intervals) events.StartHeartbeat(30 * time.Second) @@ -656,6 +689,18 @@ func (s *AgentFieldServer) Stop() error { } } + // Stop OpenTelemetry execution tracer and flush remaining spans + if s.executionTracer != nil { + s.executionTracer.Stop() + } + if s.tracerShutdown != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := s.tracerShutdown(ctx); err != nil { + logger.Logger.Error().Err(err).Msg("Failed to shutdown OTel tracer provider") + } + } + // TODO: Implement graceful shutdown for HTTP, WebSocket, gRPC return nil }