push_notifications: Shard mobile push notifications.

This commit is contained in:
Tim Abbott 2023-12-04 09:35:04 -08:00
parent 572fbfe114
commit 0a756c652c
8 changed files with 67 additions and 13 deletions

View File

@ -157,6 +157,7 @@ class zulip::app_frontend_base {
} else { } else {
$uwsgi_default_processes = 3 $uwsgi_default_processes = 3
} }
$mobile_notification_shards = Integer(zulipconf('application_server','mobile_notification_shards', 1))
$tornado_ports = $zulip::tornado_sharding::tornado_ports $tornado_ports = $zulip::tornado_sharding::tornado_ports
$proxy_host = zulipconf('http_proxy', 'host', 'localhost') $proxy_host = zulipconf('http_proxy', 'host', 'localhost')

View File

@ -61,7 +61,16 @@ directory=/home/zulip/deployments/current/
<% if @queues_multiprocess %> <% if @queues_multiprocess %>
<% @queues.each do |queue| -%> <% @queues.each do |queue| -%>
[program:zulip_events_<%= queue %>] [program:zulip_events_<%= queue %>]
<% if queue == "missedmessage_mobile_notifications" and @mobile_notification_shards > 1 -%>
process_name=zulip_events_<%= queue %>_shard%(process_num)s
command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks --worker_num %(process_num)s
stdout_logfile=/var/log/zulip/events_<%= queue %>_shard%(process_num)s.log ; stdout log path, NONE for none; default AUTO
numprocs=<%= @mobile_notification_shards %>
numprocs_start=1
<% else -%>
command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks
stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO
<%end -%>
environment=HTTP_proxy="<%= @proxy %>",HTTPS_proxy="<%= @proxy %>" environment=HTTP_proxy="<%= @proxy %>",HTTPS_proxy="<%= @proxy %>"
priority=300 ; the relative start priority (default 999) priority=300 ; the relative start priority (default 999)
autostart=true ; start at supervisord start (default: true) autostart=true ; start at supervisord start (default: true)
@ -70,7 +79,6 @@ stopsignal=TERM ; signal used to kill process (default TERM)
stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10) stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10)
user=zulip ; setuid to this UNIX account to run the program user=zulip ; setuid to this UNIX account to run the program
redirect_stderr=true ; redirect proc stderr to stdout (default false) redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO
stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB) stdout_logfile_maxbytes=20MB ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=3 ; # of stdout logfile backups (default 10) stdout_logfile_backups=3 ; # of stdout logfile backups (default 10)
directory=/home/zulip/deployments/current/ directory=/home/zulip/deployments/current/

View File

@ -2,12 +2,16 @@ import json
import os import os
import re import re
import subprocess import subprocess
import sys
import time import time
from collections import defaultdict from collections import defaultdict
from typing import Any, DefaultDict, Dict, List from typing import Any, DefaultDict, Dict, List
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ZULIP_PATH)
from scripts.lib.zulip_tools import get_config, get_config_file
normal_queues = [ normal_queues = [
"deferred_work", "deferred_work",
"digest_emails", "digest_emails",
@ -23,6 +27,10 @@ normal_queues = [
"user_presence", "user_presence",
] ]
mobile_notification_shards = int(
get_config(get_config_file(), "application_server", "mobile_notification_shards", "1")
)
OK = 0 OK = 0
WARNING = 1 WARNING = 1
CRITICAL = 2 CRITICAL = 2
@ -154,7 +162,13 @@ def check_rabbitmq_queues() -> None:
text=True, text=True,
).strip() ).strip()
queue_stats: Dict[str, Dict[str, Any]] = {} queue_stats: Dict[str, Dict[str, Any]] = {}
queues_to_check = set(normal_queues).intersection(set(queues_with_consumers)) check_queues = normal_queues
if mobile_notification_shards > 1:
check_queues += [
f"missedmessage_mobile_notifications_shard{d}"
for d in range(1, mobile_notification_shards + 1)
]
queues_to_check = set(check_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check: for queue in queues_to_check:
fn = queue + ".stats" fn = queue + ".stats"
file_path = os.path.join(queue_stats_dir, fn) file_path = os.path.join(queue_stats_dir, fn)

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import os import os
import re
import subprocess import subprocess
import sys import sys
import time import time
@ -10,7 +11,7 @@ from typing import Dict
ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ZULIP_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(ZULIP_PATH) sys.path.append(ZULIP_PATH)
from scripts.lib.check_rabbitmq_queue import normal_queues from scripts.lib.check_rabbitmq_queue import normal_queues
from scripts.lib.zulip_tools import get_config_file, get_tornado_ports from scripts.lib.zulip_tools import get_config, get_config_file, get_tornado_ports
states = { states = {
0: "OK", 0: "OK",
@ -49,6 +50,7 @@ for line in output.split("\n"):
queue_name = parts[0] queue_name = parts[0]
if queue_name.startswith("notify_tornado_"): if queue_name.startswith("notify_tornado_"):
queue_name = "notify_tornado" queue_name = "notify_tornado"
queue_name = re.sub(r"_shard(\d+)", "", queue_name)
consumers[queue_name] += 1 consumers[queue_name] += 1
now = int(time.time()) now = int(time.time())
@ -60,6 +62,10 @@ for queue_name in consumers:
target_count = 1 target_count = 1
if queue_name == "notify_tornado": if queue_name == "notify_tornado":
target_count = TORNADO_PROCESSES target_count = TORNADO_PROCESSES
elif queue_name == "missedmessage_mobile_notifications":
target_count = int(
get_config(config_file, "application_server", "mobile_notification_shards", "1")
)
if consumers[queue_name] < target_count: if consumers[queue_name] < target_count:
status = 2 status = 2

View File

@ -3,6 +3,7 @@ from collections import defaultdict
from dataclasses import asdict, dataclass, field from dataclasses import asdict, dataclass, field
from typing import List, Optional, Set from typing import List, Optional, Set
from django.conf import settings
from django.db import transaction from django.db import transaction
from django.db.models import F from django.db.models import F
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
@ -247,14 +248,16 @@ def do_clear_mobile_push_notifications_for_ids(
messages_by_user[user_id].append(message_id) messages_by_user[user_id].append(message_id)
for user_profile_id, event_message_ids in messages_by_user.items(): for user_profile_id, event_message_ids in messages_by_user.items():
queue_json_publish( notice = {
"missedmessage_mobile_notifications", "type": "remove",
{ "user_profile_id": user_profile_id,
"type": "remove", "message_ids": event_message_ids,
"user_profile_id": user_profile_id, }
"message_ids": event_message_ids, if settings.MOBILE_NOTIFICATIONS_SHARDS > 1: # nocoverage
}, shard_id = user_profile_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1
) queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice)
else:
queue_json_publish("missedmessage_mobile_notifications", notice)
def do_update_message_flags( def do_update_message_flags(

View File

@ -959,7 +959,13 @@ def maybe_enqueue_notifications(
notice["type"] = "add" notice["type"] = "add"
notice["mentioned_user_group_id"] = mentioned_user_group_id notice["mentioned_user_group_id"] = mentioned_user_group_id
if not already_notified.get("push_notified"): if not already_notified.get("push_notified"):
queue_json_publish("missedmessage_mobile_notifications", notice) if settings.MOBILE_NOTIFICATIONS_SHARDS > 1:
shard_id = (
user_notifications_data.user_id % settings.MOBILE_NOTIFICATIONS_SHARDS + 1
)
queue_json_publish(f"missedmessage_mobile_notifications_shard{shard_id}", notice)
else:
queue_json_publish("missedmessage_mobile_notifications", notice)
notified["push_notified"] = True notified["push_notified"] = True
# Send missed_message emails if a direct message or a # Send missed_message emails if a direct message or a

View File

@ -1,7 +1,8 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html # Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
import logging import logging
from typing import Any, Dict from typing import Any, Dict, Optional
from django.conf import settings
from typing_extensions import override from typing_extensions import override
from zerver.lib.push_notifications import ( from zerver.lib.push_notifications import (
@ -23,6 +24,17 @@ class PushNotificationsWorker(QueueProcessingWorker):
# play well with asyncio. # play well with asyncio.
MAX_CONSUME_SECONDS = None MAX_CONSUME_SECONDS = None
@override
def __init__(
self,
threaded: bool = False,
disable_timeout: bool = False,
worker_num: Optional[int] = None,
) -> None:
if settings.MOBILE_NOTIFICATIONS_SHARDS > 1 and worker_num is not None: # nocoverage
self.queue_name = self.queue_name + f"_shard{worker_num}"
super().__init__(threaded, disable_timeout, worker_num)
@override @override
def start(self) -> None: def start(self) -> None:
# initialize_push_notifications doesn't strictly do anything # initialize_push_notifications doesn't strictly do anything

View File

@ -1176,6 +1176,10 @@ CROSS_REALM_BOT_EMAILS = {
"emailgateway@zulip.com", "emailgateway@zulip.com",
} }
MOBILE_NOTIFICATIONS_SHARDS = int(
get_config("application_server", "mobile_notification_shards", "1")
)
TWO_FACTOR_PATCH_ADMIN = False TWO_FACTOR_PATCH_ADMIN = False
# Allow the environment to override the default DSN # Allow the environment to override the default DSN