From d5a6b0f99acc99911dbc95454b9b2cc446b79622 Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Fri, 9 Oct 2020 13:12:55 -0700 Subject: [PATCH] queue: Rename queue_size, and update for all local queues. Despite its name, the `queue_size` method does not return the number of items in the queue; it returns the number of items that have been delivered to the local consumer but not yet processed. These are often, but not always, the same. RabbitMQ's queues maintain the queue of unacknowledged messages; when a consumer connects, it sends to the consumer some number of messages to handle, known as the "prefetch." This is a performance optimization, to ensure the consumer code does not need to wait for a network round-trip before having new data to consume. The default prefetch is 0, which means that RabbitMQ immediately dumps all outstanding messages to the consumer, which slowly processes and acknowledges them. If a second consumer were to connect to the same queue, they would receive no messages to process, as the first consumer has already been allocated them. If the first consumer disconnects or crashes, all prior events sent to it are then made available for other consumers on the queue. The consumer does not know the total size of the queue -- merely how many messages it has been handed. No change is made to the prefetch here; however, future changes may wish to limit the prefetch, either for memory-saving, or to allow multiple consumers to work the same queue. Rename the method to make clear that it only contains information about the local queue in the consumer, not the full RabbitMQ queue. Also include the waiting message count, which is used by the `consume()` iterator for a similar purpose to the pending events list. 
--- zerver/lib/queue.py | 4 ++-- zerver/tests/test_queue_worker.py | 2 +- zerver/worker/queue_processors.py | 18 +++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index a5718aca68..f7d59370d1 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -203,9 +203,9 @@ class SimpleQueueClient: with self.drain_queue(queue_name) as binary_messages: yield list(map(orjson.loads, binary_messages)) - def queue_size(self) -> int: + def local_queue_size(self) -> int: assert self.channel is not None - return len(self.channel._pending_events) + return self.channel.get_waiting_message_count() + len(self.channel._pending_events) def start_consuming(self) -> None: assert self.channel is not None diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index 8259eb4710..5375564eff 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -75,7 +75,7 @@ class WorkerTest(ZulipTestCase): yield events - def queue_size(self) -> int: + def local_queue_size(self) -> int: return len(self.queue) def test_UserActivityWorker(self) -> None: diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index f143aaa18d..d938979ea7 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -200,7 +200,7 @@ class QueueProcessingWorker(ABC): self.update_statistics(0) - def update_statistics(self, remaining_queue_size: int) -> None: + def update_statistics(self, remaining_local_queue_size: int) -> None: total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times) if total_events == 0: @@ -210,7 +210,7 @@ class QueueProcessingWorker(ABC): stats_dict = dict( update_time=time.time(), recent_average_consume_time=recent_average_consume_time, - current_queue_size=remaining_queue_size, + 
current_queue_size=remaining_local_queue_size, queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, consumed_since_last_emptied=self.consumed_since_last_emptied, ) @@ -228,9 +228,9 @@ class QueueProcessingWorker(ABC): os.rename(tmp_fn, fn) self.last_statistics_update_time = time.time() - def get_remaining_queue_size(self) -> int: + def get_remaining_local_queue_size(self) -> int: if self.q is not None: - return self.q.queue_size() + return self.q.local_queue_size() else: # This is a special case that will happen if we're operating without # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of @@ -250,7 +250,7 @@ class QueueProcessingWorker(ABC): type='debug', category='queue_processor', message=f"Consuming {self.queue_name}", - data={"events": events, "queue_size": self.get_remaining_queue_size()}, + data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()}, ) try: if self.idle: @@ -259,7 +259,7 @@ class QueueProcessingWorker(ABC): # that the queue started processing, in case the event we're about to process # makes us freeze. 
self.idle = False - self.update_statistics(self.get_remaining_queue_size()) + self.update_statistics(self.get_remaining_local_queue_size()) time_start = time.time() if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: @@ -288,8 +288,8 @@ class QueueProcessingWorker(ABC): if consume_time_seconds is not None: self.recent_consume_times.append((len(events), consume_time_seconds)) - remaining_queue_size = self.get_remaining_queue_size() - if remaining_queue_size == 0: + remaining_local_queue_size = self.get_remaining_local_queue_size() + if remaining_local_queue_size == 0: self.queue_last_emptied_timestamp = time.time() self.consumed_since_last_emptied = 0 # We've cleared all the events from the queue, so we don't @@ -304,7 +304,7 @@ class QueueProcessingWorker(ABC): if (self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM or time.time() - self.last_statistics_update_time >= self.MAX_SECONDS_BEFORE_UPDATE_STATS): self.consume_iteration_counter = 0 - self.update_statistics(remaining_queue_size) + self.update_statistics(remaining_local_queue_size) def consume_wrapper(self, data: Dict[str, Any]) -> None: consume_func = lambda events: self.consume(events[0])