diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml index 04e3d770c8..966162a48f 100644 --- a/.github/workflows/ci-linux.yml +++ b/.github/workflows/ci-linux.yml @@ -545,6 +545,57 @@ jobs: name: ${{ runner.os }}-${{ runner.arch }}-${{ github.job }}-${{ github.run_id }} path: artifacts + ioq-no-fast-track: + needs: [detect-changes] + if: github.event_name == 'push' || needs.detect-changes.outputs.pjlib_deps == 'true' + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - name: select+ossl + config_site: | + #define PJ_TODO(x) + #define PJ_IOQUEUE_FAST_TRACK 0 + #define PJ_IOQUEUE_IMP PJ_IOQUEUE_IMP_SELECT + configure_args: "" + install_deps: "" + - name: epoll+gtls + config_site: | + #define PJ_TODO(x) + #define PJ_IOQUEUE_FAST_TRACK 0 + configure_args: "--with-gnutls=/usr/" + install_deps: "sudo apt-get update && sudo apt-get install -y libgnutls28-dev" + name: ioq-no-fast-track / ${{ matrix.name }} + steps: + - uses: actions/checkout@v2 + - name: install cirunner + run: | + git clone --depth 1 https://github.com/pjsip/cirunner.git + cirunner/installlinux.sh + - name: install dependencies + if: matrix.install_deps != '' + run: ${{ matrix.install_deps }} + - name: config site + run: | + cat > pjlib/include/pj/config_site.h << 'SITE_EOF' + ${{ matrix.config_site }} + SITE_EOF + - name: configure + run: CFLAGS="-g -fsanitize=address" LDFLAGS="-rdynamic -fsanitize=address" ./configure ${{ matrix.configure_args }} + - name: make + run: $MAKE_FAST + - name: pjlib ioqueue, activesock, ssl_sock test + run: | + export LSAN_OPTIONS="suppressions=$GITHUB_WORKSPACE/tests/sanitizers/lsan.supp" + cd pjlib/build && ../bin/pjlib-test-`make -s -C ../.. infotarget` $CI_MODE $CI_ARGS udp_ioqueue_test tcp_ioqueue_test udp_ioqueue_unreg_test activesock_test ssl_sock_test ssl_sock_stress_test + - name: upload artifacts on failure + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: ${{ runner.os }}-${{ runner.arch }}-${{ github.job }}-${{ matrix.name }}-${{ github.run_id }} + path: artifacts + cmake-build: runs-on: ubuntu-latest name: CMake / ${{ matrix.build_type }} diff --git a/pjlib/CMakeLists.txt b/pjlib/CMakeLists.txt index 8408db093c..1adfb87fb4 100644 --- a/pjlib/CMakeLists.txt +++ b/pjlib/CMakeLists.txt @@ -729,6 +729,7 @@ if(BUILD_TESTING) src/pjlib-test/sock.c src/pjlib-test/sock_perf.c src/pjlib-test/ssl_sock.c + src/pjlib-test/ssl_sock_stress.c src/pjlib-test/string.c src/pjlib-test/test.c src/pjlib-test/thread.c diff --git a/pjlib/build/Makefile b/pjlib/build/Makefile index aa5ce5f4a0..43d645f904 100644 --- a/pjlib/build/Makefile +++ b/pjlib/build/Makefile @@ -51,7 +51,7 @@ export TEST_OBJS += activesock.o atomic.o atomic_slist.o \ fifobuf.o file.o hash_test.o ioq_perf.o ioq_udp.o \ ioq_stress_test.o ioq_unreg.o ioq_tcp.o ioq_iocp_unreg_test.o \ list.o mutex.o os.o pool.o pool_perf.o rand.o rbtree.o \ - select.o sleep.o sock.o sock_perf.o ssl_sock.o \ + select.o sleep.o sock.o sock_perf.o ssl_sock.o ssl_sock_stress.o \ string.o test.o thread.o timer.o timestamp.o \ udp_echo_srv_sync.o udp_echo_srv_ioqueue.o \ unittest_test.o util.o diff --git a/pjlib/build/pjlib_test.vcxproj b/pjlib/build/pjlib_test.vcxproj index 71007d561f..d1cf410b1b 100644 --- a/pjlib/build/pjlib_test.vcxproj +++ b/pjlib/build/pjlib_test.vcxproj @@ -789,6 +789,7 @@ + diff --git a/pjlib/build/pjlib_test.vcxproj.filters b/pjlib/build/pjlib_test.vcxproj.filters index 985fa4db85..e493983a89 100644 --- a/pjlib/build/pjlib_test.vcxproj.filters +++ b/pjlib/build/pjlib_test.vcxproj.filters @@ -93,6 +93,9 @@ Source Files + + Source Files + Source Files diff --git a/pjlib/src/pj/ssl_sock_apple.m b/pjlib/src/pj/ssl_sock_apple.m index 7570ccdcb2..c0a135756e 100644 --- a/pjlib/src/pj/ssl_sock_apple.m +++ b/pjlib/src/pj/ssl_sock_apple.m @@ -1135,12 +1135,12 @@ static pj_status_t network_setup_connection(pj_ssl_sock_t *ssock, pj_status_t status; /* Initialize input circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_input, 8192); + status = circ_init(ssock->pool->factory, &ssock->ssl_read_buf, 8192); if (status != PJ_SUCCESS) return status; /* Initialize output circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_output, 8192); + status = circ_init(ssock->pool->factory, &ssock->ssl_write_buf, 8192); if (status != PJ_SUCCESS) return status; @@ -1525,8 +1525,8 @@ static void ssl_destroy(pj_ssl_sock_t *ssock) } /* Destroy circular buffers */ - circ_deinit(&ssock->circ_buf_input); - circ_deinit(&ssock->circ_buf_output); + circ_deinit(&ssock->ssl_read_buf); + circ_deinit(&ssock->ssl_write_buf); PJ_LOG(4, (THIS_FILE, "SSL %p destroyed", ssock)); } @@ -1535,9 +1535,9 @@ static void ssl_destroy(pj_ssl_sock_t *ssock) /* Reset socket state. */ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) { - pj_lock_acquire(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); ssock->ssl_state = SSL_STATE_NULL; - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); #if SSL_DEBUG PJ_LOG(3, (THIS_FILE, "SSL reset sock state %p", ssock)); @@ -2170,20 +2170,20 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) { pj_size_t circ_buf_size, read_size; - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); - if (circ_empty(&ssock->circ_buf_input)) { - pj_lock_release(ssock->circ_buf_input_mutex); + if (circ_empty(&ssock->ssl_read_buf)) { + pj_lock_release(ssock->ssl_read_buf_mutex); *size = 0; return PJ_SUCCESS; } - circ_buf_size = circ_size(&ssock->circ_buf_input); + circ_buf_size = circ_size(&ssock->ssl_read_buf); read_size = PJ_MIN(circ_buf_size, (pj_size_t)*size); - circ_read(&ssock->circ_buf_input, data, read_size); + circ_read(&ssock->ssl_read_buf, data, read_size); - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); *size = read_size; @@ -2192,14 +2192,14 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) /* * Write the plain data to buffer. It will be encrypted later during - * sending. + * sending. Caller must hold ssock->write_mutex. */ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, pj_ssize_t size, int *nwritten) { pj_status_t status; - status = circ_write(&ssock->circ_buf_output, data, size); + status = circ_write(&ssock->ssl_write_buf, data, size); *nwritten = (status == PJ_SUCCESS)? (int)size: 0; return status; diff --git a/pjlib/src/pj/ssl_sock_darwin.c b/pjlib/src/pj/ssl_sock_darwin.c index 92f39e3c03..6e84295837 100644 --- a/pjlib/src/pj/ssl_sock_darwin.c +++ b/pjlib/src/pj/ssl_sock_darwin.c @@ -109,7 +109,7 @@ static OSStatus SocketWrite(SSLConnectionRef connection, pj_size_t len = *dataLength; pj_lock_acquire(ssock->write_mutex); - if (circ_write(&ssock->circ_buf_output, data, len) != PJ_SUCCESS) { + if (circ_write(&ssock->ssl_write_buf, data, len) != PJ_SUCCESS) { pj_lock_release(ssock->write_mutex); *dataLength = 0; return errSSLInternal; @@ -128,22 +128,22 @@ static OSStatus SocketRead(SSLConnectionRef connection, pj_ssl_sock_t *ssock = (pj_ssl_sock_t *)connection; pj_size_t len = *dataLength; - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); - if (circ_empty(&ssock->circ_buf_input)) { - pj_lock_release(ssock->circ_buf_input_mutex); + if (circ_empty(&ssock->ssl_read_buf)) { + pj_lock_release(ssock->ssl_read_buf_mutex); /* Data buffers not yet filled */ *dataLength = 0; return errSSLWouldBlock; } - pj_size_t circ_buf_size = circ_size(&ssock->circ_buf_input); + pj_size_t circ_buf_size = circ_size(&ssock->ssl_read_buf); pj_size_t read_size = PJ_MIN(circ_buf_size, len); - circ_read(&ssock->circ_buf_input, data, read_size); + circ_read(&ssock->ssl_read_buf, data, read_size); - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); *dataLength = read_size; @@ -377,12 +377,12 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) pj_status_t status; /* Initialize input circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_input, 8192); + status = circ_init(ssock->pool->factory, &ssock->ssl_read_buf, 8192); if (status != PJ_SUCCESS) return status; /* Initialize output circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_output, 8192); + status = circ_init(ssock->pool->factory, &ssock->ssl_write_buf, 8192); if (status != PJ_SUCCESS) return status; @@ -511,17 +511,17 @@ static void ssl_destroy(pj_ssl_sock_t *ssock) } /* Destroy circular buffers */ - circ_deinit(&ssock->circ_buf_input); - circ_deinit(&ssock->circ_buf_output); + circ_deinit(&ssock->ssl_read_buf); + circ_deinit(&ssock->ssl_write_buf); } /* Reset socket state. */ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) { - pj_lock_acquire(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); ssock->ssl_state = SSL_STATE_NULL; - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); ssl_close_sockets(ssock); } @@ -1366,11 +1366,6 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t *ssock) } pj_lock_release(ssock->write_mutex); - status = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); - if (status != PJ_SUCCESS && status != PJ_EPENDING) { - return status; - } - if (ret == noErr) { /* Handshake has been completed */ ssock->ssl_state = SSL_STATE_ESTABLISHED; @@ -1402,6 +1397,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) /* * Write the plain data to Darwin SSL, it will be encrypted by SSLWrite() * and call SocketWrite. + * Caller must hold ssock->write_mutex. */ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, pj_ssize_t size, int *nwritten) diff --git a/pjlib/src/pj/ssl_sock_gtls.c b/pjlib/src/pj/ssl_sock_gtls.c index fc2815057d..37b3651beb 100644 --- a/pjlib/src/pj/ssl_sock_gtls.c +++ b/pjlib/src/pj/ssl_sock_gtls.c @@ -204,19 +204,40 @@ static void tls_print_logs(int level, const char* msg) /* Initialize GnuTLS. */ +static int tls_init_count; +static void tls_deinit(void); + static pj_status_t tls_init(void) { + pj_status_t status; + int ret; + + if (tls_init_count == 1) + return PJ_SUCCESS; + + if (tls_init_count == -1) { + /* Another thread is initializing. Spin briefly. */ + int retry; + for (retry = 0; retry < 100 && tls_init_count == -1; ++retry) + pj_thread_sleep(10); + return (tls_init_count == 1) ? PJ_SUCCESS : PJ_EBUSY; + } + + tls_init_count = -1; + /* Register error subsystem */ - pj_status_t status = pj_register_strerror(PJ_ERRNO_START_USER + + status = pj_register_strerror(PJ_ERRNO_START_USER + PJ_ERRNO_SPACE_SIZE * 6, PJ_ERRNO_SPACE_SIZE, &tls_strerror); pj_assert(status == PJ_SUCCESS); /* Init GnuTLS library */ - int ret = gnutls_global_init(); - if (ret < 0) + ret = gnutls_global_init(); + if (ret < 0) { + tls_init_count = 0; return tls_status_from_err(NULL, ret); + } gnutls_global_set_log_level(GNUTLS_LOG_LEVEL); gnutls_global_set_log_function(tls_print_logs); @@ -246,6 +267,14 @@ static pj_status_t tls_init(void) ssl_cipher_num = i; } + /* Schedule cleanup at pj_shutdown() */ + status = pj_atexit(&tls_deinit); + if (status != PJ_SUCCESS) { + PJ_PERROR(2, ("gtls", status, + "Failed to register GnuTLS cleanup")); + } + + tls_init_count = 1; return PJ_SUCCESS; } @@ -253,7 +282,10 @@ static pj_status_t tls_init(void) /* Shutdown GnuTLS */ static void tls_deinit(void) { - gnutls_global_deinit(); + if (tls_init_count == 1) { + gnutls_global_deinit(); + tls_init_count = 0; + } } @@ -356,15 +388,15 @@ static pj_ssize_t tls_data_push(gnutls_transport_ptr_t ptr, pj_ssl_sock_t *ssock = (pj_ssl_sock_t *)ptr; gnutls_sock_t *gssock = (gnutls_sock_t *)ssock; - pj_lock_acquire(ssock->circ_buf_output_mutex); - if (circ_write(&ssock->circ_buf_output, data, len) != PJ_SUCCESS) { - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); + if (circ_write(&ssock->ssl_write_buf, data, len) != PJ_SUCCESS) { + pj_lock_release(ssock->ssl_write_buf_mutex); gnutls_transport_set_errno(gssock->session, ENOMEM); return -1; } - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); return len; } @@ -378,22 +410,22 @@ static pj_ssize_t tls_data_pull(gnutls_transport_ptr_t ptr, pj_ssl_sock_t *ssock = (pj_ssl_sock_t *)ptr; gnutls_sock_t *gssock = (gnutls_sock_t *)ssock; - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); - if (circ_empty(&ssock->circ_buf_input)) { - pj_lock_release(ssock->circ_buf_input_mutex); + if (circ_empty(&ssock->ssl_read_buf)) { + pj_lock_release(ssock->ssl_read_buf_mutex); /* Data buffers not yet filled */ gnutls_transport_set_errno(gssock->session, EAGAIN); return -1; } - pj_size_t circ_buf_size = circ_size(&ssock->circ_buf_input); + pj_size_t circ_buf_size = circ_size(&ssock->ssl_read_buf); pj_size_t read_size = PJ_MIN(circ_buf_size, len); - circ_read(&ssock->circ_buf_input, data, read_size); + circ_read(&ssock->ssl_read_buf, data, read_size); - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); return read_size; } @@ -656,15 +688,12 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) cert = ssock->cert; - /* Even if reopening is harmless, having one instance only simplifies - * deallocating it later on */ - if (!gssock->tls_init_count) { - gssock->tls_init_count++; - ret = tls_init(); - if (ret < 0) - return ret; - } else - return PJ_SUCCESS; + /* Initialize GnuTLS library (idempotent) */ + { + pj_status_t status = tls_init(); + if (status != PJ_SUCCESS) + return status; + } /* Start this socket session */ ret = gnutls_init(&gssock->session, ssock->is_server ? GNUTLS_SERVER @@ -680,12 +709,12 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) (gnutls_transport_ptr_t) (uintptr_t) ssock); /* Initialize input circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_input, 512); + status = circ_init(ssock->pool->factory, &ssock->ssl_read_buf, 512); if (status != PJ_SUCCESS) return status; /* Initialize output circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_output, 512); + status = circ_init(ssock->pool->factory, &ssock->ssl_write_buf, 512); if (status != PJ_SUCCESS) return status; @@ -842,24 +871,21 @@ static void ssl_destroy(pj_ssl_sock_t *ssock) gssock->xcred = NULL; } - /* Free GnuTLS library */ - if (gssock->tls_init_count) { - gssock->tls_init_count--; - tls_deinit(); - } + /* Note: GnuTLS library stays initialized for the process lifetime. + * tls_deinit() is called via pj_atexit() at pj_shutdown(). */ /* Destroy circular buffers */ - circ_deinit(&ssock->circ_buf_input); - circ_deinit(&ssock->circ_buf_output); + circ_deinit(&ssock->ssl_read_buf); + circ_deinit(&ssock->ssl_write_buf); } /* Reset socket state. */ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) { - pj_lock_acquire(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); ssock->ssl_state = SSL_STATE_NULL; - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); ssl_close_sockets(ssock); @@ -870,8 +896,12 @@ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) static void ssl_ciphers_populate(void) { if (!ssl_cipher_num) { - tls_init(); - tls_deinit(); + pj_status_t status = tls_init(); + if (status != PJ_SUCCESS) { + PJ_PERROR(1, ("gtls", status, "Failed to initialize GnuTLS")); + return; + } + /* GnuTLS stays initialized — no tls_deinit() here */ } } @@ -1157,10 +1187,6 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t *ssock) /* Perform SSL handshake */ ret = gnutls_handshake(gssock->session); - status = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); - if (status != PJ_SUCCESS) - return status; - if (ret == GNUTLS_E_SUCCESS) { /* System are GO */ ssock->ssl_state = SSL_STATE_ESTABLISHED; @@ -1210,6 +1236,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) * Write the plain data to GnuTLS, it will be encrypted by gnutls_record_send() * and sent via tls_data_push. Note that re-negotitation may be on progress, so * sending data should be delayed until re-negotiation is completed. + * Caller must hold ssock->write_mutex. */ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, pj_ssize_t size, int *nwritten) @@ -1226,7 +1253,7 @@ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, while (total_written < size) { /* Try encrypting using GnuTLS */ nwritten_ = gnutls_record_send(gssock->session, - ((read_data_t *)data) + total_written, + ((char *)data) + total_written, size - total_written); if (nwritten_ > 0) { diff --git a/pjlib/src/pj/ssl_sock_imp_common.c b/pjlib/src/pj/ssl_sock_imp_common.c index 077c648b22..0bf382ab7a 100644 --- a/pjlib/src/pj/ssl_sock_imp_common.c +++ b/pjlib/src/pj/ssl_sock_imp_common.c @@ -439,192 +439,120 @@ static pj_bool_t on_handshake_complete(pj_ssl_sock_t *ssock, return PJ_TRUE; } -static write_data_t* alloc_send_data(pj_ssl_sock_t *ssock, pj_size_t len) +static ssl_send_op_t* alloc_send_op(pj_ssl_sock_t *ssock, pj_size_t enc_len) { - send_buf_t *send_buf = &ssock->send_buf; - pj_size_t avail_len, skipped_len = 0; - char *reg1, *reg2; - pj_size_t reg1_len, reg2_len; - write_data_t *p; - - /* Check buffer availability */ - avail_len = send_buf->max_len - send_buf->len; - if (avail_len < len) - return NULL; - - /* If buffer empty, reset start pointer and return it */ - if (send_buf->len == 0) { - send_buf->start = send_buf->buf; - send_buf->len = len; - p = (write_data_t*)send_buf->start; - goto init_send_data; - } - - /* Free space may be wrapped/splitted into two regions, so let's - * analyze them if any region can hold the write data. - */ - reg1 = send_buf->start + send_buf->len; - if (reg1 >= send_buf->buf + send_buf->max_len) - reg1 -= send_buf->max_len; - reg1_len = send_buf->max_len - send_buf->len; - if (reg1 + reg1_len > send_buf->buf + send_buf->max_len) { - reg1_len = send_buf->buf + send_buf->max_len - reg1; - reg2 = send_buf->buf; - reg2_len = send_buf->start - send_buf->buf; - } else { - reg2 = NULL; - reg2_len = 0; + ssl_send_op_t *op; + pj_pool_t *op_pool; + pj_size_t alloc_len; + + /* Apply minimum buffer size for better reuse */ + if (enc_len < PJ_SSL_SEND_OP_MIN_BUF_SIZE) + enc_len = PJ_SSL_SEND_OP_MIN_BUF_SIZE; + + /* Scan free list for first op with sufficient capacity */ + if (!pj_list_empty(&ssock->send_op_free)) { + ssl_send_op_t *p = ssock->send_op_free.next; + while (p != &ssock->send_op_free) { + if (p->enc_buf_cap >= enc_len) { + pj_list_erase(p); + ssock->send_op_free_cnt--; + return p; + } + p = p->next; + } + /* No suitable op found, allocate new below */ } - /* More buffer availability check, note that the write data must be in - * a contigue buffer. + /* Allocate new op + embedded buffer from its own pool. + * Each op has its own pool so it can be truly freed when discarded + * from the free list (cap exceeded). */ - avail_len = PJ_MAX(reg1_len, reg2_len); - if (avail_len < len) + alloc_len = sizeof(ssl_send_op_t) - 1 + enc_len; + op_pool = pj_pool_create(ssock->pool->factory, "ssl_sop", + alloc_len + 256, 0, NULL); + if (!op_pool) return NULL; - /* Get the data slot */ - if (reg1_len >= len) { - p = (write_data_t*)reg1; - } else { - p = (write_data_t*)reg2; - skipped_len = reg1_len; + op = (ssl_send_op_t *)pj_pool_alloc(op_pool, alloc_len); + if (!op) { + pj_pool_release(op_pool); + return NULL; } - - /* Update buffer length */ - send_buf->len += len + skipped_len; - -init_send_data: - /* Init the new send data */ - pj_bzero(p, sizeof(*p)); - pj_list_init(p); - pj_list_push_back(&ssock->send_pending, p); - - return p; + pj_bzero(op, sizeof(ssl_send_op_t) - 1); + op->pool = op_pool; + op->enc_buf_cap = enc_len; + return op; } -static void free_send_data(pj_ssl_sock_t *ssock, write_data_t *wdata) +static void free_send_op(pj_ssl_sock_t *ssock, ssl_send_op_t *op) { - send_buf_t *buf = &ssock->send_buf; - write_data_t *spl = &ssock->send_pending; + pj_list_erase(op); - pj_assert(!pj_list_empty(&ssock->send_pending)); - - /* Free slot from the buffer */ - if (spl->next == wdata && spl->prev == wdata) { - /* This is the only data, reset the buffer */ - buf->start = buf->buf; - buf->len = 0; - } else if (spl->next == wdata) { - /* This is the first data, shift start pointer of the buffer and - * adjust the buffer length. - */ - buf->start = (char*)wdata->next; - if (wdata->next > wdata) { - buf->len -= ((char*)wdata->next - buf->start); - } else { - /* Overlapped */ - pj_size_t right_len, left_len; - right_len = buf->buf + buf->max_len - (char*)wdata; - left_len = (char*)wdata->next - buf->buf; - buf->len -= (right_len + left_len); - } - } else if (spl->prev == wdata) { - /* This is the last data, just adjust the buffer length */ - if (wdata->prev < wdata) { - pj_size_t jump_len; - jump_len = (char*)wdata - - ((char*)wdata->prev + wdata->prev->record_len); - buf->len -= (wdata->record_len + jump_len); - } else { - /* Overlapped */ - pj_size_t right_len, left_len; - right_len = buf->buf + buf->max_len - - ((char*)wdata->prev + wdata->prev->record_len); - left_len = (char*)wdata + wdata->record_len - buf->buf; - buf->len -= (right_len + left_len); - } + if (ssock->send_op_free_cnt < PJ_SSL_SEND_OP_FREE_LIST_MAX) { + pj_list_push_back(&ssock->send_op_free, op); + ssock->send_op_free_cnt++; + } else { + /* Free list full, release the pool to truly free memory */ + pj_pool_release(op->pool); } - /* For data in the middle buffer, just do nothing on the buffer. The slot - * will be freed later when freeing the first/last data. - */ - - /* Remove the data from send pending list */ - pj_list_erase(wdata); } -/* Flush write circular buffer to network socket. */ -static pj_status_t flush_circ_buf_output(pj_ssl_sock_t *ssock, - pj_ioqueue_op_key_t *send_key, - pj_size_t orig_len, unsigned flags) +/* Flush SSL write buffer to network socket. */ +static pj_status_t flush_ssl_write_buf(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *send_key, + pj_size_t orig_len, unsigned flags) { pj_ssize_t len; - write_data_t *wdata; - pj_size_t needed_len; + ssl_send_op_t *op; pj_status_t status; pj_lock_acquire(ssock->write_mutex); - /* Check if there is data in the circular buffer, flush it if any */ - if (io_empty(ssock, &ssock->circ_buf_output)) { + /* Check if there is data in the SSL write buffer, flush it if any */ + if (io_empty(ssock, &ssock->ssl_write_buf)) { pj_lock_release(ssock->write_mutex); return PJ_SUCCESS; } /* Get data and its length */ - len = io_size(ssock, &ssock->circ_buf_output); + len = io_size(ssock, &ssock->ssl_write_buf); if (len == 0) { pj_lock_release(ssock->write_mutex); return PJ_SUCCESS; } - /* Calculate buffer size needed, and align it to 8 */ - needed_len = len + sizeof(write_data_t); - needed_len = ((needed_len + 7) >> 3) << 3; - - /* Allocate buffer for send data */ - wdata = alloc_send_data(ssock, needed_len); - if (wdata == NULL) { - /* Oops, the send buffer is full, let's just - * queue it for sending and return PJ_EPENDING. - */ - ssock->send_buf_pending.data_len = needed_len; - ssock->send_buf_pending.app_key = send_key; - ssock->send_buf_pending.flags = flags; - ssock->send_buf_pending.plain_data_len = orig_len; + /* Allocate send op */ + op = alloc_send_op(ssock, len); + if (!op) { pj_lock_release(ssock->write_mutex); - return PJ_EPENDING; + return PJ_ENOMEM; } + pj_ioqueue_op_key_init(&op->key, sizeof(pj_ioqueue_op_key_t)); + op->key.user_data = op; + op->app_key = send_key; + op->plain_data_len = orig_len; + op->enc_len = len; + io_read(ssock, &ssock->ssl_write_buf, (pj_uint8_t *)op->enc_data, len); - /* Copy the data and set its properties into the send data */ - pj_ioqueue_op_key_init(&wdata->key, sizeof(pj_ioqueue_op_key_t)); - wdata->key.user_data = wdata; - wdata->app_key = send_key; - wdata->record_len = needed_len; - wdata->data_len = len; - wdata->plain_data_len = orig_len; - wdata->flags = flags; - io_read(ssock, &ssock->circ_buf_output, (pj_uint8_t *)&wdata->data, len); + /* Track in active list */ + pj_list_push_back(&ssock->send_op_active, op); - /* Ticket #4533: Lock before write_mutex release, make sure send order is correct */ + /* Ticket #4533: Lock before write_mutex release for send order */ pj_lock_acquire(ssock->asock_send_mutex); /* Ticket #1573: Don't hold mutex while calling PJLIB socket send(). */ pj_lock_release(ssock->write_mutex); - /* Send it */ + /* Send encrypted data */ #ifdef SSL_SOCK_IMP_USE_OWN_NETWORK - status = network_send(ssock, &wdata->key, wdata->data.content, &len, - flags); + status = network_send(ssock, &op->key, op->enc_data, &len, flags); #else if (ssock->param.sock_type == pj_SOCK_STREAM()) { - status = pj_activesock_send(ssock->asock, &wdata->key, - wdata->data.content, &len, - flags); + status = pj_activesock_send(ssock->asock, &op->key, + op->enc_data, &len, flags); } else { - status = pj_activesock_sendto(ssock->asock, &wdata->key, - wdata->data.content, &len, - flags, + status = pj_activesock_sendto(ssock->asock, &op->key, + op->enc_data, &len, flags, (pj_sockaddr_t*)&ssock->rem_addr, ssock->addr_len); } @@ -633,74 +561,45 @@ static pj_status_t flush_circ_buf_output(pj_ssl_sock_t *ssock, pj_lock_release(ssock->asock_send_mutex); if (status != PJ_EPENDING) { - /* When the sending is not pending, remove the wdata from send - * pending list. - */ + /* Send completed (success or error), recycle the send op. */ pj_lock_acquire(ssock->write_mutex); - free_send_data(ssock, wdata); + free_send_op(ssock, op); pj_lock_release(ssock->write_mutex); + } return status; } -#if 0 -/* Just for testing send buffer alloc/free */ -#include -pj_status_t pj_ssl_sock_ossl_test_send_buf(pj_pool_t *pool) +/* ssl_do_handshake_and_flush: common wrapper that calls the backend's + * ssl_do_handshake() then flushes any handshake data from ssl_write_buf. + * This unifies error handling across all SSL backends. + */ +static pj_status_t ssl_do_handshake_and_flush(pj_ssl_sock_t *ssock) { - enum { MAX_CHUNK_NUM = 20 }; - unsigned chunk_size, chunk_cnt, i; - write_data_t *wdata[MAX_CHUNK_NUM] = {0}; - pj_time_val now; - pj_ssl_sock_t *ssock = NULL; - pj_ssl_sock_param param; pj_status_t status; + pj_status_t flush_status; - pj_gettimeofday(&now); - pj_srand((unsigned)now.sec); - - pj_ssl_sock_param_default(¶m); - status = pj_ssl_sock_create(pool, ¶m, &ssock); - if (status != PJ_SUCCESS) { - return status; - } - - if (ssock->send_buf.max_len == 0) { - ssock->send_buf.buf = (char*) - pj_pool_alloc(ssock->pool, - ssock->param.send_buffer_size); - ssock->send_buf.max_len = ssock->param.send_buffer_size; - ssock->send_buf.start = ssock->send_buf.buf; - ssock->send_buf.len = 0; - } - - chunk_size = ssock->param.send_buffer_size / MAX_CHUNK_NUM / 2; - chunk_cnt = 0; - for (i = 0; i < MAX_CHUNK_NUM; i++) { - wdata[i] = alloc_send_data(ssock, pj_rand() % chunk_size + 321); - if (wdata[i]) - chunk_cnt++; - else - break; - } + status = ssl_do_handshake(ssock); - while (chunk_cnt) { - i = pj_rand() % MAX_CHUNK_NUM; - if (wdata[i]) { - free_send_data(ssock, wdata[i]); - wdata[i] = NULL; - chunk_cnt--; + /* Flush any handshake data produced by the SSL library. + * Must flush unconditionally — even on handshake failure, the SSL + * library may have queued error alert records that should be sent + * to the peer. + */ + flush_status = flush_ssl_write_buf(ssock, + &ssock->handshake_op_key, 0, 0); + if (status == PJ_SUCCESS || status == PJ_EPENDING) { + /* Only override status with flush error when handshake itself + * was still in progress. Don't mask handshake failure. + */ + if (flush_status != PJ_SUCCESS && flush_status != PJ_EPENDING) { + status = flush_status; } } - if (ssock->send_buf.len != 0) - status = PJ_EBUG; - - pj_ssl_sock_close(ssock); return status; } -#endif static void on_timer(pj_timer_heap_t *th, struct pj_timer_entry *te) { @@ -733,14 +632,28 @@ static void ssl_on_destroy(void *arg) ssl_destroy(ssock); - if (ssock->circ_buf_input_mutex) { - pj_lock_destroy(ssock->circ_buf_input_mutex); - ssock->circ_buf_input_mutex = NULL; + /* Release all send op pools (each op has its own pool) */ + while (!pj_list_empty(&ssock->send_op_free)) { + ssl_send_op_t *op = ssock->send_op_free.next; + pj_list_erase(op); + pj_pool_release(op->pool); + } + ssock->send_op_free_cnt = 0; + + while (!pj_list_empty(&ssock->send_op_active)) { + ssl_send_op_t *op = ssock->send_op_active.next; + pj_list_erase(op); + pj_pool_release(op->pool); + } + + if (ssock->ssl_read_buf_mutex) { + pj_lock_destroy(ssock->ssl_read_buf_mutex); + ssock->ssl_read_buf_mutex = NULL; } - if (ssock->circ_buf_output_mutex) { - pj_lock_destroy(ssock->circ_buf_output_mutex); - ssock->circ_buf_output_mutex = NULL; + if (ssock->ssl_write_buf_mutex) { + pj_lock_destroy(ssock->ssl_write_buf_mutex); + ssock->ssl_write_buf_mutex = NULL; ssock->write_mutex = NULL; } @@ -788,11 +701,11 @@ static pj_bool_t ssock_on_data_read (pj_ssl_sock_t *ssock, pj_status_t status_; /* Consume the whole data */ - if (ssock->circ_buf_input_mutex) - pj_lock_acquire(ssock->circ_buf_input_mutex); - status_ = io_write(ssock,&ssock->circ_buf_input, data, size); - if (ssock->circ_buf_input_mutex) - pj_lock_release(ssock->circ_buf_input_mutex); + if (ssock->ssl_read_buf_mutex) + pj_lock_acquire(ssock->ssl_read_buf_mutex); + status_ = io_write(ssock,&ssock->ssl_read_buf, data, size); + if (ssock->ssl_read_buf_mutex) + pj_lock_release(ssock->ssl_read_buf_mutex); if (status_ != PJ_SUCCESS) { status = status_; goto on_error; @@ -804,7 +717,7 @@ static pj_bool_t ssock_on_data_read (pj_ssl_sock_t *ssock, pj_bool_t ret = PJ_TRUE; if (status == PJ_SUCCESS) - status = ssl_do_handshake(ssock); + status = ssl_do_handshake_and_flush(ssock); /* Not pending is either success or failed */ if (status != PJ_EPENDING) @@ -859,7 +772,7 @@ static pj_bool_t ssock_on_data_read (pj_ssl_sock_t *ssock, } else if (status_ == PJ_SUCCESS) { break; } else if (status_ == PJ_ETRYAGAIN) { - status = ssl_do_handshake(ssock); + status = ssl_do_handshake_and_flush(ssock); if (status == PJ_SUCCESS) { /* Renegotiation completed */ @@ -926,32 +839,34 @@ static pj_bool_t ssock_on_data_sent (pj_ssl_sock_t *ssock, pj_ioqueue_op_key_t *send_key, pj_ssize_t sent) { - write_data_t *wdata = (write_data_t*)send_key->user_data; - pj_ioqueue_op_key_t *app_key = wdata->app_key; + ssl_send_op_t *op = (ssl_send_op_t *)send_key->user_data; + pj_ioqueue_op_key_t *app_key = op->app_key; pj_ssize_t sent_len; - sent_len = (sent > 0)? (pj_ssize_t)wdata->plain_data_len : sent; + sent_len = (sent > 0) ? (pj_ssize_t)op->plain_data_len : sent; - /* Update write buffer state */ + /* Free the send op */ pj_lock_acquire(ssock->write_mutex); - free_send_data(ssock, wdata); + free_send_op(ssock, op); pj_lock_release(ssock->write_mutex); - wdata = NULL; + op = NULL; if (ssock->ssl_state == SSL_STATE_HANDSHAKING) { /* Initial handshaking */ pj_status_t status; - - status = ssl_do_handshake(ssock); + + status = ssl_do_handshake_and_flush(ssock); /* Not pending is either success or failed */ if (status != PJ_EPENDING) return on_handshake_complete(ssock, status); - } else if (send_key != &ssock->handshake_op_key) { - /* Some data has been sent, notify application */ + } else if (app_key != &ssock->handshake_op_key && + app_key != &ssock->shutdown_op_key) + { + /* Application data has been sent, notify application */ if (ssock->param.cb.on_data_sent) { pj_bool_t ret; - ret = (*ssock->param.cb.on_data_sent)(ssock, app_key, + ret = (*ssock->param.cb.on_data_sent)(ssock, app_key, sent_len); if (!ret) { /* We've been destroyed */ @@ -959,18 +874,12 @@ static pj_bool_t ssock_on_data_sent (pj_ssl_sock_t *ssock, } } } else { - /* SSL re-negotiation is on-progress, just do nothing */ + /* Handshake or shutdown send, no app callback needed */ } - /* Send buffer has been updated, let's try to send any pending data */ - if (ssock->send_buf_pending.data_len) { - pj_status_t status; - status = flush_circ_buf_output(ssock, ssock->send_buf_pending.app_key, - ssock->send_buf_pending.plain_data_len, - ssock->send_buf_pending.flags); - if (status == PJ_SUCCESS || status == PJ_EPENDING) { - ssock->send_buf_pending.data_len = 0; - } + /* Try to drain write_pending (delayed sends from renegotiation) */ + if (!pj_list_empty(&ssock->write_pending)) { + flush_delayed_send(ssock); } return PJ_TRUE; @@ -1160,18 +1069,6 @@ static pj_bool_t ssock_on_accept_complete (pj_ssl_sock_t *ssock_parent, pj_sockaddr_cp(&ssock->local_addr, &ssock_parent->local_addr); } - /* Prepare write/send state */ - pj_assert(ssock->send_buf.max_len == 0); - ssock->send_buf.buf = (char*) - pj_pool_alloc(ssock->pool, - ssock->param.send_buffer_size); - if (!ssock->send_buf.buf) - return PJ_ENOMEM; - - ssock->send_buf.max_len = ssock->param.send_buffer_size; - ssock->send_buf.start = ssock->send_buf.buf; - ssock->send_buf.len = 0; - /* Start handshake timer */ if (ssock->param.timer_heap && (ssock->param.timeout.sec != 0 || ssock->param.timeout.msec != 0)) @@ -1191,13 +1088,13 @@ static pj_bool_t ssock_on_accept_complete (pj_ssl_sock_t *ssock_parent, /* Prevent data race with on_data_read() until ssl_do_handshake() * completes. */ - if (ssock->circ_buf_input_mutex) - pj_lock_acquire(ssock->circ_buf_input_mutex); + if (ssock->ssl_read_buf_mutex) + pj_lock_acquire(ssock->ssl_read_buf_mutex); ssock->ssl_state = SSL_STATE_HANDSHAKING; ssl_set_state(ssock, PJ_TRUE); - status = ssl_do_handshake(ssock); - if (ssock->circ_buf_input_mutex) - pj_lock_release(ssock->circ_buf_input_mutex); + status = ssl_do_handshake_and_flush(ssock); + if (ssock->ssl_read_buf_mutex) + pj_lock_release(ssock->ssl_read_buf_mutex); on_return: if (ssock && status != PJ_EPENDING) { @@ -1228,19 +1125,23 @@ static pj_bool_t ssock_on_connect_complete (pj_ssl_sock_t *ssock, goto on_return; /* Prepare read buffer */ - ssock->asock_rbuf = (void**)pj_pool_calloc(ssock->pool, + ssock->asock_rbuf = (void**)pj_pool_calloc(ssock->pool, ssock->param.async_cnt, sizeof(void*)); - if (!ssock->asock_rbuf) - return PJ_ENOMEM; + if (!ssock->asock_rbuf) { + status = PJ_ENOMEM; + goto on_return; + } for (i = 0; iparam.async_cnt; ++i) { ssock->asock_rbuf[i] = (void*) pj_pool_alloc( - ssock->pool, - ssock->param.read_buffer_size + + ssock->pool, + ssock->param.read_buffer_size + sizeof(read_data_t*)); - if (!ssock->asock_rbuf[i]) - return PJ_ENOMEM; + if (!ssock->asock_rbuf[i]) { + status = PJ_ENOMEM; + goto on_return; + } } /* Start read */ @@ -1257,18 +1158,6 @@ static pj_bool_t ssock_on_connect_complete (pj_ssl_sock_t *ssock, if (status != PJ_SUCCESS) goto on_return; - /* Prepare write/send state */ - pj_assert(ssock->send_buf.max_len == 0); - ssock->send_buf.buf = (char*) - pj_pool_alloc(ssock->pool, - ssock->param.send_buffer_size); - if (!ssock->send_buf.buf) - return PJ_ENOMEM; - - ssock->send_buf.max_len = ssock->param.send_buffer_size; - ssock->send_buf.start = ssock->send_buf.buf; - ssock->send_buf.len = 0; - /* Set peer name */ ssl_set_peer_name(ssock); @@ -1276,7 +1165,7 @@ static pj_bool_t ssock_on_connect_complete (pj_ssl_sock_t *ssock, ssock->ssl_state = SSL_STATE_HANDSHAKING; ssl_set_state(ssock, PJ_FALSE); - status = ssl_do_handshake(ssock); + status = ssl_do_handshake_and_flush(ssock); if (status != PJ_EPENDING) goto on_return; @@ -1502,12 +1391,13 @@ PJ_DEF(pj_status_t) pj_ssl_sock_create (pj_pool_t *pool, ssock->info_pool = info_pool; ssock->sock = PJ_INVALID_SOCKET; ssock->ssl_state = SSL_STATE_NULL; - ssock->circ_buf_input.owner = ssock; - ssock->circ_buf_output.owner = ssock; + ssock->ssl_read_buf.owner = ssock; + ssock->ssl_write_buf.owner = ssock; ssock->handshake_status = PJ_EUNKNOWN; pj_list_init(&ssock->write_pending); pj_list_init(&ssock->write_pending_empty); - pj_list_init(&ssock->send_pending); + pj_list_init(&ssock->send_op_active); + pj_list_init(&ssock->send_op_free); pj_timer_entry_init(&ssock->timer, 0, ssock, &on_timer); pj_ioqueue_op_key_init(&ssock->handshake_op_key, sizeof(pj_ioqueue_op_key_t)); @@ -1516,14 +1406,14 @@ PJ_DEF(pj_status_t) pj_ssl_sock_create (pj_pool_t *pool, /* Create secure socket mutex */ status = pj_lock_create_recursive_mutex(pool, pool->obj_name, - &ssock->circ_buf_output_mutex); - ssock->write_mutex = ssock->circ_buf_output_mutex; + &ssock->ssl_write_buf_mutex); + ssock->write_mutex = ssock->ssl_write_buf_mutex; if (status != PJ_SUCCESS) return status; /* Create input circular buffer mutex */ status = pj_lock_create_recursive_mutex(pool, pool->obj_name, - &ssock->circ_buf_input_mutex); + &ssock->ssl_read_buf_mutex); if (status != PJ_SUCCESS) return status; @@ -1793,7 +1683,7 @@ PJ_DEF(pj_status_t) pj_ssl_sock_start_recvfrom2 (pj_ssl_sock_t *ssock, /* Write plain data to SSL and flush the buffer. */ -static pj_status_t ssl_send (pj_ssl_sock_t *ssock, +static pj_status_t ssl_send (pj_ssl_sock_t *ssock, pj_ioqueue_op_key_t *send_key, const void *data, pj_ssize_t size, @@ -1808,22 +1698,15 @@ static pj_status_t ssl_send (pj_ssl_sock_t *ssock, * until re-negotiation is completed. */ pj_lock_acquire(ssock->write_mutex); - /* Don't write to SSL if send buffer is full and some data is in - * write buffer already, just return PJ_ENOMEM. - */ - if (ssock->send_buf_pending.data_len) { - pj_lock_release(ssock->write_mutex); - return PJ_ENOMEM; - } status = ssl_write(ssock, data, size, &nwritten); pj_lock_release(ssock->write_mutex); - + if (status == PJ_SUCCESS && nwritten == size) { /* All data written, flush write buffer to network socket */ - status = flush_circ_buf_output(ssock, send_key, size, flags); + status = flush_ssl_write_buf(ssock, send_key, size, flags); } else if (status == PJ_ETRYAGAIN) { /* Re-negotiation is on progress, flush re-negotiation data */ - status = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); + status = flush_ssl_write_buf(ssock, &ssock->handshake_op_key, 0, 0); if (status == PJ_SUCCESS || status == PJ_EPENDING) { /* Just return PJ_EBUSY when re-negotiation is on progress */ status = PJ_EBUSY; @@ -1853,24 +1736,62 @@ static pj_status_t flush_delayed_send(pj_ssl_sock_t *ssock) while (!pj_list_empty(&ssock->write_pending)) { write_data_t *wp; + pj_ioqueue_op_key_t *app_key; + pj_ssize_t plain_data_len; pj_status_t status; wp = ssock->write_pending.next; + app_key = wp->app_key; + plain_data_len = wp->plain_data_len; /* Ticket #1573: Don't hold mutex while calling socket send. */ pj_lock_release(ssock->write_mutex); - status = ssl_send (ssock, &wp->key, wp->data.ptr, - wp->plain_data_len, wp->flags); + /* Pass the original app_key (not &wp->key) so the correct + * key propagates through to the final on_data_sent callback. + */ + status = ssl_send(ssock, app_key, wp->data.ptr, + plain_data_len, wp->flags); + + if (status == PJ_EPENDING) { + /* Data encrypted and queued for async sending. + * Remove wp to prevent double-processing on next flush. + */ + pj_lock_acquire(ssock->write_mutex); + pj_list_erase(wp); + pj_list_push_back(&ssock->write_pending_empty, wp); + pj_lock_release(ssock->write_mutex); + + ssock->flushing_write_pend = PJ_FALSE; + return PJ_EPENDING; + } + if (status != PJ_SUCCESS) { /* Reset ongoing flush flag first. */ ssock->flushing_write_pend = PJ_FALSE; return status; } + /* PJ_SUCCESS: sent synchronously. Remove wp. */ pj_lock_acquire(ssock->write_mutex); pj_list_erase(wp); pj_list_push_back(&ssock->write_pending_empty, wp); + pj_lock_release(ssock->write_mutex); + + /* Invoke callback — app originally got PJ_EPENDING from + * pj_ssl_sock_send, so it expects a completion callback. + */ + if (ssock->param.cb.on_data_sent) { + pj_bool_t ret; + ret = (*ssock->param.cb.on_data_sent)(ssock, app_key, + plain_data_len); + if (!ret) { + /* We've been destroyed. Do NOT touch ssock. */ + return PJ_SUCCESS; + } + } + + pj_lock_acquire(ssock->write_mutex); } /* Reset ongoing flush flag */ @@ -2232,10 +2153,17 @@ PJ_DEF(pj_status_t) pj_ssl_sock_start_connect2( status = pj_activesock_start_connect(ssock->asock, pool, remaddr, addr_len); - if (status == PJ_SUCCESS) + if (status == PJ_SUCCESS) { + /* Synchronous connect completion. The callback handles + * local address update, SSL create, handshake, and user + * notification. Don't access ssock after this — the user + * callback may have destroyed it. + */ asock_on_connect_complete(ssock->asock, PJ_SUCCESS); - else if (status != PJ_EPENDING) + return PJ_EPENDING; + } else if (status != PJ_EPENDING) { goto on_error; + } /* Update local address */ ssock->addr_len = addr_len; @@ -2289,7 +2217,7 @@ PJ_DEF(pj_status_t) pj_ssl_sock_renegotiate(pj_ssl_sock_t *ssock) status = ssl_renegotiate(ssock); if (status == PJ_SUCCESS) { - status = ssl_do_handshake(ssock); + status = ssl_do_handshake_and_flush(ssock); } return status; diff --git a/pjlib/src/pj/ssl_sock_imp_common.h b/pjlib/src/pj/ssl_sock_imp_common.h index 889ab2acbf..c775ebb9ac 100644 --- a/pjlib/src/pj/ssl_sock_imp_common.h +++ b/pjlib/src/pj/ssl_sock_imp_common.h @@ -69,14 +69,36 @@ typedef struct write_data_t { } write_data_t; /* - * Structure of SSL socket write buffer (circular buffer). + * Per-send operation data. Each op is a single contiguous allocation + * (header + embedded encrypted data buffer) from its own pool, so + * the pool can be released when the op is discarded from the free list. + * Replaces the old send_buf ring buffer + write_data_t for network sends. */ -typedef struct send_buf_t { - char *buf; - pj_size_t max_len; - char *start; - pj_size_t len; -} send_buf_t; + +/* Minimum encrypted data buffer size. Ensures small sends get reusable + * buffers. Override in config_site.h if needed. + */ +#ifndef PJ_SSL_SEND_OP_MIN_BUF_SIZE +# define PJ_SSL_SEND_OP_MIN_BUF_SIZE 4000 +#endif + +/* Maximum number of send ops kept in the free list for recycling. + * Excess ops have their pools released (true memory free). + */ +#ifndef PJ_SSL_SEND_OP_FREE_LIST_MAX +# define PJ_SSL_SEND_OP_FREE_LIST_MAX 4 +#endif + +typedef struct ssl_send_op_t { + PJ_DECL_LIST_MEMBER(struct ssl_send_op_t); + pj_pool_t *pool; /* own pool, released on discard */ + pj_ioqueue_op_key_t key; /* internal op_key for activesock */ + pj_ioqueue_op_key_t *app_key; /* caller's op_key (for callback) */ + pj_size_t plain_data_len;/* plaintext length for callback */ + pj_size_t enc_len; /* actual encrypted data length */ + pj_size_t enc_buf_cap; /* embedded buffer capacity */ + char enc_data[1]; /* variable-length encrypted data */ +} ssl_send_op_t; /* Circular buffer object */ typedef struct circ_buf_t { @@ -133,18 +155,17 @@ struct pj_ssl_sock_t write_data_t write_pending;/* list of pending write to ssl */ write_data_t write_pending_empty; /* cache for write_pending */ pj_bool_t flushing_write_pend; /* flag of flushing is ongoing*/ - send_buf_t send_buf; - write_data_t send_buf_pending; /* send buffer is full but some - * data is queuing in wbio. */ - write_data_t send_pending; /* list of pending write to network */ - pj_lock_t *write_mutex; /* protect write BIO and send_buf */ + ssl_send_op_t send_op_active; /* list: in-flight send ops */ + ssl_send_op_t send_op_free; /* free list for recycling */ + unsigned send_op_free_cnt;/* free list count */ + pj_lock_t *write_mutex; /* protect ssl_write_buf & send ops */ pj_lock_t *asock_send_mutex; /* protect send order */ - circ_buf_t circ_buf_input; - pj_lock_t *circ_buf_input_mutex; + circ_buf_t ssl_read_buf; + pj_lock_t *ssl_read_buf_mutex; - circ_buf_t circ_buf_output; - pj_lock_t *circ_buf_output_mutex; + circ_buf_t ssl_write_buf; + pj_lock_t *ssl_write_buf_mutex; }; @@ -200,8 +221,8 @@ static void io_read(pj_ssl_sock_t *ssock, circ_buf_t *cb, static pj_status_t io_write(pj_ssl_sock_t *ssock, circ_buf_t *cb, const pj_uint8_t *src, pj_size_t len); -static write_data_t* alloc_send_data(pj_ssl_sock_t *ssock, pj_size_t len); -static void free_send_data(pj_ssl_sock_t *ssock, write_data_t *wdata); +static ssl_send_op_t* alloc_send_op(pj_ssl_sock_t *ssock, pj_size_t enc_len); +static void free_send_op(pj_ssl_sock_t *ssock, ssl_send_op_t *op); static pj_status_t flush_delayed_send(pj_ssl_sock_t *ssock); #ifdef SSL_SOCK_IMP_USE_CIRC_BUF diff --git a/pjlib/src/pj/ssl_sock_mbedtls.c b/pjlib/src/pj/ssl_sock_mbedtls.c index b75568e150..d52a75ae47 100644 --- a/pjlib/src/pj/ssl_sock_mbedtls.c +++ b/pjlib/src/pj/ssl_sock_mbedtls.c @@ -153,14 +153,14 @@ static int ssl_data_push(void *ctx, const unsigned char *buf, size_t len) { pj_ssl_sock_t *ssock = (pj_ssl_sock_t *)ctx; - pj_lock_acquire(ssock->circ_buf_output_mutex); - if (circ_write(&ssock->circ_buf_output, buf, len) != PJ_SUCCESS) { - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); + if (circ_write(&ssock->ssl_write_buf, buf, len) != PJ_SUCCESS) { + pj_lock_release(ssock->ssl_write_buf_mutex); return MBEDTLS_ERR_SSL_WANT_WRITE; } - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); return len; } @@ -171,19 +171,19 @@ static int ssl_data_pull(void *ctx, unsigned char *buf, size_t len) pj_size_t circ_buf_size; pj_size_t read_size; - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); - if (circ_empty(&ssock->circ_buf_input)) { - pj_lock_release(ssock->circ_buf_input_mutex); + if (circ_empty(&ssock->ssl_read_buf)) { + pj_lock_release(ssock->ssl_read_buf_mutex); return MBEDTLS_ERR_SSL_WANT_READ; } - circ_buf_size = circ_size(&ssock->circ_buf_input); + circ_buf_size = circ_size(&ssock->ssl_read_buf); read_size = PJ_MIN(circ_buf_size, len); - circ_read(&ssock->circ_buf_input, buf, read_size); - pj_lock_release(ssock->circ_buf_input_mutex); + circ_read(&ssock->ssl_read_buf, buf, read_size); + pj_lock_release(ssock->ssl_read_buf_mutex); return read_size; } @@ -623,12 +623,12 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) pj_assert(ssock); /* Initialize input circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_input, 512); + status = circ_init(ssock->pool->factory, &ssock->ssl_read_buf, 512); if (status != PJ_SUCCESS) return status; /* Initialize output circular buffer */ - status = circ_init(ssock->pool->factory, &ssock->circ_buf_output, 512); + status = circ_init(ssock->pool->factory, &ssock->ssl_write_buf, 512); if (status != PJ_SUCCESS) { return status; } @@ -742,16 +742,16 @@ static void ssl_destroy(pj_ssl_sock_t *ssock) #endif /* Destroy circular buffers */ - circ_deinit(&ssock->circ_buf_input); - circ_deinit(&ssock->circ_buf_output); + circ_deinit(&ssock->ssl_read_buf); + circ_deinit(&ssock->ssl_write_buf); } /* Reset socket state. */ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) { - pj_lock_acquire(ssock->circ_buf_output_mutex); + pj_lock_acquire(ssock->ssl_write_buf_mutex); ssock->ssl_state = SSL_STATE_NULL; - pj_lock_release(ssock->circ_buf_output_mutex); + pj_lock_release(ssock->ssl_write_buf_mutex); ssl_close_sockets(ssock); } @@ -826,17 +826,11 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t *ssock) { mbedtls_sock_t *mssock = (mbedtls_sock_t *)ssock; pj_status_t handshake_status; - pj_status_t status; int ret; ret = mbedtls_ssl_handshake(&mssock->ssl_ctx); handshake_status = ssl_status_from_err(ssock, ret); - status = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); - if (status != PJ_SUCCESS) { - PJ_LOG(2, (THIS_FILE, "Failed to send handshake packets")); - return status; - } if (handshake_status == PJ_EPENDING) return PJ_EPENDING; @@ -875,6 +869,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) } } +/* Caller must hold ssock->write_mutex. */ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, pj_ssize_t size, int *nwritten) { diff --git a/pjlib/src/pj/ssl_sock_ossl.c b/pjlib/src/pj/ssl_sock_ossl.c index c96e793bf2..ff846f7e60 100644 --- a/pjlib/src/pj/ssl_sock_ossl.c +++ b/pjlib/src/pj/ssl_sock_ossl.c @@ -691,10 +691,22 @@ static pj_status_t init_openssl(void) { pj_status_t status; - if (openssl_init_count) + if (openssl_init_count == 1) return PJ_SUCCESS; - openssl_init_count = 1; + /* Use count as a simple state: 0=not started, -1=in progress, 1=done. + * This prevents a race where a concurrent caller sees count=1 (done) + * while initialization is still in progress. + */ + if (openssl_init_count == -1) { + /* Another thread is initializing. Spin briefly. */ + int retry; + for (retry = 0; retry < 100 && openssl_init_count == -1; ++retry) + pj_thread_sleep(10); + return (openssl_init_count == 1) ? PJ_SUCCESS : PJ_EBUSY; + } + + openssl_init_count = -1; PJ_LOG(4, (THIS_FILE, "OpenSSL version : %ld", OPENSSL_VERSION_NUMBER)); /* Register error subsystem */ @@ -750,9 +762,20 @@ static pj_status_t init_openssl(void) pj_assert(meth); ctx=SSL_CTX_new(meth); + if (!ctx) { + PJ_LOG(1, (THIS_FILE, "SSL_CTX_new() failed")); + openssl_init_count = 0; + return PJ_ENOMEM; + } SSL_CTX_set_cipher_list(ctx, "ALL:COMPLEMENTOFALL"); ssl = SSL_new(ctx); + if (!ssl) { + PJ_LOG(1, (THIS_FILE, "SSL_new() failed")); + SSL_CTX_free(ctx); + openssl_init_count = 0; + return PJ_ENOMEM; + } sk_cipher = SSL_get_ciphers(ssl); @@ -889,6 +912,7 @@ static pj_status_t init_openssl(void) PJ_LOG(1,(THIS_FILE, "Fatal error: failed to get application data index for " "SSL socket")); + openssl_init_count = 0; return status; } @@ -896,10 +920,13 @@ static pj_status_t init_openssl(void) PJ_SSL_SOCK_OSSL_USE_THREAD_CB != 0 && OPENSSL_VERSION_NUMBER < 0x10100000L status = init_ossl_lock(); - if (status != PJ_SUCCESS) + if (status != PJ_SUCCESS) { + openssl_init_count = 0; return status; + } #endif + openssl_init_count = 1; return status; } @@ -1656,7 +1683,9 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) pj_assert(ssock); /* Make sure OpenSSL library has been initialized */ - init_openssl(); + status = init_openssl(); + if (status != PJ_SUCCESS) + return status; set_entropy(ssock); @@ -1814,7 +1843,7 @@ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) if (post_unlock_flush_circ_buf) { /* Flush data to send close notify. */ - flush_circ_buf_output(ssock, &ssock->shutdown_op_key, 0, 0); + flush_ssl_write_buf(ssock, &ssock->shutdown_op_key, 0, 0); } ssl_close_sockets(ssock); @@ -1832,7 +1861,11 @@ static void ssl_reset_sock_state(pj_ssl_sock_t *ssock) static void ssl_ciphers_populate() { if (ssl_cipher_num == 0 || ssl_curves_num == 0) { - init_openssl(); + pj_status_t status = init_openssl(); + if (status != PJ_SUCCESS) { + PJ_PERROR(1, ("ossl", status, "Failed to initialize OpenSSL")); + return; + } shutdown_openssl(); } } @@ -2359,7 +2392,7 @@ static void update_certs_info(pj_ssl_sock_t* ssock, * this function (flushing all data in write BIO generated by above * OpenSSL API call). */ -static pj_status_t flush_circ_buf_output(pj_ssl_sock_t *ssock, +static pj_status_t flush_ssl_write_buf(pj_ssl_sock_t *ssock, pj_ioqueue_op_key_t *send_key, pj_size_t orig_len, unsigned flags); @@ -2442,14 +2475,6 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t *ssock) err = SSL_do_handshake(ossock->ossl_ssl); pj_lock_release(ssock->write_mutex); - /* SSL_do_handshake() may put some pending data into SSL write BIO, - * flush it if any. - */ - status = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); - if (status != PJ_SUCCESS && status != PJ_EPENDING) { - return status; - } - if (err < 0) { int err2 = SSL_get_error(ossock->ossl_ssl, err); if (err2 != SSL_ERROR_NONE && err2 != SSL_ERROR_WANT_READ) @@ -2573,7 +2598,9 @@ static pj_status_t ssl_read(pj_ssl_sock_t *ssock, void *data, int *size) } -/* Write plain data to SSL and flush write BIO. */ +/* Write plain data to SSL and flush write BIO. + * Caller must hold ssock->write_mutex. + */ static pj_status_t ssl_write(pj_ssl_sock_t *ssock, const void *data, pj_ssize_t size, int *nwritten) { diff --git a/pjlib/src/pj/ssl_sock_schannel.c b/pjlib/src/pj/ssl_sock_schannel.c index 0a749f17c0..6dd6908632 100644 --- a/pjlib/src/pj/ssl_sock_schannel.c +++ b/pjlib/src/pj/ssl_sock_schannel.c @@ -335,12 +335,12 @@ static pj_status_t ssl_create(pj_ssl_sock_t *ssock) } /* Initialize input circular buffer */ - status = circ_init(pf, &ssock->circ_buf_input, read_cap); + status = circ_init(pf, &ssock->ssl_read_buf, read_cap); if (status != PJ_SUCCESS) goto on_return; /* Initialize output circular buffer */ - status = circ_init(pf, &ssock->circ_buf_output, write_cap); + status = circ_init(pf, &ssock->ssl_write_buf, write_cap); if (status != PJ_SUCCESS) goto on_return; @@ -364,8 +364,8 @@ static void ssl_destroy(pj_ssl_sock_t* ssock) sch_ssl_sock_t* sch_ssock = (sch_ssl_sock_t*)ssock; /* Destroy circular buffers */ - circ_deinit(&ssock->circ_buf_input); - circ_deinit(&ssock->circ_buf_output); + circ_deinit(&ssock->ssl_read_buf); + circ_deinit(&ssock->ssl_write_buf); circ_deinit(&sch_ssock->decrypted_buf); /* Free certificate */ @@ -429,14 +429,14 @@ static void ssl_reset_sock_state(pj_ssl_sock_t* ssock) if (buf_out->cbBuffer > 0 && buf_out[0].pvBuffer) { pj_status_t status; - status = circ_write(&ssock->circ_buf_output, + status = circ_write(&ssock->ssl_write_buf, buf_out[0].pvBuffer, buf_out[0].cbBuffer); if (status != PJ_SUCCESS) { PJ_PERROR(1, (SNAME(ssock), status, "Failed to queuehandshake packets")); } else { - flush_circ_buf_output(ssock, &ssock->shutdown_op_key, + flush_ssl_write_buf(ssock, &ssock->shutdown_op_key, 0, 0); } } @@ -456,8 +456,8 @@ static void ssl_reset_sock_state(pj_ssl_sock_t* ssock) FreeCredentialsHandle(&sch_ssock->cred_handle); SecInvalidateHandle(&sch_ssock->cred_handle); } - circ_reset(&ssock->circ_buf_input); - circ_reset(&ssock->circ_buf_output); + circ_reset(&ssock->ssl_read_buf); + circ_reset(&ssock->ssl_write_buf); circ_reset(&sch_ssock->decrypted_buf); pj_lock_release(ssock->write_mutex); @@ -1244,13 +1244,13 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t* ssock) /* Start handshake iteration */ - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); - if (!circ_empty(&ssock->circ_buf_input) && !renego_req) { + if (!circ_empty(&ssock->ssl_read_buf) && !renego_req) { data_in = sch_ssock->read_buf; data_in_size = PJ_MIN(sch_ssock->read_buf_cap, - circ_size(&ssock->circ_buf_input)); - circ_read(&ssock->circ_buf_input, data_in, data_in_size); + circ_size(&ssock->ssl_read_buf)); + circ_read(&ssock->ssl_read_buf, data_in, data_in_size); } SecBuffer buf_in[2] = { {0} }; @@ -1318,7 +1318,7 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t* ssock) /* Check for any unprocessed input data, put it back to buffer */ if (buf_in[1].BufferType==SECBUFFER_EXTRA && buf_in[1].cbBuffer>0) { - circ_read_cancel(&ssock->circ_buf_input, buf_in[1].cbBuffer); + circ_read_cancel(&ssock->ssl_read_buf, buf_in[1].cbBuffer); } if (ss == SEC_E_OK && !renego_req) { @@ -1380,7 +1380,7 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t* ssock) LOG_DEBUG_ERR(SNAME(ssock), "Handshake progress", ss); /* Put back the incomplete message */ - circ_read_cancel(&ssock->circ_buf_input, data_in_size); + circ_read_cancel(&ssock->ssl_read_buf, data_in_size); } else if (!renego_req) { @@ -1389,13 +1389,13 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t* ssock) status = sec_err_to_pj(ss); } - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); if ((ss == SEC_E_OK || ss == SEC_I_CONTINUE_NEEDED) && buf_out[0].cbBuffer > 0 && buf_out[0].pvBuffer) { /* Queue output data to send */ - status2 = circ_write(&ssock->circ_buf_output, buf_out[0].pvBuffer, + status2 = circ_write(&ssock->ssl_write_buf, buf_out[0].pvBuffer, buf_out[0].cbBuffer); if (status2 != PJ_SUCCESS) { PJ_PERROR(1,(SNAME(ssock), status2, @@ -1404,14 +1404,6 @@ static pj_status_t ssl_do_handshake(pj_ssl_sock_t* ssock) } } - /* Send handshake packets to wire */ - status2 = flush_circ_buf_output(ssock, &ssock->handshake_op_key, 0, 0); - if (status2 != PJ_SUCCESS && status2 != PJ_EPENDING) { - PJ_PERROR(1,(SNAME(ssock), status2, - "Failed to send handshake packets")); - status = status2; - } - on_return: pj_lock_release(ssock->write_mutex); @@ -1448,7 +1440,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) /* Avoid compile warning of unused debugging var */ PJ_UNUSED_ARG(requested); - pj_lock_acquire(ssock->circ_buf_input_mutex); + pj_lock_acquire(ssock->ssl_read_buf_mutex); /* Try read from the decrypted buffer */ size_ = circ_size(&sch_ssock->decrypted_buf); @@ -1458,7 +1450,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) *size = (int)need; LOG_DEBUG1(SNAME(ssock), "Read %d: returned all from decrypted buffer.", requested); - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); return PJ_SUCCESS; } @@ -1470,15 +1462,15 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) need -= (int)size_; /* Decrypt data of network input buffer */ - if (!circ_empty(&ssock->circ_buf_input)) { + if (!circ_empty(&ssock->ssl_read_buf)) { data_ = sch_ssock->read_buf; size_ = PJ_MIN(sch_ssock->read_buf_cap, - circ_size(&ssock->circ_buf_input)); - circ_read(&ssock->circ_buf_input, data_, size_); + circ_size(&ssock->ssl_read_buf)); + circ_read(&ssock->ssl_read_buf, data_, size_); } else { LOG_DEBUG2(SNAME(ssock), "Read %d: no data to decrypt, returned %d.", requested, *size); - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); return PJ_SUCCESS; } @@ -1500,7 +1492,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) /* Check for any unprocessed input data, put it back to buffer */ i = find_sec_buffer(buf, ARRAYSIZE(buf), SECBUFFER_EXTRA); if (i >= 0) { - circ_read_cancel(&ssock->circ_buf_input, buf[i].cbBuffer); + circ_read_cancel(&ssock->ssl_read_buf, buf[i].cbBuffer); } /* Process any decrypted data */ @@ -1534,7 +1526,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) else if (ss == SEC_E_INCOMPLETE_MESSAGE) { /* Put back the incomplete message */ - circ_read_cancel(&ssock->circ_buf_input, size_); + circ_read_cancel(&ssock->ssl_read_buf, size_); } else if (ss == SEC_I_RENEGOTIATE) { @@ -1545,7 +1537,7 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) i = find_sec_buffer(buf, ARRAYSIZE(buf), SECBUFFER_EXTRA); if (i >= 0 && buf[i].pvBuffer && buf[i].cbBuffer) { /* Queue the token as input in the handshake */ - circ_write(&ssock->circ_buf_input, buf[i].pvBuffer, + circ_write(&ssock->ssl_read_buf, buf[i].pvBuffer, buf[i].cbBuffer); } @@ -1567,13 +1559,14 @@ static pj_status_t ssl_read(pj_ssl_sock_t* ssock, void* data, int* size) status = sec_err_to_pj(ss); } - pj_lock_release(ssock->circ_buf_input_mutex); + pj_lock_release(ssock->ssl_read_buf_mutex); LOG_DEBUG2(SNAME(ssock), "Read %d: returned=%d.", requested, *size); return status; } +/* Caller must hold ssock->write_mutex. */ static pj_status_t ssl_write(pj_ssl_sock_t* ssock, const void* data, pj_ssize_t size, int* nwritten) { @@ -1622,7 +1615,7 @@ static pj_status_t ssl_write(pj_ssl_sock_t* ssock, const void* data, out_size = (pj_ssize_t)buf[0].cbBuffer + buf[1].cbBuffer + buf[2].cbBuffer; - status = circ_write(&ssock->circ_buf_output, sch_ssock->write_buf, + status = circ_write(&ssock->ssl_write_buf, sch_ssock->write_buf, out_size); if (status != PJ_SUCCESS) { PJ_PERROR(1, (SNAME(ssock), status, diff --git a/pjlib/src/pjlib-test/ssl_sock.c b/pjlib/src/pjlib-test/ssl_sock.c index d9b4ea8bf1..ccedc0d931 100644 --- a/pjlib/src/pjlib-test/ssl_sock.c +++ b/pjlib/src/pjlib-test/ssl_sock.c @@ -16,23 +16,7 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include "test.h" -#include - - -#define CERT_DIR "../build/" -#if (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_DARWIN) || \ - (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE) -/* If we use Darwin SSL, use the cert in DER format. */ -# define CERT_CA_FILE CERT_DIR "cacert.der" -#else -# define CERT_CA_FILE CERT_DIR "cacert.pem" -#endif -#define CERT_FILE CERT_DIR "cacert.pem" -#define CERT_PRIVKEY_FILE CERT_DIR "privkey.pem" -#define CERT_PRIVKEY_PASS "privkeypass" - -#define TEST_LOAD_FROM_FILES 1 +#include "ssl_sock_test.h" /* Test direct certificate loading. * For OpenSSL backend only and TEST_LOAD_FROM_FILES must be 1. @@ -81,6 +65,7 @@ struct test_state { pj_pool_t *pool; /* pool */ pj_ioqueue_t *ioqueue; /* ioqueue */ + pj_ssl_sock_t *accepted_ssock; /* accepted server-side socket */ pj_bool_t is_server; /* server role flag */ pj_bool_t is_verbose; /* verbose flag, e.g: cert info */ pj_bool_t echo; /* echo received data */ @@ -231,6 +216,9 @@ static pj_bool_t ssl_on_accept_complete(pj_ssl_sock_t *ssock, *st = *parent_st; pj_ssl_sock_set_user_data(newsock, st); + /* Track accepted socket for cleanup */ + parent_st->accepted_ssock = newsock; + status = pj_ssl_sock_get_info(newsock, &info); if (status != PJ_SUCCESS) { app_perror("...ERROR pj_ssl_sock_get_info()", status); @@ -1243,10 +1231,21 @@ static int client_non_ssl(unsigned ms_timeout) PJ_LOG(3, ("", "...Done!")); on_return: - if (ssock_serv) - pj_ssl_sock_close(ssock_serv); if (asock_cli && !state_cli.err && !state_cli.done) pj_activesock_close(asock_cli); + if (state_serv.accepted_ssock) + pj_ssl_sock_close(state_serv.accepted_ssock); + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to let deferred socket destruction complete */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + if (timer) pj_timer_heap_destroy(timer); if (ioqueue) @@ -1665,8 +1664,8 @@ static int perf_test(unsigned clients, unsigned ms_handshake_timeout) if (state_cli[i].err != PJ_SUCCESS) cli_err++; - tot_sent += state_cli[1].sent; - tot_recv += state_cli[1].recv; + tot_sent += state_cli[i].sent; + tot_recv += state_cli[i].recv; } PJ_LOG(3, ("", ".....Clients: %d (%d errors)", clients, cli_err)); @@ -1692,34 +1691,371 @@ static int perf_test(unsigned clients, unsigned ms_handshake_timeout) } #endif -#if 0 && (!defined(PJ_SYMBIAN) || PJ_SYMBIAN==0) -pj_status_t pj_ssl_sock_ossl_test_send_buf(pj_pool_t *pool); -static int ossl_test_send_buf() +/* Stress tests (send_load, close_pending, bidir, mt_send_load) are in + * ssl_sock_stress.c, registered as ssl_sock_stress_test(). + */ + + +/* + * Large message test: send 64KB data (multiple TLS records), + * verify byte-for-byte echo. Uses dedicated state/callbacks with + * larger buffers than the echo_test infrastructure. + */ +#define LARGE_MSG_SIZE (64 * 1024) +#define LARGE_MSG_BUF_SIZE 8192 + +struct large_msg_state { - pj_pool_t *pool; + pj_pool_t *pool; + pj_ssl_sock_t *accepted_ssock; + pj_bool_t is_server; + pj_bool_t echo; + pj_status_t err; + pj_size_t sent; + pj_size_t recv; + pj_uint8_t read_buf[LARGE_MSG_BUF_SIZE]; + pj_bool_t done; + char *send_str; + pj_size_t send_str_len; + const char *check_echo_ptr; + pj_ioqueue_op_key_t send_key; +}; + +static pj_bool_t lm_on_connect_complete(pj_ssl_sock_t *ssock, + pj_status_t status) +{ + struct large_msg_state *st = (struct large_msg_state *) + pj_ssl_sock_get_user_data(ssock); + void *read_buf[1]; + + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(ssock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + /* Start sending */ + while (st->sent < st->send_str_len) { + pj_ssize_t size = st->send_str_len - st->sent; + + status = pj_ssl_sock_send(ssock, &st->send_key, + st->send_str + st->sent, &size, 0); + if (status == PJ_SUCCESS) { + st->sent += size; + } else { + if (status != PJ_EPENDING) + st->err = status; + break; + } + } + + return (st->err == PJ_SUCCESS) ? PJ_TRUE : PJ_FALSE; +} + +static pj_bool_t lm_on_accept_complete(pj_ssl_sock_t *ssock, + pj_ssl_sock_t *newsock, + const pj_sockaddr_t *src_addr, + int src_addr_len, + pj_status_t accept_status) +{ + struct large_msg_state *parent_st = (struct large_msg_state *) + pj_ssl_sock_get_user_data(ssock); + struct large_msg_state *st; + void *read_buf[1]; pj_status_t status; - pool = pj_pool_create(mem, "send_buf", 256, 256, NULL); - status = pj_ssl_sock_ossl_test_send_buf(pool); - pj_pool_release(pool); - return status; + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); + + if (accept_status != PJ_SUCCESS) + return PJ_FALSE; + + st = (struct large_msg_state *)pj_pool_zalloc(parent_st->pool, + sizeof(struct large_msg_state)); + *st = *parent_st; + st->sent = 0; + st->recv = 0; + st->done = PJ_FALSE; + st->err = PJ_SUCCESS; + pj_ssl_sock_set_user_data(newsock, st); + + /* Track accepted socket for cleanup */ + parent_st->accepted_ssock = newsock; + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(newsock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + return PJ_TRUE; } -#else -static int ossl_test_send_buf() + +static pj_bool_t lm_on_data_read(pj_ssl_sock_t *ssock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) { - return 0; + struct large_msg_state *st = (struct large_msg_state *) + pj_ssl_sock_get_user_data(ssock); + + if (remainder) + *remainder = 0; + + if (size > 0) { + st->recv += size; + + /* Server: echo data back */ + if (st->echo) { + pj_ssize_t sz = (pj_ssize_t)size; + pj_status_t s; + + s = pj_ssl_sock_send(ssock, &st->send_key, data, &sz, 0); + if (s == PJ_SUCCESS) + st->sent += sz; + else if (s != PJ_EPENDING) + st->err = s; + } + + /* Client: verify echoed data byte-for-byte */ + if (st->check_echo_ptr) { + if (pj_memcmp(st->check_echo_ptr, data, size)) { + PJ_LOG(1, ("", "...ERROR echoed data mismatch at " + "offset %lu", + (unsigned long)(st->check_echo_ptr - + st->send_str))); + st->err = PJ_EINVAL; + } + st->check_echo_ptr += size; + + if (st->recv >= st->send_str_len) + st->done = PJ_TRUE; + } + } + + if (status != PJ_SUCCESS) { + if (status == PJ_EEOF) + st->done = PJ_TRUE; + else + st->err = status; + } + + if (st->err != PJ_SUCCESS || st->done) + return PJ_FALSE; + + return PJ_TRUE; } -#endif + +static pj_bool_t lm_on_data_sent(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct large_msg_state *st = (struct large_msg_state *) + pj_ssl_sock_get_user_data(ssock); + PJ_UNUSED_ARG(op_key); + + if (sent < 0) { + st->err = (pj_status_t)-sent; + return PJ_FALSE; + } + + st->sent += sent; + + /* Client: continue sending remaining data */ + if (st->send_str && st->sent < st->send_str_len) { + while (st->sent < st->send_str_len) { + pj_ssize_t size = st->send_str_len - st->sent; + pj_status_t s; + + s = pj_ssl_sock_send(ssock, &st->send_key, + st->send_str + st->sent, &size, 0); + if (s == PJ_SUCCESS) { + st->sent += size; + } else { + if (s != PJ_EPENDING) + st->err = s; + break; + } + } + } + + return (st->err == PJ_SUCCESS) ? PJ_TRUE : PJ_FALSE; +} + +static int large_msg_test(void) +{ + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_timer_heap_t *timer = NULL; + pj_ssl_sock_t *ssock_serv = NULL; + pj_ssl_sock_t *ssock_cli = NULL; + pj_ssl_sock_param param; + struct large_msg_state state_serv; + struct large_msg_state state_cli; + pj_sockaddr addr, listen_addr; + pj_status_t status; + char *large_buf; + int i; + + pool = pj_pool_create(mem, "ssl_large", 256000, 4096, NULL); + + pj_bzero(&state_serv, sizeof(state_serv)); + pj_bzero(&state_cli, sizeof(state_cli)); + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("...large_msg_test: ioqueue create", status); + goto on_return; + } + + status = pj_timer_heap_create(pool, 4, &timer); + if (status != PJ_SUCCESS) { + app_perror("...large_msg_test: timer create", status); + goto on_return; + } + + pj_ssl_sock_param_default(¶m); + param.cb.on_accept_complete2 = &lm_on_accept_complete; + param.cb.on_connect_complete = &lm_on_connect_complete; + param.cb.on_data_read = &lm_on_data_read; + param.cb.on_data_sent = &lm_on_data_sent; + param.ioqueue = ioqueue; + param.timer_heap = timer; + param.proto = PJ_SSL_SOCK_PROTO_TLS1_2; + param.ciphers_num = 0; + + { + pj_str_t tmp_st; + pj_sockaddr_init(PJ_AF_INET, &addr, + pj_strset2(&tmp_st, "127.0.0.1"), 0); + } + + /* Fill send data with deterministic pattern */ + large_buf = (char *)pj_pool_alloc(pool, LARGE_MSG_SIZE); + for (i = 0; i < LARGE_MSG_SIZE; i++) + large_buf[i] = (char)(i & 0xFF); + + /* SERVER */ + state_serv.pool = pool; + state_serv.echo = PJ_TRUE; + state_serv.is_server = PJ_TRUE; + param.user_data = &state_serv; + param.require_client_cert = PJ_FALSE; + + listen_addr = addr; + status = ssl_test_create_server(pool, ¶m, "large_msg_test", + &ssock_serv, &listen_addr); + if (status != PJ_SUCCESS) + goto on_return; + + /* CLIENT */ + state_cli.pool = pool; + state_cli.is_server = PJ_FALSE; + state_cli.send_str = large_buf; + state_cli.send_str_len = LARGE_MSG_SIZE; + state_cli.check_echo_ptr = large_buf; + param.user_data = &state_cli; + + status = pj_ssl_sock_create(pool, ¶m, &ssock_cli); + if (status != PJ_SUCCESS) { + app_perror("...large_msg_test: client create", status); + goto on_return; + } + + status = pj_ssl_sock_start_connect(ssock_cli, pool, &addr, + &listen_addr, + pj_sockaddr_get_len(&addr)); + if (status == PJ_SUCCESS) { + lm_on_connect_complete(ssock_cli, PJ_SUCCESS); + } else if (status != PJ_EPENDING) { + app_perror("...large_msg_test: connect", status); + goto on_return; + } + + /* Poll until done */ + { + pj_timestamp t_start, t_now; + pj_uint32_t elapsed; + + pj_get_timestamp(&t_start); + while (!state_cli.err && !state_cli.done) { + pj_time_val delay = {0, 100}; + pj_ioqueue_poll(ioqueue, &delay); + pj_timer_heap_poll(timer, NULL); + + pj_get_timestamp(&t_now); + elapsed = pj_elapsed_msec(&t_start, &t_now); + if (elapsed > 30000) { + PJ_LOG(1, ("", "...large_msg_test TIMEOUT after 30s")); + status = PJ_ETIMEDOUT; + goto on_return; + } + } + } + + if (state_cli.err) { + status = state_cli.err; + app_perror("...large_msg_test client error", status); + goto on_return; + } + + PJ_LOG(3, ("", "...large_msg_test: sent=%lu, recv=%lu", + (unsigned long)state_cli.sent, + (unsigned long)state_cli.recv)); + + if (state_cli.recv < (pj_size_t)LARGE_MSG_SIZE) { + PJ_LOG(1, ("", "...large_msg_test: recv=%lu < expected=%d", + (unsigned long)state_cli.recv, LARGE_MSG_SIZE)); + status = PJ_EINVAL; + goto on_return; + } + + status = PJ_SUCCESS; + +on_return: + if (ssock_cli) + pj_ssl_sock_close(ssock_cli); + if (state_serv.accepted_ssock) + pj_ssl_sock_close(state_serv.accepted_ssock); + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to drain pending events after close */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (timer) + pj_timer_heap_destroy(timer); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return (status == PJ_SUCCESS) ? 0 : -1; +} + + int ssl_sock_test(void) { int ret; - PJ_LOG(3,("", "..test ossl send buf")); - ret = ossl_test_send_buf(); - if (ret != 0) - return ret; - PJ_LOG(3,("", "..get cipher list test")); ret = get_cipher_list(); if (ret != 0) @@ -1826,6 +2162,11 @@ int ssl_sock_test(void) return ret; #endif + PJ_LOG(3,("", "..large message test")); + ret = large_msg_test(); + if (ret != 0) + return ret; + #if WITH_BENCHMARK #if (PJ_SSL_SOCK_IMP != PJ_SSL_SOCK_IMP_MBEDTLS) PJ_LOG(3,("", "..performance test")); diff --git a/pjlib/src/pjlib-test/ssl_sock_stress.c b/pjlib/src/pjlib-test/ssl_sock_stress.c new file mode 100644 index 0000000000..0b972e6712 --- /dev/null +++ b/pjlib/src/pjlib-test/ssl_sock_stress.c @@ -0,0 +1,1436 @@ +/* + * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com) + * Copyright (C) 2003-2008 Benny Prijono + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +#include "ssl_sock_test.h" + + +#if INCLUDE_SSLSOCK_TEST + +/* + * SSL send load test: blast many sends, verify all callbacks fire. + * Uses small SO_SNDBUF to force async (PJ_EPENDING) send path. + */ +#define SEND_LOAD_COUNT 100 +#define SEND_LOAD_PKT_LEN 512 + +struct send_load_state +{ + pj_pool_t *pool; + pj_ssl_sock_t *accepted_ssock; + pj_bool_t is_server; + pj_bool_t echo; + pj_status_t err; + pj_size_t sent; + pj_size_t recv; + pj_uint8_t read_buf[8192]; + pj_bool_t done; + int pending_cnt; + int sent_cb_cnt; + int send_idx; + pj_ioqueue_op_key_t op_keys[SEND_LOAD_COUNT]; + char send_data[SEND_LOAD_PKT_LEN]; +}; + +static pj_bool_t load_on_connect_complete(pj_ssl_sock_t *ssock, + pj_status_t status) +{ + struct send_load_state *st = (struct send_load_state *) + pj_ssl_sock_get_user_data(ssock); + void *read_buf[1]; + int i; + + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + /* Start reading */ + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(ssock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + /* Blast sends — many rapid sends naturally trigger PJ_EPENDING + * as the SSL/network buffers fill up. + */ + for (i = 0; i < SEND_LOAD_COUNT; i++) { + pj_ssize_t len = SEND_LOAD_PKT_LEN; + + status = pj_ssl_sock_send(ssock, &st->op_keys[i], + st->send_data, &len, 0); + if (status == PJ_EPENDING) { + st->pending_cnt++; + } else if (status == PJ_SUCCESS) { + st->sent += len; + } else { + st->err = status; + return PJ_FALSE; + } + st->send_idx++; + } + + return PJ_TRUE; +} + +static pj_bool_t load_on_accept_complete(pj_ssl_sock_t *ssock, + pj_ssl_sock_t *newsock, + const pj_sockaddr_t *src_addr, + int src_addr_len, + pj_status_t accept_status) +{ + struct send_load_state *parent_st = (struct send_load_state *) + pj_ssl_sock_get_user_data(ssock); + struct send_load_state *st; + void *read_buf[1]; + pj_status_t status; + + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); + + if (accept_status != PJ_SUCCESS) + return PJ_FALSE; + + st = (struct send_load_state *)pj_pool_zalloc(parent_st->pool, + sizeof(struct send_load_state)); + *st = *parent_st; + pj_ssl_sock_set_user_data(newsock, st); + + /* Track accepted socket for cleanup */ + parent_st->accepted_ssock = newsock; + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(newsock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + return PJ_TRUE; +} + +static pj_bool_t load_on_data_read(pj_ssl_sock_t *ssock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) +{ + struct send_load_state *st = (struct send_load_state *) + pj_ssl_sock_get_user_data(ssock); + + if (remainder) + *remainder = 0; + + if (size > 0) { + st->recv += size; + + /* Server echoes data back */ + if (st->echo) { + pj_ssize_t sz = (pj_ssize_t)size; + pj_status_t s; + + s = pj_ssl_sock_send(ssock, &st->op_keys[0], data, &sz, 0); + if (s != PJ_SUCCESS && s != PJ_EPENDING) { + st->err = s; + } + } + + /* Client: check if all echoed data received */ + if (!st->is_server) { + pj_size_t expected = (pj_size_t)st->send_idx * + SEND_LOAD_PKT_LEN; + if (st->recv >= expected) + st->done = PJ_TRUE; + } + } + + if (status != PJ_SUCCESS) { + if (status == PJ_EEOF) { + st->done = PJ_TRUE; + } else { + st->err = status; + } + } + + if (st->err != PJ_SUCCESS || st->done) + return PJ_FALSE; + + return PJ_TRUE; +} + +static pj_bool_t load_on_data_sent(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct send_load_state *st = (struct send_load_state *) + pj_ssl_sock_get_user_data(ssock); + PJ_UNUSED_ARG(op_key); + + if (sent < 0) { + st->err = (pj_status_t)-sent; + return PJ_FALSE; + } + + st->sent += sent; + st->sent_cb_cnt++; + return PJ_TRUE; +} + +static int send_load_test(void) +{ + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_timer_heap_t *timer = NULL; + pj_ssl_sock_t *ssock_serv = NULL; + pj_ssl_sock_t *ssock_cli = NULL; + pj_ssl_sock_param param; + struct send_load_state state_serv; + struct send_load_state state_cli; + pj_sockaddr addr, listen_addr; + + pj_status_t status; + int i; + + pool = pj_pool_create(mem, "ssl_load", 4096, 4096, NULL); + + pj_bzero(&state_serv, sizeof(state_serv)); + pj_bzero(&state_cli, sizeof(state_cli)); + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("...send_load_test: ioqueue create", status); + goto on_return; + } + + status = pj_timer_heap_create(pool, 4, &timer); + if (status != PJ_SUCCESS) { + app_perror("...send_load_test: timer create", status); + goto on_return; + } + + pj_ssl_sock_param_default(¶m); + param.cb.on_accept_complete2 = &load_on_accept_complete; + param.cb.on_connect_complete = &load_on_connect_complete; + param.cb.on_data_read = &load_on_data_read; + param.cb.on_data_sent = &load_on_data_sent; + param.ioqueue = ioqueue; + param.timer_heap = timer; + param.proto = PJ_SSL_SOCK_PROTO_TLS1_2; + param.ciphers_num = 0; + + { + pj_str_t tmp_st; + pj_sockaddr_init(PJ_AF_INET, &addr, + pj_strset2(&tmp_st, "127.0.0.1"), 0); + } + + /* Fill send data with pattern */ + for (i = 0; i < SEND_LOAD_PKT_LEN; i++) + state_cli.send_data[i] = (char)(i & 0xFF); + + /* SERVER */ + state_serv.pool = pool; + state_serv.echo = PJ_TRUE; + state_serv.is_server = PJ_TRUE; + param.user_data = &state_serv; + param.require_client_cert = PJ_FALSE; + + listen_addr = addr; + status = ssl_test_create_server(pool, ¶m, "send_load_test", + &ssock_serv, &listen_addr); + if (status != PJ_SUCCESS) + goto on_return; + + /* CLIENT */ + param.user_data = &state_cli; + param.require_client_cert = PJ_FALSE; + + state_cli.pool = pool; + state_cli.echo = PJ_FALSE; + state_cli.is_server = PJ_FALSE; + + status = pj_ssl_sock_create(pool, ¶m, &ssock_cli); + if (status != PJ_SUCCESS) { + app_perror("...send_load_test: client create", status); + goto on_return; + } + + status = pj_ssl_sock_start_connect(ssock_cli, pool, &addr, + &listen_addr, + pj_sockaddr_get_len(&addr)); + if (status == PJ_SUCCESS) { + load_on_connect_complete(ssock_cli, PJ_SUCCESS); + } else if (status != PJ_EPENDING) { + app_perror("...send_load_test: connect", status); + goto on_return; + } + + /* Poll until done or error */ + { + pj_timestamp t_start, t_now; + pj_uint32_t elapsed; + + pj_get_timestamp(&t_start); + while (!state_cli.err && !state_cli.done) { + pj_time_val delay = {0, 100}; + pj_ioqueue_poll(ioqueue, &delay); + pj_timer_heap_poll(timer, NULL); + + pj_get_timestamp(&t_now); + elapsed = pj_elapsed_msec(&t_start, &t_now); + if (elapsed > 30000) { + PJ_LOG(1, ("", "...send_load_test TIMEOUT after 30s")); + status = PJ_ETIMEDOUT; + goto on_return; + } + } + } + + if (state_cli.err) { + status = state_cli.err; + app_perror("...send_load_test client error", status); + goto on_return; + } + + /* Verify results */ + PJ_LOG(3, ("", "...send_load_test: sent=%lu, recv=%lu, " + "pending=%d, sent_cb=%d", + (unsigned long)state_cli.sent, + (unsigned long)state_cli.recv, + state_cli.pending_cnt, + state_cli.sent_cb_cnt)); + + if (state_cli.pending_cnt == 0) { + PJ_LOG(3, ("", "...NOTE: all sends completed synchronously, " + "async path NOT tested. Set PJ_IOQUEUE_FAST_TRACK=0 " + "in config_site.h to force async path.")); + } + + if (state_cli.sent_cb_cnt != state_cli.pending_cnt) { + PJ_LOG(1, ("", "...ERROR: sent callback count (%d) != " + "pending count (%d)", + state_cli.sent_cb_cnt, state_cli.pending_cnt)); + status = PJ_EBUG; + goto on_return; + } + + if (state_cli.sent != (pj_size_t)SEND_LOAD_COUNT * SEND_LOAD_PKT_LEN) { + PJ_LOG(1, ("", "...ERROR: total sent (%lu) != expected (%lu)", + (unsigned long)state_cli.sent, + (unsigned long)(SEND_LOAD_COUNT * SEND_LOAD_PKT_LEN))); + status = PJ_EBUG; + goto on_return; + } + + status = PJ_SUCCESS; + +on_return: + if (ssock_cli) + pj_ssl_sock_close(ssock_cli); + if (state_serv.accepted_ssock) + pj_ssl_sock_close(state_serv.accepted_ssock); + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to drain pending events after close */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (timer) + pj_timer_heap_destroy(timer); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return (status == PJ_SUCCESS) ? 0 : -1; +} + +/* + * Close under pending sends test: blast sends then immediately close socket. + * Verifies no crash or use-after-free. + */ +struct close_pending_state +{ + pj_pool_t *pool; + pj_ssl_sock_t *accepted_ssock; + pj_bool_t is_server; + pj_bool_t echo; + pj_status_t err; + pj_size_t sent; + pj_size_t recv; + pj_uint8_t read_buf[8192]; + pj_bool_t done; + int pending_cnt; + int sent_cb_cnt; + int send_idx; + pj_ioqueue_op_key_t op_keys[SEND_LOAD_COUNT]; + char send_data[SEND_LOAD_PKT_LEN]; + pj_bool_t blast_done; +}; + +static pj_bool_t cp_on_connect_complete(pj_ssl_sock_t *ssock, + pj_status_t status) +{ + struct close_pending_state *st = (struct close_pending_state *) + pj_ssl_sock_get_user_data(ssock); + void *read_buf[1]; + pj_ssize_t len; + int i; + + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(ssock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + /* Blast sends */ + for (i = 0; i < SEND_LOAD_COUNT; i++) { + len = SEND_LOAD_PKT_LEN; + status = pj_ssl_sock_send(ssock, &st->op_keys[i], + st->send_data, &len, 0); + if (status == PJ_EPENDING) { + st->pending_cnt++; + } else if (status == PJ_SUCCESS) { + st->sent += len; + } else { + /* Send error during blast, stop but don't fail test */ + break; + } + st->send_idx++; + } + + st->blast_done = PJ_TRUE; + return PJ_TRUE; +} + +static pj_bool_t cp_on_data_read(pj_ssl_sock_t *ssock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) +{ + struct close_pending_state *st = (struct close_pending_state *) + pj_ssl_sock_get_user_data(ssock); + + PJ_UNUSED_ARG(data); + + if (remainder) + *remainder = 0; + + if (size > 0) + st->recv += size; + + /* Server: just consume data, no echo needed for this test */ + if (status != PJ_SUCCESS && status != PJ_EEOF) { + st->err = status; + return PJ_FALSE; + } + + return PJ_TRUE; +} + +static pj_bool_t cp_on_data_sent(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct close_pending_state *st = (struct close_pending_state *) + pj_ssl_sock_get_user_data(ssock); + PJ_UNUSED_ARG(op_key); + + /* Tolerate errors — socket may be closing under us */ + if (sent > 0) + st->sent += sent; + st->sent_cb_cnt++; + + return PJ_TRUE; +} + +static int close_pending_test(void) +{ + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_timer_heap_t *timer = NULL; + pj_ssl_sock_t *ssock_serv = NULL; + pj_ssl_sock_t *ssock_cli = NULL; + pj_ssl_sock_param param; + struct close_pending_state state_serv; + struct close_pending_state state_cli; + pj_sockaddr addr, listen_addr; + + pj_status_t status; + int i; + + pool = pj_pool_create(mem, "ssl_closep", 8192, 4096, NULL); + + pj_bzero(&state_serv, sizeof(state_serv)); + pj_bzero(&state_cli, sizeof(state_cli)); + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("...close_pending_test: ioqueue create", status); + goto on_return; + } + + status = pj_timer_heap_create(pool, 4, &timer); + if (status != PJ_SUCCESS) { + app_perror("...close_pending_test: timer create", status); + goto on_return; + } + + pj_ssl_sock_param_default(¶m); + param.cb.on_accept_complete2 = &load_on_accept_complete; + param.cb.on_connect_complete = &cp_on_connect_complete; + param.cb.on_data_read = &cp_on_data_read; + param.cb.on_data_sent = &cp_on_data_sent; + param.ioqueue = ioqueue; + param.timer_heap = timer; + param.proto = PJ_SSL_SOCK_PROTO_TLS1_2; + param.ciphers_num = 0; + + { + pj_str_t tmp_st; + pj_sockaddr_init(PJ_AF_INET, &addr, + pj_strset2(&tmp_st, "127.0.0.1"), 0); + } + + for (i = 0; i < SEND_LOAD_PKT_LEN; i++) + state_cli.send_data[i] = (char)(i & 0xFF); + + /* SERVER */ + state_serv.pool = pool; + state_serv.is_server = PJ_TRUE; + param.user_data = &state_serv; + param.require_client_cert = PJ_FALSE; + + listen_addr = addr; + status = ssl_test_create_server(pool, ¶m, "close_pending_test", + &ssock_serv, &listen_addr); + if (status != PJ_SUCCESS) + goto on_return; + + /* CLIENT */ + state_cli.pool = pool; + state_cli.is_server = PJ_FALSE; + param.user_data = &state_cli; + + status = pj_ssl_sock_create(pool, ¶m, &ssock_cli); + if (status != PJ_SUCCESS) { + app_perror("...close_pending_test: client create", status); + goto on_return; + } + + status = pj_ssl_sock_start_connect(ssock_cli, pool, &addr, + &listen_addr, + pj_sockaddr_get_len(&addr)); + if (status == PJ_SUCCESS) { + cp_on_connect_complete(ssock_cli, PJ_SUCCESS); + } else if (status != PJ_EPENDING) { + app_perror("...close_pending_test: connect", status); + goto on_return; + } + + /* Poll until blast is done */ + { + pj_timestamp t_start, t_now; + pj_uint32_t elapsed; + + pj_get_timestamp(&t_start); + while (!state_cli.blast_done && !state_cli.err) { + pj_time_val delay = {0, 100}; + pj_ioqueue_poll(ioqueue, &delay); + pj_timer_heap_poll(timer, NULL); + + pj_get_timestamp(&t_now); + elapsed = pj_elapsed_msec(&t_start, &t_now); + if (elapsed > 30000) { + PJ_LOG(1, ("", "...close_pending_test TIMEOUT")); + status = PJ_ETIMEDOUT; + goto on_return; + } + } + } + + /* Close client while sends may still be pending */ + PJ_LOG(3, ("", "...close_pending_test: closing client with " + "pending=%d", state_cli.pending_cnt)); + pj_ssl_sock_close(ssock_cli); + ssock_cli = NULL; + + /* Poll briefly to let server-side and async cleanup drain */ + { + pj_time_val delay = {0, 500}; + int n = 20; + while (n-- > 0) + pj_ioqueue_poll(ioqueue, &delay); + } + + PJ_LOG(3, ("", "...close_pending_test: completed (no crash)")); + status = PJ_SUCCESS; + +on_return: + if (ssock_cli) + pj_ssl_sock_close(ssock_cli); + if (state_serv.accepted_ssock) + pj_ssl_sock_close(state_serv.accepted_ssock); + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to drain pending events after close */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (timer) + pj_timer_heap_destroy(timer); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return (status == PJ_SUCCESS) ? 0 : -1; +} + + +/* + * Bidirectional simultaneous load test: both sides send independent data. + */ +#define BIDIR_SEND_COUNT 100 +#define BIDIR_PKT_LEN 1024 + +struct bidir_state +{ + pj_pool_t *pool; + pj_ssl_sock_t *accepted_ssock; + pj_bool_t is_server; + pj_status_t err; + pj_size_t sent; + pj_size_t recv; + pj_uint8_t read_buf[8192]; + pj_bool_t done; + int pending_cnt; + int sent_cb_cnt; + int send_idx; + pj_ioqueue_op_key_t op_keys[BIDIR_SEND_COUNT]; + char send_data[BIDIR_PKT_LEN]; + pj_size_t expected_recv; +}; + +static pj_bool_t bidir_blast_sends(pj_ssl_sock_t *ssock, + struct bidir_state *st) +{ + int i; + + for (i = st->send_idx; i < BIDIR_SEND_COUNT; i++) { + pj_ssize_t len = BIDIR_PKT_LEN; + pj_status_t s; + + s = pj_ssl_sock_send(ssock, &st->op_keys[i], + st->send_data, &len, 0); + if (s == PJ_EPENDING) { + st->pending_cnt++; + } else if (s == PJ_SUCCESS) { + st->sent += len; + } else { + st->err = s; + return PJ_FALSE; + } + st->send_idx++; + } + return PJ_TRUE; +} + +static pj_bool_t bidir_on_connect_complete(pj_ssl_sock_t *ssock, + pj_status_t status) +{ + struct bidir_state *st = (struct bidir_state *) + pj_ssl_sock_get_user_data(ssock); + void *read_buf[1]; + + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(ssock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + return bidir_blast_sends(ssock, st); +} + +static pj_bool_t bidir_on_accept_complete(pj_ssl_sock_t *ssock, + pj_ssl_sock_t *newsock, + const pj_sockaddr_t *src_addr, + int src_addr_len, + pj_status_t accept_status) +{ + struct bidir_state *parent_st = (struct bidir_state *) + pj_ssl_sock_get_user_data(ssock); + struct bidir_state *st; + void *read_buf[1]; + pj_status_t status; + + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); + + if (accept_status != PJ_SUCCESS) + return PJ_FALSE; + + st = (struct bidir_state *)pj_pool_zalloc(parent_st->pool, + sizeof(struct bidir_state)); + *st = *parent_st; + st->send_idx = 0; + st->pending_cnt = 0; + st->sent_cb_cnt = 0; + st->sent = 0; + st->recv = 0; + st->done = PJ_FALSE; + st->err = PJ_SUCCESS; + pj_ssl_sock_set_user_data(newsock, st); + + /* Track accepted socket for cleanup */ + parent_st->accepted_ssock = newsock; + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(newsock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + /* Server also blast-sends its own data */ + return bidir_blast_sends(newsock, st); +} + +static pj_bool_t bidir_on_data_read(pj_ssl_sock_t *ssock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) +{ + struct bidir_state *st = (struct bidir_state *) + pj_ssl_sock_get_user_data(ssock); + + PJ_UNUSED_ARG(data); + + if (remainder) + *remainder = 0; + + if (size > 0) { + st->recv += size; + if (st->recv >= st->expected_recv) + st->done = PJ_TRUE; + } + + if (status != PJ_SUCCESS) { + if (status == PJ_EEOF) + st->done = PJ_TRUE; + else + st->err = status; + } + + if (st->err != PJ_SUCCESS || st->done) + return PJ_FALSE; + + return PJ_TRUE; +} + +static pj_bool_t bidir_on_data_sent(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct bidir_state *st = (struct bidir_state *) + pj_ssl_sock_get_user_data(ssock); + PJ_UNUSED_ARG(op_key); + + if (sent < 0) { + st->err = (pj_status_t)-sent; + return PJ_FALSE; + } + + st->sent += sent; + st->sent_cb_cnt++; + return PJ_TRUE; +} + +static int bidir_test(void) +{ + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_timer_heap_t *timer = NULL; + pj_ssl_sock_t *ssock_serv = NULL; + pj_ssl_sock_t *ssock_cli = NULL; + pj_ssl_sock_param param; + struct bidir_state state_serv; + struct bidir_state state_cli; + pj_sockaddr addr, listen_addr; + + pj_status_t status; + int i; + + pool = pj_pool_create(mem, "ssl_bidir", 8192, 4096, NULL); + + pj_bzero(&state_serv, sizeof(state_serv)); + pj_bzero(&state_cli, sizeof(state_cli)); + + status = pj_ioqueue_create(pool, 4, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("...bidir_test: ioqueue create", status); + goto on_return; + } + + status = pj_timer_heap_create(pool, 4, &timer); + if (status != PJ_SUCCESS) { + app_perror("...bidir_test: timer create", status); + goto on_return; + } + + pj_ssl_sock_param_default(¶m); + param.cb.on_accept_complete2 = &bidir_on_accept_complete; + param.cb.on_connect_complete = &bidir_on_connect_complete; + param.cb.on_data_read = &bidir_on_data_read; + param.cb.on_data_sent = &bidir_on_data_sent; + param.ioqueue = ioqueue; + param.timer_heap = timer; + param.proto = PJ_SSL_SOCK_PROTO_TLS1_2; + param.ciphers_num = 0; + + { + pj_str_t tmp_st; + pj_sockaddr_init(PJ_AF_INET, &addr, + pj_strset2(&tmp_st, "127.0.0.1"), 0); + } + + /* Fill send data with unique patterns per side */ + for (i = 0; i < BIDIR_PKT_LEN; i++) { + state_serv.send_data[i] = (char)(0xAA ^ (i & 0xFF)); + state_cli.send_data[i] = (char)(0xBB ^ (i & 0xFF)); + } + + /* SERVER */ + state_serv.pool = pool; + state_serv.is_server = PJ_TRUE; + state_serv.expected_recv = BIDIR_SEND_COUNT * BIDIR_PKT_LEN; + param.user_data = &state_serv; + param.require_client_cert = PJ_FALSE; + + listen_addr = addr; + status = ssl_test_create_server(pool, ¶m, "bidir_test", + &ssock_serv, &listen_addr); + if (status != PJ_SUCCESS) + goto on_return; + + /* CLIENT */ + state_cli.pool = pool; + state_cli.is_server = PJ_FALSE; + state_cli.expected_recv = BIDIR_SEND_COUNT * BIDIR_PKT_LEN; + param.user_data = &state_cli; + + status = pj_ssl_sock_create(pool, ¶m, &ssock_cli); + if (status != PJ_SUCCESS) { + app_perror("...bidir_test: client create", status); + goto on_return; + } + + status = pj_ssl_sock_start_connect(ssock_cli, pool, &addr, + &listen_addr, + pj_sockaddr_get_len(&addr)); + if (status == PJ_SUCCESS) { + bidir_on_connect_complete(ssock_cli, PJ_SUCCESS); + } else if (status != PJ_EPENDING) { + app_perror("...bidir_test: connect", status); + goto on_return; + } + + /* Poll until client done */ + { + pj_timestamp t_start, t_now; + pj_uint32_t elapsed; + + pj_get_timestamp(&t_start); + while (!state_cli.err && !state_cli.done) { + pj_time_val delay = {0, 100}; + pj_ioqueue_poll(ioqueue, &delay); + pj_timer_heap_poll(timer, NULL); + + pj_get_timestamp(&t_now); + elapsed = pj_elapsed_msec(&t_start, &t_now); + if (elapsed > 30000) { + PJ_LOG(1, ("", "...bidir_test TIMEOUT after 30s")); + status = PJ_ETIMEDOUT; + goto on_return; + } + } + } + + if (state_cli.err) { + status = state_cli.err; + app_perror("...bidir_test client error", status); + goto on_return; + } + + PJ_LOG(3, ("", "...bidir_test: cli sent=%lu recv=%lu " + "pending=%d sent_cb=%d", + (unsigned long)state_cli.sent, + (unsigned long)state_cli.recv, + state_cli.pending_cnt, + state_cli.sent_cb_cnt)); + + if (state_cli.recv < state_cli.expected_recv) { + PJ_LOG(1, ("", "...bidir_test: recv=%lu < expected=%lu", + (unsigned long)state_cli.recv, + (unsigned long)state_cli.expected_recv)); + status = PJ_EINVAL; + goto on_return; + } + + if (state_cli.pending_cnt != state_cli.sent_cb_cnt) { + PJ_LOG(1, ("", "...bidir_test: cli pending=%d != sent_cb=%d", + state_cli.pending_cnt, state_cli.sent_cb_cnt)); + status = PJ_EBUG; + goto on_return; + } + + status = PJ_SUCCESS; + +on_return: + if (ssock_cli) + pj_ssl_sock_close(ssock_cli); + if (state_serv.accepted_ssock) + pj_ssl_sock_close(state_serv.accepted_ssock); + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to drain pending events after close */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (timer) + pj_timer_heap_destroy(timer); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return (status == PJ_SUCCESS) ? 0 : -1; +} + + +/* + * Multi-threaded send load test: multiple clients with worker threads. + * This is the closest simulation to production (multiple SIP registrations + * over TLS with worker threads polling ioqueue). + */ +#if PJ_HAS_THREADS + +#define MT_WORKER_THREADS 3 +#define MT_CLIENTS 3 +#define MT_SEND_COUNT 100 +#define MT_SEND_PKT_LEN 512 + +struct mt_state +{ + pj_pool_t *pool; + pj_ssl_sock_t **accepted_arr; + int accepted_cnt; + pj_bool_t is_server; + pj_bool_t echo; + pj_status_t err; + pj_size_t sent; + pj_size_t recv; + pj_uint8_t read_buf[8192]; + pj_bool_t done; + int pending_cnt; + int sent_cb_cnt; + int send_idx; + pj_ioqueue_op_key_t op_keys[MT_SEND_COUNT]; + char send_data[MT_SEND_PKT_LEN]; +}; + +struct mt_test_ctx +{ + pj_ioqueue_t *ioqueue; + pj_timer_heap_t *timer; + pj_bool_t quit_flag; + pj_atomic_t *clients_done; +}; + +static pj_bool_t mt_on_connect_complete(pj_ssl_sock_t *ssock, + pj_status_t status) +{ + struct mt_state *st = (struct mt_state *) + pj_ssl_sock_get_user_data(ssock); + void *read_buf[1]; + pj_ssize_t len; + int i; + + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(ssock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + for (i = 0; i < MT_SEND_COUNT; i++) { + len = MT_SEND_PKT_LEN; + status = pj_ssl_sock_send(ssock, &st->op_keys[i], + st->send_data, &len, 0); + if (status == PJ_EPENDING) { + st->pending_cnt++; + } else if (status == PJ_SUCCESS) { + st->sent += len; + } else { + st->err = status; + return PJ_FALSE; + } + st->send_idx++; + } + + return PJ_TRUE; +} + +static pj_bool_t mt_on_accept_complete(pj_ssl_sock_t *ssock, + pj_ssl_sock_t *newsock, + const pj_sockaddr_t *src_addr, + int src_addr_len, + pj_status_t accept_status) +{ + struct mt_state *parent_st = (struct mt_state *) + pj_ssl_sock_get_user_data(ssock); + struct mt_state *st; + void *read_buf[1]; + pj_status_t status; + + PJ_UNUSED_ARG(src_addr); + PJ_UNUSED_ARG(src_addr_len); + + if (accept_status != PJ_SUCCESS) + return PJ_FALSE; + + st = (struct mt_state *)pj_pool_zalloc(parent_st->pool, + sizeof(struct mt_state)); + *st = *parent_st; + st->sent = 0; + st->recv = 0; + st->done = PJ_FALSE; + st->err = PJ_SUCCESS; + pj_ssl_sock_set_user_data(newsock, st); + + /* Track accepted socket for cleanup */ + if (parent_st->accepted_arr && + parent_st->accepted_cnt < MT_CLIENTS) + { + parent_st->accepted_arr[parent_st->accepted_cnt++] = newsock; + } + + read_buf[0] = st->read_buf; + status = pj_ssl_sock_start_read2(newsock, st->pool, + sizeof(st->read_buf), + (void **)read_buf, 0); + if (status != PJ_SUCCESS) { + st->err = status; + return PJ_FALSE; + } + + return PJ_TRUE; +} + +static pj_bool_t mt_on_data_read(pj_ssl_sock_t *ssock, + void *data, + pj_size_t size, + pj_status_t status, + pj_size_t *remainder) +{ + struct mt_state *st = (struct mt_state *) + pj_ssl_sock_get_user_data(ssock); + + if (remainder) + *remainder = 0; + + if (size > 0) { + st->recv += size; + + /* Server echoes data back */ + if (st->echo) { + pj_ssize_t sz = (pj_ssize_t)size; + pj_status_t s; + + s = pj_ssl_sock_send(ssock, &st->op_keys[0], data, &sz, 0); + if (s != PJ_SUCCESS && s != PJ_EPENDING) + st->err = s; + } + + /* Client: check completion */ + if (!st->is_server) { + pj_size_t expected = (pj_size_t)MT_SEND_COUNT * MT_SEND_PKT_LEN; + if (st->recv >= expected) + st->done = PJ_TRUE; + } + } + + if (status != PJ_SUCCESS) { + if (status == PJ_EEOF) + st->done = PJ_TRUE; + else + st->err = status; + } + + if (st->err != PJ_SUCCESS || st->done) + return PJ_FALSE; + + return PJ_TRUE; +} + +static pj_bool_t mt_on_data_sent(pj_ssl_sock_t *ssock, + pj_ioqueue_op_key_t *op_key, + pj_ssize_t sent) +{ + struct mt_state *st = (struct mt_state *) + pj_ssl_sock_get_user_data(ssock); + PJ_UNUSED_ARG(op_key); + + if (sent < 0) { + st->err = (pj_status_t)-sent; + return PJ_FALSE; + } + + st->sent += sent; + st->sent_cb_cnt++; + return PJ_TRUE; +} + +static int mt_worker_proc(void *arg) +{ + struct mt_test_ctx *ctx = (struct mt_test_ctx *)arg; + + while (!ctx->quit_flag) { + pj_time_val delay = {0, 20}; + pj_ioqueue_poll(ctx->ioqueue, &delay); + pj_timer_heap_poll(ctx->timer, NULL); + } + + return 0; +} + +static int mt_send_load_test(void) +{ + pj_pool_t *pool = NULL; + pj_ioqueue_t *ioqueue = NULL; + pj_timer_heap_t *timer = NULL; + pj_ssl_sock_t *ssock_serv = NULL; + pj_ssl_sock_t *ssock_cli[MT_CLIENTS]; + pj_ssl_sock_t *ssock_accepted[MT_CLIENTS]; + pj_thread_t *threads[MT_WORKER_THREADS]; + pj_ssl_sock_param param; + struct mt_state state_serv; + struct mt_state state_cli[MT_CLIENTS]; + struct mt_test_ctx ctx; + pj_sockaddr addr, listen_addr; + + pj_status_t status; + int i; + + pj_bzero(ssock_cli, sizeof(ssock_cli)); + pj_bzero(ssock_accepted, sizeof(ssock_accepted)); + pj_bzero(threads, sizeof(threads)); + pj_bzero(&ctx, sizeof(ctx)); + + pool = pj_pool_create(mem, "ssl_mt", 32000, 4096, NULL); + + pj_bzero(&state_serv, sizeof(state_serv)); + pj_bzero(state_cli, sizeof(state_cli)); + + status = pj_ioqueue_create(pool, 4 + MT_CLIENTS * 2, &ioqueue); + if (status != PJ_SUCCESS) { + app_perror("...mt_send_load_test: ioqueue create", status); + goto on_return; + } + + status = pj_timer_heap_create(pool, 4 + MT_CLIENTS * 2, &timer); + if (status != PJ_SUCCESS) { + app_perror("...mt_send_load_test: timer create", status); + goto on_return; + } + + ctx.ioqueue = ioqueue; + ctx.timer = timer; + ctx.quit_flag = PJ_FALSE; + + pj_ssl_sock_param_default(¶m); + param.cb.on_accept_complete2 = &mt_on_accept_complete; + param.cb.on_connect_complete = &mt_on_connect_complete; + param.cb.on_data_read = &mt_on_data_read; + param.cb.on_data_sent = &mt_on_data_sent; + param.ioqueue = ioqueue; + param.timer_heap = timer; + param.proto = PJ_SSL_SOCK_PROTO_TLS1_2; + param.ciphers_num = 0; + + { + pj_str_t tmp_st; + pj_sockaddr_init(PJ_AF_INET, &addr, + pj_strset2(&tmp_st, "127.0.0.1"), 0); + } + + /* SERVER */ + state_serv.pool = pool; + state_serv.echo = PJ_TRUE; + state_serv.is_server = PJ_TRUE; + state_serv.accepted_arr = ssock_accepted; + state_serv.accepted_cnt = 0; + param.user_data = &state_serv; + param.require_client_cert = PJ_FALSE; + + listen_addr = addr; + status = ssl_test_create_server(pool, ¶m, "mt_send_load_test", + &ssock_serv, &listen_addr); + if (status != PJ_SUCCESS) + goto on_return; + + /* Start worker threads */ + for (i = 0; i < MT_WORKER_THREADS; i++) { + status = pj_thread_create(pool, "ssl_mt_w", &mt_worker_proc, + &ctx, 0, 0, &threads[i]); + if (status != PJ_SUCCESS) { + app_perror("...mt_send_load_test: thread create", status); + goto on_return; + } + } + + /* Create and connect clients */ + for (i = 0; i < MT_CLIENTS; i++) { + int k; + + state_cli[i].pool = pool; + state_cli[i].is_server = PJ_FALSE; + for (k = 0; k < MT_SEND_PKT_LEN; k++) + state_cli[i].send_data[k] = (char)((i + k) & 0xFF); + + param.user_data = &state_cli[i]; + + status = pj_ssl_sock_create(pool, ¶m, &ssock_cli[i]); + if (status != PJ_SUCCESS) { + app_perror("...mt_send_load_test: client create", status); + goto on_return; + } + + status = pj_ssl_sock_start_connect(ssock_cli[i], pool, &addr, + &listen_addr, + pj_sockaddr_get_len(&addr)); + if (status == PJ_SUCCESS) { + mt_on_connect_complete(ssock_cli[i], PJ_SUCCESS); + } else if (status != PJ_EPENDING) { + app_perror("...mt_send_load_test: connect", status); + goto on_return; + } + } + + /* Main thread also polls */ + { + pj_timestamp t_start, t_now; + pj_uint32_t elapsed; + int all_done; + + pj_get_timestamp(&t_start); + for (;;) { + pj_time_val delay = {0, 100}; + pj_ioqueue_poll(ioqueue, &delay); + pj_timer_heap_poll(timer, NULL); + + all_done = PJ_TRUE; + for (i = 0; i < MT_CLIENTS; i++) { + if (!state_cli[i].done && !state_cli[i].err) { + all_done = PJ_FALSE; + break; + } + } + if (all_done) + break; + + pj_get_timestamp(&t_now); + elapsed = pj_elapsed_msec(&t_start, &t_now); + if (elapsed > 60000) { + PJ_LOG(1, ("", "...mt_send_load_test TIMEOUT after 60s")); + status = PJ_ETIMEDOUT; + goto on_return; + } + } + } + + /* Stop workers */ + ctx.quit_flag = PJ_TRUE; + for (i = 0; i < MT_WORKER_THREADS; i++) { + if (threads[i]) { + pj_thread_join(threads[i]); + pj_thread_destroy(threads[i]); + threads[i] = NULL; + } + } + + /* Verify */ + for (i = 0; i < MT_CLIENTS; i++) { + pj_size_t expected = (pj_size_t)MT_SEND_COUNT * MT_SEND_PKT_LEN; + + if (state_cli[i].err) { + PJ_LOG(1, ("", "...mt_send_load_test: client %d error", i)); + status = state_cli[i].err; + app_perror("...mt_send_load_test: client error", status); + goto on_return; + } + + PJ_LOG(3, ("", "...mt_send_load_test: cli[%d] sent=%lu recv=%lu " + "pending=%d sent_cb=%d", + i, + (unsigned long)state_cli[i].sent, + (unsigned long)state_cli[i].recv, + state_cli[i].pending_cnt, + state_cli[i].sent_cb_cnt)); + + if (state_cli[i].pending_cnt != state_cli[i].sent_cb_cnt) { + PJ_LOG(1, ("", "...mt_send_load_test: cli[%d] pending=%d " + "!= sent_cb=%d", i, + state_cli[i].pending_cnt, + state_cli[i].sent_cb_cnt)); + status = PJ_EBUG; + goto on_return; + } + + if (state_cli[i].sent < expected) { + PJ_LOG(1, ("", "...mt_send_load_test: cli[%d] sent=%lu " + "< expected=%lu", i, + (unsigned long)state_cli[i].sent, + (unsigned long)expected)); + status = PJ_EBUG; + goto on_return; + } + } + + status = PJ_SUCCESS; + +on_return: + ctx.quit_flag = PJ_TRUE; + for (i = 0; i < MT_WORKER_THREADS; i++) { + if (threads[i]) { + pj_thread_join(threads[i]); + pj_thread_destroy(threads[i]); + } + } + + for (i = 0; i < MT_CLIENTS; i++) { + if (ssock_cli[i]) + pj_ssl_sock_close(ssock_cli[i]); + } + for (i = 0; i < state_serv.accepted_cnt; i++) { + if (ssock_accepted[i]) + pj_ssl_sock_close(ssock_accepted[i]); + } + if (ssock_serv) + pj_ssl_sock_close(ssock_serv); + + /* Poll to drain pending events after close */ + if (ioqueue) { + pj_time_val delay = {0, 500}; + int n = 50; + while (n-- > 0 && pj_ioqueue_poll(ioqueue, &delay) > 0) + ; + } + + if (timer) + pj_timer_heap_destroy(timer); + if (ioqueue) + pj_ioqueue_destroy(ioqueue); + if (pool) + pj_pool_release(pool); + + return (status == PJ_SUCCESS) ? 0 : -1; +} + +#endif /* PJ_HAS_THREADS */ + + +int ssl_sock_stress_test(void) +{ + int ret; + + PJ_LOG(3,("", "..send load test")); + ret = send_load_test(); + if (ret != 0) + return ret; + + PJ_LOG(3,("", "..close under pending sends test")); + ret = close_pending_test(); + if (ret != 0) + return ret; + + PJ_LOG(3,("", "..bidirectional simultaneous load test")); + ret = bidir_test(); + if (ret != 0) + return ret; + +#if PJ_HAS_THREADS + PJ_LOG(3,("", "..multi-threaded send load test")); + ret = mt_send_load_test(); + if (ret != 0) + return ret; +#endif + + return 0; +} + +#else /* INCLUDE_SSLSOCK_TEST */ +int dummy_ssl_sock_stress_test; +#endif /* INCLUDE_SSLSOCK_TEST */ diff --git a/pjlib/src/pjlib-test/ssl_sock_test.h b/pjlib/src/pjlib-test/ssl_sock_test.h new file mode 100644 index 0000000000..cd2a41acbd --- /dev/null +++ b/pjlib/src/pjlib-test/ssl_sock_test.h @@ -0,0 +1,109 @@ +/* + * Shared definitions for SSL socket tests. + */ +#ifndef __SSL_SOCK_TEST_H__ +#define __SSL_SOCK_TEST_H__ + +#include "test.h" +#include + +#define CERT_DIR "../build/" +#if (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_DARWIN) || \ + (PJ_SSL_SOCK_IMP == PJ_SSL_SOCK_IMP_APPLE) +# define CERT_CA_FILE CERT_DIR "cacert.der" +#else +# define CERT_CA_FILE CERT_DIR "cacert.pem" +#endif +#define CERT_FILE CERT_DIR "cacert.pem" +#define CERT_PRIVKEY_FILE CERT_DIR "privkey.pem" +#define CERT_PRIVKEY_PASS "privkeypass" + +#define TEST_LOAD_FROM_FILES 1 + + +#if INCLUDE_SSLSOCK_TEST + +/* + * Load server certificate. Returns PJ_SUCCESS, or PJ_SUCCESS with + * cert==NULL if backend doesn't support file-based certs (Schannel). + */ +PJ_INLINE(pj_status_t) ssl_test_load_cert(pj_pool_t *pool, + const char *caller, + pj_ssl_cert_t **p_cert) +{ +#if TEST_LOAD_FROM_FILES + pj_str_t ca = pj_str((char *)CERT_CA_FILE); + pj_str_t crt = pj_str((char *)CERT_FILE); + pj_str_t key = pj_str((char *)CERT_PRIVKEY_FILE); + pj_str_t pass = pj_str((char *)CERT_PRIVKEY_PASS); + pj_status_t status; + + status = pj_ssl_cert_load_from_files(pool, &ca, &crt, &key, + &pass, p_cert); + if (status == PJ_ENOTSUP) { + PJ_LOG(3, ("", "...%s: cert load not supported, skipping", + caller)); + *p_cert = NULL; + return PJ_SUCCESS; + } + if (status != PJ_SUCCESS) { + app_perror(caller, status); + return status; + } + return PJ_SUCCESS; +#else + PJ_UNUSED_ARG(pool); + PJ_UNUSED_ARG(caller); + *p_cert = NULL; + return PJ_SUCCESS; +#endif +} + +/* + * Setup SSL server socket: create, load cert, start accept on 127.0.0.1 + * with random port. Returns listen address in *listen_addr. + */ +PJ_INLINE(pj_status_t) ssl_test_create_server( + pj_pool_t *pool, + pj_ssl_sock_param *param, + const char *caller, + pj_ssl_sock_t **p_ssock, + pj_sockaddr *listen_addr) +{ + pj_ssl_cert_t *cert = NULL; + pj_ssl_sock_info info; + pj_status_t status; + + status = pj_ssl_sock_create(pool, param, p_ssock); + if (status != PJ_SUCCESS) { + app_perror(caller, status); + return status; + } + + status = ssl_test_load_cert(pool, caller, &cert); + if (status != PJ_SUCCESS) + return status; + + if (cert) { + status = pj_ssl_sock_set_certificate(*p_ssock, pool, cert); + if (status != PJ_SUCCESS) { + app_perror(caller, status); + return status; + } + } + + status = pj_ssl_sock_start_accept(*p_ssock, pool, listen_addr, + pj_sockaddr_get_len(listen_addr)); + if (status != PJ_SUCCESS) { + app_perror(caller, status); + return status; + } + + pj_ssl_sock_get_info(*p_ssock, &info); + *listen_addr = info.local_addr; + return PJ_SUCCESS; +} + +#endif /* INCLUDE_SSLSOCK_TEST */ + +#endif /* __SSL_SOCK_TEST_H__ */ diff --git a/pjlib/src/pjlib-test/test.c b/pjlib/src/pjlib-test/test.c index b0813d7bcd..8e932a34b8 100644 --- a/pjlib/src/pjlib-test/test.c +++ b/pjlib/src/pjlib-test/test.c @@ -350,6 +350,7 @@ static int features_tests(int argc, char *argv[]) #if INCLUDE_SSLSOCK_TEST UT_ADD_TEST(&test_app.ut_app, ssl_sock_test, 0); + UT_ADD_TEST(&test_app.ut_app, ssl_sock_stress_test, 0); #endif #if INCLUDE_IOCP_UNREG_TEST diff --git a/pjlib/src/pjlib-test/test.h b/pjlib/src/pjlib-test/test.h index 0721ecfd9f..e42a1c6440 100644 --- a/pjlib/src/pjlib-test/test.h +++ b/pjlib/src/pjlib-test/test.h @@ -143,6 +143,7 @@ extern int iocp_unregister_test(void); extern int activesock_test(void); extern int file_test(void); extern int ssl_sock_test(void); +extern int ssl_sock_stress_test(void); extern int unittest_basic_test(void); extern int unittest_parallel_test(void); extern int unittest_test(void); diff --git a/tests/sanitizers/lsan.supp b/tests/sanitizers/lsan.supp index c3e9170881..10885c3b6c 100644 --- a/tests/sanitizers/lsan.supp +++ b/tests/sanitizers/lsan.supp @@ -1,3 +1,5 @@ leak:ASN1_STRING_set leak:PEM_ASN1_read_bio leak:libcrypto.so +leak:libgnutls.so +leak:libtasn1.so