Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions control-plane/config/agentfield.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion control-plane/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ require (
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.20.1
github.com/stretchr/testify v1.10.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/trace v1.39.0
golang.org/x/crypto v0.46.0
golang.org/x/term v0.38.0
google.golang.org/grpc v1.79.3
Expand All @@ -39,6 +43,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.13.2 // indirect
github.com/bytedance/sonic/loader v0.2.4 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect
github.com/charmbracelet/x/ansi v0.10.1 // indirect
Expand All @@ -50,10 +55,13 @@ require (
github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gin-contrib/sse v1.0.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down Expand Up @@ -92,13 +100,18 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/arch v0.15.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
modernc.org/libc v1.41.0 // indirect
Expand Down
22 changes: 19 additions & 3 deletions control-plane/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY=
github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charmbracelet/bubbletea v1.3.10 h1:otUDHWMMzQSB0Pkc87rm691KZ3SWa4KUlvF9nRvCICw=
Expand Down Expand Up @@ -51,6 +53,7 @@ github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E
github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ=
github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
Expand Down Expand Up @@ -81,6 +84,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0 h1:ad0vkEBuk23VJzZR9nkLVG0YAoN9coASF1GusYX6AlU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.23.0/go.mod h1:igFoXX2ELCW06bol23DWPB5BEWfZISOzSP5K2sbLea0=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -158,8 +163,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
Expand Down Expand Up @@ -190,8 +195,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand All @@ -204,6 +210,10 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0 h1:IJFEoHiytixx8cMiVAO+GmHR6Frwu+u5Ur8njpFO6Ac=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.32.0/go.mod h1:3rHrKNtLIoS0oZwkY2vxi+oJcwFRWdtUyRII+so45p8=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0 h1:cMyu9O88joYEaI47CnQkxO1XZdpoTF9fEnW2duIddhw=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.32.0/go.mod h1:6Am3rn7P9TVVeXYG+wtcGE7IE1tsQ+bP3AuWcKt/gOI=
go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0=
go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs=
go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18=
Expand All @@ -212,8 +222,12 @@ go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2W
go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew=
go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw=
Expand Down Expand Up @@ -242,6 +256,8 @@ golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls=
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
Expand Down
25 changes: 25 additions & 0 deletions control-plane/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,16 @@ type LLMEndpoint struct {
type FeatureConfig struct {
DID DIDConfig `yaml:"did" mapstructure:"did"`
Connector ConnectorConfig `yaml:"connector" mapstructure:"connector"`
Tracing TracingConfig `yaml:"tracing" mapstructure:"tracing"`
}

// TracingConfig holds configuration for OpenTelemetry distributed tracing.
type TracingConfig struct {
Enabled bool `yaml:"enabled" mapstructure:"enabled"` // Enable OTel trace export (default: false)
Exporter string `yaml:"exporter" mapstructure:"exporter"` // "otlp-http" (default) or "otlp-grpc"
Endpoint string `yaml:"endpoint" mapstructure:"endpoint"` // OTLP endpoint (default: "localhost:4318")
ServiceName string `yaml:"service_name" mapstructure:"service_name"` // Service name for traces (default: "agentfield")
Insecure bool `yaml:"insecure" mapstructure:"insecure"` // Skip TLS verification
}

// ConnectorConfig holds configuration for the connector service integration.
Expand Down Expand Up @@ -509,6 +519,21 @@ func ApplyEnvOverrides(cfg *Config) {
}
}

// OpenTelemetry tracing overrides (also supports standard OTEL_* env vars)
if val := os.Getenv("AGENTFIELD_TRACING_ENABLED"); val != "" {
cfg.Features.Tracing.Enabled = val == "true" || val == "1"
}
if val := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); val != "" {
cfg.Features.Tracing.Endpoint = val
cfg.Features.Tracing.Enabled = true
}
if val := os.Getenv("OTEL_SERVICE_NAME"); val != "" {
cfg.Features.Tracing.ServiceName = val
}
if val := os.Getenv("AGENTFIELD_TRACING_INSECURE"); val != "" {
cfg.Features.Tracing.Insecure = val == "true" || val == "1"
}

// Connector overrides
if val := os.Getenv("AGENTFIELD_CONNECTOR_ENABLED"); val != "" {
cfg.Features.Connector.Enabled = val == "true" || val == "1"
Expand Down
210 changes: 210 additions & 0 deletions control-plane/internal/observability/execution_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package observability

import (
"context"
"fmt"
"sync"

"github.com/Agent-Field/agentfield/control-plane/internal/events"
"github.com/Agent-Field/agentfield/control-plane/internal/logger"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

// ExecutionTracer subscribes to the execution and reasoner event buses
// and translates events into OpenTelemetry spans.
type ExecutionTracer struct {
tracer *Tracer

mu sync.Mutex
spans map[string]trace.Span // keyed by execution_id
ctxs map[string]context.Context

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

// NewExecutionTracer creates a new execution tracer that bridges events to OTel spans.
func NewExecutionTracer(tracer *Tracer) *ExecutionTracer {
return &ExecutionTracer{
tracer: tracer,
spans: make(map[string]trace.Span),
ctxs: make(map[string]context.Context),
}
}

// Start subscribes to event buses and begins translating events to spans.
func (et *ExecutionTracer) Start(ctx context.Context) {
et.ctx, et.cancel = context.WithCancel(ctx)

et.wg.Add(2)
go et.subscribeExecutionEvents()
go et.subscribeReasonerEvents()

logger.Logger.Info().Msg("execution tracer started")
}

// Stop unsubscribes from event buses and ends any open spans.
func (et *ExecutionTracer) Stop() {
if et.cancel != nil {
et.cancel()
}
et.wg.Wait()

// End any remaining open spans
et.mu.Lock()
for id, span := range et.spans {
span.SetAttributes(attribute.String("agentfield.execution.end_reason", "tracer_shutdown"))
span.End()
delete(et.spans, id)
delete(et.ctxs, id)
}
et.mu.Unlock()

logger.Logger.Info().Msg("execution tracer stopped")
}

func (et *ExecutionTracer) subscribeExecutionEvents() {
defer et.wg.Done()

subscriberID := "otel-execution-tracer"
ch := events.GlobalExecutionEventBus.Subscribe(subscriberID)
defer events.GlobalExecutionEventBus.Unsubscribe(subscriberID)

for {
select {
case <-et.ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
et.handleExecutionEvent(event)
}
}
}

func (et *ExecutionTracer) subscribeReasonerEvents() {
defer et.wg.Done()

subscriberID := "otel-reasoner-tracer"
ch := events.GlobalReasonerEventBus.Subscribe(subscriberID)
defer events.GlobalReasonerEventBus.Unsubscribe(subscriberID)

for {
select {
case <-et.ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
et.handleReasonerEvent(event)
}
}
}

func (et *ExecutionTracer) handleExecutionEvent(event events.ExecutionEvent) {
switch event.Type {
case events.ExecutionCreated, events.ExecutionStarted:
et.startExecution(event)
case events.ExecutionCompleted:
et.endExecution(event, false)
case events.ExecutionFailed, events.ExecutionCancelledEvent:
et.endExecution(event, true)
case events.ExecutionUpdated, events.ExecutionWaiting, events.ExecutionPaused, events.ExecutionResumed:
et.addExecutionEvent(event)
}
}

func (et *ExecutionTracer) handleReasonerEvent(event events.ReasonerEvent) {
if event.Type == events.Heartbeat {
return
}

et.mu.Lock()
parentCtx, exists := et.ctxs[event.NodeID]
et.mu.Unlock()

if !exists {
// No parent execution span; create a standalone reasoner span
parentCtx = et.ctx
}

_, span := et.tracer.StartStepSpan(parentCtx, "reasoner", event.ReasonerID, event.NodeID)
span.SetAttributes(
attribute.String("agentfield.reasoner.status", event.Status),
attribute.String("agentfield.event.type", string(event.Type)),
)

// Reasoner events are point-in-time; end the span immediately.
span.End()
}

func (et *ExecutionTracer) startExecution(event events.ExecutionEvent) {
et.mu.Lock()
defer et.mu.Unlock()

if _, exists := et.spans[event.ExecutionID]; exists {
return // already tracking
}

ctx, span := et.tracer.StartExecutionSpan(et.ctx, event.ExecutionID, event.WorkflowID, event.AgentNodeID)
span.SetAttributes(
attribute.String("agentfield.execution.status", event.Status),
)

et.spans[event.ExecutionID] = span
et.ctxs[event.ExecutionID] = ctx

logger.Logger.Debug().
Str("execution_id", event.ExecutionID).
Msg("started OTel execution span")
}

func (et *ExecutionTracer) endExecution(event events.ExecutionEvent, isError bool) {
et.mu.Lock()
span, exists := et.spans[event.ExecutionID]
if exists {
delete(et.spans, event.ExecutionID)
delete(et.ctxs, event.ExecutionID)
}
et.mu.Unlock()

if !exists {
return
}

span.SetAttributes(
attribute.String("agentfield.execution.final_status", event.Status),
)

if isError {
span.SetStatus(2, fmt.Sprintf("execution %s: %s", event.Type, event.Status))
}

span.End()

logger.Logger.Debug().
Str("execution_id", event.ExecutionID).
Bool("error", isError).
Msg("ended OTel execution span")
}

func (et *ExecutionTracer) addExecutionEvent(event events.ExecutionEvent) {
et.mu.Lock()
span, exists := et.spans[event.ExecutionID]
et.mu.Unlock()

if !exists {
return
}

span.AddEvent(string(event.Type),
trace.WithAttributes(
attribute.String("agentfield.execution.status", event.Status),
),
)
}
Loading
Loading