mirror of https://github.com/Desuuuu/klipper.git
whconsole: Add utility for testing the "webhooks" interface
Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
118ef908a5
commit
ebc79a1ee8
|
@ -0,0 +1,89 @@
|
||||||
|
#!/usr/bin/env python2
|
||||||
|
# Test console for webhooks interface
|
||||||
|
#
|
||||||
|
# Copyright (C) 2020 Kevin O'Connor <kevin@koconnor.net>
|
||||||
|
#
|
||||||
|
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||||
|
import sys, os, optparse, socket, fcntl, select, json, errno, time
|
||||||
|
|
||||||
|
# Set a file-descriptor as non-blocking
|
||||||
|
def set_nonblock(fd):
|
||||||
|
fcntl.fcntl(fd, fcntl.F_SETFL
|
||||||
|
, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
|
||||||
|
|
||||||
|
def webhook_socket_create(uds_filename):
|
||||||
|
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||||
|
sock.setblocking(0)
|
||||||
|
sys.stderr.write("Waiting for connect to %s\n" % (uds_filename,))
|
||||||
|
while 1:
|
||||||
|
try:
|
||||||
|
sock.connect(uds_filename)
|
||||||
|
except socket.error as e:
|
||||||
|
if e.errno == errno.ECONNREFUSED:
|
||||||
|
time.sleep(0.1)
|
||||||
|
continue
|
||||||
|
sys.stderr.write("Unable to connect socket %s [%d,%s]\n"
|
||||||
|
% (uds_filename, e.errno,
|
||||||
|
errno.errorcode[e.errno]))
|
||||||
|
sys.exit(-1)
|
||||||
|
break
|
||||||
|
sys.stderr.write("Connection.\n")
|
||||||
|
return sock
|
||||||
|
|
||||||
|
class KeyboardReader:
|
||||||
|
def __init__(self, uds_filename):
|
||||||
|
self.kbd_fd = sys.stdin.fileno()
|
||||||
|
set_nonblock(self.kbd_fd)
|
||||||
|
self.webhook_socket = webhook_socket_create(uds_filename)
|
||||||
|
self.poll = select.poll()
|
||||||
|
self.poll.register(sys.stdin, select.POLLIN | select.POLLHUP)
|
||||||
|
self.poll.register(self.webhook_socket, select.POLLIN | select.POLLHUP)
|
||||||
|
self.kbd_data = self.socket_data = ""
|
||||||
|
def process_socket(self):
|
||||||
|
data = self.webhook_socket.recv(4096)
|
||||||
|
if not data:
|
||||||
|
sys.stderr.write("Socket closed\n")
|
||||||
|
sys.exit(0)
|
||||||
|
parts = data.split('\x03')
|
||||||
|
parts[0] = self.socket_data + parts[0]
|
||||||
|
self.socket_data = parts.pop()
|
||||||
|
for line in parts:
|
||||||
|
sys.stdout.write("GOT: %s\n" % (line,))
|
||||||
|
def process_kbd(self):
|
||||||
|
data = os.read(self.kbd_fd, 4096)
|
||||||
|
parts = data.split('\n')
|
||||||
|
parts[0] = self.kbd_data + parts[0]
|
||||||
|
self.kbd_data = parts.pop()
|
||||||
|
for line in parts:
|
||||||
|
line = line.strip()
|
||||||
|
if not line or line.startswith('#'):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
m = json.loads(line)
|
||||||
|
except:
|
||||||
|
sys.stderr.write("ERROR: Unable to parse line\n")
|
||||||
|
continue
|
||||||
|
cm = json.dumps(m, separators=(',', ':'))
|
||||||
|
sys.stdout.write("SEND: %s\n" % (cm,))
|
||||||
|
self.webhook_socket.send("%s\x03" % (cm,))
|
||||||
|
def run(self):
|
||||||
|
while 1:
|
||||||
|
res = self.poll.poll(1000.)
|
||||||
|
for fd, event in res:
|
||||||
|
if fd == self.kbd_fd:
|
||||||
|
self.process_kbd()
|
||||||
|
else:
|
||||||
|
self.process_socket()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
usage = "%prog [options] <socket filename>"
|
||||||
|
opts = optparse.OptionParser(usage)
|
||||||
|
options, args = opts.parse_args()
|
||||||
|
if len(args) != 1:
|
||||||
|
opts.error("Incorrect number of arguments")
|
||||||
|
|
||||||
|
ml = KeyboardReader(args[0])
|
||||||
|
ml.run()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
Reference in New Issue