reactor: Add support for waiting on fds becoming writable

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2022-05-23 21:23:56 -04:00
parent be503b2b9b
commit 02dd0742c4
1 changed files with 75 additions and 24 deletions

View File

@ -50,9 +50,10 @@ class ReactorCallback:
return self.reactor.NEVER return self.reactor.NEVER
class ReactorFileHandler: class ReactorFileHandler:
def __init__(self, fd, callback): def __init__(self, fd, read_callback, write_callback):
self.fd = fd self.fd = fd
self.callback = callback self.read_callback = read_callback
self.write_callback = write_callback
def fileno(self): def fileno(self):
return self.fd return self.fd
@ -107,7 +108,8 @@ class SelectReactor:
self._pipe_fds = None self._pipe_fds = None
self._async_queue = queue.Queue() self._async_queue = queue.Queue()
# File descriptors # File descriptors
self._fds = [] self._read_fds = []
self._write_fds = []
# Greenlets # Greenlets
self._g_dispatch = None self._g_dispatch = None
self._greenlets = [] self._greenlets = []
@ -236,12 +238,26 @@ class SelectReactor:
def mutex(self, is_locked=False): def mutex(self, is_locked=False):
return ReactorMutex(self, is_locked) return ReactorMutex(self, is_locked)
# File descriptors # File descriptors
def register_fd(self, fd, callback): def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, callback) file_handler = ReactorFileHandler(fd, read_callback, write_callback)
self._fds.append(file_handler) self.set_fd_wake(file_handle, True, False)
return file_handler return file_handler
def unregister_fd(self, file_handler): def unregister_fd(self, file_handler):
self._fds.pop(self._fds.index(file_handler)) if file_handler in self._read_fds:
self._read_fds.pop(self._read_fds.index(file_handler))
if file_handler in self._write_fds:
self._write_fds.pop(self._write_fds.index(file_handler))
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
if file_hander in self._read_fds:
if not is_readable:
self._read_fds.pop(self._read_fds.index(file_handler))
elif is_readable:
self._read_fds.append(file_handler)
if file_hander in self._write_fds:
if not is_writeable:
self._write_fds.pop(self._write_fds.index(file_handler))
elif is_writeable:
self._write_fds.append(file_handler)
# Main loop # Main loop
def _dispatch_loop(self): def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent() self._g_dispatch = g_dispatch = greenlet.getcurrent()
@ -250,11 +266,18 @@ class SelectReactor:
while self._process: while self._process:
timeout = self._check_timers(eventtime, busy) timeout = self._check_timers(eventtime, busy)
busy = False busy = False
res = select.select(self._fds, [], [], timeout) res = select.select(self._read_fds, self.write_fds, [], timeout)
eventtime = self.monotonic() eventtime = self.monotonic()
for fd in res[0]: for fd in res[0]:
busy = True busy = True
fd.callback(eventtime) fd.read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
for fd in res[1]:
busy = True
fd.write_callback(eventtime)
if g_dispatch is not self._g_dispatch: if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch) self._end_greenlet(g_dispatch)
eventtime = self.monotonic() eventtime = self.monotonic()
@ -289,10 +312,10 @@ class PollReactor(SelectReactor):
self._poll = select.poll() self._poll = select.poll()
self._fds = {} self._fds = {}
# File descriptors # File descriptors
def register_fd(self, fd, callback): def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, callback) file_handler = ReactorFileHandler(fd, read_callback, write_callback)
fds = self._fds.copy() fds = self._fds.copy()
fds[fd] = callback fds[fd] = file_handler
self._fds = fds self._fds = fds
self._poll.register(file_handler, select.POLLIN | select.POLLHUP) self._poll.register(file_handler, select.POLLIN | select.POLLHUP)
return file_handler return file_handler
@ -301,6 +324,13 @@ class PollReactor(SelectReactor):
fds = self._fds.copy() fds = self._fds.copy()
del fds[file_handler.fd] del fds[file_handler.fd]
self._fds = fds self._fds = fds
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
flags = select.POLLHUP
if is_readable:
flags |= select.POLLIN
if is_writeable:
flags |= select.POLLOUT
self._poll.modify(file_handler, flags)
# Main loop # Main loop
def _dispatch_loop(self): def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent() self._g_dispatch = g_dispatch = greenlet.getcurrent()
@ -313,7 +343,14 @@ class PollReactor(SelectReactor):
eventtime = self.monotonic() eventtime = self.monotonic()
for fd, event in res: for fd, event in res:
busy = True busy = True
self._fds[fd](eventtime) if event & (select.POLLIN | select.POLLHUP):
self._fds[fd].read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
if event & select.POLLOUT:
self._fds[fd].write_callback(eventtime)
if g_dispatch is not self._g_dispatch: if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch) self._end_greenlet(g_dispatch)
eventtime = self.monotonic() eventtime = self.monotonic()
@ -326,8 +363,8 @@ class EPollReactor(SelectReactor):
self._epoll = select.epoll() self._epoll = select.epoll()
self._fds = {} self._fds = {}
# File descriptors # File descriptors
def register_fd(self, fd, callback): def register_fd(self, fd, read_callback, write_callback=None):
file_handler = ReactorFileHandler(fd, callback) file_handler = ReactorFileHandler(fd, read_callback, write_callback)
fds = self._fds.copy() fds = self._fds.copy()
fds[fd] = callback fds[fd] = callback
self._fds = fds self._fds = fds
@ -338,6 +375,13 @@ class EPollReactor(SelectReactor):
fds = self._fds.copy() fds = self._fds.copy()
del fds[file_handler.fd] del fds[file_handler.fd]
self._fds = fds self._fds = fds
def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False):
flags = select.POLLHUP
if is_readable:
flags |= select.EPOLLIN
if is_writeable:
flags |= select.EPOLLOUT
self._epoll.modify(file_handler, flags)
# Main loop # Main loop
def _dispatch_loop(self): def _dispatch_loop(self):
self._g_dispatch = g_dispatch = greenlet.getcurrent() self._g_dispatch = g_dispatch = greenlet.getcurrent()
@ -350,7 +394,14 @@ class EPollReactor(SelectReactor):
eventtime = self.monotonic() eventtime = self.monotonic()
for fd, event in res: for fd, event in res:
busy = True busy = True
self._fds[fd](eventtime) if event & (select.EPOLLIN | select.EPOLLHUP):
self._fds[fd].read_callback(eventtime)
if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch)
eventtime = self.monotonic()
break
if event & select.EPOLLOUT:
self._fds[fd].write_callback(eventtime)
if g_dispatch is not self._g_dispatch: if g_dispatch is not self._g_dispatch:
self._end_greenlet(g_dispatch) self._end_greenlet(g_dispatch)
eventtime = self.monotonic() eventtime = self.monotonic()