webhooks: Implement StatusHandler class

This class provides endpoints that allow connected hosts to fetch the state of printer objects and subscribe to state "pushed" over the connection.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-04 18:01:15 -04:00 committed by Kevin O'Connor
parent cde7e75139
commit 27ef58642e
1 changed files with 134 additions and 1 deletions

View File

@ -220,13 +220,13 @@ class WebHooks:
if log_file is not None: if log_file is not None:
self.register_static_path("klippy.log", log_file) self.register_static_path("klippy.log", log_file)
self.sconn = ServerConnection(self, printer) self.sconn = ServerConnection(self, printer)
StatusHandler(self)
def register_endpoint(self, path, callback): def register_endpoint(self, path, callback):
if path in self._endpoints: if path in self._endpoints:
raise WebRequestError("Path already registered to an endpoint") raise WebRequestError("Path already registered to an endpoint")
self._endpoints[path] = callback self._endpoints[path] = callback
def register_static_path(self, resource_id, file_path): def register_static_path(self, resource_id, file_path):
static_path_info = { static_path_info = {
'resource_id': resource_id, 'file_path': file_path} 'resource_id': resource_id, 'file_path': file_path}
@ -280,5 +280,138 @@ class WebHooks:
"action_call_remote_method": self._action_call_remote_method "action_call_remote_method": self._action_call_remote_method
} }
SUBSCRIPTION_REFRESH_TIME = .25
class StatusHandler:
def __init__(self, webhooks):
self.printer = webhooks.printer
self.webhooks = webhooks
self.ready = self.timer_started = False
self.reactor = self.printer.get_reactor()
self.available_objects = {}
self.subscriptions = {}
self.subscription_timer = self.reactor.register_timer(
self._batch_subscription_handler, self.reactor.NEVER)
# Register events
self.printer.register_event_handler(
"klippy:ready", self._handle_ready)
self.printer.register_event_handler(
"gcode:request_restart", self._handle_restart)
# Register webhooks
webhooks.register_endpoint(
"objects/list",
self._handle_object_request)
webhooks.register_endpoint(
"objects/status",
self._handle_status_request)
webhooks.register_endpoint(
"objects/subscription",
self._handle_subscription_request)
def _handle_ready(self):
eventtime = self.reactor.monotonic()
self.available_objects = {}
objs = self.printer.lookup_objects()
status_objs = {n: o for n, o in objs if hasattr(o, "get_status")}
for name, obj in status_objs.items():
attrs = obj.get_status(eventtime)
self.available_objects[name] = attrs.keys()
self.ready = True
def _handle_restart(self, eventtime):
self.ready = False
self.reactor.update_timer(self.subscription_timer, self.reactor.NEVER)
def _batch_subscription_handler(self, eventtime):
status = self._process_status_request(self.subscriptions, eventtime)
self.webhooks.call_remote_method(
"process_status_update", status=status)
return eventtime + SUBSCRIPTION_REFRESH_TIME
def _process_status_request(self, requested_objects, eventtime):
result = {}
if self.ready:
for name, req_items in requested_objects.items():
obj = self.printer.lookup_object(name, None)
if obj is not None and name in self.available_objects:
status = obj.get_status(eventtime)
if not req_items:
# return all items excluding callables
result[name] = {k: v for k, v in status.items()
if not callable(v)}
else:
# return requested items excluding callables
result[name] = {k: v for k, v in status.items()
if k in req_items and not callable(v)}
else:
result = {"status": "Klippy Not Ready"}
return result
def _handle_object_request(self, web_request):
if web_request.get_method() != 'GET':
raise web_request.error("Invalid Request Method")
web_request.send(dict(self.available_objects))
def _handle_status_request(self, web_request):
if web_request.get_method() != 'GET':
raise web_request.error("Invalid Request Method")
args = web_request.get_args()
eventtime = self.reactor.monotonic()
result = self._process_status_request(args, eventtime)
web_request.send(result)
def _handle_subscription_request(self, web_request):
method = web_request.get_method()
if method == 'POST':
# add a subscription
args = web_request.get_args()
if args:
self.add_subscripton(args)
else:
raise web_request.error("Invalid argument")
else:
# get subscription info
result = dict(self.subscriptions)
web_request.send(result)
def add_subscripton(self, new_sub):
if not new_sub:
return
for obj_name, req_items in new_sub.items():
if obj_name not in self.available_objects:
logging.info(
"webhooks: Object {%s} not available for subscription"
% (obj_name))
continue
# validate requested items
if req_items:
avail_items = set(self.available_objects[obj_name])
invalid_items = set(req_items) - avail_items
if invalid_items:
logging.info(
"webhooks: Removed invalid items [%s] from "
"subscription request %s" %
(", ".join(invalid_items), obj_name))
req_items = list(set(req_items) - invalid_items)
if not req_items:
# No valid items remaining
continue
# Add or update subscription
existing_items = self.subscriptions.get(obj_name, None)
if existing_items is not None:
if req_items == [] or existing_items == []:
# Subscribe to all items
self.subscriptions[obj_name] = []
else:
req_items = list(set(req_items) | set(existing_items))
self.subscriptions[obj_name] = req_items
else:
self.subscriptions[obj_name] = req_items
if not self.timer_started:
self.reactor.update_timer(self.subscription_timer, self.reactor.NOW)
self.timer_started = True
def add_early_printer_objects(printer): def add_early_printer_objects(printer):
printer.add_object('webhooks', WebHooks(printer)) printer.add_object('webhooks', WebHooks(printer))