bus: Add UART support

This commit is contained in:
Desuuuu 2021-09-10 01:08:25 +02:00
parent 2735aecb92
commit 49753e37b8
No known key found for this signature in database
GPG Key ID: 85943F4B2C2CE0DC
1 changed files with 91 additions and 1 deletions

View File

@ -3,7 +3,7 @@
# Copyright (C) 2018,2019 Kevin O'Connor <kevin@koconnor.net> # Copyright (C) 2018,2019 Kevin O'Connor <kevin@koconnor.net>
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
import mcu import mcu, msgproto
def resolve_bus_name(mcu, param, bus): def resolve_bus_name(mcu, param, bus):
# Find enumerations for the given bus # Find enumerations for the given bus
@ -217,6 +217,96 @@ def MCU_I2C_from_config(config, default_addr=None, default_speed=100000):
return MCU_I2C(i2c_mcu, bus, addr, speed) return MCU_I2C(i2c_mcu, bus, addr, speed)
######################################################################
# UART
######################################################################
# Helper code for working with devices connected to an MCU via a UART bus
class MCU_UART:
def __init__(self, mcu, bus, receive_cb, baud, rx_buffer, tx_buffer,
rx_interval):
self.mcu = mcu
self.bus = bus
self.receive_cb = receive_cb
self.oid = mcu.create_oid()
self.rx_buffer = rx_buffer
self.tx_buffer = tx_buffer
self.rx_interval = rx_interval
mcu.add_config_cmd(
"config_uart oid=%d rx_buffer=%d tx_buffer=%d rx_interval=%d"
% (self.oid, rx_buffer, tx_buffer, rx_interval))
self.config_fmt = (
"uart_set_bus oid=%d uart_bus=%%s baud=%d"
% (self.oid, baud))
self.cmd_queue = mcu.alloc_command_queue()
mcu.register_config_callback(self.build_config)
self.uart_send_cmd = None
def setup_shutdown_msg(self, data):
data_len = len(data)
if data_len > self.tx_buffer:
raise ValueError("Data size exceeds TX buffer size")
chunk_size = msgproto.MESSAGE_PAYLOAD_MAX - 4
chunks = [data[i:i+chunk_size] for i in range(0, data_len, chunk_size)]
for chunk in chunks:
data_msg = "".join(["%02x" % (x,) for x in chunk])
self.mcu.add_config_cmd(
"config_uart_shutdown oid=%d uart_oid=%d shutdown_msg=%s"
% (self.mcu.create_oid(), self.oid, data_msg))
def get_oid(self):
return self.oid
def get_mcu(self):
return self.mcu
def get_command_queue(self):
return self.cmd_queue
def build_config(self):
bus = resolve_bus_name(self.mcu, "uart_bus", self.bus)
self.mcu.add_config_cmd(self.config_fmt % (bus,))
self.uart_send_cmd = self.mcu.lookup_command(
"uart_send oid=%c data=%*s", cq=self.cmd_queue)
if self.receive_cb is not None:
self.mcu.register_response(self._handle_uart_receive,
"uart_receive",
self.oid)
def uart_send(self, data, minclock=0, reqclock=0):
data_len = len(data)
if data_len > self.tx_buffer:
raise ValueError("Data size exceeds TX buffer size")
chunk_size = msgproto.MESSAGE_PAYLOAD_MAX - 3
chunks = [data[i:i+chunk_size] for i in range(0, data_len, chunk_size)]
for chunk in chunks:
if self.uart_send_cmd is None:
# Send setup message via mcu initialization
data_msg = "".join(["%02x" % (x,) for x in chunk])
self.mcu.add_config_cmd(
"uart_send oid=%d data=%s"
% (self.oid, data_msg),
is_init=True)
return
self.uart_send_cmd.send([self.oid, chunk],
minclock=minclock, reqclock=reqclock)
def _handle_uart_receive(self, params):
data = bytearray(params['data'])
self.receive_cb(data)
def MCU_UART_from_config(config, receive_cb, default_baud=115200,
default_rx_buffer=64, default_tx_buffer=128,
default_rx_interval=50):
# Load bus parameters
printer = config.get_printer()
uart_mcu = mcu.get_printer_mcu(printer, config.get('uart_mcu', 'mcu'))
bus = config.get('uart_bus', None)
baud = config.getint('uart_baud', default_baud, minval=9600, maxval=460800)
rx_buffer = config.getint('uart_rx_buffer', default_rx_buffer,
minval=1, maxval=255)
tx_buffer = config.getint('uart_tx_buffer', default_tx_buffer,
minval=1, maxval=255)
rx_interval = config.getint('uart_rx_interval', default_rx_interval,
minval=0, maxval=1000)
# Create MCU_UART object
return MCU_UART(uart_mcu, bus, receive_cb, baud, rx_buffer, tx_buffer,
rx_interval)
###################################################################### ######################################################################
# Bus synchronized digital outputs # Bus synchronized digital outputs
###################################################################### ######################################################################