scripts: Add a supervisor package, to use the XMLRPC Supervisor API.

For many uses, shelling out to `supervisorctl` is going to produce
better error messages.  However, for instances where we wish to parse
the output of `supervisorctl`, using the API directly is less brittle.
This commit is contained in:
Alex Vandiver 2022-01-19 23:58:13 +00:00 committed by Tim Abbott
parent be7108ebca
commit 8e35cdb3da
1 changed files with 27 additions and 0 deletions

27
scripts/lib/supervisor.py Normal file
View File

@ -0,0 +1,27 @@
import socket
from http.client import HTTPConnection
from typing import Dict, List, Optional, Tuple, Union
from xmlrpc import client
class UnixStreamHTTPConnection(HTTPConnection):
def connect(self) -> None:
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.host)
class UnixStreamTransport(client.Transport, object):
def __init__(self, socket_path: str) -> None:
self.socket_path = socket_path
super(UnixStreamTransport, self).__init__()
def make_connection(
self, host: Union[Tuple[str, Dict[str, str]], str]
) -> UnixStreamHTTPConnection:
return UnixStreamHTTPConnection(self.socket_path)
def rpc() -> client.ServerProxy:
return client.ServerProxy(
"http://localhost", transport=UnixStreamTransport("/var/run/supervisor.sock")
)