digest: Rewrite target-user algorithm as one query.

There is no reason to do this set manipulation in Python.
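For orientation before the diff: the old code ran three separate queries and did the set subtraction in Python, while the rewrite expresses the same selection as a single queryset whose Exists()/OuterRef() aliases the database evaluates as correlated NOT EXISTS subqueries. Below is a minimal sketch of that pattern, not code from this commit; the zerver.models import path and the helper name are assumptions, while the model, field, and event-type names come from the diff itself.

# Sketch only: assumed import path and helper name; mirrors the pattern in
# the diff below rather than reproducing it line for line.
import datetime

from django.db.models import Exists, OuterRef

from zerver.models import Realm, RealmAuditLog, UserActivityInterval, UserProfile  # assumed path


def digest_candidate_ids(
    realm: Realm, cutoff: datetime.datetime, twelve_hours_ago: datetime.datetime
):
    # Previously: three .values_list() queries were materialized into Python sets
    # and subtracted.  Now: one queryset.  Each Exists() alias becomes a correlated
    # subquery, and filtering the alias to False compiles to NOT EXISTS, so no
    # intermediate id sets are pulled into Python.
    return (
        UserProfile.objects.filter(
            realm=realm, is_active=True, is_bot=False, enable_digest_emails=True
        )
        .alias(
            recent_activity=Exists(
                UserActivityInterval.objects.filter(user_profile_id=OuterRef("id"), end__gt=cutoff)
            ),
            sent_recent_digest=Exists(
                RealmAuditLog.objects.filter(
                    realm_id=realm.id,
                    event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
                    event_time__gt=twelve_hours_ago,
                    modified_user_id=OuterRef("id"),
                )
            ),
        )
        .filter(recent_activity=False, sent_recent_digest=False)
        .order_by("id")
        .values_list("id", flat=True)
    )

The queryset stays lazy until it is consumed; the batching loop at the end of the diff evaluates it once via len() and then slices the cached ids into chunks.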
Alex Vandiver authored 2023-08-31 01:50:10 +00:00; committed by Tim Abbott
parent 584c202d36
commit 058a168bfe
1 changed file with 22 additions and 25 deletions

@@ -6,6 +6,7 @@ from typing import Any, Collection, Dict, List, Set, Tuple
 from django.conf import settings
 from django.db import transaction
+from django.db.models import Exists, OuterRef
 from django.utils.timezone import now as timezone_now
 from typing_extensions import TypeAlias
@@ -105,46 +106,42 @@ def enqueue_emails(cutoff: datetime.datetime) -> None:
 def _enqueue_emails_for_realm(realm: Realm, cutoff: datetime.datetime) -> None:
     # This should only be called directly by tests. Use enqueue_emails
     # to process all realms that are set up for processing on any given day.
-    realm_user_ids = set(
+    twelve_hours_ago = timezone_now() - datetime.timedelta(hours=12)
+    target_users = (
         UserProfile.objects.filter(
             realm=realm,
             is_active=True,
             is_bot=False,
             enable_digest_emails=True,
-        ).values_list("id", flat=True)
-    )
-    twelve_hours_ago = timezone_now() - datetime.timedelta(hours=12)
-    recent_user_ids = set(
-        RealmAuditLog.objects.filter(
-            realm_id=realm.id,
-            event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
-            event_time__gt=twelve_hours_ago,
-        )
-        .values_list("modified_user_id", flat=True)
-        .distinct()
-    )
-    realm_user_ids -= recent_user_ids
-    active_user_ids = set(
-        UserActivityInterval.objects.filter(
-            user_profile_id__in=realm_user_ids,
-            end__gt=cutoff,
-        )
-        .values_list("user_profile_id", flat=True)
-        .distinct()
-    )
-    user_ids = sorted(realm_user_ids - active_user_ids)
+        )
+        .alias(
+            recent_activity=Exists(
+                UserActivityInterval.objects.filter(user_profile_id=OuterRef("id"), end__gt=cutoff)
+            )
+        )
+        .filter(recent_activity=False)
+        .alias(
+            sent_recent_digest=Exists(
+                RealmAuditLog.objects.filter(
+                    realm_id=realm.id,
+                    event_type=RealmAuditLog.USER_DIGEST_EMAIL_CREATED,
+                    event_time__gt=twelve_hours_ago,
+                    modified_user_id=OuterRef("id"),
+                )
+            )
+        )
+        .filter(sent_recent_digest=False)
+    )
+    user_ids = target_users.order_by("id").values_list("id", flat=True)
     # We process batches of 30. We want a big enough batch
     # to amortize work, but not so big that a single item
     # from the queue takes too long to process.
     chunk_size = 30
     for i in range(0, len(user_ids), chunk_size):
-        chunk_user_ids = user_ids[i : i + chunk_size]
+        chunk_user_ids = list(user_ids[i : i + chunk_size])
         queue_digest_user_ids(chunk_user_ids, cutoff)
         logger.info(
             "Queuing user_ids for potential digest: %s",