Skip to content

Commit 110d38a

Browse files
committed
transport: reconnect to the last endpoint without restarting tasks
Avoid restarting transport tasks during client reconnect. Reuse the existing client transport and try the last successful endpoint first. If that fails, continue with the configured connect locators or scouted locators. Restore the connection without replacing the transport object. Closes: #1005 Closes: #1053
1 parent 7a2e422 commit 110d38a

11 files changed

Lines changed: 366 additions & 28 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,9 +352,10 @@ jobs:
352352
- name: Build project and run tests
353353
run: |
354354
sudo apt install -y ninja-build
355-
CMAKE_GENERATOR=Ninja ASAN=ON make BUILD_TYPE=Debug
355+
CMAKE_GENERATOR=Ninja ASAN=ON ZENOH_DEBUG=3 make BUILD_TYPE=Debug
356356
ninja -C build/ test
357357
python3 ./build/tests/single_thread.py
358+
python3 -u ./build/tests/connection_restore.py ./zenoh-standalone/zenohd --single-thread
358359
timeout-minutes: 15
359360
env:
360361
Z_FEATURE_MULTI_THREAD: 0

include/zenoh-pico/net/session.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ typedef struct _z_session_t {
148148

149149
#if Z_FEATURE_AUTO_RECONNECT == 1
150150
_z_network_message_slist_t *_declaration_cache;
151+
_z_string_t _last_connect_locator;
151152
#endif
152153

153154
// Session subscriptions

include/zenoh-pico/transport/manager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ enum _z_peer_op_e {
3131

3232
z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode,
3333
int peer_op, const _z_config_t *session_cfg, _z_runtime_t *runtime);
34+
z_result_t _z_reconnect_unicast_client_transport(_z_transport_unicast_t *ztu, const _z_string_t *locator,
35+
const _z_id_t *local_zid, const _z_config_t *session_cfg);
3436
z_result_t _z_new_peer(_z_transport_t *zt, const _z_id_t *session_id, const _z_string_t *locator,
3537
const _z_config_t *session_cfg);
3638
bool _z_transport_open_error_is_retryable(z_result_t ret);

include/zenoh-pico/transport/unicast/transport.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
3131
const _z_id_t *local_zid, int peer_op, _z_sys_net_socket_t *socket);
3232
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
3333
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
34+
void _z_unicast_transport_clear_connection(_z_transport_unicast_t *ztu);
35+
void _z_unicast_transport_replace_connection(_z_transport_unicast_t *dst, _z_transport_unicast_t *src);
3436
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu);
3537

3638
#ifdef __cplusplus

src/net/session.c

Lines changed: 163 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "zenoh-pico/transport/unicast.h"
4040
#include "zenoh-pico/transport/unicast/lease.h"
4141
#include "zenoh-pico/transport/unicast/read.h"
42+
#include "zenoh-pico/transport/unicast/transport.h"
4243
#include "zenoh-pico/utils/config.h"
4344
#include "zenoh-pico/utils/logging.h"
4445
#include "zenoh-pico/utils/result.h"
@@ -130,6 +131,17 @@ static z_result_t _z_config_get_mode(const _z_config_t *config, z_whatami_t *mod
130131
return ret;
131132
}
132133

134+
#if Z_FEATURE_AUTO_RECONNECT == 1
135+
/**
 * Stores a copy of `locator` in `s->_last_connect_locator`.
 *
 * The copy is made into a temporary first. If that copy fails the function
 * returns early and the previously stored locator is left untouched; there is
 * no error reporting to the caller (the function returns void).
 *
 * Parameters:
 *   s:       Session whose `_last_connect_locator` field is updated.
 *   locator: Locator string to remember.
 */
static void _z_session_update_last_connect_locator(_z_session_t *s, const _z_string_t *locator) {
136+
_z_string_t copy = _z_string_null();
137+
// Copy before clearing, so a failed copy does not discard the stored value.
if (_z_string_copy(&copy, locator) != _Z_RES_OK) {
138+
return;
139+
}
140+
_z_string_clear(&s->_last_connect_locator);
141+
// The result of the move is explicitly discarded.
(void)_z_string_move(&s->_last_connect_locator, &copy);
142+
}
143+
#endif
144+
133145
static z_result_t _z_open_inner(_z_session_rc_t *zs, _z_string_t *locator, const _z_id_t *zid, int peer_op,
134146
const _z_config_t *config) {
135147
z_result_t ret = _Z_RES_OK;
@@ -141,6 +153,11 @@ static z_result_t _z_open_inner(_z_session_rc_t *zs, _z_string_t *locator, const
141153
}
142154
_z_transport_get_common(&zn->_tp)->_session = _z_session_rc_clone_as_weak(zs);
143155
_z_transport_get_common(&zn->_tp)->_state = _Z_TRANSPORT_STATE_OPEN;
156+
#if Z_FEATURE_AUTO_RECONNECT == 1
157+
if ((zn->_mode == Z_WHATAMI_CLIENT) && (peer_op == _Z_PEER_OP_OPEN)) {
158+
_z_session_update_last_connect_locator(zn, locator);
159+
}
160+
#endif
144161
#if Z_FEATURE_MULTICAST_DECLARATIONS == 1
145162
if (zn->_tp._type == _Z_TRANSPORT_MULTICAST_TYPE) {
146163
ret = _z_interest_pull_resource_from_peers(zn);
@@ -542,6 +559,116 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid)
542559
}
543560

544561
#if Z_FEATURE_AUTO_RECONNECT == 1
562+
/**
 * Appends a copy of `locator` to `dst` unless it should be skipped.
 *
 * The locator is skipped (and _Z_RES_OK returned) when it is NULL, when it
 * fails `_z_string_check`, or when `dst` already holds an entry that
 * `_z_string_equals` reports as equal.
 *
 * Parameters:
 *   dst:     Vector receiving the copied locator.
 *   locator: Candidate locator; may be NULL.
 *
 * Return:
 *   _Z_RES_OK when the locator was skipped or appended; otherwise the error
 *   produced by `_z_string_copy` or `_z_string_svec_append`.
 */
static z_result_t _z_reconnect_append_unique_locator(_z_string_svec_t *dst, const _z_string_t *locator) {
563+
if ((locator == NULL) || !_z_string_check(locator)) {
564+
return _Z_RES_OK;
565+
}
566+
567+
// Linear scan for an existing equal entry.
size_t len = _z_string_svec_len(dst);
568+
for (size_t i = 0; i < len; i++) {
569+
if (_z_string_equals(_z_string_svec_get(dst, i), locator)) {
570+
return _Z_RES_OK;
571+
}
572+
}
573+
574+
_z_string_t copy = _z_string_null();
575+
_Z_RETURN_IF_ERR(_z_string_copy(&copy, locator));
576+
z_result_t ret = _z_string_svec_append(dst, &copy, false);
577+
if (ret != _Z_RES_OK) {
578+
// The copy was not stored in `dst`, so release it here.
_z_string_clear(&copy);
579+
}
580+
return ret;
581+
}
582+
583+
/**
 * Builds the ordered list of locators to try when reconnecting a client.
 *
 * Order of the resulting list:
 *   1. `s->_last_connect_locator` (skipped by the append helper when it is not
 *      a valid string),
 *   2. every locator configured under Z_CONFIG_CONNECT_KEY,
 *   3. locators returned by `_z_locators_by_scout`, but only when the config
 *      holds neither connect nor listen locators.
 * Entries are added through `_z_reconnect_append_unique_locator`, which skips
 * locators already present.
 *
 * Parameters:
 *   s:          Session providing the config, local id and last locator.
 *   candidates: Output vector; reset to `_z_string_svec_null()` on entry.
 *
 * Return:
 *   _Z_RES_OK with a non-empty `candidates`; _Z_ERR_CONFIG_LOCATOR_INVALID
 *   when no candidate was collected; otherwise the first error encountered.
 *   On errors routed through the `exit` label, `candidates` is cleared.
 */
static z_result_t _z_client_reconnect_candidates(_z_session_t *s, _z_string_svec_t *candidates) {
584+
_z_string_svec_t listen_locators = _z_string_svec_null();
585+
_z_string_svec_t connect_locators = _z_string_svec_null();
586+
_z_string_svec_t scouted_locators = _z_string_svec_null();
587+
588+
*candidates = _z_string_svec_null();
589+
// The last known endpoint goes first so it is tried before anything else.
z_result_t ret = _z_reconnect_append_unique_locator(candidates, &s->_last_connect_locator);
590+
if (ret != _Z_RES_OK) {
591+
// Direct return: the three temporary vectors are still null at this point.
// Note that `candidates` is not cleared on this path.
return ret;
592+
}
593+
594+
ret = _z_config_get_all(&s->_config, &listen_locators, Z_CONFIG_LISTEN_KEY);
595+
if (ret != _Z_RES_OK) {
596+
goto exit;
597+
}
598+
ret = _z_config_get_all(&s->_config, &connect_locators, Z_CONFIG_CONNECT_KEY);
599+
if (ret != _Z_RES_OK) {
600+
goto exit;
601+
}
602+
603+
size_t connect_len = _z_string_svec_len(&connect_locators);
604+
for (size_t i = 0; i < connect_len; i++) {
605+
ret = _z_reconnect_append_unique_locator(candidates, _z_string_svec_get(&connect_locators, i));
606+
if (ret != _Z_RES_OK) {
607+
goto exit;
608+
}
609+
}
610+
611+
// Scout only when the configuration names no connect and no listen locator.
if ((connect_len == 0) && (_z_string_svec_len(&listen_locators) == 0)) {
612+
ret = _z_locators_by_scout(&s->_config, &s->_local_zid, &scouted_locators);
613+
if (ret != _Z_RES_OK) {
614+
goto exit;
615+
}
616+
size_t scouted_len = _z_string_svec_len(&scouted_locators);
617+
for (size_t i = 0; i < scouted_len; i++) {
618+
ret = _z_reconnect_append_unique_locator(candidates, _z_string_svec_get(&scouted_locators, i));
619+
if (ret != _Z_RES_OK) {
620+
goto exit;
621+
}
622+
}
623+
}
624+
625+
exit:
626+
// Common cleanup for success and failure: the temporaries are always released.
_z_string_svec_clear(&listen_locators);
627+
_z_string_svec_clear(&connect_locators);
628+
_z_string_svec_clear(&scouted_locators);
629+
630+
if (ret != _Z_RES_OK) {
631+
_z_string_svec_clear(candidates);
632+
return ret;
633+
}
634+
// An empty candidate list is reported as an invalid locator configuration.
return _z_string_svec_is_empty(candidates) ? _Z_ERR_CONFIG_LOCATOR_INVALID : _Z_RES_OK;
635+
}
636+
637+
/**
 * Attempts to reconnect the session's unicast transport, candidate by candidate.
 *
 * Candidates come from `_z_client_reconnect_candidates` and are tried in order
 * with `_z_reconnect_unicast_client_transport` on
 * `s->_tp._transport._unicast`. The first success records the locator via
 * `_z_session_update_last_connect_locator` and stops the loop. A failure never
 * stops the loop, whether or not it is retryable.
 *
 * Parameters:
 *   s: Session to reconnect.
 *
 * Return:
 *   _Z_RES_OK on success. If every candidate fails: the last error classified
 *   as retryable by `_z_transport_open_error_is_retryable` if there was one,
 *   else the last non-retryable error, else _Z_ERR_TRANSPORT_OPEN_FAILED.
 *   Errors from building the candidate list are returned through
 *   _Z_RETURN_IF_ERR.
 */
static z_result_t _z_client_reopen_unicast(_z_session_t *s) {
638+
_z_string_svec_t candidates = _z_string_svec_null();
639+
_Z_RETURN_IF_ERR(_z_client_reconnect_candidates(s, &candidates));
640+
641+
z_result_t ret = _Z_ERR_TRANSPORT_OPEN_FAILED;
642+
// _Z_RES_OK doubles as "no error of this class seen yet" for both trackers.
z_result_t last_retryable_ret = _Z_RES_OK;
643+
z_result_t last_non_retryable_ret = _Z_RES_OK;
644+
size_t len = _z_string_svec_len(&candidates);
645+
for (size_t i = 0; i < len; i++) {
646+
_z_string_t *locator = _z_string_svec_get(&candidates, i);
647+
ret = _z_reconnect_unicast_client_transport(&s->_tp._transport._unicast, locator, &s->_local_zid, &s->_config);
648+
if (ret == _Z_RES_OK) {
649+
// Remember the endpoint that worked; `locator` still points into `candidates` here.
_z_session_update_last_connect_locator(s, locator);
650+
break;
651+
}
652+
if (_z_transport_open_error_is_retryable(ret)) {
653+
last_retryable_ret = ret;
654+
continue;
655+
}
656+
// Non-retryable for this locator: record it and keep trying the remaining ones.
last_non_retryable_ret = ret;
657+
}
658+
659+
_z_string_svec_clear(&candidates);
660+
if (ret == _Z_RES_OK) {
661+
return _Z_RES_OK;
662+
}
663+
// A retryable error takes priority over a non-retryable one in the returned code.
if (last_retryable_ret != _Z_RES_OK) {
664+
return last_retryable_ret;
665+
}
666+
if (last_non_retryable_ret != _Z_RES_OK) {
667+
return last_non_retryable_ret;
668+
}
669+
return _Z_ERR_TRANSPORT_OPEN_FAILED;
670+
}
671+
545672
void _z_client_reopen_task_drop(void *ztc_arg) {
546673
_z_transport_common_t *tc = (_z_transport_common_t *)ztc_arg;
547674
if (tc->_state == _Z_TRANSPORT_STATE_RECONNECTING) {
@@ -555,47 +682,66 @@ _z_fut_fn_result_t _z_client_reopen_task_fn(void *ztc_arg, _z_executor_t *execut
555682
_z_transport_common_t *tc = (_z_transport_common_t *)ztc_arg;
556683
_z_transport_tasks_t tasks_handles = tc->_tasks;
557684
_z_session_rc_t zs = _z_session_weak_upgrade(&tc->_session); // should not fail
685+
if (_Z_RC_IS_NULL(&zs)) {
686+
return _z_fut_fn_result_ready();
687+
}
558688
_z_session_t *s = _Z_RC_IN_VAL(&zs);
559-
_z_session_weak_drop(&tc->_session);
689+
bool in_place_unicast = (s->_mode == Z_WHATAMI_CLIENT) && (s->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) &&
690+
(tc == &s->_tp._transport._unicast._common);
691+
if (!in_place_unicast) {
692+
_z_session_weak_drop(&tc->_session);
693+
}
560694

561695
if (_z_config_is_empty(&s->_config)) {
562696
_z_session_rc_drop(&zs);
563697
return _z_fut_fn_result_ready();
564698
}
699+
565700
_z_session_transport_mutex_lock(s);
566-
z_result_t ret = _z_open(&zs, &s->_config, &s->_local_zid);
567-
_z_session_transport_mutex_unlock(s);
701+
z_result_t ret = in_place_unicast ? _z_client_reopen_unicast(s) : _z_open(&zs, &s->_config, &s->_local_zid);
568702
if (ret != _Z_RES_OK) {
569-
if (ret == _Z_ERR_TRANSPORT_OPEN_FAILED || ret == _Z_ERR_SCOUT_NO_RESULTS ||
570-
ret == _Z_ERR_TRANSPORT_TX_FAILED || ret == _Z_ERR_TRANSPORT_RX_FAILED ||
571-
ret == _Z_ERR_TRANSPORT_RX_DURATION_EXPIRED || ret == _Z_ERR_TRANSPORT_OPEN_PARTIAL_CONNECTIVITY) {
703+
_z_session_transport_mutex_unlock(s);
704+
if ((ret == _Z_ERR_TRANSPORT_OPEN_FAILED) || (ret == _Z_ERR_SCOUT_NO_RESULTS) ||
705+
(ret == _Z_ERR_TRANSPORT_TX_FAILED) || (ret == _Z_ERR_TRANSPORT_RX_FAILED) ||
706+
(ret == _Z_ERR_TRANSPORT_RX_DURATION_EXPIRED) || (ret == _Z_ERR_TRANSPORT_OPEN_PARTIAL_CONNECTIVITY)) {
572707
_Z_DEBUG("Reopen failed, next try in 1s");
573-
tc->_session = _z_session_rc_clone_as_weak(&zs);
708+
if (!in_place_unicast) {
709+
tc->_session = _z_session_rc_clone_as_weak(&zs);
710+
}
574711
tc->_state = _Z_TRANSPORT_STATE_RECONNECTING;
575712
tc->_tasks = tasks_handles;
576713
_z_session_rc_drop(&zs);
577714
return _z_fut_fn_result_wake_up_after(1000);
578-
} else {
579-
_Z_ERROR("Reopen failed, will not retry");
580-
tc->_state = _Z_TRANSPORT_STATE_CLOSED;
581-
_z_session_rc_drop(&zs);
582-
return _z_fut_fn_result_ready();
583715
}
716+
_Z_ERROR("Reopen failed, will not retry");
717+
tc->_state = _Z_TRANSPORT_STATE_CLOSED;
718+
_z_session_rc_drop(&zs);
719+
return _z_fut_fn_result_ready();
584720
}
585-
721+
tc = _z_transport_get_common(&s->_tp);
586722
tc->_tasks = tasks_handles;
723+
_z_session_transport_mutex_unlock(s);
724+
587725
if (!_z_network_message_slist_is_empty(s->_declaration_cache)) {
588726
_z_network_message_slist_t *iter = s->_declaration_cache;
589727
while (iter != NULL) {
590728
_z_network_message_t *n_msg = _z_network_message_slist_value(iter);
591729
ret = _z_send_n_msg(s, n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK, NULL);
592730
if (ret != _Z_RES_OK) {
593731
_Z_DEBUG("Send message during reopen failed: %i", ret);
594-
_z_transport_clear(&s->_tp);
595-
tc->_session = _z_session_rc_clone_as_weak(&zs);
596-
tc->_state = _Z_TRANSPORT_STATE_RECONNECTING;
732+
_z_session_transport_mutex_lock(s);
733+
if (in_place_unicast && s->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
734+
tc = &s->_tp._transport._unicast._common;
735+
_z_unicast_transport_clear_connection(&s->_tp._transport._unicast);
736+
} else {
737+
_z_transport_clear(&s->_tp);
738+
tc->_session = _z_session_rc_clone_as_weak(&zs);
739+
tc->_state = _Z_TRANSPORT_STATE_RECONNECTING;
740+
}
741+
tc->_tasks = tasks_handles;
742+
_z_session_transport_mutex_unlock(s);
597743
_z_session_rc_drop(&zs);
598-
return _z_fut_fn_result_continue();
744+
return _z_fut_fn_result_wake_up_after(1000);
599745
}
600746

601747
iter = _z_network_message_slist_next(iter);

src/session/utils.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) {
8484
_z_config_init(&zn->_config);
8585
#if Z_FEATURE_AUTO_RECONNECT == 1
8686
zn->_declaration_cache = NULL;
87+
zn->_last_connect_locator = _z_string_null();
8788
#endif
8889

8990
// Initialize the data structs
@@ -180,6 +181,7 @@ z_result_t _z_session_close(_z_session_t *zn) {
180181
_Z_RETURN_IF_ERR(_z_session_mutex_lock(zn));
181182
#if Z_FEATURE_AUTO_RECONNECT == 1
182183
_z_network_message_slist_free(&zn->_declaration_cache);
184+
_z_string_clear(&zn->_last_connect_locator);
183185
#endif
184186
_z_session_mutex_unlock(zn);
185187
_z_flush_local_resources(zn);

src/transport/common/tx.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,12 @@ z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_trans
273273
// If sending to a peer list, make sure the peer mutex is locked
274274
_z_transport_tx_mutex_lock(ztc, true);
275275

276-
ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg, peers);
276+
if ((ztc->_state != _Z_TRANSPORT_STATE_OPEN) || (ztc->_link == NULL)) {
277+
_z_transport_tx_mutex_unlock(ztc);
278+
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
279+
}
277280

281+
ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg, peers);
278282
_z_transport_tx_mutex_unlock(ztc);
279283
return ret;
280284
}
@@ -298,6 +302,13 @@ static z_result_t _z_transport_tx_send_n_msg(_z_transport_common_t *ztc, const _
298302
return ret;
299303
}
300304
// Process message
305+
if ((ztc->_state != _Z_TRANSPORT_STATE_OPEN) || (ztc->_link == NULL)) {
306+
if (!_z_transport_batch_hold_tx_mutex()) {
307+
_z_transport_tx_mutex_unlock(ztc);
308+
}
309+
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
310+
}
311+
301312
ret = _z_transport_tx_send_n_msg_inner(ztc, n_msg, reliability, peers);
302313
if (!_z_transport_batch_hold_tx_mutex()) {
303314
_z_transport_tx_mutex_unlock(ztc);
@@ -320,6 +331,13 @@ static z_result_t _z_transport_tx_send_n_batch(_z_transport_common_t *ztc, z_con
320331
return ret;
321332
}
322333
// Send batch
334+
if ((ztc->_state != _Z_TRANSPORT_STATE_OPEN) || (ztc->_link == NULL)) {
335+
if (!_z_transport_batch_hold_tx_mutex()) {
336+
_z_transport_tx_mutex_unlock(ztc);
337+
}
338+
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
339+
}
340+
323341
_Z_DEBUG("Send network batch");
324342
ret = _z_transport_tx_flush_buffer(ztc, peers);
325343
if (!_z_transport_batch_hold_tx_mutex()) {
@@ -391,6 +409,10 @@ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg
391409
ret = _z_transport_tx_send_t_msg(&zt->_transport._multicast._common, t_msg, NULL);
392410
break;
393411
case _Z_TRANSPORT_RAWETH_TYPE:
412+
if (zt->_transport._raweth._common._state != _Z_TRANSPORT_STATE_OPEN) {
413+
ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE;
414+
break;
415+
}
394416
ret = _z_raweth_send_t_msg(&zt->_transport._raweth._common, t_msg);
395417
break;
396418
default:
@@ -530,6 +552,10 @@ z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_
530552
_z_transport_tx_send_n_msg(&zn->_tp._transport._multicast._common, z_msg, reliability, cong_ctrl, NULL);
531553
break;
532554
case _Z_TRANSPORT_RAWETH_TYPE:
555+
if (zn->_tp._transport._raweth._common._state != _Z_TRANSPORT_STATE_OPEN) {
556+
ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE;
557+
break;
558+
}
533559
ret = _z_raweth_send_n_msg(zn, z_msg, reliability, cong_ctrl);
534560
break;
535561
default:

0 commit comments

Comments
 (0)