diff --git a/zerver/lib/retention.py b/zerver/lib/retention.py index 714741bff8..ea29377fd8 100644 --- a/zerver/lib/retention.py +++ b/zerver/lib/retention.py @@ -7,7 +7,7 @@ from django.db.models import Model, Q from django.utils.timezone import now as timezone_now from zerver.lib.logging_util import log_to_file -from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMessage, Realm, +from zerver.models import (Message, UserMessage, ArchivedUserMessage, Realm, Attachment, ArchivedAttachment, Reaction, ArchivedReaction, SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction, get_stream_recipients, get_user_including_cross_realm) @@ -89,11 +89,10 @@ def run_archiving_in_chunks(query: str, type: int, realm: Optional[Realm]=None, message_count = 0 while True: with transaction.atomic(): - new_chunk = move_rows(Message, query, chunk_size=chunk_size, returning_id=True, **kwargs) + archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm) + new_chunk = move_rows(Message, query, chunk_size=chunk_size, returning_id=True, + archive_transaction_id=archive_transaction.id, **kwargs) if new_chunk: - archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm) - ArchivedMessage.objects.filter(id__in=new_chunk).update( - archive_transaction=archive_transaction) logger.info( "Processing {} messages in {}".format(len(new_chunk), archive_transaction) ) @@ -101,6 +100,8 @@ def run_archiving_in_chunks(query: str, type: int, realm: Optional[Realm]=None, move_related_objects_to_archive(new_chunk) delete_messages(new_chunk) message_count += len(new_chunk) + else: + archive_transaction.delete() # Nothing was archived # We run the loop, until the query returns fewer results than chunk_size, # which means we are done: @@ -119,14 +120,13 @@ def move_expired_messages_to_archive_by_recipient(recipient: Recipient, chunk_size: int=MESSAGE_BATCH_SIZE) -> int: # This function will archive appropriate messages and their 
related objects. query = """ - INSERT INTO zerver_archivedmessage ({dst_fields}) - SELECT {src_fields} + INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id) + SELECT {src_fields}, {archive_transaction_id} FROM zerver_message - LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id WHERE zerver_message.recipient_id = {recipient_id} AND zerver_message.pub_date < '{check_date}' - AND zerver_archivedmessage.id is NULL LIMIT {chunk_size} + ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id} RETURNING id """ check_date = timezone_now() - timedelta(days=message_retention_days) @@ -148,18 +148,17 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm, # TODO: Remove the "zerver_userprofile.id NOT IN {cross_realm_bot_ids}" clause # once https://github.com/zulip/zulip/issues/11015 is solved. query = """ - INSERT INTO zerver_archivedmessage ({dst_fields}) - SELECT {src_fields} + INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id) + SELECT {src_fields}, {archive_transaction_id} FROM zerver_message INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id INNER JOIN zerver_userprofile ON zerver_userprofile.id = zerver_message.sender_id - LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id WHERE zerver_userprofile.id NOT IN {cross_realm_bot_ids} AND zerver_userprofile.realm_id = {realm_id} AND zerver_recipient.type in {recipient_types} AND zerver_message.pub_date < '{check_date}' - AND zerver_archivedmessage.id is NULL LIMIT {chunk_size} + ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id} RETURNING id """ assert realm.message_retention_days is not None @@ -297,19 +296,20 @@ def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None: def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BATCH_SIZE) -> None: query = """ - INSERT INTO 
zerver_archivedmessage ({dst_fields}) - SELECT {src_fields} + INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id) + SELECT {src_fields}, {archive_transaction_id} FROM zerver_message - LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id WHERE zerver_message.id IN {message_ids} - AND zerver_archivedmessage.id is NULL LIMIT {chunk_size} + ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id} RETURNING id """ - run_archiving_in_chunks(query, type=ArchiveTransaction.MANUAL, - message_ids=ids_list_to_sql_query_format(message_ids), - chunk_size=chunk_size) + count = run_archiving_in_chunks(query, type=ArchiveTransaction.MANUAL, + message_ids=ids_list_to_sql_query_format(message_ids), + chunk_size=chunk_size) + if count == 0: + raise Message.DoesNotExist # Clean up attachments: archived_attachments = ArchivedAttachment.objects.filter(messages__id__in=message_ids).distinct() Attachment.objects.filter(messages__isnull=True, id__in=archived_attachments).delete() diff --git a/zerver/tests/test_retention.py b/zerver/tests/test_retention.py index 67c401a01e..3bdb4d9b2d 100644 --- a/zerver/tests/test_retention.py +++ b/zerver/tests/test_retention.py @@ -10,7 +10,7 @@ from zerver.lib.test_classes import ZulipTestCase from zerver.lib.upload import create_attachment from zerver.models import (Message, Realm, UserProfile, Stream, ArchivedUserMessage, SubMessage, ArchivedMessage, Attachment, ArchivedAttachment, UserMessage, - Reaction, ArchivedReaction, ArchivedSubMessage, + Reaction, ArchivedReaction, ArchivedSubMessage, ArchiveTransaction, get_realm, get_user_profile_by_email, get_stream, get_system_bot) from zerver.lib.retention import ( archive_messages, @@ -362,6 +362,36 @@ class TestArchiveMessagesGeneral(ArchiveMessagesTestingBase): sorted(msgs_ids.values()) ) + def test_restoring_and_rearchiving(self) -> None: + expired_msg_ids = self._make_mit_messages( + 7, + timezone_now() - 
timedelta(days=MIT_REALM_DAYS+1) + ) + expired_usermsg_ids = self._get_usermessage_ids(expired_msg_ids) + + archive_messages(chunk_size=4) + self._verify_archive_data(expired_msg_ids, expired_usermsg_ids) + + transactions = ArchiveTransaction.objects.all() + self.assertEqual(len(transactions), 2) # With chunk_size 4, there should be 2 transactions + + restore_all_data_from_archive() + transactions[0].refresh_from_db() + transactions[1].refresh_from_db() + self.assertTrue(transactions[0].restored) + self.assertTrue(transactions[1].restored) + + archive_messages(chunk_size=10) + self._verify_archive_data(expired_msg_ids, expired_usermsg_ids) + + transactions = ArchiveTransaction.objects.order_by("id") + self.assertEqual(len(transactions), 3) + + archived_messages = ArchivedMessage.objects.filter(id__in=expired_msg_ids) + # Check that the re-archived messages are correctly assigned to the new transaction: + for message in archived_messages: + self.assertEqual(message.archive_transaction_id, transactions[2].id) + class TestArchivingSubMessages(ArchiveMessagesTestingBase): def test_archiving_submessages(self) -> None: expired_msg_ids = self._make_expired_zulip_messages(2) @@ -497,11 +527,8 @@ class MoveMessageToArchiveGeneral(MoveMessageToArchiveBase): move_messages_to_archive(message_ids=msg_ids) self._verify_archive_data(msg_ids, usermsg_ids) - """ - TODO: Temporarily broken, uncomment in upcoming commits: with self.assertRaises(Message.DoesNotExist): move_messages_to_archive(message_ids=msg_ids) - """ def test_archiving_messages_with_attachment(self) -> None: self._create_attachments() @@ -590,8 +617,10 @@ class MoveMessageToArchiveGeneral(MoveMessageToArchiveBase): ) self._assert_archive_empty() + # Archive one of the messages: move_messages_to_archive(message_ids=[msg_id]) self._verify_archive_data([msg_id], usermsg_ids) + # Attachments shouldn't have been deleted, as the second message links to them: self.assertEqual(Attachment.objects.count(), 5) 
self.assertEqual( @@ -599,7 +628,15 @@ class MoveMessageToArchiveGeneral(MoveMessageToArchiveBase): set(attachment_ids) ) + # Restore the first message: + restore_all_data_from_archive() + # Archive the second: move_messages_to_archive(message_ids=[reply_msg_id]) + # The restored message links to the Attachments, so they shouldn't be deleted: + self.assertEqual(Attachment.objects.count(), 5) + + # Archive the first message again: + move_messages_to_archive(message_ids=[msg_id]) # Now the attachment should have been deleted: self.assertEqual(Attachment.objects.count(), 0)