thumbnail: Add support for multiple queue workers.

There's no need for sharding, but this allows one to spend a bit of
extra memory to reduce image-processing latency when bursts of images
are uploaded at once.
This commit is contained in:
Tim Abbott 2024-07-21 18:26:12 -07:00
parent 14880b2b59
commit 54c2c02011
5 changed files with 28 additions and 1 deletions

View File

@ -167,6 +167,14 @@ immutable, this serves only as a potential additional limit on the
size of the contents on disk; `s3_disk_cache_size` is expected to be size of the contents on disk; `s3_disk_cache_size` is expected to be
the primary control for cache sizing. the primary control for cache sizing.
#### `thumbnail_workers`
How many image-thumbnailing workers to run. Defaults to 1; adding more
workers can prevent the image-thumbnailing queue from backlogging when
large numbers of very large image files are uploaded at once. (When
backlogged, image previews for images that have not yet been
thumbnailed will appear as loading spinners.)
#### `nameserver` #### `nameserver`
When the [S3 storage backend][s3-backend] is in use, downloads from S3 are When the [S3 storage backend][s3-backend] is in use, downloads from S3 are

View File

@ -158,6 +158,10 @@ class zulip::app_frontend_base {
} else { } else {
$uwsgi_default_processes = 3 $uwsgi_default_processes = 3
} }
# Note the different naming scheme for sharded workers, where each gets its own queue,
# vs. when multiple workers service the same queue.
$thumbnail_workers = Integer(zulipconf('application_server', 'thumbnail_workers', 1))
$mobile_notification_shards = Integer(zulipconf('application_server', 'mobile_notification_shards', 1)) $mobile_notification_shards = Integer(zulipconf('application_server', 'mobile_notification_shards', 1))
$tornado_ports = $zulip::tornado_sharding::tornado_ports $tornado_ports = $zulip::tornado_sharding::tornado_ports

View File

@ -67,6 +67,12 @@ command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queu
stdout_logfile=/var/log/zulip/events_<%= queue %>_shard%(process_num)s.log ; stdout log path, NONE for none; default AUTO stdout_logfile=/var/log/zulip/events_<%= queue %>_shard%(process_num)s.log ; stdout log path, NONE for none; default AUTO
numprocs=<%= @mobile_notification_shards %> numprocs=<%= @mobile_notification_shards %>
numprocs_start=1 numprocs_start=1
<% elsif queue == "thumbnail" and @thumbnail_workers > 1 -%>
process_name=zulip_events_<%= queue %>_worker%(process_num)s
command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks --worker_num %(process_num)s
stdout_logfile=/var/log/zulip/events_<%= queue %>_worker%(process_num)s.log ; stdout log path, NONE for none; default AUTO
numprocs=<%= @thumbnail_workers %>
numprocs_start=1
<% else -%> <% else -%>
command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks command=nice -n10 /home/zulip/deployments/current/manage.py process_queue --queue_name=<%= queue %> --skip-checks
stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO stdout_logfile=/var/log/zulip/events_<%= queue %>.log ; stdout log path, NONE for none; default AUTO

View File

@ -163,12 +163,17 @@ def check_rabbitmq_queues() -> None:
text=True, text=True,
).strip() ).strip()
queue_stats: dict[str, dict[str, Any]] = {} queue_stats: dict[str, dict[str, Any]] = {}
check_queues = normal_queues check_queues = normal_queues
if mobile_notification_shards > 1: if mobile_notification_shards > 1:
# For sharded queue workers, where there's a separate queue
# for each shard, we need to make sure none of those are
# backlogged.
check_queues += [ check_queues += [
f"missedmessage_mobile_notifications_shard{d}" f"missedmessage_mobile_notifications_shard{d}"
for d in range(1, mobile_notification_shards + 1) for d in range(1, mobile_notification_shards + 1)
] ]
queues_to_check = set(check_queues).intersection(set(queues_with_consumers)) queues_to_check = set(check_queues).intersection(set(queues_with_consumers))
for queue in queues_to_check: for queue in queues_to_check:
fn = queue + ".stats" fn = queue + ".stats"

View File

@ -47,6 +47,8 @@ for line in output.split("\n"):
queue_name = parts[0] queue_name = parts[0]
if queue_name.startswith("notify_tornado_"): if queue_name.startswith("notify_tornado_"):
queue_name = "notify_tornado" queue_name = "notify_tornado"
# Collapse sharded queues into a single queue for this check.
queue_name = re.sub(r"_shard(\d+)", "", queue_name) queue_name = re.sub(r"_shard(\d+)", "", queue_name)
consumers[queue_name] += 1 consumers[queue_name] += 1
@ -60,6 +62,8 @@ for queue_name in consumers:
target_count = int( target_count = int(
get_config(config_file, "application_server", "mobile_notification_shards", "1") get_config(config_file, "application_server", "mobile_notification_shards", "1")
) )
elif queue_name == "thumbnail":
target_count = int(get_config(config_file, "application_server", "thumbnail_workers", "1"))
atomic_nagios_write( atomic_nagios_write(
"check-rabbitmq-consumers-" + queue_name, "check-rabbitmq-consumers-" + queue_name,