Skip to content

Commit ce52476

Browse files
committed
Add support for deadlines
Adds a generic `Metadata` field where we store the deadline for the request, which is taken from the passed in context. This deadline is then added to the context passed into the server handler function. This is pretty simnilar to how GRPC implements this, except GRPC doesn't support deadlines, instead a timeout is specified where the timeouty is the duration left until the deadline just before the client sends the request... I chose to use a deadline since it seems like this is a more precise implementation of a caller's intention. Signed-off-by: Brian Goff <[email protected]>
1 parent f51df44 commit ce52476

4 files changed

Lines changed: 116 additions & 8 deletions

File tree

client.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io"
2222
"net"
2323
"os"
24+
"strconv"
2425
"strings"
2526
"sync"
2627
"syscall"
@@ -78,14 +79,19 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
7879

7980
var (
8081
creq = &Request{
81-
Service: service,
82-
Method: method,
83-
Payload: payload,
82+
Service: service,
83+
Method: method,
84+
Payload: payload,
85+
Metadata: make(map[string]*Values),
8486
}
8587

8688
cresp = &Response{}
8789
)
8890

91+
if dl, ok := ctx.Deadline(); ok {
92+
creq.Metadata[metadataDeadlineKey] = &Values{Values: []string{strconv.FormatInt(dl.UnixNano(), 10)}}
93+
}
94+
8995
if err := c.dispatch(ctx, creq, cresp); err != nil {
9096
return err
9197
}
@@ -104,6 +110,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
104110
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
105111
errs := make(chan error, 1)
106112
call := &callRequest{
113+
ctx: context.TODO(),
107114
req: req,
108115
resp: resp,
109116
errs: errs,

server.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,20 @@ import (
2121
"io"
2222
"math/rand"
2323
"net"
24+
"strconv"
2425
"sync"
2526
"sync/atomic"
2627
"time"
2728

2829
"github.com/pkg/errors"
2930
"github.com/sirupsen/logrus"
31+
spb "google.golang.org/genproto/googleapis/rpc/status"
3032
"google.golang.org/grpc/codes"
3133
"google.golang.org/grpc/status"
3234
)
3335

36+
const metadataDeadlineKey = "ttrpc-deadline"
37+
3438
var (
3539
ErrServerClosed = errors.New("ttrpc: server closed")
3640
)
@@ -414,6 +418,26 @@ func (c *serverConn) run(sctx context.Context) {
414418
case request := <-requests:
415419
active++
416420
go func(id uint32) {
421+
ctx, cancel, err := getRequestContext(ctx, request.req)
422+
if err != nil {
423+
resp := response{
424+
id: id,
425+
resp: &Response{
426+
Status: &spb.Status{
427+
Code: int32(codes.InvalidArgument),
428+
Message: err.Error(),
429+
},
430+
},
431+
}
432+
select {
433+
case responses <- resp:
434+
case <-done:
435+
}
436+
return
437+
}
438+
439+
defer cancel()
440+
417441
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
418442
resp := &Response{
419443
Status: status.Proto(),
@@ -454,3 +478,27 @@ func (c *serverConn) run(sctx context.Context) {
454478
}
455479
}
456480
}
481+
482+
var noopFunc = func() {}
483+
484+
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func(), err error) {
485+
cancel = noopFunc
486+
md := req.GetMetadata()
487+
488+
deadlineValues, ok := md[metadataDeadlineKey]
489+
if !ok {
490+
return ctx, cancel, nil
491+
}
492+
493+
if len(deadlineValues) == 0 {
494+
return ctx, cancel, nil
495+
}
496+
497+
nano, err := strconv.ParseInt(deadlineValues[0], 10, 64)
498+
if err != nil {
499+
return ctx, cancel, errors.New("invalid value for deadline")
500+
}
501+
502+
ctx, cancel = context.WithDeadline(ctx, time.Unix(0, nano))
503+
return ctx, cancel, nil
504+
}

server_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"strings"
2525
"sync"
2626
"testing"
27+
"time"
2728

2829
"github.com/gogo/protobuf/proto"
2930
"github.com/pkg/errors"
@@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
5758
}
5859

5960
type testPayload struct {
60-
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
61+
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
62+
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
6163
}
6264

6365
func (r *testPayload) Reset() { *r = testPayload{} }
@@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
6870
type testingServer struct{}
6971

7072
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
71-
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil
73+
tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
74+
if dl, ok := ctx.Deadline(); ok {
75+
tp.Deadline = dl.UnixNano()
76+
}
77+
return tp, nil
7278
}
7379

7480
// registerTestingService mocks more of what is generated code. Unlike grpc, we
@@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
376382
}
377383
}
378384

385+
func TestServerRequestTimeout(t *testing.T) {
386+
var (
387+
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
388+
server = mustServer(t)(NewServer())
389+
addr, listener = newTestListener(t)
390+
testImpl = &testingServer{}
391+
client, cleanup = newTestClient(t, addr)
392+
result testPayload
393+
)
394+
defer cancel()
395+
defer cleanup()
396+
defer listener.Close()
397+
398+
registerTestingService(server, testImpl)
399+
400+
go server.Serve(ctx, listener)
401+
defer server.Shutdown(ctx)
402+
403+
if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
404+
t.Fatalf("unexpected error making call: %v", err)
405+
}
406+
407+
dl, _ := ctx.Deadline()
408+
if result.Deadline != dl.UnixNano() {
409+
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
410+
}
411+
}
412+
379413
func BenchmarkRoundTrip(b *testing.B) {
380414
var (
381415
ctx = context.Background()

types.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,34 @@ import (
2323
)
2424

2525
type Request struct {
26-
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27-
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28-
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
26+
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
27+
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
28+
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
29+
Metadata map[string]*Values `protobuf:"bytes,4,rep,name=metadata,proto3" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
2930
}
3031

3132
func (r *Request) Reset() { *r = Request{} }
3233
func (r *Request) String() string { return fmt.Sprintf("%+#v", r) }
3334
func (r *Request) ProtoMessage() {}
3435

36+
type Metadata map[string][]string
37+
38+
func (r *Request) GetMetadata() Metadata {
39+
md := make(Metadata)
40+
for k, v := range r.Metadata {
41+
md[k] = v.Values
42+
}
43+
return md
44+
}
45+
46+
type Values struct {
47+
Values []string `protobuf:"bytes,1,rep,name=values,proto3"`
48+
}
49+
50+
func (v *Values) Reset() { *v = Values{} }
51+
func (v *Values) String() string { return fmt.Sprintf("%+#v", v) }
52+
func (v *Values) ProtoMessage() {}
53+
3554
type Response struct {
3655
Status *spb.Status `protobuf:"bytes,1,opt,name=status,proto3"`
3756
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"`

0 commit comments

Comments
 (0)