queue: Switch tests to start_json_consumer interface.

This commit is contained in:
Alex Vandiver 2020-10-09 20:27:42 -07:00 committed by Tim Abbott
parent 179c387409
commit 5477b9d9a1
1 changed files with 19 additions and 14 deletions

View File

@ -1,6 +1,7 @@
from typing import Any, Dict
from typing import Any, Dict, List
from unittest import mock
import orjson
from django.test import override_settings
from pika.exceptions import AMQPConnectionError, ConnectionClosed
@ -62,14 +63,14 @@ class TestQueueImplementation(ZulipTestCase):
queue_client = get_queue_client()
def collect(event: Dict[str, Any]) -> None:
output.append(event)
def collect(events: List[Dict[str, Any]]) -> None:
assert len(events) == 1
output.append(events[0])
queue_client.stop_consuming()
queue_client.register_json_consumer("test_suite", collect)
queue_json_publish("test_suite", {"event": "my_event"})
queue_client.start_consuming()
queue_client.start_json_consumer("test_suite", collect)
self.assertEqual(len(output), 1)
self.assertEqual(output[0]['event'], 'my_event')
@ -81,22 +82,21 @@ class TestQueueImplementation(ZulipTestCase):
queue_client = get_queue_client()
def collect(event: Dict[str, Any]) -> None:
def collect(events: List[Dict[str, Any]]) -> None:
assert len(events) == 1
queue_client.stop_consuming()
nonlocal count
count += 1
if count == 1:
raise Exception("Make me nack!")
output.append(event)
output.append(events[0])
queue_client.register_json_consumer("test_suite", collect)
queue_json_publish("test_suite", {"event": "my_event"})
try:
queue_client.start_consuming()
queue_client.start_json_consumer("test_suite", collect)
except Exception:
queue_client.register_json_consumer("test_suite", collect)
queue_client.start_consuming()
queue_client.start_json_consumer("test_suite", collect)
# Confirm that we processed the event fully once
self.assertEqual(count, 2)
@ -124,9 +124,14 @@ class TestQueueImplementation(ZulipTestCase):
'WARNING:zulip.queue:Failed to send to rabbitmq, trying to reconnect and send again'
])
with queue_client.json_drain_queue("test_suite") as result:
self.assertEqual(len(result), 1)
self.assertEqual(result[0]['event'], 'my_event')
assert queue_client.channel
(_, _, message) = queue_client.channel.basic_get("test_suite")
assert message
result = orjson.loads(message)
self.assertEqual(result['event'], 'my_event')
(_, _, message) = queue_client.channel.basic_get("test_suite")
assert not message
@override_settings(USING_RABBITMQ=True)
def tearDown(self) -> None: