Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,31 @@ receivers:
cert_file: /cert.pem # path to certificate
endpoint: "localhost:9876"
```

### Remote Sampling
The Jaeger receiver also supports fetching sampling configuration from a remote collector.
It works proxying client requests for remote sampling configuration to the configured collector.

+---------------+ +--------------+ +-----------------+
| | get | | proxy | |
| client +--- sampling ---->+ agent +------------->+ collector |
| | strategy | | | |
+---------------+ +--------------+ +-----------------+

Remote sampling can be enabled by specifying the following lines in the jaeger receiver config:

```yaml
receivers:
jaeger:
protocols:
grpc:
.
.
remotesampling:
fetch_endpoint: "jaeger-collector:1234"
```


## <a name="prometheus"></a>Prometheus Receiver
**Only metrics are supported.**

Expand Down
12 changes: 9 additions & 3 deletions receiver/jaegerreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ import (
"github.com/open-telemetry/opentelemetry-collector/receiver"
)

// RemoteSamplingConfig defines config key for remote sampling fetch endpoint
type RemoteSamplingConfig struct {
FetchEndpoint string `mapstructure:"fetch_endpoint"`
}

// Config defines configuration for Jaeger receiver.
type Config struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Protocols map[string]*receiver.SecureReceiverSettings `mapstructure:"protocols"`
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Protocols map[string]*receiver.SecureReceiverSettings `mapstructure:"protocols"`
RemoteSampling *RemoteSamplingConfig `mapstructure:"remotesampling"`
}

// Name gets the receiver name.
Expand Down
3 changes: 3 additions & 0 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func TestLoadConfig(t *testing.T) {
},
},
},
RemoteSampling: &RemoteSamplingConfig{
FetchEndpoint: "jaeger-collector:1234",
},
})

tlsConfig := cfg.Receivers["jaeger/tls"].(*Config)
Expand Down
8 changes: 8 additions & 0 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
defaultGRPCBindEndpoint = "localhost:14250"
defaultHTTPBindEndpoint = "localhost:14268"
defaultTChannelBindEndpoint = "localhost:14267"

// Endpoint to fetch remote sampling config
fetchEndpoint = "fetch_endpoint"
)

// Factory is the factory for Jaeger receiver.
Expand Down Expand Up @@ -107,6 +110,7 @@ func (f *Factory) CreateTraceReceiver(
protoTChannel := rCfg.Protocols[protoThriftTChannel]
protoThriftCompact := rCfg.Protocols[protoThriftCompact]
protoThriftBinary := rCfg.Protocols[protoThriftBinary]
remoteSamplingConfig := rCfg.RemoteSampling

config := Configuration{}
var grpcServerOptions []grpc.ServerOption
Expand Down Expand Up @@ -161,6 +165,10 @@ func (f *Factory) CreateTraceReceiver(
}
}

if remoteSamplingConfig != nil {
config.RemoteSamplingEndpoint = remoteSamplingConfig.FetchEndpoint
}

if (protoGRPC == nil && protoHTTP == nil && protoTChannel == nil && protoThriftBinary == nil && protoThriftCompact == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) {
err := fmt.Errorf("either %v, %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
Expand Down
30 changes: 28 additions & 2 deletions receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/google/go-cmp/cmp"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/stretchr/testify/assert"
"go.opencensus.io/trace"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
Expand Down Expand Up @@ -110,10 +112,35 @@ func TestJaegerAgentUDP_ThriftBinary_InvalidPort(t *testing.T) {
jr.StopTraceReception()
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(server *grpc.Server)) (*grpc.Server, net.Addr) {
server := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
assert.NoError(t, err)
beforeServe(server)
go func() {
err := server.Serve(lis)
assert.NoError(t, err)
}()
return server, lis.Addr()
}

type mockSamplingHandler struct {
}

func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil
}

func TestJaegerHTTP(t *testing.T) {
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
defer s.GracefulStop()

port := testutils.GetAvailablePort(t)
config := &Configuration{
AgentHTTPPort: int(port),
AgentHTTPPort: int(port),
RemoteSamplingEndpoint: addr.String(),
}
jr, err := New(context.Background(), config, nil, zap.NewNop())
assert.NoError(t, err, "Failed to create new Jaeger Receiver")
Expand All @@ -127,7 +154,6 @@ func TestJaegerHTTP(t *testing.T) {
err = testutils.WaitForPort(t, port)
assert.NoError(t, err, "WaitForPort failed")

// this functionality is just stubbed out at the moment. just confirm they 200.
testURL := fmt.Sprintf("http://localhost:%d/sampling?service=test", port)
resp, err := http.Get(testURL)
assert.NoError(t, err, "should not have failed to make request")
Expand Down
2 changes: 2 additions & 0 deletions receiver/jaegerreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ receivers:
endpoint: "0.0.0.0:456"
thrift-binary:
endpoint: "0.0.0.0:789"
remotesampling:
fetch_endpoint: "jaeger-collector:1234"

# The following demonstrates disabling the receiver.
# All of the protocols need to be disabled for the receiver to be disabled.
Expand Down
35 changes: 29 additions & 6 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
apacheThrift "github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
Expand Down Expand Up @@ -62,6 +63,7 @@ type Configuration struct {
AgentCompactThriftPort int
AgentBinaryThriftPort int
AgentHTTPPort int
RemoteSamplingEndpoint string
}

// Receiver type is used to receive spans that were originally intended to be sent to Jaeger.
Expand All @@ -81,8 +83,9 @@ type jReceiver struct {
tchanServer *jTchannelReceiver
collectorServer *http.Server

agentProcessors []processors.Processor
agentServer *http.Server
agentSamplingManager *jSamplingConfig.SamplingManager
agentProcessors []processors.Processor
agentServer *http.Server

defaultAgentCtx context.Context
logger *zap.Logger
Expand Down Expand Up @@ -112,15 +115,28 @@ const (

// New creates a TraceReceiver that receives traffic as a collector with both Thrift and HTTP transports.
func New(ctx context.Context, config *Configuration, nextConsumer consumer.TraceConsumer, logger *zap.Logger) (receiver.TraceReceiver, error) {
return &jReceiver{
jR := &jReceiver{
config: config,
defaultAgentCtx: observability.ContextWithReceiverName(context.Background(), "jaeger-agent"),
nextConsumer: nextConsumer,
tchanServer: &jTchannelReceiver{
nextConsumer: nextConsumer,
},
logger: logger,
}, nil
}

if config.RemoteSamplingEndpoint != "" {
// Create upstream grpc client
conn, err := grpc.Dial(config.RemoteSamplingEndpoint, grpc.WithInsecure())
if err != nil {
logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", config.RemoteSamplingEndpoint))
return nil, err
}

jR.agentSamplingManager = jSamplingConfig.NewConfigManager(conn)
}

return jR, nil
}

var _ receiver.TraceReceiver = (*jReceiver)(nil)
Expand Down Expand Up @@ -339,11 +355,18 @@ func (jr *jReceiver) EmitBatch(batch *jaeger.Batch) error {
}

func (jr *jReceiver) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return &sampling.SamplingStrategyResponse{}, nil
return jr.agentSamplingManager.GetSamplingStrategy(serviceName)
}

func (jr *jReceiver) GetBaggageRestrictions(serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, nil
br, err := jr.agentSamplingManager.GetBaggageRestrictions(serviceName)
if err != nil {
// Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373
// As of today, GetBaggageRestrictions() always returns an error.
// However, we `return nil, nil` here in order to serve a valid `200 OK` response.
return nil, nil
}
return br, nil
}

func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
Expand Down