workers: Pass down if they are running multi-threaded.

This allows them to decide for themselves if they should enable
timeouts.
This commit is contained in:
Alex Vandiver 2023-05-16 14:57:35 +00:00 committed by Tim Abbott
parent 5329fed387
commit 9f231322c9
4 changed files with 16 additions and 16 deletions

View File

@ -107,7 +107,6 @@ class Command(BaseCommand):
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler) signal.signal(signal.SIGUSR1, signal_handler)
worker.ENABLE_TIMEOUTS = True
worker.start() worker.start()
@ -118,7 +117,7 @@ class ThreadedWorker(threading.Thread):
self.queue_name = queue_name self.queue_name = queue_name
with log_and_exit_if_exception(logger, queue_name, threaded=True): with log_and_exit_if_exception(logger, queue_name, threaded=True):
self.worker = get_worker(queue_name) self.worker = get_worker(queue_name, threaded=True)
def run(self) -> None: def run(self) -> None:
with configure_scope() as scope, log_and_exit_if_exception( with configure_scope() as scope, log_and_exit_if_exception(

View File

@ -717,7 +717,6 @@ class WorkerTest(ZulipTestCase):
with simulated_queue_client(fake_client): with simulated_queue_client(fake_client):
worker = TimeoutWorker() worker = TimeoutWorker()
worker.setup() worker.setup()
worker.ENABLE_TIMEOUTS = True
with self.assertLogs(level="ERROR") as m: with self.assertLogs(level="ERROR") as m:
worker.start() worker.start()
self.assertEqual( self.assertEqual(
@ -757,7 +756,6 @@ class WorkerTest(ZulipTestCase):
with simulated_queue_client(fake_client): with simulated_queue_client(fake_client):
worker = TimeoutWorker() worker = TimeoutWorker()
worker.setup() worker.setup()
worker.ENABLE_TIMEOUTS = True
with self.assertLogs(level="WARNING") as m: with self.assertLogs(level="WARNING") as m:
worker.start() worker.start()
self.assertEqual( self.assertEqual(

View File

@ -161,8 +161,8 @@ def register_worker(
test_queues.add(queue_name) test_queues.add(queue_name)
def get_worker(queue_name: str) -> "QueueProcessingWorker": def get_worker(queue_name: str, threaded: bool = False) -> "QueueProcessingWorker":
return worker_classes[queue_name]() return worker_classes[queue_name](threaded=threaded)
def get_active_worker_queues(only_test_queues: bool = False) -> List[str]: def get_active_worker_queues(only_test_queues: bool = False) -> List[str]:
@ -213,9 +213,6 @@ def retry_send_email_failures(
class QueueProcessingWorker(ABC): class QueueProcessingWorker(ABC):
queue_name: str queue_name: str
MAX_CONSUME_SECONDS: Optional[int] = 30 MAX_CONSUME_SECONDS: Optional[int] = 30
# The MAX_CONSUME_SECONDS timeout is only enabled when handling a
# single queue at once, with no threads.
ENABLE_TIMEOUTS = False
CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50 CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM = 50
MAX_SECONDS_BEFORE_UPDATE_STATS = 30 MAX_SECONDS_BEFORE_UPDATE_STATS = 30
@ -225,8 +222,9 @@ class QueueProcessingWorker(ABC):
# startup and steady-state memory. # startup and steady-state memory.
PREFETCH = 100 PREFETCH = 100
def __init__(self) -> None: def __init__(self, threaded: bool = False) -> None:
self.q: Optional[SimpleQueueClient] = None self.q: Optional[SimpleQueueClient] = None
self.threaded = threaded
if not hasattr(self, "queue_name"): if not hasattr(self, "queue_name"):
raise WorkerDeclarationError("Queue worker declared without queue_name") raise WorkerDeclarationError("Queue worker declared without queue_name")
@ -304,7 +302,7 @@ class QueueProcessingWorker(ABC):
self.update_statistics() self.update_statistics()
time_start = time.time() time_start = time.time()
if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: if self.MAX_CONSUME_SECONDS and not self.threaded:
try: try:
signal.signal( signal.signal(
signal.SIGALRM, signal.SIGALRM,
@ -765,8 +763,8 @@ class MissedMessageWorker(QueueProcessingWorker):
@assign_queue("email_senders") @assign_queue("email_senders")
class EmailSendingWorker(LoopQueueProcessingWorker): class EmailSendingWorker(LoopQueueProcessingWorker):
def __init__(self) -> None: def __init__(self, threaded: bool = False) -> None:
super().__init__() super().__init__(threaded)
self.connection: BaseEmailBackend = initialize_connection(None) self.connection: BaseEmailBackend = initialize_connection(None)
@retry_send_email_failures @retry_send_email_failures
@ -1167,7 +1165,10 @@ class TestWorker(QueueProcessingWorker):
class NoopWorker(QueueProcessingWorker): class NoopWorker(QueueProcessingWorker):
"""Used to profile the queue processing framework, in zilencer's queue_rate.""" """Used to profile the queue processing framework, in zilencer's queue_rate."""
def __init__(self, max_consume: int = 1000, slow_queries: Sequence[int] = []) -> None: def __init__(
self, threaded: bool = False, max_consume: int = 1000, slow_queries: Sequence[int] = []
) -> None:
super().__init__(threaded)
self.consumed = 0 self.consumed = 0
self.max_consume = max_consume self.max_consume = max_consume
self.slow_queries: Set[int] = set(slow_queries) self.slow_queries: Set[int] = set(slow_queries)
@ -1188,7 +1189,10 @@ class BatchNoopWorker(LoopQueueProcessingWorker):
batch_size = 100 batch_size = 100
def __init__(self, max_consume: int = 1000, slow_queries: Sequence[int] = []) -> None: def __init__(
self, threaded: bool = False, max_consume: int = 1000, slow_queries: Sequence[int] = []
) -> None:
super().__init__(threaded)
self.consumed = 0 self.consumed = 0
self.max_consume = max_consume self.max_consume = max_consume
self.slow_queries: Set[int] = set(slow_queries) self.slow_queries: Set[int] = set(slow_queries)

View File

@ -57,7 +57,6 @@ class Command(BaseCommand):
f" Skipping, as prefetch {prefetch} is less than batch size {worker.batch_size}" f" Skipping, as prefetch {prefetch} is less than batch size {worker.batch_size}"
) )
continue continue
worker.ENABLE_TIMEOUTS = True
worker.setup() worker.setup()
assert worker.q is not None assert worker.q is not None