From 962c64a1d40454460751d7cefca12c1468ab073d Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Tue, 15 May 2018 17:39:29 -0700 Subject: [PATCH] test_queue: Add basic tests for SimpleQueueClient. --- zerver/tests/test_queue.py | 53 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/zerver/tests/test_queue.py b/zerver/tests/test_queue.py index 5d3cfb4098..9401a847b2 100644 --- a/zerver/tests/test_queue.py +++ b/zerver/tests/test_queue.py @@ -3,9 +3,11 @@ import os from typing import Any import ujson -from pika.exceptions import ConnectionClosed +from django.test import override_settings +from pika.exceptions import ConnectionClosed, AMQPConnectionError -from zerver.lib.queue import TornadoQueueClient +from zerver.lib.queue import TornadoQueueClient, queue_json_publish, \ + get_queue_client, SimpleQueueClient from zerver.lib.test_classes import ZulipTestCase class TestTornadoQueueClient(ZulipTestCase): @@ -16,3 +18,50 @@ class TestTornadoQueueClient(ZulipTestCase): connection = TornadoQueueClient() connection.connection.channel.side_effect = ConnectionClosed connection._on_open(mock.MagicMock()) + + +class TestQueueImplementation(ZulipTestCase): + @override_settings(USING_RABBITMQ=True) + def test_queue_basics(self) -> None: + queue_client = get_queue_client() + queue_client.publish("test_suite", 'test_event') + + result = queue_client.drain_queue("test_suite") + self.assertEqual(len(result), 1) + self.assertEqual(result[0], b'test_event') + + @override_settings(USING_RABBITMQ=True) + def test_queue_basics_json(self) -> None: + queue_json_publish("test_suite", {"event": "my_event"}) + + queue_client = get_queue_client() + result = queue_client.drain_queue("test_suite", json=True) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['event'], 'my_event') + + @override_settings(USING_RABBITMQ=True) + def test_queue_error_json(self) -> None: + queue_client = get_queue_client() + actual_publish = queue_client.publish + + self.counter = 0 + + def throw_connection_error_once(self_obj: Any, *args: Any, + **kwargs: Any) -> None: + self.counter += 1 + if self.counter <= 1: + raise AMQPConnectionError("test") + actual_publish(*args, **kwargs) + + with mock.patch("zerver.lib.queue.SimpleQueueClient.publish", + throw_connection_error_once): + queue_json_publish("test_suite", {"event": "my_event"}) + + result = queue_client.drain_queue("test_suite", json=True) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]['event'], 'my_event') + + @override_settings(USING_RABBITMQ=True) + def tearDown(self) -> None: + queue_client = get_queue_client() + queue_client.drain_queue("test_suite")