Skip to content
45 changes: 28 additions & 17 deletions channelz/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package service
import (
"context"
"net"
"time"

"github.com/golang/protobuf/ptypes"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
Expand Down Expand Up @@ -62,8 +63,11 @@ type serverImpl struct {
channelzgrpc.UnimplementedChannelzServer
}

func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
switch s {
func connectivityStateToProto(s *connectivity.State) *channelzpb.ChannelConnectivityState {
if s == nil {
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
}
switch *s {
case connectivity.Idle:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
case connectivity.Connecting:
Expand Down Expand Up @@ -108,18 +112,25 @@ func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
return pbt
}

func strFromPointer(s *string) string {
if s == nil {
return ""
}
return *s
}

func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
c := &channelzpb.Channel{}
c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}

c.Data = &channelzpb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
State: connectivityStateToProto(cm.ChannelData.State.Load()),
Target: strFromPointer(cm.ChannelData.Target.Load()),
CallsStarted: cm.ChannelData.CallsStarted.Load(),
CallsSucceeded: cm.ChannelData.CallsSucceeded.Load(),
CallsFailed: cm.ChannelData.CallsFailed.Load(),
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelData.LastCallStartedTimestamp.Load())); err == nil {
c.Data.LastCallStartedTimestamp = ts
}
nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
Expand All @@ -143,13 +154,13 @@ func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchann
sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}

sc.Data = &channelzpb.ChannelData{
State: connectivityStateToProto(cm.ChannelData.State),
Target: cm.ChannelData.Target,
CallsStarted: cm.ChannelData.CallsStarted,
CallsSucceeded: cm.ChannelData.CallsSucceeded,
CallsFailed: cm.ChannelData.CallsFailed,
State: connectivityStateToProto(cm.ChannelData.State.Load()),
Target: strFromPointer(cm.ChannelData.Target.Load()),
CallsStarted: cm.ChannelData.CallsStarted.Load(),
CallsSucceeded: cm.ChannelData.CallsSucceeded.Load(),
CallsFailed: cm.ChannelData.CallsFailed.Load(),
}
if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelData.LastCallStartedTimestamp.Load())); err == nil {
sc.Data.LastCallStartedTimestamp = ts
}

Expand Down Expand Up @@ -247,7 +258,7 @@ func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
}

func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), int(req.GetMaxResults()))
resp := &channelzpb.GetTopChannelsResponse{}
for _, m := range metrics {
resp.Channel = append(resp.Channel, channelMetricToProto(m))
Expand Down Expand Up @@ -278,7 +289,7 @@ func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
}

func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
metrics, end := channelz.GetServers(req.GetStartServerId(), int(req.GetMaxResults()))
resp := &channelzpb.GetServersResponse{}
for _, m := range metrics {
resp.Server = append(resp.Server, serverMetricToProto(m))
Expand All @@ -288,7 +299,7 @@ func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersR
}

func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), int(req.GetMaxResults()))
resp := &channelzpb.GetServerSocketsResponse{}
for _, m := range metrics {
resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
Expand Down
132 changes: 54 additions & 78 deletions channelz/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,6 @@ var protoToSocketOpt protoToSocketOptFunc

const defaultTestTimeout = 10 * time.Second

type dummyChannel struct {
state connectivity.State
target string
callsStarted int64
callsSucceeded int64
callsFailed int64
lastCallStartedTimestamp time.Time
}

func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{
State: d.state,
Target: d.target,
CallsStarted: d.callsStarted,
CallsSucceeded: d.callsSucceeded,
CallsFailed: d.callsFailed,
LastCallStartedTimestamp: d.lastCallStartedTimestamp,
}
}

type dummyServer struct {
callsStarted int64
callsSucceeded int64
Expand Down Expand Up @@ -138,32 +118,35 @@ func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
}
}

func channelProtoToStruct(c *channelzpb.Channel) (*dummyChannel, error) {
dc := &dummyChannel{}
func channelProtoToStruct(c *channelzpb.Channel) (*channelz.ChannelInternalMetric, error) {
cm := &channelz.ChannelInternalMetric{}
pdata := c.GetData()
var s connectivity.State
switch pdata.GetState().GetState() {
case channelzpb.ChannelConnectivityState_UNKNOWN:
// TODO: what should we set here?
case channelzpb.ChannelConnectivityState_IDLE:
dc.state = connectivity.Idle
s = connectivity.Idle
case channelzpb.ChannelConnectivityState_CONNECTING:
dc.state = connectivity.Connecting
s = connectivity.Connecting
case channelzpb.ChannelConnectivityState_READY:
dc.state = connectivity.Ready
s = connectivity.Ready
case channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE:
dc.state = connectivity.TransientFailure
s = connectivity.TransientFailure
case channelzpb.ChannelConnectivityState_SHUTDOWN:
dc.state = connectivity.Shutdown
}
dc.target = pdata.GetTarget()
dc.callsStarted = pdata.CallsStarted
dc.callsSucceeded = pdata.CallsSucceeded
dc.callsFailed = pdata.CallsFailed
s = connectivity.Shutdown
}
cm.State.Store(&s)
tgt := pdata.GetTarget()
cm.Target.Store(&tgt)
cm.CallsStarted.Store(pdata.CallsStarted)
cm.CallsSucceeded.Store(pdata.CallsSucceeded)
cm.CallsFailed.Store(pdata.CallsFailed)
if err := pdata.GetLastCallStartedTimestamp().CheckValid(); err != nil {
return nil, err
}
dc.lastCallStartedTimestamp = pdata.GetLastCallStartedTimestamp().AsTime()
return dc, nil
cm.LastCallStartedTimestamp.Store(int64(pdata.GetLastCallStartedTimestamp().AsTime().UnixNano()))
return cm, nil
}

func serverProtoToStruct(s *channelzpb.Server) (*dummyServer, error) {
Expand Down Expand Up @@ -279,36 +262,37 @@ func init() {
}

func (s) TestGetTopChannels(t *testing.T) {
tcs := []*dummyChannel{
{
state: connectivity.Connecting,
target: "test.channelz:1234",
callsStarted: 6,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
state: connectivity.Connecting,
target: "test.channelz:1234",
callsStarted: 1,
callsSucceeded: 2,
callsFailed: 3,
lastCallStartedTimestamp: time.Now().UTC(),
},
{
state: connectivity.Shutdown,
target: "test.channelz:8888",
callsStarted: 0,
callsSucceeded: 0,
callsFailed: 0,
},
{},
tcs := []*channelz.ChannelInternalMetric{
channelz.NewChannelInternalMetricForTesting(
connectivity.Connecting,
"test.channelz:1234",
6,
2,
3,
time.Now().UTC().UnixNano(),
),
channelz.NewChannelInternalMetricForTesting(
connectivity.Connecting,
"test.channelz:1234",
1,
2,
3,
time.Now().UTC().UnixNano(),
),
channelz.NewChannelInternalMetricForTesting(
connectivity.Shutdown,
"test.channelz:8888",
0,
0,
0,
0,
),
}

for _, c := range tcs {
id := channelz.RegisterChannel(c, nil, "")
defer channelz.RemoveEntry(id)
cz := channelz.RegisterChannel(nil, "")
cz.Metrics().CopyFrom(c)
defer channelz.RemoveEntry(cz.ID())
}
s := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
Expand All @@ -322,13 +306,13 @@ func (s) TestGetTopChannels(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tcs[i], channel, protocmp.Transform(), cmp.AllowUnexported(dummyChannel{})); diff != "" {
if diff := cmp.Diff(tcs[i], channel, protocmp.Transform()); diff != "" {
t.Fatalf("unexpected channel, diff (-want +got):\n%s", diff)
}
}
for i := 0; i < 50; i++ {
id := channelz.RegisterChannel(tcs[0], nil, "")
defer channelz.RemoveEntry(id)
cz := channelz.RegisterChannel(nil, "")
defer channelz.RemoveEntry(cz.ID())
}
resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
if resp.GetEnd() {
Expand Down Expand Up @@ -460,13 +444,13 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
func (s) TestGetChannel(t *testing.T) {
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
ids[0] = channelz.RegisterChannel(nil, refNames[0]).ID()
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})

ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
ids[1] = channelz.RegisterChannel(ids[0], refNames[1]).ID()
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Expand All @@ -476,11 +460,7 @@ func (s) TestGetChannel(t *testing.T) {
},
})

var err error
ids[2], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
if err != nil {
t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
}
ids[2] = channelz.RegisterSubChannel(ids[0], refNames[2]).ID()
channelz.AddTraceEvent(logger, ids[2], 0, &channelz.TraceEventDesc{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Expand All @@ -490,7 +470,7 @@ func (s) TestGetChannel(t *testing.T) {
},
})

ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
ids[3] = channelz.RegisterChannel(ids[1], refNames[3]).ID()
channelz.AddTraceEvent(logger, ids[3], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Expand Down Expand Up @@ -572,16 +552,12 @@ func (s) TestGetSubChannel(t *testing.T) {

refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
ids[0] = channelz.RegisterChannel(nil, refNames[0]).ID()
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
var err error
ids[1], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
if err != nil {
t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
}
ids[1] = channelz.RegisterSubChannel(ids[0], refNames[1]).ID()
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subchanCreated,
Severity: channelz.CtInfo,
Expand Down
Loading