-
Notifications
You must be signed in to change notification settings - Fork 29
Implement simultaneous open extension #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
0a5c7a1
5da27f8
276e57e
ef959da
b087b1b
2c5076b
85f04aa
31c07c1
890f066
4c3567f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,12 @@ package multistream | |
|
|
||
| import ( | ||
| "bytes" | ||
| "crypto/rand" | ||
| "encoding/base64" | ||
| "errors" | ||
| "io" | ||
| "math/big" | ||
| "strings" | ||
| ) | ||
|
|
||
| // ErrNotSupported is the error returned when the muxer does not support | ||
|
|
@@ -74,6 +78,244 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { | |
| return "", ErrNotSupported | ||
| } | ||
|
|
||
| // Performs protocol negotiation with the simultaneous open extension; the returned boolean | ||
| // indicator will be true if we should act as a server. | ||
| func SelectWithSimopen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { | ||
| if len(protos) == 0 { | ||
| return "", false, ErrNoProtocols | ||
| } | ||
|
|
||
| werrCh := make(chan error, 1) | ||
| go func() { | ||
| var buf bytes.Buffer | ||
Stebalien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| delimWrite(&buf, []byte(ProtocolID)) | ||
| delimWrite(&buf, []byte("iamclient")) | ||
| delimWrite(&buf, []byte(protos[0])) | ||
| _, err := io.Copy(rwc, &buf) | ||
| werrCh <- err | ||
| }() | ||
|
|
||
| err := readMultistreamHeader(rwc) | ||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| tok, err := ReadNextToken(rwc) | ||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| if err = <-werrCh; err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| switch tok { | ||
| case "iamclient": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: define a string constant. |
||
| // simultaneous open | ||
| return simOpen(protos, rwc) | ||
|
|
||
| case "na": | ||
| // client open | ||
| proto, err := clientOpen(protos, rwc) | ||
Stebalien marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
Comment on lines
+120
to
+122
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we need this |
||
|
|
||
| return proto, false, nil | ||
|
|
||
| default: | ||
| return "", false, errors.New("unexpected response: " + tok) | ||
| } | ||
| } | ||
|
|
||
| func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { | ||
| // check to see if we selected the pipelined protocol | ||
| tok, err := ReadNextToken(rwc) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| switch tok { | ||
| case protos[0]: | ||
| return tok, nil | ||
| case "na": | ||
| // try the other protos | ||
| for _, p := range protos[1:] { | ||
| err = trySelect(p, rwc) | ||
| switch err { | ||
| case nil: | ||
| return p, nil | ||
| case ErrNotSupported: | ||
| default: | ||
| return "", err | ||
| } | ||
| } | ||
|
|
||
| return "", ErrNotSupported | ||
| default: | ||
| return "", errors.New("unexpected response: " + tok) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| } | ||
| } | ||
|
|
||
| func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { | ||
| retries := 3 | ||
|
||
|
|
||
| again: | ||
| mynonce := make([]byte, 32) | ||
| _, err := rand.Read(mynonce) | ||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| werrCh := make(chan error, 1) | ||
| go func() { | ||
| myselect := []byte("select:" + base64.StdEncoding.EncodeToString(mynonce)) | ||
| err := delimWriteBuffered(rwc, myselect) | ||
| werrCh <- err | ||
| }() | ||
|
|
||
| var peerselect string | ||
| for { | ||
| tok, err := ReadNextToken(rwc) | ||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| // this skips pipelined protocol negoatiation | ||
| // keep reading until the token starts with select: | ||
| if strings.HasPrefix(tok, "select:") { | ||
|
||
| peerselect = tok | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if err = <-werrCh; err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| peernonce, err := base64.StdEncoding.DecodeString(peerselect[7:]) | ||
| if err != nil { | ||
| return "", false, err | ||
| } | ||
|
|
||
| var mybig, peerbig big.Int | ||
| var iamserver bool | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: drop this line. |
||
| mybig.SetBytes(mynonce) | ||
| peerbig.SetBytes(peernonce) | ||
|
|
||
| switch mybig.Cmp(&peerbig) { | ||
| case -1: | ||
| // peer nonce bigger, he is client | ||
| iamserver = true | ||
|
|
||
| case 1: | ||
|
||
| // my nonce bigger, i am client | ||
| iamserver = false | ||
|
||
|
|
||
| case 0: | ||
|
||
| // wtf, the world is ending! try again. | ||
| if retries > 0 { | ||
| retries-- | ||
| goto again | ||
| } | ||
|
|
||
| return "", false, errors.New("failed client selection; identical nonces!") | ||
|
|
||
| default: | ||
|
||
| return "", false, errors.New("wut? bigint.Cmp returned unexpected value") | ||
| } | ||
|
|
||
| var proto string | ||
| if iamserver { | ||
| proto, err = simOpenSelectServer(protos, rwc) | ||
| } else { | ||
| proto, err = simOpenSelectClient(protos, rwc) | ||
| } | ||
|
|
||
| return proto, iamserver, err | ||
| } | ||
|
|
||
| func simOpenSelectServer(protos []string, rwc io.ReadWriteCloser) (string, error) { | ||
| werrCh := make(chan error, 1) | ||
| go func() { | ||
| err := delimWriteBuffered(rwc, []byte("responder")) | ||
| werrCh <- err | ||
| }() | ||
|
|
||
| tok, err := ReadNextToken(rwc) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| if tok != "initiator" { | ||
|
||
| return "", errors.New("unexpected response: " + tok) | ||
| } | ||
| if err = <-werrCh; err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| for { | ||
| tok, err = ReadNextToken(rwc) | ||
|
|
||
| if err == io.EOF { | ||
| return "", ErrNotSupported | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @vyzo When can this EOF happen ? I mean, why do we treat this error separately ?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. EOF will happen when the remote side gives up because we have no protocols in common. In general, it's best to avoid returning EOF for actual error cases anyways (usually, you want to either return ErrUnexpectedEOF, or a better error like we're doing here). |
||
| } | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| for _, p := range protos { | ||
| if tok == p { | ||
| err = delimWriteBuffered(rwc, []byte(p)) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| return p, nil | ||
| } | ||
| } | ||
|
|
||
| err = delimWriteBuffered(rwc, []byte("na")) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| func simOpenSelectClient(protos []string, rwc io.ReadWriteCloser) (string, error) { | ||
| werrCh := make(chan error, 1) | ||
| go func() { | ||
| err := delimWriteBuffered(rwc, []byte("initiator")) | ||
| werrCh <- err | ||
| }() | ||
|
|
||
| tok, err := ReadNextToken(rwc) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| if tok != "responder" { | ||
| return "", errors.New("unexpected response: " + tok) | ||
| } | ||
| if err = <-werrCh; err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| for _, p := range protos { | ||
| err = trySelect(p, rwc) | ||
| switch err { | ||
| case nil: | ||
| return p, nil | ||
|
|
||
| case ErrNotSupported: | ||
| default: | ||
| return "", err | ||
| } | ||
| } | ||
|
|
||
| return "", ErrNotSupported | ||
| } | ||
|
|
||
| func handshake(rw io.ReadWriter) error { | ||
| errCh := make(chan error, 1) | ||
| go func() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
SelectProtoOrFailSimultanious, or something like that.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.