actions: Mark all messages as read when user unsubscribes from stream.

This fixes a bug where, when a user is unsubscribed from a stream,
unread messages on that stream might leak (remain unread).  While it might
seem to be a minor problem, it can cause significant problems for
computing the `unread_msgs` data structures, since it means we need to
add an extra filter for whether the user is still subscribed, either
in the backend or in the UI.

Fixes #7095.
This commit is contained in:
Vishnu Ks 2017-11-13 20:24:51 +00:00 committed by Tim Abbott
parent 52069998eb
commit 766511e519
8 changed files with 70 additions and 5 deletions

View File

@@ -38,6 +38,7 @@ class zulip::base {
}
$normal_queues = [
'deferred_work',
'digest_emails',
'email_mirror',
'embed_links',

View File

@@ -573,6 +573,15 @@ define service {
contact_groups admins
}
define service {
use generic-service
service_description Check deferred_work queue processor
check_command check_remote_arg_string!manage.py process_queue --queue_name=deferred_work!1:1!1:1
max_check_attempts 3
hostgroup_name frontends
contact_groups admins
}
define service {
use generic-service
service_description Check worker memory usage

View File

@@ -40,6 +40,7 @@ consumers = defaultdict(int) # type: Dict[str, int]
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
from scripts.lib.zulip_tools import su_to_zulip
queues = {
'deferred_work'
'digest_emails',
'email_mirror',
'embed_links',

View File

@@ -19,6 +19,7 @@ TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
successful_worker_launches = [
'launching queue worker thread error_reports',
'launching queue worker thread user_presence',
'launching queue worker thread deferred_work',
'launching queue worker thread digest_emails',
'launching queue worker thread slow_queries',
'launching queue worker thread missedmessage_mobile_notifications',

View File

@@ -2443,6 +2443,11 @@ def bulk_remove_subscriptions(users, streams, acting_user=None):
continue
notify_subscriptions_removed(user_profile, streams_by_user[user_profile.id])
event = {'type': 'mark_stream_messages_as_read',
'user_profile_id': user_profile.id,
'stream_ids': [stream.id for stream in streams]}
queue_json_publish("deferred_work", event, lambda x: None, call_consume_in_tests=True)
all_subscribers_by_stream = get_user_ids_for_streams(streams=streams)
for stream in streams:

View File

@@ -1861,7 +1861,7 @@ class EventsRegisterTest(ZulipTestCase):
[stream])
events = self.do_test(action,
include_subscribers=include_subscribers,
num_events=2)
num_events=3)
error = remove_schema_checker('events[1]', events[1])
self.assert_on_error(error)

View File

@@ -65,6 +65,11 @@ from zerver.views.streams import (
compose_views
)
from zerver.lib.message import (
aggregate_unread_data,
get_raw_unread_data,
)
from django.http import HttpResponse
import mock
import random
@@ -615,7 +620,7 @@ class StreamAdminTest(ZulipTestCase):
those you aren't on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=14, is_admin=True, is_subbed=True, invite_only=False,
query_count=21, is_admin=True, is_subbed=True, invite_only=False,
other_user_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
@@ -627,7 +632,7 @@ class StreamAdminTest(ZulipTestCase):
are on.
"""
result = self.attempt_unsubscribe_of_principal(
query_count=14, is_admin=True, is_subbed=True, invite_only=True,
query_count=19, is_admin=True, is_subbed=True, invite_only=True,
other_user_subbed=True)
json = self.assert_json_success(result)
self.assertEqual(len(json["removed"]), 1)
@@ -1783,7 +1788,7 @@ class SubscriptionAPITest(ZulipTestCase):
streams_to_sub,
dict(principals=ujson.dumps([user1.email, user2.email])),
)
self.assert_length(queries, 40)
self.assert_length(queries, 39)
self.assert_length(events, 7)
for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]:
@@ -2305,6 +2310,38 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertFalse(subscription.push_notifications)
self.assertFalse(subscription.audible_notifications)
def test_mark_messages_as_unread_on_unsubscribe(self) -> None:
    """Unsubscribing a user from a stream marks that stream's messages read.

    NOTE(review): the name says "unread", but the behavior exercised here is
    mark-as-READ on unsubscribe -- consider renaming in a follow-up.
    """
    realm = get_realm("zulip")
    reader = self.example_user("iago")
    sender = self.example_user("hamlet")
    (first_stream, _) = create_stream_if_needed(realm, "stream1", invite_only=False)
    (second_stream, _) = create_stream_if_needed(realm, "stream2", invite_only=False)
    # Subscribe both users to both streams.
    for who in [reader, sender]:
        for name in ["stream1", "stream2"]:
            self.subscribe(who, name)
    # Give `reader` one unread message in each stream.
    for name in ["stream1", "stream2"]:
        self.send_stream_message(sender.email, name, "test", "test")

    def unread_streams() -> List[Dict[str, Any]]:
        # Aggregate the raw per-message unread data down to per-stream rows.
        return aggregate_unread_data(get_raw_unread_data(reader))['streams']

    rows = unread_streams()
    self.assert_length(rows, 2)
    self.assertEqual(rows[0]['stream_id'], first_stream.id)
    self.assertEqual(rows[1]['stream_id'], second_stream.id)

    # Unsubscribing should mark all the messages in stream2 as read, and
    # re-subscribing must not resurrect them as unread.
    self.unsubscribe(reader, "stream2")
    self.subscribe(reader, "stream2")

    rows = unread_streams()
    self.assert_length(rows, 1)
    self.assertEqual(rows[0]['stream_id'], first_stream.id)
class GetPublicStreamsTest(ZulipTestCase):

View File

@@ -26,12 +26,13 @@ from zerver.lib.push_notifications import handle_push_notification
from zerver.lib.actions import do_send_confirmation_email, \
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
internal_send_message, check_send_message, extract_recipients, \
render_incoming_message, do_update_embedded_data
render_incoming_message, do_update_embedded_data, do_mark_stream_messages_as_read
from zerver.lib.url_preview import preview as url_preview
from zerver.lib.digest import handle_digest_email
from zerver.lib.send_email import send_future_email, send_email_from_dict, \
FromAddress, EmailNotDeliveredException
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.lib.streams import access_stream_by_id
from zerver.decorator import JsonableError
from zerver.tornado.socket import req_redis_key, respond_send_message
from confirmation.models import Confirmation, create_confirmation_link
@@ -508,3 +509,13 @@ class EmbeddedBotWorker(QueueProcessingWorker):
message=message,
bot_handler=self.get_bot_api_client(user_profile),
state_handler=None)
@assign_queue('deferred_work')
class DeferredWorker(QueueProcessingWorker):
    """Consumer for the low-priority 'deferred_work' queue.

    Work that does not need to happen on the request path -- currently just
    marking a freshly-unsubscribed stream's messages as read -- is shipped
    here as queue events.
    """

    def consume(self, event: Mapping[str, Any]) -> None:
        # Silently skip event types this worker does not understand.
        if event['type'] != 'mark_stream_messages_as_read':
            return
        acting_user = get_user_profile_by_id(event['user_profile_id'])
        for stream_id in event['stream_ids']:
            # access_stream_by_id returns a (stream, recipient, sub) triple;
            # only the Stream object is needed here.
            (stream, _recipient, _sub) = access_stream_by_id(acting_user, stream_id)
            do_mark_stream_messages_as_read(acting_user, stream)