Skip to content

Commit 9ee1387

Browse files
committed
for bug #251, 9k+ clients, use fast cache for msgs queue. 2.0.57
1 parent dde05c6 commit 9ee1387

File tree

8 files changed

+171
-49
lines changed

8 files changed

+171
-49
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,7 @@ Supported operating systems and hardware:
485485
* 2013-10-17, Created.<br/>
486486

487487
## History
488+
* v2.0, 2014-12-05, fix [#251](https://github.com/winlinvip/simple-rtmp-server/issues/251), 9k+ clients, use fast cache for msgs queue. 2.0.57
488489
* v2.0, 2014-12-04, fix [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), add mw(merged-write) config. 2.0.53
489490
* v2.0, 2014-12-04, for [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241), support mr(merged-read) config and reload. 2.0.52.
490491
* v2.0, 2014-12-04, enable [#241](https://github.com/winlinvip/simple-rtmp-server/issues/241) and [#248](https://github.com/winlinvip/simple-rtmp-server/issues/248), +25% performance, 2.5k publisher. 2.0.50

trunk/src/app/srs_app_rtmp_conn.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ int SrsRtmpConn::do_playing(SrsSource* source, SrsQueueRecvThread* trd)
605605
// get messages from consumer.
606606
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
607607
int count = 0;
608-
if ((ret = consumer->dump_packets(msgs.max, msgs.msgs, count)) != ERROR_SUCCESS) {
608+
if ((ret = consumer->dump_packets(&msgs, &count)) != ERROR_SUCCESS) {
609609
srs_error("get messages from consumer failed. ret=%d", ret);
610610
return ret;
611611
}

trunk/src/app/srs_app_source.cpp

Lines changed: 100 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ using namespace std;
4141
#include <srs_app_edge.hpp>
4242
#include <srs_kernel_utility.hpp>
4343
#include <srs_app_avc_aac.hpp>
44+
#include <srs_protocol_msg_array.hpp>
4445

4546
#define CONST_MAX_JITTER_MS 500
4647
#define DEFAULT_FRAME_TIME_MS 40
@@ -166,22 +167,12 @@ SrsMessageQueue::~SrsMessageQueue()
166167
clear();
167168
}
168169

169-
int SrsMessageQueue::count()
170-
{
171-
return (int)msgs.size();
172-
}
173-
174-
int SrsMessageQueue::duration()
175-
{
176-
return (int)(av_end_time - av_start_time);
177-
}
178-
179170
void SrsMessageQueue::set_queue_size(double queue_size)
180171
{
181172
queue_size_ms = (int)(queue_size * 1000);
182173
}
183174

184-
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
175+
int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
185176
{
186177
int ret = ERROR_SUCCESS;
187178

@@ -196,6 +187,11 @@ int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg)
196187
msgs.push_back(msg);
197188

198189
while (av_end_time - av_start_time > queue_size_ms) {
190+
// notice the caller queue already overflow and shrinked.
191+
if (is_overflow) {
192+
*is_overflow = true;
193+
}
194+
199195
shrink();
200196
}
201197

@@ -305,10 +301,20 @@ SrsConsumer::SrsConsumer(SrsSource* _source)
305301
mw_min_msgs = 0;
306302
mw_duration = 0;
307303
mw_waiting = false;
304+
305+
mw_cache = new SrsMessageArray(SRS_PERF_MW_MSGS);
306+
mw_count = 0;
307+
mw_first_pkt = mw_last_pkt = 0;
308308
}
309309

310310
SrsConsumer::~SrsConsumer()
311311
{
312+
if (mw_cache) {
313+
mw_cache->free(mw_count);
314+
mw_count = 0;
315+
}
316+
srs_freep(mw_cache);
317+
312318
source->on_consumer_destroy(this);
313319
srs_freep(jitter);
314320
srs_freep(queue);
@@ -341,22 +347,53 @@ int SrsConsumer::enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, S
341347
}
342348
}
343349

344-
if ((ret = queue->enqueue(msg)) != ERROR_SUCCESS) {
345-
return ret;
350+
// use fast cache if available
351+
if (mw_count < mw_cache->max) {
352+
// update fast cache timestamps
353+
if (mw_count == 0) {
354+
mw_first_pkt = msg->header.timestamp;
355+
}
356+
mw_last_pkt = msg->header.timestamp;
357+
358+
mw_cache->msgs[mw_count++] = msg;
359+
} else{
360+
// fast cache is full, use queue.
361+
bool is_overflow = false;
362+
if ((ret = queue->enqueue(msg, &is_overflow)) != ERROR_SUCCESS) {
363+
return ret;
364+
}
365+
// when overflow, clear cache and refresh the fast cache.
366+
if (is_overflow) {
367+
mw_cache->free(mw_count);
368+
if ((ret = dumps_queue_to_fast_cache()) != ERROR_SUCCESS) {
369+
return ret;
370+
}
371+
}
346372
}
347373

348374
// fire the mw when msgs is enough.
349-
if (mw_waiting && queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
350-
st_cond_signal(mw_wait);
351-
mw_waiting = false;
375+
if (mw_waiting) {
376+
// when fast cache not overflow, always flush.
377+
// so we donot care about the queue.
378+
bool fast_cache_overflow = mw_count >= mw_cache->max;
379+
int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
380+
bool match_min_msgs = mw_count > mw_min_msgs;
381+
382+
// when fast cache overflow, or duration ok, signal to flush.
383+
if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
384+
st_cond_signal(mw_wait);
385+
mw_waiting = false;
386+
}
352387
}
353388

354389
return ret;
355390
}
356391

357-
int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
392+
int SrsConsumer::dump_packets(SrsMessageArray* msgs, int* count)
358393
{
359-
srs_assert(max_count > 0);
394+
int ret =ERROR_SUCCESS;
395+
396+
srs_assert(msgs->max > 0);
360397

361398
if (should_update_source_id) {
362399
srs_trace("update source_id=%d[%d]", source->source_id(), source->source_id());
@@ -365,25 +402,45 @@ int SrsConsumer::dump_packets(int max_count, SrsMessage** pmsgs, int& count)
365402

366403
// paused, return nothing.
367404
if (paused) {
368-
return ERROR_SUCCESS;
405+
return ret;
406+
}
407+
408+
// only dumps an whole array to msgs.
409+
for (int i = 0; i < mw_count; i++) {
410+
msgs->msgs[i] = mw_cache->msgs[i];
369411
}
412+
*count = mw_count;
370413

371-
return queue->dump_packets(max_count, pmsgs, count);
414+
// when fast cache is not filled,
415+
// we donot check the queue, direclty zero fast cache.
416+
if (mw_count < mw_cache->max) {
417+
mw_count = 0;
418+
mw_first_pkt = mw_last_pkt = 0;
419+
return ret;
420+
}
421+
422+
return dumps_queue_to_fast_cache();
372423
}
373424

374425
void SrsConsumer::wait(int nb_msgs, int duration)
375426
{
376427
mw_min_msgs = nb_msgs;
377428
mw_duration = duration;
378429

379-
// already ok, donot wait.
380-
if (queue->count() > mw_min_msgs && queue->duration() > mw_duration) {
430+
// when fast cache not overflow, always flush.
431+
// so we donot care about the queue.
432+
bool fast_cache_overflow = mw_count >= mw_cache->max;
433+
int duration_ms = (int)(mw_last_pkt - mw_first_pkt);
434+
bool match_min_msgs = mw_count > mw_min_msgs;
435+
436+
// when fast cache overflow, or duration ok, signal to flush.
437+
if (fast_cache_overflow || (match_min_msgs && duration_ms > mw_duration)) {
381438
return;
382439
}
383440

384441
// the enqueue will notify this cond.
385442
mw_waiting = true;
386-
443+
// wait for msgs to incoming.
387444
st_cond_wait(mw_wait);
388445
}
389446

@@ -397,6 +454,26 @@ int SrsConsumer::on_play_client_pause(bool is_pause)
397454
return ret;
398455
}
399456

457+
int SrsConsumer::dumps_queue_to_fast_cache()
458+
{
459+
int ret =ERROR_SUCCESS;
460+
461+
// fill fast cache with queue.
462+
if ((ret = queue->dump_packets(mw_cache->max, mw_cache->msgs, mw_count)) != ERROR_SUCCESS) {
463+
return ret;
464+
}
465+
// set the timestamp when got message.
466+
if (mw_count > 0) {
467+
SrsMessage* first_msg = mw_cache->msgs[0];
468+
mw_first_pkt = first_msg->header.timestamp;
469+
470+
SrsMessage* last_msg = mw_cache->msgs[mw_count - 1];
471+
mw_last_pkt = last_msg->header.timestamp;
472+
}
473+
474+
return ret;
475+
}
476+
400477
SrsGopCache::SrsGopCache()
401478
{
402479
cached_video_count = 0;

trunk/src/app/srs_app_source.hpp

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class SrsRequest;
4848
class SrsStSocket;
4949
class SrsRtmpServer;
5050
class SrsEdgeProxyContext;
51+
class SrsMessageArray;
5152
#ifdef SRS_AUTO_HLS
5253
class SrsHls;
5354
#endif
@@ -115,14 +116,6 @@ class SrsMessageQueue
115116
SrsMessageQueue();
116117
virtual ~SrsMessageQueue();
117118
public:
118-
/**
119-
* get the count of queue.
120-
*/
121-
virtual int count();
122-
/**
123-
* get duration of queue.
124-
*/
125-
virtual int duration();
126119
/**
127120
* set the queue size
128121
* @param queue_size the queue size in seconds.
@@ -132,8 +125,9 @@ class SrsMessageQueue
132125
/**
133126
* enqueue the message, the timestamp always monotonically.
134127
* @param msg, the msg to enqueue, user never free it whatever the return code.
128+
* @param is_overflow, whether overflow and shrinked. NULL to ignore.
135129
*/
136-
virtual int enqueue(SrsSharedPtrMessage* msg);
130+
virtual int enqueue(SrsSharedPtrMessage* msg, bool* is_overflow = NULL);
137131
/**
138132
* get packets in consumer queue.
139133
* @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
@@ -168,6 +162,14 @@ class SrsConsumer
168162
bool mw_waiting;
169163
int mw_min_msgs;
170164
int mw_duration;
165+
// use fast cache for msgs
166+
// @see https://github.com/winlinvip/simple-rtmp-server/issues/251
167+
SrsMessageArray* mw_cache;
168+
// the count of msg in fast cache.
169+
int mw_count;
170+
// the packet time in fast cache.
171+
int64_t mw_first_pkt;
172+
int64_t mw_last_pkt;
171173
public:
172174
SrsConsumer(SrsSource* _source);
173175
virtual ~SrsConsumer();
@@ -197,11 +199,11 @@ class SrsConsumer
197199
virtual int enqueue(SrsSharedPtrMessage* msg, bool atc, int tba, int tbv, SrsRtmpJitterAlgorithm ag);
198200
/**
199201
* get packets in consumer queue.
200-
* @pmsgs SrsMessages*[], used to store the msgs, user must alloc it.
201-
* @count the count in array, output param.
202+
* @param msgs the msgs array to dump packets to send.
203+
* @param count the count in array, output param.
202204
* @max_count the max count to dequeue, must be positive.
203205
*/
204-
virtual int dump_packets(int max_count, SrsMessage** pmsgs, int& count);
206+
virtual int dump_packets(SrsMessageArray* msgs, int* count);
205207
/**
206208
* wait for messages incomming, atleast nb_msgs and in duration.
207209
* @param nb_msgs the messages count to wait.
@@ -212,6 +214,12 @@ class SrsConsumer
212214
* when client send the pause message.
213215
*/
214216
virtual int on_play_client_pause(bool is_pause);
217+
private:
218+
/**
219+
* dumps the queue to fast cache,
220+
* when fast cache is clear or queue is overflow.
221+
*/
222+
virtual int dumps_queue_to_fast_cache();
215223
};
216224

217225
/**

trunk/src/core/srs_core.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3131
// current release version
3232
#define VERSION_MAJOR 2
3333
#define VERSION_MINOR 0
34-
#define VERSION_REVISION 56
34+
#define VERSION_REVISION 57
3535
// server info.
3636
#define RTMP_SIG_SRS_KEY "SRS"
3737
#define RTMP_SIG_SRS_ROLE "origin/edge server"

trunk/src/core/srs_core_performance.hpp

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,24 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
7575
* @see SrsConfig::get_mw_sleep_ms()
7676
* @remark the mw sleep and msgs to send, maybe:
7777
* mw_sleep msgs iovs
78-
* 350 24/48 48/84
79-
* 500 24/48 48/84
80-
* 800 42/64 84/128
81-
* 1000 64/85 128/170
82-
* 1200 65/86 130/172
83-
* 1500 87/110 174/220
84-
* 1800 106/128 212/256
85-
* 2000 134/142 268/284
78+
* 350 43 86
79+
* 400 44 88
80+
* 500 46 92
81+
* 600 46 92
82+
* 700 82 164
83+
* 800 81 162
84+
* 900 80 160
85+
* 1000 88 176
86+
* 1100 91 182
87+
* 1200 89 178
88+
* 1300 119 238
89+
* 1400 120 240
90+
* 1500 119 238
91+
* 1600 131 262
92+
* 1700
93+
* 1800
94+
* 1900
95+
* 2000
8696
*/
8797
// the default config of mw.
8898
#define SRS_PERF_MW_SLEEP 350

trunk/src/rtmp/srs_protocol_msg_array.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ SrsMessageArray::SrsMessageArray(int max_msgs)
3232
msgs = new SrsMessage*[max_msgs];
3333
max = max_msgs;
3434

35-
// initialize
36-
for (int i = 0; i < max_msgs; i++) {
37-
msgs[i] = NULL;
38-
}
35+
zero(max_msgs);
3936
}
4037

4138
SrsMessageArray::~SrsMessageArray()
@@ -46,4 +43,23 @@ SrsMessageArray::~SrsMessageArray()
4643
srs_freep(msgs);
4744
}
4845

46+
void SrsMessageArray::free(int count)
47+
{
48+
// initialize
49+
for (int i = 0; i < count; i++) {
50+
SrsMessage* msg = msgs[i];
51+
srs_freep(msg);
52+
53+
msgs[i] = NULL;
54+
}
55+
}
56+
57+
void SrsMessageArray::zero(int count)
58+
{
59+
// initialize
60+
for (int i = 0; i < count; i++) {
61+
msgs[i] = NULL;
62+
}
63+
}
64+
4965

trunk/src/rtmp/srs_protocol_msg_array.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ class SrsMessageArray
6060
* free the msgs not sent out(not NULL).
6161
*/
6262
virtual ~SrsMessageArray();
63+
public:
64+
/**
65+
* free specified count of messages.
66+
*/
67+
virtual void free(int count);
68+
private:
69+
/**
70+
* zero initialize the message array.
71+
*/
72+
virtual void zero(int count);
6373
};
6474

6575
#endif

0 commit comments

Comments
 (0)