presence: Remove the queue worker.

This commit is contained in:
Mateusz Mandera 2024-07-31 22:17:53 +02:00 committed by Tim Abbott
parent 8570cd81b1
commit aaca394813
8 changed files with 16 additions and 56 deletions

View File

@ -91,7 +91,6 @@ zulip-workers:zulip_events_missedmessage_mobile_notifications RUNNING pid 11
zulip-workers:zulip_events_outgoing_webhooks RUNNING pid 11358, uptime 19:40:17 zulip-workers:zulip_events_outgoing_webhooks RUNNING pid 11358, uptime 19:40:17
zulip-workers:zulip_events_user_activity RUNNING pid 11365, uptime 19:40:14 zulip-workers:zulip_events_user_activity RUNNING pid 11365, uptime 19:40:14
zulip-workers:zulip_events_user_activity_interval RUNNING pid 11376, uptime 19:40:11 zulip-workers:zulip_events_user_activity_interval RUNNING pid 11376, uptime 19:40:11
zulip-workers:zulip_events_user_presence RUNNING pid 11384, uptime 19:40:08
``` ```
If you see any services showing a status other than `RUNNING`, or you If you see any services showing a status other than `RUNNING`, or you

View File

@ -83,7 +83,7 @@ apt and then run:
```bash ```bash
amqp-delete-queue --username=zulip --password='...' --server=localhost \ amqp-delete-queue --username=zulip --password='...' --server=localhost \
--queue=user_presence --queue=user_activity
``` ```
with the RabbitMQ password from `/etc/zulip/zulip-secrets.conf`. with the RabbitMQ password from `/etc/zulip/zulip-secrets.conf`.

View File

@ -394,12 +394,6 @@ define service {
check_command check_rabbitmq_consumers!user_activity_interval check_command check_rabbitmq_consumers!user_activity_interval
} }
define service {
use rabbitmq-consumer-service
service_description Check RabbitMQ user_presence consumers
check_command check_rabbitmq_consumers!user_presence
}
define service { define service {
use generic-service use generic-service
service_description Check worker memory usage service_description Check worker memory usage

View File

@ -144,7 +144,6 @@ class zulip::app_frontend_base {
'thumbnail', 'thumbnail',
'user_activity', 'user_activity',
'user_activity_interval', 'user_activity_interval',
'user_presence',
] ]
if $zulip::common::total_memory_mb > 24000 { if $zulip::common::total_memory_mb > 24000 {

View File

@ -25,7 +25,6 @@ normal_queues = [
"thumbnail", "thumbnail",
"user_activity", "user_activity",
"user_activity_interval", "user_activity_interval",
"user_presence",
] ]
mobile_notification_shards = int( mobile_notification_shards = int(

View File

@ -11,8 +11,6 @@ from zerver.lib.presence import (
format_legacy_presence_dict, format_legacy_presence_dict,
user_presence_datetime_with_date_joined_default, user_presence_datetime_with_date_joined_default,
) )
from zerver.lib.queue import queue_json_publish
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.users import get_user_ids_who_can_access_user from zerver.lib.users import get_user_ids_who_can_access_user
from zerver.models import Client, UserPresence, UserProfile from zerver.models import Client, UserPresence, UserProfile
from zerver.models.clients import get_client from zerver.models.clients import get_client
@ -286,14 +284,12 @@ def update_user_presence(
status: int, status: int,
new_user_input: bool, new_user_input: bool,
) -> None: ) -> None:
event = { logger.debug(
"user_profile_id": user_profile.id, "Processing presence update for user %s, client %s, status %s",
"status": status, user_profile.id,
"time": datetime_to_timestamp(log_time), client,
"client": client.name, status,
} )
do_update_user_presence(user_profile, client, log_time, status)
queue_json_publish("user_presence", event)
if new_user_input: if new_user_input:
update_user_activity_interval(user_profile, log_time) update_user_activity_interval(user_profile, log_time)

View File

@ -263,9 +263,6 @@ class UserPresenceTests(ZulipTestCase):
# In tests, the presence update is processed immediately rather than in the background # In tests, the presence update is processed immediately rather than in the background
# in the queue worker, so we see it reflected immediately. # in the queue worker, so we see it reflected immediately.
# In production, our presence update may be processed with some delay, so the last_update_id
# might not include it yet. In such a case, we'd see the original value of -1 returned,
# due to there being no new data to return.
last_update_id = UserPresence.objects.latest("last_update_id").last_update_id last_update_id = UserPresence.objects.latest("last_update_id").last_update_id
self.assertEqual(json["presence_last_update_id"], last_update_id) self.assertEqual(json["presence_last_update_id"], last_update_id)
@ -323,12 +320,14 @@ class UserPresenceTests(ZulipTestCase):
UserPresence.objects.all().delete() UserPresence.objects.all().delete()
params = dict(status="idle", last_update_id=-1) params = dict(status="idle", last_update_id=-1)
# Make do_update_user_presence a noop. This simulates a production-like environment # Make do_update_user_presence a noop. This simulates a scenario as if there
# where the update is processed in a queue worker, so hamlet may not see his update # is no presence data.
# reflected back to him in the response. Therefore it is as if there is no presence # This is not a realistic situation, because the presence update that the user
# data. # is making will by itself bump the last_update_id which will be reflected
# here in the response - but it's still good to test the code is robust
# and works fine in such an edge case.
# In such a situation, he should get his last_update_id=-1 back. # In such a situation, he should get his last_update_id=-1 back.
with mock.patch("zerver.worker.user_presence.do_update_user_presence"): with mock.patch("zerver.actions.presence.do_update_user_presence"):
result = self.client_post("/json/users/me/presence", params) result = self.client_post("/json/users/me/presence", params)
json = self.assert_json_success(result) json = self.assert_json_success(result)
@ -340,7 +339,7 @@ class UserPresenceTests(ZulipTestCase):
# like an old slim_presence client would due to an implementation # like an old slim_presence client would due to an implementation
# prior to the introduction of last_update_id. # prior to the introduction of last_update_id.
params = dict(status="idle") params = dict(status="idle")
with mock.patch("zerver.worker.user_presence.do_update_user_presence"): with mock.patch("zerver.actions.presence.do_update_user_presence"):
result = self.client_post("/json/users/me/presence", params) result = self.client_post("/json/users/me/presence", params)
json = self.assert_json_success(result) json = self.assert_json_success(result)
self.assertEqual(set(json["presences"].keys()), set()) self.assertEqual(set(json["presences"].keys()), set())

View File

@ -1,26 +0,0 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
import logging
from collections.abc import Mapping
from typing import Any
from typing_extensions import override
from zerver.actions.presence import do_update_user_presence
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.models.clients import get_client
from zerver.models.users import get_user_profile_by_id
from zerver.worker.base import QueueProcessingWorker, assign_queue
logger = logging.getLogger(__name__)
@assign_queue("user_presence")
class UserPresenceWorker(QueueProcessingWorker):
@override
def consume(self, event: Mapping[str, Any]) -> None:
logging.debug("Received presence event: %s", event)
user_profile = get_user_profile_by_id(event["user_profile_id"])
client = get_client(event["client"])
log_time = timestamp_to_datetime(event["time"])
status = event["status"]
do_update_user_presence(user_profile, client, log_time, status)