email digests: Process users in chunks of 30.

This should make the queue empty more quickly,
because we do bulk queries to prevent database
hops.
This commit is contained in:
Steve Howell 2020-11-13 17:13:13 +00:00 committed by Tim Abbott
parent e0b451730a
commit bfa0bdf3d6
3 changed files with 39 additions and 23 deletions

View File

@ -85,10 +85,10 @@ def should_process_digest(realm_str: str) -> bool:
# Changes to this should also be reflected in
# zerver/worker/queue_processors.py:DigestWorker.consume()
def queue_digest_recipient(user_id: int, cutoff: datetime.datetime) -> None:
def queue_digest_user_ids(user_ids: List[int], cutoff: datetime.datetime) -> None:
# Convert cutoff to epoch seconds for transit.
event = {
"user_profile_id": user_id,
"user_profile_id": user_ids,
"cutoff": cutoff.strftime('%s')
}
queue_json_publish("digest_emails", event)
@ -127,13 +127,19 @@ def _enqueue_emails_for_realm(realm: Realm, cutoff: datetime.datetime) -> None:
end__gt=cutoff,
).values_list('user_profile_id', flat=True).distinct())
user_ids = realm_user_ids - active_user_ids
user_ids = list(realm_user_ids - active_user_ids)
user_ids.sort()
for user_id in user_ids:
queue_digest_recipient(user_id, cutoff)
# We process batches of 30. We want a big enough batch
to amortize work, but not so big that a single item
# from the queue takes too long to process.
chunk_size = 30
for i in range(0, len(user_ids), chunk_size):
chunk_user_ids = user_ids[i:i + chunk_size]
queue_digest_user_ids(chunk_user_ids, cutoff)
logger.info(
"User %s is inactive, queuing for potential digest",
user_id,
"Queuing user_ids for potential digest: %s",
chunk_user_ids,
)
def get_recent_topics(

View File

@ -283,19 +283,20 @@ class TestDigestEmailMessages(ZulipTestCase):
cutoff = timezone_now() - datetime.timedelta(days=5)
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
users = self.active_human_users(realm)
self.assertEqual(queue_mock.call_count, len(users))
num_queued_users = len(queue_mock.call_args[0][0])
self.assertEqual(num_queued_users, len(users))
# Simulate that we have sent digests for all our users.
bulk_write_realm_audit_logs(users)
# Now if we run again, we won't get any users, since they will have
# recent RealmAuditLog rows.
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
self.assertEqual(queue_mock.call_count, 0)
@ -313,10 +314,11 @@ class TestDigestEmailMessages(ZulipTestCase):
# Check that all users without a UserActivityInterval entry are considered
# inactive users and get enqueued.
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
self.assertEqual(queue_mock.call_count, len(users))
num_queued_users = len(queue_mock.call_args[0][0])
self.assertEqual(num_queued_users, len(users))
for user in users:
last_visit = timezone_now() - datetime.timedelta(days=1)
@ -327,7 +329,7 @@ class TestDigestEmailMessages(ZulipTestCase):
)
# Now we expect no users, due to recent activity.
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
self.assertEqual(queue_mock.call_count, 0)
@ -336,10 +338,11 @@ class TestDigestEmailMessages(ZulipTestCase):
last_visit = timezone_now() - datetime.timedelta(days=7)
UserActivityInterval.objects.all().update(start=last_visit, end=last_visit)
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
self.assertEqual(queue_mock.call_count, len(users))
num_queued_users = len(queue_mock.call_args[0][0])
self.assertEqual(num_queued_users, len(users))
def tuesday(self) -> datetime.datetime:
return datetime.datetime(year=2016, month=1, day=5, tzinfo=datetime.timezone.utc)
@ -352,7 +355,7 @@ class TestDigestEmailMessages(ZulipTestCase):
cutoff = tuesday - datetime.timedelta(days=5)
with mock.patch("zerver.lib.digest.timezone_now", return_value=tuesday):
with mock.patch("zerver.lib.digest.queue_digest_recipient") as queue_mock:
with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
enqueue_emails(cutoff)
queue_mock.assert_not_called()
@ -364,7 +367,7 @@ class TestDigestEmailMessages(ZulipTestCase):
cutoff = not_tuesday - datetime.timedelta(days=5)
with mock.patch("zerver.lib.digest.timezone_now", return_value=not_tuesday):
with mock.patch("zerver.lib.digest.queue_digest_recipient") as queue_mock:
with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
enqueue_emails(cutoff)
queue_mock.assert_not_called()
@ -387,14 +390,16 @@ class TestDigestEmailMessages(ZulipTestCase):
)
# Check that bots are not sent emails
with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
_enqueue_emails_for_realm(realm, cutoff)
assert queue_mock.call_count >= 5
num_queued_users = len(queue_mock.call_args[0][0])
assert num_queued_users >= 5
for arg in queue_mock.call_args_list:
user_id = arg[0][0]
self.assertNotEqual(user_id, bot.id)
user_ids = arg[0][0]
for user_id in user_ids:
self.assertNotEqual(user_id, bot.id)
@override_settings(SEND_DIGEST_EMAILS=True)
def test_new_stream_link(self) -> None:

View File

@ -60,7 +60,7 @@ from zerver.lib.actions import (
from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler
from zerver.lib.context_managers import lockfile
from zerver.lib.db import reset_queries
from zerver.lib.digest import handle_digest_email
from zerver.lib.digest import bulk_handle_digest_email
from zerver.lib.email_mirror import decode_stream_email_address, is_missed_message_address
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.lib.email_mirror import rate_limit_mirror_by_realm
@ -613,7 +613,12 @@ class DigestWorker(QueueProcessingWorker): # nocoverage
# management command, not here.
def consume(self, event: Mapping[str, Any]) -> None:
logging.info("Received digest event: %s", event)
handle_digest_email(event["user_profile_id"], event["cutoff"])
if "user_ids" in event:
user_ids = event["user_ids"]
else:
# legacy code may have enqueued a single id
user_ids = [event["user_profile_id"]]
bulk_handle_digest_email(user_ids, event["cutoff"])
@assign_queue('email_mirror')
class MirrorWorker(QueueProcessingWorker):