queue_processors: Process user_activity in one query.

This leads to significant speedups.  In a test, with 100 random unique
event classes, the old code processed a batch of 100 rows (on average
66-ish unique in the batch) in 0.45 seconds.  Doing this in a single
query processes the same batch in 0.0076 seconds.
This commit is contained in:
Alex Vandiver 2024-01-22 20:52:11 +00:00 committed by Tim Abbott
parent 1249b5929e
commit 37fa181e5f
4 changed files with 172 additions and 53 deletions

View File

@ -2,7 +2,7 @@ from datetime import datetime
from zerver.lib.queue import queue_json_publish from zerver.lib.queue import queue_json_publish
from zerver.lib.timestamp import datetime_to_timestamp from zerver.lib.timestamp import datetime_to_timestamp
from zerver.models import UserActivity, UserActivityInterval, UserProfile from zerver.models import UserActivityInterval, UserProfile
def do_update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None: def do_update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None:
@ -29,22 +29,6 @@ def do_update_user_activity_interval(user_profile: UserProfile, log_time: dateti
) )
def do_update_user_activity(
user_profile_id: int, client_id: int, query: str, count: int, log_time: datetime
) -> None:
(activity, created) = UserActivity.objects.get_or_create(
user_profile_id=user_profile_id,
client_id=client_id,
query=query,
defaults={"last_visit": log_time, "count": count},
)
if not created:
activity.count += count
activity.last_visit = log_time
activity.save(update_fields=["last_visit", "count"])
def update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None: def update_user_activity_interval(user_profile: UserProfile, log_time: datetime) -> None:
event = {"user_profile_id": user_profile.id, "time": datetime_to_timestamp(log_time)} event = {"user_profile_id": user_profile.id, "time": datetime_to_timestamp(log_time)}
queue_json_publish("user_activity_interval", event) queue_json_publish("user_activity_interval", event)

View File

@ -33,7 +33,7 @@ from zerver.actions.realm_settings import (
do_set_realm_authentication_methods, do_set_realm_authentication_methods,
) )
from zerver.actions.scheduled_messages import check_schedule_message from zerver.actions.scheduled_messages import check_schedule_message
from zerver.actions.user_activity import do_update_user_activity, do_update_user_activity_interval from zerver.actions.user_activity import do_update_user_activity_interval
from zerver.actions.user_status import do_update_user_status from zerver.actions.user_status import do_update_user_status
from zerver.actions.user_topics import do_set_user_topic_visibility_policy from zerver.actions.user_topics import do_set_user_topic_visibility_policy
from zerver.actions.users import do_deactivate_user from zerver.actions.users import do_deactivate_user
@ -76,6 +76,7 @@ from zerver.models import (
ScheduledMessage, ScheduledMessage,
Stream, Stream,
Subscription, Subscription,
UserActivity,
UserGroup, UserGroup,
UserGroupMembership, UserGroupMembership,
UserMessage, UserMessage,
@ -1862,9 +1863,20 @@ class SingleUserExportTest(ExportFile):
stream_id = last_recipient.type_id stream_id = last_recipient.type_id
self.assertEqual(stream_id, get_stream("Scotland", realm).id) self.assertEqual(stream_id, get_stream("Scotland", realm).id)
do_update_user_activity(cordelia.id, client.id, "/some/endpoint", 2, now) UserActivity.objects.create(
do_update_user_activity(cordelia.id, client.id, "/some/endpoint", 3, now) user_profile_id=cordelia.id,
do_update_user_activity(othello.id, client.id, "/bogus", 20, now) client_id=client.id,
query="/some/endpoint",
count=5,
last_visit=now,
)
UserActivity.objects.create(
user_profile_id=othello.id,
client_id=client.id,
query="/bogus",
count=20,
last_visit=now,
)
@checker @checker
def zerver_useractivity(records: List[Record]) -> None: def zerver_useractivity(records: List[Record]) -> None:

View File

@ -80,37 +80,143 @@ def simulated_queue_client(client: FakeClient) -> Iterator[None]:
class WorkerTest(ZulipTestCase): class WorkerTest(ZulipTestCase):
def test_UserActivityWorker(self) -> None: def test_useractivity_worker(self) -> None:
fake_client = FakeClient() fake_client = FakeClient()
user = self.example_user("hamlet") user = self.example_user("hamlet")
other_user = self.example_user("iago")
UserActivity.objects.filter( UserActivity.objects.filter(
user_profile=user.id, user_profile__in=[user.id, other_user.id],
client=get_client("ios"),
).delete() ).delete()
data = dict( # Enqueue two events for the same user/client/query
user_profile_id=user.id, now = time.time()
client_id=get_client("ios").id, for event_time in [now, now + 10]:
time=time.time(), fake_client.enqueue(
query="send_message", "user_activity",
dict(
user_profile_id=user.id,
client_id=get_client("ios").id,
time=event_time,
query="send_message",
),
)
# And a third event for a different query
fake_client.enqueue(
"user_activity",
dict(
user_profile_id=user.id,
client_id=get_client("ios").id,
time=now + 15,
query="get_events",
),
)
# One for a different client
fake_client.enqueue(
"user_activity",
dict(
user_profile_id=user.id,
client_id=get_client("website").id,
time=now + 20,
query="get_events",
),
) )
fake_client.enqueue("user_activity", data)
# Now process the event a second time and confirm count goes # And one for a different user
# up. Ideally, we'd use an event with a slightly newer fake_client.enqueue(
# time, but it's not really important. "user_activity",
fake_client.enqueue("user_activity", data) dict(
user_profile_id=other_user.id,
client_id=get_client("ios").id,
time=now + 25,
query="get_events",
),
)
# Run the worker; this will produce a single upsert statement
with simulated_queue_client(fake_client): with simulated_queue_client(fake_client):
worker = queue_processors.UserActivityWorker() worker = queue_processors.UserActivityWorker()
worker.setup() worker.setup()
worker.start() with self.assert_database_query_count(1):
activity_records = UserActivity.objects.filter( worker.start()
user_profile=user.id,
client=get_client("ios"), activity_records = UserActivity.objects.filter(
user_profile__in=[user.id, other_user.id],
).order_by("last_visit")
self.assert_length(activity_records, 4)
self.assertEqual(activity_records[0].query, "send_message")
self.assertEqual(activity_records[0].client.name, "ios")
self.assertEqual(activity_records[0].count, 2)
self.assertEqual(
activity_records[0].last_visit, datetime.fromtimestamp(now + 10, tz=timezone.utc)
)
self.assertEqual(activity_records[1].query, "get_events")
self.assertEqual(activity_records[1].client.name, "ios")
self.assertEqual(activity_records[1].count, 1)
self.assertEqual(
activity_records[1].last_visit, datetime.fromtimestamp(now + 15, tz=timezone.utc)
)
self.assertEqual(activity_records[2].query, "get_events")
self.assertEqual(activity_records[2].client.name, "website")
self.assertEqual(activity_records[2].count, 1)
self.assertEqual(
activity_records[2].last_visit, datetime.fromtimestamp(now + 20, tz=timezone.utc)
)
self.assertEqual(activity_records[3].query, "get_events")
self.assertEqual(activity_records[3].user_profile, other_user)
self.assertEqual(activity_records[3].count, 1)
self.assertEqual(
activity_records[3].last_visit, datetime.fromtimestamp(now + 25, tz=timezone.utc)
)
# Add 3 more events which stack atop the existing send_message
# to test the update part, and a new "home_real" event to show
# the insert part of the upsert.
for event_time in [now + 30, now + 35, now + 40]:
fake_client.enqueue(
"user_activity",
dict(
user_profile_id=user.id,
client_id=get_client("ios").id,
time=event_time,
query="send_message",
),
) )
self.assert_length(activity_records, 1) fake_client.enqueue(
self.assertEqual(activity_records[0].count, 2) "user_activity",
dict(
user_profile_id=user.id,
client_id=get_client("ios").id,
time=now + 45,
query="home_real",
),
)
# Run the worker again; this will insert one row and update the other
with simulated_queue_client(fake_client):
worker = queue_processors.UserActivityWorker()
worker.setup()
with self.assert_database_query_count(1):
worker.start()
activity_records = UserActivity.objects.filter(
user_profile__in=[user.id, other_user.id],
).order_by("last_visit")
self.assert_length(activity_records, 5)
self.assertEqual(activity_records[3].query, "send_message")
self.assertEqual(activity_records[3].client.name, "ios")
self.assertEqual(activity_records[3].count, 5)
self.assertEqual(
activity_records[3].last_visit, datetime.fromtimestamp(now + 40, tz=timezone.utc)
)
self.assertEqual(activity_records[4].query, "home_real")
self.assertEqual(activity_records[4].client.name, "ios")
self.assertEqual(activity_records[4].count, 1)
self.assertEqual(
activity_records[4].last_visit, datetime.fromtimestamp(now + 45, tz=timezone.utc)
)
def test_missed_message_worker(self) -> None: def test_missed_message_worker(self) -> None:
cordelia = self.example_user("cordelia") cordelia = self.example_user("cordelia")

View File

@ -42,6 +42,7 @@ from django.db.utils import IntegrityError
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from django.utils.translation import override as override_language from django.utils.translation import override as override_language
from psycopg2.sql import SQL, Literal
from returns.curry import partial from returns.curry import partial
from sentry_sdk import add_breadcrumb, configure_scope from sentry_sdk import add_breadcrumb, configure_scope
from typing_extensions import override from typing_extensions import override
@ -53,7 +54,7 @@ from zerver.actions.message_flags import do_mark_stream_messages_as_read
from zerver.actions.message_send import internal_send_private_message, render_incoming_message from zerver.actions.message_send import internal_send_private_message, render_incoming_message
from zerver.actions.presence import do_update_user_presence from zerver.actions.presence import do_update_user_presence
from zerver.actions.realm_export import notify_realm_export from zerver.actions.realm_export import notify_realm_export
from zerver.actions.user_activity import do_update_user_activity, do_update_user_activity_interval from zerver.actions.user_activity import do_update_user_activity_interval
from zerver.context_processors import common_context from zerver.context_processors import common_context
from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitError, get_bot_handler from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitError, get_bot_handler
from zerver.lib.context_managers import lockfile from zerver.lib.context_managers import lockfile
@ -541,19 +542,35 @@ class UserActivityWorker(LoopQueueProcessingWorker):
if key_tuple not in uncommitted_events: if key_tuple not in uncommitted_events:
uncommitted_events[key_tuple] = (1, event["time"]) uncommitted_events[key_tuple] = (1, event["time"])
else: else:
count, time = uncommitted_events[key_tuple] count, event_time = uncommitted_events[key_tuple]
uncommitted_events[key_tuple] = (count + 1, max(time, event["time"])) uncommitted_events[key_tuple] = (count + 1, max(event_time, event["time"]))
# Then we insert the updates into the database. rows = []
# for key_tuple, value_tuple in uncommitted_events.items():
# TODO: Doing these updates in sequence individually is likely user_profile_id, client_id, query = key_tuple
# inefficient; the idealized version would do some sort of count, event_time = value_tuple
# bulk insert_or_update query. rows.append(
for key_tuple in uncommitted_events: SQL("({},{},{},{},to_timestamp({}))").format(
(user_profile_id, client_id, query) = key_tuple Literal(user_profile_id),
count, time = uncommitted_events[key_tuple] Literal(client_id),
log_time = timestamp_to_datetime(time) Literal(query),
do_update_user_activity(user_profile_id, client_id, query, count, log_time) Literal(count),
Literal(event_time),
)
)
# Perform a single bulk UPSERT for all of the rows
sql_query = SQL(
"""
INSERT INTO zerver_useractivity(user_profile_id, client_id, query, count, last_visit)
VALUES {rows}
ON CONFLICT (user_profile_id, client_id, query) DO UPDATE SET
count = zerver_useractivity.count + excluded.count,
last_visit = greatest(zerver_useractivity.last_visit, excluded.last_visit)
"""
).format(rows=SQL(", ").join(rows))
with connection.cursor() as cursor:
cursor.execute(sql_query)
@assign_queue("user_activity_interval") @assign_queue("user_activity_interval")