diff --git a/tools/tests/test_check_rabbitmq_queue.py b/tools/tests/test_check_rabbitmq_queue.py index f6a3c76381..b60c4fc068 100644 --- a/tools/tests/test_check_rabbitmq_queue.py +++ b/tools/tests/test_check_rabbitmq_queue.py @@ -25,7 +25,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "recent_average_consume_time": None, }, 10000, @@ -38,7 +37,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, @@ -52,7 +50,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 10000, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 0.0001, }, @@ -66,7 +63,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 11, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, @@ -79,7 +75,6 @@ class AnalyzeQueueStatsTests(TestCase): "name", { "update_time": time.time(), - "current_queue_size": 9, "queue_last_emptied_timestamp": time.time() - 10000, "recent_average_consume_time": 1, }, diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index ba4876628d..626acaff6a 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -234,9 +234,9 @@ class QueueProcessingWorker(ABC): self.idle = True self.last_statistics_update_time = 0.0 - self.update_statistics(0) + self.update_statistics() - def update_statistics(self, remaining_local_queue_size: int) -> None: + def update_statistics(self) -> None: total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times) if total_events == 0: @@ -246,7 +246,6 @@ class QueueProcessingWorker(ABC): stats_dict = dict( update_time=time.time(), recent_average_consume_time=recent_average_consume_time, - current_queue_size=remaining_local_queue_size, queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, consumed_since_last_emptied=self.consumed_since_last_emptied, ) @@ -296,7 +295,7 @@ class QueueProcessingWorker(ABC): # that the queue started processing, in case the event we're about to process # makes us freeze. self.idle = False - self.update_statistics(self.get_remaining_local_queue_size()) + self.update_statistics() time_start = time.time() if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: @@ -333,7 +332,7 @@ class QueueProcessingWorker(ABC): # need to worry about the small overhead of doing a disk write. # We take advantage of this to update the stats file to keep it fresh, # especially since the queue might go idle until new events come in. - self.update_statistics(0) + self.update_statistics() self.idle = True return @@ -344,7 +343,7 @@ class QueueProcessingWorker(ABC): >= self.MAX_SECONDS_BEFORE_UPDATE_STATS ): self.consume_iteration_counter = 0 - self.update_statistics(remaining_local_queue_size) + self.update_statistics() def consume_single_event(self, event: Dict[str, Any]) -> None: consume_func = lambda events: self.consume(events[0])