From 5c96f942060ee9235ee7aee342a7289764e3f31a Mon Sep 17 00:00:00 2001
From: Alex Vandiver
Date: Tue, 26 Sep 2023 15:48:34 +0000
Subject: [PATCH] topic: Use a single SQL statement to propagate message moves.

Rather than use `bulk_update()` to batch-move chunks of messages, use a
single SQL query to move the messages. This is much more efficient for
large topic moves.

Since the `edit_history` field is not yet JSON (see #26496), this
requires that PostgreSQL cast the current data into `jsonb`, append the
new data (also cast to `jsonb`), and then re-cast the result as text.

For single-message moves, this _increases_ the SQL query count by one,
since we have to re-query the database for the updated rows after the
bulk UPDATE. However, it is still a net performance improvement
overall, and the speedup grows to 2x or 3x for larger topic moves.

Below is a table of the duration, in seconds, of `do_update_message`
moving a topic to a new stream, as a function of the number of messages
in the topic, before and after this change:

| Topic size | Before   | After   |
| ---------- | -------- | ------- |
| 1          | 0.1036   | 0.0868  |
| 2          | 0.1108   | 0.0925  |
| 5          | 0.1139   | 0.0959  |
| 10         | 0.1218   | 0.0972  |
| 20         | 0.1310   | 0.1098  |
| 50         | 0.1759   | 0.1366  |
| 100        | 0.2307   | 0.1662  |
| 200        | 0.3880   | 0.2229  |
| 500        | 0.7676   | 0.4052  |
| 1000       | 1.3990   | 0.6848  |
| 2000       | 2.9706   | 1.3370  |
| 5000       | 7.5218   | 3.2882  |
| 10000      | 14.0272  | 5.4434  |
---
 zerver/lib/topic.py               | 86 ++++++++++++++++++++-----------
 zerver/tests/test_message_edit.py | 55 +++++++++++++++++---
 2 files changed, 104 insertions(+), 37 deletions(-)

diff --git a/zerver/lib/topic.py b/zerver/lib/topic.py
index 0843528ca3..a6ee0d028c 100644
--- a/zerver/lib/topic.py
+++ b/zerver/lib/topic.py
@@ -3,12 +3,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple
 
 import orjson
 from django.db import connection
-from django.db.models import Q, QuerySet, Subquery
+from django.db.models import F, Func, JSONField, Q, QuerySet, Subquery, TextField, Value
+from django.db.models.functions import Cast
 from sqlalchemy.sql import ColumnElement, column, func, literal
 from sqlalchemy.types import Boolean, Text
 
 from zerver.lib.request import REQ
 from zerver.lib.types import EditHistoryEvent
+from zerver.lib.utils import assert_is_not_none
 from zerver.models import Message, Reaction, Stream, UserMessage, UserProfile
 
 # Only use these constants for events.
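To make the `jsonb` round trip described above concrete, here is a minimal
sketch of the expression the commit hands to `QuerySet.update()`. It is
illustrative only: the helper name `append_edit_history_and_touch` and its
standalone form are not part of the commit, which builds the same expression
inline in `update_messages_for_topic_edit` (second hunk below).

    from datetime import datetime

    import orjson
    from django.db.models import F, Func, JSONField, QuerySet, TextField, Value
    from django.db.models.functions import Cast

    from zerver.lib.types import EditHistoryEvent
    from zerver.models import Message


    def append_edit_history_and_touch(
        messages: QuerySet[Message],
        edit_history_event: EditHistoryEvent,
        last_edit_time: datetime,
    ) -> int:
        # On PostgreSQL this emits:
        #   ((COALESCE("edit_history", '[]'))::jsonb || '[{..event..}]'::jsonb)::text
        appended_history = Cast(
            Func(
                Cast(
                    Func(F("edit_history"), Value("[]"), function="COALESCE"),
                    JSONField(),
                ),
                Cast(Value(orjson.dumps([edit_history_event]).decode()), JSONField()),
                function="",
                arg_joiner=" || ",
            ),
            TextField(),
        )
        # A single UPDATE covering every matching row, instead of bulk_update()'s
        # batches of 100 mutated ORM objects.
        return messages.update(
            edit_history=appended_history, last_edit_time=last_edit_time
        )

Because the append happens inside the UPDATE itself, the query count no longer
grows with the number of messages in the topic; the new
test_move_many_messages_to_stream_and_topic test below asserts exactly that.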
@@ -153,21 +155,16 @@ def update_messages_for_topic_edit( edit_history_event: EditHistoryEvent, last_edit_time: datetime, ) -> List[Message]: - propagate_query = Q( - recipient_id=old_stream.recipient_id, + # Uses index: zerver_message_realm_recipient_upper_subject + messages = Message.objects.filter( + realm_id=old_stream.realm_id, + recipient_id=assert_is_not_none(old_stream.recipient_id), subject__iexact=orig_topic_name, ) if propagate_mode == "change_all": - propagate_query = propagate_query & ~Q(id=edited_message.id) + messages = messages.exclude(id=edited_message.id) if propagate_mode == "change_later": - propagate_query = propagate_query & Q(id__gt=edited_message.id) - - # Uses index: zerver_message_realm_recipient_upper_subject - messages = Message.objects.filter(propagate_query, realm_id=old_stream.realm_id).select_related( - *Message.DEFAULT_SELECT_RELATED - ) - - update_fields = ["edit_history", "last_edit_time"] + messages = messages.filter(id__gt=edited_message.id) if new_stream is not None: # If we're moving the messages between streams, only move @@ -175,33 +172,62 @@ def update_messages_for_topic_edit( # gain access to messages through moving them. from zerver.lib.message import bulk_access_stream_messages_query - messages_list = list(bulk_access_stream_messages_query(acting_user, messages, old_stream)) + messages = bulk_access_stream_messages_query(acting_user, messages, old_stream) else: # For single-message edits or topic moves within a stream, we # allow moving history the user may not have access in order # to keep topics together. - messages_list = list(messages) + pass - # The cached ORM objects are not changed by the upcoming - # messages.update(), and the remote cache update (done by the - # caller) requires the new value, so we manually update the - # objects in addition to sending a bulk query to the database. + update_fields: Dict[str, object] = { + "last_edit_time": last_edit_time, + # We cast the `edit_history` column to jsonb (defaulting NULL + # to `[]`), apply the `||` array concatenation operator to it, + # and cast the result back to text. See #26496 for making + # this column itself jsonb, which is a complicated migration. + # + # This equates to: + # "edit_history" = ( + # (COALESCE("zerver_message"."edit_history", '[]'))::jsonb + # || + # ( '[{ ..json event.. }]' )::jsonb + # )::text + "edit_history": Cast( + Func( + Cast( + Func( + F("edit_history"), + Value("[]"), + function="COALESCE", + ), + JSONField(), + ), + Cast( + Value(orjson.dumps([edit_history_event]).decode()), + JSONField(), + ), + function="", + arg_joiner=" || ", + ), + TextField(), + ), + } if new_stream is not None: - update_fields.append("recipient") - for m in messages_list: - assert new_stream.recipient is not None - m.recipient = new_stream.recipient + update_fields["recipient"] = new_stream.recipient if topic_name is not None: - update_fields.append("subject") - for m in messages_list: - m.set_topic_name(topic_name) + update_fields["subject"] = topic_name - for message in messages_list: - update_edit_history(message, last_edit_time, edit_history_event) + # The update will cause the 'messages' query to no longer match + # any rows; we capture the set of matching ids first, do the + # update, and then return a fresh collection -- so we know their + # metadata has been updated for the UPDATE command, and the caller + # can update the remote cache with that. 
+ message_ids = list(messages.values_list("id", flat=True)) + messages.update(**update_fields) - Message.objects.bulk_update(messages_list, update_fields, batch_size=100) - - return messages_list + return list( + Message.objects.filter(id__in=message_ids).select_related(*Message.DEFAULT_SELECT_RELATED) + ) def generate_topic_history_from_db_rows(rows: List[Tuple[str, int]]) -> List[Dict[str, Any]]: diff --git a/zerver/tests/test_message_edit.py b/zerver/tests/test_message_edit.py index 200f3032d6..df85b40c75 100644 --- a/zerver/tests/test_message_edit.py +++ b/zerver/tests/test_message_edit.py @@ -1431,7 +1431,7 @@ class EditMessageTest(EditMessageTestCase): # state + 1/user with a UserTopic row for the events data) # beyond what is typical were there not UserTopic records to # update. Ideally, we'd eliminate the per-user component. - with self.assert_database_query_count(23): + with self.assert_database_query_count(24): check_update_message( user_profile=hamlet, message_id=message_id, @@ -1528,7 +1528,7 @@ class EditMessageTest(EditMessageTestCase): set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED) set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED) - with self.assert_database_query_count(28): + with self.assert_database_query_count(29): check_update_message( user_profile=desdemona, message_id=message_id, @@ -1559,7 +1559,7 @@ class EditMessageTest(EditMessageTestCase): set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED) set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED) - with self.assert_database_query_count(33): + with self.assert_database_query_count(34): check_update_message( user_profile=desdemona, message_id=message_id, @@ -1592,7 +1592,7 @@ class EditMessageTest(EditMessageTestCase): set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED) set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED) - with self.assert_database_query_count(28): + with self.assert_database_query_count(29): check_update_message( user_profile=desdemona, message_id=message_id, @@ -1615,7 +1615,7 @@ class EditMessageTest(EditMessageTestCase): second_message_id = self.send_stream_message( hamlet, stream_name, topic_name="changed topic name", content="Second message" ) - with self.assert_database_query_count(23): + with self.assert_database_query_count(24): check_update_message( user_profile=desdemona, message_id=second_message_id, @@ -1714,7 +1714,7 @@ class EditMessageTest(EditMessageTestCase): users_to_be_notified_via_muted_topics_event.append(user_topic.user_profile_id) change_all_topic_name = "Topic 1 edited" - with self.assert_database_query_count(28): + with self.assert_database_query_count(29): check_update_message( user_profile=hamlet, message_id=message_id, @@ -3783,7 +3783,7 @@ class EditMessageTest(EditMessageTestCase): "iago", "test move stream", "new stream", "test" ) - with self.assert_database_query_count(52), self.assert_memcached_count(14): + with self.assert_database_query_count(53), self.assert_memcached_count(14): result = self.client_patch( f"/json/messages/{msg_id}", { @@ -3809,6 +3809,47 @@ class EditMessageTest(EditMessageTestCase): ) self.assert_json_success(result) + def test_move_many_messages_to_stream_and_topic(self) -> None: + (user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics( + "iago", "first origin stream", "first destination stream", "first 
topic" + ) + + with queries_captured() as queries: + result = self.client_patch( + f"/json/messages/{msg_id}", + { + "propagate_mode": "change_all", + "send_notification_to_old_thread": "true", + "stream_id": new_stream.id, + "topic": "first topic", + }, + ) + self.assert_json_success(result) + + # Adding more messages should not increase the number of + # queries + (user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics( + "iago", "second origin stream", "second destination stream", "second topic" + ) + for i in range(1, 5): + self.send_stream_message( + user_profile, + "second origin stream", + topic_name="second topic", + content=f"Extra message {i}", + ) + with self.assert_database_query_count(len(queries)): + result = self.client_patch( + f"/json/messages/{msg_id}", + { + "propagate_mode": "change_all", + "send_notification_to_old_thread": "true", + "stream_id": new_stream.id, + "topic": "second topic", + }, + ) + self.assert_json_success(result) + def test_inaccessible_msg_after_stream_change(self) -> None: """Simulates the case where message is moved to a stream where user is not a subscribed""" (user_profile, old_stream, new_stream, msg_id, msg_id_lt) = self.prepare_move_topics(