klipper-dgus/klippy/serialqueue.c

1053 lines
31 KiB
C

// Serial port command queuing
//
// Copyright (C) 2016 Kevin O'Connor <kevin@koconnor.net>
//
// This file may be distributed under the terms of the GNU GPLv3 license.
//
// The goal of this code is to handle low-level serial port
// communications with a microcontroller (mcu). This code is written
// in C (instead of python) to reduce communication latencies and to
// reduce scheduling jitter. The code queues messages to be
// transmitted, schedules transmission of commands at specified mcu
// clock times, prioritizes commands, and handles retransmissions. A
// background thread is launched to do this work and minimize latency.
#include <errno.h> // errno
#include <math.h> // ceil
#include <poll.h> // poll
#include <pthread.h> // pthread_mutex_lock
#include <stddef.h> // offsetof
#include <stdint.h> // uint64_t
#include <stdio.h> // snprintf
#include <stdlib.h> // malloc
#include <string.h> // memset
#include <sys/time.h> // gettimeofday
#include <time.h> // struct timespec
#include <termios.h> // tcflush
#include <unistd.h> // pipe
#include "list.h" // list_add_tail
#include "serialqueue.h" // struct queue_message
/****************************************************************
* Helper functions
****************************************************************/
// Read the wall clock with gettimeofday() and return it as a double
// holding seconds (the microseconds become the fractional part)
static double
get_time(void)
{
    struct timeval now;
    gettimeofday(&now, NULL);
    double whole = (double)now.tv_sec;
    double fraction = (double)now.tv_usec / 1000000.;
    return whole + fraction;
}
#if 0
// Fill a 'struct timespec' with a system time stored in a double.
// The integer part of 'time' is used for the first member and the
// remaining fraction, multiplied by 1e9, for the second member.
// (This helper is compiled out by the enclosing '#if 0'.)
struct timespec
fill_time(double time)
{
time_t t = time;
return (struct timespec) {t, (time - t)*1000000000. };
}
#endif
// Print a diagnostic line on stderr containing the given return code,
// the 'where' label, and the current 'errno' value with its text
void
report_errno(char *where, int rc)
{
    // Grab errno up front so the calls below can not disturb it
    int saved_errno = errno;
    char *errtext = strerror(saved_errno);
    fprintf(stderr, "Got error %d in %s: (%d)%s\n"
            , rc, where, saved_errno, errtext);
}
// Return a lowercase hex character ('0'-'9', 'a'-'f') for a number in
// the range 0 through 15
#define GETHEX(x) ((x) < 10 ? '0' + (x) : 'a' + (x) - 10)

// Translate a binary string into an ASCII string with escape sequences.
// Printable characters (other than backslash) are copied verbatim and
// every other byte is written as "\xNN".  Conversion stops early once
// fewer than five bytes of 'outbuf' remain, so the result is always
// null terminated and never overruns 'outbuf'.  Returns 'outbuf'.
char *
dump_string(char *outbuf, int outbuf_size, uint8_t *inbuf, int inbuf_size)
{
    if (outbuf_size <= 0)
        // Not even room for the terminator
        return outbuf;
    char *o = outbuf;
    if (outbuf_size >= 5) {
        // Each input byte may need four output bytes plus the terminator
        char *outend = &outbuf[outbuf_size-5];
        uint8_t *inend = &inbuf[inbuf_size], *p = inbuf;
        while (p < inend && o < outend) {
            uint8_t c = *p++;
            if (c > 31 && c < 127 && c != '\\') {
                *o++ = c;
                continue;
            }
            *o++ = '\\';
            *o++ = 'x';
            *o++ = GETHEX(c >> 4);
            *o++ = GETHEX(c & 0x0f);
        }
    }
    *o = '\0';
    return outbuf;
}
/****************************************************************
* Poll reactor
****************************************************************/
// The 'poll reactor' code is a mechanism for dispatching timer and
// file descriptor events.
// Wake time that is always in the past (run as soon as possible)
#define PR_NOW 0.
// Wake time used for timers that are not currently scheduled
#define PR_NEVER 9999999999999999.
// One timer slot of a pollreactor
struct pollreactor_timer {
// System time at (or after) which 'callback' should be invoked
double waketime;
// Timer handler - the value it returns is stored as the new waketime
double (*callback)(void *data, double eventtime);
};
// State of a poll reactor instance
struct pollreactor {
// Sizes of the fds/fd_callbacks and timers arrays; must_exit is the
// flag that makes pollreactor_run() leave its loop
int num_fds, num_timers, must_exit;
// Opaque pointer passed as first argument to every callback
void *callback_data;
// Earliest waketime currently known among the timers
double next_timer;
// File descriptors handed to poll()
struct pollfd *fds;
// Handler for each entry of 'fds' (same index)
void (**fd_callbacks)(void *data, double eventtime);
// Timer slots
struct pollreactor_timer *timers;
};
// Initialize a caller provided 'struct pollreactor' object.  Storage
// for 'num_fds' file descriptor slots and 'num_timers' timer slots is
// allocated; 'callback_data' is later passed to every callback.
static void
pollreactor_setup(struct pollreactor *pr, int num_fds, int num_timers
                  , void *callback_data)
{
    pr->num_fds = num_fds;
    pr->num_timers = num_timers;
    pr->must_exit = 0;
    pr->callback_data = callback_data;
    pr->next_timer = PR_NEVER;
    pr->fds = calloc(num_fds, sizeof(*pr->fds));
    pr->fd_callbacks = calloc(num_fds, sizeof(*pr->fd_callbacks));
    pr->timers = calloc(num_timers, sizeof(*pr->timers));
    int i;
    // A zeroed slot would refer to fd 0 (stdin) with no callback; poll()
    // can still flag POLLHUP/POLLERR/POLLNVAL on it, which would make
    // pollreactor_run() call a NULL handler.  poll() ignores negative
    // descriptors, so mark every slot unused until pollreactor_add_fd().
    for (i=0; i<num_fds; i++)
        pr->fds[i].fd = -1;
    for (i=0; i<num_timers; i++)
        pr->timers[i].waketime = PR_NEVER;
}
// Register 'callback' in slot 'pos' so that it is invoked when 'fd'
// reports input (or a hang-up) in poll()
static void
pollreactor_add_fd(struct pollreactor *pr, int pos, int fd, void *callback)
{
    struct pollfd *slot = &pr->fds[pos];
    slot->fd = fd;
    slot->events = POLLIN|POLLHUP;
    slot->revents = 0;
    pr->fd_callbacks[pos] = callback;
}
// Add a timer callback
static void
pollreactor_add_timer(struct pollreactor *pr, int pos, void *callback)
{
pr->timers[pos].callback = callback;
pr->timers[pos].waketime = PR_NEVER;
}
#if 0
// Return the last scheduled wake-up time for the timer in slot 'pos'.
// (This helper is compiled out by the enclosing '#if 0'.)
static double
pollreactor_get_timer(struct pollreactor *pr, int pos)
{
return pr->timers[pos].waketime;
}
#endif
// Set the wake-up time for a given timer
static void
pollreactor_update_timer(struct pollreactor *pr, int pos, double waketime)
{
pr->timers[pos].waketime = waketime;
if (waketime < pr->next_timer)
pr->next_timer = waketime;
}
// Internal code to invoke timer callbacks.  Every timer whose waketime
// has been reached is run, and the return value is the timeout (in
// milliseconds) that pollreactor_run() passes to poll(): 0 when a
// timer is already due again, otherwise a value limited to 1..1000.
static int
pollreactor_check_timers(struct pollreactor *pr, double eventtime)
{
if (eventtime >= pr->next_timer) {
// At least one timer is due - rescan all timers and rebuild next_timer
pr->next_timer = PR_NEVER;
int i;
for (i=0; i<pr->num_timers; i++) {
struct pollreactor_timer *timer = &pr->timers[i];
double t = timer->waketime;
if (eventtime >= t) {
// Timer expired - the callback returns its next wake time
t = timer->callback(pr->callback_data, eventtime);
timer->waketime = t;
}
// next_timer is updated in place on purpose: a callback may also
// lower it by calling pollreactor_update_timer() on another timer
if (t < pr->next_timer)
pr->next_timer = t;
}
if (eventtime >= pr->next_timer)
// A timer wants to run again right away - do not block in poll()
return 0;
}
// Convert the time until the next timer to milliseconds (rounded up)
double timeout = ceil((pr->next_timer - eventtime) * 1000.);
return timeout < 1. ? 1 : (timeout > 1000. ? 1000 : (int)timeout);
}
// Main reactor loop: run due timers, wait in poll(), and dispatch the
// callbacks of file descriptors that reported events.  The loop ends
// when must_exit is set (by pollreactor_do_exit() or a poll() error).
static void
pollreactor_run(struct pollreactor *pr)
{
    pr->must_exit = 0;
    double now = get_time();
    while (!pr->must_exit) {
        int wait_ms = pollreactor_check_timers(pr, now);
        int nready = poll(pr->fds, pr->num_fds, wait_ms);
        now = get_time();
        if (nready < 0) {
            report_errno("poll", nready);
            pr->must_exit = 1;
            continue;
        }
        if (!nready)
            // Timeout - only timers need servicing
            continue;
        int pos;
        for (pos = 0; pos < pr->num_fds; pos++) {
            if (!pr->fds[pos].revents)
                continue;
            pr->fd_callbacks[pos](pr->callback_data, now);
        }
    }
}
// Flag the reactor so that a running pollreactor_run() loop stops at
// its next loop condition check
static void
pollreactor_do_exit(struct pollreactor *pr)
{
    int stop_requested = 1;
    pr->must_exit = stop_requested;
}
// Report (non-zero) whether the reactor's exit flag is currently set
static int
pollreactor_is_exit(struct pollreactor *pr)
{
    int stop_requested = pr->must_exit;
    return stop_requested;
}
/****************************************************************
* Serial protocol helpers
****************************************************************/
// Compute the "ccitt" crc16 of 'len' bytes at 'buf' (initial value
// 0xffff, processed one byte at a time without a lookup table)
static uint16_t
crc16_ccitt(uint8_t *buf, uint8_t len)
{
    uint16_t crc = 0xffff;
    uint8_t i;
    for (i = 0; i < len; i++) {
        // Fold the next byte into the low byte of the running crc
        uint8_t mix = buf[i] ^ (crc & 0xff);
        mix ^= mix << 4;
        uint16_t upper = (uint16_t)mix << 8;
        uint16_t lower = crc >> 8;
        crc = (upper | lower) ^ (uint8_t)(mix >> 4) ^ ((uint16_t)mix << 3);
    }
    return crc;
}
// Inspect the start of 'buf' for a well formed mcu message.  Returns
// the message length if one is present, 0 if more input is required,
// or a negative count of bytes the caller should discard.  The
// 'need_sync' flag records that a sync byte must be seen before the
// next message can be accepted.
static int
check_message(uint8_t *need_sync, uint8_t *buf, int buf_len)
{
    if (buf_len < MESSAGE_MIN)
        // Not enough data yet
        return 0;
    if (*need_sync)
        goto resync;
    // Validate the header fields
    uint8_t total = buf[MESSAGE_POS_LEN];
    if (total > MESSAGE_MAX || total < MESSAGE_MIN)
        goto resync;
    uint8_t seqbyte = buf[MESSAGE_POS_SEQ];
    if ((seqbyte & ~MESSAGE_SEQ_MASK) != MESSAGE_DEST)
        goto resync;
    if (total > buf_len)
        // Message not fully received yet
        return 0;
    // Validate the trailer (crc and sync byte)
    if (buf[total - MESSAGE_TRAILER_SYNC] != MESSAGE_SYNC)
        goto resync;
    uint8_t *crcpos = &buf[total - MESSAGE_TRAILER_CRC];
    uint16_t wire_crc = (crcpos[0] << 8) | (uint8_t)crcpos[1];
    uint16_t calc_crc = crc16_ccitt(buf, total - MESSAGE_TRAILER_SIZE);
    if (calc_crc != wire_crc)
        goto resync;
    return total;
resync: ;
    // Drop everything up to and including the next sync byte
    uint8_t *sync = memchr(buf, MESSAGE_SYNC, buf_len);
    if (!sync) {
        *need_sync = 1;
        return -buf_len;
    }
    *need_sync = 0;
    return -(sync - buf + 1);
}
// Write 'v' at 'p' as a variable length quantity (vlq) of one to five
// bytes, most significant 7-bit group first, with the top bit set on
// all but the final byte.  Returns the position after the last byte.
static uint8_t *
encode_int(uint8_t *p, uint32_t v)
{
    int32_t sv = v;
    // Determine how many 7-bit groups are required for the value
    int groups = 5;
    if (sv >= -(1L<<5) && sv < (3L<<5))
        groups = 1;
    else if (sv >= -(1L<<12) && sv < (3L<<12))
        groups = 2;
    else if (sv >= -(1L<<19) && sv < (3L<<19))
        groups = 3;
    else if (sv >= -(1L<<26) && sv < (3L<<26))
        groups = 4;
    // Emit the leading groups with the continuation bit set
    int shift;
    for (shift = 7 * (groups - 1); shift > 0; shift -= 7)
        *p++ = ((v >> shift) & 0x7f) | 0x80;
    *p++ = v & 0x7f;
    return p;
}
/****************************************************************
* Command queues
****************************************************************/
// A queue of pending messages as used by serialqueue_send_batch()
struct command_queue {
// stalled_queue holds messages whose min_clock has not been reached;
// ready_queue holds messages that may be transmitted
struct list_head stalled_queue, ready_queue;
// Link in the serialqueue's pending_queues list
struct list_node node;
};
// Obtain a zero filled 'struct queue_message' from the heap
static struct queue_message *
message_alloc(void)
{
    struct queue_message *msg = calloc(1, sizeof(*msg));
    return msg;
}
// Create a new queue_message whose content is a copy of the 'len'
// bytes at 'data' (no bounds check is made against the message size)
static struct queue_message *
message_fill(uint8_t *data, int len)
{
    struct queue_message *msg = message_alloc();
    msg->len = len;
    memcpy(msg->msg, data, len);
    return msg;
}
// Create a new queue_message containing the 'len' integers of 'data'
// encoded as vlq values.  If the encoding does not fit, an error is
// printed and the returned message has a length of zero.
struct queue_message *
message_alloc_and_encode(uint32_t *data, int len)
{
    struct queue_message *msg = message_alloc();
    uint8_t *limit = &msg->msg[MESSAGE_PAYLOAD_MAX];
    uint8_t *out = msg->msg;
    int idx = 0;
    while (idx < len) {
        out = encode_int(out, data[idx++]);
        if (out > limit) {
            fprintf(stderr, "Encode error\n");
            msg->len = 0;
            return msg;
        }
    }
    msg->len = out - msg->msg;
    return msg;
}
// Release a message previously obtained from message_alloc()
static void
message_free(struct queue_message *qm)
{
    void *storage = qm;
    free(storage);
}
/****************************************************************
* Serialqueue interface
****************************************************************/
// All state for one serial port connection and its background thread
struct serialqueue {
// Input reading
// Reactor dispatching the fd and timer events of the background thread
struct pollreactor pr;
int serial_fd;
// Pipe used to wake the background thread (see kick_bg_thread())
int pipe_fds[2];
// Bytes read from serial_fd that have not been parsed yet
uint8_t input_buf[4096];
// Set when input must be discarded until the next sync byte
uint8_t need_sync;
// Number of valid bytes in input_buf
int input_pos;
// Threading
pthread_t tid;
pthread_mutex_t lock; // protects variables below
pthread_cond_t cond;
// Set while serialqueue_pull() is blocked waiting on 'cond'
int receive_waiting;
// Baud / clock tracking
// baud_adjust is multiplied by a byte count to obtain a transmit
// time; idle_time is the estimated time the serial line becomes idle
double baud_adjust, idle_time;
// est_clock is the estimated mcu clock rate (ticks per second);
// last_ack_time / last_ack_clock are a matching system time and
// mcu clock pair (see serialqueue_set_clock_est())
double est_clock, last_ack_time;
uint64_t last_ack_clock;
// Estimated receive time of the most recently acknowledged message
double last_receive_sent_time;
// Retransmit support
// Next sequence number to send / latest sequence acknowledged
uint64_t send_seq, receive_seq;
// send_seq at the last retransmit / sequence used for rtt sampling
uint64_t retransmit_seq, rtt_sample_seq;
// Messages sent but not yet acknowledged
struct list_head sent_queue;
// Smoothed round trip time, its variance, and retransmit timeout
double srtt, rttvar, rto;
// Pending transmission message queues
// The command_queue objects that currently hold messages
struct list_head pending_queues;
// Total bytes in all ready queues / all stalled queues
int ready_bytes, stalled_bytes;
// Newly queued messages with a min_clock below this value require
// waking the background thread
uint64_t need_kick_clock;
// When zero, ready messages are sent without waiting for req_clock
int can_delay_writes;
// Received messages
struct list_head receive_queue;
// Debugging
// Recently sent / recently received messages kept for debugging
struct list_head old_sent, old_receive;
// Stats
uint32_t bytes_write, bytes_read, bytes_retransmit, bytes_invalid;
};
// Reactor file descriptor slots (serial port, wake-up pipe, count)
#define SQPF_SERIAL 0
#define SQPF_PIPE 1
#define SQPF_NUM 2
// Reactor timer slots (retransmit timer, command timer, count)
#define SQPT_RETRANSMIT 0
#define SQPT_COMMAND 1
#define SQPT_NUM 2
// Lower and upper limit of the retransmit timeout (seconds)
#define MIN_RTO 0.025
#define MAX_RTO 5.000
// Amount of not yet transmitted data (in seconds of line time) that
// may be pending before further writes are delayed
#define MAX_SERIAL_BUFFER 0.050
// How long (in seconds) before its req_clock a message is sent
#define MIN_REQTIME_DELTA 0.250
// NOTE(review): not referenced in the code visible here
#define IDLE_QUERY_TIME 1.0
// Number of messages kept in the old_sent / old_receive debug queues
#define DEBUG_QUEUE_SENT 100
#define DEBUG_QUEUE_RECEIVE 20
// Populate the list at 'root' with 'count' freshly allocated (empty)
// messages
static void
debug_queue_alloc(struct list_head *root, int count)
{
    int remaining = count;
    while (remaining-- > 0) {
        struct queue_message *blank = message_alloc();
        list_add_head(&blank->node, root);
    }
}
// Append 'qm' to the debug list at 'root' and then discard the entry
// at the head of that list, so the list keeps a constant length
static void
debug_queue_add(struct list_head *root, struct queue_message *qm)
{
    list_add_tail(&qm->node, root);
    // Evict the oldest entry (looked up only after the append)
    struct queue_message *oldest;
    oldest = list_first_entry(root, struct queue_message, node);
    list_del(&oldest->node);
    message_free(oldest);
}
// If a thread is blocked in serialqueue_pull(), clear the waiting flag
// and signal the condition variable to wake it
static void
check_wake_receive(struct serialqueue *sq)
{
    if (!sq->receive_waiting)
        return;
    sq->receive_waiting = 0;
    pthread_cond_signal(&sq->cond);
}
// Update internal state when the receive sequence increases.  'rseq'
// is the new acknowledged sequence number and 'eventtime' the system
// time at which the acknowledgement was read.
static void
update_receive_seq(struct serialqueue *sq, double eventtime, uint64_t rseq)
{
// Remove from sent queue
// One sent_queue entry is retired per newly acknowledged sequence
int ack_count = rseq - sq->receive_seq;
uint64_t sent_seq = sq->receive_seq;
while (!list_empty(&sq->sent_queue) && ack_count--) {
struct queue_message *sent = list_first_entry(
&sq->sent_queue, struct queue_message, node);
// Remember the estimated receive time of the last acked message
if (rseq == ++sent_seq)
sq->last_receive_sent_time = sent->receive_time;
list_del(&sent->node);
// Keep the message in the debug queue (this frees an older entry)
debug_queue_add(&sq->old_sent, sent);
}
sq->receive_seq = rseq;
if (rseq > sq->send_seq)
sq->send_seq = rseq;
// Have command_event() run again now that messages were acknowledged
pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
// Update retransmit info
if (sq->rtt_sample_seq && rseq >= sq->rtt_sample_seq
&& sq->last_receive_sent_time) {
// RFC6298 rtt calculations
double delta = eventtime - sq->last_receive_sent_time;
if (!sq->srtt) {
// First sample
sq->rttvar = delta / 2.0;
sq->srtt = delta * 10.0; // use a higher start default
} else {
sq->rttvar = (3.0 * sq->rttvar + fabs(sq->srtt - delta)) / 4.0;
sq->srtt = (7.0 * sq->srtt + delta) / 8.0;
}
double rttvar4 = sq->rttvar * 4.0;
if (rttvar4 < 0.001)
rttvar4 = 0.001;
// Keep the resulting timeout within MIN_RTO..MAX_RTO
sq->rto = sq->srtt + rttvar4;
if (sq->rto < MIN_RTO)
sq->rto = MIN_RTO;
else if (sq->rto > MAX_RTO)
sq->rto = MAX_RTO;
// Sample consumed - build_and_send_command() picks the next one
sq->rtt_sample_seq = 0;
}
// Reschedule the retransmit timer for the oldest unacked message
if (list_empty(&sq->sent_queue)) {
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NEVER);
} else {
struct queue_message *sent = list_first_entry(
&sq->sent_queue, struct queue_message, node);
double nr = eventtime + sq->rto + sent->len * sq->baud_adjust;
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, nr);
}
}
// Handle a validated message of 'len' bytes at the start of input_buf:
// track the acknowledged sequence number, detect naks, and queue any
// message that carries content for serialqueue_pull()
static void
handle_message(struct serialqueue *sq, double eventtime, int len)
{
    // Expand the sequence bits of the message to a full sequence number
    uint64_t prev_seq = sq->receive_seq;
    uint64_t seq_bits = sq->input_buf[MESSAGE_POS_SEQ] & MESSAGE_SEQ_MASK;
    uint64_t rseq = (prev_seq & ~MESSAGE_SEQ_MASK) | seq_bits;
    if (rseq < prev_seq)
        rseq += MESSAGE_SEQ_MASK+1;

    if (rseq != prev_seq) {
        // The mcu acknowledged additional messages
        update_receive_seq(sq, eventtime, rseq);
    } else if (len == MESSAGE_MIN && rseq > sq->retransmit_seq) {
        // An empty message repeating the sequence number is a nak
        pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT, PR_NOW);
    }

    if (len <= MESSAGE_MIN)
        // No content to deliver
        return;
    // Hand the message to the receive queue
    struct queue_message *incoming = message_fill(sq->input_buf, len);
    incoming->sent_time = sq->last_receive_sent_time;
    incoming->receive_time = eventtime;
    list_add_tail(&incoming->node, &sq->receive_queue);
    check_wake_receive(sq);
}
// Reactor callback for the serial fd: read the available bytes into
// input_buf and then process every complete message (or run of
// invalid bytes) found at the start of the buffer
static void
input_event(struct serialqueue *sq, double eventtime)
{
    uint8_t *dest = &sq->input_buf[sq->input_pos];
    int ret = read(sq->serial_fd, dest
                   , sizeof(sq->input_buf) - sq->input_pos);
    if (ret <= 0) {
        report_errno("read", ret);
        pollreactor_do_exit(&sq->pr);
        return;
    }
    sq->input_pos += ret;
    for (;;) {
        ret = check_message(&sq->need_sync, sq->input_buf, sq->input_pos);
        if (ret == 0)
            // Wait for more input
            break;
        int consumed;
        pthread_mutex_lock(&sq->lock);
        if (ret > 0) {
            // Complete message found
            handle_message(sq, eventtime, ret);
            sq->bytes_read += ret;
            consumed = ret;
        } else {
            // Invalid data - drop the indicated number of bytes
            consumed = -ret;
            sq->bytes_invalid += consumed;
        }
        pthread_mutex_unlock(&sq->lock);
        // Shift the unprocessed remainder to the start of the buffer
        sq->input_pos -= consumed;
        if (sq->input_pos)
            memmove(sq->input_buf, &sq->input_buf[consumed], sq->input_pos);
    }
}
// Reactor callback for the wake-up pipe: drain the pipe and schedule
// command_event() to run immediately
static void
kick_event(struct serialqueue *sq, double eventtime)
{
    char scratch[4096];
    int nread = read(sq->pipe_fds[0], scratch, sizeof(scratch));
    if (nread < 0)
        report_errno("pipe read", nread);
    pollreactor_update_timer(&sq->pr, SQPT_COMMAND, PR_NOW);
}
// Callback timer for when a retransmit should be done.  Resends every
// message still in the sent_queue and returns the time at which the
// retransmit timer should fire next.
static double
retransmit_event(struct serialqueue *sq, double eventtime)
{
// Discard any output still queued for the serial port
int ret = tcflush(sq->serial_fd, TCOFLUSH);
if (ret < 0)
report_errno("tcflush", ret);
pthread_mutex_lock(&sq->lock);
// Retransmit all pending messages
// The buffer starts with a sync byte followed by the unacked messages
uint8_t buf[MESSAGE_MAX * MESSAGE_SEQ_MASK + 1];
int buflen = 0;
buf[buflen++] = MESSAGE_SYNC;
struct queue_message *qm;
list_for_each_entry(qm, &sq->sent_queue, node) {
memcpy(&buf[buflen], qm->msg, qm->len);
buflen += qm->len;
}
ret = write(sq->serial_fd, buf, buflen);
if (ret < 0)
report_errno("retransmit write", ret);
sq->bytes_retransmit += buflen;
// Update rto
// Double the timeout (limited to MAX_RTO) after each retransmit
sq->rto *= 2.0;
if (sq->rto > MAX_RTO)
sq->rto = MAX_RTO;
sq->retransmit_seq = sq->send_seq;
// Drop any rtt measurement that was in progress
sq->rtt_sample_seq = 0;
// Estimate when the serial line finishes sending this data
sq->idle_time = eventtime + buflen * sq->baud_adjust;
double waketime = sq->idle_time + sq->rto;
pthread_mutex_unlock(&sq->lock);
return waketime;
}
// Construct a block of data and send to the serial port.  Ready
// messages are packed (in order of their req_clock) into a single
// outgoing message block, which is written to the serial fd and then
// appended to the sent_queue until it is acknowledged.
// (command_event() calls this with sq->lock held.)
static void
build_and_send_command(struct serialqueue *sq, double eventtime)
{
struct queue_message *out = message_alloc();
// Leave room for the header that is filled in below
out->len = MESSAGE_HEADER_SIZE;
while (sq->ready_bytes) {
// Find highest priority message (message with lowest req_clock)
uint64_t min_clock = MAX_CLOCK;
struct command_queue *q, *cq = NULL;
struct queue_message *qm = NULL;
list_for_each_entry(q, &sq->pending_queues, node) {
if (!list_empty(&q->ready_queue)) {
struct queue_message *m = list_first_entry(
&q->ready_queue, struct queue_message, node);
if (m->req_clock < min_clock) {
min_clock = m->req_clock;
cq = q;
qm = m;
}
}
}
// Append message to outgoing command
// NOTE(review): qm stays NULL if no ready message has a req_clock
// below MAX_CLOCK - presumably callers never queue such a message
if (out->len + qm->len > sizeof(out->msg) - MESSAGE_TRAILER_SIZE)
break;
list_del(&qm->node);
// A command_queue without messages is dropped from pending_queues
if (list_empty(&cq->ready_queue) && list_empty(&cq->stalled_queue))
list_del(&cq->node);
memcpy(&out->msg[out->len], qm->msg, qm->len);
out->len += qm->len;
sq->ready_bytes -= qm->len;
message_free(qm);
}
// Fill header / trailer
out->len += MESSAGE_TRAILER_SIZE;
out->msg[MESSAGE_POS_LEN] = out->len;
out->msg[MESSAGE_POS_SEQ] = MESSAGE_DEST | (sq->send_seq & MESSAGE_SEQ_MASK);
// The crc covers everything in front of the trailer
uint16_t crc = crc16_ccitt(out->msg, out->len - MESSAGE_TRAILER_SIZE);
out->msg[out->len - MESSAGE_TRAILER_CRC] = crc >> 8;
out->msg[out->len - MESSAGE_TRAILER_CRC+1] = crc & 0xff;
out->msg[out->len - MESSAGE_TRAILER_SYNC] = MESSAGE_SYNC;
// Send message
int ret = write(sq->serial_fd, out->msg, out->len);
if (ret < 0)
report_errno("write", ret);
sq->bytes_write += out->len;
// Advance the estimate of when the serial line becomes idle
if (eventtime > sq->idle_time)
sq->idle_time = eventtime;
sq->idle_time += out->len * sq->baud_adjust;
out->sent_time = eventtime;
// receive_time holds the estimated time the transmission completes
out->receive_time = sq->idle_time;
// Start the retransmit timer if nothing else is awaiting an ack
if (list_empty(&sq->sent_queue))
pollreactor_update_timer(&sq->pr, SQPT_RETRANSMIT
, sq->idle_time + sq->rto);
sq->send_seq++;
// Use this message for the next rtt sample if none is in progress
if (!sq->rtt_sample_seq)
sq->rtt_sample_seq = sq->send_seq;
list_add_tail(&out->node, &sq->sent_queue);
}
// Determine the time the next serial data should be sent.  Returns
// PR_NOW if a message block should be built and sent immediately,
// PR_NEVER if there is nothing to schedule, or otherwise the system
// time at which to check again.  Stalled messages that have become
// eligible are moved to their ready queue as a side effect.
// (command_event() calls this with sq->lock held.)
static double
check_send_command(struct serialqueue *sq, double eventtime)
{
if (eventtime < sq->idle_time - MAX_SERIAL_BUFFER)
// Serial port already busy
return sq->idle_time - MAX_SERIAL_BUFFER;
// receive_seq is set to -1 for write_only queues (no acks expected)
if (sq->send_seq - sq->receive_seq >= MESSAGE_SEQ_MASK
&& sq->receive_seq != (uint64_t)-1)
// Need an ack before more messages can be sent
return PR_NEVER;
// Check for stalled messages now ready
// Estimate the mcu clock at the time a new message would arrive
double idletime = eventtime > sq->idle_time ? eventtime : sq->idle_time;
idletime += MESSAGE_MIN * sq->baud_adjust;
double timedelta = idletime - sq->last_ack_time;
uint64_t ack_clock = (uint64_t)(timedelta * sq->est_clock) + sq->last_ack_clock;
uint64_t min_stalled_clock = MAX_CLOCK, min_ready_clock = MAX_CLOCK;
struct command_queue *cq;
list_for_each_entry(cq, &sq->pending_queues, node) {
// Move messages from the stalled_queue to the ready_queue
while (!list_empty(&cq->stalled_queue)) {
struct queue_message *qm = list_first_entry(
&cq->stalled_queue, struct queue_message, node);
if (ack_clock < qm->min_clock) {
// Still too early - note the earliest pending min_clock
if (qm->min_clock < min_stalled_clock)
min_stalled_clock = qm->min_clock;
break;
}
list_del(&qm->node);
list_add_tail(&qm->node, &cq->ready_queue);
sq->stalled_bytes -= qm->len;
sq->ready_bytes += qm->len;
}
// Update min_ready_clock
if (!list_empty(&cq->ready_queue)) {
struct queue_message *qm = list_first_entry(
&cq->ready_queue, struct queue_message, node);
if (qm->req_clock < min_ready_clock)
min_ready_clock = qm->req_clock;
}
}
// Check for messages to send
// Enough data is ready to fill a message block
if (sq->ready_bytes >= MESSAGE_PAYLOAD_MAX)
return PR_NOW;
if (! sq->can_delay_writes) {
// Writes may not be delayed - send whatever is ready
if (sq->ready_bytes)
return PR_NOW;
// Nothing left to send; delaying is allowed again once a clock
// estimate is available
if (sq->est_clock)
sq->can_delay_writes = 1;
sq->need_kick_clock = MAX_CLOCK;
return PR_NEVER;
}
// Send now if the earliest req_clock is within MIN_REQTIME_DELTA
uint64_t reqclock_delta = MIN_REQTIME_DELTA * sq->est_clock;
if (min_ready_clock <= ack_clock + reqclock_delta)
return PR_NOW;
// Otherwise wake up when a ready message nears its req_clock or a
// stalled message reaches its min_clock (whichever is first)
uint64_t wantclock = min_ready_clock - reqclock_delta;
if (min_stalled_clock < wantclock)
wantclock = min_stalled_clock;
sq->need_kick_clock = wantclock;
return idletime + (wantclock - ack_clock) / sq->est_clock;
}
// Timer callback that transmits message blocks for as long as
// check_send_command() asks for an immediate send; returns the next
// time this timer should run
static double
command_event(struct serialqueue *sq, double eventtime)
{
    pthread_mutex_lock(&sq->lock);
    double next_wake = check_send_command(sq, eventtime);
    while (next_wake == PR_NOW) {
        build_and_send_command(sq, eventtime);
        next_wake = check_send_command(sq, eventtime);
    }
    pthread_mutex_unlock(&sq->lock);
    return next_wake;
}
// Entry point of the background thread: run the reactor until it
// exits and then wake any thread blocked in serialqueue_pull()
static void *
background_thread(void *data)
{
    struct serialqueue *queue = data;
    pollreactor_run(&queue->pr);

    // The reactor has stopped - release a waiting reader
    pthread_mutex_lock(&queue->lock);
    check_wake_receive(queue);
    pthread_mutex_unlock(&queue->lock);
    return NULL;
}
// Create a new 'struct serialqueue' object
struct serialqueue *
serialqueue_alloc(int serial_fd, int write_only)
{
struct serialqueue *sq = malloc(sizeof(*sq));
memset(sq, 0, sizeof(*sq));
// Reactor setup
sq->serial_fd = serial_fd;
int ret = pipe(sq->pipe_fds);
if (ret)
goto fail;
pollreactor_setup(&sq->pr, SQPF_NUM, SQPT_NUM, sq);
if (!write_only)
pollreactor_add_fd(&sq->pr, SQPF_SERIAL, serial_fd, input_event);
pollreactor_add_fd(&sq->pr, SQPF_PIPE, sq->pipe_fds[0], kick_event);
pollreactor_add_timer(&sq->pr, SQPT_RETRANSMIT, retransmit_event);
pollreactor_add_timer(&sq->pr, SQPT_COMMAND, command_event);
// Retransmit setup
sq->send_seq = 1;
if (write_only) {
sq->receive_seq = -1;
sq->rto = PR_NEVER;
} else {
sq->receive_seq = 1;
sq->rto = MIN_RTO;
}
// Queues
sq->need_kick_clock = MAX_CLOCK;
list_init(&sq->pending_queues);
list_init(&sq->sent_queue);
list_init(&sq->receive_queue);
// Debugging
list_init(&sq->old_sent);
list_init(&sq->old_receive);
debug_queue_alloc(&sq->old_sent, DEBUG_QUEUE_SENT);
debug_queue_alloc(&sq->old_receive, DEBUG_QUEUE_RECEIVE);
// Thread setup
ret = pthread_mutex_init(&sq->lock, NULL);
if (ret)
goto fail;
ret = pthread_cond_init(&sq->cond, NULL);
if (ret)
goto fail;
ret = pthread_create(&sq->tid, NULL, background_thread, sq);
if (ret)
goto fail;
return sq;
fail:
report_errno("init", ret);
return NULL;
}
// Request that the background thread exit and wait for it to finish
void
serialqueue_exit(struct serialqueue *sq)
{
    pollreactor_do_exit(&sq->pr);
    // Wake the background thread through its pipe so that it notices the
    // exit request right away instead of after its poll() timeout
    int ret = write(sq->pipe_fds[1], ".", 1);
    if (ret < 0)
        report_errno("pipe write", ret);
    ret = pthread_join(sq->tid, NULL);
    if (ret)
        report_errno("pthread_join", ret);
}
// Create a new, empty 'struct command_queue' on the heap
struct command_queue *
serialqueue_alloc_commandqueue(void)
{
    struct command_queue *queue = calloc(1, sizeof(*queue));
    list_init(&queue->ready_queue);
    list_init(&queue->stalled_queue);
    return queue;
}
// Wake the background thread out of poll() by writing one byte to the
// internal pipe
static void
kick_bg_thread(struct serialqueue *sq)
{
    int written = write(sq->pipe_fds[1], ".", 1);
    if (written < 0)
        report_errno("pipe write", written);
}
// Queue all messages on the 'msgs' list on the given command_queue.
// The messages start out on the stalled queue; the background thread
// is woken if the first message may need attention before its next
// planned wake-up.  A batch with a total length of zero is ignored.
void
serialqueue_send_batch(struct serialqueue *sq, struct command_queue *cq
                       , struct list_head *msgs)
{
    // Clamp each min_clock to within 2^31 of req_clock and sum the sizes
    int total_bytes = 0;
    struct queue_message *qm;
    list_for_each_entry(qm, msgs, node) {
        if (qm->min_clock + (1LL<<31) < qm->req_clock)
            qm->min_clock = qm->req_clock - (1LL<<31);
        total_bytes += qm->len;
    }
    if (total_bytes == 0)
        return;
    struct queue_message *first = list_first_entry(
        msgs, struct queue_message, node);

    // Move the batch onto cq->stalled_queue
    pthread_mutex_lock(&sq->lock);
    int cq_was_empty = (list_empty(&cq->ready_queue)
                        && list_empty(&cq->stalled_queue));
    if (cq_was_empty)
        list_add_tail(&cq->node, &sq->pending_queues);
    list_join_tail(msgs, &cq->stalled_queue);
    sq->stalled_bytes += total_bytes;
    int mustwake = first->min_clock < sq->need_kick_clock;
    if (mustwake)
        sq->need_kick_clock = 0;
    pthread_mutex_unlock(&sq->lock);

    // Notify the background thread when required
    if (mustwake)
        kick_bg_thread(sq);
}
// Schedule the transmission of a message on the serial port at a
// given time and priority.  The 'len' bytes at 'msg' are copied; the
// message is not sent before the mcu clock reaches 'min_clock' and is
// ordered relative to other messages by 'req_clock'.
void
serialqueue_send(struct serialqueue *sq, struct command_queue *cq
                 , uint8_t *msg, int len, uint64_t min_clock, uint64_t req_clock)
{
    struct queue_message *qm = message_fill(msg, len);
    if (!qm->len) {
        // serialqueue_send_batch() ignores a batch without content and
        // does not take ownership of it - release the message here
        message_free(qm);
        return;
    }
    qm->min_clock = min_clock;
    qm->req_clock = req_clock;

    struct list_head msgs;
    list_init(&msgs);
    list_add_tail(&qm->node, &msgs);
    serialqueue_send_batch(sq, cq, &msgs);
}
// Like serialqueue_send() but also builds the message to be sent from
// the 'len' integers at 'data' (each encoded as a vlq)
void
serialqueue_encode_and_send(struct serialqueue *sq, struct command_queue *cq
                            , uint32_t *data, int len
                            , uint64_t min_clock, uint64_t req_clock)
{
    struct queue_message *qm = message_alloc_and_encode(data, len);
    if (!qm->len) {
        // Nothing to send (the encoding failed or 'data' was empty).
        // serialqueue_send_batch() would ignore the message without
        // taking ownership of it, so release it here.
        message_free(qm);
        return;
    }
    qm->min_clock = min_clock;
    qm->req_clock = req_clock;

    struct list_head msgs;
    list_init(&msgs);
    list_add_tail(&qm->node, &msgs);
    serialqueue_send_batch(sq, cq, &msgs);
}
// Copy the oldest received message into 'pqm', blocking until one is
// available.  If the background thread has exited and no message is
// queued, 'pqm->len' is set to -1 instead.
void
serialqueue_pull(struct serialqueue *sq, struct pull_queue_message *pqm)
{
    pthread_mutex_lock(&sq->lock);
    // Block until the receive queue has content
    while (list_empty(&sq->receive_queue)) {
        if (pollreactor_is_exit(&sq->pr)) {
            // No more messages will arrive
            pqm->len = -1;
            pthread_mutex_unlock(&sq->lock);
            return;
        }
        sq->receive_waiting = 1;
        int ret = pthread_cond_wait(&sq->cond, &sq->lock);
        if (ret)
            report_errno("pthread_cond_wait", ret);
    }

    // Detach the oldest message
    struct queue_message *head = list_first_entry(
        &sq->receive_queue, struct queue_message, node);
    list_del(&head->node);

    // Hand its content and timestamps to the caller
    memcpy(pqm->msg, head->msg, head->len);
    pqm->len = head->len;
    pqm->sent_time = head->sent_time;
    pqm->receive_time = head->receive_time;

    // The message itself moves to the debug queue
    debug_queue_add(&sq->old_receive, head);
    pthread_mutex_unlock(&sq->lock);
}
// Set the per-byte transmit time used to estimate how long the serial
// line is busy
void
serialqueue_set_baud_adjust(struct serialqueue *sq, double baud_adjust)
{
    pthread_mutex_t *guard = &sq->lock;
    pthread_mutex_lock(guard);
    sq->baud_adjust = baud_adjust;
    pthread_mutex_unlock(guard);
}
// Store a new estimate of the clock of the mcu on the other end of
// the serial port: the clock rate together with a matching system
// time / mcu clock pair
void
serialqueue_set_clock_est(struct serialqueue *sq, double est_clock
                          , double last_ack_time, uint64_t last_ack_clock)
{
    pthread_mutex_t *guard = &sq->lock;
    pthread_mutex_lock(guard);
    sq->last_ack_clock = last_ack_clock;
    sq->last_ack_time = last_ack_time;
    sq->est_clock = est_clock;
    pthread_mutex_unlock(guard);
}
// Ask the background thread to transmit all "ready" messages without
// delay: clear can_delay_writes and wake the thread
void
serialqueue_flush_ready(struct serialqueue *sq)
{
    pthread_mutex_t *guard = &sq->lock;
    pthread_mutex_lock(guard);
    sq->can_delay_writes = 0;
    pthread_mutex_unlock(guard);

    kick_bg_thread(sq);
}
// Write a statistics summary for the serial port to 'buf' (at most
// 'len' bytes including the terminator, as with snprintf)
void
serialqueue_get_stats(struct serialqueue *sq, char *buf, int len)
{
    // Snapshot only the reported fields while holding the lock.  (A copy
    // of the whole struct would also read input_buf, which the
    // background thread updates without the lock, and would duplicate
    // the pthread objects.)
    pthread_mutex_lock(&sq->lock);
    unsigned int bytes_write = sq->bytes_write;
    unsigned int bytes_read = sq->bytes_read;
    unsigned int bytes_retransmit = sq->bytes_retransmit;
    unsigned int bytes_invalid = sq->bytes_invalid;
    unsigned int send_seq = (unsigned int)sq->send_seq;
    unsigned int receive_seq = (unsigned int)sq->receive_seq;
    unsigned int retransmit_seq = (unsigned int)sq->retransmit_seq;
    double srtt = sq->srtt, rttvar = sq->rttvar, rto = sq->rto;
    unsigned int ready_bytes = (unsigned int)sq->ready_bytes;
    unsigned int stalled_bytes = (unsigned int)sq->stalled_bytes;
    pthread_mutex_unlock(&sq->lock);

    snprintf(buf, len, "bytes_write=%u bytes_read=%u"
             " bytes_retransmit=%u bytes_invalid=%u"
             " send_seq=%u receive_seq=%u retransmit_seq=%u"
             " srtt=%.3f rttvar=%.3f rto=%.3f"
             " ready_bytes=%u stalled_bytes=%u"
             , bytes_write, bytes_read
             , bytes_retransmit, bytes_invalid
             , send_seq, receive_seq, retransmit_seq
             , srtt, rttvar, rto
             , ready_bytes, stalled_bytes);
}
// Extract old messages stored in the debug queues.  The sent debug
// queue is used if 'sentq' is non-zero, otherwise the receive debug
// queue.  Up to 'max' non-empty messages are copied (oldest first) to
// the array at 'q'; the number copied is returned.  The debug queue
// is reset to empty messages in the process.
int
serialqueue_extract_old(struct serialqueue *sq, int sentq
                        , struct pull_queue_message *q, int max)
{
    int count = sentq ? DEBUG_QUEUE_SENT : DEBUG_QUEUE_RECEIVE;
    struct list_head *rootp = sentq ? &sq->old_sent : &sq->old_receive;
    struct list_head replacement, current;
    list_init(&replacement);
    debug_queue_alloc(&replacement, count);
    list_init(&current);

    // Atomically replace existing debug list with new zero'd list
    pthread_mutex_lock(&sq->lock);
    list_join_tail(rootp, &current);
    list_init(rootp);
    list_join_tail(&replacement, rootp);
    pthread_mutex_unlock(&sq->lock);

    // Walk the debug list.  Every entry is released, including those
    // beyond 'max', so that nothing on the detached list is leaked.
    int pos = 0;
    while (!list_empty(&current)) {
        struct queue_message *qm = list_first_entry(
            &current, struct queue_message, node);
        if (qm->len && pos < max) {
            struct pull_queue_message *pqm = q++;
            pos++;
            memcpy(pqm->msg, qm->msg, qm->len);
            pqm->len = qm->len;
            pqm->sent_time = qm->sent_time;
            pqm->receive_time = qm->receive_time;
        }
        list_del(&qm->node);
        message_free(qm);
    }
    return pos;
}