diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 5f205858adff..d304335547db 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -1,6 +1,6 @@ // Serial port command queuing // -// Copyright (C) 2016-2021 Kevin O'Connor +// Copyright (C) 2016-2025 Kevin O'Connor // // This file may be distributed under the terms of the GNU GPLv3 license. @@ -38,20 +38,36 @@ struct command_queue { struct message_sub_queue ready, upcoming; }; +struct receiver { + pthread_mutex_t lock; + pthread_cond_t cond; + int waiting; + struct list_head queue; + struct list_head old_receive; +}; + +struct transmit_requests { + int pipe_fds[2]; + pthread_mutex_t lock; // protects variables below + struct list_head upcoming_queues; + int upcoming_bytes; + uint64_t need_kick_clock, min_release_clock; +}; + struct serialqueue { // Input reading struct pollreactor *pr; int serial_fd, serial_fd_type, client_id; - int pipe_fds[2]; uint8_t input_buf[4096]; uint8_t need_sync; int input_pos; + // Multi-threaded support for pushing and pulling messages + struct receiver receiver; + struct transmit_requests transmit_requests; // Threading char name[16]; pthread_t tid; pthread_mutex_t lock; // protects variables below - pthread_cond_t cond; - int receive_waiting; // Baud / clock tracking int receive_window; double bittime_adjust, idle_time; @@ -63,20 +79,15 @@ struct serialqueue { struct list_head sent_queue; double srtt, rttvar, rto; // Pending transmission message queues - struct list_head upcoming_queues; - int upcoming_bytes; struct list_head ready_queues; int ready_bytes, need_ack_bytes, last_ack_bytes; - uint64_t need_kick_clock; struct list_head notify_queue; double last_write_fail_time; - // Received messages - struct list_head receive_queue; // Fastreader support pthread_mutex_t fast_reader_dispatch_lock; struct list_head fast_readers; // Debugging - struct list_head old_sent, old_receive; + struct list_head old_sent; // Stats uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid; }; @@ -115,31 +126,44 @@ debug_queue_alloc(struct list_head *root, int count) } // Copy a message to a debug queue and free old debug messages -static void -debug_queue_add(struct list_head *root, struct queue_message *qm) +static struct queue_message * +_debug_queue_add(struct list_head *root, struct queue_message *qm) { list_add_tail(&qm->node, root); struct queue_message *old = list_first_entry( root, struct queue_message, node); list_del(&old->node); + return old; +} + +static void +debug_queue_add(struct list_head *root, struct queue_message *qm) +{ + struct queue_message *old = _debug_queue_add(root, qm); message_free(old); } -// Wake up the receiver thread if it is waiting +// Add messages and wake up the receiver thread if it is waiting static void -check_wake_receive(struct serialqueue *sq) +receive_append_wake(struct receiver *receiver, struct list_head *msgs) { - if (sq->receive_waiting) { - sq->receive_waiting = 0; - pthread_cond_signal(&sq->cond); + int dokick = 0; + pthread_mutex_lock(&receiver->lock); + list_join_tail(msgs, &receiver->queue); + if (receiver->waiting) { + receiver->waiting = 0; + dokick = 1; } + pthread_mutex_unlock(&receiver->lock); + if (dokick) + pthread_cond_signal(&receiver->cond); } // Write to the internal pipe to wake the background thread if in poll static void kick_bg_thread(struct serialqueue *sq) { - int ret = write(sq->pipe_fds[1], ".", 1); + int ret = write(sq->transmit_requests.pipe_fds[1], ".", 1); if (ret < 0) report_errno("pipe write", ret); } @@ -245,7 +269,8 @@ handle_message(struct serialqueue *sq, double eventtime, int len) sq->bytes_read += len; // Check for pending messages on notify_queue - int must_wake = 0; + struct list_head received; + list_init(&received); while (!list_empty(&sq->notify_queue)) { struct queue_message *qm = list_first_entry( &sq->notify_queue, struct queue_message, node); @@ -257,8 +282,7 @@ handle_message(struct serialqueue *sq, double eventtime, int len) qm->len = 0; qm->sent_time = sq->last_receive_sent_time; qm->receive_time = eventtime; - list_add_tail(&qm->node, &sq->receive_queue); - must_wake = 1; + list_add_tail(&qm->node, &received); } // Process message @@ -276,10 +300,12 @@ handle_message(struct serialqueue *sq, double eventtime, int len) ? sq->last_receive_sent_time : 0.); qm->receive_time = get_monotonic(); // must be time post read() qm->receive_time -= calculate_bittime(sq, len); - list_add_tail(&qm->node, &sq->receive_queue); - must_wake = 1; + list_add_tail(&qm->node, &received); } + if (!list_empty(&received)) + receive_append_wake(&sq->receiver, &received); + // Check fast readers struct fastreader *fr; list_for_each_entry(fr, &sq->fast_readers, node) { @@ -289,16 +315,11 @@ handle_message(struct serialqueue *sq, double eventtime, int len) continue; // Release main lock and invoke callback pthread_mutex_lock(&sq->fast_reader_dispatch_lock); - if (must_wake) - check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); fr->func(fr, sq->input_buf, len); pthread_mutex_unlock(&sq->fast_reader_dispatch_lock); return; } - - if (must_wake) - check_wake_receive(sq); pthread_mutex_unlock(&sq->lock); } @@ -357,7 +378,7 @@ static void kick_event(struct serialqueue *sq, double eventtime) { char dummy[4096]; - int ret = read(sq->pipe_fds[0], dummy, sizeof(dummy)); + int ret = read(sq->transmit_requests.pipe_fds[0], dummy, sizeof(dummy)); if (ret < 0) report_errno("pipe read", ret); pollreactor_update_timer(sq->pr, SQPT_COMMAND, PR_NOW); @@ -512,30 +533,22 @@ build_and_send_command(struct serialqueue *sq, uint8_t *buf, int pending return len; } -// Determine the time the next serial data should be sent -static double -check_send_command(struct serialqueue *sq, int pending, double eventtime) +// Move messages from upcoming queues to ready queues +static uint64_t +check_upcoming_queues(struct serialqueue *sq, uint64_t ack_clock) { - if (sq->send_seq - sq->receive_seq >= MAX_PENDING_BLOCKS - && sq->receive_seq != (uint64_t)-1) - // Need an ack before more messages can be sent - return PR_NEVER; - if (sq->send_seq > sq->receive_seq && sq->receive_window) { - int need_ack_bytes = sq->need_ack_bytes + MESSAGE_MAX; - if (sq->last_ack_seq < sq->receive_seq) - need_ack_bytes += sq->last_ack_bytes; - if (need_ack_bytes > sq->receive_window) - // Wait for ack from past messages before sending next message - return PR_NEVER; + pthread_mutex_lock(&sq->transmit_requests.lock); + sq->transmit_requests.need_kick_clock = 0; + uint64_t min_release_clock = sq->transmit_requests.min_release_clock; + if (ack_clock < min_release_clock) { + pthread_mutex_unlock(&sq->transmit_requests.lock); + return min_release_clock; } - // Check for stalled messages now ready - double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time; - idletime += calculate_bittime(sq, pending + MESSAGE_MIN); - uint64_t ack_clock = clock_from_time(&sq->ce, idletime); - uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK; + uint64_t min_stalled_clock = MAX_CLOCK; struct command_queue *cq, *_ncq; - list_for_each_entry_safe(cq, _ncq, &sq->upcoming_queues, upcoming.node) { + list_for_each_entry_safe(cq, _ncq, &sq->transmit_requests.upcoming_queues, + upcoming.node) { int not_in_ready_queues = list_empty(&cq->ready.msg_queue); // Move messages from the upcoming.msg_queue to the ready.msg_queue struct queue_message *qm, *_nqm; @@ -547,7 +560,7 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime) } list_del(&qm->node); list_add_tail(&qm->node, &cq->ready.msg_queue); - sq->upcoming_bytes -= qm->len; + sq->transmit_requests.upcoming_bytes -= qm->len; sq->ready_bytes += qm->len; } // Remove cq from the list if it is now empty @@ -557,7 +570,65 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime) if (not_in_ready_queues && !list_empty(&cq->ready.msg_queue)) list_add_tail(&cq->ready.node, &sq->ready_queues); } + sq->transmit_requests.min_release_clock = min_stalled_clock; + pthread_mutex_unlock(&sq->transmit_requests.lock); + return min_stalled_clock; +} + +// Set the next transmit queue need_kick_clock +static int +update_need_kick_clock(struct serialqueue *sq, uint64_t wantclock) +{ + pthread_mutex_lock(&sq->transmit_requests.lock); + if (wantclock > sq->transmit_requests.min_release_clock) { + pthread_mutex_unlock(&sq->transmit_requests.lock); + return -1; + } + sq->transmit_requests.need_kick_clock = wantclock; + pthread_mutex_unlock(&sq->transmit_requests.lock); + return 0; +} + +// Determine if ready to send commands (or the amount of time to sleep if not) +static double +check_send_command(struct serialqueue *sq, int pending, double eventtime) +{ + // Check for upcoming messages now ready + double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time; + idletime += calculate_bittime(sq, pending + MESSAGE_MIN); + uint64_t ack_clock = clock_from_time(&sq->ce, idletime); + uint64_t min_stalled_clock = check_upcoming_queues(sq, ack_clock); + + // Check if valid to send messages + if (sq->send_seq - sq->receive_seq >= MAX_PENDING_BLOCKS + && sq->receive_seq != (uint64_t)-1) + // Need an ack before more messages can be sent + return eventtime + 0.250; + if (sq->send_seq > sq->receive_seq && sq->receive_window) { + int need_ack_bytes = sq->need_ack_bytes + MESSAGE_MAX; + if (sq->last_ack_seq < sq->receive_seq) + need_ack_bytes += sq->last_ack_bytes; + if (need_ack_bytes > sq->receive_window) + // Wait for ack from past messages before sending next message + return eventtime + 0.250; + } + + // Check if a block is fully ready to send + if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX) + return PR_NOW; + if (! sq->ce.est_freq) { + // Clock unknown during initial startup - recheck on each add + if (sq->ready_bytes) + return PR_NOW; + int mustwake = update_need_kick_clock(sq, 1); + if (mustwake) + return eventtime; + return PR_NEVER; + } + // Check if it is still needed to send messages from the ready_queues + uint64_t min_ready_clock = MAX_CLOCK; + struct command_queue *cq; list_for_each_entry(cq, &sq->ready_queues, ready.node) { // Update min_ready_clock struct queue_message *qm = list_first_entry( @@ -570,23 +641,21 @@ check_send_command(struct serialqueue *sq, int pending, double eventtime) if (req_clock < min_ready_clock) min_ready_clock = req_clock; } - - // Check for messages to send - if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX) - return PR_NOW; - if (! sq->ce.est_freq) { - if (sq->ready_bytes) - return PR_NOW; - sq->need_kick_clock = MAX_CLOCK; - return PR_NEVER; - } uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->ce.est_freq; if (min_ready_clock <= ack_clock + reqclock_delta) return PR_NOW; + + // Determine next wakeup time + if (pending) + // Caller wont sleep anyway - just return + return eventtime; uint64_t wantclock = min_ready_clock - reqclock_delta; if (min_stalled_clock < wantclock) wantclock = min_stalled_clock; - sq->need_kick_clock = wantclock; + int mustwake = update_need_kick_clock(sq, wantclock); + if (mustwake) + // Raced with add of new command - avoid sleeping + return eventtime; return idletime + (wantclock - ack_clock) / sq->ce.est_freq; } @@ -600,20 +669,19 @@ command_event(struct serialqueue *sq, double eventtime) double waketime; for (;;) { waketime = check_send_command(sq, buflen, eventtime); - if (waketime != PR_NOW || buflen + MESSAGE_MAX > sizeof(buf)) { - if (buflen) { - // Write message blocks - do_write(sq, buf, buflen); - sq->bytes_write += buflen; - double idletime = (eventtime > sq->idle_time - ? eventtime : sq->idle_time); - sq->idle_time = idletime + calculate_bittime(sq, buflen); - buflen = 0; - } - if (waketime != PR_NOW) - break; - } + if (waketime != PR_NOW) + break; buflen += build_and_send_command(sq, &buf[buflen], buflen, eventtime); + if (buflen + MESSAGE_MAX > sizeof(buf)) + break; + } + if (buflen) { + // Write message blocks + do_write(sq, buf, buflen); + sq->bytes_write += buflen; + double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time; + sq->idle_time = idletime + calculate_bittime(sq, buflen); + waketime = PR_NOW; } pthread_mutex_unlock(&sq->lock); return waketime; @@ -627,9 +695,10 @@ background_thread(void *data) set_thread_name(sq->name); pollreactor_run(sq->pr); - pthread_mutex_lock(&sq->lock); - check_wake_receive(sq); - pthread_mutex_unlock(&sq->lock); + // Wake any waiting receivers + struct list_head dummy; + list_init(&dummy); + receive_append_wake(&sq->receiver, &dummy); return NULL; } @@ -647,7 +716,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id strncpy(sq->name, name, sizeof(sq->name)); sq->name[sizeof(sq->name)-1] = '\0'; - int ret = pipe(sq->pipe_fds); + int ret = pipe(sq->transmit_requests.pipe_fds); if (ret) goto fail; @@ -655,12 +724,13 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id sq->pr = pollreactor_alloc(SQPF_NUM, SQPT_NUM, sq); pollreactor_add_fd(sq->pr, SQPF_SERIAL, serial_fd, input_event , serial_fd_type==SQT_DEBUGFILE); - pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); + pollreactor_add_fd(sq->pr, SQPF_PIPE, sq->transmit_requests.pipe_fds[0] + , kick_event, 0); pollreactor_add_timer(sq->pr, SQPT_RETRANSMIT, retransmit_event); pollreactor_add_timer(sq->pr, SQPT_COMMAND, command_event); fd_set_non_blocking(serial_fd); - fd_set_non_blocking(sq->pipe_fds[0]); - fd_set_non_blocking(sq->pipe_fds[1]); + fd_set_non_blocking(sq->transmit_requests.pipe_fds[0]); + fd_set_non_blocking(sq->transmit_requests.pipe_fds[1]); // Retransmit setup sq->send_seq = 1; @@ -674,25 +744,30 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id } // Queues - sq->need_kick_clock = MAX_CLOCK; - list_init(&sq->upcoming_queues); + sq->transmit_requests.need_kick_clock = MAX_CLOCK; + sq->transmit_requests.min_release_clock = MAX_CLOCK; + list_init(&sq->transmit_requests.upcoming_queues); + pthread_mutex_init(&sq->transmit_requests.lock, NULL); list_init(&sq->ready_queues); list_init(&sq->sent_queue); - list_init(&sq->receive_queue); + list_init(&sq->receiver.queue); list_init(&sq->notify_queue); list_init(&sq->fast_readers); // Debugging list_init(&sq->old_sent); - list_init(&sq->old_receive); + list_init(&sq->receiver.old_receive); debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT); - debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE); + debug_queue_alloc(&sq->receiver.old_receive, DEBUG_QUEUE_RECEIVE); // Thread setup ret = pthread_mutex_init(&sq->lock, NULL); if (ret) goto fail; - ret = pthread_cond_init(&sq->cond, NULL); + ret = pthread_mutex_init(&sq->receiver.lock, NULL); + if (ret) + goto fail; + ret = pthread_cond_init(&sq->receiver.cond, NULL); if (ret) goto fail; ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL); @@ -730,22 +805,27 @@ serialqueue_free(struct serialqueue *sq) serialqueue_exit(sq); pthread_mutex_lock(&sq->lock); message_queue_free(&sq->sent_queue); - message_queue_free(&sq->receive_queue); + pthread_mutex_lock(&sq->receiver.lock); + message_queue_free(&sq->receiver.queue); + message_queue_free(&sq->receiver.old_receive); + pthread_mutex_unlock(&sq->receiver.lock); message_queue_free(&sq->notify_queue); message_queue_free(&sq->old_sent); - message_queue_free(&sq->old_receive); while (!list_empty(&sq->ready_queues)) { struct command_queue* cq = list_first_entry( &sq->ready_queues, struct command_queue, ready.node); list_del(&cq->ready.node); message_queue_free(&cq->ready.msg_queue); } - while (!list_empty(&sq->upcoming_queues)) { + pthread_mutex_lock(&sq->transmit_requests.lock); + while (!list_empty(&sq->transmit_requests.upcoming_queues)) { struct command_queue *cq = list_first_entry( - &sq->upcoming_queues, struct command_queue, upcoming.node); + &sq->transmit_requests.upcoming_queues, + struct command_queue, upcoming.node); list_del(&cq->upcoming.node); message_queue_free(&cq->upcoming.msg_queue); } + pthread_mutex_unlock(&sq->transmit_requests.lock); pthread_mutex_unlock(&sq->lock); pollreactor_free(sq->pr); free(sq); @@ -814,19 +894,24 @@ serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq if (! len) return; qm = list_first_entry(msgs, struct queue_message, node); + uint64_t min_clock = qm->min_clock; // Add list to cq->upcoming_queue - pthread_mutex_lock(&sq->lock); - if (list_empty(&cq->upcoming.msg_queue)) - list_add_tail(&cq->upcoming.node, &sq->upcoming_queues); - list_join_tail(msgs, &cq->upcoming.msg_queue); - sq->upcoming_bytes += len; int mustwake = 0; - if (qm->min_clock < sq->need_kick_clock) { - sq->need_kick_clock = 0; - mustwake = 1; + pthread_mutex_lock(&sq->transmit_requests.lock); + if (list_empty(&cq->upcoming.msg_queue)) { + list_add_tail(&cq->upcoming.node, + &sq->transmit_requests.upcoming_queues); + if (min_clock < sq->transmit_requests.min_release_clock) + sq->transmit_requests.min_release_clock = min_clock; + if (min_clock < sq->transmit_requests.need_kick_clock) { + sq->transmit_requests.need_kick_clock = 0; + mustwake = 1; + } } - pthread_mutex_unlock(&sq->lock); + list_join_tail(msgs, &cq->upcoming.msg_queue); + sq->transmit_requests.upcoming_bytes += len; + pthread_mutex_unlock(&sq->transmit_requests.lock); // Wake the background thread if necessary if (mustwake) @@ -863,20 +948,21 @@ serialqueue_send(struct serialqueue *sq, struct command_queue *cq, uint8_t *msg void __visible serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) { - pthread_mutex_lock(&sq->lock); + struct receiver *receiver = &sq->receiver; + pthread_mutex_lock(&receiver->lock); // Wait for message to be available - while (list_empty(&sq->receive_queue)) { + while (list_empty(&receiver->queue)) { if (pollreactor_is_exit(sq->pr)) goto exit; - sq->receive_waiting = 1; - int ret = pthread_cond_wait(&sq->cond, &sq->lock); + receiver->waiting = 1; + int ret = pthread_cond_wait(&receiver->cond, &receiver->lock); if (ret) report_errno("pthread_cond_wait", ret); } // Remove message from queue struct queue_message *qm = list_first_entry( - &sq->receive_queue, struct queue_message, node); + &receiver->queue, struct queue_message, node); list_del(&qm->node); // Copy message @@ -886,16 +972,14 @@ serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm) pqm->receive_time = qm->receive_time; pqm->notify_id = qm->notify_id; if (qm->len) - debug_queue_add(&sq->old_receive, qm); - else - message_free(qm); - - pthread_mutex_unlock(&sq->lock); + qm = _debug_queue_add(&receiver->old_receive, qm); + pthread_mutex_unlock(&receiver->lock); + message_free(qm); return; exit: pqm->len = -1; - pthread_mutex_unlock(&sq->lock); + pthread_mutex_unlock(&receiver->lock); } void __visible @@ -946,7 +1030,9 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) { struct serialqueue stats; pthread_mutex_lock(&sq->lock); + pthread_mutex_lock(&sq->transmit_requests.lock); memcpy(&stats, sq, sizeof(stats)); + pthread_mutex_unlock(&sq->transmit_requests.lock); pthread_mutex_unlock(&sq->lock); snprintf(buf, len, "bytes_write=%u bytes_read=%u" @@ -959,7 +1045,7 @@ serialqueue_get_stats(struct serialqueue *sq, char *buf, int len) , (int)stats.send_seq, (int)stats.receive_seq , (int)stats.retransmit_seq , stats.srtt, stats.rttvar, stats.rto - , stats.ready_bytes, stats.upcoming_bytes); + , stats.ready_bytes, stats.transmit_requests.upcoming_bytes); } // Extract old messages stored in the debug queues @@ -968,18 +1054,25 @@ serialqueue_extract_old(struct serialqueue *sq, int sentq , struct pull_queue_message *q, int max) { int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE; - struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive; struct list_head replacement, current; list_init(&replacement); debug_queue_alloc(&replacement, count); list_init(¤t); // Atomically replace existing debug list with new zero'd list - pthread_mutex_lock(&sq->lock); - list_join_tail(rootp, ¤t); - list_init(rootp); - list_join_tail(&replacement, rootp); - pthread_mutex_unlock(&sq->lock); + if (sentq) { + pthread_mutex_lock(&sq->lock); + list_join_tail(&sq->old_sent, ¤t); + list_init(&sq->old_sent); + list_join_tail(&replacement, &sq->old_sent); + pthread_mutex_unlock(&sq->lock); + } else { + pthread_mutex_lock(&sq->receiver.lock); + list_join_tail(&sq->receiver.old_receive, ¤t); + list_init(&sq->receiver.old_receive); + list_join_tail(&replacement, &sq->receiver.old_receive); + pthread_mutex_unlock(&sq->receiver.lock); + } // Walk the debug list int pos = 0;