diff --git a/puppet/zulip/manifests/base.pp b/puppet/zulip/manifests/base.pp index 943bb64b12..dc4c9dcb13 100644 --- a/puppet/zulip/manifests/base.pp +++ b/puppet/zulip/manifests/base.pp @@ -38,6 +38,7 @@ class zulip::base { } $normal_queues = [ + 'deferred_work', 'digest_emails', 'email_mirror', 'embed_links', diff --git a/puppet/zulip_ops/files/nagios3/conf.d/services.cfg b/puppet/zulip_ops/files/nagios3/conf.d/services.cfg index ac5e15a6a3..13e80485b7 100644 --- a/puppet/zulip_ops/files/nagios3/conf.d/services.cfg +++ b/puppet/zulip_ops/files/nagios3/conf.d/services.cfg @@ -573,6 +573,15 @@ define service { contact_groups admins } +define service { + use generic-service + service_description Check deferred_work queue processor + check_command check_remote_arg_string!manage.py process_queue --queue_name=deferred_work!1:1!1:1 + max_check_attempts 3 + hostgroup_name frontends + contact_groups admins +} + define service { use generic-service service_description Check worker memory usage diff --git a/scripts/nagios/check-rabbitmq-consumers b/scripts/nagios/check-rabbitmq-consumers index 9300a3126f..f6bb04d747 100755 --- a/scripts/nagios/check-rabbitmq-consumers +++ b/scripts/nagios/check-rabbitmq-consumers @@ -40,6 +40,7 @@ consumers = defaultdict(int) # type: Dict[str, int] sys.path.append(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))) from scripts.lib.zulip_tools import su_to_zulip queues = { + 'deferred_work', 'digest_emails', 'email_mirror', 'embed_links', diff --git a/tools/test-queue-worker-reload b/tools/test-queue-worker-reload index 9b62e8b75b..441f353724 100755 --- a/tools/test-queue-worker-reload +++ b/tools/test-queue-worker-reload @@ -19,6 +19,7 @@ TOOLS_DIR = os.path.dirname(os.path.abspath(__file__)) successful_worker_launches = [ 'launching queue worker thread error_reports', 'launching queue worker thread user_presence', + 'launching queue worker thread deferred_work', 'launching queue worker thread digest_emails', 
'launching queue worker thread slow_queries', 'launching queue worker thread missedmessage_mobile_notifications', diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index 7eb6972125..878ff62261 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -2443,6 +2443,11 @@ def bulk_remove_subscriptions(users, streams, acting_user=None): continue notify_subscriptions_removed(user_profile, streams_by_user[user_profile.id]) + event = {'type': 'mark_stream_messages_as_read', + 'user_profile_id': user_profile.id, + 'stream_ids': [stream.id for stream in streams]} + queue_json_publish("deferred_work", event, lambda x: None, call_consume_in_tests=True) + all_subscribers_by_stream = get_user_ids_for_streams(streams=streams) for stream in streams: diff --git a/zerver/tests/test_events.py b/zerver/tests/test_events.py index ff295695de..0bddc5b4b0 100644 --- a/zerver/tests/test_events.py +++ b/zerver/tests/test_events.py @@ -1861,7 +1861,7 @@ class EventsRegisterTest(ZulipTestCase): [stream]) events = self.do_test(action, include_subscribers=include_subscribers, - num_events=2) + num_events=3) error = remove_schema_checker('events[1]', events[1]) self.assert_on_error(error) diff --git a/zerver/tests/test_subs.py b/zerver/tests/test_subs.py index bc1d05317c..210cf13f86 100644 --- a/zerver/tests/test_subs.py +++ b/zerver/tests/test_subs.py @@ -65,6 +65,11 @@ from zerver.views.streams import ( compose_views ) +from zerver.lib.message import ( + aggregate_unread_data, + get_raw_unread_data, +) + from django.http import HttpResponse import mock import random @@ -615,7 +620,7 @@ class StreamAdminTest(ZulipTestCase): those you aren't on. 
""" result = self.attempt_unsubscribe_of_principal( - query_count=14, is_admin=True, is_subbed=True, invite_only=False, + query_count=21, is_admin=True, is_subbed=True, invite_only=False, other_user_subbed=True) json = self.assert_json_success(result) self.assertEqual(len(json["removed"]), 1) @@ -627,7 +632,7 @@ class StreamAdminTest(ZulipTestCase): are on. """ result = self.attempt_unsubscribe_of_principal( - query_count=14, is_admin=True, is_subbed=True, invite_only=True, + query_count=19, is_admin=True, is_subbed=True, invite_only=True, other_user_subbed=True) json = self.assert_json_success(result) self.assertEqual(len(json["removed"]), 1) @@ -1783,7 +1788,7 @@ class SubscriptionAPITest(ZulipTestCase): streams_to_sub, dict(principals=ujson.dumps([user1.email, user2.email])), ) - self.assert_length(queries, 40) + self.assert_length(queries, 39) self.assert_length(events, 7) for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]: @@ -2305,6 +2310,38 @@ class SubscriptionAPITest(ZulipTestCase): self.assertFalse(subscription.push_notifications) self.assertFalse(subscription.audible_notifications) + def test_mark_messages_as_unread_on_unsubscribe(self) -> None: + realm = get_realm("zulip") + user = self.example_user("iago") + random_user = self.example_user("hamlet") + (stream1, _) = create_stream_if_needed(realm, "stream1", invite_only=False) + (stream2, _) = create_stream_if_needed(realm, "stream2", invite_only=False) + + self.subscribe(user, "stream1") + self.subscribe(user, "stream2") + self.subscribe(random_user, "stream1") + self.subscribe(random_user, "stream2") + + self.send_stream_message(random_user.email, "stream1", "test", "test") + self.send_stream_message(random_user.email, "stream2", "test", "test") + + def get_unread_stream_data() -> List[Dict[str, Any]]: + raw_unread_data = get_raw_unread_data(user) + aggregated_data = aggregate_unread_data(raw_unread_data) + return aggregated_data['streams'] + + result = 
get_unread_stream_data() + self.assert_length(result, 2) + self.assertEqual(result[0]['stream_id'], stream1.id) + self.assertEqual(result[1]['stream_id'], stream2.id) + + # Unsubscribing should mark all the messages in stream2 as read + self.unsubscribe(user, "stream2") + + self.subscribe(user, "stream2") + result = get_unread_stream_data() + self.assert_length(result, 1) + self.assertEqual(result[0]['stream_id'], stream1.id) class GetPublicStreamsTest(ZulipTestCase): diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index c3e46e5213..ba2c6d5de3 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -26,12 +26,13 @@ from zerver.lib.push_notifications import handle_push_notification from zerver.lib.actions import do_send_confirmation_email, \ do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \ internal_send_message, check_send_message, extract_recipients, \ - render_incoming_message, do_update_embedded_data + render_incoming_message, do_update_embedded_data, do_mark_stream_messages_as_read from zerver.lib.url_preview import preview as url_preview from zerver.lib.digest import handle_digest_email from zerver.lib.send_email import send_future_email, send_email_from_dict, \ FromAddress, EmailNotDeliveredException from zerver.lib.email_mirror import process_message as mirror_email +from zerver.lib.streams import access_stream_by_id from zerver.decorator import JsonableError from zerver.tornado.socket import req_redis_key, respond_send_message from confirmation.models import Confirmation, create_confirmation_link @@ -508,3 +509,13 @@ class EmbeddedBotWorker(QueueProcessingWorker): message=message, bot_handler=self.get_bot_api_client(user_profile), state_handler=None) + +@assign_queue('deferred_work') +class DeferredWorker(QueueProcessingWorker): + def consume(self, event: Mapping[str, Any]) -> None: + if event['type'] == 'mark_stream_messages_as_read': + user_profile = 
get_user_profile_by_id(event['user_profile_id']) + + for stream_id in event['stream_ids']: + (stream, recipient, sub) = access_stream_by_id(user_profile, stream_id) + do_mark_stream_messages_as_read(user_profile, stream)