queue_processors: Drop unused current_queue_size, which was the local size.

The `current_queue_size` key in the queue monitoring stats file was
the local queue size, not the global queue size -- d5a6b0f99a
renamed the function, but did not adjust the queue monitoring JSON,
despite the last use of it having been removed in cd9b194d88.

The function is still used to mark "we emptied our queue," and it
remains a reasonable metric for that.
This commit is contained in:
Alex Vandiver 2021-11-15 13:32:01 -08:00 committed by Tim Abbott
parent 800e38016a
commit 64268f47e8
2 changed files with 5 additions and 11 deletions

View File

@@ -25,7 +25,6 @@ class AnalyzeQueueStatsTests(TestCase):
"name", "name",
{ {
"update_time": time.time(), "update_time": time.time(),
"current_queue_size": 10000,
"recent_average_consume_time": None, "recent_average_consume_time": None,
}, },
10000, 10000,
@@ -38,7 +37,6 @@ class AnalyzeQueueStatsTests(TestCase):
"name", "name",
{ {
"update_time": time.time(), "update_time": time.time(),
"current_queue_size": 10000,
"queue_last_emptied_timestamp": time.time() - 10000, "queue_last_emptied_timestamp": time.time() - 10000,
"recent_average_consume_time": 1, "recent_average_consume_time": 1,
}, },
@@ -52,7 +50,6 @@ class AnalyzeQueueStatsTests(TestCase):
"name", "name",
{ {
"update_time": time.time(), "update_time": time.time(),
"current_queue_size": 10000,
"queue_last_emptied_timestamp": time.time() - 10000, "queue_last_emptied_timestamp": time.time() - 10000,
"recent_average_consume_time": 0.0001, "recent_average_consume_time": 0.0001,
}, },
@@ -66,7 +63,6 @@ class AnalyzeQueueStatsTests(TestCase):
"name", "name",
{ {
"update_time": time.time(), "update_time": time.time(),
"current_queue_size": 11,
"queue_last_emptied_timestamp": time.time() - 10000, "queue_last_emptied_timestamp": time.time() - 10000,
"recent_average_consume_time": 1, "recent_average_consume_time": 1,
}, },
@@ -79,7 +75,6 @@ class AnalyzeQueueStatsTests(TestCase):
"name", "name",
{ {
"update_time": time.time(), "update_time": time.time(),
"current_queue_size": 9,
"queue_last_emptied_timestamp": time.time() - 10000, "queue_last_emptied_timestamp": time.time() - 10000,
"recent_average_consume_time": 1, "recent_average_consume_time": 1,
}, },

View File

@@ -234,9 +234,9 @@ class QueueProcessingWorker(ABC):
self.idle = True self.idle = True
self.last_statistics_update_time = 0.0 self.last_statistics_update_time = 0.0
self.update_statistics(0) self.update_statistics()
def update_statistics(self, remaining_local_queue_size: int) -> None: def update_statistics(self) -> None:
total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
total_events = sum(events_number for events_number, _ in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times)
if total_events == 0: if total_events == 0:
@@ -246,7 +246,6 @@ class QueueProcessingWorker(ABC):
stats_dict = dict( stats_dict = dict(
update_time=time.time(), update_time=time.time(),
recent_average_consume_time=recent_average_consume_time, recent_average_consume_time=recent_average_consume_time,
current_queue_size=remaining_local_queue_size,
queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
consumed_since_last_emptied=self.consumed_since_last_emptied, consumed_since_last_emptied=self.consumed_since_last_emptied,
) )
@@ -296,7 +295,7 @@ class QueueProcessingWorker(ABC):
# that the queue started processing, in case the event we're about to process # that the queue started processing, in case the event we're about to process
# makes us freeze. # makes us freeze.
self.idle = False self.idle = False
self.update_statistics(self.get_remaining_local_queue_size()) self.update_statistics()
time_start = time.time() time_start = time.time()
if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS:
@@ -333,7 +332,7 @@ class QueueProcessingWorker(ABC):
# need to worry about the small overhead of doing a disk write. # need to worry about the small overhead of doing a disk write.
# We take advantage of this to update the stats file to keep it fresh, # We take advantage of this to update the stats file to keep it fresh,
# especially since the queue might go idle until new events come in. # especially since the queue might go idle until new events come in.
self.update_statistics(0) self.update_statistics()
self.idle = True self.idle = True
return return
@@ -344,7 +343,7 @@ class QueueProcessingWorker(ABC):
>= self.MAX_SECONDS_BEFORE_UPDATE_STATS >= self.MAX_SECONDS_BEFORE_UPDATE_STATS
): ):
self.consume_iteration_counter = 0 self.consume_iteration_counter = 0
self.update_statistics(remaining_local_queue_size) self.update_statistics()
def consume_single_event(self, event: Dict[str, Any]) -> None: def consume_single_event(self, event: Dict[str, Any]) -> None:
consume_func = lambda events: self.consume(events[0]) consume_func = lambda events: self.consume(events[0])