From 64268f47e8811a1f9dd7d4ae8abfe87524ad8f42 Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Mon, 15 Nov 2021 13:32:01 -0800 Subject: [PATCH] queue_processors: Drop unused current_queue_size, which was local size. The `current_queue_size` key in the queue monitoring stats file was the local queue size, not the global queue size -- d5a6b0f99acc renamed the function, but did not adjust the queue monitoring JSON, despite the last use of it having been removed in cd9b194d88a0. The function is still used to mark "we emptied our queue," and it remains a reasonable metric for that. --- tools/tests/test_check_rabbitmq_queue.py | 5 ----- zerver/worker/queue_processors.py | 11 +++++------ 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/tools/tests/test_check_rabbitmq_queue.py b/tools/tests/test_check_rabbitmq_queue.py index f6a3c76381..b60c4fc068 100644 --- a/tools/tests/test_check_rabbitmq_queue.py +++ b/tools/tests/test_check_rabbitmq_queue.py @@ -25,7 +25,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "recent_average_consume_time": None, }, 10000, @@ -38,7 +37,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, @@ -52,7 +50,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 0.0001, }, @@ -66,7 +63,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 11, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, @@ -79,7 +75,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 9, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index ba4876628d..626acaff6a 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -234,9 +234,9 @@ class QueueProcessingWorker(ABC): self.idle = True self.last_statistics_update_time = 0.0 - self.update_statistics(0) + self.update_statistics() - def update_statistics(self, remaining_local_queue_size: int) -> None: + def update_statistics(self) -> None: total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times) if total_events == 0: @@ -246,7 +246,6 @@ class QueueProcessingWorker(ABC): stats_dict = dict( update_time=time.time(), recent_average_consume_time=recent_average_consume_time, - current_queue_size=remaining_local_queue_size, queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, consumed_since_last_emptied=self.consumed_since_last_emptied, ) @@ -296,7 +295,7 @@ class QueueProcessingWorker(ABC): # that the queue started processing, in case the event we're about to process # makes us freeze. self.idle = False - self.update_statistics(self.get_remaining_local_queue_size()) + self.update_statistics() time_start = time.time() if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: @@ -333,7 +332,7 @@ class QueueProcessingWorker(ABC): # need to worry about the small overhead of doing a disk write. # We take advantage of this to update the stats file to keep it fresh, # especially since the queue might go idle until new events come in. - self.update_statistics(0) + self.update_statistics() self.idle = True return @@ -344,7 +343,7 @@ class QueueProcessingWorker(ABC): >= self.MAX_SECONDS_BEFORE_UPDATE_STATS ): self.consume_iteration_counter = 0 - self.update_statistics(remaining_local_queue_size) + self.update_statistics() def consume_single_event(self, event: Dict[str, Any]) -> None: consume_func = lambda events: self.consume(events[0])