Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient := getHTTPClient(ctx, dest, streamSettings)

uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(scMaxEachPostBytes.roll()))
maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))

go func() {
requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
Expand Down Expand Up @@ -318,16 +322,48 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
},
}

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)
writer := uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

conn := splitConn{
writer: bufferedUploadPipeWriter,
writer: writer,
reader: lazyDownload,
remoteAddr: remoteAddr,
localAddr: localAddr,
}

return stat.Connection(&conn), nil
}

// A wrapper around pipe that ensures the size limit is exactly honored.
//
// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
// single MultiBuffer exceeds the size limit, and then starts blocking on the
// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
// bytes than the size limit. We work around this by splitting a potentially
// too large write up into multiple.
type uploadWriter struct {
*pipe.Writer
maxLen int32
}

func (w uploadWriter) Write(b []byte) (int, error) {
capacity := int(w.maxLen - w.Len())
if capacity > 0 && capacity < len(b) {
b = b[:capacity]
}

buffer := buf.New()
n, err := buffer.Write(b)
if err != nil {
return 0, err
}

err = w.WriteMultiBuffer([]*buf.Buffer{buffer})
if err != nil {
return 0, err
}
return n, nil
}
56 changes: 55 additions & 1 deletion transport/internet/splithttp/splithttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func Test_queryString(t *testing.T) {
ctx := context.Background()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{Path: "sh"},
ProtocolSettings: &Config{Path: "sh?ed=2048"},
}
conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

Expand All @@ -407,3 +407,57 @@ func Test_queryString(t *testing.T) {
common.Must(conn.Close())
common.Must(listen.Close())
}

func Test_maxUpload(t *testing.T) {
listenPort := tcp.PickPort()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "/sh",
ScMaxEachPostBytes: &RandRangeConfig{
From: 100,
To: 100,
},
},
}

var uploadSize int
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func(c stat.Connection) {
defer c.Close()
var b [1024]byte
c.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := c.Read(b[:])
if err != nil {
return
}

uploadSize = n

common.Must2(c.Write([]byte("Response")))
}(conn)
})
common.Must(err)
ctx := context.Background()

conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

// send a slightly too large upload
var upload [101]byte
_, err = conn.Write(upload[:])
common.Must(err)

var b [1024]byte
n, _ := io.ReadFull(conn, b[:])
fmt.Println("string is", n)
if string(b[:n]) != "Response" {
t.Error("response: ", string(b[:n]))
}
common.Must(conn.Close())

if uploadSize > 100 || uploadSize == 0 {
t.Error("incorrect upload size: ", uploadSize)
}

common.Must(listen.Close())
}
8 changes: 8 additions & 0 deletions transport/pipe/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ var (
errSlowDown = errors.New("slow down")
)

func (p *pipe) Len() int32 {
data := p.data
if data == nil {
return 0
}
return data.Len()
}

func (p *pipe) getState(forRead bool) error {
switch p.state {
case open:
Expand Down
4 changes: 4 additions & 0 deletions transport/pipe/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ func (w *Writer) Close() error {
return w.pipe.Close()
}

func (w *Writer) Len() int32 {
return w.pipe.Len()
}

// Interrupt implements common.Interruptible.
func (w *Writer) Interrupt() {
w.pipe.Interrupt()
Expand Down