deferred_work: Re-queue remaining "mark all as read" work after 30s.

This commit is contained in:
Alex Vandiver 2024-02-27 01:34:40 +00:00 committed by Tim Abbott
parent 75e9903be5
commit 58f0669997
1 changed files with 13 additions and 3 deletions

View File

@ -78,7 +78,7 @@ from zerver.lib.push_notifications import (
initialize_push_notifications, initialize_push_notifications,
) )
from zerver.lib.pysa import mark_sanitized from zerver.lib.pysa import mark_sanitized
from zerver.lib.queue import SimpleQueueClient, retry_event from zerver.lib.queue import SimpleQueueClient, queue_json_publish, retry_event
from zerver.lib.remote_server import ( from zerver.lib.remote_server import (
PushNotificationBouncerRetryLaterError, PushNotificationBouncerRetryLaterError,
send_server_data_to_push_bouncer, send_server_data_to_push_bouncer,
@ -1061,7 +1061,8 @@ class DeferredWorker(QueueProcessingWorker):
stream = Stream.objects.get(recipient_id=event["stream_recipient_id"]) stream = Stream.objects.get(recipient_id=event["stream_recipient_id"])
# This event is generated by the stream deactivation code path. # This event is generated by the stream deactivation code path.
batch_size = 100 batch_size = 100
offset = 0 offset = event.get("offset", 0)
start_time = time.perf_counter()
while True: while True:
with transaction.atomic(savepoint=False): with transaction.atomic(savepoint=False):
# Note that this algorithm is susceptible to # Note that this algorithm is susceptible to
@ -1087,9 +1088,18 @@ class DeferredWorker(QueueProcessingWorker):
offset += message_count offset += message_count
if message_count < batch_size: if message_count < batch_size:
break break
if time.perf_counter() - start_time > 30:
# This task may take a _very_ long time to
# complete, if we have a large number of messages
# to mark as read. If we have taken more than
# 30s, we re-push the task onto the tail of the
# queue, to allow other deferred work to complete;
# this task is extremely low priority.
queue_json_publish("deferred_work", {**event, "offset": offset})
break
logger.info( logger.info(
"Marked %s messages as read for all users, stream_recipient_id %s", "Marked %s messages as read for all users, stream_recipient_id %s",
offset, offset - event.get("offset", 0),
event["stream_recipient_id"], event["stream_recipient_id"],
) )
elif event["type"] == "clear_push_device_tokens": elif event["type"] == "clear_push_device_tokens":