serialhdl: Rework mcu clock synchronization

The existing clock synchronization code has two flaws: intermittent
transmission latency during get_status requests can cause the
estimated clock to be too low, and the estimated clock calculation did
not take into account possible clock drift between samples.  The
former could potentially lead to "Timer too close" errors and the
latter could potentially lead to "Move queue empty" errors.

Rework the code to avoid the above problems.  It's not necessary to
estimate the micro-controller clock as an excellent estimate is
reported by the micro-controller (via the CLOCK_FREQ constant).
Account for a small drift from the reported value, and check on each
sample if the drift exceeds the expected limits.  With a good starting
estimated clock, only the offset needs to be calculated.  Use previous
offsets (and the estimated clock) in calculation of new offsets to
avoid intermittent latency from badly skewing the results.  Finally,
add an additional time offset of one millisecond to account for any
minor inaccuracies.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2017-06-27 20:01:11 -04:00
parent c957e4ba86
commit c7d0358c41
1 changed files with 68 additions and 52 deletions

View File

@ -8,6 +8,8 @@ import serial
import msgproto, chelper, util
MAX_CLOCK_DRIFT = 0.000100
class error(Exception):
pass
@ -26,9 +28,9 @@ class SerialReader:
self.default_cmd_queue = self.alloc_command_queue()
self.stats_buf = self.ffi_main.new('char[4096]')
# MCU time/clock tracking
self.last_ack_time = self.last_ack_rtt_time = 0.
self.last_ack_clock = self.last_ack_rtt_clock = 0
self.est_clock = 0.
self.last_clock = 0
self.last_clock_time = self.last_clock_time_min = 0.
self.min_freq = self.max_freq = 0.
# Threading
self.lock = threading.Lock()
self.background_thread = None
@ -101,34 +103,31 @@ class SerialReader:
baud_adjust = self.BITS_PER_BYTE / mcu_baud
self.ffi_lib.serialqueue_set_baud_adjust(
self.serialqueue, baud_adjust)
# Enable periodic get_status timer
get_status = msgparser.lookup_command('get_status')
self.status_cmd = get_status.encode()
self.reactor.update_timer(self.status_timer, self.reactor.NOW)
# Load initial last_ack_clock/last_ack_time
# Load initial last_clock/last_clock_time
uptime_msg = msgparser.create_command('get_uptime')
params = self.send_with_response(uptime_msg, 'uptime')
self.last_ack_clock = (params['high'] << 32) | params['clock']
self.last_ack_time = params['#receive_time']
# Make sure est_clock is calculated
starttime = eventtime = self.reactor.monotonic()
while not self.est_clock:
if eventtime > starttime + 5.:
raise error("timeout on est_clock calculation")
eventtime = self.reactor.pause(eventtime + 0.010)
self.last_clock = (params['high'] << 32) | params['clock']
self.last_clock_time = params['#receive_time']
self.last_clock_time_min = params['#sent_time']
clock_freq = msgparser.get_constant_float('CLOCK_FREQ')
self.min_freq = clock_freq * (1. - MAX_CLOCK_DRIFT)
self.max_freq = clock_freq * (1. + MAX_CLOCK_DRIFT)
# Enable periodic get_status timer
self.status_cmd = msgparser.create_command('get_status')
self.reactor.update_timer(self.status_timer, self.reactor.NOW)
def connect_file(self, debugoutput, dictionary, pace=False):
self.ser = debugoutput
self.msgparser.process_identify(dictionary, decompress=False)
est_clock = 1000000000000.
est_freq = 1000000000000.
if pace:
est_clock = float(self.msgparser.config['CLOCK_FREQ'])
est_freq = float(self.msgparser.config['CLOCK_FREQ'])
self.serialqueue = self.ffi_lib.serialqueue_alloc(self.ser.fileno(), 1)
self.est_clock = est_clock
self.last_ack_time = self.reactor.monotonic()
self.last_ack_clock = 0
self.min_freq = self.max_freq = est_freq
self.last_clock = 0
self.last_clock_time = self.reactor.monotonic()
self.ffi_lib.serialqueue_set_clock_est(
self.serialqueue, self.est_clock, self.last_ack_time
, self.last_ack_clock)
self.serialqueue, self.min_freq, self.last_clock_time
, self.last_clock)
def disconnect(self):
if self.serialqueue is not None:
self.ffi_lib.serialqueue_exit(self.serialqueue)
@ -145,8 +144,8 @@ class SerialReader:
sqstats = self.ffi_lib.serialqueue_get_stats(
self.serialqueue, self.stats_buf, len(self.stats_buf))
sqstats = self.ffi_main.string(self.stats_buf)
tstats = " est_clock=%.3f last_ack_time=%.3f last_ack_clock=%d" % (
self.est_clock, self.last_ack_time, self.last_ack_clock)
tstats = " last_clock=%d last_clock_time=%.3f" % (
self.last_clock, self.last_clock_time)
return sqstats + tstats
def _status_event(self, eventtime):
self.send(self.status_cmd)
@ -161,18 +160,18 @@ class SerialReader:
# Clock tracking
def get_clock(self, eventtime):
with self.lock:
return int(self.last_ack_clock
+ (eventtime - self.last_ack_time) * self.est_clock)
return int(self.last_clock
+ (eventtime - self.last_clock_time) * self.min_freq)
def translate_clock(self, raw_clock):
with self.lock:
last_ack_clock = self.last_ack_clock
clock_diff = (last_ack_clock - raw_clock) & 0xffffffff
last_clock = self.last_clock
clock_diff = (last_clock - raw_clock) & 0xffffffff
if clock_diff & 0x80000000:
return last_ack_clock + 0x100000000 - clock_diff
return last_ack_clock - clock_diff
return last_clock + 0x100000000 - clock_diff
return last_clock - clock_diff
def get_last_clock(self):
with self.lock:
return self.last_ack_clock, self.last_ack_time
return self.last_clock, self.last_clock_time
# Command sending
def send(self, cmd, minclock=0, reqclock=0, cq=None):
if cq is None:
@ -210,28 +209,45 @@ class SerialReader:
i, msg.receive_time, msg.sent_time, msg.len, ', '.join(cmds)))
# Default message handlers
def handle_status(self, params):
sent_time = params['#sent_time']
if not sent_time:
return
receive_time = params['#receive_time']
clock = params['clock']
with self.lock:
# Update last_ack_time / last_ack_clock
ack_clock = (self.last_ack_clock & ~0xffffffff) | params['clock']
if ack_clock < self.last_ack_clock:
ack_clock += 0x100000000
sent_time = params['#sent_time']
self.last_ack_time = receive_time = params['#receive_time']
self.last_ack_clock = ack_clock
# Update est_clock (if applicable)
if receive_time > self.last_ack_rtt_time + 1. and sent_time:
if self.last_ack_rtt_time:
timedelta = receive_time - self.last_ack_rtt_time
clockdelta = ack_clock - self.last_ack_rtt_clock
estclock = clockdelta / timedelta
if estclock > self.est_clock and self.est_clock:
self.est_clock = (self.est_clock * 63. + estclock) / 64.
else:
self.est_clock = estclock
self.last_ack_rtt_time = sent_time
self.last_ack_rtt_clock = ack_clock
# Extend clock to 64bit
clock = (self.last_clock & ~0xffffffff) | clock
if clock < self.last_clock:
clock += 0x100000000
# Calculate expected send time from clock and previous estimates
clock_delta = clock - self.last_clock
min_send_time = (self.last_clock_time_min
+ clock_delta / self.max_freq)
max_send_time = self.last_clock_time + clock_delta / self.min_freq
# Calculate intersection of times
min_time = max(min_send_time, sent_time)
max_time = min(max_send_time, receive_time)
if min_time > max_time:
# No intersection - clock drift must be greater than expected
new_min_freq, new_max_freq = self.min_freq, self.max_freq
if min_send_time > receive_time:
new_max_freq = (
clock_delta / (receive_time - self.last_clock_time_min))
else:
new_min_freq = (
clock_delta / (sent_time - self.last_clock_time))
logging.warning(
"High clock drift! Now %.0f:%.0f was %.0f:%.0f" % (
new_min_freq, new_max_freq,
self.min_freq, self.max_freq))
self.min_freq, self.max_freq = new_min_freq, new_max_freq
min_time, max_time = sent_time, receive_time
# Update variables
self.last_clock = clock
self.last_clock_time = max_time
self.last_clock_time_min = min_time
self.ffi_lib.serialqueue_set_clock_est(
self.serialqueue, self.est_clock, receive_time, ack_clock)
self.serialqueue, self.min_freq, max_time + 0.001, clock)
def handle_unknown(self, params):
logging.warn("Unknown message type %d: %s" % (
params['#msgid'], repr(params['#msg'])))