tornado: Switch to start_json_consumer interface.

This commit is contained in:
Alex Vandiver 2020-10-09 20:24:35 -07:00 committed by Tim Abbott
parent f0b23b0752
commit 179c387409
3 changed files with 21 additions and 13 deletions

View File

@ -372,19 +372,25 @@ class TornadoQueueClient(SimpleQueueClient):
assert self.channel is not None
callback(self.channel)
def register_consumer(self, queue_name: str, consumer: Consumer) -> None:
def start_json_consumer(self,
queue_name: str,
callback: Callable[[List[Dict[str, Any]]], None],
batch_size: int=1,
timeout: Optional[int]=None) -> None:
def wrapped_consumer(ch: BlockingChannel,
method: Basic.Deliver,
properties: pika.BasicProperties,
body: bytes) -> None:
consumer(ch, method, properties, body)
callback([orjson.loads(body)])
ch.basic_ack(delivery_tag=method.delivery_tag)
assert batch_size == 1
assert timeout is None
self.consumers[queue_name].add(wrapped_consumer)
if not self.ready():
self.consumers[queue_name].add(wrapped_consumer)
return
self.consumers[queue_name].add(wrapped_consumer)
self.ensure_queue(
queue_name,
lambda channel: channel.basic_consume(

View File

@ -30,7 +30,7 @@ from zerver.tornado.event_queue import (
from zerver.tornado.sharding import notify_tornado_queue_name
if settings.USING_RABBITMQ:
from zerver.lib.queue import get_queue_client
from zerver.lib.queue import TornadoQueueClient, get_queue_client
def handle_callback_exception(callback: Callable[..., Any]) -> None:
@ -89,10 +89,11 @@ class Command(BaseCommand):
if settings.USING_RABBITMQ:
queue_client = get_queue_client()
assert isinstance(queue_client, TornadoQueueClient)
# Process notifications received via RabbitMQ
queue_name = notify_tornado_queue_name(port)
queue_client.register_json_consumer(queue_name,
get_wrapped_process_notification(queue_name))
queue_client.start_json_consumer(queue_name,
get_wrapped_process_notification(queue_name))
try:
# Application is an instance of Django's standard wsgi handler.

View File

@ -1081,16 +1081,17 @@ def process_notification(notice: Mapping[str, Any]) -> None:
event['type'], len(users), int(1000 * (time.time() - start_time)),
)
def get_wrapped_process_notification(queue_name: str) -> Callable[[Dict[str, Any]], None]:
def get_wrapped_process_notification(queue_name: str) -> Callable[[List[Dict[str, Any]]], None]:
def failure_processor(notice: Dict[str, Any]) -> None:
logging.error(
"Maximum retries exceeded for Tornado notice:%s\nStack trace:\n%s\n",
notice, traceback.format_exc())
def wrapped_process_notification(notice: Dict[str, Any]) -> None:
try:
process_notification(notice)
except Exception:
retry_event(queue_name, notice, failure_processor)
def wrapped_process_notification(notices: List[Dict[str, Any]]) -> None:
for notice in notices:
try:
process_notification(notice)
except Exception:
retry_event(queue_name, notice, failure_processor)
return wrapped_process_notification