diff --git a/puppet/zulip/manifests/app_frontend_base.pp b/puppet/zulip/manifests/app_frontend_base.pp index 3731ee5d60..a02e373b5f 100644 --- a/puppet/zulip/manifests/app_frontend_base.pp +++ b/puppet/zulip/manifests/app_frontend_base.pp @@ -157,6 +157,7 @@ class zulip::app_frontend_base { } else { $uwsgi_default_processes = 3 } + $mobile_notification_shards = Integer(zulipconf('application_server','mobile_notification_shards', 1)) $tornado_ports = $zulip::tornado_sharding::tornado_ports $proxy_host = zulipconf('http_proxy', 'host', 'localhost') diff --git a/puppet/zulip/templates/supervisor/zulip.conf.template.erb b/puppet/zulip/templates/supervisor/zulip.conf.template.erb index bb2b488ff5..eee69f322e 100644 --- a/puppet/zulip/templates/supervisor/zulip.conf.template.erb +++ b/puppet/zulip/templates/supervisor/zulip.conf.template.erb @@ -61,7 +61,16 @@ directory=/home/zulip/deployments/current/ <% if @queues_multiprocess %> <% @queues.each do |queue| -%> [program:zulip_events_<%= queue %>] +<% if queue == "missedmessage_mobile_notifications" and @mobile_notification_shards > 1 -%> +process_name=zulip_events_<%= queue %>_shard%(process_num)s +command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks --worker_num %(process_num)s +stdout_logfile=/var/log/zulip/events_<%= queue %>_shard%(process_num)s.log ; stdout log path, NONE for none; default AUTO +numprocs=<%= @mobile_notification_shards %> +numprocs_start=1 +<% else -%> command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks +stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO +<%end -%> environment=HTTP_proxy="<%= @proxy %>",HTTPS_proxy="<%= @proxy %>" priority=300 ; the relative start priority (default 999) autostart=true ; start at supervisord start (default: true) @@ -70,7 +79,6 @@ stopsignal=TERM ; signal used to kill process (default TERM) stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10) user=zulip ; setuid to this UNIX account to run the program redirect_stderr=true ; redirect proc stderr to stdout (default false) -stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB) stdout_logfile_backups=3 ; # of stdout logfile backups (default 10) directory=/home/zulip/deployments/current/ diff --git a/scripts/lib/check_rabbitmq_queue.py b/scripts/lib/check_rabbitmq_queue.py index 42021b1ba6..fa12b7531f 100644 --- a/scripts/lib/check_rabbitmq_queue.py +++ b/scripts/lib/check_rabbitmq_queue.py @@ -2,12 +2,16 @@ import json import os import re import subprocess +import sys import time from collections import defaultdict from typing import Any, DefaultDict, Dict, List ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +sys.path.append(ZULIP_PATH) +from scripts.lib.zulip_tools import get_config, get_config_file + normal_queues = [ "deferred_work", "digest_emails", @@ -23,6 +27,10 @@ normal_queues = [ "user_presence", ] +mobile_notification_shards = int( + get_config(get_config_file(), "application_server", "mobile_notification_shards", "1") +) + OK = 0 WARNING = 1 CRITICAL = 2 @@ -154,7 +162,13 @@ def check_rabbitmq_queues() -> None: text=True, ).strip() queue_stats: Dict[str, Dict[str, Any]] = {} - queues_to_check = set(normal_queues).intersection(set(queues_with_consumers)) + check_queues = normal_queues + if mobile_notification_shards > 1: + check_queues += [ + f"missedmessage_mobile_notifications_shard{d}" + for d in range(1, mobile_notification_shards + 1) + ] + queues_to_check = set(check_queues).intersection(set(queues_with_consumers)) for queue in queues_to_check: fn = queue + ".stats" file_path = os.path.join(queue_stats_dir, fn) diff --git a/scripts/nagios/check-rabbitmq-consumers b/scripts/nagios/check-rabbitmq-consumers index 5d2c13ff19..e78ad6d682 100755 --- a/scripts/nagios/check-rabbitmq-consumers +++ b/scripts/nagios/check-rabbitmq-consumers @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import argparse import os +import re import subprocess import sys import time @@ -10,7 +11,7 @@ from typing import Dict ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(ZULIP_PATH) from scripts.lib.check_rabbitmq_queue import normal_queues -from scripts.lib.zulip_tools import get_config_file, get_tornado_ports +from scripts.lib.zulip_tools import get_config, get_config_file, get_tornado_ports states = { 0: "OK", @@ -49,6 +50,7 @@ for line in output.split("\n"): queue_name = parts[0] if queue_name.startswith("notify_tornado_"): queue_name = "notify_tornado" + queue_name = re.sub(r"_shard(\d+)", "", queue_name) consumers[queue_name] += 1 now = int(time.time()) @@ -60,6 +62,10 @@ for queue_name in consumers: target_count = 1 if queue_name == "notify_tornado": target_count = TORNADO_PROCESSES + elif queue_name == "missedmessage_mobile_notifications": + target_count = int( + get_config(config_file, "application_server", "mobile_notification_shards", "1") + ) if consumers[queue_name] < target_count: status = 2 diff --git a/zerver/actions/message_flags.py b/zerver/actions/message_flags.py index 7a6802c399..af0ec5347b 100644 --- a/zerver/actions/message_flags.py +++ b/zerver/actions/message_flags.py @@ -3,6 +3,7 @@ from collections import defaultdict from dataclasses import asdict, dataclass, field from typing import List, Optional, Set +from django.conf import settings from django.db import transaction from django.db.models import F from django.utils.timezone import now as timezone_now @@ -247,14 +248,16 @@ def do_clear_mobile_push_notifications_for_ids( messages_by_user[user_id].append(message_id) for user_profile_id, event_message_ids in messages_by_user.items(): - queue_json_publish( - "missedmessage_mobile_notifications", - { - "type": "remove", - "user_profile_id": user_profile_id, - "message_ids": event_message_ids, - }, - ) + notice = { + "type": "remove", + "user_profile_id": user_profile_id, + "message_ids": event_message_ids, + } + if settings.MOBILE_NOTIFICATIONS_SHARDS > 1: # nocoverage + shard_id = user_profile_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1 + queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice) + else: + queue_json_publish("missedmessage_mobile_notifications", notice) def do_update_message_flags( diff --git a/zerver/tornado/event_queue.py b/zerver/tornado/event_queue.py index 26f4fe0d24..637d8ecd79 100644 --- a/zerver/tornado/event_queue.py +++ b/zerver/tornado/event_queue.py @@ -959,7 +959,13 @@ def maybe_enqueue_notifications( notice["type"] = "add" notice["mentioned_user_group_id"] = mentioned_user_group_id if not already_notified.get("push_notified"): - queue_json_publish("missedmessage_mobile_notifications", notice) + if settings.MOBILE_NOTIFICATIONS_SHARDS > 1: + shard_id = ( + user_notifications_data.user_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1 + ) + queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice) + else: + queue_json_publish("missedmessage_mobile_notifications", notice) notified["push_notified"] = True # Send missed_message emails if a direct message or a diff --git a/zerver/worker/missedmessage_mobile_notifications.py b/zerver/worker/missedmessage_mobile_notifications.py index 5d04fad740..23a941defc 100644 --- a/zerver/worker/missedmessage_mobile_notifications.py +++ b/zerver/worker/missedmessage_mobile_notifications.py @@ -1,7 +1,8 @@ # Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html import logging -from typing import Any, Dict +from typing import Any, Dict, Optional +from django.conf import settings from typing_extensions import override from zerver.lib.push_notifications import ( @@ -23,6 +24,17 @@ class PushNotificationsWorker(QueueProcessingWorker): # play well with asyncio. MAX_CONSUME_SECONDS = None + @override + def __init__( + self, + threaded: bool = False, + disable_timeout: bool = False, + worker_num: Optional[int] = None, + ) -> None: + if settings.MOBILE_NOTIFICATIONS_SHARDS > 1 and worker_num is not None: # nocoverage + self.queue_name = self.queue_name + f"_shard{worker_num}" + super().__init__(threaded, disable_timeout, worker_num) + @override def start(self) -> None: # initialize_push_notifications doesn't strictly do anything diff --git a/zproject/computed_settings.py b/zproject/computed_settings.py index 81200c5e30..67631febdb 100644 --- a/zproject/computed_settings.py +++ b/zproject/computed_settings.py @@ -1176,6 +1176,10 @@ CROSS_REALM_BOT_EMAILS = { "emailgateway@zulip.com", } +MOBILE_NOTIFICATIONS_SHARDS = int( + get_config("application_server", "mobile_notification_shards", "1") +) + TWO_FACTOR_PATCH_ADMIN = False # Allow the environment to override the default DSN