migrations: Backfill missing RealmAuditLog entries for subscriptions.

Backfill subscription realm audit log SUBSCRIPTION_CREATED events for
users which are currently subscribed but don't have any subscription
events, presumably due to some historical bug.  This is important
because those rows are necessary when reactivating a user who is
currently soft-deactivated.

For each stream, we find the subscribed users who have no
subscription-related realm audit log entries, and create a
`backfill=True` subscription audit log entry which is the latest it
could have been, based on UserMessage rows.  We then optionally insert
a `DEACTIVATION` if the current subscription is not active.
This commit is contained in:
Alex Vandiver 2023-04-19 19:28:58 +00:00 committed by Tim Abbott
parent 74e6367455
commit ca2ca030d2
2 changed files with 154 additions and 1 deletions

View File

@ -178,7 +178,9 @@ def add_missing_messages(user_profile: UserProfile) -> None:
# That second tiebreak is important in case a user is subscribed
# and then unsubscribed without any messages being sent in the
# meantime. Without that tiebreak, we could end up incorrectly
# processing the ordering of those two subscription changes.
# processing the ordering of those two subscription changes. Note
# that this means we cannot backfill events unless there are no
# pre-existing events for this stream/user pair!
subscription_logs = list(
RealmAuditLog.objects.filter(
modified_user=user_profile, modified_stream_id__in=stream_ids, event_type__in=events

View File

@ -0,0 +1,151 @@
# Generated by Django 4.2 on 2023-04-19 18:18
from django.db import migrations, transaction
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.migrations.state import StateApps
from django.db.models import Max, Min
from django.utils.timezone import now as timezone_now
def backfill_missing_subscriptions(
apps: StateApps, schema_editor: BaseDatabaseSchemaEditor
) -> None:
"""Backfill subscription realm audit log events for users which are
currently subscribed but don't have any, presumably due to some
historical bug. This is important because those rows are
necessary when reactivating a user who is currently
soft-deactivated.
For each stream, we find the subscribed users who have no relevant
realm audit log entries, and create a backfill=True subscription
audit log entry which is the latest it could have been, based on
UserMessage rows.
"""
Stream = apps.get_model("zerver", "Stream")
RealmAuditLog = apps.get_model("zerver", "RealmAuditLog")
Subscription = apps.get_model("zerver", "Subscription")
UserMessage = apps.get_model("zerver", "UserMessage")
Message = apps.get_model("zerver", "Message")
def get_last_message_id() -> int:
# We generally use this function to populate RealmAuditLog, and
# the max id here is actually system-wide, not per-realm. I
# assume there's some advantage in not filtering by realm.
last_id = Message.objects.aggregate(Max("id"))["id__max"]
if last_id is None:
# During initial realm creation, there might be 0 messages in
# the database; in that case, the `aggregate` query returns
# None. Since we want an int for "beginning of time", use -1.
last_id = -1
return last_id
for stream in Stream.objects.all():
with transaction.atomic():
subscribed_user_ids = set(
Subscription.objects.filter(recipient_id=stream.recipient_id).values_list(
"user_profile_id", flat=True
)
)
user_ids_in_audit_log = set(
RealmAuditLog.objects.filter(
realm=stream.realm,
event_type__in=[
301, # RealmAuditLog.SUBSCRIPTION_CREATED
302, # RealmAuditLog.SUBSCRIPTION_ACTIVATED
303, # RealmAuditLog.SUBSCRIPTION_DEACTIVATED
],
modified_stream=stream,
)
.distinct("modified_user_id")
.values_list("modified_user_id", flat=True)
)
user_ids_missing_events = subscribed_user_ids - user_ids_in_audit_log
if not user_ids_missing_events:
continue
last_message_id = get_last_message_id()
now = timezone_now()
backfills = []
for user_id in sorted(user_ids_missing_events):
print(
f"Backfilling subscription event for {user_id} in stream {stream.id} in realm {stream.realm.string_id}"
)
aggregated = UserMessage.objects.filter(
user_profile_id=user_id,
message__recipient=stream.recipient_id,
).aggregate(
earliest_date=Min("message__date_sent"),
earliest_message_id=Min("message_id"),
latest_date=Max("message__date_sent"),
latest_message_id=Max("message_id"),
)
# Assume we subscribed right before the first message we
# saw -- or, if we don't see any, right now. This makes
# this safe for streams which do not have shared history.
if aggregated["earliest_message_id"] is not None:
event_last_message_id = aggregated["earliest_message_id"] - 1
else:
event_last_message_id = last_message_id
if aggregated["earliest_date"] is not None:
event_time = aggregated["earliest_date"]
else:
event_time = now
log_event = RealmAuditLog(
event_time=event_time,
event_last_message_id=event_last_message_id,
backfilled=True,
event_type=301, # RealmAuditLog.SUBSCRIPTION_CREATED
realm_id=stream.realm_id,
modified_user_id=user_id,
modified_stream_id=stream.id,
)
backfills.append(log_event)
# If the subscription is not active, then we also need
# to manufacture a SUBSCRIPTION_DEACTIVATED event,
# which we assume to be whenever the last received
# UserMessage row was.
sub = Subscription.objects.get(
user_profile_id=user_id, recipient_id=stream.recipient_id
)
if sub.active:
continue
if aggregated["latest_message_id"] is not None:
event_last_message_id = aggregated["latest_message_id"]
else:
event_last_message_id = last_message_id
if aggregated["latest_date"] is not None:
event_time = aggregated["latest_date"]
else:
event_time = now
deactivated_log_event = RealmAuditLog(
event_time=event_time,
event_last_message_id=event_last_message_id,
backfilled=True,
event_type=303, # RealmAuditLog.SUBSCRIPTION_DEACTIVATED
realm_id=stream.realm_id,
modified_user_id=user_id,
modified_stream_id=stream.id,
)
backfills.append(deactivated_log_event)
RealmAuditLog.objects.bulk_create(backfills)
class Migration(migrations.Migration):
atomic = False
dependencies = [
("zerver", "0449_scheduledmessage_zerver_unsent_scheduled_messages_indexes"),
]
operations = [
migrations.RunPython(
backfill_missing_subscriptions, reverse_code=migrations.RunPython.noop, elidable=True
)
]