From 8b4ad34e22245694fad76e4856afd37f7478b3d6 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Sun, 7 Feb 2021 16:03:39 -0500 Subject: [PATCH] serialqueue: Support sending messages over a CAN bus Signed-off-by: Kevin O'Connor --- klippy/chelper/__init__.py | 3 +- klippy/chelper/serialqueue.c | 97 ++++++++++++++++++++++++++---------- klippy/chelper/serialqueue.h | 3 +- klippy/serialhdl.py | 7 +-- 4 files changed, 80 insertions(+), 30 deletions(-) diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py index 76d5c4b4..11808aef 100644 --- a/klippy/chelper/__init__.py +++ b/klippy/chelper/__init__.py @@ -141,7 +141,8 @@ defs_serialqueue = """ uint64_t notify_id; }; - struct serialqueue *serialqueue_alloc(int serial_fd, int write_only); + struct serialqueue *serialqueue_alloc(int serial_fd, char serial_fd_type + , int client_id); void serialqueue_exit(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq); struct command_queue *serialqueue_alloc_commandqueue(void); diff --git a/klippy/chelper/serialqueue.c b/klippy/chelper/serialqueue.c index 3684a721..77d547bb 100644 --- a/klippy/chelper/serialqueue.c +++ b/klippy/chelper/serialqueue.c @@ -13,6 +13,7 @@ // background thread is launched to do this work and minimize latency. #include // fcntl +#include // // struct can_frame #include // ceil #include // poll #include // pthread_mutex_lock @@ -354,7 +355,7 @@ message_queue_free(struct list_head *root) struct serialqueue { // Input reading struct pollreactor pr; - int serial_fd; + int serial_fd, serial_fd_type, client_id; int pipe_fds[2]; uint8_t input_buf[4096]; uint8_t need_sync; @@ -396,6 +397,10 @@ struct serialqueue { #define SQPT_COMMAND 1 #define SQPT_NUM 2 +#define SQT_UART 'u' +#define SQT_CAN 'c' +#define SQT_DEBUGFILE 'f' + #define MIN_RTO 0.025 #define MAX_RTO 5.000 #define MAX_PENDING_BLOCKS 12 @@ -570,17 +575,31 @@ handle_message(struct serialqueue *sq, double eventtime, int len) static void input_event(struct serialqueue *sq, double eventtime) { - int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos] - , sizeof(sq->input_buf) - sq->input_pos); - if (ret <= 0) { - if(ret < 0) - report_errno("read", ret); - else - errorf("Got EOF when reading from device"); - pollreactor_do_exit(&sq->pr); - return; + if (sq->serial_fd_type == SQT_CAN) { + struct can_frame cf; + int ret = read(sq->serial_fd, &cf, sizeof(cf)); + if (ret <= 0) { + report_errno("can read", ret); + pollreactor_do_exit(&sq->pr); + return; + } + if (cf.can_id != sq->client_id + 1) + return; + memcpy(&sq->input_buf[sq->input_pos], cf.data, cf.can_dlc); + sq->input_pos += cf.can_dlc; + } else { + int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos] + , sizeof(sq->input_buf) - sq->input_pos); + if (ret <= 0) { + if(ret < 0) + report_errno("read", ret); + else + errorf("Got EOF when reading from device"); + pollreactor_do_exit(&sq->pr); + return; + } + sq->input_pos += ret; } - sq->input_pos += ret; for (;;) { int len = check_message(&sq->need_sync, sq->input_buf, sq->input_pos); if (!len) @@ -619,13 +638,41 @@ kick_event(struct serialqueue *sq, double eventtime) pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); } +static void +do_write(struct serialqueue *sq, void *buf, int buflen) +{ + if (sq->serial_fd_type != SQT_CAN) { + int ret = write(sq->serial_fd, buf, buflen); + if (ret < 0) + report_errno("write", ret); + return; + } + // Write to CAN fd + struct can_frame cf; + while (buflen) { + int size = buflen > 8 ? 8 : buflen; + cf.can_id = sq->client_id; + cf.can_dlc = size; + memcpy(cf.data, buf, size); + int ret = write(sq->serial_fd, &cf, sizeof(cf)); + if (ret < 0) { + report_errno("can write", ret); + return; + } + buf += size; + buflen -= size; + } +} + // Callback timer for when a retransmit should be done static double retransmit_event(struct serialqueue *sq, double eventtime) { - int ret = tcflush(sq->serial_fd, TCOFLUSH); - if (ret < 0) - report_errno("tcflush", ret); + if (sq->serial_fd_type == SQT_UART) { + int ret = tcflush(sq->serial_fd, TCOFLUSH); + if (ret < 0) + report_errno("tcflush", ret); + } pthread_mutex_lock(&sq->lock); @@ -640,9 +687,7 @@ retransmit_event(struct serialqueue *sq, double eventtime) if (!first_buflen) first_buflen = qm->len + 1; } - ret = write(sq->serial_fd, buf, buflen); - if (ret < 0) - report_errno("retransmit write", ret); + do_write(sq, buf, buflen); sq->bytes_retransmit += buflen; // Update rto @@ -823,9 +868,7 @@ command_event(struct serialqueue *sq, double eventtime) if (waketime != PR_NOW || buflen + MESSAGE_MAX > sizeof(buf)) { if (buflen) { // Write message blocks - int ret = write(sq->serial_fd, buf, buflen); - if (ret < 0) - report_errno("write", ret); + do_write(sq, buf, buflen); sq->bytes_write += buflen; buflen = 0; } @@ -854,19 +897,22 @@ background_thread(void *data) // Create a new 'struct serialqueue' object struct serialqueue * __visible -serialqueue_alloc(int serial_fd, int write_only) +serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id) { struct serialqueue *sq = malloc(sizeof(*sq)); memset(sq, 0, sizeof(*sq)); - - // Reactor setup sq->serial_fd = serial_fd; + sq->serial_fd_type = serial_fd_type; + sq->client_id = client_id; + int ret = pipe(sq->pipe_fds); if (ret) goto fail; + + // Reactor setup pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq); pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event - , write_only); + , serial_fd_type==SQT_DEBUGFILE); pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event); pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event); @@ -876,7 +922,8 @@ serialqueue_alloc(int serial_fd, int write_only) // Retransmit setup sq->send_seq = 1; - if (write_only) { + if (serial_fd_type == SQT_DEBUGFILE) { + // Debug file output sq->receive_seq = -1; sq->rto = PR_NEVER; } else { diff --git a/klippy/chelper/serialqueue.h b/klippy/chelper/serialqueue.h index 9b40b8e7..62af9aaf 100644 --- a/klippy/chelper/serialqueue.h +++ b/klippy/chelper/serialqueue.h @@ -47,7 +47,8 @@ struct pull_queue_message { }; struct serialqueue; -struct serialqueue *serialqueue_alloc(int serial_fd, int write_only); +struct serialqueue *serialqueue_alloc(int serial_fd, char serial_fd_type + , int client_id); void serialqueue_exit(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq); struct command_queue *serialqueue_alloc_commandqueue(void); diff --git a/klippy/serialhdl.py b/klippy/serialhdl.py index d17993b9..0d03b1bb 100644 --- a/klippy/serialhdl.py +++ b/klippy/serialhdl.py @@ -72,10 +72,11 @@ class SerialReader: # Done return identify_data identify_data += msgdata - def _start_session(self, serial_dev): + def _start_session(self, serial_dev, serial_fd_type='u', client_id=0): self.serial_dev = serial_dev self.serialqueue = self.ffi_main.gc( - self.ffi_lib.serialqueue_alloc(serial_dev.fileno(), 0), + self.ffi_lib.serialqueue_alloc(serial_dev.fileno(), + serial_fd_type, client_id), self.ffi_lib.serialqueue_free) self.background_thread = threading.Thread(target=self._bg_thread) self.background_thread.start() @@ -142,7 +143,7 @@ class SerialReader: self.serial_dev = debugoutput self.msgparser.process_identify(dictionary, decompress=False) self.serialqueue = self.ffi_main.gc( - self.ffi_lib.serialqueue_alloc(self.serial_dev.fileno(), 1), + self.ffi_lib.serialqueue_alloc(self.serial_dev.fileno(), 'f', 0), self.ffi_lib.serialqueue_free) def set_clock_est(self, freq, last_time, last_clock): self.ffi_lib.serialqueue_set_clock_est(