serialqueue: Support sending messages over a CAN bus

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-02-07 16:03:39 -05:00
parent 9572ad4327
commit 8b4ad34e22
4 changed files with 80 additions and 30 deletions

View File

@ -141,7 +141,8 @@ defs_serialqueue = """
uint64_t notify_id; uint64_t notify_id;
}; };
struct serialqueue *serialqueue_alloc(int serial_fd, int write_only); struct serialqueue *serialqueue_alloc(int serial_fd, char serial_fd_type
, int client_id);
void serialqueue_exit(struct serialqueue *sq); void serialqueue_exit(struct serialqueue *sq);
void serialqueue_free(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq);
struct command_queue *serialqueue_alloc_commandqueue(void); struct command_queue *serialqueue_alloc_commandqueue(void);

View File

@ -13,6 +13,7 @@
// background thread is launched to do this work and minimize latency. // background thread is launched to do this work and minimize latency.
#include <fcntl.h> // fcntl #include <fcntl.h> // fcntl
#include <linux/can.h> // // struct can_frame
#include <math.h> // ceil #include <math.h> // ceil
#include <poll.h> // poll #include <poll.h> // poll
#include <pthread.h> // pthread_mutex_lock #include <pthread.h> // pthread_mutex_lock
@ -354,7 +355,7 @@ message_queue_free(struct list_head *root)
struct serialqueue { struct serialqueue {
// Input reading // Input reading
struct pollreactor pr; struct pollreactor pr;
int serial_fd; int serial_fd, serial_fd_type, client_id;
int pipe_fds[2]; int pipe_fds[2];
uint8_t input_buf[4096]; uint8_t input_buf[4096];
uint8_t need_sync; uint8_t need_sync;
@ -396,6 +397,10 @@ struct serialqueue {
#define SQPT_COMMAND 1 #define SQPT_COMMAND 1
#define SQPT_NUM 2 #define SQPT_NUM 2
#define SQT_UART 'u'
#define SQT_CAN 'c'
#define SQT_DEBUGFILE 'f'
#define MIN_RTO 0.025 #define MIN_RTO 0.025
#define MAX_RTO 5.000 #define MAX_RTO 5.000
#define MAX_PENDING_BLOCKS 12 #define MAX_PENDING_BLOCKS 12
@ -570,17 +575,31 @@ handle_message(struct serialqueue *sq, double eventtime, int len)
static void static void
input_event(struct serialqueue *sq, double eventtime) input_event(struct serialqueue *sq, double eventtime)
{ {
int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos] if (sq->serial_fd_type == SQT_CAN) {
, sizeof(sq->input_buf) - sq->input_pos); struct can_frame cf;
if (ret <= 0) { int ret = read(sq->serial_fd, &cf, sizeof(cf));
if(ret < 0) if (ret <= 0) {
report_errno("read", ret); report_errno("can read", ret);
else pollreactor_do_exit(&sq->pr);
errorf("Got EOF when reading from device"); return;
pollreactor_do_exit(&sq->pr); }
return; if (cf.can_id != sq->client_id + 1)
return;
memcpy(&sq->input_buf[sq->input_pos], cf.data, cf.can_dlc);
sq->input_pos += cf.can_dlc;
} else {
int ret = read(sq->serial_fd, &sq->input_buf[sq->input_pos]
, sizeof(sq->input_buf) - sq->input_pos);
if (ret <= 0) {
if(ret < 0)
report_errno("read", ret);
else
errorf("Got EOF when reading from device");
pollreactor_do_exit(&sq->pr);
return;
}
sq->input_pos += ret;
} }
sq->input_pos += ret;
for (;;) { for (;;) {
int len = check_message(&sq->need_sync, sq->input_buf, sq->input_pos); int len = check_message(&sq->need_sync, sq->input_buf, sq->input_pos);
if (!len) if (!len)
@ -619,13 +638,41 @@ kick_event(struct serialqueue *sq, double eventtime)
pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW); pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
} }
static void
do_write(struct serialqueue *sq, void *buf, int buflen)
{
if (sq->serial_fd_type != SQT_CAN) {
int ret = write(sq->serial_fd, buf, buflen);
if (ret < 0)
report_errno("write", ret);
return;
}
// Write to CAN fd
struct can_frame cf;
while (buflen) {
int size = buflen > 8 ? 8 : buflen;
cf.can_id = sq->client_id;
cf.can_dlc = size;
memcpy(cf.data, buf, size);
int ret = write(sq->serial_fd, &cf, sizeof(cf));
if (ret < 0) {
report_errno("can write", ret);
return;
}
buf += size;
buflen -= size;
}
}
// Callback timer for when a retransmit should be done // Callback timer for when a retransmit should be done
static double static double
retransmit_event(struct serialqueue *sq, double eventtime) retransmit_event(struct serialqueue *sq, double eventtime)
{ {
int ret = tcflush(sq->serial_fd, TCOFLUSH); if (sq->serial_fd_type == SQT_UART) {
if (ret < 0) int ret = tcflush(sq->serial_fd, TCOFLUSH);
report_errno("tcflush", ret); if (ret < 0)
report_errno("tcflush", ret);
}
pthread_mutex_lock(&sq->lock); pthread_mutex_lock(&sq->lock);
@ -640,9 +687,7 @@ retransmit_event(struct serialqueue *sq, double eventtime)
if (!first_buflen) if (!first_buflen)
first_buflen = qm->len + 1; first_buflen = qm->len + 1;
} }
ret = write(sq->serial_fd, buf, buflen); do_write(sq, buf, buflen);
if (ret < 0)
report_errno("retransmit write", ret);
sq->bytes_retransmit += buflen; sq->bytes_retransmit += buflen;
// Update rto // Update rto
@ -823,9 +868,7 @@ command_event(struct serialqueue *sq, double eventtime)
if (waketime != PR_NOW || buflen + MESSAGE_MAX > sizeof(buf)) { if (waketime != PR_NOW || buflen + MESSAGE_MAX > sizeof(buf)) {
if (buflen) { if (buflen) {
// Write message blocks // Write message blocks
int ret = write(sq->serial_fd, buf, buflen); do_write(sq, buf, buflen);
if (ret < 0)
report_errno("write", ret);
sq->bytes_write += buflen; sq->bytes_write += buflen;
buflen = 0; buflen = 0;
} }
@ -854,19 +897,22 @@ background_thread(void *data)
// Create a new 'struct serialqueue' object // Create a new 'struct serialqueue' object
struct serialqueue * __visible struct serialqueue * __visible
serialqueue_alloc(int serial_fd, int write_only) serialqueue_alloc(int serial_fd, char serial_fd_type, int client_id)
{ {
struct serialqueue *sq = malloc(sizeof(*sq)); struct serialqueue *sq = malloc(sizeof(*sq));
memset(sq, 0, sizeof(*sq)); memset(sq, 0, sizeof(*sq));
// Reactor setup
sq->serial_fd = serial_fd; sq->serial_fd = serial_fd;
sq->serial_fd_type = serial_fd_type;
sq->client_id = client_id;
int ret = pipe(sq->pipe_fds); int ret = pipe(sq->pipe_fds);
if (ret) if (ret)
goto fail; goto fail;
// Reactor setup
pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq); pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq);
pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event
, write_only); , serial_fd_type==SQT_DEBUGFILE);
pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0); pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event, 0);
pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event); pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event);
pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event); pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event);
@ -876,7 +922,8 @@ serialqueue_alloc(int serial_fd, int write_only)
// Retransmit setup // Retransmit setup
sq->send_seq = 1; sq->send_seq = 1;
if (write_only) { if (serial_fd_type == SQT_DEBUGFILE) {
// Debug file output
sq->receive_seq = -1; sq->receive_seq = -1;
sq->rto = PR_NEVER; sq->rto = PR_NEVER;
} else { } else {

View File

@ -47,7 +47,8 @@ struct pull_queue_message {
}; };
struct serialqueue; struct serialqueue;
struct serialqueue *serialqueue_alloc(int serial_fd, int write_only); struct serialqueue *serialqueue_alloc(int serial_fd, char serial_fd_type
, int client_id);
void serialqueue_exit(struct serialqueue *sq); void serialqueue_exit(struct serialqueue *sq);
void serialqueue_free(struct serialqueue *sq); void serialqueue_free(struct serialqueue *sq);
struct command_queue *serialqueue_alloc_commandqueue(void); struct command_queue *serialqueue_alloc_commandqueue(void);

View File

@ -72,10 +72,11 @@ class SerialReader:
# Done # Done
return identify_data return identify_data
identify_data += msgdata identify_data += msgdata
def _start_session(self, serial_dev): def _start_session(self, serial_dev, serial_fd_type='u', client_id=0):
self.serial_dev = serial_dev self.serial_dev = serial_dev
self.serialqueue = self.ffi_main.gc( self.serialqueue = self.ffi_main.gc(
self.ffi_lib.serialqueue_alloc(serial_dev.fileno(), 0), self.ffi_lib.serialqueue_alloc(serial_dev.fileno(),
serial_fd_type, client_id),
self.ffi_lib.serialqueue_free) self.ffi_lib.serialqueue_free)
self.background_thread = threading.Thread(target=self._bg_thread) self.background_thread = threading.Thread(target=self._bg_thread)
self.background_thread.start() self.background_thread.start()
@ -142,7 +143,7 @@ class SerialReader:
self.serial_dev = debugoutput self.serial_dev = debugoutput
self.msgparser.process_identify(dictionary, decompress=False) self.msgparser.process_identify(dictionary, decompress=False)
self.serialqueue = self.ffi_main.gc( self.serialqueue = self.ffi_main.gc(
self.ffi_lib.serialqueue_alloc(self.serial_dev.fileno(), 1), self.ffi_lib.serialqueue_alloc(self.serial_dev.fileno(), 'f', 0),
self.ffi_lib.serialqueue_free) self.ffi_lib.serialqueue_free)
def set_clock_est(self, freq, last_time, last_clock): def set_clock_est(self, freq, last_time, last_clock):
self.ffi_lib.serialqueue_set_clock_est( self.ffi_lib.serialqueue_set_clock_est(