motan: Add support for analyzing "angle" sensor data

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-07-25 11:42:10 -04:00
parent aec742ece4
commit 76558168d9
2 changed files with 65 additions and 0 deletions

View File

@ -157,6 +157,10 @@ class DataLogger:
aname = cfgname.split()[-1]
self.send_subscribe("adxl345:" + aname, "adxl345/dump_adxl345",
{"sensor": aname})
if cfgname.startswith("angle "):
aname = cfgname.split()[1]
self.send_subscribe("angle:" + aname, "angle/dump_angle",
{"sensor": aname})
def handle_dump(self, msg, raw_msg):
msg_id = msg["id"]
if "result" not in msg:

View File

@ -374,6 +374,67 @@ class HandleADXL345:
self.data_pos += 1
LogHandlers["adxl345"] = HandleADXL345
# Extract positions from magnetic angle sensor
class HandleAngle:
SubscriptionIdParts = 2
ParametersMin = ParametersMax = 1
DataSets = [
('angle(<name>)', 'Angle sensor position'),
]
def __init__(self, lmanager, name, name_parts):
self.name = name
self.angle_name = name_parts[1]
self.jdispatch = lmanager.get_jdispatch()
self.next_angle_time = self.last_angle_time = 0.
self.next_angle = self.last_angle = 0.
self.cur_data = []
self.data_pos = 0
self.position_offset = 0.
self.angle_dist = 1.
# Determine angle distance from associated stepper's rotation_distance
config = lmanager.get_initial_status()['configfile']['settings']
aname = 'angle %s' % (self.angle_name,)
stepper_name = config.get(aname, {}).get('stepper')
if stepper_name is not None:
sconfig = config.get(stepper_name, {})
rotation_distance = sconfig.get('rotation_distance', 1.)
gear_ratio = sconfig.get('gear_ratio', ())
if type(gear_ratio) == str: # XXX
gear_ratio = [[float(v.strip()) for v in gr.split(':')]
for gr in gear_ratio.split(',')]
for n, d in gear_ratio:
rotation_distance *= d / n
self.angle_dist = rotation_distance / 65536.
def get_label(self):
label = '%s position' % (self.angle_name,)
return {'label': label, 'units': 'Position\n(mm)'}
def pull_data(self, req_time):
while 1:
if req_time <= self.next_angle_time:
pdiff = self.next_angle - self.last_angle
tdiff = self.next_angle_time - self.last_angle_time
rtdiff = req_time - self.last_angle_time
po = rtdiff * pdiff / tdiff
return ((self.last_angle + po) * self.angle_dist
+ self.position_offset)
if self.data_pos >= len(self.cur_data):
# Read next data block
jmsg = self.jdispatch.pull_msg(req_time, self.name)
if jmsg is None:
return (self.next_angle * self.angle_dist
+ self.position_offset)
self.cur_data = jmsg['data']
position_offset = jmsg.get('position_offset')
if position_offset is not None:
self.position_offset = position_offset
self.data_pos = 0
continue
self.last_angle = self.next_angle
self.last_angle_time = self.next_angle_time
self.next_angle_time, self.next_angle = self.cur_data[self.data_pos]
self.data_pos += 1
LogHandlers["angle"] = HandleAngle
######################################################################
# Log reading