From b0e3effb5324df0ed7371c3c0103139d65f2480e Mon Sep 17 00:00:00 2001 From: Kevin O'Connor Date: Tue, 11 Aug 2020 21:31:09 -0400 Subject: [PATCH] webhooks: Rework get_status() subscriptions Implement a new subscription system for get_status() updates. Subscriptions are per-client. After an initial update, only changes will be transmitted. Responses are only transmitted to the client that issued the subscription. Signed-off-by: Kevin O'Connor --- klippy/webhooks.py | 207 ++++++++++++++++++++------------------------- 1 file changed, 90 insertions(+), 117 deletions(-) diff --git a/klippy/webhooks.py b/klippy/webhooks.py index d10f5c2c..358e9646 100644 --- a/klippy/webhooks.py +++ b/klippy/webhooks.py @@ -357,128 +357,101 @@ class GCodeHelper: SUBSCRIPTION_REFRESH_TIME = .25 -class StatusHandler: +class QueryStatusHelper: def __init__(self, printer): self.printer = printer - self.webhooks = webhooks = printer.lookup_object('webhooks') - self.ready = self.timer_started = False - self.reactor = self.printer.get_reactor() - self.available_objects = {} - self.subscriptions = {} - self.subscription_timer = self.reactor.register_timer( - self._batch_subscription_handler, self.reactor.NEVER) - - # Register events - self.printer.register_event_handler( - "klippy:ready", self._handle_ready) - self.printer.register_event_handler( - "gcode:request_restart", self._handle_restart) - + self.clients = {} + self.pending_queries = [] + self.query_timer = None + self.last_query = {} # Register webhooks - webhooks.register_endpoint("objects/list", self._handle_object_request) - webhooks.register_endpoint("objects/status", - self._handle_status_request) - webhooks.register_endpoint("objects/subscription", - self._handle_subscription_request) - webhooks.register_endpoint("objects/list_subscription", - self._handle_list_subscription_request) - - def _handle_ready(self): - eventtime = self.reactor.monotonic() - self.available_objects = {} - objs = self.printer.lookup_objects() - status_objs = {n: o for n, o in objs if hasattr(o, "get_status")} - for name, obj in status_objs.items(): - attrs = obj.get_status(eventtime) - self.available_objects[name] = attrs.keys() - self.ready = True - - def _handle_restart(self, eventtime): - self.ready = False - self.reactor.update_timer(self.subscription_timer, self.reactor.NEVER) - - def _batch_subscription_handler(self, eventtime): - status = self._process_status_request(self.subscriptions, eventtime) - self.webhooks.call_remote_method( - "process_status_update", status=status) - return eventtime + SUBSCRIPTION_REFRESH_TIME - - def _process_status_request(self, requested_objects, eventtime): - result = {} - if self.ready: - for name, req_items in requested_objects.items(): - obj = self.printer.lookup_object(name, None) - if obj is not None and name in self.available_objects: - status = obj.get_status(eventtime) - if not req_items: - # return all items excluding callables - result[name] = {k: v for k, v in status.items() - if not callable(v)} - else: - # return requested items excluding callables - result[name] = {k: v for k, v in status.items() - if k in req_items and not callable(v)} - else: - result = {"status": "Klippy Not Ready"} - return result - - def _handle_object_request(self, web_request): - web_request.send(dict(self.available_objects)) - - def _handle_status_request(self, web_request): - args = web_request.get_args() - eventtime = self.reactor.monotonic() - result = self._process_status_request(args, eventtime) - web_request.send(result) - - def _handle_subscription_request(self, web_request): - args = web_request.get_args() - if args: - self.add_subscripton(args) - else: - raise web_request.error("Invalid argument") - - def _handle_list_subscription_request(self, web_request): - web_request.send(dict(self.subscriptions)) - - def add_subscripton(self, new_sub): - if not new_sub: - return - for obj_name, req_items in new_sub.items(): - if obj_name not in self.available_objects: - logging.info( - "webhooks: Object {%s} not available for subscription" - % (obj_name)) + webhooks = printer.lookup_object('webhooks') + webhooks.register_endpoint("objects/list", self._handle_list) + webhooks.register_endpoint("objects/query", self._handle_query) + webhooks.register_endpoint("objects/subscribe", self._handle_subscribe) + def _handle_list(self, web_request): + objects = [n for n, o in self.printer.lookup_objects() + if hasattr(o, 'get_status')] + web_request.send({'objects': objects}) + def _do_query(self, eventtime): + last_query = self.last_query + query = self.last_query = {} + msglist = self.pending_queries + self.pending_queries = [] + msglist.extend(self.clients.values()) + # Generate get_status() info for each client + for cconn, subscription, send_func, template in msglist: + is_query = cconn is None + if not is_query and cconn.is_closed(): + del self.clients[cconn] continue - # validate requested items - if req_items: - avail_items = set(self.available_objects[obj_name]) - invalid_items = set(req_items) - avail_items - if invalid_items: - logging.info( - "webhooks: Removed invalid items [%s] from " - "subscription request %s" % - (", ".join(invalid_items), obj_name)) - req_items = list(set(req_items) - invalid_items) - if not req_items: - # No valid items remaining - continue - # Add or update subscription - existing_items = self.subscriptions.get(obj_name, None) - if existing_items is not None: - if req_items == [] or existing_items == []: - # Subscribe to all items - self.subscriptions[obj_name] = [] - else: - req_items = list(set(req_items) | set(existing_items)) - self.subscriptions[obj_name] = req_items - else: - self.subscriptions[obj_name] = req_items - if not self.timer_started: - self.reactor.update_timer(self.subscription_timer, self.reactor.NOW) - self.timer_started = True + # Query each requested printer object + cquery = {} + for obj_name, req_items in subscription.items(): + res = query.get(obj_name, None) + if res is None: + po = self.printer.lookup_object(obj_name, None) + if po is None or not hasattr(po, 'get_status'): + res = query[obj_name] = {} + else: + res = query[obj_name] = po.get_status(eventtime) + if req_items is None: + req_items = list(res.keys()) + if req_items: + subscription[obj_name] = req_items + lres = last_query.get(obj_name, {}) + cres = {} + for ri in req_items: + rd = res.get(ri, None) + if not callable(rd) and (is_query or rd != lres.get(ri)): + cres[ri] = rd + if cres or is_query: + cquery[obj_name] = cres + # Send data + if cquery or is_query: + tmp = dict(template) + tmp['params'] = {'eventtime': eventtime, 'status': cquery} + send_func(tmp) + if not query: + # Unregister timer if there are no longer any subscriptions + reactor = self.printer.get_reactor() + reactor.unregister_timer(self.query_timer) + self.query_timer = None + return reactor.NEVER + return eventtime + SUBSCRIPTION_REFRESH_TIME + def _handle_query(self, web_request, is_subscribe=False): + objects = web_request.get('objects') + # Validate subscription format + if type(objects) != type({}): + raise web_request.error("Invalid argument") + for k, v in objects.items(): + if type(k) != type("") or type(v) not in [type([]), type(None)]: + raise web_request.error("Invalid argument") + if v is not None: + for ri in v: + if type(ri) != type(""): + raise web_request.error("Invalid argument") + # Add to pending queries + cconn = web_request.get_client_connection() + template = web_request.get('response_template', {}) + if is_subscribe and cconn in self.clients: + del self.clients[cconn] + reactor = self.printer.get_reactor() + complete = reactor.completion() + self.pending_queries.append((None, objects, complete.complete, {})) + # Start timer if needed + if self.query_timer is None: + qt = reactor.register_timer(self._do_query, reactor.NOW) + self.query_timer = qt + # Wait for data to be queried + msg = complete.wait() + web_request.send(msg['params']) + if is_subscribe: + self.clients[cconn] = (cconn, objects, cconn.send, template) + def _handle_subscribe(self, web_request): + self._handle_query(web_request, is_subscribe=True) def add_early_printer_objects(printer): printer.add_object('webhooks', WebHooks(printer)) GCodeHelper(printer) - StatusHandler(printer) + QueryStatusHelper(printer)