From 0c87c7e31827491ea9fcf0a526aef1db856135c5 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Wed, 11 Sep 2024 17:34:08 +0200 Subject: [PATCH 1/5] SplitHTTP HTTPv1 fix Changes made to read responses from a server (and check the response codes) before making a request if HTTPv1 is used --- transport/internet/splithttp/client.go | 1 + transport/internet/splithttp/connection.go | 101 +++++++++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index e491ef3ea383..ce23bc5fb7be 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -162,6 +162,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, newConnection := uploadConn == nil if newConnection { uploadConn, err = c.dialUploadConn(context.WithoutCancel(ctx)) + uploadConn = NewConnHolder(uploadConn.(net.Conn)) if err != nil { return err } diff --git a/transport/internet/splithttp/connection.go b/transport/internet/splithttp/connection.go index 697381d4252c..81cfb41bdc26 100644 --- a/transport/internet/splithttp/connection.go +++ b/transport/internet/splithttp/connection.go @@ -1,11 +1,112 @@ package splithttp import ( + "bytes" + "errors" + "fmt" "io" "net" + "strconv" + "strings" "time" ) +var ( + ErrBadRespCode = errors.New("bad response code") + BadCodes = map[int]struct{}{502: {}, 503: {}, 505: {}} +) + +// Optimised to read only response codes +// Reads response codes until getting EOF or error +func ConnHttpReadRespCodes(conn net.Conn) (codes []int, err error) { + buff := &bytes.Buffer{} + if _, err = io.Copy(buff, conn); err != nil { + return nil, err + } + + for { + var line string + line, err = buff.ReadString('\n') + if err != nil && err == io.EOF { + break + } else if err != nil { + return codes, err + } + + if strings.HasPrefix(line, "HTTP/") { + parts := strings.Split(line, " ") + if len(parts) < 2 { + continue + } + if code, err := strconv.Atoi(parts[1]); err == nil { + codes = append(codes, code) + } + } + + for { + line, err := buff.ReadString('\n') + if err != nil && err == io.EOF { + break + } else if err != nil { + return codes, err + } + if line == "\r\n" || line == "\n" { + break // End of headers + } + } + } + + return codes, err +} + +// ConnHolder implements the net.Conn interface +// adds logic of reading the responses before writing the next request +// Used as a bugfix for HTTP1.1 +type ConnHolder struct { + ResponsesToRead int + Conn net.Conn +} + +func NewConnHolder(conn net.Conn) *ConnHolder { + return &ConnHolder{ + ResponsesToRead: 0, + Conn: conn, + } +} + +func (ch *ConnHolder) Read(b []byte) (n int, err error) { + n, err = ch.Conn.Read(b) + if err != nil { + return n, err + } + ch.ResponsesToRead += 1 + return n, err +} + +func (ch *ConnHolder) Write(b []byte) (n int, err error) { + if ch.ResponsesToRead > 0 { + codes, err := ConnHttpReadRespCodes(ch) + if err != nil { + return len(b), err + } + ch.ResponsesToRead -= len(codes) + for _, code := range codes { + if _, isBadCode := BadCodes[code]; isBadCode { + return len(b), errors.Join(ErrBadRespCode, fmt.Errorf("get response code %d", code)) + } + } + } + return ch.Conn.Write(b) +} + +// Just calling the same method on the Connection to implement interface +func (ch *ConnHolder) Close() error { return ch.Conn.Close() } +func (ch *ConnHolder) LocalAddr() net.Addr { return ch.Conn.LocalAddr() } +func (ch *ConnHolder) RemoteAddr() net.Addr { return ch.Conn.RemoteAddr() } +func (ch *ConnHolder) SetDeadline(t time.Time) error { return ch.Conn.SetDeadline(t) } +func (ch *ConnHolder) SetReadDeadline(t time.Time) error { return ch.Conn.SetReadDeadline(t) } +func (ch *ConnHolder) SetWriteDeadline(t time.Time) error { return ch.Conn.SetWriteDeadline(t) } + type splitConn struct { writer io.WriteCloser reader io.ReadCloser From fc7fad48157889d3ed930e5bd5aea5803e35b7dd Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Sun, 15 Sep 2024 15:00:15 +0200 Subject: [PATCH 2/5] HTTPv1 fix 2.0 SendUploadRequest reads responses, ConnHolder removed --- .gitignore | 3 + transport/internet/splithttp/client.go | 47 ++++++---- transport/internet/splithttp/connection.go | 101 --------------------- transport/internet/splithttp/dialer.go | 1 - 4 files changed, 31 insertions(+), 121 deletions(-) diff --git a/.gitignore b/.gitignore index c77bc579c8ef..22469b88f8d0 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ errorgen *.dat .vscode /build_assets + +# Output from dlv test +**/debug.* diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index ce23bc5fb7be..18297c935da5 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -1,8 +1,10 @@ package splithttp import ( + "bufio" "bytes" "context" + "fmt" "io" gonet "net" "net/http" @@ -157,29 +159,36 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, var uploadConn any - for { - uploadConn = c.uploadRawPool.Get() - newConnection := uploadConn == nil - if newConnection { - uploadConn, err = c.dialUploadConn(context.WithoutCancel(ctx)) - uploadConn = NewConnHolder(uploadConn.(net.Conn)) - if err != nil { - return err - } + uploadConn = c.uploadRawPool.Get() + newConnection := uploadConn == nil + if newConnection { + uploadConn, err = c.dialUploadConn(context.WithoutCancel(ctx)) + if err != nil { + return err } + } - _, err = uploadConn.(net.Conn).Write(requestBytes.Bytes()) + conn := uploadConn.(net.Conn) + _, err := conn.Write(requestBytes.Bytes()) + // if the write failed, we try another connection from + // the pool, until the write on a new connection fails. + // failed writes to a pooled connection are normal when + // the connection has been closed in the meantime. + if newConnection && err != nil { + return err + } - // if the write failed, we try another connection from - // the pool, until the write on a new connection fails. - // failed writes to a pooled connection are normal when - // the connection has been closed in the meantime. - if err == nil { - break - } else if newConnection { - return err - } + resp, err := http.ReadResponse(bufio.NewReader(conn), req) + if err != nil { + return fmt.Errorf("error while reading response: %s", err.Error()) } + if resp.StatusCode != 200 { + return fmt.Errorf("error response code: %d", resp.StatusCode) + } + + // buff := &bytes.Buffer{} + // _, err := io.Copy(buff, uploadConnTyped) + // common.Must(err) c.uploadRawPool.Put(uploadConn) } diff --git a/transport/internet/splithttp/connection.go b/transport/internet/splithttp/connection.go index 81cfb41bdc26..697381d4252c 100644 --- a/transport/internet/splithttp/connection.go +++ b/transport/internet/splithttp/connection.go @@ -1,112 +1,11 @@ package splithttp import ( - "bytes" - "errors" - "fmt" "io" "net" - "strconv" - "strings" "time" ) -var ( - ErrBadRespCode = errors.New("bad response code") - BadCodes = map[int]struct{}{502: {}, 503: {}, 505: {}} -) - -// Optimised to read only response codes -// Reads response codes until getting EOF or error -func ConnHttpReadRespCodes(conn net.Conn) (codes []int, err error) { - buff := &bytes.Buffer{} - if _, err = io.Copy(buff, conn); err != nil { - return nil, err - } - - for { - var line string - line, err = buff.ReadString('\n') - if err != nil && err == io.EOF { - break - } else if err != nil { - return codes, err - } - - if strings.HasPrefix(line, "HTTP/") { - parts := strings.Split(line, " ") - if len(parts) < 2 { - continue - } - if code, err := strconv.Atoi(parts[1]); err == nil { - codes = append(codes, code) - } - } - - for { - line, err := buff.ReadString('\n') - if err != nil && err == io.EOF { - break - } else if err != nil { - return codes, err - } - if line == "\r\n" || line == "\n" { - break // End of headers - } - } - } - - return codes, err -} - -// ConnHolder implements the net.Conn interface -// adds logic of reading the responses before writing the next request -// Used as a bugfix for HTTP1.1 -type ConnHolder struct { - ResponsesToRead int - Conn net.Conn -} - -func NewConnHolder(conn net.Conn) *ConnHolder { - return &ConnHolder{ - ResponsesToRead: 0, - Conn: conn, - } -} - -func (ch *ConnHolder) Read(b []byte) (n int, err error) { - n, err = ch.Conn.Read(b) - if err != nil { - return n, err - } - ch.ResponsesToRead += 1 - return n, err -} - -func (ch *ConnHolder) Write(b []byte) (n int, err error) { - if ch.ResponsesToRead > 0 { - codes, err := ConnHttpReadRespCodes(ch) - if err != nil { - return len(b), err - } - ch.ResponsesToRead -= len(codes) - for _, code := range codes { - if _, isBadCode := BadCodes[code]; isBadCode { - return len(b), errors.Join(ErrBadRespCode, fmt.Errorf("get response code %d", code)) - } - } - } - return ch.Conn.Write(b) -} - -// Just calling the same method on the Connection to implement interface -func (ch *ConnHolder) Close() error { return ch.Conn.Close() } -func (ch *ConnHolder) LocalAddr() net.Addr { return ch.Conn.LocalAddr() } -func (ch *ConnHolder) RemoteAddr() net.Addr { return ch.Conn.RemoteAddr() } -func (ch *ConnHolder) SetDeadline(t time.Time) error { return ch.Conn.SetDeadline(t) } -func (ch *ConnHolder) SetReadDeadline(t time.Time) error { return ch.Conn.SetReadDeadline(t) } -func (ch *ConnHolder) SetWriteDeadline(t time.Time) error { return ch.Conn.SetWriteDeadline(t) } - type splitConn struct { writer io.WriteCloser reader io.ReadCloser diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index 69a3087003db..ff1501a8b7b0 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -267,7 +267,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me &buf.MultiBufferContainer{MultiBuffer: chunk}, int64(chunk.Len()), ) - if err != nil { errors.LogInfoInner(ctx, err, "failed to send upload") uploadPipeReader.Interrupt() From d0d4b07dcd867b5c7c9f8237bca7f61939396108 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Mon, 16 Sep 2024 10:27:24 +0200 Subject: [PATCH 3/5] Reading response before making the request --- transport/internet/splithttp/client.go | 66 ++++++++++++++----------- transport/internet/splithttp/h1_conn.go | 12 +++++ 2 files changed, 49 insertions(+), 29 deletions(-) create mode 100644 transport/internet/splithttp/h1_conn.go diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 18297c935da5..7b7c9d0101a8 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -154,42 +154,50 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, // safely retried. if instead req.Write is called multiple // times, the body is already drained after the first // request - requestBytes := new(bytes.Buffer) - common.Must(req.Write(requestBytes)) + requestBuff := new(bytes.Buffer) + common.Must(req.Write(requestBuff)) var uploadConn any + var h1UploadConn *H1Conn + + for { + uploadConn = c.uploadRawPool.Get() + newConnection := uploadConn == nil + if newConnection { + newConn, err := c.dialUploadConn(context.WithoutCancel(ctx)) + if err != nil { + return err + } + h1UploadConn = NewH1Conn(newConn) + uploadConn = h1UploadConn + } else { + h1UploadConn = uploadConn.(*H1Conn) + + // TODO: Replace 0 here with a config value later + // Or add some other condition for optimization purposes + if h1UploadConn.UnreadedResponsesCount > 0 { + resp, err := http.ReadResponse(bufio.NewReader(h1UploadConn), req) + if err != nil { + return fmt.Errorf("error while reading response: %s", err.Error()) + } + if resp.StatusCode != 200 { + return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode) + } + } + } - uploadConn = c.uploadRawPool.Get() - newConnection := uploadConn == nil - if newConnection { - uploadConn, err = c.dialUploadConn(context.WithoutCancel(ctx)) - if err != nil { + _, err := h1UploadConn.Write(requestBuff.Bytes()) + // if the write failed, we try another connection from + // the pool, until the write on a new connection fails. + // failed writes to a pooled connection are normal when + // the connection has been closed in the meantime. + if err == nil { + break + } else if newConnection { return err } } - conn := uploadConn.(net.Conn) - _, err := conn.Write(requestBytes.Bytes()) - // if the write failed, we try another connection from - // the pool, until the write on a new connection fails. - // failed writes to a pooled connection are normal when - // the connection has been closed in the meantime. - if newConnection && err != nil { - return err - } - - resp, err := http.ReadResponse(bufio.NewReader(conn), req) - if err != nil { - return fmt.Errorf("error while reading response: %s", err.Error()) - } - if resp.StatusCode != 200 { - return fmt.Errorf("error response code: %d", resp.StatusCode) - } - - // buff := &bytes.Buffer{} - // _, err := io.Copy(buff, uploadConnTyped) - // common.Must(err) - c.uploadRawPool.Put(uploadConn) } diff --git a/transport/internet/splithttp/h1_conn.go b/transport/internet/splithttp/h1_conn.go new file mode 100644 index 000000000000..97166a1c29bf --- /dev/null +++ b/transport/internet/splithttp/h1_conn.go @@ -0,0 +1,12 @@ +package splithttp + +import "net" + +type H1Conn struct { + UnreadedResponsesCount int + net.Conn +} + +func NewH1Conn(conn net.Conn) *H1Conn { + return &H1Conn{Conn: conn} +} From 2cf2f23960609ea1e6f7c64e913fd56a64606509 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Mon, 16 Sep 2024 12:27:10 +0200 Subject: [PATCH 4/5] Reusing h1UploadConn buffer reader To prevent losing data due to some reader errors like EOF and to optimise the code --- transport/internet/splithttp/client.go | 3 +-- transport/internet/splithttp/h1_conn.go | 13 +++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/transport/internet/splithttp/client.go b/transport/internet/splithttp/client.go index 7b7c9d0101a8..7fc269457011 100644 --- a/transport/internet/splithttp/client.go +++ b/transport/internet/splithttp/client.go @@ -1,7 +1,6 @@ package splithttp import ( - "bufio" "bytes" "context" "fmt" @@ -176,7 +175,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string, // TODO: Replace 0 here with a config value later // Or add some other condition for optimization purposes if h1UploadConn.UnreadedResponsesCount > 0 { - resp, err := http.ReadResponse(bufio.NewReader(h1UploadConn), req) + resp, err := http.ReadResponse(h1UploadConn.RespBufReader, req) if err != nil { return fmt.Errorf("error while reading response: %s", err.Error()) } diff --git a/transport/internet/splithttp/h1_conn.go b/transport/internet/splithttp/h1_conn.go index 97166a1c29bf..4f28c085a5cd 100644 --- a/transport/internet/splithttp/h1_conn.go +++ b/transport/internet/splithttp/h1_conn.go @@ -1,12 +1,21 @@ package splithttp -import "net" +import ( + "bufio" + "net" +) type H1Conn struct { UnreadedResponsesCount int + // To reuse response reader, so we won't lose data + // If some EOF will accure + RespBufReader *bufio.Reader net.Conn } func NewH1Conn(conn net.Conn) *H1Conn { - return &H1Conn{Conn: conn} + return &H1Conn{ + RespBufReader: bufio.NewReader(conn), + Conn: conn, + } } From dade6d3bca2b73210553953d7c44ec34afd527a1 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Mon, 16 Sep 2024 13:28:00 +0200 Subject: [PATCH 5/5] Missleading comment removed --- transport/internet/splithttp/h1_conn.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transport/internet/splithttp/h1_conn.go b/transport/internet/splithttp/h1_conn.go index 4f28c085a5cd..f89f2a666107 100644 --- a/transport/internet/splithttp/h1_conn.go +++ b/transport/internet/splithttp/h1_conn.go @@ -7,9 +7,7 @@ import ( type H1Conn struct { UnreadedResponsesCount int - // To reuse response reader, so we won't lose data - // If some EOF will accure - RespBufReader *bufio.Reader + RespBufReader *bufio.Reader net.Conn }