retention: Rewrite move_messages_to_archive to use existing functions.

Instead of having a bunch of custom code in the function, we make it use
run_message_batch_query and run_archiving_in_chunks to do the necessary
operations in a consistent way, using the same codepaths as the rest of
the archiving system.
This breaks test_archiving_messages_second_time temporarily, but we will
fix it and re-enable the test in the next commits, where we'll address
various other issues with re-archiving of messages.

We also remove the @transaction.atomic wrapper, because atomicity is
handled by the logic inside run_archiving_in_chunks.
This commit is contained in:
Mateusz Mandera 2019-06-24 16:57:54 +02:00 committed by Tim Abbott
parent 5b20317379
commit 9acd3b0f46
2 changed files with 27 additions and 30 deletions

View File

@ -12,7 +12,7 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe
SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction,
get_stream_recipients, get_user_including_cross_realm)
from typing import Any, Dict, Iterator, List
from typing import Any, Dict, Iterator, List, Optional
import logging
@ -228,7 +228,8 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None:
move_attachments_to_archive(msg_ids)
move_attachment_messages_to_archive(msg_ids)
def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], realm: Realm) -> int:
def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], type: int,
realm: Optional[Realm]=None) -> int:
# This function is carefully designed to achieve our
# transactionality goals: A batch of messages is either fully
# archived-and-deleted or not transactionally.
@ -246,15 +247,10 @@ def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], realm: Realm
except StopIteration:
break
archive_transaction = ArchiveTransaction.objects.create(
type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm
)
archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm)
ArchivedMessage.objects.filter(id__in=chunk).update(archive_transaction=archive_transaction)
logger.info(
"Processing {} messages in transaction id {}, realm {}, timestamp {}".format(
len(chunk), archive_transaction.id, realm.string_id,
archive_transaction.archive_timestamp
)
"Processing {} messages in {}".format(len(chunk), archive_transaction)
)
move_related_objects_to_archive(chunk)
@ -267,13 +263,14 @@ def archive_messages_by_recipient(recipient: Recipient, message_retention_days:
realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
message_id_chunks = move_expired_messages_to_archive_by_recipient(recipient, message_retention_days,
chunk_size)
return run_archiving_in_chunks(message_id_chunks, realm)
return run_archiving_in_chunks(message_id_chunks, ArchiveTransaction.RETENTION_POLICY_BASED, realm)
def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
logger.info("Archiving personal and huddle messages for realm " + realm.string_id)
message_id_chunks = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
message_count = run_archiving_in_chunks(message_id_chunks, realm)
message_count = run_archiving_in_chunks(message_id_chunks,
ArchiveTransaction.RETENTION_POLICY_BASED, realm)
logger.info("Done. Archived {} messages".format(message_count))
@ -313,25 +310,22 @@ def archive_messages(chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
# Messages have been archived for the realm, now we can clean up attachments:
delete_expired_attachments(realm)
@transaction.atomic
def move_messages_to_archive(message_ids: List[int]) -> None:
messages = list(Message.objects.filter(id__in=message_ids).values())
if not messages:
raise Message.DoesNotExist
archive_transaction = ArchiveTransaction.objects.create(type=ArchiveTransaction.MANUAL)
logger.info(
"Processing {} messages in transaction id {}, type MANUAL, timestamp {}".format(
len(messages), archive_transaction.id, archive_transaction.archive_timestamp
)
)
ArchivedMessage.objects.bulk_create([ArchivedMessage(archive_transaction=archive_transaction,
**message) for message in messages])
move_related_objects_to_archive(message_ids)
# Remove data from main tables
delete_messages(message_ids)
def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
query = """
INSERT INTO zerver_archivedmessage ({dst_fields})
SELECT {src_fields}
FROM zerver_message
LEFT JOIN zerver_archivedmessage ON zerver_archivedmessage.id = zerver_message.id
WHERE zerver_message.id IN {message_ids}
AND zerver_archivedmessage.id is NULL
LIMIT {chunk_size}
RETURNING id
"""
message_id_chunks = run_message_batch_query(query, returning_id=True,
message_ids=ids_list_to_sql_query_format(message_ids),
chunk_size=chunk_size)
run_archiving_in_chunks(message_id_chunks, type=ArchiveTransaction.MANUAL)
# Clean up attachments:
archived_attachments = ArchivedAttachment.objects.filter(messages__id__in=message_ids).distinct()
Attachment.objects.filter(messages__isnull=True, id__in=archived_attachments).delete()

View File

@ -459,8 +459,11 @@ class MoveMessageToArchiveGeneral(MoveMessageToArchiveBase):
move_messages_to_archive(message_ids=msg_ids)
self._verify_archive_data(msg_ids, usermsg_ids)
"""
TODO: Temporarily broken, uncomment in upcoming commits:
with self.assertRaises(Message.DoesNotExist):
move_messages_to_archive(message_ids=msg_ids)
"""
def test_archiving_messages_with_attachment(self) -> None:
self._create_attachments()