diff --git a/zerver/management/commands/process_queue.py b/zerver/management/commands/process_queue.py index 3f643f6d69..4041f3d9d2 100644 --- a/zerver/management/commands/process_queue.py +++ b/zerver/management/commands/process_queue.py @@ -101,7 +101,7 @@ class Command(BaseCommand): logger.info("Worker %d connecting to queue %s", worker_num, queue_name) with log_and_exit_if_exception(logger, queue_name, threaded=False): - worker = get_worker(queue_name) + worker = get_worker(queue_name, worker_num=worker_num) with configure_scope() as scope: scope.set_tag("queue_worker", queue_name) scope.set_tag("worker_num", worker_num) diff --git a/zerver/worker/base.py b/zerver/worker/base.py index e86482dfdc..beb26e0987 100644 --- a/zerver/worker/base.py +++ b/zerver/worker/base.py @@ -97,10 +97,16 @@ class QueueProcessingWorker(ABC): # startup and steady-state memory. PREFETCH = 100 - def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + worker_num: Optional[int] = None, + ) -> None: self.q: Optional[SimpleQueueClient] = None self.threaded = threaded self.disable_timeout = disable_timeout + self.worker_num = worker_num if not hasattr(self, "queue_name"): raise WorkerDeclarationError("Queue worker declared without queue_name") diff --git a/zerver/worker/email_senders.py b/zerver/worker/email_senders.py index 62e7e04bc5..5b132ed44b 100644 --- a/zerver/worker/email_senders.py +++ b/zerver/worker/email_senders.py @@ -46,8 +46,13 @@ def retry_send_email_failures( @assign_queue("email_senders") class EmailSendingWorker(LoopQueueProcessingWorker): - def __init__(self, threaded: bool = False, disable_timeout: bool = False) -> None: - super().__init__(threaded, disable_timeout) + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + worker_num: Optional[int] = None, + ) -> None: + super().__init__(threaded, disable_timeout, worker_num) self.connection: Optional[BaseEmailBackend] = None @retry_send_email_failures diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 28b177697e..7eec596764 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -1,14 +1,18 @@ # Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html import importlib import pkgutil -from typing import List +from typing import List, Optional import zerver.worker from zerver.worker.base import QueueProcessingWorker, test_queues, worker_classes def get_worker( - queue_name: str, threaded: bool = False, disable_timeout: bool = False + queue_name: str, + *, + threaded: bool = False, + disable_timeout: bool = False, + worker_num: Optional[int] = None, ) -> QueueProcessingWorker: if queue_name in {"test", "noop", "noop_batch"}: import_module = "zerver.worker.test" @@ -16,7 +20,9 @@ def get_worker( import_module = f"zerver.worker.{queue_name}" importlib.import_module(import_module) - return worker_classes[queue_name](threaded=threaded, disable_timeout=disable_timeout) + return worker_classes[queue_name]( + threaded=threaded, disable_timeout=disable_timeout, worker_num=worker_num + ) def get_active_worker_queues(only_test_queues: bool = False) -> List[str]: diff --git a/zerver/worker/test.py b/zerver/worker/test.py index 4f8cfa37b9..742fa6d74d 100644 --- a/zerver/worker/test.py +++ b/zerver/worker/test.py @@ -1,7 +1,7 @@ # Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html import logging import time -from typing import Any, Dict, List, Mapping, Sequence, Set +from typing import Any, Dict, List, Mapping, Optional, Sequence, Set import orjson from django.conf import settings @@ -35,10 +35,11 @@ class NoopWorker(QueueProcessingWorker): self, threaded: bool = False, disable_timeout: bool = False, + worker_num: Optional[int] = None, max_consume: int = 1000, slow_queries: Sequence[int] = [], ) -> None: - super().__init__(threaded, disable_timeout) + super().__init__(threaded, disable_timeout, worker_num) self.consumed = 0 self.max_consume = max_consume self.slow_queries: Set[int] = set(slow_queries)