mirror of https://github.com/zulip/zulip.git
topic: Use a single SQL statement to propagate message moves.
Rather than using `bulk_update()` to batch-move chunks of messages, use a single SQL statement to move the messages. This is much more efficient for large topic moves. Since the `edit_history` field is not yet JSON (see #26496), this requires that PostgreSQL cast the current data to `jsonb`, append the new data (also cast to `jsonb`), and then re-cast the result as text.

For single-message moves, this _increases_ the SQL query count by one, since we have to re-query the database for the updated data after the bulk update. However, this is still an overall performance improvement, which grows to a 2x or 3x speedup for larger topic moves. Below is a table of the duration, in seconds, of `do_update_message` when moving a topic to a new stream, by the number of messages in the topic, before and after this change:

| Topic size | Before  | After  |
| ---------- | ------- | ------ |
| 1          | 0.1036  | 0.0868 |
| 2          | 0.1108  | 0.0925 |
| 5          | 0.1139  | 0.0959 |
| 10         | 0.1218  | 0.0972 |
| 20         | 0.1310  | 0.1098 |
| 50         | 0.1759  | 0.1366 |
| 100        | 0.2307  | 0.1662 |
| 200        | 0.3880  | 0.2229 |
| 500        | 0.7676  | 0.4052 |
| 1000       | 1.3990  | 0.6848 |
| 2000       | 2.9706  | 1.3370 |
| 5000       | 7.5218  | 3.2882 |
| 10000      | 14.0272 | 5.4434 |
This commit is contained in:
parent 822131fef4
commit 5c96f94206
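The control flow behind that extra query is easiest to see in isolation. Below is a condensed, self-contained sketch of the new write path from the diff that follows; the `move_messages` wrapper is an illustrative name, not a function in this commit:

```python
from typing import Any, Dict, List

from django.db.models import QuerySet

from zerver.models import Message


def move_messages(messages: QuerySet, update_fields: Dict[str, Any]) -> List[Message]:
    # Capture the matching ids up front: once the UPDATE below runs, the
    # queryset's filter (old stream and topic) no longer matches any rows.
    message_ids = list(messages.values_list("id", flat=True))
    # A single bulk UPDATE statement, regardless of how many messages move.
    messages.update(**update_fields)
    # The extra query mentioned above: re-fetch the moved rows so the
    # caller can update the remote cache with their post-move metadata.
    return list(Message.objects.filter(id__in=message_ids))
```

Capturing the ids before the `UPDATE` is what lets the function return fresh rows even though the queryset's own filter no longer matches them.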
--- a/zerver/lib/topic.py
+++ b/zerver/lib/topic.py
@@ -3,12 +3,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple
 
 import orjson
 from django.db import connection
-from django.db.models import Q, QuerySet, Subquery
+from django.db.models import F, Func, JSONField, Q, QuerySet, Subquery, TextField, Value
+from django.db.models.functions import Cast
 from sqlalchemy.sql import ColumnElement, column, func, literal
 from sqlalchemy.types import Boolean, Text
 
 from zerver.lib.request import REQ
 from zerver.lib.types import EditHistoryEvent
+from zerver.lib.utils import assert_is_not_none
 from zerver.models import Message, Reaction, Stream, UserMessage, UserProfile
 
 # Only use these constants for events.
@@ -153,21 +155,16 @@ def update_messages_for_topic_edit(
     edit_history_event: EditHistoryEvent,
     last_edit_time: datetime,
 ) -> List[Message]:
-    propagate_query = Q(
-        recipient_id=old_stream.recipient_id,
+    # Uses index: zerver_message_realm_recipient_upper_subject
+    messages = Message.objects.filter(
+        realm_id=old_stream.realm_id,
+        recipient_id=assert_is_not_none(old_stream.recipient_id),
         subject__iexact=orig_topic_name,
     )
     if propagate_mode == "change_all":
-        propagate_query = propagate_query & ~Q(id=edited_message.id)
+        messages = messages.exclude(id=edited_message.id)
     if propagate_mode == "change_later":
-        propagate_query = propagate_query & Q(id__gt=edited_message.id)
-
-    # Uses index: zerver_message_realm_recipient_upper_subject
-    messages = Message.objects.filter(propagate_query, realm_id=old_stream.realm_id).select_related(
-        *Message.DEFAULT_SELECT_RELATED
-    )
-
-    update_fields = ["edit_history", "last_edit_time"]
+        messages = messages.filter(id__gt=edited_message.id)
 
     if new_stream is not None:
         # If we're moving the messages between streams, only move
@@ -175,33 +172,62 @@ def update_messages_for_topic_edit(
         # gain access to messages through moving them.
         from zerver.lib.message import bulk_access_stream_messages_query
 
-        messages_list = list(bulk_access_stream_messages_query(acting_user, messages, old_stream))
+        messages = bulk_access_stream_messages_query(acting_user, messages, old_stream)
     else:
         # For single-message edits or topic moves within a stream, we
         # allow moving history the user may not have access in order
         # to keep topics together.
-        messages_list = list(messages)
+        pass
 
-    # The cached ORM objects are not changed by the upcoming
-    # messages.update(), and the remote cache update (done by the
-    # caller) requires the new value, so we manually update the
-    # objects in addition to sending a bulk query to the database.
+    update_fields: Dict[str, object] = {
+        "last_edit_time": last_edit_time,
+        # We cast the `edit_history` column to jsonb (defaulting NULL
+        # to `[]`), apply the `||` array concatenation operator to it,
+        # and cast the result back to text. See #26496 for making
+        # this column itself jsonb, which is a complicated migration.
+        #
+        # This equates to:
+        #   "edit_history" = (
+        #     (COALESCE("zerver_message"."edit_history", '[]'))::jsonb
+        #     ||
+        #     ( '[{ ..json event.. }]' )::jsonb
+        #   )::text
+        "edit_history": Cast(
+            Func(
+                Cast(
+                    Func(
+                        F("edit_history"),
+                        Value("[]"),
+                        function="COALESCE",
+                    ),
+                    JSONField(),
+                ),
+                Cast(
+                    Value(orjson.dumps([edit_history_event]).decode()),
+                    JSONField(),
+                ),
+                function="",
+                arg_joiner=" || ",
+            ),
+            TextField(),
+        ),
+    }
     if new_stream is not None:
-        update_fields.append("recipient")
-        for m in messages_list:
-            assert new_stream.recipient is not None
-            m.recipient = new_stream.recipient
+        assert new_stream.recipient is not None
+        update_fields["recipient"] = new_stream.recipient
     if topic_name is not None:
-        update_fields.append("subject")
-        for m in messages_list:
-            m.set_topic_name(topic_name)
+        update_fields["subject"] = topic_name
 
-    for message in messages_list:
-        update_edit_history(message, last_edit_time, edit_history_event)
-
-    Message.objects.bulk_update(messages_list, update_fields, batch_size=100)
-
-    return messages_list
+    # The update will cause the 'messages' query to no longer match
+    # any rows; we capture the set of matching ids first, do the
+    # update, and then return a fresh collection -- so we know their
+    # metadata has been updated for the UPDATE command, and the caller
+    # can update the remote cache with that.
+    message_ids = list(messages.values_list("id", flat=True))
+    messages.update(**update_fields)
+
+    return list(
+        Message.objects.filter(id__in=message_ids).select_related(*Message.DEFAULT_SELECT_RELATED)
+    )
 
 
 def generate_topic_history_from_db_rows(rows: List[Tuple[str, int]]) -> List[Dict[str, Any]]:
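For readability, the nested `Cast`/`Func` expression from the hunk above can be factored into a standalone helper. This is a sketch using the same construction; `edit_history_append` is an illustrative name, not part of the commit:

```python
import orjson
from django.db.models import F, Func, JSONField, TextField, Value
from django.db.models.functions import Cast


def edit_history_append(events: list) -> Cast:
    """Render (COALESCE("edit_history", '[]')::jsonb || '<events>'::jsonb)::text."""
    return Cast(
        Func(
            # COALESCE the text column to '[]' so NULL histories
            # concatenate cleanly, then cast it to jsonb.
            Cast(Func(F("edit_history"), Value("[]"), function="COALESCE"), JSONField()),
            # The new events, serialized to a JSON array and cast to jsonb.
            Cast(Value(orjson.dumps(events).decode()), JSONField()),
            # An empty function name plus a custom arg_joiner makes Func
            # emit its arguments joined by the infix `||` operator --
            # PostgreSQL's jsonb concatenation.
            function="",
            arg_joiner=" || ",
        ),
        TextField(),
    )
```

Because the whole expression is evaluated inside PostgreSQL, `queryset.update(edit_history=edit_history_append([event]))` appends to every matched row in one statement, with no per-row round trips.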
--- a/zerver/tests/test_message_edit.py
+++ b/zerver/tests/test_message_edit.py
@@ -1431,7 +1431,7 @@ class EditMessageTest(EditMessageTestCase):
         # state + 1/user with a UserTopic row for the events data)
         # beyond what is typical were there not UserTopic records to
         # update. Ideally, we'd eliminate the per-user component.
-        with self.assert_database_query_count(23):
+        with self.assert_database_query_count(24):
             check_update_message(
                 user_profile=hamlet,
                 message_id=message_id,
@@ -1528,7 +1528,7 @@ class EditMessageTest(EditMessageTestCase):
         set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
         set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
 
-        with self.assert_database_query_count(28):
+        with self.assert_database_query_count(29):
             check_update_message(
                 user_profile=desdemona,
                 message_id=message_id,
@@ -1559,7 +1559,7 @@ class EditMessageTest(EditMessageTestCase):
         set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
         set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
 
-        with self.assert_database_query_count(33):
+        with self.assert_database_query_count(34):
             check_update_message(
                 user_profile=desdemona,
                 message_id=message_id,
@@ -1592,7 +1592,7 @@ class EditMessageTest(EditMessageTestCase):
         set_topic_visibility_policy(desdemona, muted_topics, UserTopic.VisibilityPolicy.MUTED)
         set_topic_visibility_policy(cordelia, muted_topics, UserTopic.VisibilityPolicy.MUTED)
 
-        with self.assert_database_query_count(28):
+        with self.assert_database_query_count(29):
             check_update_message(
                 user_profile=desdemona,
                 message_id=message_id,
@@ -1615,7 +1615,7 @@ class EditMessageTest(EditMessageTestCase):
         second_message_id = self.send_stream_message(
             hamlet, stream_name, topic_name="changed topic name", content="Second message"
         )
-        with self.assert_database_query_count(23):
+        with self.assert_database_query_count(24):
             check_update_message(
                 user_profile=desdemona,
                 message_id=second_message_id,
@@ -1714,7 +1714,7 @@ class EditMessageTest(EditMessageTestCase):
             users_to_be_notified_via_muted_topics_event.append(user_topic.user_profile_id)
 
         change_all_topic_name = "Topic 1 edited"
-        with self.assert_database_query_count(28):
+        with self.assert_database_query_count(29):
             check_update_message(
                 user_profile=hamlet,
                 message_id=message_id,
@@ -3783,7 +3783,7 @@ class EditMessageTest(EditMessageTestCase):
             "iago", "test move stream", "new stream", "test"
         )
 
-        with self.assert_database_query_count(52), self.assert_memcached_count(14):
+        with self.assert_database_query_count(53), self.assert_memcached_count(14):
             result = self.client_patch(
                 f"/json/messages/{msg_id}",
                 {
@@ -3809,6 +3809,47 @@ class EditMessageTest(EditMessageTestCase):
         )
         self.assert_json_success(result)
 
+    def test_move_many_messages_to_stream_and_topic(self) -> None:
+        (user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics(
+            "iago", "first origin stream", "first destination stream", "first topic"
+        )
+
+        with queries_captured() as queries:
+            result = self.client_patch(
+                f"/json/messages/{msg_id}",
+                {
+                    "propagate_mode": "change_all",
+                    "send_notification_to_old_thread": "true",
+                    "stream_id": new_stream.id,
+                    "topic": "first topic",
+                },
+            )
+        self.assert_json_success(result)
+
+        # Adding more messages should not increase the number of
+        # queries
+        (user_profile, old_stream, new_stream, msg_id, msg_id_later) = self.prepare_move_topics(
+            "iago", "second origin stream", "second destination stream", "second topic"
+        )
+        for i in range(1, 5):
+            self.send_stream_message(
+                user_profile,
+                "second origin stream",
+                topic_name="second topic",
+                content=f"Extra message {i}",
+            )
+        with self.assert_database_query_count(len(queries)):
+            result = self.client_patch(
+                f"/json/messages/{msg_id}",
+                {
+                    "propagate_mode": "change_all",
+                    "send_notification_to_old_thread": "true",
+                    "stream_id": new_stream.id,
+                    "topic": "second topic",
+                },
+            )
+        self.assert_json_success(result)
+
     def test_inaccessible_msg_after_stream_change(self) -> None:
         """Simulates the case where message is moved to a stream where user is not a subscribed"""
         (user_profile, old_stream, new_stream, msg_id, msg_id_lt) = self.prepare_move_topics(