From bfa0bdf3d665cb2d5ac7bebda83c6db9b56ee08e Mon Sep 17 00:00:00 2001
From: Steve Howell
Date: Fri, 13 Nov 2020 17:13:13 +0000
Subject: [PATCH] email digests: Process users in chunks of 30.

This should let the queue drain more quickly, because each event
now covers a batch of users, so we can do bulk database queries
instead of one round trip per user.
---
 zerver/lib/digest.py              | 20 +++++++++++++-------
 zerver/tests/test_digest.py       | 33 +++++++++++++++++++--------------
 zerver/worker/queue_processors.py |  9 +++++++--
 3 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/zerver/lib/digest.py b/zerver/lib/digest.py
index 3dba9815a8..ba81b708f4 100644
--- a/zerver/lib/digest.py
+++ b/zerver/lib/digest.py
@@ -85,10 +85,10 @@ def should_process_digest(realm_str: str) -> bool:
 
 # Changes to this should also be reflected in
 # zerver/worker/queue_processors.py:DigestWorker.consume()
-def queue_digest_recipient(user_id: int, cutoff: datetime.datetime) -> None:
+def queue_digest_user_ids(user_ids: List[int], cutoff: datetime.datetime) -> None:
     # Convert cutoff to epoch seconds for transit.
     event = {
-        "user_profile_id": user_id,
+        "user_ids": user_ids,
         "cutoff": cutoff.strftime('%s')
     }
     queue_json_publish("digest_emails", event)
@@ -127,13 +127,19 @@ def _enqueue_emails_for_realm(realm: Realm, cutoff: datetime.datetime) -> None:
         end__gt=cutoff,
     ).values_list('user_profile_id', flat=True).distinct())
 
-    user_ids = realm_user_ids - active_user_ids
+    user_ids = list(realm_user_ids - active_user_ids)
+    user_ids.sort()
 
-    for user_id in user_ids:
-        queue_digest_recipient(user_id, cutoff)
+    # We process batches of 30.  We want a big enough batch
+    # to amortize work, but not so big that a single item
+    # from the queue takes too long to process.
+    chunk_size = 30
+    for i in range(0, len(user_ids), chunk_size):
+        chunk_user_ids = user_ids[i:i + chunk_size]
+        queue_digest_user_ids(chunk_user_ids, cutoff)
         logger.info(
-            "User %s is inactive, queuing for potential digest",
-            user_id,
+            "Queuing user_ids for potential digest: %s",
+            chunk_user_ids,
         )
 
 def get_recent_topics(
diff --git a/zerver/tests/test_digest.py b/zerver/tests/test_digest.py
index a80eca92b1..ae2807f934 100644
--- a/zerver/tests/test_digest.py
+++ b/zerver/tests/test_digest.py
@@ -283,19 +283,20 @@ class TestDigestEmailMessages(ZulipTestCase):
 
         cutoff = timezone_now() - datetime.timedelta(days=5)
 
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
         users = self.active_human_users(realm)
 
-        self.assertEqual(queue_mock.call_count, len(users))
+        num_queued_users = len(queue_mock.call_args[0][0])
+        self.assertEqual(num_queued_users, len(users))
 
         # Simulate that we have sent digests for all our users.
         bulk_write_realm_audit_logs(users)
 
         # Now if we run again, we won't get any users, since they will have
         # recent RealmAuditLog rows.
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
         self.assertEqual(queue_mock.call_count, 0)
@@ -313,10 +314,11 @@ class TestDigestEmailMessages(ZulipTestCase):
 
         # Check that all users without a UserActivityInterval entry are considered
         # inactive users and get enqueued.
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
-        self.assertEqual(queue_mock.call_count, len(users))
+        num_queued_users = len(queue_mock.call_args[0][0])
+        self.assertEqual(num_queued_users, len(users))
 
         for user in users:
             last_visit = timezone_now() - datetime.timedelta(days=1)
@@ -327,7 +329,7 @@ class TestDigestEmailMessages(ZulipTestCase):
             )
 
         # Now we expect no users, due to recent activity.
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
         self.assertEqual(queue_mock.call_count, 0)
@@ -336,10 +338,11 @@ class TestDigestEmailMessages(ZulipTestCase):
         last_visit = timezone_now() - datetime.timedelta(days=7)
         UserActivityInterval.objects.all().update(start=last_visit, end=last_visit)
 
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
-        self.assertEqual(queue_mock.call_count, len(users))
+        num_queued_users = len(queue_mock.call_args[0][0])
+        self.assertEqual(num_queued_users, len(users))
 
     def tuesday(self) -> datetime.datetime:
         return datetime.datetime(year=2016, month=1, day=5, tzinfo=datetime.timezone.utc)
@@ -352,7 +355,7 @@ class TestDigestEmailMessages(ZulipTestCase):
         cutoff = tuesday - datetime.timedelta(days=5)
 
         with mock.patch("zerver.lib.digest.timezone_now", return_value=tuesday):
-            with mock.patch("zerver.lib.digest.queue_digest_recipient") as queue_mock:
+            with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
                 enqueue_emails(cutoff)
 
         queue_mock.assert_not_called()
@@ -364,7 +367,7 @@ class TestDigestEmailMessages(ZulipTestCase):
         cutoff = not_tuesday - datetime.timedelta(days=5)
 
         with mock.patch("zerver.lib.digest.timezone_now", return_value=not_tuesday):
-            with mock.patch("zerver.lib.digest.queue_digest_recipient") as queue_mock:
+            with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
                 enqueue_emails(cutoff)
 
         queue_mock.assert_not_called()
@@ -387,14 +390,16 @@ class TestDigestEmailMessages(ZulipTestCase):
         )
 
         # Check that bots are not sent emails
-        with mock.patch('zerver.lib.digest.queue_digest_recipient') as queue_mock:
+        with mock.patch('zerver.lib.digest.queue_digest_user_ids') as queue_mock:
             _enqueue_emails_for_realm(realm, cutoff)
 
-        assert queue_mock.call_count >= 5
+        num_queued_users = len(queue_mock.call_args[0][0])
+        assert num_queued_users >= 5
 
         for arg in queue_mock.call_args_list:
-            user_id = arg[0][0]
-            self.assertNotEqual(user_id, bot.id)
+            user_ids = arg[0][0]
+            for user_id in user_ids:
+                self.assertNotEqual(user_id, bot.id)
 
     @override_settings(SEND_DIGEST_EMAILS=True)
     def test_new_stream_link(self) -> None:
diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py
index ea03a72dfd..3a7356f502 100644
--- a/zerver/worker/queue_processors.py
+++ b/zerver/worker/queue_processors.py
@@ -60,7 +60,7 @@ from zerver.lib.actions import (
 from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler
 from zerver.lib.context_managers import lockfile
 from zerver.lib.db import reset_queries
-from zerver.lib.digest import handle_digest_email
+from zerver.lib.digest import bulk_handle_digest_email
 from zerver.lib.email_mirror import decode_stream_email_address, is_missed_message_address
 from zerver.lib.email_mirror import process_message as mirror_email
 from zerver.lib.email_mirror import rate_limit_mirror_by_realm
@@ -613,7 +613,12 @@ class DigestWorker(QueueProcessingWorker):  # nocoverage
     # management command, not here.
     def consume(self, event: Mapping[str, Any]) -> None:
         logging.info("Received digest event: %s", event)
-        handle_digest_email(event["user_profile_id"], event["cutoff"])
+        if "user_ids" in event:
+            user_ids = event["user_ids"]
+        else:
+            # legacy code may have enqueued a single id
+            user_ids = [event["user_profile_id"]]
+        bulk_handle_digest_email(user_ids, event["cutoff"])
 
 @assign_queue('email_mirror')
 class MirrorWorker(QueueProcessingWorker):
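
Note (illustrative sketch, not part of the diff above): the new loop in
_enqueue_emails_for_realm walks the sorted list of inactive user ids in
slices of chunk_size = 30 and publishes one "digest_emails" event per slice.
The chunked() helper and the sample numbers below are hypothetical and only
mirror the slicing arithmetic used by the patch:

    from typing import Iterator, List, Sequence

    def chunked(user_ids: Sequence[int], chunk_size: int = 30) -> Iterator[List[int]]:
        # Yield successive slices of at most chunk_size ids, preserving order.
        for i in range(0, len(user_ids), chunk_size):
            yield list(user_ids[i:i + chunk_size])

    # Example: 65 inactive users produce three events of 30, 30, and 5 ids.
    events = [{"user_ids": chunk, "cutoff": "1605200000"} for chunk in chunked(range(65))]
    assert [len(e["user_ids"]) for e in events] == [30, 30, 5]

A chunk of 30 keeps any single queue item quick to process while still giving
the consumer enough users per event to amortize its bulk queries.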
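
Note (illustrative sketch, not part of the diff above): DigestWorker.consume
now accepts both event shapes, so items enqueued by the old producer still
drain correctly across a deploy.  The normalize_digest_event() helper below
is hypothetical; the patch inlines the same if/else instead:

    from typing import Any, List, Mapping

    def normalize_digest_event(event: Mapping[str, Any]) -> List[int]:
        # New-style events carry a list of ids under "user_ids".
        if "user_ids" in event:
            return list(event["user_ids"])
        # Legacy events carried a single id under "user_profile_id".
        return [event["user_profile_id"]]

    assert normalize_digest_event({"user_ids": [7, 8], "cutoff": "1605200000"}) == [7, 8]
    assert normalize_digest_event({"user_profile_id": 7, "cutoff": "1605200000"}) == [7]

Keeping the legacy branch lets the worker be restarted before or after the
producer without dropping already-queued digest events.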