From 90f806142304b85e703720a9cb1cddff077e17e4 Mon Sep 17 00:00:00 2001 From: nanang Date: Wed, 1 Apr 2026 11:06:00 +0700 Subject: [PATCH 1/3] Fix incorrect PJ_EPENDING check in ioq_tcp test Was comparing bytes (pj_ssize_t data length) against PJ_EPENDING instead of status. Pre-existing bug unrelated to #4878. Co-Authored-By: Claude Code --- pjlib/src/pjlib-test/ioq_tcp.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pjlib/src/pjlib-test/ioq_tcp.c b/pjlib/src/pjlib-test/ioq_tcp.c index 06b56a9518..5536be1088 100644 --- a/pjlib/src/pjlib-test/ioq_tcp.c +++ b/pjlib/src/pjlib-test/ioq_tcp.c @@ -167,7 +167,7 @@ static int send_recv_test(pj_ioqueue_t *ioque, // Starts send on the client side. bytes = bufsize; status = pj_ioqueue_send(ckey, &write_op, send_buf, &bytes, 0); - if (status != PJ_SUCCESS && bytes != PJ_EPENDING) { + if (status != PJ_SUCCESS && status != PJ_EPENDING) { return -120; } if (status == PJ_EPENDING) { From aacb3bc72dac0e5a14b5c5f95494b0ad92c82f24 Mon Sep 17 00:00:00 2001 From: nanang Date: Wed, 1 Apr 2026 11:06:16 +0700 Subject: [PATCH 2/3] Fix write callback not invoked on ioqueue unregister (#4864, #4878) When pj_ioqueue_unregister() is called with pending async writes, on_write_complete callbacks were silently skipped. Upper layers (e.g., SIP transport) rely on these callbacks to release resources such as tdata references, causing reference leaks under high load. The fix drains pending write callbacks during key unregistration: - write_cb_list (completed ops, deferred callback): invoked with op->written (success status), respecting write_callback_thread serialization. - write_list (pending ops, never sent): invoked with -PJ_ECANCELLED. Changes across all maintained ioqueue backends (epoll, kqueue, select, IOCP). Also includes: - PJ_IOQUEUE_FAST_TRACK config to disable fast-track for testing - activesock: forward on_data_sent callback when SHUT_TX is set - Regression test for send callbacks on activesock close Co-Authored-By: Nathan Monfils Co-Authored-By: Claude Code --- pjlib/include/pj/config.h | 15 +++ pjlib/src/pj/activesock.c | 14 ++- pjlib/src/pj/ioqueue_common_abs.c | 118 +++++++++++++++++++++++- pjlib/src/pj/ioqueue_epoll.c | 3 + pjlib/src/pj/ioqueue_kqueue.c | 11 ++- pjlib/src/pj/ioqueue_select.c | 19 ++-- pjlib/src/pj/ioqueue_winnt.c | 118 +++++++++++++++++++----- pjlib/src/pjlib-test/activesock.c | 147 ++++++++++++++++++++++++++++++ 8 files changed, 403 insertions(+), 42 deletions(-) diff --git a/pjlib/include/pj/config.h b/pjlib/include/pj/config.h index 8ffab4921e..6b7beb66f0 100644 --- a/pjlib/include/pj/config.h +++ b/pjlib/include/pj/config.h @@ -825,6 +825,21 @@ # define PJ_IOQUEUE_CALLBACK_NO_LOCK 1 #endif +/** + * Enable the ioqueue "fast track" optimization. When enabled (default), + * pj_ioqueue_send(), pj_ioqueue_sendto(), and pj_ioqueue_accept() attempt + * the operation immediately before queuing it for async processing. + * + * This should not be disabled in production. Setting to 0 forces all + * operations through the async path, intended only for debugging and + * testing the ioqueue completion callback logic. See #4864, #4878. + * + * Default: 1 (enabled). + */ +#ifndef PJ_IOQUEUE_FAST_TRACK +# define PJ_IOQUEUE_FAST_TRACK 1 +#endif + /** * Determine if FD_SETSIZE is changeable/set-able. If so, then we will diff --git a/pjlib/src/pj/activesock.c b/pjlib/src/pj/activesock.c index c1c4a09856..2a0902cf23 100644 --- a/pjlib/src/pj/activesock.c +++ b/pjlib/src/pj/activesock.c @@ -773,12 +773,18 @@ static void ioqueue_on_write_complete(pj_ioqueue_key_t *key, asock = (pj_activesock_t*) pj_ioqueue_get_user_data(key); - /* Ignore if we've been shutdown. This may cause data to be partially - * sent even when 'wholedata' was requested if the OS only sent partial - * buffer. + /* If we've been shutdown, still invoke the callback so upper layers + * can release resources (e.g., pjsip_tx_data_dec_ref). See #4878. + * + * Note: asock->key may be NULL at this point (cleared by + * pj_activesock_close before pj_ioqueue_unregister). The callback + * must not attempt further I/O on this active socket. */ - if (asock->shutdown & SHUT_TX) + if (asock->shutdown & SHUT_TX) { + if (asock->cb.on_data_sent) + (*asock->cb.on_data_sent)(asock, op_key, bytes_sent); return; + } if (bytes_sent > 0 && op_key->activesock_data) { /* whole_data is requested. Make sure we send all the data */ diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index f7d8a16323..5b234b11d4 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -443,9 +443,12 @@ static pj_bool_t ioqueue_dispatch_write_event( pj_ioqueue_t *ioqueue, #endif } - /* Call callback. */ - if (h->cb.on_write_complete && !IS_CLOSING(h)) { - (*h->cb.on_write_complete)(h, + /* Call callback. Do not skip when IS_CLOSING, as upper + * layers need the callback to release resources (e.g., + * pjsip_tx_data_dec_ref). See #4878. + */ + if (h->cb.on_write_complete) { + (*h->cb.on_write_complete)(h, (pj_ioqueue_op_key_t*)write_op, write_op->written); } @@ -538,7 +541,7 @@ static unsigned ioqueue_dispatch_write_event_no_lock(pj_ioqueue_key_t* h, /* Check if there is any pending write callback for this key. */ pj_ioqueue_lock_key(h); - if (!IS_CLOSING(h) && !pj_list_empty(&h->write_cb_list) && + if (!pj_list_empty(&h->write_cb_list) && (max_event == 0 || event_cnt < max_event)) { write_op = h->write_cb_list.next; @@ -569,6 +572,107 @@ static unsigned ioqueue_dispatch_write_event_no_lock(pj_ioqueue_key_t* h, } #endif +/* + * Drain pending write callbacks during key unregistration. + * + * This ensures upper layers (e.g., SIP transport) receive on_write_complete + * callbacks for all pending writes, allowing them to release resources such + * as tdata references. Without this, pending writes are silently discarded + * when a key is unregistered, causing reference leaks. See #4864, #4878. + * + * Two lists must be drained in order: + * 1. write_cb_list (completed ops, deferred callback) - with op->written + * 2. write_list (pending ops, never sent) - with -PJ_ECANCELLED + * + * Must be called after key->closing is set and key is unlocked. + * Not used by IOCP backend (which has its own mechanism). + */ +static void ioqueue_drain_pending_writes(pj_ioqueue_key_t *key) +{ +#if PJ_IOQUEUE_CALLBACK_NO_LOCK + pj_ioqueue_lock_key(key); + + if (key->write_callback_thread == pj_thread_this()) { + /* We are the callback thread (unregister called from within a + * write callback). Drain write_cb_list ourselves with success + * status, matching the pattern in + * ioqueue_dispatch_write_event_no_lock(). + */ + while (!pj_list_empty(&key->write_cb_list)) { + struct write_operation *op = key->write_cb_list.next; + void (*on_wr_complete)(pj_ioqueue_key_t*, + pj_ioqueue_op_key_t*, + pj_ssize_t); + + pj_list_erase(op); + on_wr_complete = key->cb.on_write_complete; + if (key->grp_lock) + pj_grp_lock_add_ref(key->grp_lock); + pj_ioqueue_unlock_key(key); + + if (on_wr_complete) { + (*on_wr_complete)(key, (pj_ioqueue_op_key_t*)op, + op->written); + } + if (key->grp_lock) + pj_grp_lock_dec_ref(key->grp_lock); + pj_ioqueue_lock_key(key); + } + pj_ioqueue_unlock_key(key); + } else if (key->write_callback_thread) { + /* Another thread is the callback thread. Wait for it to finish + * draining write_cb_list. The IS_CLOSING check has been removed + * from ioqueue_dispatch_write_event_no_lock() so the callback + * thread will continue to drain even after closing is set. + */ + unsigned counter = 0; + + while (key->write_callback_thread) { + pj_ioqueue_unlock_key(key); + pj_thread_sleep(10); + pj_ioqueue_lock_key(key); + if (++counter > 100) { + PJ_LOG(1, (THIS_FILE, "Timeout waiting for write " + "callback to drain on socket=%ld", + (long)key->fd)); + break; + } + } + pj_ioqueue_unlock_key(key); + } else { + pj_ioqueue_unlock_key(key); + } +#endif + + /* Walk write_list with -PJ_ECANCELLED (pending ops, never sent). + * + * Safe to walk without lock because: + * - closing=1 was set while key was locked, so any dispatch thread + * that subsequently acquires the lock sees IS_CLOSING and returns + * without touching write_list. + * - Any in-flight dispatch already dequeued its op from write_list + * while holding the lock, before unlocking for the callback. + * - pj_ioqueue_send() checks IS_CLOSING under lock, preventing + * new entries. + * - The socket fd was removed from the polling set before closing + * was set, so no new writable events will be dispatched. + */ + while (!pj_list_empty(&key->write_list)) { + struct write_operation *op = key->write_list.next; + + pj_list_erase(op); + if (key->grp_lock) + pj_grp_lock_add_ref(key->grp_lock); + + if (key->cb.on_write_complete) { + (*key->cb.on_write_complete)(key, (pj_ioqueue_op_key_t*)op, + -(pj_ssize_t)PJ_ECANCELLED); + } + if (key->grp_lock) + pj_grp_lock_dec_ref(key->grp_lock); + } +} + static pj_bool_t ioqueue_dispatch_read_event( pj_ioqueue_t *ioqueue, pj_ioqueue_key_t *h ) { @@ -1055,6 +1159,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write. */ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); +#if PJ_IOQUEUE_FAST_TRACK /* Fast track: * Try to send data immediately, only if there's no pending write! * Note: @@ -1088,6 +1193,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, } } } +#endif /* * Schedule asynchronous send. @@ -1186,6 +1292,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, /* We can not use PJ_IOQUEUE_ALWAYS_ASYNC for socket write */ flags &= ~(PJ_IOQUEUE_ALWAYS_ASYNC); +#if PJ_IOQUEUE_FAST_TRACK /* Fast track: * Try to send data immediately, only if there's no pending write! * Note: @@ -1238,6 +1345,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, } } } +#endif /* * Check that address storage can hold the address parameter. @@ -1331,6 +1439,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, accept_op = (struct accept_operation*)op_key; PJ_ASSERT_RETURN(accept_op->op == PJ_IOQUEUE_OP_NONE, PJ_EPENDING); +#if PJ_IOQUEUE_FAST_TRACK /* Fast track: * See if there's new connection available immediately. */ @@ -1356,6 +1465,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, } } } +#endif pj_ioqueue_lock_key(key); /* Check again. Handle may have been closed after the previous check diff --git a/pjlib/src/pj/ioqueue_epoll.c b/pjlib/src/pj/ioqueue_epoll.c index 67aec1bd88..a31ca40a86 100644 --- a/pjlib/src/pj/ioqueue_epoll.c +++ b/pjlib/src/pj/ioqueue_epoll.c @@ -667,6 +667,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) pj_ioqueue_unlock_key(key); #if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Drain pending write callbacks. See #4864, #4878. */ + ioqueue_drain_pending_writes(key); + /* Decrement counter. */ decrement_counter(key); #else diff --git a/pjlib/src/pj/ioqueue_kqueue.c b/pjlib/src/pj/ioqueue_kqueue.c index e3937e4a0a..35cd3af624 100644 --- a/pjlib/src/pj/ioqueue_kqueue.c +++ b/pjlib/src/pj/ioqueue_kqueue.c @@ -452,22 +452,23 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister(pj_ioqueue_key_t *key) /* Mark key is closing. */ key->closing = 1; + pj_ioqueue_unlock_key(key); + + /* Drain pending write callbacks. See #4864, #4878. */ + ioqueue_drain_pending_writes(key); + /* Decrement counter. */ decrement_counter(key); /* Done. */ if (key->grp_lock) { - /* just dec_ref and unlock. we will set grp_lock to NULL - * elsewhere */ + /* just dec_ref. we will set grp_lock to NULL elsewhere */ pj_grp_lock_t *grp_lock = key->grp_lock; // Don't set grp_lock to NULL otherwise the other thread // will crash. Just leave it as dangling pointer, but this // should be safe // key->grp_lock = NULL; pj_grp_lock_dec_ref_dbg(grp_lock, "ioqueue", 0); - pj_grp_lock_release(grp_lock); - } else { - pj_ioqueue_unlock_key(key); } #else if (key->grp_lock) { diff --git a/pjlib/src/pj/ioqueue_select.c b/pjlib/src/pj/ioqueue_select.c index 23d782fb2e..572df1eaf6 100644 --- a/pjlib/src/pj/ioqueue_select.c +++ b/pjlib/src/pj/ioqueue_select.c @@ -543,12 +543,6 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) key->fd = PJ_INVALID_SOCKET; } - /* Clear callback */ - key->cb.on_accept_complete = NULL; - key->cb.on_connect_complete = NULL; - key->cb.on_read_complete = NULL; - key->cb.on_write_complete = NULL; - /* Must release ioqueue lock first before decrementing counter, to * prevent deadlock. */ @@ -559,6 +553,19 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key) pj_ioqueue_unlock_key(key); +#if PJ_IOQUEUE_HAS_SAFE_UNREG + /* Drain pending write callbacks. See #4864, #4878. + * Must be done before clearing callbacks below. + */ + ioqueue_drain_pending_writes(key); +#endif + + /* Clear callback */ + key->cb.on_accept_complete = NULL; + key->cb.on_connect_complete = NULL; + key->cb.on_read_complete = NULL; + key->cb.on_write_complete = NULL; + #if PJ_IOQUEUE_HAS_SAFE_UNREG /* Decrement counter. */ decrement_counter(key); diff --git a/pjlib/src/pj/ioqueue_winnt.c b/pjlib/src/pj/ioqueue_winnt.c index 29d66b1ff0..cfe99f3034 100644 --- a/pjlib/src/pj/ioqueue_winnt.c +++ b/pjlib/src/pj/ioqueue_winnt.c @@ -859,12 +859,34 @@ static pj_status_t cancel_all_pending_op(pj_ioqueue_key_t *key) decrement_counter(key); } - /* Clear any pending write callbacks */ - while (!pj_list_empty(&key->write_cb_list)) { - struct pending_op *op = key->write_cb_list.next; - pj_list_erase(op); - pj_list_push_back(&key->free_pending_list, op); - decrement_counter(key); + /* Do NOT clear write_cb_list here. The write_callback_thread will + * drain it (IS_CLOSING check removed from + * ioqueue_dispatch_write_event_no_lock). See #4864, #4878. + * + * Exception: if WE are the write_callback_thread (unregister called + * from within a write callback), drain write_cb_list ourselves now. + * After we return, pj_ioqueue_unregister() will clear the callback + * pointers, and dispatch_write_event_no_lock would see NULL callback. + */ + if (key->write_callback_thread == pj_thread_this()) { + while (!pj_list_empty(&key->write_cb_list)) { + struct pending_op *op = key->write_cb_list.next; + void (*on_wr_complete)(pj_ioqueue_key_t*, + pj_ioqueue_op_key_t*, + pj_ssize_t); + + pj_list_erase(op); + on_wr_complete = key->cb.on_write_complete; + OPKEY_OPERATION(op->app_op_key) = 0; + pj_ioqueue_unlock_key(key); + + if (on_wr_complete) { + on_wr_complete(key, op->app_op_key, + op->pending_key.overlapped.bytes_read); + } + release_pending_op(key, op); + pj_ioqueue_lock_key(key); + } } /* Wait until any read callback is finished */ @@ -880,7 +902,7 @@ static pj_status_t cancel_all_pending_op(pj_ioqueue_key_t *key) pj_ioqueue_unlock_key(key); pj_thread_sleep(10); pj_ioqueue_lock_key(key); - + /* Clear any pending read callbacks again */ while (!pj_list_empty(&key->read_cb_list)) { struct pending_op *op = key->read_cb_list.next; @@ -898,7 +920,9 @@ static pj_status_t cancel_all_pending_op(pj_ioqueue_key_t *key) } } while (0); - /* Wait until any write callback is finished */ + /* Wait until any write callback is finished. The callback thread + * will drain write_cb_list (completed ops with success status). + */ do { unsigned counter = 0; @@ -911,14 +935,6 @@ static pj_status_t cancel_all_pending_op(pj_ioqueue_key_t *key) pj_ioqueue_unlock_key(key); pj_thread_sleep(10); pj_ioqueue_lock_key(key); - - /* Clear any pending write callbacks again */ - while (!pj_list_empty(&key->write_cb_list)) { - struct pending_op *op = key->write_cb_list.next; - pj_list_erase(op); - pj_list_push_back(&key->free_pending_list, op); - decrement_counter(key); - } /* Timeout after ~1 second */ if (++counter > 100) { @@ -1007,7 +1023,7 @@ static unsigned ioqueue_dispatch_write_event_no_lock(pj_ioqueue_key_t* h, /* Check if there is any pending write callback for this key. */ pj_ioqueue_lock_key(h); - if (!h->closing && !pj_list_empty(&h->write_cb_list) && + if (!pj_list_empty(&h->write_cb_list) && (max_event == 0 || event_cnt < max_event)) { write_op = h->write_cb_list.next; @@ -1106,13 +1122,30 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, return PJ_TRUE; } - /* We shouldn't call callbacks if key is quitting. */ + /* When key is closing, still invoke write callbacks so upper + * layers can release resources (e.g., pjsip_tx_data_dec_ref). + * Read/accept callbacks can be safely skipped. See #4878. + */ if (key->closing) { - release_pending_op(key, op); + if (operation == PJ_IOQUEUE_OP_WRITE || + operation == PJ_IOQUEUE_OP_SEND || + operation == PJ_IOQUEUE_OP_SEND_TO) + { + increment_counter(key); + OPKEY_OPERATION(op_key) = 0; + if (key->cb.on_write_complete) { + key->cb.on_write_complete(key, op_key, + -(pj_ssize_t)PJ_ECANCELLED); + } + release_pending_op(key, op); + decrement_counter(key); + } else { + release_pending_op(key, op); + } return PJ_TRUE; } - /* If concurrency is disabled, lock the key + /* If concurrency is disabled, lock the key * (and save the lock status to local var since app may change * concurrency setting while in the callback) */ if (key->allow_concurrent == PJ_FALSE) { @@ -1190,7 +1223,43 @@ static pj_bool_t poll_iocp( HANDLE hIocp, DWORD dwTimeout, if (has_lock) { pj_ioqueue_unlock_key(key); } - release_pending_op(key, op); + /* Still invoke write callbacks, see first closing check + * above and #4878. + */ + if (operation == PJ_IOQUEUE_OP_WRITE || + operation == PJ_IOQUEUE_OP_SEND || + operation == PJ_IOQUEUE_OP_SEND_TO) + { + increment_counter(key); + OPKEY_OPERATION(op_key) = 0; + if (key->cb.on_write_complete) { + key->cb.on_write_complete(key, op_key, + -(pj_ssize_t)PJ_ECANCELLED); + } +#if PJ_IOQUEUE_CALLBACK_NO_LOCK + /* Clear write_callback_thread that was set before this + * closing check. Without this, cancel_all_pending_op + * would wait and timeout. See #4878. + */ + pj_ioqueue_lock_key(key); + key->write_callback_thread = NULL; + pj_ioqueue_unlock_key(key); +#endif + release_pending_op(key, op); + decrement_counter(key); + } else { +#if PJ_IOQUEUE_CALLBACK_NO_LOCK + if (operation == PJ_IOQUEUE_OP_READ || + operation == PJ_IOQUEUE_OP_RECV || + operation == PJ_IOQUEUE_OP_RECV_FROM) + { + pj_ioqueue_lock_key(key); + key->read_callback_thread = NULL; + pj_ioqueue_unlock_key(key); + } +#endif + release_pending_op(key, op); + } return PJ_TRUE; } @@ -1348,11 +1417,14 @@ PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key ) //CloseHandle(key->hnd); pj_sock_close((pj_sock_t)key->hnd); - /* Reset callbacks */ + /* Reset callbacks. Keep on_write_complete alive so that cancelled + * overlapped write ops arriving via IOCP can still invoke it for + * resource cleanup (e.g., pjsip_tx_data_dec_ref). See #4878. + */ key->cb.on_accept_complete = NULL; key->cb.on_connect_complete = NULL; key->cb.on_read_complete = NULL; - key->cb.on_write_complete = NULL; + //key->cb.on_write_complete = NULL; /* Even after handle is closed, I suspect that IOCP may still try to * do something with the handle, causing memory corruption when pool diff --git a/pjlib/src/pjlib-test/activesock.c b/pjlib/src/pjlib-test/activesock.c index 98ce4f9688..8465adb456 100644 --- a/pjlib/src/pjlib-test/activesock.c +++ b/pjlib/src/pjlib-test/activesock.c @@ -434,6 +434,150 @@ static int activesock_test1(void) return ret; } +/* + * Test that send callbacks are invoked even after activesock is closed. + * This verifies the fix for #4864/#4878 where pending write callbacks + * were silently dropped, leaking tdata references in upper layers. + */ +struct close_test_state +{ + int cb_count; +}; + +static pj_bool_t close_test_on_data_sent(pj_activesock_t *asock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct close_test_state *st; + + PJ_UNUSED_ARG(op_key); + PJ_UNUSED_ARG(sent); + + st = (struct close_test_state*)pj_activesock_get_user_data(asock); + if (st) + st->cb_count++; + + return PJ_TRUE; +} + +static int activesock_close_send_cb_test(void) +{ + enum { COUNT = 100 }; + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_sock_t sock1 = PJ_INVALID_SOCKET, sock2 = PJ_INVALID_SOCKET; + pj_activesock_t *asock1 = NULL, *asock2 = NULL; + pj_activesock_cb cb; + struct close_test_state state; + struct send_key op_keys[COUNT]; + pj_time_val delay = {0, 100}; + int pending_count = 0; + unsigned i; + int ret = 0; + pj_status_t status; + + PJ_LOG(3, (THIS_FILE, "..activesock close with pending send CB test")); + + pool = pj_pool_create(mem, "close_send_cb", 256, 256, NULL); + PJ_TEST_NOT_NULL(pool, NULL, return -500); + + PJ_TEST_SUCCESS(app_socketpair(pj_AF_INET(), pj_SOCK_STREAM(), 0, + &sock1, &sock2), + NULL, ERR(-510)); + PJ_TEST_SUCCESS(pj_ioqueue_create(pool, 4, &ioqueue), + NULL, ERR(-520)); + + pj_bzero(&cb, sizeof(cb)); + cb.on_data_sent = &close_test_on_data_sent; + + pj_bzero(&state, sizeof(state)); + PJ_TEST_SUCCESS(pj_activesock_create(pool, sock2, pj_SOCK_STREAM(), + NULL, ioqueue, &cb, &state, + &asock2), + NULL, ERR(-530)); + + /* Create asock1 just to keep sock1 alive (peer side) */ + PJ_TEST_SUCCESS(pj_activesock_create(pool, sock1, pj_SOCK_STREAM(), + NULL, ioqueue, &cb, NULL, &asock1), + NULL, ERR(-535)); + + /* Shrink send buffer so fast-track fails quickly, forcing sends + * through the async ioqueue path. + */ + { + int sndbuf = 1024; + pj_sock_setsockopt(sock2, pj_SOL_SOCKET(), pj_SO_SNDBUF(), + &sndbuf, sizeof(sndbuf)); + } + + /* Send multiple packets without polling, they should queue up */ + for (i = 0; i < COUNT; ++i) { + struct tcp_pkt pkt; + pj_ssize_t len; + + pkt.signature = SIGNATURE; + pkt.seq = i; + pj_memset(pkt.fill, 'a', sizeof(pkt.fill)); + + pj_ioqueue_op_key_init(&op_keys[i].op_key, + sizeof(op_keys[i])); + + len = sizeof(pkt); + status = pj_activesock_send(asock2, &op_keys[i].op_key, + &pkt, &len, 0); + if (status == PJ_EPENDING) { + pending_count++; + } else if (status != PJ_SUCCESS) { + break; + } + } + + /* Close the activesock while sends are pending. The send callbacks + * MUST still be invoked so upper layers can free resources. + */ + pj_activesock_close(asock2); + asock2 = NULL; + + /* Poll to process any remaining ioqueue events. On IOCP, each + * cancelled overlapped op needs a separate poll to dequeue from + * the completion port. Poll until all callbacks received or + * timeout (~20s worst case, but early-break when done). + */ + for (i = 0; i < 200; ++i) { + pj_ioqueue_poll(ioqueue, &delay); + if (state.cb_count >= pending_count && pending_count > 0) + break; + } + + PJ_LOG(3, (THIS_FILE, "...pending=%d, callbacks=%d", + pending_count, state.cb_count)); + + if (pending_count == 0) { + /* All sends completed synchronously (fast-track succeeded). + * The async drain path is not exercised in this run. To force + * async path, set PJ_IOQUEUE_FAST_TRACK=0 in config_site.h. + */ + PJ_LOG(3, (THIS_FILE, "...async path not exercised, skipping")); + } else { + /* All pending sends must have received a callback */ + PJ_TEST_GTE(state.cb_count, pending_count, + "not all pending send callbacks were invoked", + ERR(-540)); + } + +on_return: + if (asock2) + pj_activesock_close(asock2); + if (asock1) + pj_activesock_close(asock1); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return ret; +} + int activesock_test(void) { int rc; @@ -443,6 +587,9 @@ int activesock_test(void) if ((rc=activesock_test1()) != 0) return rc; + if ((rc=activesock_close_send_cb_test()) != 0) + return rc; + return 0; } From 1069078a346baba0522f763b41611fb996a00305 Mon Sep 17 00:00:00 2001 From: nanang Date: Wed, 8 Apr 2026 18:38:41 +0700 Subject: [PATCH 3/3] Fix compile warnings and test failure with PJ_IOQUEUE_FAST_TRACK=0 - Add PJ_UNUSED_ARG for status/sent variables in pj_ioqueue_send(), pj_ioqueue_sendto(), and pj_ioqueue_accept() when fast-track is disabled, fixing MSVC C4101 warnings. - Fix activesock UDP echo test: accept PJ_EPENDING as non-error from sendto() since async path always returns PJ_EPENDING when fast-track is disabled. Also fix error message to print actual send status instead of the recvfrom callback status. Co-Authored-By: Claude Code --- pjlib/src/pj/ioqueue_common_abs.c | 10 +++++++++- pjlib/src/pjlib-test/activesock.c | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/pjlib/src/pj/ioqueue_common_abs.c b/pjlib/src/pj/ioqueue_common_abs.c index 5b234b11d4..1a123df2c1 100644 --- a/pjlib/src/pj/ioqueue_common_abs.c +++ b/pjlib/src/pj/ioqueue_common_abs.c @@ -1193,6 +1193,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, } } } +#else + PJ_UNUSED_ARG(status); + PJ_UNUSED_ARG(sent); #endif /* @@ -1218,7 +1221,7 @@ PJ_DEF(pj_status_t) pj_ioqueue_send( pj_ioqueue_key_t *key, * the sending only. If the polling thread runs on lower priority * than the sending thread, then it's possible that the pending * write flag is not cleared in-time because clearing is only done - * during polling. + * during polling. * * Aplication should specify multiple write operation keys on * situation like this. @@ -1345,6 +1348,9 @@ PJ_DEF(pj_status_t) pj_ioqueue_sendto( pj_ioqueue_key_t *key, } } } +#else + PJ_UNUSED_ARG(status); + PJ_UNUSED_ARG(sent); #endif /* @@ -1465,6 +1471,8 @@ PJ_DEF(pj_status_t) pj_ioqueue_accept( pj_ioqueue_key_t *key, } } } +#else + PJ_UNUSED_ARG(status); #endif pj_ioqueue_lock_key(key); diff --git a/pjlib/src/pjlib-test/activesock.c b/pjlib/src/pjlib-test/activesock.c index 8465adb456..3fd09772f7 100644 --- a/pjlib/src/pjlib-test/activesock.c +++ b/pjlib/src/pjlib-test/activesock.c @@ -83,9 +83,9 @@ static pj_bool_t udp_echo_srv_on_data_recvfrom(pj_activesock_t *asock, srv->status = pj_activesock_sendto(asock, &srv->send_key, data, &sent, 0, src_addr, addr_len); - if (srv->status != PJ_SUCCESS) { + if (srv->status != PJ_SUCCESS && srv->status != PJ_EPENDING) { srv->tx_err_cnt++; - udp_echo_err("sendto()", status); + udp_echo_err("sendto()", srv->status); } }