queue: Only ACK drain_queue once it has completed work on the list.

Currently, drain_queue and json_drain_queue ACK every message as it is
pulled off of the queue, until the queue is empty.  This means that if
the consumer crashes between pulling a batch of messages off the queue
and actually processing them, those messages will be permanently lost.
Sending an ACK on every message also results in a significant amount
of traffic to RabbitMQ, with notable performance implications.
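
To make the failure window concrete, here is a simplified reconstruction of
the old drain loop (the connection setup, queue name, and `process_batch`
are illustrative stand-ins, not the real call sites):
```python
import pika

def process_batch(messages: list) -> None:
    ...  # hypothetical processing step

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="test_suite")

messages = []
while True:
    (meta, _, message) = channel.basic_get("test_suite")
    if message is None:
        break
    channel.basic_ack(meta.delivery_tag)  # the broker forgets the message here
    messages.append(message)

# A crash between here and process_batch() loses the whole batch:
# RabbitMQ has already discarded the messages, unprocessed.
process_batch(messages)
```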

Instead, send a single ACK after the processing has completed, by
making `drain_queue` into a context manager.  Additionally, use the
`multiple` flag to ACK all of the messages at once -- or explicitly
NACK the messages if processing failed.  Sending a NACK will re-queue
them at the front of the queue.

Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a ~24% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
Alex Vandiver authored 2020-09-29 19:03:57 -07:00; committed by Tim Abbott
parent df86a564dc
commit baf882a133
4 changed files with 62 additions and 23 deletions


```diff
@@ -3,7 +3,8 @@ import random
 import threading
 import time
 from collections import defaultdict
-from typing import Any, Callable, Dict, List, Mapping, Optional, Set
+from contextlib import contextmanager
+from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Set
 
 import orjson
 import pika
@@ -167,25 +168,40 @@ class SimpleQueueClient:
             callback(orjson.loads(body))
 
         self.register_consumer(queue_name, wrapped_callback)
 
-    def drain_queue(self, queue_name: str) -> List[bytes]:
-        "Returns all messages in the desired queue"
+    @contextmanager
+    def drain_queue(self, queue_name: str) -> Iterator[List[bytes]]:
+        """As a contextmanager, yields all messages in the desired queue.
+        NACKs all of the messages if the block throws an exception,
+        ACKs them otherwise.
+        """
         messages = []
+        max_tag: Optional[int] = None
 
         def opened(channel: BlockingChannel) -> None:
+            nonlocal max_tag
             while True:
                 (meta, _, message) = channel.basic_get(queue_name)
                 if message is None:
                     break
-                channel.basic_ack(meta.delivery_tag)
+                max_tag = meta.delivery_tag
                 messages.append(message)
 
         self.ensure_queue(queue_name, opened)
-        return messages
+        assert self.channel is not None
+        try:
+            yield messages
+            if max_tag:
+                self.channel.basic_ack(max_tag, multiple=True)
+        except Exception:
+            if max_tag:
+                self.channel.basic_nack(max_tag, multiple=True)
+            raise
 
-    def json_drain_queue(self, queue_name: str) -> List[Dict[str, Any]]:
-        return list(map(orjson.loads, self.drain_queue(queue_name)))
+    @contextmanager
+    def json_drain_queue(self, queue_name: str) -> Iterator[List[Dict[str, Any]]]:
+        with self.drain_queue(queue_name) as binary_messages:
+            yield list(map(orjson.loads, binary_messages))
 
     def queue_size(self) -> int:
         assert self.channel is not None
```
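
For context on the `multiple=True` flag used above: AMQP delivery tags are
per-channel and monotonically increasing, so a single cumulative ACK of the
highest tag covers the whole drained batch. A sketch of the equivalence
(`ack_batch` and its assumption that tags start at 1 are illustrative, not
part of the change):
```python
from pika.adapters.blocking_connection import BlockingChannel

def ack_batch(channel: BlockingChannel, max_tag: int,
              cumulative: bool = True) -> None:
    """Illustrative helper: acknowledge a drained batch, assuming tags
    1..max_tag are the channel's outstanding deliveries."""
    if cumulative:
        # One frame covers every unacked delivery up to and including max_tag.
        channel.basic_ack(max_tag, multiple=True)
    else:
        # The equivalent per-message form (the old behavior): one
        # network frame per message.
        for tag in range(1, max_tag + 1):
            channel.basic_ack(tag)
```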


```diff
@@ -24,17 +24,37 @@ class TestQueueImplementation(ZulipTestCase):
         queue_client = get_queue_client()
         queue_client.publish("test_suite", b"test_event\x00\xff")
 
-        result = queue_client.drain_queue("test_suite")
-        self.assertEqual(result, [b"test_event\x00\xff"])
+        with queue_client.drain_queue("test_suite") as result:
+            self.assertEqual(result, [b"test_event\x00\xff"])
 
     @override_settings(USING_RABBITMQ=True)
     def test_queue_basics_json(self) -> None:
         queue_json_publish("test_suite", {"event": "my_event"})
 
         queue_client = get_queue_client()
-        result = queue_client.json_drain_queue("test_suite")
-        self.assertEqual(len(result), 1)
-        self.assertEqual(result[0]['event'], 'my_event')
+        with queue_client.json_drain_queue("test_suite") as result:
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0]['event'], 'my_event')
+
+    @override_settings(USING_RABBITMQ=True)
+    def test_queue_basics_json_error(self) -> None:
+        queue_json_publish("test_suite", {"event": "my_event"})
+
+        queue_client = get_queue_client()
+        raised = False
+        try:
+            with queue_client.json_drain_queue("test_suite") as result:
+                self.assertEqual(len(result), 1)
+                self.assertEqual(result[0]['event'], 'my_event')
+                raise ValueError()
+        except ValueError:
+            raised = True
+        assert raised
+
+        # Still in the queue to be fetched
+        with queue_client.json_drain_queue("test_suite") as result:
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0]['event'], 'my_event')
 
     @override_settings(USING_RABBITMQ=True)
     def test_register_consumer(self) -> None:
@@ -104,12 +124,13 @@ class TestQueueImplementation(ZulipTestCase):
             'WARNING:zulip.queue:Failed to send to rabbitmq, trying to reconnect and send again'
         ])
 
-        result = queue_client.json_drain_queue("test_suite")
-        self.assertEqual(len(result), 1)
-        self.assertEqual(result[0]['event'], 'my_event')
+        with queue_client.json_drain_queue("test_suite") as result:
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0]['event'], 'my_event')
 
     @override_settings(USING_RABBITMQ=True)
     def tearDown(self) -> None:
         queue_client = get_queue_client()
-        queue_client.drain_queue("test_suite")
+        with queue_client.drain_queue("test_suite"):
+            pass
         super().tearDown()
```


```diff
@@ -2,7 +2,8 @@ import base64
 import os
 import smtplib
 import time
-from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
+from contextlib import contextmanager
+from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Tuple
 from unittest.mock import MagicMock, patch
 
 import orjson
@@ -58,7 +59,8 @@ class WorkerTest(ZulipTestCase):
                 callback(data)
             self.queue = []
 
-        def json_drain_queue(self, queue_name: str) -> List[Event]:
+        @contextmanager
+        def json_drain_queue(self, queue_name: str) -> Iterator[List[Event]]:
             events = [
                 dct
                 for (queue_name, dct)
@@ -70,7 +72,7 @@ class WorkerTest(ZulipTestCase):
             # queues, which was a bug at one point.
             self.queue = []
 
-            return events
+            yield events
 
         def queue_size(self) -> int:
             return len(self.queue)
```


```diff
@@ -324,8 +324,8 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
         self.initialize_statistics()
         self.is_consuming = True
         while self.is_consuming:
-            events = self.q.json_drain_queue(self.queue_name)
-            self.do_consume(self.consume_batch, events)
+            with self.q.json_drain_queue(self.queue_name) as events:
+                self.do_consume(self.consume_batch, events)
             # To avoid spinning the CPU, we go to sleep if there's
             # nothing in the queue, or for certain queues with
             # sleep_only_if_empty=False, unconditionally.
```