reactor: Add ability to register callbacks

Add the ability to register callbacks - both asynchronous (ie, from
another thread) and synchronous.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor 2018-06-26 09:24:19 -04:00
parent ecf53e6194
commit 4061026c25
1 changed files with 58 additions and 6 deletions

View File

@ -1,17 +1,27 @@
# File descriptor and timer event helper # File descriptor and timer event helper
# #
# Copyright (C) 2016,2017 Kevin O'Connor <kevin@koconnor.net> # Copyright (C) 2016-2018 Kevin O'Connor <kevin@koconnor.net>
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
import select, math, time import os, select, math, time, Queue
import greenlet import greenlet
import chelper import chelper, util
class ReactorTimer: class ReactorTimer:
def __init__(self, callback, waketime): def __init__(self, callback, waketime):
self.callback = callback self.callback = callback
self.waketime = waketime self.waketime = waketime
class ReactorCallback:
def __init__(self, reactor, callback):
self.reactor = reactor
self.timer = reactor.register_timer(self.invoke, reactor.NOW)
self.callback = callback
def invoke(self, eventtime):
self.reactor.unregister_timer(self.timer)
self.callback(eventtime)
return self.reactor.NEVER
class ReactorFileHandler: class ReactorFileHandler:
def __init__(self, fd, callback): def __init__(self, fd, callback):
self.fd = fd self.fd = fd
@ -28,13 +38,20 @@ class SelectReactor:
NOW = 0. NOW = 0.
NEVER = 9999999999999999. NEVER = 9999999999999999.
def __init__(self): def __init__(self):
self._fds = [] # Main code
self._process = False
self.monotonic = chelper.get_ffi()[1].get_monotonic
# Timers
self._timers = [] self._timers = []
self._next_timer = self.NEVER self._next_timer = self.NEVER
self._process = False # Callbacks
self._pipe_fds = None
self._async_queue = Queue.Queue()
# File descriptors
self._fds = []
# Greenlets
self._g_dispatch = None self._g_dispatch = None
self._greenlets = [] self._greenlets = []
self.monotonic = chelper.get_ffi()[1].get_monotonic
# Timers # Timers
def _note_time(self, t): def _note_time(self, t):
nexttime = t.waketime nexttime = t.waketime
@ -70,6 +87,36 @@ class SelectReactor:
if eventtime >= self._next_timer: if eventtime >= self._next_timer:
return 0. return 0.
return min(1., max(.001, self._next_timer - self.monotonic())) return min(1., max(.001, self._next_timer - self.monotonic()))
# Callbacks
def register_callback(self, callback):
ReactorCallback(self, callback)
def register_async_callback(self, callback):
self._async_queue.put_nowait(callback)
try:
os.write(self._pipe_fds[1], '.')
except os.error:
pass
def _got_pipe_signal(self, eventtime):
try:
os.read(self._pipe_fds[0], 4096)
except os.error:
pass
while 1:
try:
callback = self._async_queue.get_nowait()
except Queue.Empty:
break
ReactorCallback(self, callback)
def _setup_async_callbacks(self):
self._pipe_fds = os.pipe()
util.set_nonblock(self._pipe_fds[0])
util.set_nonblock(self._pipe_fds[1])
self.register_fd(self._pipe_fds[0], self._got_pipe_signal)
def __del__(self):
if self._pipe_fds is not None:
os.close(self._pipe_fds[0])
os.close(self._pipe_fds[1])
self._pipe_fds = None
# Greenlets # Greenlets
def _sys_pause(self, waketime): def _sys_pause(self, waketime):
# Pause using system sleep for when reactor not running # Pause using system sleep for when reactor not running
@ -91,10 +138,13 @@ class SelectReactor:
g.timer = self.register_timer(g.switch, waketime) g.timer = self.register_timer(g.switch, waketime)
return g_next.switch() return g_next.switch()
def _end_greenlet(self, g_old): def _end_greenlet(self, g_old):
# Cache this greenlet for later use
self._greenlets.append(g_old) self._greenlets.append(g_old)
self.unregister_timer(g_old.timer) self.unregister_timer(g_old.timer)
g_old.timer = None g_old.timer = None
# Switch to existing dispatch
self._g_dispatch.switch(self.NEVER) self._g_dispatch.switch(self.NEVER)
# This greenlet was reactivated - prepare for main processing loop
self._g_dispatch = g_old self._g_dispatch = g_old
# File descriptors # File descriptors
def register_fd(self, fd, callback): def register_fd(self, fd, callback):
@ -119,6 +169,8 @@ class SelectReactor:
break break
self._g_dispatch = None self._g_dispatch = None
def run(self): def run(self):
if self._pipe_fds is None:
self._setup_async_callbacks()
self._process = True self._process = True
g_next = ReactorGreenlet(run=self._dispatch_loop) g_next = ReactorGreenlet(run=self._dispatch_loop)
g_next.switch() g_next.switch()