tests: Add option to call queue processor consumer.

This makes tests of queue processors more realistic,
by adding a parameter to `queue_json_publish` that
calls a queue's consumer function if accessed in a test.

Fixes part of #6542.
This commit is contained in:
derAnfaenger 2017-10-13 17:53:02 +02:00 committed by Tim Abbott
parent 22ade4f691
commit af699500b7
2 changed files with 7 additions and 3 deletions

View File

@ -297,12 +297,16 @@ def get_queue_client():
# randomly close.
queue_lock = threading.RLock()
def queue_json_publish(queue_name, event, processor):
# type: (str, Union[Mapping[str, Any], str], Callable[[Any], None]) -> None
def queue_json_publish(queue_name, event, processor, call_consume_in_tests=False):
# type: (str, Union[Dict[str, Any], str], Callable[[Any], None], bool) -> None
# most events are dicts, but zerver.middleware.write_log_line uses a str
with queue_lock:
if settings.USING_RABBITMQ:
get_queue_client().json_publish(queue_name, event)
elif call_consume_in_tests:
# Must be imported here: A top section import leads to obscure not-defined-ish errors.
from zerver.worker.queue_processors import get_worker
get_worker(queue_name).consume_wrapper(event) # type: ignore # https://github.com/python/mypy/issues/3360
else:
processor(event)

View File

@ -942,7 +942,7 @@ def send_notification_http(data):
process_notification(data)
def send_notification(data):
# type: (Mapping[str, Any]) -> None
# type: (Dict[str, Any]) -> None
queue_json_publish("notify_tornado", data, send_notification_http)
def send_event(event, users):