diff --git a/zerver/lib/retention.py b/zerver/lib/retention.py index 8749c67f62..f18554811a 100644 --- a/zerver/lib/retention.py +++ b/zerver/lib/retention.py @@ -124,7 +124,7 @@ def run_archiving( query: SQL, type: int, realm: Realm | None = None, - chunk_size: int = MESSAGE_BATCH_SIZE, + chunk_size: int | None = MESSAGE_BATCH_SIZE, **kwargs: Composable, ) -> int: # This function is carefully designed to achieve our @@ -135,6 +135,9 @@ def run_archiving( # (such as UserMessage, Reaction, and Attachment) inside the same transaction.atomic() block. assert type in (ArchiveTransaction.MANUAL, ArchiveTransaction.RETENTION_POLICY_BASED) + if chunk_size is not None: + kwargs["chunk_size"] = Literal(chunk_size) + message_count = 0 while True: start_time = time.time() @@ -144,7 +147,6 @@ def run_archiving( Message, query, src_db_table=None, - chunk_size=Literal(chunk_size), returning_id=True, archive_transaction_id=Literal(archive_transaction.id), **kwargs, @@ -169,8 +171,9 @@ def run_archiving( ) # We run the loop, until the query returns fewer results than chunk_size, - # which means we are done: - if len(new_chunk) < chunk_size: + # which means we are done; or if we're not chunking, we're done + # after one iteration. + if chunk_size is None or len(new_chunk) < chunk_size: break return message_count @@ -458,35 +461,48 @@ def get_realms_and_streams_for_archiving() -> list[tuple[Realm, list[Stream]]]: def move_messages_to_archive( message_ids: list[int], realm: Realm | None = None, chunk_size: int = MESSAGE_BATCH_SIZE ) -> None: - # Uses index: zerver_message_pkey - query = SQL( - """ - INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id) - SELECT {src_fields}, {archive_transaction_id} - FROM zerver_message - WHERE zerver_message.id IN {message_ids} - LIMIT {chunk_size} - ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id} - RETURNING id """ - ) - count = run_archiving( - query, - type=ArchiveTransaction.MANUAL, - message_ids=Literal(tuple(message_ids)), - realm=realm, - chunk_size=chunk_size, - ) + Callers using this to archive a large amount of messages should ideally make sure the message_ids are + ordered, as that'll allow better performance here by keeping the batches that'll be sent to the database + ordered rather than randomly scattered. + """ + count = 0 + # In order to avoid sending a massive list of message ids to the database, + # we'll handle chunking the list of ids directly here. + message_ids_head = message_ids + while message_ids_head: + message_ids_chunk = message_ids_head[0:chunk_size] + message_ids_head = message_ids_head[chunk_size:] + + # Uses index: zerver_message_pkey + query = SQL( + """ + INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id) + SELECT {src_fields}, {archive_transaction_id} + FROM zerver_message + WHERE zerver_message.id IN {message_ids} + ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id} + RETURNING id + """ + ) + + count += run_archiving( + query, + type=ArchiveTransaction.MANUAL, + message_ids=Literal(tuple(message_ids_chunk)), + realm=realm, + chunk_size=None, + ) + # Clean up attachments: + archived_attachments = ArchivedAttachment.objects.filter( + messages__id__in=message_ids_chunk + ).distinct() + Attachment.objects.filter( + messages__isnull=True, scheduled_messages__isnull=True, id__in=archived_attachments + ).delete() if count == 0: raise Message.DoesNotExist - # Clean up attachments: - archived_attachments = ArchivedAttachment.objects.filter( - messages__id__in=message_ids - ).distinct() - Attachment.objects.filter( - messages__isnull=True, scheduled_messages__isnull=True, id__in=archived_attachments - ).delete() def restore_messages_from_archive(archive_transaction_id: int) -> list[int]: