Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.

Commit 0279419

Browse files
committed
quic: refactor SendPendingData
For DefaultApplication, use a linked list to only iterate through QuicStream instances that have data to send. Use a single unified implementation of SendPendingData for QuicApplication.
1 parent 3405c97 commit 0279419

8 files changed

Lines changed: 238 additions & 327 deletions

src/quic/node_quic_default_application.cc

Lines changed: 80 additions & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,69 @@
1313
namespace node {
1414
namespace quic {
1515

16+
namespace {
17+
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
18+
ngtcp2_vec* v = *pvec;
19+
size_t cnt = *pcnt;
20+
21+
for (; cnt > 0; --cnt, ++v) {
22+
if (v->len > len) {
23+
v->len -= len;
24+
v->base += len;
25+
break;
26+
}
27+
len -= v->len;
28+
}
29+
30+
*pvec = v;
31+
*pcnt = cnt;
32+
}
33+
34+
int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
35+
size_t i;
36+
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
37+
return i == cnt;
38+
}
39+
} // anonymous namespace
40+
1641
DefaultApplication::DefaultApplication(
1742
QuicSession* session) :
1843
QuicApplication(session) {}
1944

2045
bool DefaultApplication::Initialize() {
21-
if (!needs_init())
22-
return false;
23-
Debug(session(), "Default QUIC Application Initialized");
24-
set_init_done();
25-
return true;
46+
if (needs_init()) {
47+
Debug(session(), "Default QUIC Application Initialized");
48+
set_init_done();
49+
}
50+
return needs_init();
51+
}
52+
53+
void DefaultApplication::ScheduleStream(int64_t stream_id) {
54+
QuicStream* stream = session()->FindStream(stream_id);
55+
Debug(session(), "Scheduling stream %" PRIu64, stream_id);
56+
if (stream != nullptr)
57+
stream->Schedule(&stream_queue_);
58+
}
59+
60+
void DefaultApplication::UnscheduleStream(int64_t stream_id) {
61+
QuicStream* stream = session()->FindStream(stream_id);
62+
Debug(session(), "Unscheduling stream %" PRIu64, stream_id);
63+
if (stream != nullptr)
64+
stream->Unschedule();
65+
}
66+
67+
void DefaultApplication::StreamClose(
68+
int64_t stream_id,
69+
uint64_t app_error_code) {
70+
if (app_error_code == 0)
71+
app_error_code = NGTCP2_APP_NOERROR;
72+
UnscheduleStream(stream_id);
73+
QuicApplication::StreamClose(stream_id, app_error_code);
74+
}
75+
76+
void DefaultApplication::ResumeStream(int64_t stream_id) {
77+
Debug(session(), "Stream %" PRId64 " has data to send");
78+
ScheduleStream(stream_id);
2679
}
2780

2881
bool DefaultApplication::ReceiveStreamData(
@@ -59,189 +112,50 @@ bool DefaultApplication::ReceiveStreamData(
59112
return true;
60113
}
61114

62-
void DefaultApplication::AcknowledgeStreamData(
63-
int64_t stream_id,
64-
uint64_t offset,
65-
size_t datalen) {
66-
QuicStream* stream = session()->FindStream(stream_id);
67-
Debug(session(), "Default QUIC Application acknowledging stream data");
68-
// It's possible that the stream has already been destroyed and
69-
// removed. If so, just silently ignore the ack
70-
if (stream != nullptr)
71-
stream->Acknowledge(offset, datalen);
72-
}
73-
74-
bool DefaultApplication::SendPendingData() {
75-
// Right now this iterates through the streams in the order they
76-
// were created. Later, we might want to implement a prioritization
77-
// scheme to allow higher priority streams to be serialized first.
78-
// Prioritization is left entirely up to the application layer in QUIC.
79-
// HTTP/3, for instance, drops prioritization entirely.
80-
Debug(session(), "Default QUIC Application sending pending data");
81-
for (const auto& stream : session()->streams()) {
82-
if (!SendStreamData(stream.second.get()))
83-
return false;
84-
85-
// Check to make sure QuicSession state did not change in this iteration
86-
if (session()->is_in_draining_period() ||
87-
session()->is_in_closing_period() ||
88-
session()->is_destroyed()) {
89-
break;
90-
}
91-
}
92-
93-
return true;
94-
}
95-
96-
namespace {
97-
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
98-
ngtcp2_vec* v = *pvec;
99-
size_t cnt = *pcnt;
100-
101-
for (; cnt > 0; --cnt, ++v) {
102-
if (v->len > len) {
103-
v->len -= len;
104-
v->base += len;
105-
break;
106-
}
107-
len -= v->len;
108-
}
109-
110-
*pvec = v;
111-
*pcnt = cnt;
112-
}
113-
114-
int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
115-
size_t i;
116-
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
117-
return i == cnt;
118-
}
119-
} // anonymous namespace
120-
121115
int DefaultApplication::GetStreamData(StreamData* stream_data) {
122-
QuicStream* stream = session()->FindStream(stream_data->id);
116+
QuicStream* stream = stream_queue_.PopFront();
117+
// If stream is nullptr, there are no streams with data pending.
118+
if (stream == nullptr)
119+
return 0;
120+
123121
stream_data->remaining =
124-
stream->DrainInto(&stream_data->data, &stream_data->count, 16);
122+
stream->DrainInto(
123+
&stream_data->data,
124+
&stream_data->count,
125+
MAX_VECTOR_COUNT);
126+
127+
stream_data->user_data = stream;
128+
stream_data->id = stream->id();
125129
stream_data->fin = stream->is_writable() ? 0 : 1;
126130

131+
// Schedule the stream again only if there is data to write. There
132+
// might not actually be any more data to write but we can't know
133+
// that yet as it depends entirely on how much data actually gets
134+
// serialized by ngtcp2.
135+
if (stream_data->count > 0)
136+
ScheduleStream(stream->id());
137+
127138
Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s",
128139
stream_data->count,
129140
stream_data->id,
130141
stream_data->fin == 1 ? " (fin)" : "");
131142
return 0;
132143
}
133144

134-
bool DefaultApplication::SendStreamData(QuicStream* stream) {
135-
ssize_t ndatalen = 0;
136-
QuicPathStorage path;
137-
Debug(session(), "Default QUIC Application sending stream %" PRId64 " data",
138-
stream->id());
139-
140-
StreamData stream_data;
141-
stream_data.id = stream->id();
142-
stream_data.user_data = stream;
143-
GetStreamData(&stream_data);
144-
145-
// If there is no stream data and we're not sending fin,
146-
// Just return without doing anything.
147-
if (stream_data.count == 0 && !stream_data.fin) {
148-
Debug(stream, "There is no stream data to send");
149-
return true;
150-
}
151-
152-
std::unique_ptr<QuicPacket> packet = CreateStreamDataPacket();
153-
uint8_t* pos = packet->data();
154-
155-
for (;;) {
156-
// If packet was sent on the previous iteration, it will have been reset
157-
if (!packet) {
158-
packet = CreateStreamDataPacket();
159-
pos = packet->data();
160-
}
161-
162-
ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data);
163-
164-
if (nwrite <= 0) {
165-
switch (nwrite) {
166-
case 0:
167-
goto congestion_limited;
168-
case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
169-
// There is a finite number of packets that can be sent
170-
// per connection. Once those are exhausted, there's
171-
// absolutely nothing we can do except immediately
172-
// and silently tear down the QuicSession. This has
173-
// to be silent because we can't even send a
174-
// CONNECTION_CLOSE since even those require a
175-
// packet number.
176-
session()->SilentClose();
177-
return false;
178-
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
179-
session()->StreamDataBlocked(stream->id());
180-
if (session()->max_data_left() == 0)
181-
goto congestion_limited;
182-
return true;
183-
case NGTCP2_ERR_STREAM_SHUT_WR:
184-
if (UNLIKELY(!BlockStream(stream_data.id)))
185-
return false;
186-
return true;
187-
case NGTCP2_ERR_STREAM_NOT_FOUND:
188-
return true;
189-
case NGTCP2_ERR_WRITE_STREAM_MORE:
190-
CHECK_GT(ndatalen, 0);
191-
CHECK(StreamCommit(&stream_data, ndatalen));
192-
pos += ndatalen;
193-
continue;
194-
}
195-
session()->set_last_error(QUIC_ERROR_SESSION, static_cast<int>(nwrite));
196-
return false;
197-
}
198-
199-
pos += nwrite;
200-
201-
if (ndatalen >= 0)
202-
CHECK(StreamCommit(&stream_data, ndatalen));
203-
204-
Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite);
205-
packet->set_length(nwrite);
206-
if (!session()->SendPacket(std::move(packet), path))
207-
return false;
208-
209-
packet.reset();
210-
pos = nullptr;
211-
212-
if (ShouldSetFin(stream_data))
213-
set_stream_fin(stream_data.id);
214-
215-
if (IsEmpty(stream_data.buf, stream_data.count))
216-
break;
217-
}
218-
219-
return true;
220-
221-
congestion_limited:
222-
if (pos - packet->data()) {
223-
// Some data was serialized into the packet. We need to send it.
224-
packet->set_length(pos - packet->data());
225-
Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.",
226-
packet->length());
227-
if (!session()->SendPacket(std::move(packet), path))
228-
return false;
229-
}
230-
return true;
231-
}
232-
233145
bool DefaultApplication::StreamCommit(
234146
StreamData* stream_data,
235147
size_t datalen) {
236148
QuicStream* stream = static_cast<QuicStream*>(stream_data->user_data);
149+
CHECK_NOT_NULL(stream);
237150
stream_data->remaining -= datalen;
238151
Consume(&stream_data->buf, &stream_data->count, datalen);
239152
stream->Commit(datalen);
240153
return true;
241154
}
242155

243156
bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) {
244-
if (!IsEmpty(stream_data.buf, stream_data.count))
157+
if (stream_data.user_data == nullptr ||
158+
!IsEmpty(stream_data.buf, stream_data.count))
245159
return false;
246160
QuicStream* stream = static_cast<QuicStream*>(stream_data.user_data);
247161
return !stream->is_writable();

src/quic/node_quic_default_application.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
55

6+
#include "node_quic_stream.h"
67
#include "node_quic_session.h"
78
#include "node_quic_util.h"
9+
#include "util.h"
810
#include "v8.h"
911

1012
namespace node {
@@ -28,21 +30,23 @@ class DefaultApplication final : public QuicApplication {
2830
const uint8_t* data,
2931
size_t datalen,
3032
uint64_t offset) override;
31-
void AcknowledgeStreamData(
32-
int64_t stream_id,
33-
uint64_t offset,
34-
size_t datalen) override;
35-
36-
bool SendPendingData() override;
37-
bool SendStreamData(QuicStream* stream) override;
3833

3934
int GetStreamData(StreamData* stream_data) override;
40-
bool StreamCommit(StreamData* stream_data, size_t datalen) override;
35+
36+
void ResumeStream(int64_t stream_id) override;
37+
void StreamClose(int64_t stream_id, uint64_t app_error_code) override;
4138
bool ShouldSetFin(const StreamData& stream_data) override;
39+
bool StreamCommit(StreamData* stream_data, size_t datalen) override;
4240

4341
SET_SELF_SIZE(DefaultApplication)
4442
SET_MEMORY_INFO_NAME(DefaultApplication)
4543
SET_NO_MEMORY_INFO()
44+
45+
private:
46+
void ScheduleStream(int64_t stream_id);
47+
void UnscheduleStream(int64_t stream_id);
48+
49+
QuicStream::Queue stream_queue_;
4650
};
4751

4852
} // namespace quic

0 commit comments

Comments
 (0)