diff --git a/klippy/webhooks.py b/klippy/webhooks.py index e39a5eaf..9f4fa5bf 100644 --- a/klippy/webhooks.py +++ b/klippy/webhooks.py @@ -5,11 +5,12 @@ # This file may be distributed under the terms of the GNU GPLv3 license import logging import socket +import os import errno import json import homing -SOCKET_LOCATION = "/tmp/moonraker" +SERVER_ADDRESS = "/tmp/klippy_uds" # Json decodes strings as unicode types in Python 2.x. This doesn't # play well with some parts of Klipper (particuarly displays), so we @@ -89,53 +90,93 @@ class WebRequest: self.response = "ok" return {"request_id": self.id, "response": self.response} -class ServerConnection: +class ServerSocket: def __init__(self, webhooks, printer): self.printer = printer self.webhooks = webhooks self.reactor = printer.get_reactor() - - # Klippy Connection - self.fd = self.fd_handle = self.mutex = None - self.is_server_connected = False - self.partial_data = "" + self.sock = self.fd_handle = None + self.clients = {} is_fileinput = (printer.get_start_args().get('debuginput') is not None) if is_fileinput: - # Do not try to connect in klippy batch mode + # Do not enable server in batch mode return - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.setblocking(0) - try: - self.socket.connect(SOCKET_LOCATION) - except socket.error: - logging.debug( - "ServerConnection: Moonraker server not detected") - return - logging.debug("ServerConnection: Moonraker connection established") - self.is_server_connected = True - self.fd = self.socket.fileno() + self._remove_socket_file(SERVER_ADDRESS) + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.setblocking(0) + self.sock.bind(SERVER_ADDRESS) + self.sock.listen(1) self.fd_handle = self.reactor.register_fd( - self.fd, self.process_received) - self.mutex = self.reactor.mutex() - printer.register_event_handler('klippy:disconnect', self.close_socket) + self.sock.fileno(), self._handle_accept) + printer.register_event_handler( + 'klippy:disconnect', self._handle_disconnect) - def close_socket(self): - if self.is_server_connected: - logging.info("ServerConnection: lost connection to Moonraker") - self.is_server_connected = False + def _handle_accept(self, eventtime): + try: + sock, addr = self.sock.accept() + except socket.error: + return + sock.setblocking(0) + client = ClientConnection(self, sock) + self.clients[client.uid] = client + + def _handle_disconnect(self): + for client in list(self.clients.values()): + client.close() + if self.sock is not None: self.reactor.unregister_fd(self.fd_handle) try: - self.socket.close() + self.sock.close() except socket.error: pass - def is_connected(self): - return self.is_server_connected + def _remove_socket_file(self, file_path): + try: + os.remove(file_path) + except OSError: + if os.path.exists(file_path): + logging.exception( + "webhooks: Unable to delete socket file '%s'" + % (file_path)) + raise + + def pop_client(self, client_id): + self.clients.pop(client_id, None) + + def send_all_clients(self, data): + for client in self.clients.values(): + client.send(data) + +class ClientConnection: + def __init__(self, server, sock): + self.printer = server.printer + self.webhooks = server.webhooks + self.reactor = server.reactor + self.server = server + self.uid = id(self) + self.sock = sock + self.fd_handle = self.reactor.register_fd( + self.sock.fileno(), self.process_received) + self.partial_data = "" + self.mutex = self.reactor.mutex() + logging.info( + "webhooks: New connection established") + + def close(self): + if self.fd_handle is not None: + logging.info("webhooks: Client connection closed") + self.reactor.unregister_fd(self.fd_handle) + self.fd_handle = None + try: + self.sock.close() + except socket.error: + pass + self.server.pop_client(self.uid) def process_received(self, eventtime): try: - data = self.socket.recv(4096) + data = self.sock.recv(4096) except socket.error as e: # If bad file descriptor allow connection to be # closed by the data check @@ -145,19 +186,19 @@ class ServerConnection: return if data == '': # Socket Closed - self.close_socket() + self.close() return requests = data.split('\x03') requests[0] = self.partial_data + requests[0] self.partial_data = requests.pop() for req in requests: logging.debug( - "ServerConnection: Request received: %s" % (req)) + "webhooks: Request received: %s" % (req)) try: web_request = WebRequest(json_loads_byteified(req)) except Exception: logging.exception( - "ServerConnection: Error decoding Server Request %s" + "webhooks: Error decoding Server Request %s" % (req)) continue self.reactor.register_callback( @@ -177,18 +218,16 @@ class ServerConnection: self.printer.invoke_shutdown(msg) result = web_request.finish() logging.debug( - "ServerConnection: Sending response - %s" % (str(result))) + "webhooks: Sending response - %s" % (str(result))) self.send({'method': "response", 'params': result}) def send(self, data): - if not self.is_server_connected: - return with self.mutex: retries = 10 data = json.dumps(data) + "\x03" while data: try: - sent = self.socket.send(data) + sent = self.sock.send(data) except socket.error as e: if e.errno == errno.EBADF or e.errno == errno.EPIPE \ or not retries: @@ -203,9 +242,9 @@ class ServerConnection: data = data[sent:] else: logging.info( - "ServerConnection: Error sending server data," + "webhooks: Error sending server data," " closing socket") - self.close_socket() + self.close() break class WebHooks: @@ -219,7 +258,7 @@ class WebHooks: log_file = start_args.get('log_file') if log_file is not None: self.register_static_path("klippy.log", log_file) - self.sconn = ServerConnection(self, printer) + self.sconn = ServerSocket(self, printer) StatusHandler(self) def register_endpoint(self, path, callback): @@ -269,7 +308,7 @@ class WebHooks: return cb def call_remote_method(self, method, **kwargs): - self.sconn.send({'method': method, 'params': kwargs}) + self.sconn.send_all_clients({'method': method, 'params': kwargs}) def _action_call_remote_method(self, method, **kwargs): self.call_remote_method(method, **kwargs)