import ctypes
import sys
import threading
import time
from types import TracebackType
from typing import Callable, Optional, Tuple, Type, TypeVar, cast

# Based on https://code.activestate.com/recipes/483752/


class TimeoutExpired(Exception):
    """Exception raised when a function times out."""

    def __str__(self) -> str:
        return "Function call timed out."


ResultT = TypeVar("ResultT")


def timeout(timeout: float, func: Callable[[], ResultT]) -> ResultT:
    """Call 'func' in a separate thread, giving up after 'timeout' seconds.

    Return the function's return value, or raise an exception, within
    approximately 'timeout' seconds.

    The function may receive a TimeoutExpired exception anywhere in its
    code, which could have arbitrary unsafe effects (resources not
    released, etc.). It might also fail to receive the exception and keep
    running in the background even though timeout() has returned.

    This may also fail to interrupt functions which are stuck in a
    long-running primitive interpreter operation.

    Args:
        timeout: Number of seconds to wait for 'func' to finish before
            trying to interrupt it.
        func: Zero-argument callable to run in the worker thread.

    Returns:
        Whatever 'func' returned. This includes None: a function that
        returns None is a normal, successful call.

    Raises:
        TimeoutExpired: 'func' was still running after 'timeout' seconds.
        BaseException: Any exception raised by 'func' itself is re-raised
            in the calling thread with its original traceback.
    """

    class TimeoutThread(threading.Thread):
        """Worker thread that records the outcome of a single func() call."""

        def __init__(self) -> None:
            threading.Thread.__init__(self)
            # Return value of func(); only meaningful when the thread has
            # finished and exc_info[1] is None.
            self.result: Optional[ResultT] = None
            # sys.exc_info() triple captured if func() raised.
            self.exc_info: Tuple[
                Optional[Type[BaseException]],
                Optional[BaseException],
                Optional[TracebackType],
            ] = (None, None, None)
            # Don't block the whole program from exiting
            # if this is the only thread left.
            self.daemon = True

        def run(self) -> None:
            try:
                self.result = func()
            except BaseException:
                # Catch everything (including the injected TimeoutExpired)
                # so the outcome is always reported via exc_info.
                self.exc_info = sys.exc_info()

        def raise_async_timeout(self) -> None:
            """Try to raise TimeoutExpired inside this thread.

            Called from another thread.
            """
            assert self.ident is not None  # Thread should be running
            # PyThreadState_SetAsyncExc takes the thread id as an
            # 'unsigned long' (since Python 3.7).
            tid = ctypes.c_ulong(self.ident)
            result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                tid, ctypes.py_object(TimeoutExpired)
            )
            if result > 1:
                # "if it returns a number greater than one, you're in trouble,
                # and you should call it again with exc=NULL to revert the effect"
                #
                # I was unable to find the actual source of this quote, but it
                # appears in the many projects across the Internet that have
                # copy-pasted this recipe.
                ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)

    thread = TimeoutThread()
    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        # Gamely try to kill the thread, following the dodgy approach from
        # https://stackoverflow.com/a/325528/90777
        #
        # We need to retry, because an async exception received while the
        # thread is in a system call is simply ignored.
        for _ in range(10):
            thread.raise_async_timeout()
            time.sleep(0.1)
            if not thread.is_alive():
                break
        raise TimeoutExpired

    exc = thread.exc_info[1]
    if exc is not None:
        # Raise the original stack trace so our error messages are more useful.
        # from https://stackoverflow.com/a/4785766/90777
        raise exc.with_traceback(thread.exc_info[2])

    # The thread has finished and did not raise, so run() stored func()'s
    # return value. That value may legitimately be None, so it must not be
    # asserted non-None; the cast only narrows the Optional for type checkers.
    return cast(ResultT, thread.result)