webhooks: Rework get_status() subscriptions

Implement a new subscription system for get_status() updates.
Subscriptions are per-client.  After an initial update, only changes
will be transmitted.  Responses are only transmitted to the client
that issued the subscription.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2020-08-11 21:31:09 -04:00
parent 16a53e6918
commit b0e3effb53
1 changed files with 90 additions and 117 deletions

View File

@ -357,128 +357,101 @@ class GCodeHelper:
SUBSCRIPTION_REFRESH_TIME = .25
class StatusHandler:
class QueryStatusHelper:
def __init__(self, printer):
self.printer = printer
self.webhooks = webhooks = printer.lookup_object('webhooks')
self.ready = self.timer_started = False
self.reactor = self.printer.get_reactor()
self.available_objects = {}
self.subscriptions = {}
self.subscription_timer = self.reactor.register_timer(
self._batch_subscription_handler, self.reactor.NEVER)
# Register events
self.printer.register_event_handler(
"klippy:ready", self._handle_ready)
self.printer.register_event_handler(
"gcode:request_restart", self._handle_restart)
self.clients = {}
self.pending_queries = []
self.query_timer = None
self.last_query = {}
# Register webhooks
webhooks.register_endpoint("objects/list", self._handle_object_request)
webhooks.register_endpoint("objects/status",
self._handle_status_request)
webhooks.register_endpoint("objects/subscription",
self._handle_subscription_request)
webhooks.register_endpoint("objects/list_subscription",
self._handle_list_subscription_request)
def _handle_ready(self):
eventtime = self.reactor.monotonic()
self.available_objects = {}
objs = self.printer.lookup_objects()
status_objs = {n: o for n, o in objs if hasattr(o, "get_status")}
for name, obj in status_objs.items():
attrs = obj.get_status(eventtime)
self.available_objects[name] = attrs.keys()
self.ready = True
def _handle_restart(self, eventtime):
self.ready = False
self.reactor.update_timer(self.subscription_timer, self.reactor.NEVER)
def _batch_subscription_handler(self, eventtime):
status = self._process_status_request(self.subscriptions, eventtime)
self.webhooks.call_remote_method(
"process_status_update", status=status)
return eventtime + SUBSCRIPTION_REFRESH_TIME
def _process_status_request(self, requested_objects, eventtime):
result = {}
if self.ready:
for name, req_items in requested_objects.items():
obj = self.printer.lookup_object(name, None)
if obj is not None and name in self.available_objects:
status = obj.get_status(eventtime)
if not req_items:
# return all items excluding callables
result[name] = {k: v for k, v in status.items()
if not callable(v)}
else:
# return requested items excluding callables
result[name] = {k: v for k, v in status.items()
if k in req_items and not callable(v)}
else:
result = {"status": "Klippy Not Ready"}
return result
def _handle_object_request(self, web_request):
web_request.send(dict(self.available_objects))
def _handle_status_request(self, web_request):
args = web_request.get_args()
eventtime = self.reactor.monotonic()
result = self._process_status_request(args, eventtime)
web_request.send(result)
def _handle_subscription_request(self, web_request):
args = web_request.get_args()
if args:
self.add_subscripton(args)
else:
raise web_request.error("Invalid argument")
def _handle_list_subscription_request(self, web_request):
web_request.send(dict(self.subscriptions))
def add_subscripton(self, new_sub):
if not new_sub:
return
for obj_name, req_items in new_sub.items():
if obj_name not in self.available_objects:
logging.info(
"webhooks: Object {%s} not available for subscription"
% (obj_name))
webhooks = printer.lookup_object('webhooks')
webhooks.register_endpoint("objects/list", self._handle_list)
webhooks.register_endpoint("objects/query", self._handle_query)
webhooks.register_endpoint("objects/subscribe", self._handle_subscribe)
def _handle_list(self, web_request):
objects = [n for n, o in self.printer.lookup_objects()
if hasattr(o, 'get_status')]
web_request.send({'objects': objects})
def _do_query(self, eventtime):
last_query = self.last_query
query = self.last_query = {}
msglist = self.pending_queries
self.pending_queries = []
msglist.extend(self.clients.values())
# Generate get_status() info for each client
for cconn, subscription, send_func, template in msglist:
is_query = cconn is None
if not is_query and cconn.is_closed():
del self.clients[cconn]
continue
# validate requested items
if req_items:
avail_items = set(self.available_objects[obj_name])
invalid_items = set(req_items) - avail_items
if invalid_items:
logging.info(
"webhooks: Removed invalid items [%s] from "
"subscription request %s" %
(", ".join(invalid_items), obj_name))
req_items = list(set(req_items) - invalid_items)
if not req_items:
# No valid items remaining
continue
# Add or update subscription
existing_items = self.subscriptions.get(obj_name, None)
if existing_items is not None:
if req_items == [] or existing_items == []:
# Subscribe to all items
self.subscriptions[obj_name] = []
else:
req_items = list(set(req_items) | set(existing_items))
self.subscriptions[obj_name] = req_items
else:
self.subscriptions[obj_name] = req_items
if not self.timer_started:
self.reactor.update_timer(self.subscription_timer, self.reactor.NOW)
self.timer_started = True
# Query each requested printer object
cquery = {}
for obj_name, req_items in subscription.items():
res = query.get(obj_name, None)
if res is None:
po = self.printer.lookup_object(obj_name, None)
if po is None or not hasattr(po, 'get_status'):
res = query[obj_name] = {}
else:
res = query[obj_name] = po.get_status(eventtime)
if req_items is None:
req_items = list(res.keys())
if req_items:
subscription[obj_name] = req_items
lres = last_query.get(obj_name, {})
cres = {}
for ri in req_items:
rd = res.get(ri, None)
if not callable(rd) and (is_query or rd != lres.get(ri)):
cres[ri] = rd
if cres or is_query:
cquery[obj_name] = cres
# Send data
if cquery or is_query:
tmp = dict(template)
tmp['params'] = {'eventtime': eventtime, 'status': cquery}
send_func(tmp)
if not query:
# Unregister timer if there are no longer any subscriptions
reactor = self.printer.get_reactor()
reactor.unregister_timer(self.query_timer)
self.query_timer = None
return reactor.NEVER
return eventtime + SUBSCRIPTION_REFRESH_TIME
def _handle_query(self, web_request, is_subscribe=False):
objects = web_request.get('objects')
# Validate subscription format
if type(objects) != type({}):
raise web_request.error("Invalid argument")
for k, v in objects.items():
if type(k) != type("") or type(v) not in [type([]), type(None)]:
raise web_request.error("Invalid argument")
if v is not None:
for ri in v:
if type(ri) != type(""):
raise web_request.error("Invalid argument")
# Add to pending queries
cconn = web_request.get_client_connection()
template = web_request.get('response_template', {})
if is_subscribe and cconn in self.clients:
del self.clients[cconn]
reactor = self.printer.get_reactor()
complete = reactor.completion()
self.pending_queries.append((None, objects, complete.complete, {}))
# Start timer if needed
if self.query_timer is None:
qt = reactor.register_timer(self._do_query, reactor.NOW)
self.query_timer = qt
# Wait for data to be queried
msg = complete.wait()
web_request.send(msg['params'])
if is_subscribe:
self.clients[cconn] = (cconn, objects, cconn.send, template)
def _handle_subscribe(self, web_request):
self._handle_query(web_request, is_subscribe=True)
def add_early_printer_objects(printer):
printer.add_object('webhooks', WebHooks(printer))
GCodeHelper(printer)
StatusHandler(printer)
QueryStatusHelper(printer)