retention: Limit number of ids passed to db in delete messages query.

If do_delete_messages (and friends) is called for a massive number of
messages, the entire list of message ids is passed to Postgres, even
though the LIMIT {chunk_size} in the query makes all but the first
chunk_size of those ids useless.
Mateusz Mandera 2024-09-19 21:59:44 +02:00 committed by Tim Abbott
parent ed7c330548
commit 18fbb5d146
1 changed file with 45 additions and 29 deletions
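
In outline, the fix stops shipping the full id list with every query and instead slices the list in Python, issuing one query per slice. A minimal sketch of that slicing pattern (illustration only, not the diff's code below; chunked is a hypothetical helper):

def chunked(ids: list[int], chunk_size: int) -> list[list[int]]:
    # Successive chunk_size-sized slices: each database query then
    # sees only one batch of ids rather than the whole list.
    batches = []
    head = ids
    while head:
        batches.append(head[:chunk_size])
        head = head[chunk_size:]
    return batches

print(chunked([1, 2, 3, 4, 5], chunk_size=2))  # [[1, 2], [3, 4], [5]]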

zerver/lib/retention.py

@@ -124,7 +124,7 @@ def run_archiving(
     query: SQL,
     type: int,
     realm: Realm | None = None,
-    chunk_size: int = MESSAGE_BATCH_SIZE,
+    chunk_size: int | None = MESSAGE_BATCH_SIZE,
     **kwargs: Composable,
 ) -> int:
     # This function is carefully designed to achieve our
@@ -135,6 +135,9 @@ def run_archiving(
     # (such as UserMessage, Reaction, and Attachment) inside the same transaction.atomic() block.
     assert type in (ArchiveTransaction.MANUAL, ArchiveTransaction.RETENTION_POLICY_BASED)

+    if chunk_size is not None:
+        kwargs["chunk_size"] = Literal(chunk_size)
+
     message_count = 0
     while True:
         start_time = time.time()
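
Hoisting chunk_size into kwargs injects the Literal into the SQL composition once, and only when chunking is in effect; callers that pass chunk_size=None (like the rewritten move_messages_to_archive below) use a query with no {chunk_size} placeholder. A small sketch of how psycopg2's SQL composition fills such a placeholder (the query text here is made up):

from psycopg2.sql import SQL, Literal

query = SQL("SELECT id FROM zerver_message LIMIT {chunk_size}")
composed = query.format(chunk_size=Literal(1000))
# With a live connection, composed.as_string(conn) would render:
#   SELECT id FROM zerver_message LIMIT 1000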
@@ -144,7 +147,6 @@ def run_archiving(
                 Message,
                 query,
                 src_db_table=None,
-                chunk_size=Literal(chunk_size),
                 returning_id=True,
                 archive_transaction_id=Literal(archive_transaction.id),
                 **kwargs,
@@ -169,8 +171,9 @@ def run_archiving(
         )

         # We run the loop, until the query returns fewer results than chunk_size,
-        # which means we are done:
-        if len(new_chunk) < chunk_size:
+        # which means we are done; or if we're not chunking, we're done
+        # after one iteration.
+        if chunk_size is None or len(new_chunk) < chunk_size:
             break

     return message_count
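
With chunk_size=None the loop now exits unconditionally after the first pass (the None check also short-circuits before len(new_chunk) would be compared against None). A toy model of that termination logic, with fetch as a hypothetical stand-in for the archiving query:

def run_once_or_chunked(fetch, chunk_size=None):
    count = 0
    while True:
        batch = fetch(chunk_size)
        count += len(batch)
        # chunk_size=None: the caller batches externally, so one pass suffices.
        if chunk_size is None or len(batch) < chunk_size:
            break
    return count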
@@ -458,35 +461,48 @@ def get_realms_and_streams_for_archiving() -> list[tuple[Realm, list[Stream]]]:
 def move_messages_to_archive(
     message_ids: list[int], realm: Realm | None = None, chunk_size: int = MESSAGE_BATCH_SIZE
 ) -> None:
-    # Uses index: zerver_message_pkey
-    query = SQL(
-        """
-    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
-        SELECT {src_fields}, {archive_transaction_id}
-        FROM zerver_message
-        WHERE zerver_message.id IN {message_ids}
-        LIMIT {chunk_size}
-    ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id}
-    RETURNING id
-    """
-    )
-    count = run_archiving(
-        query,
-        type=ArchiveTransaction.MANUAL,
-        message_ids=Literal(tuple(message_ids)),
-        realm=realm,
-        chunk_size=chunk_size,
-    )
+    """
+    Callers using this to archive a large amount of messages should ideally make sure the message_ids are
+    ordered, as that'll allow better performance here by keeping the batches that'll be sent to the database
+    ordered rather than randomly scattered.
+    """
+    count = 0
+    # In order to avoid sending a massive list of message ids to the database,
+    # we'll handle chunking the list of ids directly here.
+    message_ids_head = message_ids
+    while message_ids_head:
+        message_ids_chunk = message_ids_head[0:chunk_size]
+        message_ids_head = message_ids_head[chunk_size:]
+
+        # Uses index: zerver_message_pkey
+        query = SQL(
+            """
+        INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
+            SELECT {src_fields}, {archive_transaction_id}
+            FROM zerver_message
+            WHERE zerver_message.id IN {message_ids}
+        ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id}
+        RETURNING id
+        """
+        )
+        count += run_archiving(
+            query,
+            type=ArchiveTransaction.MANUAL,
+            message_ids=Literal(tuple(message_ids_chunk)),
+            realm=realm,
+            chunk_size=None,
+        )
+
+        # Clean up attachments:
+        archived_attachments = ArchivedAttachment.objects.filter(
+            messages__id__in=message_ids_chunk
+        ).distinct()
+        Attachment.objects.filter(
+            messages__isnull=True, scheduled_messages__isnull=True, id__in=archived_attachments
+        ).delete()
+
     if count == 0:
         raise Message.DoesNotExist
-
-    # Clean up attachments:
-    archived_attachments = ArchivedAttachment.objects.filter(
-        messages__id__in=message_ids
-    ).distinct()
-    Attachment.objects.filter(
-        messages__isnull=True, scheduled_messages__isnull=True, id__in=archived_attachments
-    ).delete()

 def restore_messages_from_archive(archive_transaction_id: int) -> list[int]:
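
A usage sketch following the new docstring's advice, assuming a Zulip development shell (the ids and chunk size are hypothetical; sorted input keeps each batch contiguous in the zerver_message_pkey index):

from zerver.lib.retention import move_messages_to_archive

ids_to_archive = sorted([205, 101, 47, 33])  # ordered ids -> ordered batches
move_messages_to_archive(ids_to_archive, chunk_size=2)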