mirror of https://github.com/zulip/zulip.git
queue: Rename queue_size, and update for all local queues.
Despite its name, the `queue_size` method does not return the number of items in the queue; it returns the number of items that have been delivered to the local consumer but not yet processed. These are often, but not always, the same. RabbitMQ's queues maintain the queue of unacknowledged messages; when a consumer connects, RabbitMQ sends the consumer some number of messages to handle, known as the "prefetch." This is a performance optimization, to ensure the consumer code does not need to wait for a network round-trip before having new data to consume. The default prefetch is 0 (meaning unlimited), which means that RabbitMQ immediately dumps all outstanding messages to the consumer, which slowly processes and acknowledges them. If a second consumer were to connect to the same queue, it would receive no messages to process, as the first consumer has already been allocated them. If the first consumer disconnects or crashes, all prior events sent to it are then made available for other consumers on the queue. The consumer does not know the total size of the queue -- merely how many messages it has been handed. No change is made to the prefetch here; however, future changes may wish to limit the prefetch, either to save memory, or to allow multiple consumers to work the same queue. Rename the method to make clear that it only contains information about the local queue in the consumer, not the full RabbitMQ queue. Also include the waiting message count, which is used by the `consume()` iterator for a similar purpose to the pending events list.
This commit is contained in:
parent
a1ce1aca3b
commit
d5a6b0f99a
|
@ -203,9 +203,9 @@ class SimpleQueueClient:
|
||||||
with self.drain_queue(queue_name) as binary_messages:
|
with self.drain_queue(queue_name) as binary_messages:
|
||||||
yield list(map(orjson.loads, binary_messages))
|
yield list(map(orjson.loads, binary_messages))
|
||||||
|
|
||||||
def queue_size(self) -> int:
|
def local_queue_size(self) -> int:
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
return len(self.channel._pending_events)
|
return self.channel.get_waiting_message_count() + len(self.channel._pending_events)
|
||||||
|
|
||||||
def start_consuming(self) -> None:
|
def start_consuming(self) -> None:
|
||||||
assert self.channel is not None
|
assert self.channel is not None
|
||||||
|
|
|
@ -75,7 +75,7 @@ class WorkerTest(ZulipTestCase):
|
||||||
|
|
||||||
yield events
|
yield events
|
||||||
|
|
||||||
def queue_size(self) -> int:
|
def local_queue_size(self) -> int:
|
||||||
return len(self.queue)
|
return len(self.queue)
|
||||||
|
|
||||||
def test_UserActivityWorker(self) -> None:
|
def test_UserActivityWorker(self) -> None:
|
||||||
|
|
|
@ -200,7 +200,7 @@ class QueueProcessingWorker(ABC):
|
||||||
|
|
||||||
self.update_statistics(0)
|
self.update_statistics(0)
|
||||||
|
|
||||||
def update_statistics(self, remaining_queue_size: int) -> None:
|
def update_statistics(self, remaining_local_queue_size: int) -> None:
|
||||||
total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
|
total_seconds = sum(seconds for _, seconds in self.recent_consume_times)
|
||||||
total_events = sum(events_number for events_number, _ in self.recent_consume_times)
|
total_events = sum(events_number for events_number, _ in self.recent_consume_times)
|
||||||
if total_events == 0:
|
if total_events == 0:
|
||||||
|
@ -210,7 +210,7 @@ class QueueProcessingWorker(ABC):
|
||||||
stats_dict = dict(
|
stats_dict = dict(
|
||||||
update_time=time.time(),
|
update_time=time.time(),
|
||||||
recent_average_consume_time=recent_average_consume_time,
|
recent_average_consume_time=recent_average_consume_time,
|
||||||
current_queue_size=remaining_queue_size,
|
current_queue_size=remaining_local_queue_size,
|
||||||
queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
|
queue_last_emptied_timestamp=self.queue_last_emptied_timestamp,
|
||||||
consumed_since_last_emptied=self.consumed_since_last_emptied,
|
consumed_since_last_emptied=self.consumed_since_last_emptied,
|
||||||
)
|
)
|
||||||
|
@ -228,9 +228,9 @@ class QueueProcessingWorker(ABC):
|
||||||
os.rename(tmp_fn, fn)
|
os.rename(tmp_fn, fn)
|
||||||
self.last_statistics_update_time = time.time()
|
self.last_statistics_update_time = time.time()
|
||||||
|
|
||||||
def get_remaining_queue_size(self) -> int:
|
def get_remaining_local_queue_size(self) -> int:
|
||||||
if self.q is not None:
|
if self.q is not None:
|
||||||
return self.q.queue_size()
|
return self.q.local_queue_size()
|
||||||
else:
|
else:
|
||||||
# This is a special case that will happen if we're operating without
|
# This is a special case that will happen if we're operating without
|
||||||
# using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of
|
# using RabbitMQ (e.g. in tests). In that case there's no queuing to speak of
|
||||||
|
@ -250,7 +250,7 @@ class QueueProcessingWorker(ABC):
|
||||||
type='debug',
|
type='debug',
|
||||||
category='queue_processor',
|
category='queue_processor',
|
||||||
message=f"Consuming {self.queue_name}",
|
message=f"Consuming {self.queue_name}",
|
||||||
data={"events": events, "queue_size": self.get_remaining_queue_size()},
|
data={"events": events, "local_queue_size": self.get_remaining_local_queue_size()},
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
if self.idle:
|
if self.idle:
|
||||||
|
@ -259,7 +259,7 @@ class QueueProcessingWorker(ABC):
|
||||||
# that the queue started processing, in case the event we're about to process
|
# that the queue started processing, in case the event we're about to process
|
||||||
# makes us freeze.
|
# makes us freeze.
|
||||||
self.idle = False
|
self.idle = False
|
||||||
self.update_statistics(self.get_remaining_queue_size())
|
self.update_statistics(self.get_remaining_local_queue_size())
|
||||||
|
|
||||||
time_start = time.time()
|
time_start = time.time()
|
||||||
if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS:
|
if self.MAX_CONSUME_SECONDS and self.ENABLE_TIMEOUTS:
|
||||||
|
@ -288,8 +288,8 @@ class QueueProcessingWorker(ABC):
|
||||||
if consume_time_seconds is not None:
|
if consume_time_seconds is not None:
|
||||||
self.recent_consume_times.append((len(events), consume_time_seconds))
|
self.recent_consume_times.append((len(events), consume_time_seconds))
|
||||||
|
|
||||||
remaining_queue_size = self.get_remaining_queue_size()
|
remaining_local_queue_size = self.get_remaining_local_queue_size()
|
||||||
if remaining_queue_size == 0:
|
if remaining_local_queue_size == 0:
|
||||||
self.queue_last_emptied_timestamp = time.time()
|
self.queue_last_emptied_timestamp = time.time()
|
||||||
self.consumed_since_last_emptied = 0
|
self.consumed_since_last_emptied = 0
|
||||||
# We've cleared all the events from the queue, so we don't
|
# We've cleared all the events from the queue, so we don't
|
||||||
|
@ -304,7 +304,7 @@ class QueueProcessingWorker(ABC):
|
||||||
if (self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
|
if (self.consume_iteration_counter >= self.CONSUME_ITERATIONS_BEFORE_UPDATE_STATS_NUM
|
||||||
or time.time() - self.last_statistics_update_time >= self.MAX_SECONDS_BEFORE_UPDATE_STATS):
|
or time.time() - self.last_statistics_update_time >= self.MAX_SECONDS_BEFORE_UPDATE_STATS):
|
||||||
self.consume_iteration_counter = 0
|
self.consume_iteration_counter = 0
|
||||||
self.update_statistics(remaining_queue_size)
|
self.update_statistics(remaining_local_queue_size)
|
||||||
|
|
||||||
def consume_wrapper(self, data: Dict[str, Any]) -> None:
|
def consume_wrapper(self, data: Dict[str, Any]) -> None:
|
||||||
consume_func = lambda events: self.consume(events[0])
|
consume_func = lambda events: self.consume(events[0])
|
||||||
|
|
Loading…
Reference in New Issue