From 7b2b4435edd9e440467bbf4e4e8a5bbf31accd39 Mon Sep 17 00:00:00 2001 From: Mateusz Mandera Date: Mon, 24 Jun 2019 21:16:31 +0200 Subject: [PATCH] retention: Combine run_message_batch_query and run_archiving_in_chunks. We combine run_message_batch_query and run_archiving_in_chunks functions, which makes the code simpler and more readable - we get rid of hacky generator usage, for example. In the process, move_expired_messages_* functions are adjusted, and now they archive Messages as well as their related objects. Appropriate adjustments in reaction to this are made in the main archiving functions which call move_expired_messages_* (they no longer need to call move_related_objects_to_archive). --- zerver/lib/retention.py | 116 +++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 66 deletions(-) diff --git a/zerver/lib/retention.py b/zerver/lib/retention.py index 5ea02d5848..714741bff8 100644 --- a/zerver/lib/retention.py +++ b/zerver/lib/retention.py @@ -12,7 +12,7 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction, get_stream_recipients, get_user_including_cross_realm) -from typing import Any, Dict, Iterator, List, Optional +from typing import Any, Dict, List, Optional import logging @@ -76,16 +76,38 @@ def ids_list_to_sql_query_format(ids: List[int]) -> str: return ids_string -def run_message_batch_query(query: str, chunk_size: int=MESSAGE_BATCH_SIZE, - **kwargs: Any) -> Iterator[List[int]]: - while True: - new_chunk = move_rows(Message, query, chunk_size=chunk_size, **kwargs) - if new_chunk: - yield new_chunk +def run_archiving_in_chunks(query: str, type: int, realm: Optional[Realm]=None, + chunk_size: int=MESSAGE_BATCH_SIZE, **kwargs: Any) -> int: + # This function is carefully designed to achieve our + # transactionality goals: A batch of messages is either fully + # archived-and-deleted or not transactionally. + # + # We implement this design by executing queries that archive messages and their related objects + # (such as UserMessage, Reaction, and Attachment) inside the same transaction.atomic() block. + assert type in (ArchiveTransaction.MANUAL, ArchiveTransaction.RETENTION_POLICY_BASED) - # We run the loop, until the query returns fewer results than chunk_size, which means we are done: - if len(new_chunk) < chunk_size: - break + message_count = 0 + while True: + with transaction.atomic(): + new_chunk = move_rows(Message, query, chunk_size=chunk_size, returning_id=True, **kwargs) + if new_chunk: + archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm) + ArchivedMessage.objects.filter(id__in=new_chunk).update( + archive_transaction=archive_transaction) + logger.info( + "Processing {} messages in {}".format(len(new_chunk), archive_transaction) + ) + + move_related_objects_to_archive(new_chunk) + delete_messages(new_chunk) + message_count += len(new_chunk) + + # We run the loop, until the query returns fewer results than chunk_size, + # which means we are done: + if len(new_chunk) < chunk_size: + break + + return message_count # Note about batching these Message archiving queries: # We can simply use LIMIT without worrying about OFFSETs and ordering @@ -93,10 +115,9 @@ def run_message_batch_query(query: str, chunk_size: int=MESSAGE_BATCH_SIZE, # will not show up in the "SELECT ... FROM zerver_message ..." query for the next batches. def move_expired_messages_to_archive_by_recipient(recipient: Recipient, - message_retention_days: int, - chunk_size: int=MESSAGE_BATCH_SIZE) -> Iterator[List[int]]: - # Important: This function is a generator, and you need to iterate - # through the Iterator it returns to execute the queries. + message_retention_days: int, realm: Realm, + chunk_size: int=MESSAGE_BATCH_SIZE) -> int: + # This function will archive appropriate messages and their related objects. query = """ INSERT INTO zerver_archivedmessage ({dst_fields}) SELECT {src_fields} @@ -110,15 +131,14 @@ def move_expired_messages_to_archive_by_recipient(recipient: Recipient, """ check_date = timezone_now() - timedelta(days=message_retention_days) - yield from run_message_batch_query(query, returning_id=True, - recipient_id=recipient.id, check_date=check_date.isoformat(), - chunk_size=chunk_size) + return run_archiving_in_chunks(query, type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm, + recipient_id=recipient.id, check_date=check_date.isoformat(), + chunk_size=chunk_size) def move_expired_personal_and_huddle_messages_to_archive(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE - ) -> Iterator[List[int]]: - # Important: This function is a generator, and you need to iterate - # through the Iterator it returns to execute the queries. + ) -> int: + # This function will archive appropriate messages and their related objects. cross_realm_bot_ids_list = [get_user_including_cross_realm(email).id for email in settings.CROSS_REALM_BOT_EMAILS] cross_realm_bot_ids = str(tuple(cross_realm_bot_ids_list)) @@ -145,9 +165,10 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm, assert realm.message_retention_days is not None check_date = timezone_now() - timedelta(days=realm.message_retention_days) - yield from run_message_batch_query(query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids, - realm_id=realm.id, recipient_types=recipient_types, - check_date=check_date.isoformat(), chunk_size=chunk_size) + return run_archiving_in_chunks(query, type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm, + cross_realm_bot_ids=cross_realm_bot_ids, + realm_id=realm.id, recipient_types=recipient_types, + check_date=check_date.isoformat(), chunk_size=chunk_size) def move_to_archive_and_delete_models_with_message_key(msg_ids: List[int]) -> None: assert len(msg_ids) > 0 @@ -228,50 +249,14 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None: move_attachments_to_archive(msg_ids) move_attachment_messages_to_archive(msg_ids) -def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], type: int, - realm: Optional[Realm]=None) -> int: - # This function is carefully designed to achieve our - # transactionality goals: A batch of messages is either fully - # archived-and-deleted or not transactionally. - # - # We implement this design by calling `next()` explicitly inside - # the `transaction.atomic()` block, ensuring that the queries that - # populate message_id_chunks run inside the same transaction block - # as the code that handles archiving related objects like - # UserMessage, Reaction, and Attachment. - message_count = 0 - while True: - with transaction.atomic(): - try: - chunk = next(message_id_chunks) - except StopIteration: - break - - archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm) - ArchivedMessage.objects.filter(id__in=chunk).update(archive_transaction=archive_transaction) - logger.info( - "Processing {} messages in {}".format(len(chunk), archive_transaction) - ) - - move_related_objects_to_archive(chunk) - delete_messages(chunk) - message_count += len(chunk) - - return message_count - def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int, realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> int: - message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days, - chunk_size) - return run_archiving_in_chunks(message_id_chunks, ArchiveTransaction.RETENTION_POLICY_BASED, realm) + return move_expired_messages_to_archive_by_recipient(recipient, message_retention_days, + realm, chunk_size) def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None: logger.info("Archiving personal and huddle messages for realm " + realm.string_id) - - message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size) - message_count = run_archiving_in_chunks(message_id_chunks, - ArchiveTransaction.RETENTION_POLICY_BASED, realm) - + message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size) logger.info("Done. Archived {} messages".format(message_count)) def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None: @@ -321,10 +306,9 @@ def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BAT LIMIT {chunk_size} RETURNING id """ - message_id_chunks = run_message_batch_query(query, returning_id=True, - message_ids=ids_list_to_sql_query_format(message_ids), - chunk_size=chunk_size) - run_archiving_in_chunks(message_id_chunks, type=ArchiveTransaction.MANUAL) + run_archiving_in_chunks(query, type=ArchiveTransaction.MANUAL, + message_ids=ids_list_to_sql_query_format(message_ids), + chunk_size=chunk_size) # Clean up attachments: archived_attachments = ArchivedAttachment.objects.filter(messages__id__in=message_ids).distinct()