# Serial port management for firmware communication # # Copyright (C) 2016-2021 Kevin O'Connor # # This file may be distributed under the terms of the GNU GPLv3 license. import logging, threading, os import serial import msgproto, chelper, util class error(Exception): pass class SerialReader: def __init__(self, reactor, warn_prefix=""): self.reactor = reactor self.warn_prefix = warn_prefix # Serial port self.serial_dev = None self.msgparser = msgproto.MessageParser(warn_prefix=warn_prefix) # C interface self.ffi_main, self.ffi_lib = chelper.get_ffi() self.serialqueue = None self.default_cmd_queue = self.alloc_command_queue() self.stats_buf = self.ffi_main.new('char[4096]') # Threading self.lock = threading.Lock() self.background_thread = None # Message handlers self.handlers = {} self.register_response(self._handle_unknown_init, '#unknown') self.register_response(self.handle_output, '#output') # Sent message notification tracking self.last_notify_id = 0 self.pending_notifications = {} def _bg_thread(self): response = self.ffi_main.new('struct pull_queue_message *') while 1: self.ffi_lib.serialqueue_pull(self.serialqueue, response) count = response.len if count < 0: break if response.notify_id: params = {'#sent_time': response.sent_time, '#receive_time': response.receive_time} completion = self.pending_notifications.pop(response.notify_id) self.reactor.async_complete(completion, params) continue params = self.msgparser.parse(response.msg[0:count]) params['#sent_time'] = response.sent_time params['#receive_time'] = response.receive_time hdl = (params['#name'], params.get('oid')) try: with self.lock: hdl = self.handlers.get(hdl, self.handle_default) hdl(params) except: logging.exception("%sException in serial callback", self.warn_prefix) def _error(self, msg, *params): raise error(self.warn_prefix + (msg % params)) def _get_identify_data(self, eventtime): # Query the "data dictionary" from the micro-controller identify_data = b"" while 1: msg = "identify offset=%d count=%d" % (len(identify_data), 40) try: params = self.send_with_response(msg, 'identify_response') except error as e: logging.exception("%sWait for identify_response", self.warn_prefix) return None if params['offset'] == len(identify_data): msgdata = params['data'] if not msgdata: # Done return identify_data identify_data += msgdata def _start_session(self, serial_dev, serial_fd_type=b'u', client_id=0): self.serial_dev = serial_dev self.serialqueue = self.ffi_main.gc( self.ffi_lib.serialqueue_alloc(serial_dev.fileno(), serial_fd_type, client_id), self.ffi_lib.serialqueue_free) self.background_thread = threading.Thread(target=self._bg_thread) self.background_thread.start() # Obtain and load the data dictionary from the firmware completion = self.reactor.register_callback(self._get_identify_data) identify_data = completion.wait(self.reactor.monotonic() + 5.) if identify_data is None: logging.info("%sTimeout on connect", self.warn_prefix) self.disconnect() return False msgparser = msgproto.MessageParser(warn_prefix=self.warn_prefix) msgparser.process_identify(identify_data) self.msgparser = msgparser self.register_response(self.handle_unknown, '#unknown') # Setup baud adjust if serial_fd_type == b'c': wire_freq = msgparser.get_constant_float('CANBUS_FREQUENCY', None) else: wire_freq = msgparser.get_constant_float('SERIAL_BAUD', None) if wire_freq is not None: self.ffi_lib.serialqueue_set_wire_frequency(self.serialqueue, wire_freq) receive_window = msgparser.get_constant_int('RECEIVE_WINDOW', None) if receive_window is not None: self.ffi_lib.serialqueue_set_receive_window( self.serialqueue, receive_window) return True def connect_canbus(self, canbus_uuid, canbus_nodeid, canbus_iface="can0"): import can # XXX txid = canbus_nodeid * 2 + 256 filters = [{"can_id": txid+1, "can_mask": 0x7ff, "extended": False}] # Prep for SET_NODEID command try: uuid = int(canbus_uuid, 16) except ValueError: uuid = -1 if uuid < 0 or uuid > 0xffffffffffff: self._error("Invalid CAN uuid") uuid = [(uuid >> (40 - i*8)) & 0xff for i in range(6)] CANBUS_ID_ADMIN = 0x3f0 CMD_SET_NODEID = 0x01 set_id_cmd = [CMD_SET_NODEID] + uuid + [canbus_nodeid] set_id_msg = can.Message(arbitration_id=CANBUS_ID_ADMIN, data=set_id_cmd, is_extended_id=False) # Start connection attempt logging.info("%sStarting CAN connect", self.warn_prefix) start_time = self.reactor.monotonic() while 1: if self.reactor.monotonic() > start_time + 90.: self._error("Unable to connect") try: bus = can.interface.Bus(channel=canbus_iface, can_filters=filters, bustype='socketcan') bus.send(set_id_msg) except can.CanError as e: logging.warn("%sUnable to open CAN port: %s", self.warn_prefix, e) self.reactor.pause(self.reactor.monotonic() + 5.) continue bus.close = bus.shutdown # XXX ret = self._start_session(bus, b'c', txid) if not ret: continue # Verify correct canbus_nodeid to canbus_uuid mapping try: params = self.send_with_response('get_canbus_id', 'canbus_id') got_uuid = bytearray(params['canbus_uuid']) if got_uuid == bytearray(uuid): break except: logging.exception("%sError in canbus_uuid check", self.warn_prefix) logging.info("%sFailed to match canbus_uuid - retrying..", self.warn_prefix) self.disconnect() def connect_pipe(self, filename): logging.info("%sStarting connect", self.warn_prefix) start_time = self.reactor.monotonic() while 1: if self.reactor.monotonic() > start_time + 90.: self._error("Unable to connect") try: fd = os.open(filename, os.O_RDWR | os.O_NOCTTY) except OSError as e: logging.warn("%sUnable to open port: %s", self.warn_prefix, e) self.reactor.pause(self.reactor.monotonic() + 5.) continue serial_dev = os.fdopen(fd, 'rb+', 0) ret = self._start_session(serial_dev) if ret: break def connect_uart(self, serialport, baud, rts=True): # Initial connection logging.info("%sStarting serial connect", self.warn_prefix) start_time = self.reactor.monotonic() while 1: if self.reactor.monotonic() > start_time + 90.: self._error("Unable to connect") try: serial_dev = serial.Serial(baudrate=baud, timeout=0, exclusive=True) serial_dev.port = serialport serial_dev.rts = rts serial_dev.open() except (OSError, IOError, serial.SerialException) as e: logging.warn("%sUnable to open serial port: %s", self.warn_prefix, e) self.reactor.pause(self.reactor.monotonic() + 5.) continue stk500v2_leave(serial_dev, self.reactor) ret = self._start_session(serial_dev) if ret: break def connect_file(self, debugoutput, dictionary, pace=False): self.serial_dev = debugoutput self.msgparser.process_identify(dictionary, decompress=False) self.serialqueue = self.ffi_main.gc( self.ffi_lib.serialqueue_alloc(self.serial_dev.fileno(), b'f', 0), self.ffi_lib.serialqueue_free) def set_clock_est(self, freq, conv_time, conv_clock, last_clock): self.ffi_lib.serialqueue_set_clock_est( self.serialqueue, freq, conv_time, conv_clock, last_clock) def disconnect(self): if self.serialqueue is not None: self.ffi_lib.serialqueue_exit(self.serialqueue) if self.background_thread is not None: self.background_thread.join() self.background_thread = self.serialqueue = None if self.serial_dev is not None: self.serial_dev.close() self.serial_dev = None for pn in self.pending_notifications.values(): pn.complete(None) self.pending_notifications.clear() def stats(self, eventtime): if self.serialqueue is None: return "" self.ffi_lib.serialqueue_get_stats(self.serialqueue, self.stats_buf, len(self.stats_buf)) return str(self.ffi_main.string(self.stats_buf).decode()) def get_reactor(self): return self.reactor def get_msgparser(self): return self.msgparser def get_default_command_queue(self): return self.default_cmd_queue # Serial response callbacks def register_response(self, callback, name, oid=None): with self.lock: if callback is None: del self.handlers[name, oid] else: self.handlers[name, oid] = callback # Command sending def raw_send(self, cmd, minclock, reqclock, cmd_queue): cmdlen = len(cmd) if cmdlen > msgproto.MESSAGE_PAYLOAD_MAX: self._error("Message is too long (%d bytes)" % (cmdlen,)) self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue, cmd, cmdlen, minclock, reqclock, 0) def raw_send_wait_ack(self, cmd, minclock, reqclock, cmd_queue): cmdlen = len(cmd) if cmdlen > msgproto.MESSAGE_PAYLOAD_MAX: self._error("Message is too long (%d bytes)" % (cmdlen,)) self.last_notify_id += 1 nid = self.last_notify_id completion = self.reactor.completion() self.pending_notifications[nid] = completion self.ffi_lib.serialqueue_send(self.serialqueue, cmd_queue, cmd, cmdlen, minclock, reqclock, nid) params = completion.wait() if params is None: self._error("Serial connection closed") return params def send(self, msg, minclock=0, reqclock=0): cmd = self.msgparser.create_command(msg) self.raw_send(cmd, minclock, reqclock, self.default_cmd_queue) def send_with_response(self, msg, response): cmd = self.msgparser.create_command(msg) src = SerialRetryCommand(self, response) return src.get_response([cmd], self.default_cmd_queue) def alloc_command_queue(self): return self.ffi_main.gc(self.ffi_lib.serialqueue_alloc_commandqueue(), self.ffi_lib.serialqueue_free_commandqueue) # Dumping debug lists def dump_debug(self): out = [] out.append("Dumping serial stats: %s" % ( self.stats(self.reactor.monotonic()),)) sdata = self.ffi_main.new('struct pull_queue_message[1024]') rdata = self.ffi_main.new('struct pull_queue_message[1024]') scount = self.ffi_lib.serialqueue_extract_old(self.serialqueue, 1, sdata, len(sdata)) rcount = self.ffi_lib.serialqueue_extract_old(self.serialqueue, 0, rdata, len(rdata)) out.append("Dumping send queue %d messages" % (scount,)) for i in range(scount): msg = sdata[i] cmds = self.msgparser.dump(msg.msg[0:msg.len]) out.append("Sent %d %f %f %d: %s" % ( i, msg.receive_time, msg.sent_time, msg.len, ', '.join(cmds))) out.append("Dumping receive queue %d messages" % (rcount,)) for i in range(rcount): msg = rdata[i] cmds = self.msgparser.dump(msg.msg[0:msg.len]) out.append("Receive: %d %f %f %d: %s" % ( i, msg.receive_time, msg.sent_time, msg.len, ', '.join(cmds))) return '\n'.join(out) # Default message handlers def _handle_unknown_init(self, params): logging.debug("%sUnknown message %d (len %d) while identifying", self.warn_prefix, params['#msgid'], len(params['#msg'])) def handle_unknown(self, params): logging.warn("%sUnknown message type %d: %s", self.warn_prefix, params['#msgid'], repr(params['#msg'])) def handle_output(self, params): logging.info("%s%s: %s", self.warn_prefix, params['#name'], params['#msg']) def handle_default(self, params): logging.warn("%sgot %s", self.warn_prefix, params) # Class to send a query command and return the received response class SerialRetryCommand: def __init__(self, serial, name, oid=None): self.serial = serial self.name = name self.oid = oid self.last_params = None self.serial.register_response(self.handle_callback, name, oid) def handle_callback(self, params): self.last_params = params def get_response(self, cmds, cmd_queue, minclock=0, reqclock=0): retries = 5 retry_delay = .010 while 1: for cmd in cmds[:-1]: self.serial.raw_send(cmd, minclock, reqclock, cmd_queue) self.serial.raw_send_wait_ack(cmds[-1], minclock, reqclock, cmd_queue) params = self.last_params if params is not None: self.serial.register_response(None, self.name, self.oid) return params if retries <= 0: self.serial.register_response(None, self.name, self.oid) raise error("Unable to obtain '%s' response" % (self.name,)) reactor = self.serial.reactor reactor.pause(reactor.monotonic() + retry_delay) retries -= 1 retry_delay *= 2. # Attempt to place an AVR stk500v2 style programmer into normal mode def stk500v2_leave(ser, reactor): logging.debug("Starting stk500v2 leave programmer sequence") util.clear_hupcl(ser.fileno()) origbaud = ser.baudrate # Request a dummy speed first as this seems to help reset the port ser.baudrate = 2400 ser.read(1) # Send stk500v2 leave programmer sequence ser.baudrate = 115200 reactor.pause(reactor.monotonic() + 0.100) ser.read(4096) ser.write(b'\x1b\x01\x00\x01\x0e\x11\x04') reactor.pause(reactor.monotonic() + 0.050) res = ser.read(4096) logging.debug("Got %s from stk500v2", repr(res)) ser.baudrate = origbaud def cheetah_reset(serialport, reactor): # Fysetc Cheetah v1.2 boards have a weird stateful circuitry for # configuring the bootloader. This sequence takes care of disabling it for # sure. # Open the serial port with RTS asserted ser = serial.Serial(baudrate=2400, timeout=0, exclusive=True) ser.port = serialport ser.rts = True ser.open() ser.read(1) reactor.pause(reactor.monotonic() + 0.100) # Toggle DTR ser.dtr = True reactor.pause(reactor.monotonic() + 0.100) ser.dtr = False # Deassert RTS reactor.pause(reactor.monotonic() + 0.100) ser.rts = False reactor.pause(reactor.monotonic() + 0.100) # Toggle DTR again ser.dtr = True reactor.pause(reactor.monotonic() + 0.100) ser.dtr = False reactor.pause(reactor.monotonic() + 0.100) ser.close() # Attempt an arduino style reset on a serial port def arduino_reset(serialport, reactor): # First try opening the port at a different baud ser = serial.Serial(serialport, 2400, timeout=0, exclusive=True) ser.read(1) reactor.pause(reactor.monotonic() + 0.100) # Then toggle DTR ser.dtr = True reactor.pause(reactor.monotonic() + 0.100) ser.dtr = False reactor.pause(reactor.monotonic() + 0.100) ser.close()