Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1225f0b
Another direction for package server
peterbourgon Mar 5, 2015
1a47496
Clarifications
peterbourgon Mar 5, 2015
b5e562a
Fix proto definition
peterbourgon Mar 5, 2015
d37eb84
Illustrate metrics
peterbourgon Mar 12, 2015
1c2aff6
Per-transport fielded metrics
peterbourgon Mar 12, 2015
7609672
addsvc: protobuf3 compile script
peterbourgon Mar 14, 2015
c937cf4
addsvc: add Thrift bindings
peterbourgon Mar 14, 2015
1a0510c
server: context checking to Endpoint
peterbourgon Mar 14, 2015
bc6b60e
addsvc: small rename
peterbourgon Mar 14, 2015
b8402c6
Many server and transport enhancements
peterbourgon Mar 17, 2015
8f82101
Consistently use alice-style chainable middlewares
peterbourgon Mar 18, 2015
7c5fc0c
addsvc: stub out Thrift until they fix codegen
peterbourgon Mar 18, 2015
2952440
Merge branch 'master' into package-server
peterbourgon Mar 18, 2015
b1de212
transport: http: tests for CORS
peterbourgon Mar 27, 2015
eb649c6
addsvc: use e.g. cors and gzip middlewares directly
peterbourgon Mar 27, 2015
f3eb132
addsvc: fix import order
peterbourgon Mar 27, 2015
97ac7c6
Merge branch 'master' into package-server
peterbourgon Apr 13, 2015
2ffa5ab
New protoc and gRPC
peterbourgon Apr 13, 2015
8b1f69c
Improvements surrounding HTTP and Zipkin
peterbourgon Apr 14, 2015
772919e
update_deps.bash helper script
peterbourgon Apr 14, 2015
19d29d3
Zipkin moves to new package tracing
peterbourgon Apr 14, 2015
a1ab6e5
addsvc: thrift compile.sh custom thrift_import
peterbourgon Apr 15, 2015
906bc35
tracing: changes to Zipkin
peterbourgon Apr 15, 2015
f4b7976
tracing: name symmetry: FromHTTP, ToContext
peterbourgon Apr 15, 2015
7f09517
addsvc: Thrift compile.sh: fix thrift_import
peterbourgon Apr 17, 2015
0483d05
zipkin: (Scribe) collector
peterbourgon Apr 17, 2015
4c83964
zipkin: fix Scribe collector
peterbourgon Apr 17, 2015
fb1a4e4
zipkin: thrift to _thrift
peterbourgon Apr 17, 2015
e72a454
addsvc: re-add Thrift binding
peterbourgon Apr 17, 2015
da28ae6
zipkin: improve Scribe collector and tests
peterbourgon Apr 17, 2015
079ad1f
Merge branch 'master' into package-server
peterbourgon Apr 27, 2015
85c9ec9
Merge branch 'master' into package-server
peterbourgon Apr 27, 2015
667b0bf
log: support writing to stdlib logger
peterbourgon Apr 27, 2015
ab1c14c
transport: comment formatting
peterbourgon Apr 27, 2015
5ceb3fd
Merge branch 'master' into package-server
peterbourgon Apr 27, 2015
4e4efb3
define and invoke log.DefaultLogger where appropriate
peterbourgon Apr 27, 2015
144bfe8
tracing: zipkin: log 'err' rather than 'error'
peterbourgon Apr 28, 2015
8c863f7
addsvc: new protoc defs
peterbourgon Apr 28, 2015
abb8001
addsvc: uniform IDL compilation scripts
peterbourgon Apr 28, 2015
4a1ed28
Tracing and logging upgrades to addsvc
peterbourgon Apr 29, 2015
9355934
Temporary cleanup for log.Caller
peterbourgon Apr 29, 2015
383fa68
addsvc: proper structured logging
peterbourgon Apr 29, 2015
bfcc5b9
log: fix comment
peterbourgon Apr 29, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions addsvc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addsvc
22 changes: 22 additions & 0 deletions addsvc/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

// Add is the abstract definition of what this service does.
// It could easily be an interface type with multiple methods.
type Add func(int64, int64) int64

func pureAdd(a, b int64) int64 { return a + b }

func addVia(r Resource) Add {
return func(a, b int64) int64 {
return r.Value(a) + r.Value(b)
}
}

// Resource represents some dependency, outside of our control.
type Resource interface {
Value(int64) int64
}

type mockResource struct{}

func (mockResource) Value(i int64) int64 { return i }
32 changes: 32 additions & 0 deletions addsvc/endpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package main

import (
"github.com/peterbourgon/gokit/server"
"golang.org/x/net/context"
)

// makeEndpoint returns a server.Endpoint wrapping the passed Add. If Add were
// an interface with multiple methods, we'd need individual endpoints for
// each.
//
// This function is just boiler-plate; in theory, it could be generated.
func makeEndpoint(a Add) server.Endpoint {
return func(ctx context.Context, req server.Request) (server.Response, error) {
select {
case <-ctx.Done():
return nil, server.ErrContextCanceled
default:
}

addReq, ok := req.(*request)
if !ok {
return nil, server.ErrBadCast
}

v := a(addReq.A, addReq.B)

return response{
V: v,
}, nil
}
}
22 changes: 22 additions & 0 deletions addsvc/enhancements.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package main

import (
"encoding/json"
"io"
"time"
)

func logging(w io.Writer, add Add) Add {
return func(a, b int64) (v int64) {
defer func(begin time.Time) {
json.NewEncoder(w).Encode(map[string]interface{}{
"a": a,
"b": b,
"result": v,
"took": time.Since(begin),
})
}(time.Now())
v = add(a, b)
return
}
}
50 changes: 50 additions & 0 deletions addsvc/grpc_binding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"time"

"github.com/peterbourgon/gokit/addsvc/pb"
"github.com/peterbourgon/gokit/metrics"
"github.com/peterbourgon/gokit/server"
"golang.org/x/net/context"
)

// A binding wraps an Endpoint so that it's usable by a transport. grpcBinding
// makes an Endpoint usable over gRPC.
type grpcBinding struct{ server.Endpoint }

// Add implements the proto3 AddServer by forwarding to the wrapped Endpoint.
func (b grpcBinding) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
addReq := request{req.A, req.B}
r, err := b.Endpoint(ctx, addReq)
if err != nil {
return nil, err
}

resp, ok := r.(*response)
if !ok {
return nil, server.ErrBadCast
}

return &pb.AddReply{
V: resp.V,
}, nil
}

func grpcInstrument(requests metrics.Counter, duration metrics.Histogram) func(pb.AddServer) pb.AddServer {
return func(next pb.AddServer) pb.AddServer {
return grpcInstrumented{requests, duration, next}
}
}

type grpcInstrumented struct {
requests metrics.Counter
duration metrics.Histogram
next pb.AddServer
}

func (i grpcInstrumented) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddReply, error) {
i.requests.Add(1)
defer func(begin time.Time) { i.duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
return i.next.Add(ctx, req)
}
80 changes: 80 additions & 0 deletions addsvc/httpjson_binding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"encoding/json"
"io"
"net/http"
"time"

"golang.org/x/net/context"

"github.com/peterbourgon/gokit/metrics"
"github.com/peterbourgon/gokit/server"
"github.com/peterbourgon/gokit/server/zipkin"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strong suggestion to make the tracing support pluggable... happy to elaborate/justify as desired!

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd agree w/ @bensigelman on this - for example right now this is http only - what if you have a queue that is blowing up and it talks tcp but not http? - you prob wouldn't want to support everything in the kitchen sink so it'd make sense to be pluggable

"github.com/peterbourgon/gokit/transport/codec"
)

// jsonCodec decodes and encodes requests and responses respectively as JSON.
// It requires that the (package main) request and response structs support
// JSON de/serialization.
//
// This type is mostly boiler-plate; in theory, it could be generated.
type jsonCodec struct{}

func (jsonCodec) Decode(ctx context.Context, r io.Reader) (server.Request, context.Context, error) {
var req request
err := json.NewDecoder(r).Decode(&req)
return &req, ctx, err
}

func (jsonCodec) Encode(w io.Writer, resp server.Response) error {
return json.NewEncoder(w).Encode(resp)
}

// A binding wraps an Endpoint so that it's usable by a transport. httpBinding
// makes an Endpoint usable over HTTP. It combines a parent context, a codec,
// and an endpoint to expose. It implements http.Handler by decoding a request
// from the HTTP request body, and encoding a response to the response writer.
type httpBinding struct {
context.Context // parent context
codec.Codec // how to decode requests and encode responses
contentType string // what we report as the response ContentType
server.Endpoint // the endpoint being bound
}

func (b httpBinding) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Perform HTTP-specific context amendments.
b.Context = zipkin.GetHeaders(b.Context, r.Header)

// Decode request.
req, ctx, err := b.Codec.Decode(b.Context, r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
b.Context = ctx

// Execute RPC.
resp, err := b.Endpoint(b.Context, req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Encode response.
w.Header().Set("Content-Type", b.contentType)
if err := b.Codec.Encode(w, resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func httpInstrument(requests metrics.Counter, duration metrics.Histogram) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requests.Add(1)
defer func(begin time.Time) { duration.Observe(time.Since(begin).Nanoseconds()) }(time.Now())
next.ServeHTTP(w, r)
})
}
}
115 changes: 115 additions & 0 deletions addsvc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/peterbourgon/gokit/addsvc/pb"
"github.com/peterbourgon/gokit/metrics"
"github.com/peterbourgon/gokit/metrics/expvar"
"github.com/peterbourgon/gokit/metrics/statsd"
"github.com/peterbourgon/gokit/server"
"github.com/peterbourgon/gokit/server/zipkin"
"github.com/peterbourgon/gokit/transport/http/cors"
)

func main() {
var (
httpJSONAddr = flag.String("http.json.addr", ":8001", "Address for HTTP/JSON server")
grpcTCPAddr = flag.String("grpc.tcp.addr", ":8002", "Address for gRPC (TCP) server")
)
flag.Parse()

// Our business and operational domain
var a Add
a = pureAdd
a = logging(logWriter{}, a)

// `package server` domain
var e server.Endpoint
e = makeEndpoint(a)
e = server.Gate(zipkin.RequireInContext)(e) // must have Zipkin headers
// e = server.ChainableEnhancement(arg1, arg2, e)

// `package metrics` domain
requests := metrics.NewMultiCounter(
expvar.NewCounter("requests"),
statsd.NewCounter(ioutil.Discard, "requests", time.Second),
)
duration := metrics.NewMultiHistogram(
expvar.NewHistogram("duration_ns", 0, 100000000, 3),
statsd.NewHistogram(ioutil.Discard, "duration_ns", time.Second),
)

// Mechanical stuff
root := context.Background()
errc := make(chan error)

go func() {
errc <- interrupt()
}()

// Transport: gRPC
go func() {
ln, err := net.Listen("tcp", *grpcTCPAddr)
if err != nil {
errc <- err
return
}
s := grpc.NewServer() // uses its own context?
field := metrics.Field{Key: "transport", Value: "grpc"}

var addServer pb.AddServer
addServer = grpcBinding{e}
addServer = grpcInstrument(requests.With(field), duration.With(field))(addServer)
// Note that this will always fail, because the Endpoint is gated on
// Zipkin headers, and we don't extract them from the gRPC request.

pb.RegisterAddServer(s, addServer)
log.Printf("gRPC server on TCP %s", *grpcTCPAddr)
errc <- s.Serve(ln)
}()

// Transport: HTTP/JSON
go func() {
ctx, cancel := context.WithCancel(root)
defer cancel()
mux := http.NewServeMux()
field := metrics.Field{Key: "transport", Value: "http"}

var handler http.Handler
handler = httpBinding{ctx, jsonCodec{}, "application/json", e}
handler = httpInstrument(requests.With(field), duration.With(field))(handler)
handler = cors.Middleware(cors.MaxAge(5 * time.Minute))(handler)

mux.Handle("/add", handler)
log.Printf("HTTP/JSON server on %s", *httpJSONAddr)
errc <- http.ListenAndServe(*httpJSONAddr, mux)
}()

log.Fatal(<-errc)
}

type logWriter struct{}

func (logWriter) Write(p []byte) (int, error) {
log.Printf("%s", p)
return len(p), nil
}

func interrupt() error {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return fmt.Errorf("%s", <-c)
}
Loading