diff --git a/klippy/reactor.py b/klippy/reactor.py index 69eedcbd..5b1ec569 100644 --- a/klippy/reactor.py +++ b/klippy/reactor.py @@ -50,9 +50,10 @@ class ReactorCallback: return self.reactor.NEVER class ReactorFileHandler: - def __init__(self, fd, callback): + def __init__(self, fd, read_callback, write_callback): self.fd = fd - self.callback = callback + self.read_callback = read_callback + self.write_callback = write_callback def fileno(self): return self.fd @@ -107,7 +108,8 @@ class SelectReactor: self._pipe_fds = None self._async_queue = queue.Queue() # File descriptors - self._fds = [] + self._read_fds = [] + self._write_fds = [] # Greenlets self._g_dispatch = None self._greenlets = [] @@ -236,12 +238,26 @@ class SelectReactor: def mutex(self, is_locked=False): return ReactorMutex(self, is_locked) # File descriptors - def register_fd(self, fd, callback): - file_handler = ReactorFileHandler(fd, callback) - self._fds.append(file_handler) + def register_fd(self, fd, read_callback, write_callback=None): + file_handler = ReactorFileHandler(fd, read_callback, write_callback) + self.set_fd_wake(file_handle, True, False) return file_handler def unregister_fd(self, file_handler): - self._fds.pop(self._fds.index(file_handler)) + if file_handler in self._read_fds: + self._read_fds.pop(self._read_fds.index(file_handler)) + if file_handler in self._write_fds: + self._write_fds.pop(self._write_fds.index(file_handler)) + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): + if file_hander in self._read_fds: + if not is_readable: + self._read_fds.pop(self._read_fds.index(file_handler)) + elif is_readable: + self._read_fds.append(file_handler) + if file_hander in self._write_fds: + if not is_writeable: + self._write_fds.pop(self._write_fds.index(file_handler)) + elif is_writeable: + self._write_fds.append(file_handler) # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -250,11 +266,18 @@ class SelectReactor: while self._process: timeout = self._check_timers(eventtime, busy) busy = False - res = select.select(self._fds, [], [], timeout) + res = select.select(self._read_fds, self.write_fds, [], timeout) eventtime = self.monotonic() for fd in res[0]: busy = True - fd.callback(eventtime) + fd.read_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + eventtime = self.monotonic() + break + for fd in res[1]: + busy = True + fd.write_callback(eventtime) if g_dispatch is not self._g_dispatch: self._end_greenlet(g_dispatch) eventtime = self.monotonic() @@ -289,10 +312,10 @@ class PollReactor(SelectReactor): self._poll = select.poll() self._fds = {} # File descriptors - def register_fd(self, fd, callback): - file_handler = ReactorFileHandler(fd, callback) + def register_fd(self, fd, read_callback, write_callback=None): + file_handler = ReactorFileHandler(fd, read_callback, write_callback) fds = self._fds.copy() - fds[fd] = callback + fds[fd] = file_handler self._fds = fds self._poll.register(file_handler, select.POLLIN | select.POLLHUP) return file_handler @@ -301,6 +324,13 @@ class PollReactor(SelectReactor): fds = self._fds.copy() del fds[file_handler.fd] self._fds = fds + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): + flags = select.POLLHUP + if is_readable: + flags |= select.POLLIN + if is_writeable: + flags |= select.POLLOUT + self._poll.modify(file_handler, flags) # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -313,11 +343,18 @@ class PollReactor(SelectReactor): eventtime = self.monotonic() for fd, event in res: busy = True - self._fds[fd](eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break + if event & (select.POLLIN | select.POLLHUP): + self._fds[fd].read_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + eventtime = self.monotonic() + break + if event & select.POLLOUT: + self._fds[fd].write_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + eventtime = self.monotonic() + break self._g_dispatch = None class EPollReactor(SelectReactor): @@ -326,8 +363,8 @@ class EPollReactor(SelectReactor): self._epoll = select.epoll() self._fds = {} # File descriptors - def register_fd(self, fd, callback): - file_handler = ReactorFileHandler(fd, callback) + def register_fd(self, fd, read_callback, write_callback=None): + file_handler = ReactorFileHandler(fd, read_callback, write_callback) fds = self._fds.copy() fds[fd] = callback self._fds = fds @@ -338,6 +375,13 @@ class EPollReactor(SelectReactor): fds = self._fds.copy() del fds[file_handler.fd] self._fds = fds + def set_fd_wake(self, file_handler, is_readable=True, is_writeable=False): + flags = select.POLLHUP + if is_readable: + flags |= select.EPOLLIN + if is_writeable: + flags |= select.EPOLLOUT + self._epoll.modify(file_handler, flags) # Main loop def _dispatch_loop(self): self._g_dispatch = g_dispatch = greenlet.getcurrent() @@ -350,11 +394,18 @@ class EPollReactor(SelectReactor): eventtime = self.monotonic() for fd, event in res: busy = True - self._fds[fd](eventtime) - if g_dispatch is not self._g_dispatch: - self._end_greenlet(g_dispatch) - eventtime = self.monotonic() - break + if event & (select.EPOLLIN | select.EPOLLHUP): + self._fds[fd].read_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + eventtime = self.monotonic() + break + if event & select.EPOLLOUT: + self._fds[fd].write_callback(eventtime) + if g_dispatch is not self._g_dispatch: + self._end_greenlet(g_dispatch) + eventtime = self.monotonic() + break self._g_dispatch = None # Use the poll based reactor if it is available