-
Notifications
You must be signed in to change notification settings - Fork 29
Implement simultaneous open extension #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
0a5c7a1
5da27f8
276e57e
ef959da
b087b1b
2c5076b
85f04aa
31c07c1
890f066
4c3567f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,8 +2,11 @@ package multistream | |||||
|
|
||||||
| import ( | ||||||
| "bytes" | ||||||
| "crypto/rand" | ||||||
| "encoding/base64" | ||||||
| "errors" | ||||||
| "io" | ||||||
| "strings" | ||||||
| ) | ||||||
|
|
||||||
| // ErrNotSupported is the error returned when the muxer does not support | ||||||
|
|
@@ -14,6 +17,10 @@ var ErrNotSupported = errors.New("protocol not supported") | |||||
| // specified. | ||||||
| var ErrNoProtocols = errors.New("no protocols specified") | ||||||
|
|
||||||
| var ( | ||||||
| tieBreakerPrefix = "select:" | ||||||
| ) | ||||||
|
|
||||||
| // SelectProtoOrFail performs the initial multistream handshake | ||||||
| // to inform the muxer of the protocol that will be used to communicate | ||||||
| // on this ReadWriteCloser. It returns an error if, for example, | ||||||
|
|
@@ -22,8 +29,10 @@ func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error { | |||||
| errCh := make(chan error, 1) | ||||||
| go func() { | ||||||
| var buf bytes.Buffer | ||||||
| delimWrite(&buf, []byte(ProtocolID)) | ||||||
| delimWrite(&buf, []byte(proto)) | ||||||
| if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte(proto)); err != nil { | ||||||
| errCh <- err | ||||||
| return | ||||||
| } | ||||||
| _, err := io.Copy(rwc, &buf) | ||||||
| errCh <- err | ||||||
| }() | ||||||
|
|
@@ -61,7 +70,80 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { | |||||
| default: | ||||||
| return "", err | ||||||
| } | ||||||
| for _, p := range protos[1:] { | ||||||
| return selectProtosOrFail(protos[1:], rwc) | ||||||
| } | ||||||
|
|
||||||
| // Performs protocol negotiation with the simultaneous open extension; the returned boolean | ||||||
| // indicator will be true if we should act as a server. | ||||||
| func SelectWithSimopenOrFail(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { | ||||||
| if len(protos) == 0 { | ||||||
| return "", false, ErrNoProtocols | ||||||
| } | ||||||
|
|
||||||
| werrCh := make(chan error, 1) | ||||||
| go func() { | ||||||
| var buf bytes.Buffer | ||||||
| if err := delitmWriteAll(&buf, []byte(ProtocolID), []byte("iamclient"), []byte(protos[0])); err != nil { | ||||||
| werrCh <- err | ||||||
| return | ||||||
| } | ||||||
|
|
||||||
| _, err := io.Copy(rwc, &buf) | ||||||
| werrCh <- err | ||||||
| }() | ||||||
|
|
||||||
| err := readMultistreamHeader(rwc) | ||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| tok, err := ReadNextToken(rwc) | ||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| if err = <-werrCh; err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| switch tok { | ||||||
| case "iamclient": | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: define a string constant. |
||||||
| // simultaneous open | ||||||
| return simOpen(protos, rwc) | ||||||
|
|
||||||
| case "na": | ||||||
| // client open | ||||||
| proto, err := clientOpen(protos, rwc) | ||||||
Stebalien marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
Comment on lines
+120
to
+122
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we need this |
||||||
|
|
||||||
| return proto, false, nil | ||||||
|
|
||||||
| default: | ||||||
| return "", false, errors.New("unexpected response: " + tok) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func clientOpen(protos []string, rwc io.ReadWriteCloser) (string, error) { | ||||||
| // check to see if we selected the pipelined protocol | ||||||
| tok, err := ReadNextToken(rwc) | ||||||
| if err != nil { | ||||||
| return "", err | ||||||
| } | ||||||
|
|
||||||
| switch tok { | ||||||
| case protos[0]: | ||||||
| return tok, nil | ||||||
| case "na": | ||||||
| return selectProtosOrFail(protos[1:], rwc) | ||||||
| default: | ||||||
| return "", errors.New("unexpected response: " + tok) | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| func selectProtosOrFail(protos []string, rwc io.ReadWriteCloser) (string, error) { | ||||||
| for _, p := range protos { | ||||||
| err := trySelect(p, rwc) | ||||||
| switch err { | ||||||
| case nil: | ||||||
|
|
@@ -74,6 +156,141 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { | |||||
| return "", ErrNotSupported | ||||||
| } | ||||||
|
|
||||||
| func simOpen(protos []string, rwc io.ReadWriteCloser) (string, bool, error) { | ||||||
| mynonce := make([]byte, 32) | ||||||
| _, err := rand.Read(mynonce) | ||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| werrCh := make(chan error, 1) | ||||||
| go func() { | ||||||
| myselect := []byte(tieBreakerPrefix + base64.StdEncoding.EncodeToString(mynonce)) | ||||||
| err := delimWriteBuffered(rwc, myselect) | ||||||
| werrCh <- err | ||||||
| }() | ||||||
|
|
||||||
| // skip exactly one protocol | ||||||
| // see https://github.com/multiformats/go-multistream/pull/42#discussion_r558757135 | ||||||
| _, err = ReadNextToken(rwc) | ||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| // read the tie breaker nonce | ||||||
| tok, err := ReadNextToken(rwc) | ||||||
| if err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
| if !strings.HasPrefix(tok, tieBreakerPrefix) { | ||||||
| return "", false, errors.New("tie breaker nonce not sent with the correct prefix") | ||||||
| } | ||||||
|
|
||||||
| if err = <-werrCh; err != nil { | ||||||
| return "", false, err | ||||||
| } | ||||||
|
|
||||||
| peernonce, err := base64.StdEncoding.DecodeString(tok[7:]) | ||||||
|
||||||
| peernonce, err := base64.StdEncoding.DecodeString(tok[7:]) | |
| peernonce, err := base64.StdEncoding.DecodeString(tok[len(tieBreakerPrefix):]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: drop this line.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this yield the correct result if one peer sends a "2" and the other one a "0000001"?
On a more general note (and as I've argued on the specs PR), I still believe that using a 256-bit number here is excessive. We definitely can live with a collision probability of 2^-64 (for that matter, 2^-32 would be totally fine as well, and might simplify stuff in JS, where 64 bit numbers tend to be more problematic), which would allow us to use a simple uint64 / uint32 comparison here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? It doesn't cost anything to use bigger nonces, which makes the probability of collision astronomically low.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? Because it simplifies the code. I'm still not sure if bytes.Compare will give the correct result if the number is encoded with leading 0s.
With a 32 bit nonce the chance is already astronomically small. At this probability, you'd expect a collision every 2^31 connection attempts. Assuming you're performing 10 simultaneous connects per second (I don't think any node will ever do this), you'll have to wait for almost 7 years until you get the first collision. With a 64 bit nonce, you'd have to wait about 30 billion years, twice the age of the universe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the idea of using a random uint64 is a reasonable one and have made the change in the PR. Please let me know if there are any strong objections against this and we can discuss this in depth.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@marten-seemann the code was correct. It took a random byte string, encoded it, decoded it, then compared the bytes.
Honestly, that's a lot simpler than comparing a 64bit number as we're not relying on encoding order at all.
However, sending a raw number also works and I agree that 32 bytes was overkill for our purposes.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You actually don't need this case, iamserver is already initialized to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this helps readability of the code a bit, so keeping it around.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably nicer:
if peerNonce == myNonce {
return ...
}
iamserver := peerNonce > myNonceThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need a default when using bytes.Compare.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been removed.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe make this a const? I see you're using it a few times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is done for both initiator and responder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vyzo When can this EOF happen ? I mean, why do we treat this error separately ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EOF will happen when the remote side gives up because we have no protocols in common. In general, it's best to avoid returning EOF for actual error cases anyways (usually, you want to either return ErrUnexpectedEOF, or a better error like we're doing here).
Uh oh!
There was an error while loading. Please reload this page.