diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 78115e09..bee39837 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -64,6 +64,9 @@ struct serialqueue { struct list_head notify_queue; // Received messages struct list_head receive_queue; + // Fastreader support + pthread_mutex_t fast_reader_dispatch_lock; + struct list_head fast_readers; // Debugging struct list_head old_sent, old_receive; // Stats @@ -195,9 +198,11 @@ update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq) } // Process a well formed input message -static int +static void handle_message(struct serialqueue *sq, double eventtime, int len) { + pthread_mutex_lock(&sq->lock); + // Calculate receive sequence number uint64_t rseq = ((sq->receive_seq & ~MESSAGE_SEQ_MASK) | (sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK)); @@ -205,11 +210,15 @@ handle_message(struct serialqueue *sq, double eventtime, int len) // New sequence number if (rseq < sq->receive_seq) rseq += MESSAGE_SEQ_MASK+1; - if (rseq > sq->send_seq && sq->receive_seq != 1) + if (rseq > sq->send_seq && sq->receive_seq != 1) { // An ack for a message not sent? Out of order message? - return -1; + sq->bytes_invalid += len; + pthread_mutex_unlock(&sq->lock); + return; + } update_receive_seq(sq, eventtime, rseq); } + sq->bytes_read += len; // Check for pending messages on notify_queue int must_wake = 0; @@ -247,9 +256,26 @@ handle_message(struct serialqueue *sq, double eventtime, int len) must_wake = 1; } + // Check fast readers + struct fastreader *fr; + list_for_each_entry(fr, &sq->fast_readers, node) { + if (len < fr->prefix_len + MESSAGE_MIN + || memcmp(&sq->input_buf[MESSAGE_HEADER_SIZE] + , fr->prefix, fr->prefix_len) != 0) + continue; + // Release main lock and invoke callback + pthread_mutex_lock(&sq->fast_reader_dispatch_lock); + if (must_wake) + check_wake_receive(sq); + pthread_mutex_unlock(&sq->lock); + fr->func(fr, sq->input_buf, len); + pthread_mutex_unlock(&sq->fast_reader_dispatch_lock); + return; + } + if (must_wake) check_wake_receive(sq); - return 0; + pthread_mutex_unlock(&sq->lock); } // Callback for input activity on the serial fd @@ -288,13 +314,7 @@ input_event(struct serialqueue *sq, double eventtime) return; if (len > 0) { // Received a valid message - pthread_mutex_lock(&sq->lock); - int ret = handle_message(sq, eventtime, len); - if (ret) - sq->bytes_invalid += len; - else - sq->bytes_read += len; - pthread_mutex_unlock(&sq->lock); + handle_message(sq, eventtime, len); } else { // Skip bad data at beginning of input len = -len; @@ -614,6 +634,7 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id) list_init(&sq->sent_queue); list_init(&sq->receive_queue); list_init(&sq->notify_queue); + list_init(&sq->fast_readers); // Debugging list_init(&sq->old_sent); @@ -626,6 +647,9 @@ serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id) if (ret) goto fail; ret = pthread_cond_init(&sq->cond, NULL); + if (ret) + goto fail; + ret = pthread_mutex_init(&sq->fast_reader_dispatch_lock, NULL); if (ret) goto fail; ret = pthread_create(&sq->tid, NULL, background_thread, sq); @@ -700,6 +724,27 @@ serialqueue_free_commandqueue(struct command_queue *cq) free(cq); } +// Add a low-latency message handler +void +serialqueue_add_fastreader(struct serialqueue *sq, struct fastreader *fr) +{ + pthread_mutex_lock(&sq->lock); + list_add_tail(&fr->node, &sq->fast_readers); + pthread_mutex_unlock(&sq->lock); +} + +// Remove a previously registered low-latency message handler +void +serialqueue_rm_fastreader(struct serialqueue *sq, struct fastreader *fr) +{ + pthread_mutex_lock(&sq->lock); + list_del(&fr->node); + pthread_mutex_unlock(&sq->lock); + + pthread_mutex_lock(&sq->fast_reader_dispatch_lock); // XXX - goofy locking + pthread_mutex_unlock(&sq->fast_reader_dispatch_lock); +} + // Add a batch of messages to the given command_queue void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq diff --git a/klippy/chelper/serialqueue.h b/klippy/chelper/serialqueue.h index 62e70403..9e66e7f5 100644 --- a/klippy/chelper/serialqueue.h +++ b/klippy/chelper/serialqueue.h @@ -1,12 +1,23 @@ #ifndef SERIALQUEUE_H #define SERIALQUEUE_H +#include // uint8_t #include "list.h" // struct list_head #include "msgblock.h" // MESSAGE_MAX #define MAX_CLOCK 0x7fffffffffffffffLL #define BACKGROUND_PRIORITY_CLOCK 0x7fffffff00000000LL +struct fastreader; +typedef void (*fastreader_cb)(struct fastreader *fr, uint8_t *data, int len); + +struct fastreader { + struct list_node node; + fastreader_cb func; + int prefix_len; + uint8_t prefix[MESSAGE_MAX]; +}; + struct pull_queue_message { uint8_t msg[MESSAGE_MAX]; int len; @@ -21,6 +32,8 @@ void serialqueue_exit(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq); struct command_queue *serialqueue_alloc_commandqueue(void); void serialqueue_free_commandqueue(struct command_queue *cq); +void serialqueue_add_fastreader(struct serialqueue *sq, struct fastreader *fr); +void serialqueue_rm_fastreader(struct serialqueue *sq, struct fastreader *fr); void serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq , struct list_head *msgs); void serialqueue_send(struct serialqueue *sq, struct command_queue *cq