Skip to content

Commit 6de3dab

Browse files
authored
Resolve #72 - gRPC Interceptor (#621)
* Move interceptor to plugin * Add basic net.peer info * Ensure that grpc status match span status See: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-rpc.md#status * Set rpc.service attribute * Add StreamClientInterceptor and StreamServerInterceptor * Fix: golint errors * Apply automated go.mod changes from make * Implement suggestions to improve readability
1 parent 0bb12d9 commit 6de3dab

File tree

9 files changed

+918
-184
lines changed

9 files changed

+918
-184
lines changed

example/grpc/api/hello-service.pb.go

Lines changed: 241 additions & 91 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example/grpc/api/hello-service.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ package api;
1717

1818
service HelloService {
1919
rpc SayHello (HelloRequest) returns (HelloResponse);
20+
21+
rpc SayHelloServerStream (HelloRequest) returns (stream HelloResponse);
22+
23+
rpc SayHelloClientStream (stream HelloRequest) returns (HelloResponse);
24+
25+
rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloResponse);
2026
}
2127

2228
message HelloRequest {

example/grpc/client/main.go

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,167 @@ package main
1616

1717
import (
1818
"context"
19+
"io"
1920
"log"
2021
"time"
2122

23+
"go.opentelemetry.io/otel/api/global"
2224
"go.opentelemetry.io/otel/example/grpc/api"
2325
"go.opentelemetry.io/otel/example/grpc/config"
26+
"go.opentelemetry.io/otel/plugin/grpctrace"
2427

2528
"google.golang.org/grpc"
2629
"google.golang.org/grpc/metadata"
27-
28-
"go.opentelemetry.io/otel/example/grpc/middleware/tracing"
2930
)
3031

3132
func main() {
3233
config.Init()
3334

3435
var conn *grpc.ClientConn
35-
conn, err := grpc.Dial(":7777", grpc.WithInsecure(), grpc.WithUnaryInterceptor(tracing.UnaryClientInterceptor))
36+
conn, err := grpc.Dial(":7777", grpc.WithInsecure(),
37+
grpc.WithUnaryInterceptor(grpctrace.UnaryClientInterceptor(global.Tracer(""))),
38+
grpc.WithStreamInterceptor(grpctrace.StreamClientInterceptor(global.Tracer(""))),
39+
)
40+
3641
if err != nil {
3742
log.Fatalf("did not connect: %s", err)
3843
}
3944
defer func() { _ = conn.Close() }()
4045

4146
c := api.NewHelloServiceClient(conn)
4247

48+
callSayHello(c)
49+
callSayHelloClientStream(c)
50+
callSayHelloServerStream(c)
51+
callSayHelloBidiStream(c)
52+
53+
time.Sleep(10 * time.Millisecond)
54+
}
55+
56+
func callSayHello(c api.HelloServiceClient) {
4357
md := metadata.Pairs(
4458
"timestamp", time.Now().Format(time.StampNano),
4559
"client-id", "web-api-client-us-east-1",
4660
"user-id", "some-test-user-id",
4761
)
62+
4863
ctx := metadata.NewOutgoingContext(context.Background(), md)
4964
response, err := c.SayHello(ctx, &api.HelloRequest{Greeting: "World"})
5065
if err != nil {
5166
log.Fatalf("Error when calling SayHello: %s", err)
5267
}
5368
log.Printf("Response from server: %s", response.Reply)
5469
}
70+
71+
func callSayHelloClientStream(c api.HelloServiceClient) {
72+
md := metadata.Pairs(
73+
"timestamp", time.Now().Format(time.StampNano),
74+
"client-id", "web-api-client-us-east-1",
75+
"user-id", "some-test-user-id",
76+
)
77+
78+
ctx := metadata.NewOutgoingContext(context.Background(), md)
79+
stream, err := c.SayHelloClientStream(ctx)
80+
if err != nil {
81+
log.Fatalf("Error when opening SayHelloClientStream: %s", err)
82+
}
83+
84+
for i := 0; i < 5; i++ {
85+
err := stream.Send(&api.HelloRequest{Greeting: "World"})
86+
87+
time.Sleep(time.Duration(i*50) * time.Millisecond)
88+
89+
if err != nil {
90+
log.Fatalf("Error when sending to SayHelloClientStream: %s", err)
91+
}
92+
}
93+
94+
response, err := stream.CloseAndRecv()
95+
if err != nil {
96+
log.Fatalf("Error when closing SayHelloClientStream: %s", err)
97+
}
98+
99+
log.Printf("Response from server: %s", response.Reply)
100+
}
101+
102+
func callSayHelloServerStream(c api.HelloServiceClient) {
103+
md := metadata.Pairs(
104+
"timestamp", time.Now().Format(time.StampNano),
105+
"client-id", "web-api-client-us-east-1",
106+
"user-id", "some-test-user-id",
107+
)
108+
109+
ctx := metadata.NewOutgoingContext(context.Background(), md)
110+
stream, err := c.SayHelloServerStream(ctx, &api.HelloRequest{Greeting: "World"})
111+
if err != nil {
112+
log.Fatalf("Error when opening SayHelloServerStream: %s", err)
113+
}
114+
115+
for {
116+
response, err := stream.Recv()
117+
if err == io.EOF {
118+
break
119+
} else if err != nil {
120+
log.Fatalf("Error when receiving from SayHelloServerStream: %s", err)
121+
}
122+
123+
log.Printf("Response from server: %s", response.Reply)
124+
time.Sleep(50 * time.Millisecond)
125+
}
126+
}
127+
128+
func callSayHelloBidiStream(c api.HelloServiceClient) {
129+
md := metadata.Pairs(
130+
"timestamp", time.Now().Format(time.StampNano),
131+
"client-id", "web-api-client-us-east-1",
132+
"user-id", "some-test-user-id",
133+
)
134+
135+
ctx := metadata.NewOutgoingContext(context.Background(), md)
136+
stream, err := c.SayHelloBidiStream(ctx)
137+
if err != nil {
138+
log.Fatalf("Error when opening SayHelloBidiStream: %s", err)
139+
}
140+
141+
serverClosed := make(chan struct{})
142+
clientClosed := make(chan struct{})
143+
144+
go func() {
145+
for i := 0; i < 5; i++ {
146+
err := stream.Send(&api.HelloRequest{Greeting: "World"})
147+
148+
if err != nil {
149+
log.Fatalf("Error when sending to SayHelloBidiStream: %s", err)
150+
}
151+
152+
time.Sleep(50 * time.Millisecond)
153+
}
154+
155+
err := stream.CloseSend()
156+
if err != nil {
157+
log.Fatalf("Error when closing SayHelloBidiStream: %s", err)
158+
}
159+
160+
clientClosed <- struct{}{}
161+
}()
162+
163+
go func() {
164+
for {
165+
response, err := stream.Recv()
166+
if err == io.EOF {
167+
break
168+
} else if err != nil {
169+
log.Fatalf("Error when receiving from SayHelloBidiStream: %s", err)
170+
}
171+
172+
log.Printf("Response from server: %s", response.Reply)
173+
time.Sleep(50 * time.Millisecond)
174+
}
175+
176+
serverClosed <- struct{}{}
177+
}()
178+
179+
// Wait until client and server both closed the connection.
180+
<-clientClosed
181+
<-serverClosed
182+
}

example/grpc/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ replace go.opentelemetry.io/otel => ../..
77
require (
88
github.com/golang/protobuf v1.3.2
99
go.opentelemetry.io/otel v0.4.2
10+
golang.org/x/net v0.0.0-20190311183353-d8887717615a
1011
google.golang.org/grpc v1.27.1
1112
)

example/grpc/middleware/tracing/tracing.go

Lines changed: 0 additions & 85 deletions
This file was deleted.

example/grpc/server/main.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@ package main
1616

1717
import (
1818
"context"
19+
"fmt"
20+
"io"
1921
"log"
2022
"net"
23+
"time"
2124

25+
"go.opentelemetry.io/otel/api/global"
2226
"go.opentelemetry.io/otel/example/grpc/api"
2327
"go.opentelemetry.io/otel/example/grpc/config"
28+
"go.opentelemetry.io/otel/plugin/grpctrace"
2429

2530
"google.golang.org/grpc"
26-
27-
"go.opentelemetry.io/otel/example/grpc/middleware/tracing"
2831
)
2932

3033
const (
@@ -33,15 +36,78 @@ const (
3336

3437
// server is used to implement api.HelloServiceServer
3538
type server struct {
36-
api.UnimplementedHelloServiceServer
39+
api.HelloServiceServer
3740
}
3841

3942
// SayHello implements api.HelloServiceServer
4043
func (s *server) SayHello(ctx context.Context, in *api.HelloRequest) (*api.HelloResponse, error) {
41-
log.Printf("Received: %v", in.GetGreeting())
44+
log.Printf("Received: %v\n", in.GetGreeting())
45+
time.Sleep(50 * time.Millisecond)
46+
4247
return &api.HelloResponse{Reply: "Hello " + in.Greeting}, nil
4348
}
4449

50+
func (s *server) SayHelloServerStream(in *api.HelloRequest, out api.HelloService_SayHelloServerStreamServer) error {
51+
log.Printf("Received: %v\n", in.GetGreeting())
52+
53+
for i := 0; i < 5; i++ {
54+
err := out.Send(&api.HelloResponse{Reply: "Hello " + in.Greeting})
55+
if err != nil {
56+
return err
57+
}
58+
59+
time.Sleep(time.Duration(i*50) * time.Millisecond)
60+
}
61+
62+
return nil
63+
}
64+
65+
func (s *server) SayHelloClientStream(stream api.HelloService_SayHelloClientStreamServer) error {
66+
i := 0
67+
68+
for {
69+
in, err := stream.Recv()
70+
71+
if err == io.EOF {
72+
break
73+
} else if err != nil {
74+
log.Printf("Non EOF error: %v\n", err)
75+
return err
76+
}
77+
78+
log.Printf("Received: %v\n", in.GetGreeting())
79+
i++
80+
}
81+
82+
time.Sleep(50 * time.Millisecond)
83+
84+
return stream.SendAndClose(&api.HelloResponse{Reply: fmt.Sprintf("Hello (%v times)", i)})
85+
}
86+
87+
func (s *server) SayHelloBidiStream(stream api.HelloService_SayHelloBidiStreamServer) error {
88+
for {
89+
in, err := stream.Recv()
90+
91+
if err == io.EOF {
92+
break
93+
} else if err != nil {
94+
log.Printf("Non EOF error: %v\n", err)
95+
return err
96+
}
97+
98+
time.Sleep(50 * time.Millisecond)
99+
100+
log.Printf("Received: %v\n", in.GetGreeting())
101+
err = stream.Send(&api.HelloResponse{Reply: "Hello " + in.Greeting})
102+
103+
if err != nil {
104+
return err
105+
}
106+
}
107+
108+
return nil
109+
}
110+
45111
func main() {
46112
config.Init()
47113

@@ -50,7 +116,10 @@ func main() {
50116
log.Fatalf("failed to listen: %v", err)
51117
}
52118

53-
s := grpc.NewServer(grpc.UnaryInterceptor(tracing.UnaryServerInterceptor))
119+
s := grpc.NewServer(
120+
grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor(global.Tracer(""))),
121+
grpc.StreamInterceptor(grpctrace.StreamServerInterceptor(global.Tracer(""))),
122+
)
54123

55124
api.RegisterHelloServiceServer(s, &server{})
56125
if err := s.Serve(lis); err != nil {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7
77
github.com/benbjohnson/clock v1.0.0
88
github.com/davecgh/go-spew v1.1.1 // indirect
9+
github.com/golang/protobuf v1.3.2
910
github.com/google/go-cmp v0.4.0
1011
github.com/google/gofuzz v1.0.0 // indirect
1112
github.com/kr/pretty v0.1.0 // indirect

0 commit comments

Comments
 (0)