retention: Combine run_message_batch_query and run_archiving_in_chunks.

We combine run_message_batch_query and run_archiving_in_chunks
functions, which makes the code simpler and more readable - we get rid
of hacky generator usage, for example.
In the process, move_expired_messages_* functions are adjusted, and now
they archive Messages as well as their related objects.
Appropriate adjustments in reaction to this are made in the main
archiving functions which call move_expired_messages_* (they no longer
need to call move_related_objects_to_archive).
This commit is contained in:
Mateusz Mandera 2019-06-24 21:16:31 +02:00 committed by Tim Abbott
parent 6e46c6d752
commit 7b2b4435ed
1 changed file with 50 additions and 66 deletions

View File

@@ -12,7 +12,7 @@ from zerver.models import (Message, UserMessage, ArchivedMessage, ArchivedUserMe
SubMessage, ArchivedSubMessage, Recipient, Stream, ArchiveTransaction,
get_stream_recipients, get_user_including_cross_realm)
from typing import Any, Dict, Iterator, List, Optional
from typing import Any, Dict, List, Optional
import logging
@@ -76,16 +76,38 @@ def ids_list_to_sql_query_format(ids: List[int]) -> str:
return ids_string
def run_archiving_in_chunks(query: str, type: int, realm: Optional[Realm]=None,
                            chunk_size: int=MESSAGE_BATCH_SIZE, **kwargs: Any) -> int:
    # Archive messages in batches of up to chunk_size, returning the total
    # number of messages archived.
    #
    # This function is carefully designed to achieve our
    # transactionality goals: A batch of messages is either fully
    # archived-and-deleted or not transactionally.
    #
    # We implement this design by executing queries that archive messages and their related objects
    # (such as UserMessage, Reaction, and Attachment) inside the same transaction.atomic() block.
    assert type in (ArchiveTransaction.MANUAL, ArchiveTransaction.RETENTION_POLICY_BASED)

    message_count = 0
    while True:
        with transaction.atomic():
            new_chunk = move_rows(Message, query, chunk_size=chunk_size, returning_id=True, **kwargs)
            if new_chunk:
                archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm)
                ArchivedMessage.objects.filter(id__in=new_chunk).update(
                    archive_transaction=archive_transaction)
                logger.info(
                    "Processing {} messages in {}".format(len(new_chunk), archive_transaction)
                )
                move_related_objects_to_archive(new_chunk)
                delete_messages(new_chunk)
                message_count += len(new_chunk)

        # We run the loop, until the query returns fewer results than chunk_size,
        # which means we are done:
        if len(new_chunk) < chunk_size:
            break

    return message_count
# Note about batching these Message archiving queries:
# We can simply use LIMIT without worrying about OFFSETs and ordering
@@ -93,10 +115,9 @@ def run_message_batch_query(query: str, chunk_size: int=MESSAGE_BATCH_SIZE,
# will not show up in the "SELECT ... FROM zerver_message ..." query for the next batches.
def move_expired_messages_to_archive_by_recipient(recipient: Recipient,
message_retention_days: int,
chunk_size: int=MESSAGE_BATCH_SIZE) -> Iterator[List[int]]:
# Important: This function is a generator, and you need to iterate
# through the Iterator it returns to execute the queries.
message_retention_days: int, realm: Realm,
chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
# This function will archive appropriate messages and their related objects.
query = """
INSERT INTO zerver_archivedmessage ({dst_fields})
SELECT {src_fields}
@@ -110,15 +131,14 @@ def move_expired_messages_to_archive_by_recipient(recipient: Recipient,
"""
check_date = timezone_now() - timedelta(days=message_retention_days)
yield from run_message_batch_query(query, returning_id=True,
recipient_id=recipient.id, check_date=check_date.isoformat(),
chunk_size=chunk_size)
return run_archiving_in_chunks(query, type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm,
recipient_id=recipient.id, check_date=check_date.isoformat(),
chunk_size=chunk_size)
def move_expired_personal_and_huddle_messages_to_archive(realm: Realm,
chunk_size: int=MESSAGE_BATCH_SIZE
) -> Iterator[List[int]]:
# Important: This function is a generator, and you need to iterate
# through the Iterator it returns to execute the queries.
) -> int:
# This function will archive appropriate messages and their related objects.
cross_realm_bot_ids_list = [get_user_including_cross_realm(email).id
for email in settings.CROSS_REALM_BOT_EMAILS]
cross_realm_bot_ids = str(tuple(cross_realm_bot_ids_list))
@@ -145,9 +165,10 @@ def move_expired_personal_and_huddle_messages_to_archive(realm: Realm,
assert realm.message_retention_days is not None
check_date = timezone_now() - timedelta(days=realm.message_retention_days)
yield from run_message_batch_query(query, returning_id=True, cross_realm_bot_ids=cross_realm_bot_ids,
realm_id=realm.id, recipient_types=recipient_types,
check_date=check_date.isoformat(), chunk_size=chunk_size)
return run_archiving_in_chunks(query, type=ArchiveTransaction.RETENTION_POLICY_BASED, realm=realm,
cross_realm_bot_ids=cross_realm_bot_ids,
realm_id=realm.id, recipient_types=recipient_types,
check_date=check_date.isoformat(), chunk_size=chunk_size)
def move_to_archive_and_delete_models_with_message_key(msg_ids: List[int]) -> None:
assert len(msg_ids) > 0
@@ -228,50 +249,14 @@ def move_related_objects_to_archive(msg_ids: List[int]) -> None:
move_attachments_to_archive(msg_ids)
move_attachment_messages_to_archive(msg_ids)
def run_archiving_in_chunks(message_id_chunks: Iterator[List[int]], type: int,
                            realm: Optional[Realm]=None) -> int:
    # Transactionality design: every batch of messages must be
    # archived-and-deleted as a single unit, or not at all.
    #
    # To guarantee that, we advance the generator explicitly with next()
    # inside the transaction.atomic() block, so the queries that populate
    # message_id_chunks execute in the same transaction as the archiving of
    # related objects (UserMessage, Reaction, Attachment) and the delete.
    total_archived = 0
    while True:
        with transaction.atomic():
            try:
                batch = next(message_id_chunks)
            except StopIteration:
                # Generator exhausted: no more batches to archive.
                break
            archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm)
            ArchivedMessage.objects.filter(id__in=batch).update(
                archive_transaction=archive_transaction)
            logger.info(
                "Processing {} messages in {}".format(len(batch), archive_transaction)
            )
            move_related_objects_to_archive(batch)
            delete_messages(batch)
            total_archived += len(batch)
    return total_archived
def archive_messages_by_recipient(recipient: Recipient, message_retention_days: int,
                                  realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> int:
    # Archive expired messages for a single recipient, returning the number
    # of messages archived.  The heavy lifting (batching, transactions,
    # related objects) happens inside move_expired_messages_to_archive_by_recipient.
    return move_expired_messages_to_archive_by_recipient(recipient, message_retention_days,
                                                         realm, chunk_size)
def archive_personal_and_huddle_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
    # Archive all expired personal and huddle messages for the given realm,
    # logging progress before and after.
    logger.info("Archiving personal and huddle messages for realm " + realm.string_id)
    message_count = move_expired_personal_and_huddle_messages_to_archive(realm, chunk_size)
    logger.info("Done. Archived {} messages".format(message_count))
def archive_stream_messages(realm: Realm, chunk_size: int=MESSAGE_BATCH_SIZE) -> None:
@@ -321,10 +306,9 @@ def move_messages_to_archive(message_ids: List[int], chunk_size: int=MESSAGE_BAT
LIMIT {chunk_size}
RETURNING id
"""
message_id_chunks = run_message_batch_query(query, returning_id=True,
message_ids=ids_list_to_sql_query_format(message_ids),
chunk_size=chunk_size)
run_archiving_in_chunks(message_id_chunks, type=ArchiveTransaction.MANUAL)
run_archiving_in_chunks(query, type=ArchiveTransaction.MANUAL,
message_ids=ids_list_to_sql_query_format(message_ids),
chunk_size=chunk_size)
# Clean up attachments:
archived_attachments = ArchivedAttachment.objects.filter(messages__id__in=message_ids).distinct()