Skip to content

Commit cefbdf6

Browse files
emilkMichaelGrupp
authored andcommitted
Fix performance regression with --server-memory-limit 0
## Related * Closes #12626 ## What 0.29 changed the size of the channel for the gRPC proxy channel to be dependent on the `--server-memory-limit`. The thinking was to better heed the total memory limit. However, using a `server_memory_limit` of exactly `0` is common, and even encouraged in our docs. Having zero bytes buffer on the incoming channel leads to a lot of early backpressure, making the sender block, degrading performance. This PR changes the channel to always have a fixed 256 MiB of buffer space. This trades performance for (potential) latency. In future versions of Rerun we should consider making this value configurable, but doing so now would be a breaking change, and I would like to get this PR into a patch release. Previous to 0.29, we also had a fixed size input buffer, but measured in number of messages instead of number of bytes. This meant the server could easily balloon in memory use when logging large things. I believe having it defined in terms of bytes makes more sense, but there is _also_ a fixed upper-bound on the number of messages in the new code (4096). Source-Ref: be1671f8c672ccf5d743b21baf8984aa20eec150
1 parent 7fd794e commit cefbdf6

File tree

2 files changed

+28
-16
lines changed

2 files changed

+28
-16
lines changed

crates/store/re_grpc_server/src/lib.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,23 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876;
4040
pub const MAX_DECODING_MESSAGE_SIZE: usize = u32::MAX as usize;
4141
pub const MAX_ENCODING_MESSAGE_SIZE: usize = MAX_DECODING_MESSAGE_SIZE;
4242

43+
/// Maximum number of messages in the input queue.
44+
const CHANNEL_SIZE_MESSAGES: usize = 1024; // TODO(emilk): move into `ServerOptions` after the patch release.
45+
46+
/// Make sure we can handle a quick burst of messages without blocking,
47+
/// even if the server has a [`ServerOptions::memory_limit`] of zero.
48+
const CHANNEL_SIZE_BYTES: u64 = 128 * 1024 * 1024; // TODO(emilk): move into `ServerOptions` after the patch release.
49+
4350
/// Options for the gRPC Proxy Server
4451
#[derive(Clone, Copy, Debug)]
4552
pub struct ServerOptions {
4653
/// When a client connect, should they be sent the oldest data first, or the newest?
4754
pub playback_behavior: PlaybackBehavior,
4855

49-
/// Start garbage collecting old data when we reach this.
50-
pub memory_limit: MemoryLimit,
56+
/// Limit on how much history the server saves.
57+
///
58+
/// It will start garbage collecting old data when we reach this.
59+
pub memory_limit: MemoryLimit, // TODO(emilk): rename `history_limit`
5160
}
5261

5362
impl Default for ServerOptions {
@@ -871,26 +880,17 @@ impl MessageProxy {
871880
}
872881

873882
fn new_with_recv(
874-
mut options: ServerOptions,
883+
options: ServerOptions,
875884
) -> (Self, async_broadcast_channel::Receiver<LogOrTableMsgProto>) {
876-
// Divide up the memory budget:
877-
let (broadcast_channel_memory_limit, rest_memory_limit) = options.memory_limit.split(0.25);
878-
options.memory_limit = rest_memory_limit;
879-
880885
let (broadcast_log_tx, broadcast_log_rx) = async_broadcast_channel::channel(
881886
"re_grpc_server broadcast",
882-
4096,
883-
broadcast_channel_memory_limit.as_bytes(),
887+
CHANNEL_SIZE_MESSAGES,
888+
CHANNEL_SIZE_BYTES,
884889
);
885890

886891
let (event_tx, event_rx) = {
887-
let message_queue_capacity = if options.memory_limit == MemoryLimit::ZERO {
888-
1
889-
} else {
890-
16 // Apply backpressure early
891-
};
892892
// TODO(emilk): this could also use a size-based backpressure mechanism.
893-
893+
let message_queue_capacity = 32; // Apply backpressure early
894894
async_mpsc_channel::channel("re_grpc_server events", message_queue_capacity)
895895
};
896896

crates/utils/re_quota_channel/src/async_broadcast_channel.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,24 @@ pub fn channel<T: Clone>(
404404
max_messages: usize,
405405
max_bytes: u64,
406406
) -> (Sender<T>, Receiver<T>) {
407+
let debug_name = debug_name.into();
407408
let max_messages = max_messages.max(1); // Ensure we allow at least 1 message
408409

410+
if max_bytes == 0 {
411+
let msg = format!(
412+
"Channel '{debug_name}' has a memory limit of 0 bytes. Consider giving it at least a few MiB so that it can handle a quick burst of messages without blocking."
413+
);
414+
if cfg!(debug_assertions) {
415+
re_log::warn_once!("DEBUG WARNING: {msg}");
416+
} else {
417+
re_log::debug_once!("{msg}");
418+
}
419+
}
420+
409421
let (inner, inner_rx) = broadcast::channel(max_messages);
410422

411423
let state = Arc::new(ChannelState {
412-
debug_name: debug_name.into(),
424+
debug_name,
413425
locked: Mutex::new(Locked {
414426
bytes_in_flight: 0,
415427
messages_in_flight: 0,

0 commit comments

Comments
 (0)