Conversation
aaronbee
left a comment
There was a problem hiding this comment.
Looks like some good cleanups. I have suggested a few more in the comments below.
tunnel/tunnel.go
Outdated
| go func() { | ||
| if err := c.streamHandler(ctx, tag, Target{ID: tID, Type: tType}); err != nil { | ||
| select { | ||
| case errCh <- err: |
There was a problem hiding this comment.
If you write to errCh here then Start will return, but the goroutine started on line 1224 will keep running, ie. it will be leaked and any other streamHandler goroutines will be leaked. On the other hand if the goroutine started on line 1224 hits an error then Start will return, but any of these streamHandler goroutines will keep running and be leaked.
This can be refactored to be more simple by removing the outer goroutine. For example,
ctx, cancel := context.WithCancel(ctx)
defer cancel() // Tell the streamHandler goroutines to exit when this function exits
for {
reg, err := c.s.Recv()
if err != nil {
return err
}
switch reg.Registration.(type) {
case *tpb.RegisterOp_Session:
...
go func() {
if err := c.streamHandler(ctx, ...); err != nil {
log.Printf("streamHandler error: %v", err)
}
}()
This simplifies things by removing one goroutine and the error channel. The code can now just return errors. If an error is encountered Start will return and cancel the context passed to any streamHandler goroutines, which should cause them to end. This means nothing is leaked. A streamHandler error doesn't cause this whole function to exit anymore. I'm not sure if that is desirable or not?
There was a problem hiding this comment.
Thanks for the comment. I removed the goroutine. It looks nicer.
For the streamHandler, think we probably still need it to propagate the error for downstream. For example, in conn.go Listener.Accept will check if an error or a connection is received in order to proceeds. So I kept the errCh here.
tunnel/tunnel.go
Outdated
| } | ||
| }() | ||
| } | ||
| return <-errCh |
There was a problem hiding this comment.
This code is unreachable because the for loop never breaks, it only returns.
A way you could make an error from a streamHandler cause everything to shut down is to store a context cancel function and error inside the Client struct. In register, before creating the stream, you create a context with a cancel func, you store this cancel func on the Client. The context gets passed c.tc.Register.
type Client struct {
cancelFunc func()
err error
}
func (c *Client) cancel(err error) {
c.mu.Lock()
defer c.mu.Unlock()
c.cancelFunc()
if c.err != nil {
c.err = err
}
}
func (c *Client) Register(ctx) {
...
ctx, c.cancelFunc = context.WithCancel()
stream, err := c.tc.Register(ctx, c.cc.Opts...)
...
}
func (c *Client) Start(ctx) error {
...
for {
reg, err := c.rs.Recv()
if err != nil {
c.cancel(err)
return c.err
}
...
go func () {
if err := c.streamHandler(ctx); err != nil {
c.cancel(err) // Cancels the stream's context, so the next call on the stream will return an error. This will cause the outer for loop to exit.
}
}()
By creating the cancel func at the very beginning you have the ability to cancel the stream wherever you want.
There was a problem hiding this comment.
Thanks for the suggestion. It looks great in this way. I have updated the code as sugguested.
aaronbee
left a comment
There was a problem hiding this comment.
Thanks for the changes. Just one more comment below.
| if err := client.Start(ctx); err != nil { | ||
| errCh <- err | ||
| } | ||
| if err := client.Error(); err != nil { |
There was a problem hiding this comment.
client.Start is always going to return an error before you get to this line. It is likely to be a context.Cancelled error if one of the streamHandler goroutines hits a different error first. This defeats the purpose of reading client.Error() here. What you could do instead is have client.Start return c.Error(). Any place that Start hits an error replace:
return err
with
c.cancel(err)
return c.Error()
That way client.Start always returns the first error it encountered and you will be able to remove the client.Error() check here.
There was a problem hiding this comment.
Actually, a smaller and perhaps better change you could make here is to leave client.Start how you currently have it and replace the code here with:
c.cancel(client.Start(ctx))
if err := c.Error(); err != nil {
errCh <- err
}
This ensures you always call cancel when Start returns, which you need to do to clean up any streamHandler goroutines and gets you the most recent (most interesting) error.
There was a problem hiding this comment.
Thanks for the comments. I was thinking of calling cancel as a defer within Start, but that will involves slightly more change (similar to your first proposal). I ended up following your most recent proposal, which looks good too.
There was a problem hiding this comment.
Sorry, to go back-and-forth here. I actually do like your idea better of having cancel called inside Start, and having Start return c.Error(). That way other users of Start don't have to reimplement what you do inside Run.
| if err := client.Start(ctx); err != nil { | ||
| errCh <- err | ||
| } | ||
| if err := client.Error(); err != nil { |
There was a problem hiding this comment.
Sorry, to go back-and-forth here. I actually do like your idea better of having cancel called inside Start, and having Start return c.Error(). That way other users of Start don't have to reimplement what you do inside Run.
| // Cancel cancels goroutine from Start() and streamHandler(), and records error. | ||
| func (c *Client) Cancel(err error) { | ||
| // Avoid calling multiple times. | ||
| if c.cancelFunc == nil { |
There was a problem hiding this comment.
This should be done in the protected section, ie. after the Lock() call.
There was a problem hiding this comment.
Yes, defer is probably a better solution here.
conn.go wraps tunnel client and server, and returns net.Conn.