diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index d9f97daaa3..7beea0f14f 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -372,19 +372,25 @@ class TornadoQueueClient(SimpleQueueClient): assert self.channel is not None callback(self.channel) - def register_consumer(self, queue_name: str, consumer: Consumer) -> None: + def start_json_consumer(self, + queue_name: str, + callback: Callable[[List[Dict[str, Any]]], None], + batch_size: int=1, + timeout: Optional[int]=None) -> None: def wrapped_consumer(ch: BlockingChannel, method: Basic.Deliver, properties: pika.BasicProperties, body: bytes) -> None: - consumer(ch, method, properties, body) + callback([orjson.loads(body)]) ch.basic_ack(delivery_tag=method.delivery_tag) + assert batch_size == 1 + assert timeout is None + self.consumers[queue_name].add(wrapped_consumer) + if not self.ready(): - self.consumers[queue_name].add(wrapped_consumer) return - self.consumers[queue_name].add(wrapped_consumer) self.ensure_queue( queue_name, lambda channel: channel.basic_consume( diff --git a/zerver/management/commands/runtornado.py b/zerver/management/commands/runtornado.py index e276a844c6..7635242bbd 100644 --- a/zerver/management/commands/runtornado.py +++ b/zerver/management/commands/runtornado.py @@ -30,7 +30,7 @@ from zerver.tornado.event_queue import ( from zerver.tornado.sharding import notify_tornado_queue_name if settings.USING_RABBITMQ: - from zerver.lib.queue import get_queue_client + from zerver.lib.queue import TornadoQueueClient, get_queue_client def handle_callback_exception(callback: Callable[..., Any]) -> None: @@ -89,10 +89,11 @@ class Command(BaseCommand): if settings.USING_RABBITMQ: queue_client = get_queue_client() + assert isinstance(queue_client, TornadoQueueClient) # Process notifications received via RabbitMQ queue_name = notify_tornado_queue_name(port) - queue_client.register_json_consumer(queue_name, - get_wrapped_process_notification(queue_name)) + queue_client.start_json_consumer(queue_name, + get_wrapped_process_notification(queue_name)) try: # Application is an instance of Django's standard wsgi handler. diff --git a/zerver/tornado/event_queue.py b/zerver/tornado/event_queue.py index e47798e149..d2d93c3f30 100644 --- a/zerver/tornado/event_queue.py +++ b/zerver/tornado/event_queue.py @@ -1081,16 +1081,17 @@ def process_notification(notice: Mapping[str, Any]) -> None: event['type'], len(users), int(1000 * (time.time() - start_time)), ) -def get_wrapped_process_notification(queue_name: str) -> Callable[[Dict[str, Any]], None]: +def get_wrapped_process_notification(queue_name: str) -> Callable[[List[Dict[str, Any]]], None]: def failure_processor(notice: Dict[str, Any]) -> None: logging.error( "Maximum retries exceeded for Tornado notice:%s\nStack trace:\n%s\n", notice, traceback.format_exc()) - def wrapped_process_notification(notice: Dict[str, Any]) -> None: - try: - process_notification(notice) - except Exception: - retry_event(queue_name, notice, failure_processor) + def wrapped_process_notification(notices: List[Dict[str, Any]]) -> None: + for notice in notices: + try: + process_notification(notice) + except Exception: + retry_event(queue_name, notice, failure_processor) return wrapped_process_notification