queue: Add a tool to profile no-op enqueue and dequeue actions.

This commit is contained in:
Alex Vandiver 2020-10-02 16:29:49 -07:00 committed by Tim Abbott
parent 98529564ae
commit 8cf37a0d4b
4 changed files with 134 additions and 15 deletions

View File

@ -61,7 +61,9 @@ fi
echo; echo "Now confirming all the RabbitMQ queue processors are correctly registered!"; echo
# These hacky shell scripts just extract the sorted list of queue processors, running and expected
supervisorctl status | cut -f1 -dR | cut -f2- -d: | grep events | cut -f1 -d" " | cut -f3- -d_ | cut -f1 -d- | sort -u > /tmp/running_queue_processors.txt
# Expected processors are all declared queues minus those marked with
# --queue-type test (test-only queues are never run under supervisor).
su zulip -c /home/zulip/deployments/current/scripts/lib/queue_workers.py | sort -u > /tmp/all_queue_processors.txt
su zulip -c "/home/zulip/deployments/current/scripts/lib/queue_workers.py --queue-type test" | sort -u > /tmp/expected_test_queues.txt
grep -v -x -f /tmp/expected_test_queues.txt /tmp/all_queue_processors.txt > /tmp/expected_queue_processors.txt
if ! diff /tmp/expected_queue_processors.txt /tmp/running_queue_processors.txt >/dev/null; then
set +x
echo "FAILURE: Runnable queue processors declared in zerver/worker/queue_processors.py "

View File

@ -630,8 +630,10 @@ class WorkerTest(ZulipTestCase):
TestWorker()
def test_get_active_worker_queues(self) -> None:
    """Check get_active_worker_queues() against the declared worker classes."""
    # Three queues are registered with queue_type="test": 'test', 'noop',
    # and 'noop_batch'.  (The stale pre-change assertion of 1 has been
    # dropped; it contradicted this one.)
    test_queue_count = len(get_active_worker_queues(queue_type='test'))
    self.assertEqual(3, test_queue_count)
    # Every worker class should appear: direct QueueProcessingWorker
    # subclasses plus subclasses of the EmailSendingWorker and
    # LoopQueueProcessingWorker intermediates; the -1 presumably corrects
    # for double-counting one intermediate -- TODO(review): confirm.
    worker_queue_count = (len(QueueProcessingWorker.__subclasses__()) +
                          len(EmailSendingWorker.__subclasses__()) +
                          len(LoopQueueProcessingWorker.__subclasses__()) - 1)
    self.assertEqual(worker_queue_count, len(get_active_worker_queues()))

View File

@ -25,6 +25,7 @@ from typing import (
Mapping,
MutableSequence,
Optional,
Set,
Tuple,
Type,
TypeVar,
@ -634,19 +635,6 @@ class MirrorWorker(QueueProcessingWorker):
mirror_email(msg, rcpt_to=rcpt_to)
# NOTE(review): this appears to be the diff's *removed* copy of TestWorker;
# an identical definition exists later in this view (the relocated copy).
# Confirm only one definition survives in the applied file.
@assign_queue('test', queue_type="test")
class TestWorker(QueueProcessingWorker):
    # This worker allows you to test the queue worker infrastructure without
    # creating significant side effects. It can be useful in development or
    # for troubleshooting prod/staging. It pulls a message off the test queue
    # and appends it to a file in /tmp.
    def consume(self, event: Mapping[str, Any]) -> None:  # nocoverage
        # Serialize the event and append it, one JSON line per event, to
        # the file named by settings.ZULIP_WORKER_TEST_FILE.
        fn = settings.ZULIP_WORKER_TEST_FILE
        message = orjson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s", fn, message.decode())
        with open(fn, 'ab') as f:
            f.write(message + b'\n')
@assign_queue('embed_links')
class FetchLinksEmbedData(QueueProcessingWorker):
# This is a slow queue with network requests, so a disk write is negligible.
@ -827,3 +815,55 @@ class DeferredWorker(QueueProcessingWorker):
"Completed data export for %s in %s",
user_profile.realm.string_id, time.time() - start,
)
@assign_queue('test', queue_type="test")
class TestWorker(QueueProcessingWorker):
    """Exercise the queue worker infrastructure without significant side effects.

    Useful in development or for troubleshooting prod/staging: each message
    pulled off the test queue is appended, as a JSON line, to the file named
    by settings.ZULIP_WORKER_TEST_FILE (a path in /tmp).
    """

    def consume(self, event: Mapping[str, Any]) -> None:  # nocoverage
        output_path = settings.ZULIP_WORKER_TEST_FILE
        serialized = orjson.dumps(event)
        logging.info("TestWorker should append this message to %s: %s",
                     output_path, serialized.decode())
        with open(output_path, 'ab') as output_file:
            output_file.write(serialized + b'\n')
@assign_queue('noop', queue_type="test")
class NoopWorker(QueueProcessingWorker):
    """Used to profile the queue processing framework, in zilencer's queue_rate."""

    def __init__(self, max_consume: int = 1000, slow_queries: Optional[List[int]] = None) -> None:
        # Fix: the original skipped super().__init__(), leaving base-class
        # state uninitialized -- confirm QueueProcessingWorker.__init__ is
        # safe to call here.
        super().__init__()
        self.consumed = 0
        self.max_consume = max_consume
        # 1-based message numbers that should simulate a slow request.
        self.slow_queries: Set[int] = set(slow_queries or [])

    def consume(self, event: Mapping[str, Any]) -> None:
        """Count the message, sleeping 60s for designated "slow" ones; stop after max_consume."""
        self.consumed += 1
        if self.consumed in self.slow_queries:
            logging.info("Slow request...")
            time.sleep(60)
            logging.info("Done!")
        if self.consumed >= self.max_consume:
            self.stop()
@assign_queue('noop_batch', queue_type="test")
class BatchNoopWorker(LoopQueueProcessingWorker):
    """Used to profile the queue processing framework, in zilencer's queue_rate."""
    batch_size = 500

    def __init__(self, max_consume: int = 1000, slow_queries: Optional[List[int]] = None) -> None:
        # Fix: the original skipped super().__init__(), leaving base-class
        # state uninitialized -- confirm LoopQueueProcessingWorker.__init__
        # is safe to call here.
        super().__init__()
        self.consumed = 0
        self.max_consume = max_consume
        # 1-based message numbers that should simulate a slow request.
        self.slow_queries: Set[int] = set(slow_queries or [])

    def consume_batch(self, events: List[Dict[str, Any]]) -> None:
        """Count the batch; sleep 60s per "slow" message it contains; stop after max_consume."""
        # 1-based positions of the messages in this batch.
        event_numbers = set(range(self.consumed + 1, self.consumed + 1 + len(events)))
        found_slow = self.slow_queries & event_numbers
        if found_slow:
            logging.info("%d slow requests...", len(found_slow))
            time.sleep(60 * len(found_slow))
            logging.info("Done!")
        self.consumed += len(events)
        if self.consumed >= self.max_consume:
            self.stop()

View File

@ -0,0 +1,75 @@
from timeit import timeit
from typing import Any
from django.core.management.base import BaseCommand, CommandParser
from zerver.lib.queue import SimpleQueueClient, queue_json_publish
from zerver.worker.queue_processors import BatchNoopWorker, NoopWorker, QueueProcessingWorker
class Command(BaseCommand):
    help = """Times the overhead of enqueuing and dequeuing messages from rabbitmq."""

    def add_arguments(self, parser: CommandParser) -> None:
        parser.add_argument(
            "--count", help="Number of messages to enqueue", default=10000, type=int
        )
        parser.add_argument(
            "--reps", help="Iterations of enqueue/dequeue", default=1, type=int
        )
        parser.add_argument(
            "--batch", help="Enables batch dequeuing", action="store_true"
        )
        parser.add_argument(
            "--prefetch",
            help="Limits the prefetch size; rabbitmq defaults to unbounded (0)",
            default=0,
            type=int,
        )
        parser.add_argument(
            "--slow",
            help="Which request numbers should take 60s (1-based)",
            action="append",
            type=int,
            default=[],
        )

    def handle(self, *args: Any, **options: Any) -> None:
        """Enqueue `--count` no-op messages, drain them with a worker, and print rates."""
        queue_name = "noop_batch" if options["batch"] else "noop"

        print("Purging queue...")
        queue = SimpleQueueClient()
        # Bug fix: purge the queue we are about to use.  The original always
        # purged "noop", leaving stale messages behind when --batch selected
        # the "noop_batch" queue.
        queue.ensure_queue(queue_name, lambda channel: channel.queue_purge(queue_name))

        count = options["count"]
        reps = options["reps"]

        # The worker stops itself after consuming `count` messages; entries
        # in --slow simulate 60-second requests.
        worker: QueueProcessingWorker = NoopWorker(count, options["slow"])
        if options["batch"]:
            worker = BatchNoopWorker(count, options["slow"])
        worker.setup()
        assert worker.q is not None
        assert worker.q.channel is not None
        worker.q.channel.basic_qos(prefetch_count=options["prefetch"])

        total_enqueue_time = 0.0
        total_dequeue_time = 0.0

        def one_rep() -> None:
            # Time one full enqueue-everything / dequeue-everything cycle,
            # accumulating each phase separately.
            nonlocal total_enqueue_time, total_dequeue_time
            total_enqueue_time += timeit(
                lambda: queue_json_publish(queue_name, {}),
                number=count,
            )
            total_dequeue_time += timeit(
                worker.start,
                number=1,
            )

        def rate(elapsed: float, iterations: int) -> int:
            # Integer messages/second for reporting.
            return int(iterations / elapsed)

        total_reps_time = timeit(one_rep, number=reps)

        if reps > 1:
            print(f"Total rate per rep: {rate(total_reps_time, reps)} / sec")
        print(f"Enqueue rate: {rate(total_enqueue_time, count * reps)} / sec")
        print(f"Dequeue rate: {rate(total_dequeue_time, count * reps)} / sec")