Skip to content

Commit a364dbd

Browse files
authored
Merge 5c748ae into fa4d4b1
2 parents fa4d4b1 + 5c748ae commit a364dbd

10 files changed

Lines changed: 911 additions & 1122 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/dispatch/mod.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ pub mod queue_manager;
4747

4848
// Default values for network config.
4949
const RETRY_CONNECTIONS_INTERVAL: u64 = 5000;
50+
const BOOT_TIMEOUT: u64 = 5000;
5051
const MAX_RETRY_ATTEMPTS: u8 = 10;
5152

5253
type NetHashMap<K, V> = FxHashMap<K, V>;
@@ -74,6 +75,7 @@ pub struct NetworkConfig {
7475
tcp_nodelay: bool,
7576
max_connection_retry_attempts: u8,
7677
connection_retry_interval: u64,
78+
boot_timeout: u64,
7779
}
7880

7981
impl NetworkConfig {
@@ -88,22 +90,17 @@ impl NetworkConfig {
8890
tcp_nodelay: true,
8991
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
9092
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
93+
boot_timeout: BOOT_TIMEOUT,
9194
}
9295
}
9396

9497
/// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
9598
/// Note: Only the NetworkThread and NetworkDispatcher will use the `BufferConfig`, not Actors
9699
pub fn with_buffer_config(addr: SocketAddr, buffer_config: BufferConfig) -> Self {
97100
buffer_config.validate();
98-
NetworkConfig {
99-
addr,
100-
transport: Transport::Tcp,
101-
buffer_config,
102-
custom_allocator: None,
103-
tcp_nodelay: true,
104-
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
105-
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
106-
}
101+
let mut cfg = NetworkConfig::new(addr);
102+
cfg.set_buffer_config(buffer_config);
103+
cfg
107104
}
108105

109106
/// Create a new config with `addr` and protocol [TCP](Transport::Tcp)
@@ -122,6 +119,7 @@ impl NetworkConfig {
122119
tcp_nodelay: true,
123120
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
124121
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
122+
boot_timeout: BOOT_TIMEOUT,
125123
}
126124
}
127125

@@ -192,6 +190,18 @@ impl NetworkConfig {
192190
pub fn get_connection_retry_interval(&self) -> u64 {
193191
self.connection_retry_interval
194192
}
193+
194+
/// Configures how long the system will wait (in ms) for the network layer to set-up
195+
///
196+
/// Default value is 5000 ms.
197+
pub fn set_boot_timeout(&mut self, milliseconds: u64) {
198+
self.boot_timeout = milliseconds;
199+
}
200+
201+
/// How long (in ms) the system will wait (in ms) for the network layer to set-up
202+
pub fn get_boot_timeout(&self) -> u64 {
203+
self.boot_timeout
204+
}
195205
}
196206

197207
/// Socket defaults to `127.0.0.1:0` (i.e. a random local port) and protocol is [TCP](Transport::Tcp)
@@ -205,6 +215,7 @@ impl Default for NetworkConfig {
205215
tcp_nodelay: true,
206216
max_connection_retry_attempts: MAX_RETRY_ATTEMPTS,
207217
connection_retry_interval: RETRY_CONNECTIONS_INTERVAL,
218+
boot_timeout: BOOT_TIMEOUT,
208219
}
209220
}
210221
}
@@ -653,15 +664,12 @@ impl NetworkDispatcher {
653664
}
654665
}
655666
ConnectionState::Initializing => {
656-
//debug!(self.ctx.log(), "Connection is initializing; queuing frame");
657667
self.queue_manager.enqueue_data(data, addr);
658668
None
659669
}
660670
ConnectionState::Closed => {
661-
// Enqueue the Frame and request a connection to the destination.
662671
self.queue_manager.enqueue_data(data, addr);
663672
if let Some(bridge) = &self.net_bridge {
664-
// Request a new connection
665673
bridge.connect(Tcp, addr)?;
666674
}
667675
Some(ConnectionState::Initializing)
@@ -829,9 +837,10 @@ impl NetworkDispatcher {
829837
if let Some(state) = self.connections.get_mut(&addr) {
830838
match state {
831839
ConnectionState::Connected => {
832-
debug!(
840+
trace!(
833841
self.ctx.log(),
834-
"Closing channel to connected system {}", addr
842+
"Closing channel to connected system {}",
843+
addr
835844
);
836845
if let Some(bridge) = &self.net_bridge {
837846
while self.queue_manager.has_data(&addr) {
@@ -953,10 +962,9 @@ impl ComponentLifecycle for NetworkDispatcher {
953962

954963
impl Provide<NetworkStatusPort> for NetworkDispatcher {
955964
fn handle(&mut self, event: <NetworkStatusPort as Port>::Request) -> Handled {
956-
trace!(
965+
debug!(
957966
self.ctx.log(),
958-
"Received NetworkStatusPort Request {:?}",
959-
event
967+
"Received NetworkStatusPort Request {:?}", event
960968
);
961969
match event {
962970
NetworkStatusRequest::DisconnectSystem(system_path) => {

core/src/net/buffers/decode_buffer.rs

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,29 @@ impl DecodeBuffer {
6565
// If the readable portion of the buffer would have less than `encode_buf_min_free_space`
6666
// Or if we would return less than 8 readable bytes we don't allow further writing into
6767
// the current buffer, caller must swap.
68-
// TODO: Define what is a sensible amount of minimum bytes to be read at any given moment.
69-
if self.writeable_len() < 8 {
70-
return None;
68+
if self.is_writeable() {
69+
unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) }
70+
} else {
71+
None
7172
}
72-
unsafe { Some(self.buffer.get_slice(self.write_offset, self.buffer.len())) }
73+
}
74+
75+
/// True if there is sufficient amount of writeable bytes
76+
pub(crate) fn is_writeable(&mut self) -> bool {
77+
self.writeable_len() > 8
78+
}
79+
80+
/// Returns true if there is data to be decoded, else false
81+
pub(crate) fn has_frame(&mut self) -> io::Result<bool> {
82+
if self.decode_frame_head().is_err() {
83+
return Err(io::Error::new(io::ErrorKind::InvalidData, "framing error"));
84+
}
85+
if let Some(head) = &self.next_frame_head {
86+
if self.readable_len() >= head.content_length() {
87+
return Ok(true);
88+
}
89+
}
90+
Ok(false)
7391
}
7492

7593
/// Swaps the underlying buffer in place with other
@@ -86,7 +104,10 @@ impl DecodeBuffer {
86104
if let Some(mut overflow_chunk) = overflow {
87105
// TODO: change the config parameter to a separate value?
88106
let overflow_len = overflow_chunk.remaining();
89-
if self.writeable_len() - overflow_len > self.buffer_config.encode_buf_min_free_space {
107+
if overflow_len < self.writeable_len()
108+
&& self.writeable_len() - overflow_len
109+
> self.buffer_config.encode_buf_min_free_space
110+
{
90111
// Just copy the overflow_chunk bytes, no need to chain.
91112
// the overflow must not exceed the new buffers capacity
92113
unsafe {
@@ -145,47 +166,41 @@ impl DecodeBuffer {
145166

146167
/// Tries to decode one frame from the readable part of the buffer
147168
pub fn get_frame(&mut self) -> Result<Frame, FramingError> {
169+
self.decode_frame_head()?;
148170
if let Some(head) = &self.next_frame_head {
149171
if self.readable_len() >= head.content_length() {
150172
let head = self.next_frame_head.take().unwrap();
151-
let chunk_lease = self.read_chunk_lease(head.content_length());
152-
match head.frame_type() {
173+
return match head.frame_type() {
153174
// Frames with empty bodies should be handled in frame-head decoding below.
154175
FrameType::Data => {
155-
Data::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
176+
Data::decode_from(self.read_chunk_lease(head.content_length()))
177+
.map_err(|_| FramingError::InvalidFrame)
156178
}
157-
FrameType::StreamRequest => StreamRequest::decode_from(chunk_lease)
158-
.map_err(|_| FramingError::InvalidFrame),
159179
FrameType::Hello => {
160-
Hello::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
180+
Hello::decode_from(self.read_chunk_lease(head.content_length()))
181+
.map_err(|_| FramingError::InvalidFrame)
161182
}
162183
FrameType::Start => {
163-
Start::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
164-
}
165-
FrameType::Ack => {
166-
Ack::decode_from(chunk_lease).map_err(|_| FramingError::InvalidFrame)
184+
Start::decode_from(self.read_chunk_lease(head.content_length()))
185+
.map_err(|_| FramingError::InvalidFrame)
167186
}
187+
// Frames without content match here for expediency, Decoder doesn't allow 0 length.
188+
FrameType::Bye => Ok(Frame::Bye()),
189+
FrameType::Ack => Ok(Frame::Ack()),
168190
_ => Err(FramingError::UnsupportedFrameType),
169-
}
170-
} else {
171-
Err(FramingError::NoData)
191+
};
172192
}
173-
} else if self.readable_len() >= FRAME_HEAD_LEN as usize {
193+
}
194+
Err(FramingError::NoData)
195+
}
196+
197+
fn decode_frame_head(&mut self) -> Result<(), FramingError> {
198+
if self.next_frame_head.is_none() && self.readable_len() >= FRAME_HEAD_LEN as usize {
174199
let mut chunk_lease = self.read_chunk_lease(FRAME_HEAD_LEN as usize);
175200
let head = FrameHead::decode_from(&mut chunk_lease)?;
176-
if head.content_length() == 0 {
177-
match head.frame_type() {
178-
// Frames without content match here for expediency, Decoder doesn't allow 0 length.
179-
FrameType::Bye => Ok(Frame::Bye()),
180-
_ => Err(FramingError::NoData),
181-
}
182-
} else {
183-
self.next_frame_head = Some(head);
184-
self.get_frame()
185-
}
186-
} else {
187-
Err(FramingError::NoData)
201+
self.next_frame_head = Some(head);
188202
}
203+
Ok(())
189204
}
190205

191206
/// Extracts the readable portion (if any) from the active buffer as a ChunkLease
@@ -198,11 +213,6 @@ impl DecodeBuffer {
198213
}
199214
None
200215
}
201-
202-
/// Destroys the DecodeBuffer and returns the BufferChunk
203-
pub(crate) fn destroy(self) -> BufferChunk {
204-
self.buffer
205-
}
206216
}
207217

208218
#[cfg(test)]

0 commit comments

Comments
 (0)