streams: Mark all messages as read when deactivating a stream.

The query to finds and marks all unread UserMessages in the stream as read
can be quite expensive, so we'll move that work to the deferred_work
queue and split it into batches.

Fixes #15770.
This commit is contained in:
Mateusz Mandera 2020-08-06 18:21:42 +02:00 committed by Tim Abbott
parent 2de98ab6e1
commit 9b50c49ea7
4 changed files with 90 additions and 0 deletions

View File

@ -916,6 +916,16 @@ def do_deactivate_user(user_profile: UserProfile,
do_deactivate_user(profile, acting_user=acting_user, _cascade=False)
def do_deactivate_stream(stream: Stream, log: bool=True, acting_user: Optional[UserProfile]=None) -> None:
# We want to mark all messages in the to-be-deactivated stream as
# read for all users; otherwise they will pollute queries like
# "Get the user's first unread message". Since this can be an
# expensive operation, we do it via the deferred_work queue
# processor.
deferred_work_event = {
"type": "mark_stream_messages_as_read_for_everyone",
"stream_recipient_id": stream.recipient_id
}
queue_json_publish("deferred_work", deferred_work_event)
# Get the affected user ids *before* we deactivate everybody.
affected_user_ids = can_access_stream_user_ids(stream)

View File

@ -0,0 +1,34 @@
from django.db import connection, migrations
from django.db.backends.postgresql.schema import DatabaseSchemaEditor
from django.db.migrations.state import StateApps
def mark_messages_read(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
Stream = apps.get_model("zerver", "Stream")
deactivated_stream_ids = list(Stream.objects.filter(deactivated=True).values_list('id', flat=True))
with connection.cursor() as cursor:
for i in deactivated_stream_ids:
cursor.execute(f"""
UPDATE zerver_usermessage SET flags = flags | 1
FROM zerver_message
INNER JOIN zerver_stream ON zerver_stream.recipient_id = zerver_message.recipient_id
WHERE zerver_message.id = zerver_usermessage.message_id
AND zerver_stream.id = {i};
""")
class Migration(migrations.Migration):
"""
We're changing the stream deactivation process to make it mark all messages
in the stream as read. For things to be consistent with streams that have been
deactivated before this change, we need a migration to fix those old streams,
to have all messages marked as read.
"""
atomic = False
dependencies = [
('zerver', '0300_add_attachment_is_web_public'),
]
operations = [
migrations.RunPython(mark_messages_read, reverse_code=migrations.RunPython.noop),
]

View File

@ -63,6 +63,7 @@ from zerver.lib.streams import (
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import (
get_subscription,
most_recent_usermessage,
queries_captured,
reset_emails_in_zulip_realm,
tornado_redirected_to_list,
@ -552,6 +553,32 @@ class StreamAdminTest(ZulipTestCase):
do_deactivate_stream(streams_to_remove[0])
self.assertEqual(get_streams(default_stream_groups[0]), streams_to_keep)
def test_deactivate_stream_marks_messages_as_read(self) -> None:
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
stream = self.make_stream('new_stream')
self.subscribe(hamlet, stream.name)
self.subscribe(cordelia, stream.name)
self.subscribe(hamlet, "Denmark")
self.subscribe(cordelia, "Denmark")
self.send_stream_message(hamlet, stream.name)
new_stream_usermessage = most_recent_usermessage(cordelia)
# We send a message to a different stream too, to verify that the
# deactivation of new_stream won't corrupt read state of UserMessage elsewhere.
self.send_stream_message(hamlet, "Denmark")
denmark_usermessage = most_recent_usermessage(cordelia)
self.assertFalse(new_stream_usermessage.flags.read)
self.assertFalse(denmark_usermessage.flags.read)
do_deactivate_stream(stream)
new_stream_usermessage.refresh_from_db()
denmark_usermessage.refresh_from_db()
self.assertTrue(new_stream_usermessage.flags.read)
self.assertFalse(denmark_usermessage.flags.read)
def test_vacate_private_stream_removes_default_stream(self) -> None:
stream = self.make_stream('new_stream', invite_only=True)
self.subscribe(self.example_user("hamlet"), stream.name)

View File

@ -35,6 +35,7 @@ import orjson
import requests
from django.conf import settings
from django.db import connection
from django.db.models import F
from django.utils.timezone import now as timezone_now
from django.utils.translation import override as override_language
from django.utils.translation import ugettext as _
@ -699,6 +700,12 @@ class EmbeddedBotWorker(QueueProcessingWorker):
@assign_queue('deferred_work')
class DeferredWorker(QueueProcessingWorker):
"""This queue processor is intended for cases where we want to trigger a
potentially expensive, not urgent, job to be run on a separate
thread from the Django worker that initiated it (E.g. so we that
can provide a low-latency HTTP response or avoid risk of request
timeouts for an operation that could in rare cases take minutes).
"""
def consume(self, event: Dict[str, Any]) -> None:
if event['type'] == 'mark_stream_messages_as_read':
user_profile = get_user_profile_by_id(event['user_profile_id'])
@ -711,6 +718,18 @@ class DeferredWorker(QueueProcessingWorker):
(stream, recipient, sub) = access_stream_by_id(user_profile, stream_id,
require_active=False)
do_mark_stream_messages_as_read(user_profile, client, stream)
elif event["type"] == 'mark_stream_messages_as_read_for_everyone':
# This event is generated by the stream deactivation code path.
batch_size = 100
offset = 0
while True:
messages = Message.objects.filter(recipient_id=event["stream_recipient_id"]) \
.order_by("id")[offset:offset + batch_size]
UserMessage.objects.filter(message__in=messages).extra(where=[UserMessage.where_unread()]) \
.update(flags=F('flags').bitor(UserMessage.flags.read))
offset += len(messages)
if len(messages) < batch_size:
break
elif event['type'] == 'clear_push_device_tokens':
try:
clear_push_device_tokens(event["user_profile_id"])