# File descriptor and timer event helper
#
# Copyright (C) 2016-2019 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
|
2020-06-12 16:11:57 +02:00
|
|
|
import os, select, math, time, Queue as queue
|
2016-11-16 01:56:27 +01:00
|
|
|
import greenlet
|
2018-06-26 15:24:19 +02:00
|
|
|
import chelper, util
|
2016-05-25 17:37:40 +02:00
|
|
|
|
2019-06-26 20:05:10 +02:00
|
|
|
# Special wake times (also exposed as SelectReactor.NOW / SelectReactor.NEVER)
# Zero: the smallest wake time used in this file
_NOW = 0.
# Very large value used as the wake time of timers that are parked
_NEVER = 9999999999999999.
|
|
|
|
|
2016-05-25 17:37:40 +02:00
|
|
|
# Record pairing a timer callback with the time at which it should next run
class ReactorTimer:
    def __init__(self, callback, waketime):
        # Plain attributes; the reactor reads and rewrites waketime directly
        self.callback, self.waketime = callback, waketime
|
|
|
|
|
2019-06-26 20:05:10 +02:00
|
|
|
# Holder for a result that greenlets can block on until complete() is called
class ReactorCompletion:
    # Marker kept in self.result until a real result has been supplied
    class sentinel: pass
    def __init__(self, reactor):
        self.reactor = reactor
        self.result = self.sentinel
        # Greenlets currently blocked in wait()
        self.waiting = []
    def test(self):
        # True once complete() has replaced the sentinel
        return not (self.result is self.sentinel)
    def complete(self, result):
        # Store the result, then reschedule every waiting greenlet's timer
        # to the reactor's NOW value
        self.result = result
        reactor = self.reactor
        for waiter in self.waiting:
            reactor.update_timer(waiter.timer, reactor.NOW)
    def wait(self, waketime=_NEVER, waketime_result=None):
        # Return the result, pausing the calling greenlet until it is
        # available.  If the pause ends while the result is still unset
        # then waketime_result is returned instead.
        if self.result is not self.sentinel:
            return self.result
        current = greenlet.getcurrent()
        self.waiting.append(current)
        self.reactor.pause(waketime)
        self.waiting.remove(current)
        if self.result is self.sentinel:
            return waketime_result
        return self.result
|
|
|
|
|
2018-06-26 15:24:19 +02:00
|
|
|
# One-shot callback: runs once from a reactor timer and publishes the
# callback's return value through self.completion
class ReactorCallback:
    def __init__(self, reactor, callback, waketime):
        self.reactor = reactor
        # invoke() is registered as the timer callback
        self.timer = reactor.register_timer(self.invoke, waketime)
        self.callback = callback
        self.completion = ReactorCompletion(reactor)
    def invoke(self, eventtime):
        # Remove the timer before running the callback
        reactor = self.reactor
        reactor.unregister_timer(self.timer)
        self.completion.complete(self.callback(eventtime))
        return reactor.NEVER
|
|
|
|
|
2016-05-25 17:37:40 +02:00
|
|
|
# Pairs a file descriptor with its callback; fileno() returns the descriptor
class ReactorFileHandler:
    def __init__(self, fd, callback):
        self.fd, self.callback = fd, callback
    def fileno(self):
        return self.fd
|
|
|
|
|
2016-11-16 01:56:27 +01:00
|
|
|
# Greenlet subclass with an extra "timer" attribute
class ReactorGreenlet(greenlet.greenlet):
    def __init__(self, run):
        super(ReactorGreenlet, self).__init__(run=run)
        # Set by SelectReactor.pause() and cleared by _end_greenlet()
        self.timer = None
|
|
|
|
|
2019-06-08 04:41:58 +02:00
|
|
|
# Mutex for greenlets running under the reactor; usable as a context
# manager or through the lock() / unlock() aliases
class ReactorMutex:
    def __init__(self, reactor, is_locked):
        self.reactor = reactor
        self.is_locked = is_locked
        # Set by __exit__ when ownership is handed to the head of the queue
        self.next_pending = False
        # Greenlets waiting for the mutex, oldest first
        self.queue = []
        self.lock = self.__enter__
        self.unlock = self.__exit__
    def test(self):
        return self.is_locked
    def __enter__(self):
        # Uncontended case - take the mutex immediately
        if not self.is_locked:
            self.is_locked = True
            return
        # Otherwise queue up and pause until __exit__ hands the mutex to us
        me = greenlet.getcurrent()
        self.queue.append(me)
        while True:
            self.reactor.pause(self.reactor.NEVER)
            if not self.next_pending or self.queue[0] is not me:
                # Woken for some other reason - keep waiting
                continue
            self.next_pending = False
            del self.queue[0]
            return
    def __exit__(self, type=None, value=None, tb=None):
        if self.queue:
            # Hand over to the oldest waiter; is_locked is left set
            self.next_pending = True
            head = self.queue[0]
            self.reactor.update_timer(head.timer, self.reactor.NOW)
        else:
            self.is_locked = False
|
|
|
|
|
2016-05-25 17:37:40 +02:00
|
|
|
# Event loop built on select.select().  PollReactor and EPollReactor
# subclass it and override only the file descriptor handling and the
# dispatch loop.
class SelectReactor:
    NOW = _NOW
    NEVER = _NEVER
    def __init__(self):
        # Main code
        self._process = False
        ffi_lib = chelper.get_ffi()[1]
        self.monotonic = ffi_lib.get_monotonic
        # Timers
        self._timers = []
        self._next_timer = self.NEVER
        # Callbacks
        self._pipe_fds = None
        self._async_queue = queue.Queue()
        # File descriptors
        self._fds = []
        # Greenlets
        self._g_dispatch = None
        self._greenlets = []
    # Timers
    def update_timer(self, timer_handler, waketime):
        # Reschedule a timer and pull the cached earliest wake time forward
        timer_handler.waketime = waketime
        if waketime < self._next_timer:
            self._next_timer = waketime
    def register_timer(self, callback, waketime=NEVER):
        handler = ReactorTimer(callback, waketime)
        # Build a new list instead of appending in place, so the list a
        # running _check_timers() loop is iterating is left untouched
        self._timers = self._timers + [handler]
        if waketime < self._next_timer:
            self._next_timer = waketime
        return handler
    def unregister_timer(self, timer_handler):
        timer_handler.waketime = self.NEVER
        # As in register_timer(), replace the list rather than mutate it
        remaining = list(self._timers)
        remaining.remove(timer_handler)
        self._timers = remaining
    def _check_timers(self, eventtime):
        # Run every timer that is due and return the timeout (in seconds)
        # for the next poll: 0. if more work is pending, otherwise the time
        # to the next timer limited to the range .001 to 1.
        pending = self._next_timer
        if eventtime < pending:
            return min(1., max(.001, pending - eventtime))
        self._next_timer = self.NEVER
        dispatcher = self._g_dispatch
        for timer in self._timers:
            wake = timer.waketime
            if eventtime >= wake:
                timer.waketime = self.NEVER
                wake = timer.callback(eventtime)
                timer.waketime = wake
                if dispatcher is not self._g_dispatch:
                    # The callback switched dispatch to another greenlet
                    self._next_timer = min(self._next_timer, wake)
                    self._end_greenlet(dispatcher)
                    return 0.
            self._next_timer = min(self._next_timer, wake)
        if eventtime >= self._next_timer:
            return 0.
        return min(1., max(.001, self._next_timer - self.monotonic()))
    # Callbacks and Completions
    def completion(self):
        return ReactorCompletion(self)
    def register_callback(self, callback, waketime=NOW):
        return ReactorCallback(self, callback, waketime).completion
    # Asynchronous (from another thread) callbacks and completions
    def register_async_callback(self, callback, waketime=NOW):
        # Queue the ReactorCallback construction; _got_pipe_signal() runs it
        request = (ReactorCallback, (self, callback, waketime))
        self._async_queue.put_nowait(request)
        # Write a byte to the pipe so the dispatch loop sees the read end
        try:
            os.write(self._pipe_fds[1], '.')
        except OSError:
            pass
    def async_complete(self, completion, result):
        # Queue the completion.complete(result) call; _got_pipe_signal()
        # runs it
        request = (completion.complete, (result,))
        self._async_queue.put_nowait(request)
        try:
            os.write(self._pipe_fds[1], '.')
        except OSError:
            pass
    def _got_pipe_signal(self, eventtime):
        # Drain the wake-up bytes, then run every queued request
        try:
            os.read(self._pipe_fds[0], 4096)
        except OSError:
            pass
        while True:
            try:
                func, args = self._async_queue.get_nowait()
            except queue.Empty:
                return
            func(*args)
    def _setup_async_callbacks(self):
        self._pipe_fds = pipe_fds = os.pipe()
        for pipe_fd in pipe_fds:
            util.set_nonblock(pipe_fd)
        self.register_fd(pipe_fds[0], self._got_pipe_signal)
    # Greenlets
    def _sys_pause(self, waketime):
        # Pause using system sleep for when reactor not running
        remaining = waketime - self.monotonic()
        if remaining > 0.:
            time.sleep(remaining)
        return self.monotonic()
    def pause(self, waketime):
        current = greenlet.getcurrent()
        dispatcher = self._g_dispatch
        if current is not dispatcher:
            if dispatcher is None:
                return self._sys_pause(waketime)
            # Switch to _check_timers (via g.timer.callback return)
            return dispatcher.switch(waketime)
        # Pausing the dispatch greenlet - prepare a new greenlet to do dispatch
        if self._greenlets:
            successor = self._greenlets.pop()
        else:
            successor = ReactorGreenlet(run=self._dispatch_loop)
        successor.parent = current.parent
        current.timer = self.register_timer(current.switch, waketime)
        self._next_timer = self.NOW
        # Switch to _dispatch_loop (via _end_greenlet or direct); this
        # greenlet is reactivated from its timer callback (via _check_timers)
        # and the value passed to that callback is returned to our caller
        return successor.switch()
    def _end_greenlet(self, g_old):
        # Cache this greenlet for later use
        self._greenlets.append(g_old)
        self.unregister_timer(g_old.timer)
        g_old.timer = None
        # Switch to _check_timers (via g_old.timer.callback return)
        self._g_dispatch.switch(self.NEVER)
        # This greenlet reactivated from pause() - return to main dispatch loop
        self._g_dispatch = g_old
    # Mutexes
    def mutex(self, is_locked=False):
        return ReactorMutex(self, is_locked)
    # File descriptors
    def register_fd(self, fd, callback):
        handler = ReactorFileHandler(fd, callback)
        self._fds.append(handler)
        return handler
    def unregister_fd(self, file_handler):
        self._fds.remove(file_handler)
    # Main loop
    def _dispatch_loop(self):
        self._g_dispatch = me = greenlet.getcurrent()
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime)
            ready = select.select(self._fds, [], [], timeout)
            eventtime = self.monotonic()
            for handler in ready[0]:
                handler.callback(eventtime)
                if me is not self._g_dispatch:
                    # Dispatch moved to another greenlet during the callback
                    self._end_greenlet(me)
                    eventtime = self.monotonic()
                    break
        self._g_dispatch = None
    def run(self):
        # The wake-up pipe is created on the first call only
        if self._pipe_fds is None:
            self._setup_async_callbacks()
        self._process = True
        dispatcher = ReactorGreenlet(run=self._dispatch_loop)
        dispatcher.switch()
    def end(self):
        # Clear the flag tested by the _dispatch_loop() while condition
        self._process = False
    def finalize(self):
        # Close both ends of the wake-up pipe, if it was ever created
        pipe_fds = self._pipe_fds
        if pipe_fds is None:
            return
        os.close(pipe_fds[0])
        os.close(pipe_fds[1])
        self._pipe_fds = None
|
2016-05-25 17:37:40 +02:00
|
|
|
|
|
|
|
# Reactor variant that waits on file descriptors with select.poll()
class PollReactor(SelectReactor):
    def __init__(self):
        SelectReactor.__init__(self)
        self._poll = select.poll()
        # Maps fd -> callback (replaces the list used by SelectReactor)
        self._fds = {}
    # File descriptors
    def register_fd(self, fd, callback):
        handler = ReactorFileHandler(fd, callback)
        # Install a modified copy rather than changing the dict in place
        updated = dict(self._fds)
        updated[fd] = callback
        self._fds = updated
        events = select.POLLIN | select.POLLHUP
        self._poll.register(handler, events)
        return handler
    def unregister_fd(self, file_handler):
        self._poll.unregister(file_handler)
        remaining = dict(self._fds)
        del remaining[file_handler.fd]
        self._fds = remaining
    # Main loop
    def _dispatch_loop(self):
        self._g_dispatch = me = greenlet.getcurrent()
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime)
            # poll() is given milliseconds, rounded up
            timeout_ms = int(math.ceil(timeout * 1000.))
            ready = self._poll.poll(timeout_ms)
            eventtime = self.monotonic()
            for fd, _event in ready:
                self._fds[fd](eventtime)
                if me is not self._g_dispatch:
                    # Dispatch moved to another greenlet during the callback
                    self._end_greenlet(me)
                    eventtime = self.monotonic()
                    break
        self._g_dispatch = None
|
2016-05-25 17:37:40 +02:00
|
|
|
|
|
|
|
# Reactor variant that waits on file descriptors with select.epoll()
class EPollReactor(SelectReactor):
    def __init__(self):
        SelectReactor.__init__(self)
        self._epoll = select.epoll()
        # Maps fd -> callback (replaces the list used by SelectReactor)
        self._fds = {}
    # File descriptors
    def register_fd(self, fd, callback):
        handler = ReactorFileHandler(fd, callback)
        # Install a modified copy rather than changing the dict in place
        updated = dict(self._fds)
        updated[fd] = callback
        self._fds = updated
        events = select.EPOLLIN | select.EPOLLHUP
        self._epoll.register(fd, events)
        return handler
    def unregister_fd(self, file_handler):
        self._epoll.unregister(file_handler.fd)
        remaining = dict(self._fds)
        del remaining[file_handler.fd]
        self._fds = remaining
    # Main loop
    def _dispatch_loop(self):
        self._g_dispatch = me = greenlet.getcurrent()
        eventtime = self.monotonic()
        while self._process:
            timeout = self._check_timers(eventtime)
            ready = self._epoll.poll(timeout)
            eventtime = self.monotonic()
            for fd, _event in ready:
                self._fds[fd](eventtime)
                if me is not self._g_dispatch:
                    # Dispatch moved to another greenlet during the callback
                    self._end_greenlet(me)
                    eventtime = self.monotonic()
                    break
        self._g_dispatch = None
|
2016-05-25 17:37:40 +02:00
|
|
|
|
|
|
|
# Use the poll based reactor if it is available
try:
    select.poll
    Reactor = PollReactor
except AttributeError:
    # The select module has no poll attribute here - fall back to
    # select().  Only AttributeError is caught so that unrelated errors
    # (including KeyboardInterrupt / SystemExit) are not swallowed.
    Reactor = SelectReactor
|