2020-06-21 02:34:33 +02:00
|
|
|
# Core implementation of message retention policies and low-level
# helpers for deleting messages.
#
# Because bugs in code that deletes message content can cause
# irreversible harm in installations without backups, this is a
# particularly sensitive system that requires careful design,
# thoughtful database transaction boundaries, and a well-written test
# suite to make bugs unlikely and mitigate their impact.
#
# The core design principle of this system is that we never delete a
# live Message/Reaction/etc. object. Instead, we use move_rows, which
# moves objects to a "deleted objects" table like ArchivedMessage,
# recording the change using a structure linked to an
# ArchiveTransaction object that can be used to undo that deletion
# transaction in a clean fashion.
#
# We move all of the data associated with a given block of messages in
# a single database transaction in order to avoid broken intermediate
# states where, for example, a message's reactions were deleted but
# not the messages themselves.
#
# A separate process then deletes ArchiveTransaction objects
# ARCHIVED_DATA_VACUUMING_DELAY_DAYS after they were created.
#
# Because of the nice properties of this deletion system, we use the
# same system for routine deletions via the Zulip UI (deleting a
# message or group of messages) as we use for message retention policy
# deletions.
|
2020-06-11 00:54:34 +02:00
|
|
|
import logging
|
2020-06-21 17:39:24 +02:00
|
|
|
import time
|
2016-10-25 19:51:31 +02:00
|
|
|
from datetime import timedelta
|
2022-06-23 20:07:19 +02:00
|
|
|
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Type, Union
|
2017-05-14 21:14:26 +02:00
|
|
|
|
2019-06-05 20:22:08 +02:00
|
|
|
from django.conf import settings
|
2017-05-14 21:14:26 +02:00
|
|
|
from django.db import connection, transaction
|
2019-07-03 19:54:56 +02:00
|
|
|
from django.db.models import Model
|
2017-04-15 04:03:56 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2020-06-11 00:54:34 +02:00
|
|
|
from psycopg2.sql import SQL, Composable, Identifier, Literal
|
2019-06-10 19:20:09 +02:00
|
|
|
|
|
|
|
from zerver.lib.logging_util import log_to_file
|
2020-06-21 11:14:35 +02:00
|
|
|
from zerver.lib.request import RequestVariableConversionError
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.models import (
|
|
|
|
ArchivedAttachment,
|
|
|
|
ArchivedReaction,
|
|
|
|
ArchivedSubMessage,
|
|
|
|
ArchivedUserMessage,
|
|
|
|
ArchiveTransaction,
|
|
|
|
Attachment,
|
|
|
|
Message,
|
|
|
|
Reaction,
|
|
|
|
Realm,
|
|
|
|
Recipient,
|
|
|
|
Stream,
|
|
|
|
SubMessage,
|
|
|
|
UserMessage,
|
|
|
|
)
|
2019-06-10 19:20:09 +02:00
|
|
|
|
2021-02-12 08:20:45 +01:00
|
|
|
# Logger used by every function in this module. log_to_file is handed
# the logger together with settings.RETENTION_LOG_PATH (presumably so
# that retention activity ends up in that file -- confirm in
# zerver.lib.logging_util).
logger = logging.getLogger("zulip.retention")
log_to_file(logger, settings.RETENTION_LOG_PATH)

# Default chunk size for the message archiving entry points below.
MESSAGE_BATCH_SIZE = 1000
# Default chunk size for archive_stream_messages; archive_messages
# always passes this value for the stream pass.
STREAM_MESSAGE_BATCH_SIZE = 100
# NOTE(review): not referenced in the visible part of this file;
# presumably the batch size used when deleting old ArchiveTransaction
# objects -- confirm against the rest of the module.
TRANSACTION_DELETION_BATCH_SIZE = 100
|
2019-06-10 18:09:50 +02:00
|
|
|
|
2020-06-21 02:34:33 +02:00
|
|
|
# This data structure declares the details of all database tables that
# hang off the Message table (with a foreign key to Message being part
# of its primary lookup key). This structure allows us to share the
# code for managing these related tables.
#
# Each entry has four keys:
#   "class"              - the live model class
#   "archive_class"      - the corresponding archive model class
#   "table_name"         - database table of the live model
#   "archive_table_name" - database table of the archive model
models_with_message_key: List[Dict[str, Any]] = [
    {
        "class": Reaction,
        "archive_class": ArchivedReaction,
        "table_name": "zerver_reaction",
        "archive_table_name": "zerver_archivedreaction",
    },
    {
        "class": SubMessage,
        "archive_class": ArchivedSubMessage,
        "table_name": "zerver_submessage",
        "archive_table_name": "zerver_archivedsubmessage",
    },
    {
        "class": UserMessage,
        "archive_class": ArchivedUserMessage,
        "table_name": "zerver_usermessage",
        "archive_table_name": "zerver_archivedusermessage",
    },
]

# Model fields that move_rows leaves out of the column lists it builds
# for its INSERT ... SELECT statements.
EXCLUDE_FIELDS = {Message._meta.get_field("search_tsvector")}
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-21 11:18:51 +02:00
|
|
|
@transaction.atomic(savepoint=False)
def move_rows(
    base_model: Type[Model],
    raw_query: SQL,
    *,
    src_db_table: Optional[str] = None,
    returning_id: bool = False,
    **kwargs: Composable,
) -> List[int]:
    """Core helper for bulk moving rows between a table and its archive table.

    The column lists are derived from base_model's fields (minus
    EXCLUDE_FIELDS) and substituted into raw_query as the placeholders
    {src_fields} (columns qualified with the source table) and
    {dst_fields} (bare column names). Any extra keyword arguments are
    forwarded to raw_query.format() as additional placeholders.

    Args:
        base_model: Model whose fields define the columns to copy.
        raw_query: SQL template to format and execute.
        src_db_table: Table used to qualify the source columns; defaults
            to base_model's own table.
        returning_id: Whether the query returns one id per row that
            should be collected and returned.
        **kwargs: Additional placeholders for raw_query.

    Returns:
        The ids returned by the query if returning_id is set, otherwise
        an empty list.
    """
    # Use base_model's db_table unless otherwise specified.
    source_table = base_model._meta.db_table if src_db_table is None else src_db_table

    copied_fields = [
        model_field
        for model_field in base_model._meta.fields
        if model_field not in EXCLUDE_FIELDS
    ]
    qualified_columns = [
        Identifier(source_table, model_field.column) for model_field in copied_fields
    ]
    bare_columns = [Identifier(model_field.column) for model_field in copied_fields]

    statement = raw_query.format(
        src_fields=SQL(",").join(qualified_columns),
        dst_fields=SQL(",").join(bare_columns),
        **kwargs,
    )

    with connection.cursor() as cursor:
        cursor.execute(statement)
        if not returning_id:
            return []
        # Each returned row is a 1-tuple holding the row id.
        return [row_id for (row_id,) in cursor.fetchall()]
|
2016-10-25 19:51:31 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-05-04 01:51:26 +02:00
|
|
|
def run_archiving_in_chunks(
    query: SQL,
    type: int,
    realm: Optional[Realm] = None,
    chunk_size: int = MESSAGE_BATCH_SIZE,
    **kwargs: Composable,
) -> int:
    """Archive and delete the messages selected by `query`, chunk by chunk.

    This function is carefully designed to achieve our transactionality
    goals: a batch of messages is either fully archived-and-deleted or
    not at all. Each iteration therefore creates an ArchiveTransaction,
    archives up to chunk_size messages together with their related
    objects, and deletes the live messages, all inside a single
    transaction.atomic() block.

    Args:
        query: SQL template handed to move_rows; it is formatted with
            {chunk_size} and {archive_transaction_id} plus **kwargs.
        type: ArchiveTransaction.MANUAL or
            ArchiveTransaction.RETENTION_POLICY_BASED.
        realm: Realm recorded on each ArchiveTransaction that is created.
        chunk_size: Maximum number of messages archived per transaction.
        **kwargs: Extra placeholders for the query.

    Returns:
        The total number of messages archived.
    """
    assert type in (ArchiveTransaction.MANUAL, ArchiveTransaction.RETENTION_POLICY_BASED)

    archived_total = 0
    while True:
        started_at = time.time()
        with transaction.atomic():
            archive_transaction = ArchiveTransaction.objects.create(type=type, realm=realm)
            archived_ids = move_rows(
                Message,
                query,
                src_db_table=None,
                returning_id=True,
                chunk_size=Literal(chunk_size),
                archive_transaction_id=Literal(archive_transaction.id),
                **kwargs,
            )
            if not archived_ids:
                # Nothing was archived, so the transaction record is pointless.
                archive_transaction.delete()
            else:
                move_related_objects_to_archive(archived_ids)
                delete_messages(archived_ids)
                archived_total += len(archived_ids)
        # Measured outside of the atomic block, to capture the actual
        # moment archiving of the chunk is finished (since Django does
        # some significant additional work when leaving the block).
        elapsed = time.time() - started_at

        archived_in_chunk = len(archived_ids)
        if archived_in_chunk > 0:
            logger.info(
                "Archived %s messages in %.2fs in transaction %s.",
                archived_in_chunk,
                elapsed,
                archive_transaction.id,
            )

        # A chunk smaller than chunk_size means the query has run dry.
        if archived_in_chunk < chunk_size:
            break

    return archived_total
|
2019-06-10 18:09:50 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-10 18:09:50 +02:00
|
|
|
# Note about batching these Message archiving queries:
# We can simply use LIMIT without worrying about OFFSETs and ordering
# while executing batches, because any Message already archived (in the
# previous batch) will not show up in the
# "SELECT ... FROM zerver_message ..." query for the next batches.


def move_expired_messages_to_archive_by_recipient(
    recipient: Recipient,
    message_retention_days: int,
    realm: Realm,
    chunk_size: int = MESSAGE_BATCH_SIZE,
) -> int:
    """Archive the messages of one recipient that are older than the policy.

    A message qualifies when its recipient_id equals recipient.id and
    its date_sent is earlier than now minus message_retention_days.

    Args:
        recipient: Recipient whose messages are examined.
        message_retention_days: Retention period in days; must not be -1.
        realm: Realm recorded on the archive transactions.
        chunk_size: Maximum number of messages archived per transaction.

    Returns:
        The number of messages archived.
    """
    assert message_retention_days != -1

    expiry_cutoff = timezone_now() - timedelta(days=message_retention_days)
    archive_query = SQL(
        """
    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
        SELECT {src_fields}, {archive_transaction_id}
        FROM zerver_message
        WHERE zerver_message.recipient_id = {recipient_id}
            AND zerver_message.date_sent < {check_date}
        LIMIT {chunk_size}
    ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id}
    RETURNING id
    """
    )

    return run_archiving_in_chunks(
        archive_query,
        type=ArchiveTransaction.RETENTION_POLICY_BASED,
        realm=realm,
        chunk_size=chunk_size,
        recipient_id=Literal(recipient.id),
        check_date=Literal(expiry_cutoff.isoformat()),
    )
|
2019-06-05 20:22:08 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def move_expired_personal_and_huddle_messages_to_archive(
    realm: Realm,
    chunk_size: int = MESSAGE_BATCH_SIZE,
) -> int:
    """Archive the realm's expired personal and huddle messages.

    A message qualifies when its realm_id equals realm.id, its recipient
    is of type PERSONAL or HUDDLE, and its date_sent is earlier than now
    minus realm.message_retention_days (which must not be -1). This
    includes cross-realm messages.

    Args:
        realm: Realm whose retention policy is applied.
        chunk_size: Maximum number of messages archived per transaction.

    Returns:
        The number of messages archived.
    """
    retention_days = realm.message_retention_days
    assert retention_days != -1
    expiry_cutoff = timezone_now() - timedelta(days=retention_days)

    direct_message_types = (Recipient.PERSONAL, Recipient.HUDDLE)

    archive_query = SQL(
        """
    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
        SELECT {src_fields}, {archive_transaction_id}
        FROM zerver_message
        INNER JOIN zerver_recipient ON zerver_recipient.id = zerver_message.recipient_id
        WHERE zerver_message.realm_id = {realm_id}
            AND zerver_recipient.type in {recipient_types}
            AND zerver_message.date_sent < {check_date}
        LIMIT {chunk_size}
    ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id}
    RETURNING id
    """
    )

    return run_archiving_in_chunks(
        archive_query,
        type=ArchiveTransaction.RETENTION_POLICY_BASED,
        realm=realm,
        chunk_size=chunk_size,
        realm_id=Literal(realm.id),
        recipient_types=Literal(direct_message_types),
        check_date=Literal(expiry_cutoff.isoformat()),
    )
|
2019-06-02 17:29:25 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-07-04 01:58:45 +02:00
|
|
|
def move_models_with_message_key_to_archive(msg_ids: List[int]) -> None:
    """Copy the rows of every model in models_with_message_key to its archive.

    For each registered model, the rows whose message_id is in msg_ids
    are inserted into the model's archive table; rows whose id is
    already present there are left untouched.

    Args:
        msg_ids: Ids of the messages being archived; must be non-empty.
    """
    assert len(msg_ids) > 0

    # Neither the statement template nor the id list depends on the model.
    archive_query = SQL(
        """
        INSERT INTO {archive_table_name} ({dst_fields})
            SELECT {src_fields}
            FROM {table_name}
            WHERE {table_name}.message_id IN {message_ids}
        ON CONFLICT (id) DO NOTHING
        """
    )
    message_id_literal = Literal(tuple(msg_ids))

    for model_spec in models_with_message_key:
        move_rows(
            model_spec["class"],
            archive_query,
            table_name=Identifier(model_spec["table_name"]),
            archive_table_name=Identifier(model_spec["archive_table_name"]),
            message_ids=message_id_literal,
        )
|
2019-06-02 17:29:25 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-06-21 02:34:33 +02:00
|
|
|
# Attachments can't use the common models_with_message_key system,
# because they can be referenced by more than one Message, and we only
# want to delete the Attachment if we're deleting the last message
# referencing them.
def move_attachments_to_archive(msg_ids: List[int]) -> None:
    """Copy the attachments referenced by the given messages to the archive.

    Every zerver_attachment row linked (via zerver_attachment_messages)
    to one of msg_ids is inserted once into zerver_archivedattachment;
    attachments whose id is already archived are left untouched.

    Args:
        msg_ids: Ids of the messages being archived; must be non-empty.
    """
    assert len(msg_ids) > 0

    archive_query = SQL(
        """
       INSERT INTO zerver_archivedattachment ({dst_fields})
           SELECT {src_fields}
           FROM zerver_attachment
           INNER JOIN zerver_attachment_messages
               ON zerver_attachment_messages.attachment_id = zerver_attachment.id
           WHERE zerver_attachment_messages.message_id IN {message_ids}
           GROUP BY zerver_attachment.id
       ON CONFLICT (id) DO NOTHING
       """
    )
    message_id_literal = Literal(tuple(msg_ids))
    move_rows(Attachment, archive_query, message_ids=message_id_literal)
|
2016-11-01 11:26:38 +01:00
|
|
|
|
|
|
|
|
2019-06-12 17:05:58 +02:00
|
|
|
def move_attachment_messages_to_archive(msg_ids: List[int]) -> None:
    """Copy the attachment<->message link rows of the given messages.

    Rows of zerver_attachment_messages whose message_id is in msg_ids
    are inserted into zerver_archivedattachment_messages, keeping their
    ids; rows whose id is already present there are left untouched.

    Args:
        msg_ids: Ids of the messages being archived; must be non-empty.
    """
    assert len(msg_ids) > 0

    archive_query = SQL(
        """
       INSERT INTO zerver_archivedattachment_messages (id, archivedattachment_id, archivedmessage_id)
           SELECT zerver_attachment_messages.id, zerver_attachment_messages.attachment_id,
               zerver_attachment_messages.message_id
           FROM zerver_attachment_messages
           WHERE zerver_attachment_messages.message_id IN %(message_ids)s
       ON CONFLICT (id) DO NOTHING
       """
    )
    # Unlike the move_rows-based helpers, this statement binds the ids
    # as a named query parameter at execution time.
    query_params = {"message_ids": tuple(msg_ids)}
    with connection.cursor() as cursor:
        cursor.execute(archive_query, query_params)
|
2016-11-01 11:26:38 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-05 20:22:08 +02:00
|
|
|
def delete_messages(msg_ids: List[int]) -> None:
    """Delete the live Message rows with the given ids.

    Important note: this also deletes related objects with a foreign
    key to Message (due to `on_delete=CASCADE` in our models
    configuration), so we need to be sure we've taken care of archiving
    the messages before doing this step.

    Args:
        msg_ids: Ids of the messages to delete.
    """
    doomed_messages = Message.objects.filter(id__in=msg_ids)
    doomed_messages.delete()
|
2016-11-01 11:26:38 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-12 15:39:39 +02:00
|
|
|
def delete_expired_attachments(realm: Realm) -> None:
    """Delete the realm's attachments that are archived and no longer used.

    An Attachment of the realm is deleted when it has no messages and no
    scheduled messages, and its id appears among the realm's
    ArchivedAttachment objects. The number of deleted objects is logged
    when it is non-zero.

    Args:
        realm: Realm whose attachments are cleaned up.
    """
    archived_in_realm = ArchivedAttachment.objects.filter(realm_id=realm.id)
    unreferenced = Attachment.objects.filter(
        messages__isnull=True,
        scheduled_messages__isnull=True,
        realm_id=realm.id,
        id__in=archived_in_realm,
    )
    # delete() returns a pair; only the total count is of interest here.
    num_deleted, _ = unreferenced.delete()

    if num_deleted > 0:
        logger.info("Cleaned up %s attachments for realm %s", num_deleted, realm.string_id)
|
2020-05-03 13:51:18 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-05 20:47:04 +02:00
|
|
|
def move_related_objects_to_archive(msg_ids: List[int]) -> None:
    """Archive everything that hangs off the given messages.

    Runs, in order, the archiving of the models_with_message_key models,
    of the attachments, and of the attachment<->message link rows.

    Args:
        msg_ids: Ids of the messages being archived.
    """
    archiving_steps = (
        move_models_with_message_key_to_archive,
        move_attachments_to_archive,
        move_attachment_messages_to_archive,
    )
    for archive_step in archiving_steps:
        archive_step(msg_ids)
|
2019-06-05 20:22:08 +02:00
|
|
|
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
def archive_messages_by_recipient(
    recipient: Recipient,
    message_retention_days: int,
    realm: Realm,
    chunk_size: int = MESSAGE_BATCH_SIZE,
) -> int:
    """Archive a recipient's expired messages and return how many were moved.

    Thin wrapper that forwards all of its arguments unchanged to
    move_expired_messages_to_archive_by_recipient.
    """
    return move_expired_messages_to_archive_by_recipient(
        recipient=recipient,
        message_retention_days=message_retention_days,
        realm=realm,
        chunk_size=chunk_size,
    )
|
|
|
|
|
|
|
|
|
|
|
|
def archive_personal_and_huddle_messages(
    realm: Realm, chunk_size: int = MESSAGE_BATCH_SIZE
) -> None:
    """Archive the realm's expired personal and huddle messages, with logging.

    Args:
        realm: Realm whose messages are archived.
        chunk_size: Maximum number of messages archived per transaction.
    """
    logger.info("Archiving personal and huddle messages for realm %s", realm.string_id)
    archived_count = move_expired_personal_and_huddle_messages_to_archive(
        realm,
        chunk_size,
    )
    logger.info("Done. Archived %s messages", archived_count)
|
2019-06-05 20:22:08 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def archive_stream_messages(
    realm: Realm, streams: List[Stream], chunk_size: int = STREAM_MESSAGE_BATCH_SIZE
) -> None:
    """Archive the expired messages of the given streams.

    The retention period of every stream is resolved first: the stream's
    own message_retention_days when it is set, otherwise the realm's
    (which must then not be -1). Afterwards each stream's recipient is
    archived with its resolved period.

    Args:
        realm: Realm the streams belong to.
        streams: Streams to process; nothing happens when empty.
        chunk_size: Maximum number of messages archived per transaction.
    """
    if not streams:
        return  # nocoverage # TODO

    logger.info("Archiving stream messages for realm %s", realm.string_id)

    retention_days_by_stream_id: Dict[int, int] = {}
    for stream in streams:
        retention_days = stream.message_retention_days
        if not retention_days:
            # No stream-level value, so the realm's policy applies.
            assert realm.message_retention_days != -1
            retention_days = realm.message_retention_days
        retention_days_by_stream_id[stream.id] = retention_days

    stream_recipients = [stream.recipient for stream in streams]
    archived_total = 0
    for recipient in stream_recipients:
        assert recipient is not None
        # The policy table is keyed by stream id and looked up through
        # the recipient's type_id.
        archived_total += archive_messages_by_recipient(
            recipient,
            retention_days_by_stream_id[recipient.type_id],
            realm,
            chunk_size,
        )

    logger.info("Done. Archived %s messages.", archived_total)
|
2017-05-14 21:14:26 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def archive_messages(chunk_size: int = MESSAGE_BATCH_SIZE) -> None:
    """Run the retention-policy archiving for every realm that needs it.

    For each (realm, streams) pair from
    get_realms_and_streams_for_archiving, stream messages are archived
    first, then personal and huddle messages if the realm itself has a
    retention policy, and finally the realm's expired attachments are
    deleted.

    Args:
        chunk_size: Batch size for personal and huddle messages. The
            stream pass always uses STREAM_MESSAGE_BATCH_SIZE.
    """
    logger.info("Starting the archiving process with chunk_size %s", chunk_size)

    for realm, realm_streams in get_realms_and_streams_for_archiving():
        archive_stream_messages(realm, realm_streams, chunk_size=STREAM_MESSAGE_BATCH_SIZE)

        realm_has_policy = realm.message_retention_days != -1
        if realm_has_policy:
            archive_personal_and_huddle_messages(realm, chunk_size)

        # Messages have been archived for the realm, now we can clean up attachments:
        delete_expired_attachments(realm)
|
2017-05-14 21:14:26 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-05-07 14:24:45 +02:00
|
|
|
def get_realms_and_streams_for_archiving() -> List[Tuple[Realm, List[Stream]]]:
    """
    This function constructs a list of (realm, streams_of_the_realm) tuples
    where each realm is a Realm that requires calling the archiving functions on it,
    and streams_of_the_realm is a list of streams of the realm to call archive_stream_messages with.

    The purpose of this is performance - for servers with thousands of realms, it is important
    to fetch all this data in bulk.
    """
    realms_by_id: Dict[int, Realm] = {}
    streams_by_realm_id: Dict[int, List[Stream]] = {}

    # All realms with a retention policy set qualify for archiving:
    for policy_realm in Realm.objects.exclude(message_retention_days=-1):
        realms_by_id[policy_realm.id] = policy_realm
        streams_by_realm_id[policy_realm.id] = []

    # Now we find all streams that require archiving.
    # First category are streams in retention-enabled realms,
    # that don't have retention explicitly disabled (through the value -1).
    streams_in_policy_realms = (
        Stream.objects.exclude(message_retention_days=-1)
        .exclude(realm__message_retention_days=-1)
        .select_related("realm", "recipient")
    )
    # Second category are streams that are in realms without a realm-wide retention policy,
    # but have their own stream-specific policy enabled.
    streams_with_own_policy = (
        Stream.objects.filter(realm__message_retention_days=-1)
        .exclude(message_retention_days__isnull=True)
        .exclude(message_retention_days=-1)
        .select_related("realm", "recipient")
    )

    for stream in streams_in_policy_realms.union(streams_with_own_policy):
        stream_realm = stream.realm
        realms_by_id[stream_realm.id] = stream_realm
        streams_by_realm_id.setdefault(stream_realm.id, []).append(stream)

    return [
        (realm_obj, streams_by_realm_id[realm_id])
        for realm_id, realm_obj in realms_by_id.items()
    ]
|
|
|
|
|
2020-05-07 14:24:45 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
def move_messages_to_archive(
    message_ids: List[int], realm: Optional[Realm] = None, chunk_size: int = MESSAGE_BATCH_SIZE
) -> None:
    """Archive the given messages as a manual (non-retention-policy) deletion.

    The INSERT below copies the selected zerver_message rows into
    zerver_archivedmessage; it is executed by run_archiving_in_chunks
    with an ArchiveTransaction type of MANUAL. Afterwards, Attachment
    rows that were linked to these messages and are no longer
    referenced by any message or scheduled message are deleted.

    Raises Message.DoesNotExist if nothing was archived, including when
    message_ids is empty.
    """
    if not message_ids:
        # An empty tuple would render the filter as "IN ()", which is
        # not valid SQL (assuming Literal is psycopg2.sql.Literal).
        # Nothing could be archived anyway, so report it the same way
        # as the "no matching messages" case below.
        raise Message.DoesNotExist

    query = SQL(
        """
    INSERT INTO zerver_archivedmessage ({dst_fields}, archive_transaction_id)
        SELECT {src_fields}, {archive_transaction_id}
        FROM zerver_message
        WHERE zerver_message.id IN {message_ids}
        LIMIT {chunk_size}
    ON CONFLICT (id) DO UPDATE SET archive_transaction_id = {archive_transaction_id}
    RETURNING id
    """
    )
    count = run_archiving_in_chunks(
        query,
        type=ArchiveTransaction.MANUAL,
        message_ids=Literal(tuple(message_ids)),
        realm=realm,
        chunk_size=chunk_size,
    )

    if count == 0:
        raise Message.DoesNotExist

    # Clean up attachments: only those tied to the archived messages are
    # candidates, and only if no live message or scheduled message still
    # references them.
    archived_attachments = ArchivedAttachment.objects.filter(
        messages__id__in=message_ids
    ).distinct()
    Attachment.objects.filter(
        messages__isnull=True, scheduled_messages__isnull=True, id__in=archived_attachments
    ).delete()
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-24 17:19:22 +02:00
|
|
|
def restore_messages_from_archive(archive_transaction_id: int) -> List[int]:
    """Copy the messages of one archive transaction back into zerver_message.

    Rows whose id already exists in zerver_message are left untouched
    (ON CONFLICT ... DO NOTHING). The result of move_rows, called with
    returning_id=True, is returned unchanged; per the annotation this
    is a list of message ids.
    """
    restore_sql = SQL(
        """
    INSERT INTO zerver_message ({dst_fields})
        SELECT {src_fields}
        FROM zerver_archivedmessage
        WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
    ON CONFLICT (id) DO NOTHING
    RETURNING id
    """
    )
    restored_ids = move_rows(
        Message,
        restore_sql,
        src_db_table="zerver_archivedmessage",
        returning_id=True,
        archive_transaction_id=Literal(archive_transaction_id),
    )
    return restored_ids
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-24 17:19:22 +02:00
|
|
|
def restore_models_with_message_key_from_archive(archive_transaction_id: int) -> None:
    """Restore rows of every model listed in models_with_message_key.

    For each entry, rows of its archive table that belong (through
    message_id) to an archived message of the given transaction are
    inserted back into the live table; ids that already exist there
    are skipped.
    """
    # The template is the same for every model; only the identifiers
    # substituted into it differ, so build it once up front.
    restore_sql = SQL(
        """
    INSERT INTO {table_name} ({dst_fields})
        SELECT {src_fields}
        FROM {archive_table_name}
        INNER JOIN zerver_archivedmessage ON {archive_table_name}.message_id = zerver_archivedmessage.id
        WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
    ON CONFLICT (id) DO NOTHING
    """
    )

    for entry in models_with_message_key:
        live_table = entry["table_name"]
        archive_table = entry["archive_table_name"]
        move_rows(
            entry["class"],
            restore_sql,
            src_db_table=archive_table,
            table_name=Identifier(live_table),
            archive_transaction_id=Literal(archive_transaction_id),
            archive_table_name=Identifier(archive_table),
        )
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-24 17:19:22 +02:00
|
|
|
def restore_attachments_from_archive(archive_transaction_id: int) -> None:
    """Copy archived attachments of one archive transaction back to zerver_attachment.

    An archived attachment qualifies when it is linked, through
    zerver_archivedattachment_messages, to at least one archived message
    of the transaction. GROUP BY collapses attachments shared by several
    such messages into a single row, and ids already present in
    zerver_attachment are skipped.
    """
    restore_sql = SQL(
        """
    INSERT INTO zerver_attachment ({dst_fields})
        SELECT {src_fields}
        FROM zerver_archivedattachment
        INNER JOIN zerver_archivedattachment_messages
            ON zerver_archivedattachment_messages.archivedattachment_id = zerver_archivedattachment.id
        INNER JOIN zerver_archivedmessage
            ON zerver_archivedattachment_messages.archivedmessage_id = zerver_archivedmessage.id
        WHERE zerver_archivedmessage.archive_transaction_id = {archive_transaction_id}
        GROUP BY zerver_archivedattachment.id
    ON CONFLICT (id) DO NOTHING
    """
    )
    transaction_literal = Literal(archive_transaction_id)
    move_rows(
        Attachment,
        restore_sql,
        src_db_table="zerver_archivedattachment",
        archive_transaction_id=transaction_literal,
    )
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-24 17:19:22 +02:00
|
|
|
def restore_attachment_messages_from_archive(archive_transaction_id: int) -> None:
    """Re-create the attachment<->message link rows for one archive transaction.

    Link rows from zerver_archivedattachment_messages whose archived
    message belongs to the transaction are inserted into
    zerver_attachment_messages with the same ids; existing ids are
    skipped. Unlike the sibling restore helpers, this statement is run
    directly on a cursor with a bound parameter rather than via move_rows.
    """
    link_sql = SQL(
        """
    INSERT INTO zerver_attachment_messages (id, attachment_id, message_id)
        SELECT zerver_archivedattachment_messages.id,
               zerver_archivedattachment_messages.archivedattachment_id,
               zerver_archivedattachment_messages.archivedmessage_id
        FROM zerver_archivedattachment_messages
        INNER JOIN zerver_archivedmessage
            ON zerver_archivedattachment_messages.archivedmessage_id = zerver_archivedmessage.id
        WHERE zerver_archivedmessage.archive_transaction_id = %(archive_transaction_id)s
    ON CONFLICT (id) DO NOTHING
    """
    )
    params = {"archive_transaction_id": archive_transaction_id}
    with connection.cursor() as cursor:
        cursor.execute(link_sql, params)
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-27 16:20:00 +02:00
|
|
|
def restore_data_from_archive(archive_transaction: ArchiveTransaction) -> int:
    """Restore everything archived under one ArchiveTransaction.

    Messages, the models keyed on messages, attachments and the
    attachment/message links are restored in that order inside a single
    database transaction, after which the ArchiveTransaction is flagged
    as restored and saved.

    Returns the number of message ids reported by
    restore_messages_from_archive.
    """
    logger.info("Restoring %s", archive_transaction)
    transaction_id = archive_transaction.id

    # The atomic block deliberately wraps only the restore steps instead
    # of decorating the whole function: Django commits when the block is
    # exited, so the "Finished" log line below is only emitted once the
    # changes have really been committed.
    with transaction.atomic():
        restored_ids = restore_messages_from_archive(transaction_id)
        restore_models_with_message_key_from_archive(transaction_id)
        restore_attachments_from_archive(transaction_id)
        restore_attachment_messages_from_archive(transaction_id)

        archive_transaction.restored = True
        archive_transaction.save()

    restored_count = len(restored_ids)
    logger.info("Finished. Restored %s messages", restored_count)
    return restored_count
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def restore_data_from_archive_by_transactions(
    archive_transactions: Iterable[ArchiveTransaction],
) -> int:
    """Restore each given archive transaction in turn.

    Transactions are handled one at a time, so the restoration is
    batched by the size of the individual transactions. Returns the
    total of the per-transaction counts from restore_data_from_archive
    (0 for an empty iterable).
    """
    return sum(
        restore_data_from_archive(archive_transaction)
        for archive_transaction in archive_transactions
    )
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-06-24 17:19:22 +02:00
|
|
|
def restore_data_from_archive_by_realm(realm: Realm) -> None:
    """Restore all not-yet-restored retention-policy transactions of a realm.

    Only ArchiveTransaction rows of type RETENTION_POLICY_BASED that
    belong to `realm` and are not flagged as restored are considered.
    """
    realm_name = realm.string_id
    pending = ArchiveTransaction.objects.exclude(restored=True).filter(
        realm=realm, type=ArchiveTransaction.RETENTION_POLICY_BASED
    )
    logger.info("Restoring %s transactions from realm %s", len(pending), realm_name)

    restored_count = restore_data_from_archive_by_transactions(pending)

    logger.info("Finished. Restored %s messages from realm %s", restored_count, realm_name)
|
2019-06-24 17:19:22 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def restore_all_data_from_archive(restore_manual_transactions: bool = True) -> None:
    """Restore archived data for every realm.

    Retention-policy transactions are restored realm by realm. When
    restore_manual_transactions is true, all not-yet-restored
    transactions of type MANUAL are restored afterwards as well.
    """
    for realm in Realm.objects.all():
        restore_data_from_archive_by_realm(realm)

    if not restore_manual_transactions:
        return

    manual_transactions = ArchiveTransaction.objects.exclude(restored=True).filter(
        type=ArchiveTransaction.MANUAL
    )
    restore_data_from_archive_by_transactions(manual_transactions)
|
2019-06-25 20:01:05 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-06-22 11:26:06 +02:00
|
|
|
def restore_retention_policy_deletions_for_stream(stream: Stream) -> None:
    """
    Helper meant to be run from the Django shell: if a stream's
    retention policy was set too aggressively, this restores the
    messages that were deleted because of it.

    Every retention-policy ArchiveTransaction containing at least one
    archived message addressed to the stream's recipient is restored.
    """
    matching = ArchiveTransaction.objects.filter(
        archivedmessage__recipient=stream.recipient, type=ArchiveTransaction.RETENTION_POLICY_BASED
    )
    # The join on archived messages yields one row per message, so
    # collapse the result to one row per transaction.
    unique_transactions = matching.distinct("id")

    restore_data_from_archive_by_transactions(list(unique_transactions))
|
2020-06-22 11:26:06 +02:00
|
|
|
|
|
|
|
|
2019-06-25 20:01:05 +02:00
|
|
|
def clean_archived_data() -> None:
    """Delete archived data that is at least
    settings.ARCHIVED_DATA_VACUUMING_DELAY_DAYS days old.

    This is done by deleting sufficiently old ArchiveTransaction
    objects. Most archive tables, such as ArchivedMessage, are
    configured with on_delete=CASCADE, so removing an
    ArchiveTransaction also removes the objects attached to it,
    including ArchivedMessage, ArchivedUserMessage and
    ArchivedReaction.

    ArchivedAttachment is the exception: archived attachments that were
    referenced only by now-deleted ArchivedMessage objects are left
    behind with an empty `.messages` relation. A separate step,
    delete_old_unclaimed_attachments, deletes those ArchivedAttachment
    objects (and the underlying files in storage).
    """
    logger.info("Cleaning old archive data.")
    cutoff = timezone_now() - timedelta(days=settings.ARCHIVED_DATA_VACUUMING_DELAY_DAYS)

    # Snapshot the ids first, then delete them in fixed-size batches;
    # associated archived objects go away via on_delete=CASCADE.
    expired_ids = list(
        ArchiveTransaction.objects.filter(timestamp__lt=cutoff).values_list("id", flat=True)
    )
    deleted = 0
    for start in range(0, len(expired_ids), TRANSACTION_DELETION_BATCH_SIZE):
        batch = expired_ids[start : start + TRANSACTION_DELETION_BATCH_SIZE]
        ArchiveTransaction.objects.filter(id__in=batch).delete()
        deleted += len(batch)

    logger.info("Deleted %s old ArchiveTransactions.", deleted)
|
2020-06-21 11:14:35 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-07-05 00:33:33 +02:00
|
|
|
def parse_message_retention_days(
    value: Union[int, str],
    special_values_map: Mapping[str, Optional[int]],
) -> Optional[int]:
    """Convert a message_retention_days request value to its stored form.

    A string is accepted only if it is a key of special_values_map, in
    which case the mapped value (possibly None) is returned. An integer
    is accepted only if it is strictly positive and is returned as is.

    Raises RequestVariableConversionError for any other input.
    """
    if isinstance(value, str):
        # Strings are valid only as one of the named special values.
        if value in special_values_map:
            return special_values_map[value]
        raise RequestVariableConversionError("message_retention_days", value)

    if value <= 0:
        raise RequestVariableConversionError("message_retention_days", value)

    assert isinstance(value, int)
    return value
|