queue: Rename queue_size, and update for all local queues.

Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that have been
delivered to the local consumer but not yet processed.  These are
often, but not always, the same.

RabbitMQ maintains the queue of unacknowledged messages; when a
consumer connects, RabbitMQ sends it some number of messages to
handle, known as the "prefetch."  This is a performance optimization,
to ensure the consumer code does not need to wait for a network
round-trip before having new data to consume.

The default prefetch is 0 (that is, no limit), which means that
RabbitMQ immediately dumps all outstanding messages to the consumer,
which slowly processes and acknowledges them.  If a second consumer
were to connect to the same queue, it would receive no messages to
process, as they have already been allocated to the first consumer.
If the first consumer disconnects or crashes, all prior events sent
to it are then made available for other consumers on the queue.

The consumer does not know the total size of the queue -- merely how
many messages it has been handed.

No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.

Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for a similar purpose to the pending events list.
This commit is contained in:
Alex Vandiver 2020-10-09 13:12:55 -07:00
parent a1ce1aca3b
commit d5a6b0f99a
3 changed files with 12 additions and 12 deletions

View File

@ -203,9 +203,9 @@ class SimpleQueueClient:
with self.drain_queue(queue_name) as binary_messages: with self.drain_queue(queue_name) as binary_messages:
yield list(map(orjson.loads, binary_messages)) yield list(map(orjson.loads, binary_messages))
def queue_size(self) -> int: def local_queue_size(self) -> int:
assert self.channel is not None assert self.channel is not None
return len(self.channel._pending_events) return self.channel.get_waiting_message_count() + len(self.channel._pending_events)
def start_consuming(self) -> None: def start_consuming(self) -> None:
assert self.channel is not None assert self.channel is not None

View File

@ -75,7 +75,7 @@ class WorkerTest(ZulipTestCase):
yield events yield events
def queue_size(self) -> int: def local_queue_size(self) -> int:
return len(self.queue) return len(self.queue)
def test_UserActivityWorker(self) -> None: def test_UserActivityWorker(self) -> None:

View File

@ -200,7 +200,7 @@ class QueueProcessingWorker(ABC):
self.update_statistics(0) self.update_statistics(0)
def update_statistics(self, remaining_queue_size: int) -> None: def update_statistics(self, remaining_local_queue_size: int) -> None:
total_seconds = sum(seconds for _, seconds in self.recent_consume_times) total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
total_events = sum(events_number for events_number, _ in self.recent_consume_times) total_events = sum(events_number for events_number, _ in self.recent_consume_times)
if total_events == 0: if total_events == 0:
@ -210,7 +210,7 @@ class QueueProcessingWorker(ABC):
stats_dict = dict( stats_dict = dict(
update_time=time.time(), update_time=time.time(),
recent_average_consume_time=recent_average_consume_time, recent_average_consume_time=recent_average_consume_time,
current_queue_size=remaining_queue_size, current_queue_size=remaining_local_queue_size,
queue_last_emptied_timestamp=self.queue_last_emptied_timestamp, queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
consumed_since_last_emptied=self.consumed_since_last_emptied, consumed_since_last_emptied=self.consumed_since_last_emptied,
) )
@ -228,9 +228,9 @@ class QueueProcessingWorker(ABC):
os.rename(tmp_fn, fn) os.rename(tmp_fn, fn)
self.last_statistics_update_time = time.time() self.last_statistics_update_time = time.time()
def get_remaining_queue_size(self) -> int: def get_remaining_local_queue_size(self) -> int:
if self.q is not None: if self.q is not None:
return self.q.queue_size() return self.q.local_queue_size()
else: else:
# This is a special case that will happen if we're operating without # This is a special case that will happen if we're operating without
# using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of # using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of
@ -250,7 +250,7 @@ class QueueProcessingWorker(ABC):
type='debug', type='debug',
category='queue_processor', category='queue_processor',
message=f"Consuming {self.queue_name}", message=f"Consuming {self.queue_name}",
data={"events": events, "queue_size": self.get_remaining_queue_size()}, data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()},
) )
try: try:
if self.idle: if self.idle:
@ -259,7 +259,7 @@ class QueueProcessingWorker(ABC):
# that the queue started processing, in case the event we're about to process # that the queue started processing, in case the event we're about to process
# makes us freeze. # makes us freeze.
self.idle = False self.idle = False
self.update_statistics(self.get_remaining_queue_size()) self.update_statistics(self.get_remaining_local_queue_size())
time_start = time.time() time_start = time.time()
if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS: if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS:
@ -288,8 +288,8 @@ class QueueProcessingWorker(ABC):
if consume_time_seconds is not None: if consume_time_seconds is not None:
self.recent_consume_times.append((len(events), consume_time_seconds)) self.recent_consume_times.append((len(events), consume_time_seconds))
remaining_queue_size = self.get_remaining_queue_size() remaining_local_queue_size = self.get_remaining_local_queue_size()
if remaining_queue_size == 0: if remaining_local_queue_size == 0:
self.queue_last_emptied_timestamp = time.time() self.queue_last_emptied_timestamp = time.time()
self.consumed_since_last_emptied = 0 self.consumed_since_last_emptied = 0
# We've cleared all the events from the queue, so we don't # We've cleared all the events from the queue, so we don't
@ -304,7 +304,7 @@ class QueueProcessingWorker(ABC):
if (self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM if (self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
or time.time() - self.last_statistics_update_time >= self.MAX_SECONDS_BEFORE_UPDATE_STATS): or time.time() - self.last_statistics_update_time >= self.MAX_SECONDS_BEFORE_UPDATE_STATS):
self.consume_iteration_counter = 0 self.consume_iteration_counter = 0
self.update_statistics(remaining_queue_size) self.update_statistics(remaining_local_queue_size)
def consume_wrapper(self, data: Dict[str, Any]) -> None: def consume_wrapper(self, data: Dict[str, Any]) -> None:
consume_func = lambda events: self.consume(events[0]) consume_func = lambda events: self.consume(events[0])