webhooks: Implement unix domain socket server

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-08 06:12:01 -04:00 committed by KevinOConnor
parent d4dab9da5d
commit 4dcf494b97
1 changed files with 80 additions and 41 deletions

View File

@ -5,11 +5,12 @@
# This file may be distributed under the terms of the GNU GPLv3 license # This file may be distributed under the terms of the GNU GPLv3 license
import logging import logging
import socket import socket
import os
import errno import errno
import json import json
import homing import homing
SOCKET_LOCATION = "/tmp/moonraker" SERVER_ADDRESS = "/tmp/klippy_uds"
# Json decodes strings as unicode types in Python 2.x. This doesn't # Json decodes strings as unicode types in Python 2.x. This doesn't
# play well with some parts of Klipper (particuarly displays), so we # play well with some parts of Klipper (particuarly displays), so we
@ -89,53 +90,93 @@ class WebRequest:
self.response = "ok" self.response = "ok"
return {"request_id": self.id, "response": self.response} return {"request_id": self.id, "response": self.response}
class ServerConnection: class ServerSocket:
def __init__(self, webhooks, printer): def __init__(self, webhooks, printer):
self.printer = printer self.printer = printer
self.webhooks = webhooks self.webhooks = webhooks
self.reactor = printer.get_reactor() self.reactor = printer.get_reactor()
self.sock = self.fd_handle = None
# Klippy Connection self.clients = {}
self.fd = self.fd_handle = self.mutex = None
self.is_server_connected = False
self.partial_data = ""
is_fileinput = (printer.get_start_args().get('debuginput') is_fileinput = (printer.get_start_args().get('debuginput')
is not None) is not None)
if is_fileinput: if is_fileinput:
# Do not try to connect in klippy batch mode # Do not enable server in batch mode
return return
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self._remove_socket_file(SERVER_ADDRESS)
self.socket.setblocking(0) self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try: self.sock.setblocking(0)
self.socket.connect(SOCKET_LOCATION) self.sock.bind(SERVER_ADDRESS)
except socket.error: self.sock.listen(1)
logging.debug(
"ServerConnection: Moonraker server not detected")
return
logging.debug("ServerConnection: Moonraker connection established")
self.is_server_connected = True
self.fd = self.socket.fileno()
self.fd_handle = self.reactor.register_fd( self.fd_handle = self.reactor.register_fd(
self.fd, self.process_received) self.sock.fileno(), self._handle_accept)
self.mutex = self.reactor.mutex() printer.register_event_handler(
printer.register_event_handler('klippy:disconnect', self.close_socket) 'klippy:disconnect', self._handle_disconnect)
def close_socket(self): def _handle_accept(self, eventtime):
if self.is_server_connected: try:
logging.info("ServerConnection: lost connection to Moonraker") sock, addr = self.sock.accept()
self.is_server_connected = False except socket.error:
return
sock.setblocking(0)
client = ClientConnection(self, sock)
self.clients[client.uid] = client
def _handle_disconnect(self):
for client in list(self.clients.values()):
client.close()
if self.sock is not None:
self.reactor.unregister_fd(self.fd_handle) self.reactor.unregister_fd(self.fd_handle)
try: try:
self.socket.close() self.sock.close()
except socket.error: except socket.error:
pass pass
def is_connected(self): def _remove_socket_file(self, file_path):
return self.is_server_connected try:
os.remove(file_path)
except OSError:
if os.path.exists(file_path):
logging.exception(
"webhooks: Unable to delete socket file '%s'"
% (file_path))
raise
def pop_client(self, client_id):
self.clients.pop(client_id, None)
def send_all_clients(self, data):
for client in self.clients.values():
client.send(data)
class ClientConnection:
def __init__(self, server, sock):
self.printer = server.printer
self.webhooks = server.webhooks
self.reactor = server.reactor
self.server = server
self.uid = id(self)
self.sock = sock
self.fd_handle = self.reactor.register_fd(
self.sock.fileno(), self.process_received)
self.partial_data = ""
self.mutex = self.reactor.mutex()
logging.info(
"webhooks: New connection established")
def close(self):
if self.fd_handle is not None:
logging.info("webhooks: Client connection closed")
self.reactor.unregister_fd(self.fd_handle)
self.fd_handle = None
try:
self.sock.close()
except socket.error:
pass
self.server.pop_client(self.uid)
def process_received(self, eventtime): def process_received(self, eventtime):
try: try:
data = self.socket.recv(4096) data = self.sock.recv(4096)
except socket.error as e: except socket.error as e:
# If bad file descriptor allow connection to be # If bad file descriptor allow connection to be
# closed by the data check # closed by the data check
@ -145,19 +186,19 @@ class ServerConnection:
return return
if data == '': if data == '':
# Socket Closed # Socket Closed
self.close_socket() self.close()
return return
requests = data.split('\x03') requests = data.split('\x03')
requests[0] = self.partial_data + requests[0] requests[0] = self.partial_data + requests[0]
self.partial_data = requests.pop() self.partial_data = requests.pop()
for req in requests: for req in requests:
logging.debug( logging.debug(
"ServerConnection: Request received: %s" % (req)) "webhooks: Request received: %s" % (req))
try: try:
web_request = WebRequest(json_loads_byteified(req)) web_request = WebRequest(json_loads_byteified(req))
except Exception: except Exception:
logging.exception( logging.exception(
"ServerConnection: Error decoding Server Request %s" "webhooks: Error decoding Server Request %s"
% (req)) % (req))
continue continue
self.reactor.register_callback( self.reactor.register_callback(
@ -177,18 +218,16 @@ class ServerConnection:
self.printer.invoke_shutdown(msg) self.printer.invoke_shutdown(msg)
result = web_request.finish() result = web_request.finish()
logging.debug( logging.debug(
"ServerConnection: Sending response - %s" % (str(result))) "webhooks: Sending response - %s" % (str(result)))
self.send({'method': "response", 'params': result}) self.send({'method': "response", 'params': result})
def send(self, data): def send(self, data):
if not self.is_server_connected:
return
with self.mutex: with self.mutex:
retries = 10 retries = 10
data = json.dumps(data) + "\x03" data = json.dumps(data) + "\x03"
while data: while data:
try: try:
sent = self.socket.send(data) sent = self.sock.send(data)
except socket.error as e: except socket.error as e:
if e.errno == errno.EBADF or e.errno == errno.EPIPE \ if e.errno == errno.EBADF or e.errno == errno.EPIPE \
or not retries: or not retries:
@ -203,9 +242,9 @@ class ServerConnection:
data = data[sent:] data = data[sent:]
else: else:
logging.info( logging.info(
"ServerConnection: Error sending server data," "webhooks: Error sending server data,"
" closing socket") " closing socket")
self.close_socket() self.close()
break break
class WebHooks: class WebHooks:
@ -219,7 +258,7 @@ class WebHooks:
log_file = start_args.get('log_file') log_file = start_args.get('log_file')
if log_file is not None: if log_file is not None:
self.register_static_path("klippy.log", log_file) self.register_static_path("klippy.log", log_file)
self.sconn = ServerConnection(self, printer) self.sconn = ServerSocket(self, printer)
StatusHandler(self) StatusHandler(self)
def register_endpoint(self, path, callback): def register_endpoint(self, path, callback):
@ -269,7 +308,7 @@ class WebHooks:
return cb return cb
def call_remote_method(self, method, **kwargs): def call_remote_method(self, method, **kwargs):
self.sconn.send({'method': method, 'params': kwargs}) self.sconn.send_all_clients({'method': method, 'params': kwargs})
def _action_call_remote_method(self, method, **kwargs): def _action_call_remote_method(self, method, **kwargs):
self.call_remote_method(method, **kwargs) self.call_remote_method(method, **kwargs)