serialhdl: Add support for communicating over a CAN bus

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-02-07 16:59:18 -05:00
parent 8b4ad34e22
commit a20f4a8759
2 changed files with 47 additions and 1 deletions

View File

@ -4,7 +4,7 @@
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
import logging, threading, os import logging, threading, os
import serial import serial, can
import msgproto, chelper, util import msgproto, chelper, util
@ -102,6 +102,51 @@ class SerialReader:
self.ffi_lib.serialqueue_set_receive_window( self.ffi_lib.serialqueue_set_receive_window(
self.serialqueue, receive_window) self.serialqueue, receive_window)
return True return True
def connect_canbus(self, canbus_uuid, canbus_nodeid, canbus_iface="can0"):
txid = canbus_nodeid * 2 + 256
filters = [{"can_id": txid+1, "can_mask": 0x7ff, "extended": False}]
# Prep for SET_NODEID command
try:
uuid = int(canbus_uuid, 16)
except ValueError:
uuid = -1
if uuid < 0 or uuid > 0xffffffffffff:
raise error("Invalid CAN uuid")
uuid = [(uuid >> (40 - i*8)) & 0xff for i in range(6)]
CANBUS_ID_ADMIN = 0x3f0
CMD_SET_NODEID = 0x01
set_id_cmd = [CMD_SET_NODEID] + uuid + [canbus_nodeid]
set_id_msg = can.Message(arbitration_id=CANBUS_ID_ADMIN,
data=set_id_cmd, is_extended_id=False)
# Start connection attempt
logging.info("Starting CAN connect")
start_time = self.reactor.monotonic()
while 1:
if self.reactor.monotonic() > start_time + 90.:
raise error("Unable to connect")
try:
bus = can.interface.Bus(channel=canbus_iface,
can_filters=filters,
bustype='socketcan')
bus.send(set_id_msg)
except can.CanError as e:
logging.warn("Unable to open CAN port: %s", e)
self.reactor.pause(self.reactor.monotonic() + 5.)
continue
bus.close = bus.shutdown # XXX
ret = self._start_session(bus, 'c', txid)
if not ret:
continue
# Verify correct canbus_nodeid to canbus_uuid mapping
try:
params = self.send_with_response('get_canbus_id', 'canbus_id')
got_uuid = bytearray(params['canbus_uuid'])
if got_uuid == bytearray(uuid):
break
except:
logging.exception("Error in canbus_uuid check")
logging.info("Failed to match canbus_uuid - retrying..")
self.disconnect()
def connect_pipe(self, filename): def connect_pipe(self, filename):
logging.info("Starting connect") logging.info("Starting connect")
start_time = self.reactor.monotonic() start_time = self.reactor.monotonic()

View File

@ -6,3 +6,4 @@ cffi==1.12.2
pyserial==3.4 pyserial==3.4
greenlet==0.4.15 greenlet==0.4.15
Jinja2==2.10.1 Jinja2==2.10.1
python-can==3.3.4