From 07dcee36b2a34d63429d7a706f880628cf3433df Mon Sep 17 00:00:00 2001 From: Prakhar Pratyush Date: Thu, 26 Sep 2024 15:48:55 +0530 Subject: [PATCH] export_realm: Add RealmExport model. Earlier, we used to store the key data related to realm exports in RealmAuditLog. This commit adds a separate table to store those data. It includes the code to migrate the concerned existing data in RealmAuditLog to RealmExport. Fixes part of #31201. --- zerver/actions/realm_export.py | 20 +- zerver/lib/export.py | 179 +++++++++++++----- zerver/management/commands/export.py | 33 ++-- ...0595_add_realmexport_table_and_backfill.py | 120 ++++++++++++ zerver/models/__init__.py | 1 + zerver/models/realms.py | 42 +++- zerver/tests/test_events.py | 13 +- zerver/tests/test_import_export.py | 20 +- zerver/tests/test_management_commands.py | 6 +- zerver/tests/test_realm_export.py | 101 +++++----- zerver/views/realm_export.py | 40 ++-- zerver/worker/deferred_work.py | 66 ++++--- 12 files changed, 426 insertions(+), 215 deletions(-) create mode 100644 zerver/migrations/0595_add_realmexport_table_and_backfill.py diff --git a/zerver/actions/realm_export.py b/zerver/actions/realm_export.py index 73efa31431..1ae86b50f3 100644 --- a/zerver/actions/realm_export.py +++ b/zerver/actions/realm_export.py @@ -2,7 +2,7 @@ from django.utils.timezone import now as timezone_now from zerver.lib.export import get_realm_exports_serialized from zerver.lib.upload import delete_export_tarball -from zerver.models import Realm, RealmAuditLog +from zerver.models import Realm, RealmExport from zerver.tornado.django_api import send_event_on_commit @@ -11,15 +11,13 @@ def notify_realm_export(realm: Realm) -> None: send_event_on_commit(realm, event, realm.get_human_admin_users().values_list("id", flat=True)) -def do_delete_realm_export(export: RealmAuditLog) -> None: - export_data = export.extra_data - export_path = export_data.get("export_path") +def do_delete_realm_export(export_row: RealmExport) -> None: + export_path = export_row.export_path + assert export_path is not None - if export_path: - # Allow removal even if the export failed. - delete_export_tarball(export_path) + delete_export_tarball(export_path) - export_data.update(deleted_timestamp=timezone_now().timestamp()) - export.extra_data = export_data - export.save(update_fields=["extra_data"]) - notify_realm_export(export.realm) + export_row.status = RealmExport.DELETED + export_row.date_deleted = timezone_now() + export_row.save(update_fields=["status", "date_deleted"]) + notify_realm_export(export_row.realm) diff --git a/zerver/lib/export.py b/zerver/lib/export.py index 75520bcb31..8a342a524c 100644 --- a/zerver/lib/export.py +++ b/zerver/lib/export.py @@ -7,6 +7,7 @@ # (2) if it doesn't belong in EXCLUDED_TABLES, add a Config object for # it to get_realm_config. import glob +import hashlib import logging import os import shutil @@ -17,6 +18,7 @@ from contextlib import suppress from datetime import datetime from functools import cache from typing import TYPE_CHECKING, Any, Optional, TypeAlias, TypedDict +from urllib.parse import urlsplit import orjson from django.apps import apps @@ -24,12 +26,14 @@ from django.conf import settings from django.db.models import Exists, OuterRef, Q from django.forms.models import model_to_dict from django.utils.timezone import is_naive as timezone_is_naive +from django.utils.timezone import now as timezone_now import zerver.lib.upload from analytics.models import RealmCount, StreamCount, UserCount from scripts.lib.zulip_tools import overwrite_symlink from zerver.lib.avatar_hash import user_avatar_base_path_from_ids from zerver.lib.pysa import mark_sanitized +from zerver.lib.timestamp import datetime_to_timestamp from zerver.lib.upload.s3 import get_bucket from zerver.models import ( AlertWord, @@ -53,6 +57,7 @@ from zerver.models import ( RealmAuthenticationMethod, RealmDomain, RealmEmoji, + RealmExport, RealmFilter, RealmPlayground, RealmUserDefault, @@ -73,7 +78,7 @@ from zerver.models import ( ) from zerver.models.presence import PresenceSequence from zerver.models.realm_audit_logs import AuditLogEventType -from zerver.models.realms import EXPORT_FULL_WITH_CONSENT, EXPORT_PUBLIC, get_realm +from zerver.models.realms import get_realm from zerver.models.users import get_system_bot, get_user_profile_by_id if TYPE_CHECKING: @@ -158,6 +163,7 @@ ALL_ZULIP_TABLES = { "zerver_realmauthenticationmethod", "zerver_realmdomain", "zerver_realmemoji", + "zerver_realmexport", "zerver_realmfilter", "zerver_realmplayground", "zerver_realmreactivationstatus", @@ -302,6 +308,13 @@ DATE_FIELDS: dict[TableName, list[Field]] = { "zerver_muteduser": ["date_muted"], "zerver_realmauditlog": ["event_time"], "zerver_realm": ["date_created"], + "zerver_realmexport": [ + "date_requested", + "date_started", + "date_succeeded", + "date_failed", + "date_deleted", + ], "zerver_scheduledmessage": ["scheduled_timestamp"], "zerver_stream": ["date_created"], "zerver_namedusergroup": ["date_created"], @@ -748,6 +761,13 @@ def get_realm_config() -> Config: include_rows="realm_id__in", ) + Config( + table="zerver_realmexport", + model=RealmExport, + normal_parent=realm_config, + include_rows="realm_id__in", + ) + Config( table="zerver_realmfilter", model=RealmFilter, @@ -1327,16 +1347,16 @@ def export_partial_message_files( ) consented_user_ids: set[int] = set() - if export_type == EXPORT_FULL_WITH_CONSENT: + if export_type == RealmExport.EXPORT_FULL_WITH_CONSENT: consented_user_ids = get_consented_user_ids(realm) - if export_type == EXPORT_PUBLIC: + if export_type == RealmExport.EXPORT_PUBLIC: recipient_streams = Stream.objects.filter(realm=realm, invite_only=False) recipient_ids = Recipient.objects.filter( type=Recipient.STREAM, type_id__in=recipient_streams ).values_list("id", flat=True) recipient_ids_for_us = get_ids(response["zerver_recipient"]) & set(recipient_ids) - elif export_type == EXPORT_FULL_WITH_CONSENT: + elif export_type == RealmExport.EXPORT_FULL_WITH_CONSENT: public_streams = Stream.objects.filter(realm=realm, invite_only=False) public_stream_recipient_ids = Recipient.objects.filter( type=Recipient.STREAM, type_id__in=public_streams @@ -1359,7 +1379,7 @@ def export_partial_message_files( # For a full export, we have implicit consent for all users in the export. consented_user_ids = user_ids_for_us - if export_type == EXPORT_PUBLIC: + if export_type == RealmExport.EXPORT_PUBLIC: messages_we_received = Message.objects.filter( # Uses index: zerver_message_realm_sender_recipient realm_id=realm.id, @@ -1385,7 +1405,7 @@ def export_partial_message_files( ) message_queries.append(messages_we_received) - if export_type == EXPORT_FULL_WITH_CONSENT: + if export_type == RealmExport.EXPORT_FULL_WITH_CONSENT: # Export with member consent requires some careful handling to make sure # we only include messages that a consenting user can access. has_usermessage_expression = Exists( @@ -1936,7 +1956,7 @@ def export_emoji_from_local( write_records_json_file(output_dir, records) -def do_write_stats_file_for_realm_export(output_dir: Path) -> None: +def do_write_stats_file_for_realm_export(output_dir: Path) -> dict[str, int | dict[str, int]]: stats_file = os.path.join(output_dir, "stats.json") realm_file = os.path.join(output_dir, "realm.json") attachment_file = os.path.join(output_dir, "attachment.json") @@ -1962,6 +1982,8 @@ def do_write_stats_file_for_realm_export(output_dir: Path) -> None: with open(stats_file, "wb") as f: f.write(orjson.dumps(stats, option=orjson.OPT_INDENT_2)) + return stats + def get_exportable_scheduled_message_ids(realm: Realm, export_type: int) -> set[int]: """ @@ -1969,10 +1991,10 @@ def get_exportable_scheduled_message_ids(realm: Realm, export_type: int) -> set[ public/consent/full export mode. """ - if export_type == EXPORT_PUBLIC: + if export_type == RealmExport.EXPORT_PUBLIC: return set() - if export_type == EXPORT_FULL_WITH_CONSENT: + if export_type == RealmExport.EXPORT_FULL_WITH_CONSENT: sender_ids = get_consented_user_ids(realm) return set( ScheduledMessage.objects.filter(sender_id__in=sender_ids, realm=realm).values_list( @@ -1990,7 +2012,7 @@ def do_export_realm( export_type: int, exportable_user_ids: set[int] | None = None, export_as_active: bool | None = None, -) -> str: +) -> tuple[str, dict[str, int | dict[str, int]]]: response: TableData = {} # We need at least one thread running to export @@ -2065,13 +2087,13 @@ def do_export_realm( launch_user_message_subprocesses( threads=threads, output_dir=output_dir, - export_full_with_consent=export_type == EXPORT_FULL_WITH_CONSENT, + export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT, ) logging.info("Finished exporting %s", realm.string_id) create_soft_link(source=output_dir, in_progress=False) - do_write_stats_file_for_realm_export(output_dir) + stats = do_write_stats_file_for_realm_export(output_dir) logging.info("Compressing tarball...") tarball_path = output_dir.rstrip("/") + ".tar.gz" @@ -2083,7 +2105,7 @@ def do_export_realm( os.path.basename(output_dir), ] ) - return tarball_path + return tarball_path, stats def export_attachment_table( @@ -2441,35 +2463,97 @@ def get_consented_user_ids(realm: Realm) -> set[int]: def export_realm_wrapper( - realm: Realm, + export_row: RealmExport, output_dir: str, threads: int, upload: bool, - export_type: int, percent_callback: Callable[[Any], None] | None = None, export_as_active: bool | None = None, ) -> str | None: - tarball_path = do_export_realm( - realm=realm, - output_dir=output_dir, - threads=threads, - export_type=export_type, - export_as_active=export_as_active, - ) - shutil.rmtree(output_dir) - print(f"Tarball written to {tarball_path}") - if not upload: - return None + try: + export_row.status = RealmExport.STARTED + export_row.date_started = timezone_now() + export_row.save(update_fields=["status", "date_started"]) - print("Uploading export tarball...") - public_url = zerver.lib.upload.upload_backend.upload_export_tarball( - realm, tarball_path, percent_callback=percent_callback - ) - print(f"\nUploaded to {public_url}") + tarball_path, stats = do_export_realm( + realm=export_row.realm, + output_dir=output_dir, + threads=threads, + export_type=export_row.type, + export_as_active=export_as_active, + ) - os.remove(tarball_path) - print(f"Successfully deleted the tarball at {tarball_path}") - return public_url + RealmAuditLog.objects.create( + acting_user=export_row.acting_user, + realm=export_row.realm, + event_type=AuditLogEventType.REALM_EXPORTED, + event_time=timezone_now(), + extra_data={"realm_export_id": export_row.id}, + ) + + shutil.rmtree(output_dir) + print(f"Tarball written to {tarball_path}") + + print("Calculating SHA-256 checksum of tarball...") + + # TODO: We can directly use 'hashlib.file_digest'. (New in Python 3.11) + sha256_hash = hashlib.sha256() + with open(tarball_path, "rb") as f: + buf = bytearray(2**18) + view = memoryview(buf) + while True: + size = f.readinto(buf) + if size == 0: + break + sha256_hash.update(view[:size]) + + export_row.sha256sum_hex = sha256_hash.hexdigest() + export_row.tarball_size_bytes = os.path.getsize(tarball_path) + export_row.status = RealmExport.SUCCEEDED + export_row.date_succeeded = timezone_now() + export_row.stats = stats + + print(f"SHA-256 checksum is {export_row.sha256sum_hex}") + + if not upload: + export_row.save( + update_fields=[ + "sha256sum_hex", + "tarball_size_bytes", + "status", + "date_succeeded", + "stats", + ] + ) + return None + + print("Uploading export tarball...") + public_url = zerver.lib.upload.upload_backend.upload_export_tarball( + export_row.realm, tarball_path, percent_callback=percent_callback + ) + print(f"\nUploaded to {public_url}") + + # Update the export_path field now that the export is complete. + export_row.export_path = urlsplit(public_url).path + export_row.save( + update_fields=[ + "sha256sum_hex", + "tarball_size_bytes", + "status", + "date_succeeded", + "stats", + "export_path", + ] + ) + + os.remove(tarball_path) + print(f"Successfully deleted the tarball at {tarball_path}") + return public_url + except Exception: + export_row.status = RealmExport.FAILED + export_row.date_failed = timezone_now() + export_row.save(update_fields=["status", "date_failed"]) + raise def get_realm_exports_serialized(realm: Realm) -> list[dict[str, Any]]: @@ -2479,31 +2563,26 @@ def get_realm_exports_serialized(realm: Realm) -> list[dict[str, Any]]: # TODO: We should return those via the API as well, with an # appropriate way to express for who issued them; this requires an # API change. - all_exports = RealmAuditLog.objects.filter( - realm=realm, event_type=AuditLogEventType.REALM_EXPORTED - ).exclude(acting_user=None) + all_exports = RealmExport.objects.filter(realm=realm).exclude(acting_user=None) exports_dict = {} for export in all_exports: export_url = None - deleted_timestamp = None - failed_timestamp = None - acting_user = export.acting_user + export_path = export.export_path + pending = export.status in [RealmExport.REQUESTED, RealmExport.STARTED] - export_data = export.extra_data - - deleted_timestamp = export_data.get("deleted_timestamp") - failed_timestamp = export_data.get("failed_timestamp") - export_path = export_data.get("export_path") - - pending = deleted_timestamp is None and failed_timestamp is None and export_path is None - - if export_path is not None and not deleted_timestamp: + if export.status == RealmExport.SUCCEEDED: + assert export_path is not None export_url = zerver.lib.upload.upload_backend.get_export_tarball_url(realm, export_path) + deleted_timestamp = ( + datetime_to_timestamp(export.date_deleted) if export.date_deleted else None + ) + failed_timestamp = datetime_to_timestamp(export.date_failed) if export.date_failed else None + acting_user = export.acting_user assert acting_user is not None exports_dict[export.id] = dict( id=export.id, - export_time=export.event_time.timestamp(), + export_time=datetime_to_timestamp(export.date_requested), acting_user_id=acting_user.id, export_url=export_url, deleted_timestamp=deleted_timestamp, diff --git a/zerver/management/commands/export.py b/zerver/management/commands/export.py index d2c8c9df3c..c8be4cd774 100644 --- a/zerver/management/commands/export.py +++ b/zerver/management/commands/export.py @@ -11,13 +11,7 @@ from typing_extensions import override from zerver.actions.realm_settings import do_deactivate_realm from zerver.lib.export import export_realm_wrapper from zerver.lib.management import ZulipBaseCommand -from zerver.models import RealmAuditLog -from zerver.models.realm_audit_logs import AuditLogEventType -from zerver.models.realms import ( - EXPORT_FULL_WITH_CONSENT, - EXPORT_FULL_WITHOUT_CONSENT, - EXPORT_PUBLIC, -) +from zerver.models import RealmExport class Command(ZulipBaseCommand): @@ -169,26 +163,27 @@ class Command(ZulipBaseCommand): def percent_callback(bytes_transferred: Any) -> None: print(end=".", flush=True) - RealmAuditLog.objects.create( - acting_user=None, + if public_only: + export_type = RealmExport.EXPORT_PUBLIC + elif export_full_with_consent: + export_type = RealmExport.EXPORT_FULL_WITH_CONSENT + else: + export_type = RealmExport.EXPORT_FULL_WITHOUT_CONSENT + + export_row = RealmExport.objects.create( realm=realm, - event_type=AuditLogEventType.REALM_EXPORTED, - event_time=timezone_now(), + type=export_type, + acting_user=None, + status=RealmExport.REQUESTED, + date_requested=timezone_now(), ) # Allows us to trigger exports separately from command line argument parsing - if public_only: - export_type = EXPORT_PUBLIC - elif export_full_with_consent: - export_type = EXPORT_FULL_WITH_CONSENT - else: - export_type = EXPORT_FULL_WITHOUT_CONSENT export_realm_wrapper( - realm=realm, + export_row=export_row, output_dir=output_dir, threads=num_threads, upload=options["upload"], - export_type=export_type, percent_callback=percent_callback, export_as_active=True if options["deactivate_realm"] else None, ) diff --git a/zerver/migrations/0595_add_realmexport_table_and_backfill.py b/zerver/migrations/0595_add_realmexport_table_and_backfill.py new file mode 100644 index 0000000000..0d5e874299 --- /dev/null +++ b/zerver/migrations/0595_add_realmexport_table_and_backfill.py @@ -0,0 +1,120 @@ +# Generated by Django 5.0.9 on 2024-10-04 09:57 + +from datetime import datetime, timezone + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models +from django.db.backends.base.schema import BaseDatabaseSchemaEditor +from django.db.migrations.state import StateApps + + +def timestamp_to_datetime(timestamp: float | None) -> datetime | None: + if timestamp is None: + return None + return datetime.fromtimestamp(float(timestamp), tz=timezone.utc) + + +def datetime_to_timestamp(dt: datetime) -> int: + return int(dt.timestamp()) + + +def backfill_realm_export(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None: + REALM_EXPORTED = 206 + REQUESTED = 1 + STARTED = 2 + SUCCEEDED = 3 + FAILED = 4 + DELETED = 5 + EXPORT_PUBLIC = 1 + + RealmAuditLog = apps.get_model("zerver", "RealmAuditLog") + RealmExport = apps.get_model("zerver", "RealmExport") + + for audit_log in RealmAuditLog.objects.filter(event_type=REALM_EXPORTED).order_by("id"): + if audit_log.acting_user is None: + # This audit_log is for an export made through shell. + # We don't have enough data to determine if the export was + # successful or failed. + continue + + extra_data = audit_log.extra_data + started_timestamp = extra_data.get("started_timestamp") + failed_timestamp = extra_data.get("failed_timestamp") + deleted_timestamp = extra_data.get("deleted_timestamp") + export_path = extra_data.get("export_path") + + status = REQUESTED + if deleted_timestamp is not None: + status = DELETED + elif failed_timestamp is not None: + status = FAILED + elif export_path is not None: + status = SUCCEEDED + elif started_timestamp is not None: + status = STARTED + + # We don't have historical data to set `date_succeeded`. + RealmExport.objects.create( + realm=audit_log.realm, + acting_user=audit_log.acting_user, + type=EXPORT_PUBLIC, + status=status, + date_requested=audit_log.event_time, + date_started=timestamp_to_datetime(started_timestamp), + date_failed=timestamp_to_datetime(failed_timestamp), + date_deleted=timestamp_to_datetime(deleted_timestamp), + export_path=export_path, + ) + + +def reverse_code(apps: StateApps, schema_editor: BaseDatabaseSchemaEditor) -> None: + RealmExport = apps.get_model("zerver", "RealmExport") + + RealmExport.objects.all().delete() + + +class Migration(migrations.Migration): + dependencies = [ + ("zerver", "0594_remove_realm_user_group_edit_policy"), + ] + + operations = [ + migrations.CreateModel( + name="RealmExport", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ("type", models.PositiveSmallIntegerField(default=1)), + ("status", models.PositiveSmallIntegerField(default=1)), + ("date_requested", models.DateTimeField()), + ("date_started", models.DateTimeField(default=None, null=True)), + ("date_succeeded", models.DateTimeField(default=None, null=True)), + ("date_failed", models.DateTimeField(default=None, null=True)), + ("date_deleted", models.DateTimeField(default=None, null=True)), + ("export_path", models.TextField(default=None, null=True)), + ("sha256sum_hex", models.CharField(default=None, max_length=64, null=True)), + ("tarball_size_bytes", models.PositiveIntegerField(default=None, null=True)), + ("stats", models.JSONField(default=None, null=True)), + ( + "acting_user", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to=settings.AUTH_USER_MODEL, + ), + ), + ( + "realm", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="zerver.realm" + ), + ), + ], + ), + migrations.RunPython(backfill_realm_export, reverse_code=reverse_code, elidable=True), + ] diff --git a/zerver/models/__init__.py b/zerver/models/__init__.py index f86ae2b4bf..0e791cbb31 100644 --- a/zerver/models/__init__.py +++ b/zerver/models/__init__.py @@ -49,6 +49,7 @@ from zerver.models.realm_playgrounds import RealmPlayground as RealmPlayground from zerver.models.realms import Realm as Realm from zerver.models.realms import RealmAuthenticationMethod as RealmAuthenticationMethod from zerver.models.realms import RealmDomain as RealmDomain +from zerver.models.realms import RealmExport as RealmExport from zerver.models.recipients import DirectMessageGroup as DirectMessageGroup from zerver.models.recipients import Recipient as Recipient from zerver.models.saved_snippets import SavedSnippet as SavedSnippet diff --git a/zerver/models/realms.py b/zerver/models/realms.py index 8870bd0349..5ba4501636 100644 --- a/zerver/models/realms.py +++ b/zerver/models/realms.py @@ -1313,12 +1313,36 @@ def get_fake_email_domain(realm_host: str) -> str: return settings.FAKE_EMAIL_DOMAIN -# TODO: Move this into a new ReamExport model. -EXPORT_PUBLIC = 1 -EXPORT_FULL_WITH_CONSENT = 2 -EXPORT_FULL_WITHOUT_CONSENT = 3 -EXPORT_TYPES = [ - EXPORT_PUBLIC, - EXPORT_FULL_WITH_CONSENT, - EXPORT_FULL_WITHOUT_CONSENT, -] +class RealmExport(models.Model): + """Every data export is recorded in this table.""" + + realm = models.ForeignKey(Realm, on_delete=CASCADE) + + EXPORT_PUBLIC = 1 + EXPORT_FULL_WITH_CONSENT = 2 + EXPORT_FULL_WITHOUT_CONSENT = 3 + EXPORT_TYPES = [ + EXPORT_PUBLIC, + EXPORT_FULL_WITH_CONSENT, + EXPORT_FULL_WITHOUT_CONSENT, + ] + type = models.PositiveSmallIntegerField(default=EXPORT_PUBLIC) + + REQUESTED = 1 + STARTED = 2 + SUCCEEDED = 3 + FAILED = 4 + DELETED = 5 + status = models.PositiveSmallIntegerField(default=REQUESTED) + + date_requested = models.DateTimeField() + date_started = models.DateTimeField(default=None, null=True) + date_succeeded = models.DateTimeField(default=None, null=True) + date_failed = models.DateTimeField(default=None, null=True) + date_deleted = models.DateTimeField(default=None, null=True) + + acting_user = models.ForeignKey("UserProfile", null=True, on_delete=models.SET_NULL) + export_path = models.TextField(default=None, null=True) + sha256sum_hex = models.CharField(default=None, null=True, max_length=64) + tarball_size_bytes = models.PositiveIntegerField(default=None, null=True) + stats = models.JSONField(default=None, null=True) diff --git a/zerver/tests/test_events.py b/zerver/tests/test_events.py index 1ce4c6e804..f0cb6ac4d9 100644 --- a/zerver/tests/test_events.py +++ b/zerver/tests/test_events.py @@ -238,6 +238,7 @@ from zerver.models import ( Realm, RealmAuditLog, RealmDomain, + RealmExport, RealmFilter, RealmPlayground, RealmUserDefault, @@ -3378,7 +3379,7 @@ class NormalActionsTest(BaseAction): with mock.patch( "zerver.lib.export.do_export_realm", - return_value=create_dummy_file("test-export.tar.gz"), + return_value=(create_dummy_file("test-export.tar.gz"), dict()), ): with ( stdout_suppressed(), @@ -3407,13 +3408,11 @@ class NormalActionsTest(BaseAction): ) # Now we check the deletion of the export. - audit_log_entry = RealmAuditLog.objects.filter( - event_type=AuditLogEventType.REALM_EXPORTED - ).first() - assert audit_log_entry is not None - audit_log_entry_id = audit_log_entry.id + export_row = RealmExport.objects.first() + assert export_row is not None + export_row_id = export_row.id with self.verify_action(state_change_expected=False, num_events=1) as events: - self.client_delete(f"/json/export/realm/{audit_log_entry_id}") + self.client_delete(f"/json/export/realm/{export_row_id}") check_realm_export( "events[0]", diff --git a/zerver/tests/test_import_export.py b/zerver/tests/test_import_export.py index 9f841d4249..cc955f2e38 100644 --- a/zerver/tests/test_import_export.py +++ b/zerver/tests/test_import_export.py @@ -76,6 +76,7 @@ from zerver.models import ( Realm, RealmAuditLog, RealmEmoji, + RealmExport, RealmUserDefault, Recipient, ScheduledMessage, @@ -94,12 +95,7 @@ from zerver.models.clients import get_client from zerver.models.groups import SystemGroups from zerver.models.presence import PresenceSequence from zerver.models.realm_audit_logs import AuditLogEventType -from zerver.models.realms import ( - EXPORT_FULL_WITH_CONSENT, - EXPORT_FULL_WITHOUT_CONSENT, - EXPORT_PUBLIC, - get_realm, -) +from zerver.models.realms import get_realm from zerver.models.recipients import get_direct_message_group_hash from zerver.models.streams import get_active_streams, get_stream from zerver.models.users import get_system_bot, get_user_by_delivery_email @@ -364,13 +360,13 @@ class RealmImportExportTest(ExportFile): export_usermessages_batch( input_path=os.path.join(output_dir, "messages-000001.json.partial"), output_path=os.path.join(output_dir, "messages-000001.json"), - export_full_with_consent=export_type == EXPORT_FULL_WITH_CONSENT, + export_full_with_consent=export_type == RealmExport.EXPORT_FULL_WITH_CONSENT, ) def export_realm_and_create_auditlog( self, original_realm: Realm, - export_type: int = EXPORT_FULL_WITHOUT_CONSENT, + export_type: int = RealmExport.EXPORT_FULL_WITHOUT_CONSENT, exportable_user_ids: set[int] | None = None, ) -> None: RealmAuditLog.objects.create( @@ -413,7 +409,7 @@ class RealmImportExportTest(ExportFile): is_message_realm_public=True, ) - self.export_realm_and_create_auditlog(realm, export_type=EXPORT_PUBLIC) + self.export_realm_and_create_auditlog(realm, export_type=RealmExport.EXPORT_PUBLIC) # The attachment row shouldn't have been exported: self.assertEqual(read_json("attachment.json")["zerver_attachment"], []) @@ -662,7 +658,9 @@ class RealmImportExportTest(ExportFile): self.example_user("hamlet"), "allow_private_data_export", True, acting_user=None ) - self.export_realm_and_create_auditlog(realm, export_type=EXPORT_FULL_WITH_CONSENT) + self.export_realm_and_create_auditlog( + realm, export_type=RealmExport.EXPORT_FULL_WITH_CONSENT + ) data = read_json("realm.json") @@ -1003,7 +1001,7 @@ class RealmImportExportTest(ExportFile): for f in getters: snapshots[f.__name__] = f(original_realm) - self.export_realm(original_realm, export_type=EXPORT_FULL_WITHOUT_CONSENT) + self.export_realm(original_realm, export_type=RealmExport.EXPORT_FULL_WITHOUT_CONSENT) with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"): do_import_realm(get_output_dir(), "test-zulip") diff --git a/zerver/tests/test_management_commands.py b/zerver/tests/test_management_commands.py index 39aaebf020..c91bbd4133 100644 --- a/zerver/tests/test_management_commands.py +++ b/zerver/tests/test_management_commands.py @@ -20,7 +20,7 @@ from zerver.lib.management import ZulipBaseCommand, check_config from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import most_recent_message, stdout_suppressed from zerver.models import Realm, Recipient, UserProfile -from zerver.models.realms import EXPORT_FULL_WITH_CONSENT, get_realm +from zerver.models.realms import get_realm from zerver.models.streams import get_stream from zerver.models.users import get_user_profile_by_email @@ -515,7 +515,6 @@ class TestExport(ZulipTestCase): COMMAND_NAME = "export" def test_command_to_export_full_with_consent(self) -> None: - realm = get_realm("zulip") do_change_user_setting( self.example_user("iago"), "allow_private_data_export", True, acting_user=None ) @@ -529,8 +528,7 @@ class TestExport(ZulipTestCase): ): call_command(self.COMMAND_NAME, "-r=zulip", "--export-full-with-consent") m.assert_called_once_with( - realm=realm, - export_type=EXPORT_FULL_WITH_CONSENT, + export_row=mock.ANY, threads=mock.ANY, output_dir=mock.ANY, percent_callback=mock.ANY, diff --git a/zerver/tests/test_realm_export.py b/zerver/tests/test_realm_export.py index 4729ed70c0..a1da2c8fa2 100644 --- a/zerver/tests/test_realm_export.py +++ b/zerver/tests/test_realm_export.py @@ -18,9 +18,7 @@ from zerver.lib.test_helpers import ( stdout_suppressed, use_s3_backend, ) -from zerver.models import Realm, RealmAuditLog, UserProfile -from zerver.models.realm_audit_logs import AuditLogEventType -from zerver.models.realms import EXPORT_PUBLIC +from zerver.models import Realm, RealmExport, UserProfile from zerver.views.realm_export import export_realm @@ -48,7 +46,9 @@ class RealmExportTest(ZulipTestCase): tarball_path = create_dummy_file("test-export.tar.gz") # Test the export logic. - with patch("zerver.lib.export.do_export_realm", return_value=tarball_path) as mock_export: + with patch( + "zerver.lib.export.do_export_realm", return_value=(tarball_path, dict()) + ) as mock_export: with ( self.settings(LOCAL_UPLOADS_DIR=None), stdout_suppressed(), @@ -61,19 +61,19 @@ class RealmExportTest(ZulipTestCase): self.assertFalse(os.path.exists(tarball_path)) args = mock_export.call_args_list[0][1] self.assertEqual(args["realm"], admin.realm) - self.assertEqual(args["export_type"], EXPORT_PUBLIC) + self.assertEqual(args["export_type"], RealmExport.EXPORT_PUBLIC) self.assertTrue(os.path.basename(args["output_dir"]).startswith("zulip-export-")) self.assertEqual(args["threads"], 6) # Get the entry and test that iago initiated it. - audit_log_entry = RealmAuditLog.objects.filter( - event_type=AuditLogEventType.REALM_EXPORTED - ).first() - assert audit_log_entry is not None - self.assertEqual(audit_log_entry.acting_user_id, admin.id) + export_row = RealmExport.objects.first() + assert export_row is not None + self.assertEqual(export_row.acting_user_id, admin.id) + self.assertEqual(export_row.status, RealmExport.SUCCEEDED) # Test that the file is hosted, and the contents are as expected. - export_path = audit_log_entry.extra_data["export_path"] + export_path = export_row.export_path + assert export_path is not None assert export_path.startswith("/") path_id = export_path.removeprefix("/") self.assertEqual(bucket.Object(path_id).get()["Body"].read(), b"zulip!") @@ -83,7 +83,7 @@ class RealmExportTest(ZulipTestCase): # Test that the export we have is the export we created. export_dict = response_dict["exports"] - self.assertEqual(export_dict[0]["id"], audit_log_entry.id) + self.assertEqual(export_dict[0]["id"], export_row.id) parsed_url = urlsplit(export_dict[0]["export_url"]) self.assertEqual( parsed_url._replace(query="").geturl(), @@ -92,22 +92,20 @@ class RealmExportTest(ZulipTestCase): self.assertEqual(export_dict[0]["acting_user_id"], admin.id) self.assert_length( export_dict, - RealmAuditLog.objects.filter( - realm=admin.realm, event_type=AuditLogEventType.REALM_EXPORTED - ).count(), + RealmExport.objects.filter(realm=admin.realm).count(), ) # Finally, delete the file. - result = self.client_delete(f"/json/export/realm/{audit_log_entry.id}") + result = self.client_delete(f"/json/export/realm/{export_row.id}") self.assert_json_success(result) with self.assertRaises(botocore.exceptions.ClientError): bucket.Object(path_id).load() - # Try to delete an export with a `deleted_timestamp` key. - audit_log_entry.refresh_from_db() - export_data = audit_log_entry.extra_data - self.assertIn("deleted_timestamp", export_data) - result = self.client_delete(f"/json/export/realm/{audit_log_entry.id}") + # Try to delete an export with a `DELETED` status. + export_row.refresh_from_db() + self.assertEqual(export_row.status, RealmExport.DELETED) + self.assertIsNotNone(export_row.date_deleted) + result = self.client_delete(f"/json/export/realm/{export_row.id}") self.assert_json_error(result, "Export already deleted") # Now try to delete a non-existent export. @@ -127,9 +125,9 @@ class RealmExportTest(ZulipTestCase): export_type: int, exportable_user_ids: set[int] | None = None, export_as_active: bool | None = None, - ) -> str: + ) -> tuple[str, dict[str, int | dict[str, int]]]: self.assertEqual(realm, admin.realm) - self.assertEqual(export_type, EXPORT_PUBLIC) + self.assertEqual(export_type, RealmExport.EXPORT_PUBLIC) self.assertTrue(os.path.basename(output_dir).startswith("zulip-export-")) self.assertEqual(threads, 6) @@ -149,7 +147,7 @@ class RealmExportTest(ZulipTestCase): result = self.client_delete(f"/json/export/realm/{id}") self.assert_json_error(result, "Export still in progress") - return tarball_path + return tarball_path, dict() with patch( "zerver.lib.export.do_export_realm", side_effect=fake_export_realm @@ -166,15 +164,15 @@ class RealmExportTest(ZulipTestCase): self.assertFalse(os.path.exists(tarball_path)) # Get the entry and test that iago initiated it. - audit_log_entry = RealmAuditLog.objects.filter( - event_type=AuditLogEventType.REALM_EXPORTED - ).first() - assert audit_log_entry is not None - self.assertEqual(audit_log_entry.id, data["id"]) - self.assertEqual(audit_log_entry.acting_user_id, admin.id) + export_row = RealmExport.objects.first() + assert export_row is not None + self.assertEqual(export_row.id, data["id"]) + self.assertEqual(export_row.acting_user_id, admin.id) + self.assertEqual(export_row.status, RealmExport.SUCCEEDED) # Test that the file is hosted, and the contents are as expected. - export_path = audit_log_entry.extra_data.get("export_path") + export_path = export_row.export_path + assert export_path is not None response = self.client_get(export_path) self.assertEqual(response.status_code, 200) self.assertEqual(response.getvalue(), b"zulip!") @@ -184,27 +182,22 @@ class RealmExportTest(ZulipTestCase): # Test that the export we have is the export we created. export_dict = response_dict["exports"] - self.assertEqual(export_dict[0]["id"], audit_log_entry.id) + self.assertEqual(export_dict[0]["id"], export_row.id) self.assertEqual(export_dict[0]["export_url"], admin.realm.url + export_path) self.assertEqual(export_dict[0]["acting_user_id"], admin.id) - self.assert_length( - export_dict, - RealmAuditLog.objects.filter( - realm=admin.realm, event_type=AuditLogEventType.REALM_EXPORTED - ).count(), - ) + self.assert_length(export_dict, RealmExport.objects.filter(realm=admin.realm).count()) # Finally, delete the file. - result = self.client_delete(f"/json/export/realm/{audit_log_entry.id}") + result = self.client_delete(f"/json/export/realm/{export_row.id}") self.assert_json_success(result) response = self.client_get(export_path) self.assertEqual(response.status_code, 404) - # Try to delete an export with a `deleted_timestamp` key. - audit_log_entry.refresh_from_db() - export_data = audit_log_entry.extra_data - self.assertIn("deleted_timestamp", export_data) - result = self.client_delete(f"/json/export/realm/{audit_log_entry.id}") + # Try to delete an export with a `DELETED` status. + export_row.refresh_from_db() + self.assertEqual(export_row.status, RealmExport.DELETED) + self.assertIsNotNone(export_row.date_deleted) + result = self.client_delete(f"/json/export/realm/{export_row.id}") self.assert_json_error(result, "Export already deleted") # Now try to delete a non-existent export. @@ -244,6 +237,9 @@ class RealmExportTest(ZulipTestCase): self.assertIsNotNone(export_dict[0]["failed_timestamp"]) self.assertEqual(export_dict[0]["acting_user_id"], admin.id) + export_row = RealmExport.objects.get(id=export_id) + self.assertEqual(export_row.status, RealmExport.FAILED) + # Check that we can't delete it result = self.client_delete(f"/json/export/realm/{export_id}") self.assert_json_error(result, "Export failed, nothing to delete") @@ -258,10 +254,8 @@ class RealmExportTest(ZulipTestCase): "deferred_work", { "type": "realm_export", - "time": 42, - "realm_id": admin.realm.id, "user_profile_id": admin.id, - "id": export_id, + "realm_export_id": export_id, }, ) mock_export.assert_not_called() @@ -279,18 +273,19 @@ class RealmExportTest(ZulipTestCase): admin = self.example_user("iago") self.login_user(admin) - current_log = RealmAuditLog.objects.filter(event_type=AuditLogEventType.REALM_EXPORTED) - self.assert_length(current_log, 0) + export_rows = RealmExport.objects.all() + self.assert_length(export_rows, 0) exports = [ - RealmAuditLog( + RealmExport( realm=admin.realm, - event_type=AuditLogEventType.REALM_EXPORTED, - event_time=timezone_now(), + type=RealmExport.EXPORT_PUBLIC, + date_requested=timezone_now(), + acting_user=admin, ) for i in range(5) ] - RealmAuditLog.objects.bulk_create(exports) + RealmExport.objects.bulk_create(exports) with self.assertRaises(JsonableError) as error: export_realm(HostRequestMock(), admin) diff --git a/zerver/views/realm_export.py b/zerver/views/realm_export.py index ec0d0c5d78..98a517f351 100644 --- a/zerver/views/realm_export.py +++ b/zerver/views/realm_export.py @@ -13,16 +13,13 @@ from zerver.lib.exceptions import JsonableError from zerver.lib.export import get_realm_exports_serialized from zerver.lib.queue import queue_json_publish from zerver.lib.response import json_success -from zerver.models import RealmAuditLog, UserProfile -from zerver.models.realm_audit_logs import AuditLogEventType +from zerver.models import RealmExport, UserProfile @transaction.atomic(durable=True) @require_realm_admin def export_realm(request: HttpRequest, user: UserProfile) -> HttpResponse: # Currently only supports public-data-only exports. - event_type = AuditLogEventType.REALM_EXPORTED - event_time = timezone_now() realm = user.realm EXPORT_LIMIT = 5 @@ -37,9 +34,9 @@ def export_realm(request: HttpRequest, user: UserProfile) -> HttpResponse: # Filter based upon the number of events that have occurred in the delta # If we are at the limit, the incoming request is rejected - event_time_delta = event_time - timedelta(days=7) - limit_check = RealmAuditLog.objects.filter( - realm=realm, event_type=event_type, event_time__gte=event_time_delta + event_time_delta = timezone_now() - timedelta(days=7) + limit_check = RealmExport.objects.filter( + realm=realm, date_requested__gte=event_time_delta ).count() if limit_check >= EXPORT_LIMIT: raise JsonableError(_("Exceeded rate limit.")) @@ -68,8 +65,12 @@ def export_realm(request: HttpRequest, user: UserProfile) -> HttpResponse: ) ) - row = RealmAuditLog.objects.create( - realm=realm, event_type=event_type, event_time=event_time, acting_user=user + row = RealmExport.objects.create( + realm=realm, + type=RealmExport.EXPORT_PUBLIC, + acting_user=user, + status=RealmExport.REQUESTED, + date_requested=timezone_now(), ) # Allow for UI updates on a pending export @@ -79,10 +80,8 @@ def export_realm(request: HttpRequest, user: UserProfile) -> HttpResponse: # killing the process after 60s event = { "type": "realm_export", - "time": event_time, - "realm_id": realm.id, "user_profile_id": user.id, - "id": row.id, + "realm_export_id": row.id, } transaction.on_commit(lambda: queue_json_publish("deferred_work", event)) return json_success(request, data={"id": row.id}) @@ -97,20 +96,17 @@ def get_realm_exports(request: HttpRequest, user: UserProfile) -> HttpResponse: @require_realm_admin def delete_realm_export(request: HttpRequest, user: UserProfile, export_id: int) -> HttpResponse: try: - audit_log_entry = RealmAuditLog.objects.get( - id=export_id, realm=user.realm, event_type=AuditLogEventType.REALM_EXPORTED - ) - except RealmAuditLog.DoesNotExist: + export_row = RealmExport.objects.get(id=export_id) + except RealmExport.DoesNotExist: raise JsonableError(_("Invalid data export ID")) - export_data = audit_log_entry.extra_data - if export_data.get("deleted_timestamp") is not None: + if export_row.status == RealmExport.DELETED: raise JsonableError(_("Export already deleted")) - if export_data.get("export_path") is None: - if export_data.get("failed_timestamp") is not None: - raise JsonableError(_("Export failed, nothing to delete")) + if export_row.status == RealmExport.FAILED: + raise JsonableError(_("Export failed, nothing to delete")) + if export_row.status in [RealmExport.REQUESTED, RealmExport.STARTED]: raise JsonableError(_("Export still in progress")) - do_delete_realm_export(audit_log_entry) + do_delete_realm_export(export_row) return json_success(request) diff --git a/zerver/worker/deferred_work.py b/zerver/worker/deferred_work.py index 9de5458b76..3d91876a29 100644 --- a/zerver/worker/deferred_work.py +++ b/zerver/worker/deferred_work.py @@ -3,7 +3,6 @@ import logging import tempfile import time from typing import Any -from urllib.parse import urlsplit from django.conf import settings from django.db import transaction @@ -25,8 +24,7 @@ from zerver.lib.remote_server import ( ) from zerver.lib.soft_deactivation import reactivate_user_if_soft_deactivated from zerver.lib.upload import handle_reupload_emojis_event -from zerver.models import Message, Realm, RealmAuditLog, Stream, UserMessage -from zerver.models.realms import EXPORT_PUBLIC +from zerver.models import Message, Realm, RealmAuditLog, RealmExport, Stream, UserMessage from zerver.models.users import get_system_bot, get_user_profile_by_id from zerver.worker.base import QueueProcessingWorker, assign_queue @@ -126,60 +124,70 @@ class DeferredWorker(QueueProcessingWorker): retry_event(self.queue_name, event, failure_processor) elif event["type"] == "realm_export": - realm = Realm.objects.get(id=event["realm_id"]) output_dir = tempfile.mkdtemp(prefix="zulip-export-") - export_event = RealmAuditLog.objects.get(id=event["id"]) user_profile = get_user_profile_by_id(event["user_profile_id"]) - extra_data = export_event.extra_data - if extra_data.get("started_timestamp") is not None: + realm = user_profile.realm + export_event = None + + if "realm_export_id" in event: + export_row = RealmExport.objects.get(id=event["realm_export_id"]) + else: + # Handle existing events in the queue before we switched to RealmExport model. + export_event = RealmAuditLog.objects.get(id=event["id"]) + extra_data = export_event.extra_data + + if extra_data.get("export_row_id") is not None: + export_row = RealmExport.objects.get(id=extra_data["export_row_id"]) + else: + export_row = RealmExport.objects.create( + realm=realm, + type=RealmExport.EXPORT_PUBLIC, + acting_user=user_profile, + status=RealmExport.REQUESTED, + date_requested=event["time"], + ) + export_event.extra_data = {"export_row_id": export_row.id} + export_event.save(update_fields=["extra_data"]) + + if export_row.status != RealmExport.REQUESTED: logger.error( "Marking export for realm %s as failed due to retry -- possible OOM during export?", realm.string_id, ) - extra_data["failed_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) + export_row.status = RealmExport.FAILED + export_row.date_failed = timezone_now() + export_row.save(update_fields=["status", "date_failed"]) notify_realm_export(realm) return - extra_data["started_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) - logger.info( "Starting realm export for realm %s into %s, initiated by user_profile_id %s", realm.string_id, output_dir, - event["user_profile_id"], + user_profile.id, ) try: - public_url = export_realm_wrapper( - realm=realm, + export_realm_wrapper( + export_row=export_row, output_dir=output_dir, threads=1 if self.threaded else 6, upload=True, - export_type=EXPORT_PUBLIC, ) except Exception: - extra_data["failed_timestamp"] = timezone_now().timestamp() - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) logging.exception( "Data export for %s failed after %s", - user_profile.realm.string_id, + realm.string_id, time.time() - start, stack_info=True, ) notify_realm_export(realm) return - assert public_url is not None - - # Update the extra_data field now that the export is complete. - extra_data["export_path"] = urlsplit(public_url).path - export_event.extra_data = extra_data - export_event.save(update_fields=["extra_data"]) + # We create RealmAuditLog entry in 'export_realm_wrapper'. + # Delete the old entry created before we switched to RealmExport model. + if export_event: + export_event.delete() # Send a direct message notification letting the user who # triggered the export know the export finished. @@ -198,7 +206,7 @@ class DeferredWorker(QueueProcessingWorker): notify_realm_export(realm) logging.info( "Completed data export for %s in %s", - user_profile.realm.string_id, + realm.string_id, time.time() - start, ) elif event["type"] == "reupload_realm_emoji":