topic: Use a single SQL statement to propagate message moves.

Rather than use `bulk_update()` to batch-move chunks of messages, use
a single SQL query to move the messages.  This is much more efficient
for large topic moves.  Since the `edit_history` field is not yet
JSON (see #26496) this requires that PostgreSQL cast the current data
into `jsonb`, append the new data (also cast to `jsonb`), and then
re-cast that as text.

For single-message moves, this _increases_ the SQL query count by one,
since we have to re-query for the updated data from the database after
the bulk update.  However, this is still an overall performance
improvement, growing to a 2x or 3x speedup for larger topic moves.  Below
is a table of duration in seconds to run `do_update_message` to move a
topic to a new stream, based on messages in the topic, for before and
after this change:

| Topic size |  Before  |  After  |
| ---------- | -------- | ------- |
| 1          |   0.1036 |  0.0868 |
| 2          |   0.1108 |  0.0925 |
| 5          |   0.1139 |  0.0959 |
| 10         |   0.1218 |  0.0972 |
| 20         |   0.1310 |  0.1098 |
| 50         |   0.1759 |  0.1366 |
| 100        |   0.2307 |  0.1662 |
| 200        |   0.3880 |  0.2229 |
| 500        |   0.7676 |  0.4052 |
| 1000       |   1.3990 |  0.6848 |
| 2000       |   2.9706 |  1.3370 |
| 5000       |   7.5218 |  3.2882 |
| 10000      |  14.0272 |  5.4434 |
This commit is contained in:
Alex Vandiver 2023-09-26 15:48:34 +00:00 committed by Tim Abbott
parent 822131fef4
commit 5c96f94206
2 changed files with 104 additions and 37 deletions

View File

@ -3,12 +3,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple
import orjson
from django.db import connection
from django.db.models import Q, QuerySet, Subquery
from django.db.models import F, Func, JSONField, Q, QuerySet, Subquery, TextField, Value
from django.db.models.functions import Cast
from sqlalchemy.sql import ColumnElement, column, func, literal
from sqlalchemy.types import Boolean, Text
from zerver.lib.request import REQ
from zerver.lib.types import EditHistoryEvent
from zerver.lib.utils import assert_is_not_none
from zerver.models import Message, Reaction, Stream, UserMessage, UserProfile
# Only use these constants for events.
@ -153,21 +155,16 @@ def update_messages_for_topic_edit(
edit_history_event: EditHistoryEvent,
last_edit_time: datetime,
) -> List[Message]:
propagate_query = Q(
recipient_id=old_stream.recipient_id,
# Uses index: zerver_message_realm_recipient_upper_subject
messages = Message.objects.filter(
realm_id=old_stream.realm_id,
recipient_id=assert_is_not_none(old_stream.recipient_id),
subject__iexact=orig_topic_name,
)
if propagate_mode == "change_all":
propagate_query = propagate_query & ~Q(id=edited_message.id)
messages = messages.exclude(id=edited_message.id)
if propagate_mode == "change_later":
propagate_query = propagate_query & Q(id__gt=edited_message.id)
# Uses index: zerver_message_realm_recipient_upper_subject
messages = Message.objects.filter(propagate_query, realm_id=old_stream.realm_id).select_related(
*Message.DEFAULT_SELECT_RELATED
)
update_fields = ["edit_history", "last_edit_time"]
messages = messages.filter(id__gt=edited_message.id)
if new_stream is not None:
# If we're moving the messages between streams, only move
@ -175,33 +172,62 @@ def update_messages_for_topic_edit(
# gain access to messages through moving them.
from zerver.lib.message import bulk_access_stream_messages_query
messages_list = list(bulk_access_stream_messages_query(acting_user, messages, old_stream))
messages = bulk_access_stream_messages_query(acting_user, messages, old_stream)
else:
# For single-message edits or topic moves within a stream, we
# allow moving history the user may not have access to, in
# order to keep topics together.
messages_list = list(messages)
pass
# The cached ORM objects are not changed by the upcoming
# messages.update(), and the remote cache update (done by the
# caller) requires the new value, so we manually update the
# objects in addition to sending a bulk query to the database.
update_fields: Dict[str, object] = {
"last_edit_time": last_edit_time,
# We cast the `edit_history` column to jsonb (defaulting NULL
# to `[]`), apply the `||` array concatenation operator to it,
# and cast the result back to text. See #26496 for making
# this column itself jsonb, which is a complicated migration.
#
# This equates to:
# "edit_history" = (
# (COALESCE("zerver_message"."edit_history", '[]'))::jsonb
# ||
# ( '[{ ..json event.. }]' )::jsonb
# )::text
"edit_history": Cast(
Func(
Cast(
Func(
F("edit_history"),
Value("[]"),
function="COALESCE",
),
JSONField(),
),
Cast(
Value(orjson.dumps([edit_history_event]).decode()),
JSONField(),
),
function="",
arg_joiner=" || ",
),
TextField(),
),
}
if new_stream is not None:
update_fields.append("recipient")
for m in messages_list:
assert new_stream.recipient is not None
m.recipient = new_stream.recipient
update_fields["recipient"] = new_stream.recipient
if topic_name is not None:
update_fields.append("subject")
for m in messages_list:
m.set_topic_name(topic_name)
update_fields["subject"] = topic_name
for message in messages_list:
update_edit_history(message, last_edit_time, edit_history_event)
# The update will cause the 'messages' query to no longer match
# any rows; we capture the set of matching ids first, do the
# update, and then return a fresh collection -- so we know their
# metadata has been updated for the UPDATE command, and the caller
# can update the remote cache with that.
message_ids = list(messages.values_list("id", flat=True))
messages.update(**update_fields)
Message.objects.bulk_update(messages_list, update_fields, batch_size=100)
return messages_list
return list(
Message.objects.filter(id__in=message_ids).select_related(*Message.DEFAULT_SELECT_RELATED)
)
def generate_topic_history_from_db_rows(rows: List[Tuple[str, int]]) -> List[Dict[str, Any]]:

View File

@ -1431,7 +1431,7 @@ class EditMessageTest(EditMessageTestCase):
# state + 1/user with a UserTopic row for the events data)
# beyond what is typical were there not UserTopic records to
# update. Ideally, we'd eliminate the per-user component.
with self.assert_database_query_count(23):
with self.assert_database_query_count(24):
check_update_message(
user_profile=hamlet,
message_id=message_id,
@ -1528,7 +1528,7 @@ class EditMessageTest(EditMessageTestCase):
set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
with self.assert_database_query_count(28):
with self.assert_database_query_count(29):
check_update_message(
user_profile=desdemona,
message_id=message_id,
@ -1559,7 +1559,7 @@ class EditMessageTest(EditMessageTestCase):
set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
with self.assert_database_query_count(33):
with self.assert_database_query_count(34):
check_update_message(
user_profile=desdemona,
message_id=message_id,
@ -1592,7 +1592,7 @@ class EditMessageTest(EditMessageTestCase):
set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
with self.assert_database_query_count(28):
with self.assert_database_query_count(29):
check_update_message(
user_profile=desdemona,
message_id=message_id,
@ -1615,7 +1615,7 @@ class EditMessageTest(EditMessageTestCase):
second_message_id = self.send_stream_message(
hamlet, stream_name, topic_name="changed topic name", content="Second message"
)
with self.assert_database_query_count(23):
with self.assert_database_query_count(24):
check_update_message(
user_profile=desdemona,
message_id=second_message_id,
@ -1714,7 +1714,7 @@ class EditMessageTest(EditMessageTestCase):
users_to_be_notified_via_muted_topics_event.append(user_topic.user_profile_id)
change_all_topic_name = "Topic 1 edited"
with self.assert_database_query_count(28):
with self.assert_database_query_count(29):
check_update_message(
user_profile=hamlet,
message_id=message_id,
@ -3783,7 +3783,7 @@ class EditMessageTest(EditMessageTestCase):
"iago", "test move stream", "new stream", "test"
)
with self.assert_database_query_count(52), self.assert_memcached_count(14):
with self.assert_database_query_count(53), self.assert_memcached_count(14):
result = self.client_patch(
f"/json/messages/{msg_id}",
{
@ -3809,6 +3809,47 @@ class EditMessageTest(EditMessageTestCase):
)
self.assert_json_success(result)
def test_move_many_messages_to_stream_and_topic(self) -> None:
(user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics(
"iago", "first origin stream", "first destination stream", "first topic"
)
with queries_captured() as queries:
result = self.client_patch(
f"/json/messages/{msg_id}",
{
"propagate_mode": "change_all",
"send_notification_to_old_thread": "true",
"stream_id": new_stream.id,
"topic": "first topic",
},
)
self.assert_json_success(result)
# Adding more messages should not increase the number of
# queries
(user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics(
"iago", "second origin stream", "second destination stream", "second topic"
)
for i in range(1, 5):
self.send_stream_message(
user_profile,
"second origin stream",
topic_name="second topic",
content=f"Extra message {i}",
)
with self.assert_database_query_count(len(queries)):
result = self.client_patch(
f"/json/messages/{msg_id}",
{
"propagate_mode": "change_all",
"send_notification_to_old_thread": "true",
"stream_id": new_stream.id,
"topic": "second topic",
},
)
self.assert_json_success(result)
def test_inaccessible_msg_after_stream_change(self) -> None:
"""Simulates the case where message is moved to a stream where user is not a subscribed"""
(user_profile, old_stream, new_stream, msg_id, msg_id_lt) = self.prepare_move_topics(