Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 1 addition & 293 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"

"github.com/apache/thrift/lib/go/thrift"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
Expand All @@ -41,9 +38,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/obsreport"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
"go.opentelemetry.io/collector/translator/trace/zipkin"
)

Expand Down Expand Up @@ -201,39 +196,7 @@ func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []c
return nil, err
}

// *commonpb.Node instances have unique addresses hence
// for grouping within a map, we'll use the .String() value
byNodeGrouping := make(map[string][]*tracepb.Span)
uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans))
// Now translate them into tracepb.Span
for _, zspan := range zipkinSpans {
if zspan == nil {
continue
}
span, node := zipkinSpanToTraceSpan(zspan)
key := node.String()
if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded {
uniqueNodes = append(uniqueNodes, node)
}
byNodeGrouping[key] = append(byNodeGrouping[key], span)
}

for _, node := range uniqueNodes {
key := node.String()
spans := byNodeGrouping[key]
if len(spans) == 0 {
// Should never happen but nonetheless be cautious
// not to send blank spans.
continue
}
reqs = append(reqs, consumerdata.TraceData{
Node: node,
Spans: spans,
})
delete(byNodeGrouping, key)
}

return reqs, nil
return zipkin.V2BatchToOCProto(zipkinSpans)
}

func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) {
Expand Down Expand Up @@ -369,261 +332,6 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}

func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node) {
traceID := tracetranslator.UInt64ToByteTraceID(zs.TraceID.High, zs.TraceID.Low)
var parentSpanID []byte
if zs.ParentID != nil {
parentSpanID = tracetranslator.UInt64ToByteSpanID(uint64(*zs.ParentID))
}

pbs := &tracepb.Span{
TraceId: traceID,
SpanId: tracetranslator.UInt64ToByteSpanID(uint64(zs.ID)),
ParentSpanId: parentSpanID,
Name: &tracepb.TruncatableString{Value: zs.Name},
StartTime: internal.TimeToTimestamp(zs.Timestamp),
EndTime: internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)),
Kind: zipkinSpanKindToProtoSpanKind(zs.Kind),
Status: extractProtoStatus(zs),
Attributes: zipkinTagsToTraceAttributes(zs.Tags, zs.Kind),
TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations),
}

node := nodeFromZipkinEndpoints(zs, pbs)

return pbs, node
}

func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel, pbs *tracepb.Span) *commonpb.Node {
if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil {
return nil
}

node := new(commonpb.Node)
var endpointMap map[string]string

// Retrieve and make use of the local endpoint
if lep := zs.LocalEndpoint; lep != nil {
node.ServiceInfo = &commonpb.ServiceInfo{
Name: lep.ServiceName,
}
endpointMap = zipkinEndpointIntoAttributes(lep, endpointMap, isLocalEndpoint)
}

// Retrieve and make use of the remote endpoint
if rep := zs.RemoteEndpoint; rep != nil {
endpointMap = zipkinEndpointIntoAttributes(rep, endpointMap, isRemoteEndpoint)
}

if endpointMap != nil {
if pbs.Attributes == nil {
pbs.Attributes = &tracepb.Span_Attributes{}
}
if pbs.Attributes.AttributeMap == nil {
pbs.Attributes.AttributeMap = make(
map[string]*tracepb.AttributeValue, len(endpointMap))
}

// Delete the redundant serviceName key since it is already on the node.
delete(endpointMap, zipkin.LocalEndpointServiceName)
attrbMap := pbs.Attributes.AttributeMap
for key, value := range endpointMap {
attrbMap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: value},
},
}
}
}

return node
}

type zipkinDirection bool

const (
isLocalEndpoint zipkinDirection = true
isRemoteEndpoint zipkinDirection = false
)

var blankIP net.IP

// zipkinEndpointIntoAttributes extracts information from s zipkin endpoint struct
// and puts it into a map with pre-defined keys.
func zipkinEndpointIntoAttributes(
ep *zipkinmodel.Endpoint,
into map[string]string,
endpointType zipkinDirection,
) map[string]string {

if into == nil {
into = make(map[string]string)
}

var ipv4Key, ipv6Key, portKey, serviceNameKey string
if endpointType == isLocalEndpoint {
ipv4Key, ipv6Key = zipkin.LocalEndpointIPv4, zipkin.LocalEndpointIPv6
portKey, serviceNameKey = zipkin.LocalEndpointPort, zipkin.LocalEndpointServiceName
} else {
ipv4Key, ipv6Key = zipkin.RemoteEndpointIPv4, zipkin.RemoteEndpointIPv6
portKey, serviceNameKey = zipkin.RemoteEndpointPort, zipkin.RemoteEndpointServiceName
}
if ep.IPv4 != nil && !ep.IPv4.Equal(blankIP) {
into[ipv4Key] = ep.IPv4.String()
}
if ep.IPv6 != nil && !ep.IPv6.Equal(blankIP) {
into[ipv6Key] = ep.IPv6.String()
}
if ep.Port > 0 {
into[portKey] = strconv.Itoa(int(ep.Port))
}
if serviceName := ep.ServiceName; serviceName != "" {
into[serviceNameKey] = serviceName
}
return into
}

const statusCodeUnknown = 2

func extractProtoStatus(zs *zipkinmodel.SpanModel) *tracepb.Status {
// The status is stored with the "error" key
// See https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L160-L165
if zs == nil || len(zs.Tags) == 0 {
return nil
}
canonicalCodeStr := zs.Tags["error"]
message := zs.Tags["opencensus.status_description"]
if message == "" && canonicalCodeStr == "" {
return nil
}
code, set := canonicalCodesMap[canonicalCodeStr]
if !set {
// If not status code was set, then we should use UNKNOWN
code = statusCodeUnknown
}
return &tracepb.Status{
Message: message,
Code: code,
}
}

var canonicalCodesMap = map[string]int32{
// https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186
"OK": 0,
"CANCELLED": 1,
"UNKNOWN": 2,
"INVALID_ARGUMENT": 3,
"DEADLINE_EXCEEDED": 4,
"NOT_FOUND": 5,
"ALREADY_EXISTS": 6,
"PERMISSION_DENIED": 7,
"RESOURCE_EXHAUSTED": 8,
"FAILED_PRECONDITION": 9,
"ABORTED": 10,
"OUT_OF_RANGE": 11,
"UNIMPLEMENTED": 12,
"INTERNAL": 13,
"UNAVAILABLE": 14,
"DATA_LOSS": 15,
"UNAUTHENTICATED": 16,
}

func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind {
switch strings.ToUpper(string(skind)) {
case "CLIENT":
return tracepb.Span_CLIENT
case "SERVER":
return tracepb.Span_SERVER
default:
return tracepb.Span_SPAN_KIND_UNSPECIFIED
}
}

func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents {
if len(zas) == 0 {
return nil
}
tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas))
for _, za := range zas {
if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil {
tevs = append(tevs, tev)
}
}
if len(tevs) == 0 {
return nil
}
return &tracepb.Span_TimeEvents{
TimeEvent: tevs,
}
}

var blankAnnotation zipkinmodel.Annotation

func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent {
if zas == blankAnnotation {
return nil
}
return &tracepb.Span_TimeEvent{
Time: internal.TimeToTimestamp(zas.Timestamp),
Value: &tracepb.Span_TimeEvent_Annotation_{
Annotation: &tracepb.Span_TimeEvent_Annotation{
Description: &tracepb.TruncatableString{Value: zas.Value},
},
},
}
}

func zipkinTagsToTraceAttributes(tags map[string]string, skind zipkinmodel.Kind) *tracepb.Span_Attributes {
// Produce and Consumer span kinds are not representable in OpenCensus format.
// We will represent them using TagSpanKind attribute, according to OpenTracing
// conventions. Check if it is one of those span kinds.
var spanKindTagVal tracetranslator.OpenTracingSpanKind
switch skind {
case zipkinmodel.Producer:
spanKindTagVal = tracetranslator.OpenTracingSpanKindProducer
case zipkinmodel.Consumer:
spanKindTagVal = tracetranslator.OpenTracingSpanKindConsumer
}

if len(tags) == 0 && spanKindTagVal == "" {
// No input tags and no need to add a span kind tag. Keep attributes map empty.
return nil
}

amap := make(map[string]*tracepb.AttributeValue, len(tags))
for key, value := range tags {
// We did a translation from "boolean" to "string"
// in OpenCensus-Go's Zipkin exporter as per
// https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L138-L155
switch value {
case "true", "false":
amap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_BoolValue{BoolValue: value == "true"},
}
default:
amap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: value},
},
}
}

}

if spanKindTagVal != "" {
// Set the previously translated span kind attribute (see top of this function).
// We do this after the "tags" map is translated so that we will overwrite
// the attribute if it exists.
amap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: string(spanKindTagVal)},
},
}
}

return &tracepb.Span_Attributes{AttributeMap: amap}
}

func transportType(r *http.Request) string {
v1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")
if v1 {
Expand Down
Loading