motion_report: Add helper tool for internal subscribers of data

Add support for internal clients to "subscribe" to data like external
clients.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2021-08-11 23:29:37 -04:00
parent 3f46609c1d
commit 6999ff6256
1 changed files with 22 additions and 0 deletions

View File

@ -50,6 +50,11 @@ class APIDumpHelper:
template = web_request.get_dict('response_template', {}) template = web_request.get_dict('response_template', {})
self.clients[cconn] = template self.clients[cconn] = template
self._start() self._start()
def add_internal_client(self):
cconn = InternalDumpClient()
self.clients[cconn] = {}
self._start()
return cconn
def _update(self, eventtime): def _update(self, eventtime):
try: try:
msg = self.data_cb(eventtime) msg = self.data_cb(eventtime)
@ -69,6 +74,23 @@ class APIDumpHelper:
cconn.send(tmp) cconn.send(tmp)
return eventtime + self.update_interval return eventtime + self.update_interval
# An "internal webhooks" wrapper for using APIDumpHelper internally
class InternalDumpClient:
def __init__(self):
self.msgs = []
self.is_done = False
def get_messages(self):
return self.msgs
def finalize(self):
self.is_done = True
def is_closed(self):
return self.is_done
def send(self, msg):
self.msgs.append(msg)
if len(self.msgs) >= 10000:
# Avoid filling up memory with too many samples
self.finalize()
# Extract stepper queue_step messages # Extract stepper queue_step messages
class DumpStepper: class DumpStepper:
def __init__(self, printer, mcu_stepper): def __init__(self, printer, mcu_stepper):