Skip to content
This repository was archived by the owner on Oct 3, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ require (
google.golang.org/api v0.5.0
google.golang.org/grpc v1.21.0
)

replace github.com/census-instrumentation/opencensus-proto => github.com/owais/opencensus-proto v0.3.0-beta-unary
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
Expand All @@ -23,10 +21,18 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/owais/opencensus-proto v0.3.0-beta-unary h1:uiOYCggvbdhUOTc3p5LYTDksz9LNsm/r2j1WcwKTHmM=
github.com/owais/opencensus-proto v0.3.0-beta-unary/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg=
go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down Expand Up @@ -55,6 +61,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd h1:r7DufRZuZbWB7j439YfAzP8RPDa9unLkpwQKUYbIMPI=
Expand All @@ -78,11 +86,17 @@ google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRn
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb h1:i1Ppqkc3WQXikh8bXiwHqAN5Rv3/qDCcRk0/Otx73BY=
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1 h1:Hz2g2wirWK7H0qIIhGIqRGTuMwTE8HEKFnDZZ7lm9NU=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0 h1:G+97AoqBnmZIT91cLG/EkCoK9NSelj64P8bOHHNmGn0=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
4 changes: 4 additions & 0 deletions load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (da *discardAgent) Export(tses agenttracepb.TraceService_ExportServer) erro
}
}

func (da *discardAgent) ExportOne(ctx context.Context, batch *agenttracepb.ExportTraceServiceRequest) (*agenttracepb.ExportTraceServiceResponse, error) {
return &agenttracepb.ExportTraceServiceResponse{}, nil
}

func parsePort(addr net.Addr) (uint16, error) {
addrStr := addr.String()
if i := strings.LastIndex(addrStr, ":"); i < 0 {
Expand Down
24 changes: 21 additions & 3 deletions mock_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package ocagent_test

import (
"context"
"fmt"
"net"
"sync"
Expand All @@ -35,9 +36,10 @@ func makeMockAgent(t *testing.T) *mockAgent {
type mockAgent struct {
t *testing.T

spans []*tracepb.Span
mu sync.Mutex
wg *sync.WaitGroup
spans []*tracepb.Span
unarySpans []*tracepb.Span
mu sync.Mutex
wg *sync.WaitGroup

traceNodes []*commonpb.Node
receivedConfigs []*agenttracepb.CurrentLibraryConfig
Expand Down Expand Up @@ -97,6 +99,14 @@ func (ma *mockAgent) Config(tscs agenttracepb.TraceService_ConfigServer) error {
}
}

func (ma *mockAgent) ExportOne(ctx context.Context, req *agenttracepb.ExportTraceServiceRequest) (*agenttracepb.ExportTraceServiceResponse, error) {
ma.mu.Lock()
ma.unarySpans = append(ma.spans, req.Spans...)
ma.traceNodes = append(ma.traceNodes, req.Node)
ma.mu.Unlock()
return &agenttracepb.ExportTraceServiceResponse{}, nil
}

func (ma *mockAgent) Export(tses agenttracepb.TraceService_ExportServer) error {
in, err := tses.Recv()
if err != nil {
Expand Down Expand Up @@ -191,6 +201,14 @@ func runMockAgentAtAddr(t *testing.T, addr string) *mockAgent {
return ma
}

func (ma *mockAgent) getUnarySpans() []*tracepb.Span {
ma.mu.Lock()
spans := append([]*tracepb.Span{}, ma.unarySpans...)
ma.mu.Unlock()

return spans
}

func (ma *mockAgent) getSpans() []*tracepb.Span {
ma.mu.Lock()
spans := append([]*tracepb.Span{}, ma.spans...)
Expand Down
99 changes: 73 additions & 26 deletions ocagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,25 @@ type Exporter struct {
// senderMu protects the concurrent unsafe send on traceExporter client
senderMu sync.Mutex
// recvMu protects the concurrent unsafe recv on traceExporter client
recvMu sync.Mutex
started bool
stopped bool
agentAddress string
serviceName string
canDialInsecure bool
traceExporter agenttracepb.TraceService_ExportClient
metricsExporter agentmetricspb.MetricsService_ExportClient
nodeInfo *commonpb.Node
grpcClientConn *grpc.ClientConn
reconnectionPeriod time.Duration
resourceDetector resource.Detector
resource *resourcepb.Resource
compressor string
headers map[string]string
lastConnectErrPtr unsafe.Pointer
recvMu sync.Mutex
started bool
stopped bool
agentAddress string
serviceName string
canDialInsecure bool
useUnaryBatchExporter bool
unaryExportTimeout time.Duration
traceSvcClient agenttracepb.TraceServiceClient
traceExporter agenttracepb.TraceService_ExportClient
metricsExporter agentmetricspb.MetricsService_ExportClient
nodeInfo *commonpb.Node
grpcClientConn *grpc.ClientConn
reconnectionPeriod time.Duration
resourceDetector resource.Detector
resource *resourcepb.Resource
compressor string
headers map[string]string
lastConnectErrPtr unsafe.Pointer

startOnce sync.Once
stopCh chan bool
Expand Down Expand Up @@ -211,17 +214,13 @@ func (ae *Exporter) enableConnectionStreams(cc *grpc.ClientConn) error {
if err := ae.createTraceServiceConnection(ae.grpcClientConn, nodeInfo); err != nil {
return err
}

return ae.createMetricsServiceConnection(ae.grpcClientConn, nodeInfo)
}

func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *commonpb.Node) error {
// Initiate the trace service by sending over node identifier info.
traceSvcClient := agenttracepb.NewTraceServiceClient(cc)
ctx := context.Background()
if len(ae.headers) > 0 {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
}
ctx := ae.newGRPCContext()
traceExporter, err := traceSvcClient.Export(ctx)
if err != nil {
return fmt.Errorf("Exporter.Start:: TraceServiceClient: %v", err)
Expand All @@ -236,6 +235,7 @@ func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *comm
}

ae.mu.Lock()
ae.traceSvcClient = traceSvcClient
ae.traceExporter = traceExporter
ae.mu.Unlock()

Expand All @@ -252,7 +252,6 @@ func (ae *Exporter) createTraceServiceConnection(cc *grpc.ClientConn, node *comm
// In the background, handle trace configurations that are beamed down
// by the agent, but also reply to it with the applied configuration.
go ae.handleConfigStreaming(configStream)

return nil
}

Expand Down Expand Up @@ -295,10 +294,7 @@ func (ae *Exporter) dialToAgent() (*grpc.ClientConn, error) {
dialOpts = append(dialOpts, ae.grpcDialOptions...)
}

ctx := context.Background()
if len(ae.headers) > 0 {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
}
ctx := ae.newGRPCContext()
return grpc.DialContext(ctx, addr, dialOpts...)
}

Expand Down Expand Up @@ -375,14 +371,57 @@ func (ae *Exporter) Stop() error {
return err
}

// ExportSpan exports a single span to the configured destination.
// This is usually used by the client libraries to export to the local
// OC agent
func (ae *Exporter) ExportSpan(sd *trace.SpanData) {
if sd == nil {
return
}
_ = ae.traceBundler.Add(sd, 1)
}

// ExportTraceServiceRequest exports a span batch using streaming or unary gRPC depending on
// whether `WithUnaryTraceExporter()` was used or not.
func (ae *Exporter) ExportTraceServiceRequest(batch *agenttracepb.ExportTraceServiceRequest) error {
if ae.useUnaryBatchExporter {
return ae.exportTraceServiceRequestUnary(batch)
}
return ae.exportTraceServiceRequestStream(batch)
}

func (ae *Exporter) exportTraceServiceRequestUnary(req *agenttracepb.ExportTraceServiceRequest) error {
if req == nil || len(req.Spans) == 0 {
return nil
}

select {
case <-ae.stopCh:
return errStopped

default:
if lastConnectErr := ae.lastConnectError(); lastConnectErr != nil {
return fmt.Errorf("ExportTraceServiceRequest: no active connection, last connection error: %v", lastConnectErr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're touching this code, could you please move the fmt.Errorf to where the last connection error is set? This way we avoid this formatting code in a possible hot path.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Will do if/when we move forward with up-streaming unary.

}
if req.Node == nil {
req.Node = ae.nodeInfo
}
ctx := ae.newGRPCContext()
if ae.unaryExportTimeout > 0 {
var cancel func()
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(ae.unaryExportTimeout))
defer cancel()
}
_, err := ae.traceSvcClient.ExportOne(ctx, req)

if err != nil {
ae.setStateDisconnected(err)
}
return err
}
}

func (ae *Exporter) exportTraceServiceRequestStream(batch *agenttracepb.ExportTraceServiceRequest) error {
if batch == nil || len(batch.Spans) == 0 {
return nil
}
Expand Down Expand Up @@ -444,6 +483,14 @@ func ocSpanDataToPbSpans(sdl []*trace.SpanData) []*tracepb.Span {
return protoSpans
}

func (ae *Exporter) newGRPCContext() context.Context {
ctx := context.Background()
if len(ae.headers) > 0 {
ctx = metadata.NewOutgoingContext(ctx, metadata.New(ae.headers))
}
return ctx
}

func (ae *Exporter) uploadTraces(sdl []*trace.SpanData) {
select {
case <-ae.stopCh:
Expand Down
Loading