From e1f7748e1a9dfda3e78f8a27e9972a9eb954e946 Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Thu, 11 Feb 2021 17:49:47 -0500 Subject: [PATCH] trdispatch: Support fast responses to trsync_state messages Add C code that can process trsync_state messages and dispatch responses with low-latency. Signed-off-by: Kevin O'Connor --- klippy/chelper/__init__.py | 22 +++- klippy/chelper/trdispatch.c | 226 ++++++++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+), 4 deletions(-) create mode 100644 klippy/chelper/trdispatch.c diff --git a/klippy/chelper/__init__.py b/klippy/chelper/__init__.py index 4a8728fb..7b20963d 100644 --- a/klippy/chelper/__init__.py +++ b/klippy/chelper/__init__.py @@ -18,7 +18,7 @@ COMPILE_ARGS = ("-Wall -g -O2 -shared -fPIC" SSE_FLAGS = "-mfpmath=sse -msse2" SOURCE_FILES = [ 'pyhelper.c', 'serialqueue.c', 'stepcompress.c', 'itersolve.c', 'trapq.c', - 'pollreactor.c', 'msgblock.c', + 'pollreactor.c', 'msgblock.c', 'trdispatch.c', 'kin_cartesian.c', 'kin_corexy.c', 'kin_corexz.c', 'kin_delta.c', 'kin_polar.c', 'kin_rotary_delta.c', 'kin_winch.c', 'kin_extruder.c', 'kin_shaper.c', @@ -168,6 +168,19 @@ defs_serialqueue = """ , struct pull_queue_message *q, int max); """ +defs_trdispatch = """ + void trdispatch_start(struct trdispatch *td, uint32_t dispatch_reason); + void trdispatch_stop(struct trdispatch *td); + struct trdispatch *trdispatch_alloc(void); + struct trdispatch_mcu *trdispatch_mcu_alloc(struct trdispatch *td + , struct serialqueue *sq, struct command_queue *cq, uint32_t trsync_oid + , uint32_t set_timeout_msgtag, uint32_t trigger_msgtag + , uint32_t state_msgtag); + void trdispatch_mcu_setup(struct trdispatch_mcu *tdm + , uint64_t last_status_clock, uint64_t expire_clock + , uint64_t expire_ticks, uint64_t min_extend_ticks); +""" + defs_pyhelper = """ void set_python_logging_callback(void (*func)(const char *)); double get_monotonic(void); @@ -179,9 +192,10 @@ defs_std = """ defs_all = [ defs_pyhelper, defs_serialqueue, defs_std, defs_stepcompress, - defs_itersolve, defs_trapq, defs_kin_cartesian, defs_kin_corexy, - defs_kin_corexz, defs_kin_delta, defs_kin_polar, defs_kin_rotary_delta, - defs_kin_winch, defs_kin_extruder, defs_kin_shaper, + defs_itersolve, defs_trapq, defs_trdispatch, + defs_kin_cartesian, defs_kin_corexy, defs_kin_corexz, defs_kin_delta, + defs_kin_polar, defs_kin_rotary_delta, defs_kin_winch, defs_kin_extruder, + defs_kin_shaper, ] # Update filenames to an absolute path diff --git a/klippy/chelper/trdispatch.c b/klippy/chelper/trdispatch.c new file mode 100644 index 00000000..3cd1b642 --- /dev/null +++ b/klippy/chelper/trdispatch.c @@ -0,0 +1,226 @@ +// Trigger sync "trsync" message dispatch +// +// Copyright (C) 2021 Kevin O'Connor +// +// This file may be distributed under the terms of the GNU GPLv3 license. + +#include // pthread_mutex_lock +#include // offsetof +#include // malloc +#include // memset +#include "compiler.h" // ARRAY_SIZE +#include "list.h" // list_add_tail +#include "pollreactor.h" // PR_NEVER +#include "pyhelper.h" // report_errno +#include "serialqueue.h" // serialqueue_add_fastreader + +struct trdispatch { + struct list_head tdm_list; + + pthread_mutex_t lock; // protects variables below + uint32_t is_active, can_trigger, dispatch_reason; +}; + +struct trdispatch_mcu { + struct fastreader fr; + struct trdispatch *td; + struct list_node node; + struct serialqueue *sq; + struct command_queue *cq; + uint32_t trsync_oid, set_timeout_msgtag, trigger_msgtag; + + // Remaining fields protected by trdispatch lock + uint64_t last_status_clock, expire_clock; + uint64_t expire_ticks, min_extend_ticks; + struct clock_estimate ce; +}; + +// Send: trsync_trigger oid=%c reason=%c +static void +send_trsync_trigger(struct trdispatch_mcu *tdm) +{ + uint32_t msg[3] = { + tdm->trigger_msgtag, tdm->trsync_oid, tdm->td->dispatch_reason + }; + struct queue_message *qm = message_alloc_and_encode(msg, ARRAY_SIZE(msg)); + serialqueue_send_one(tdm->sq, tdm->cq, qm); +} + +// Send: trsync_set_timeout oid=%c clock=%u +static void +send_trsync_set_timeout(struct trdispatch_mcu *tdm) +{ + uint32_t msg[3] = { + tdm->set_timeout_msgtag, tdm->trsync_oid, tdm->expire_clock + }; + struct queue_message *qm = message_alloc_and_encode(msg, ARRAY_SIZE(msg)); + qm->req_clock = tdm->expire_clock; + serialqueue_send_one(tdm->sq, tdm->cq, qm); +} + +// Handle a trsync_state message (callback from serialqueue fastreader) +static void +handle_trsync_state(struct fastreader *fr, uint8_t *data, int len) +{ + struct trdispatch_mcu *tdm = container_of(fr, struct trdispatch_mcu, fr); + + // Parse: trsync_state oid=%c can_trigger=%c trigger_reason=%c clock=%u + uint32_t fields[5]; + int ret = msgblock_decode(fields, ARRAY_SIZE(fields), data, len); + if (ret || fields[1] != tdm->trsync_oid) + return; + uint32_t can_trigger=fields[2], clock=fields[4]; + + // Process message + struct trdispatch *td = tdm->td; + pthread_mutex_lock(&td->lock); + if (!td->can_trigger) + goto done; + + if (!can_trigger) { + // mcu reports trigger or timeout - propagate to all mcus + td->can_trigger = 0; + struct trdispatch_mcu *m; + list_for_each_entry(m, &td->tdm_list, node) { + send_trsync_trigger(m); + } + goto done; + } + + // mcu is still working okay - update last_status_clock + serialqueue_get_clock_est(tdm->sq, &tdm->ce); + tdm->last_status_clock = clock_from_clock32(&tdm->ce, clock); + + // Determine minimum acknowledged time among all mcus + double min_time = PR_NEVER, next_min_time = PR_NEVER; + struct trdispatch_mcu *m, *min_tdm = NULL; + list_for_each_entry(m, &td->tdm_list, node) { + double status_time = clock_to_time(&m->ce, m->last_status_clock); + if (status_time < next_min_time) { + next_min_time = status_time; + if (status_time < min_time) { + next_min_time = min_time; + min_time = status_time; + min_tdm = m; + } + } + } + if (next_min_time == PR_NEVER) + next_min_time = min_time; + + // Send trsync_set_timeout messages to other mcus (if needed) + list_for_each_entry(m, &td->tdm_list, node) { + double status_time = m == min_tdm ? next_min_time : min_time; + uint64_t expire=clock_from_time(&m->ce, status_time) + m->expire_ticks; + if ((int64_t)(expire - m->expire_clock) >= m->min_extend_ticks) { + m->expire_clock = expire; + send_trsync_set_timeout(m); + } + } + +done: + pthread_mutex_unlock(&td->lock); +} + +// Begin synchronization +void __visible +trdispatch_start(struct trdispatch *td, uint32_t dispatch_reason) +{ + pthread_mutex_lock(&td->lock); + if (td->is_active || list_empty(&td->tdm_list)) { + pthread_mutex_unlock(&td->lock); + return; + } + td->dispatch_reason = dispatch_reason; + td->is_active = td->can_trigger = 1; + pthread_mutex_unlock(&td->lock); + + // Register handle_trsync_state message parser for each mcu + struct trdispatch_mcu *tdm; + list_for_each_entry(tdm, &td->tdm_list, node) { + serialqueue_add_fastreader(tdm->sq, &tdm->fr); + } +} + +// Cleanup after a test completes +void __visible +trdispatch_stop(struct trdispatch *td) +{ + pthread_mutex_lock(&td->lock); + if (!td->is_active) { + pthread_mutex_unlock(&td->lock); + return; + } + td->is_active = 0; + pthread_mutex_unlock(&td->lock); + + // Unregister handle_trsync_state message parsers + struct trdispatch_mcu *tdm; + list_for_each_entry(tdm, &td->tdm_list, node) { + serialqueue_rm_fastreader(tdm->sq, &tdm->fr); + } +} + +// Create a new 'struct trdispatch' object +struct trdispatch * __visible +trdispatch_alloc(void) +{ + struct trdispatch *td = malloc(sizeof(*td)); + memset(td, 0, sizeof(*td)); + + list_init(&td->tdm_list); + + int ret = pthread_mutex_init(&td->lock, NULL); + if (ret) { + report_errno("trdispatch_alloc pthread_mutex_init", ret); + return NULL; + } + return td; +} + +// Create a new 'struct trdispatch_mcu' object +struct trdispatch_mcu * __visible +trdispatch_mcu_alloc(struct trdispatch *td, struct serialqueue *sq + , struct command_queue *cq, uint32_t trsync_oid + , uint32_t set_timeout_msgtag, uint32_t trigger_msgtag + , uint32_t state_msgtag) +{ + struct trdispatch_mcu *tdm = malloc(sizeof(*tdm)); + memset(tdm, 0, sizeof(*tdm)); + + tdm->sq = sq; + tdm->cq = cq; + tdm->trsync_oid = trsync_oid; + tdm->set_timeout_msgtag = set_timeout_msgtag; + tdm->trigger_msgtag = trigger_msgtag; + + // Setup fastreader to match trsync_state messages + uint32_t state_prefix[] = {state_msgtag, trsync_oid}; + struct queue_message *dummy = message_alloc_and_encode( + state_prefix, ARRAY_SIZE(state_prefix)); + memcpy(tdm->fr.prefix, dummy->msg, dummy->len); + tdm->fr.prefix_len = dummy->len; + free(dummy); + tdm->fr.func = handle_trsync_state; + + tdm->td = td; + list_add_tail(&tdm->node, &td->tdm_list); + + return tdm; +} + +// Setup for a trigger test +void __visible +trdispatch_mcu_setup(struct trdispatch_mcu *tdm + , uint64_t last_status_clock, uint64_t expire_clock + , uint64_t expire_ticks, uint64_t min_extend_ticks) +{ + struct trdispatch *td = tdm->td; + pthread_mutex_lock(&td->lock); + tdm->last_status_clock = last_status_clock; + tdm->expire_clock = expire_clock; + tdm->expire_ticks = expire_ticks; + tdm->min_extend_ticks = min_extend_ticks; + serialqueue_get_clock_est(tdm->sq, &tdm->ce); + pthread_mutex_unlock(&td->lock); +}