mirror of https://github.com/zulip/zulip.git
292 lines
13 KiB
Python
292 lines
13 KiB
Python
import time
|
|
from datetime import datetime, timedelta
|
|
|
|
from django.conf import settings
|
|
from django.db import connection, transaction
|
|
from psycopg2 import sql
|
|
|
|
from zerver.actions.user_activity import update_user_activity_interval
|
|
from zerver.lib.presence import (
|
|
format_legacy_presence_dict,
|
|
user_presence_datetime_with_date_joined_default,
|
|
)
|
|
from zerver.lib.queue import queue_json_publish
|
|
from zerver.lib.timestamp import datetime_to_timestamp
|
|
from zerver.lib.users import get_user_ids_who_can_access_user
|
|
from zerver.models import Client, UserPresence, UserProfile
|
|
from zerver.models.clients import get_client
|
|
from zerver.models.users import active_user_ids
|
|
from zerver.tornado.django_api import send_event
|
|
|
|
|
|
def send_presence_changed(
    user_profile: UserProfile, presence: UserPresence, *, force_send_update: bool = False
) -> None:
    """Send an immediate "presence" event describing user_profile's presence.

    Clients normally learn about presence from the main presence
    endpoint, polled in response to their own presence reports, so
    their picture of who is online is 1-2 minutes stale. Without a
    live event, a user who comes back online and immediately sends a
    message could still be displayed as offline to the recipients.

    The event is skipped (unless force_send_update is set) when the
    number of recipients exceeds
    settings.USER_LIMIT_FOR_SENDING_PRESENCE_UPDATE_EVENTS.

    See https://zulip.readthedocs.io/en/latest/subsystems/presence.html for
    internals documentation on presence.
    """
    if settings.CAN_ACCESS_ALL_USERS_GROUP_LIMITS_PRESENCE:
        recipient_ids = get_user_ids_who_can_access_user(user_profile)
    else:
        recipient_ids = active_user_ids(user_profile.realm_id)

    # These immediate presence events generate quadratic work for
    # Tornado: each event goes to a linear number of users, and the
    # frequency of users coming online also grows linearly with the
    # userbase. In organizations with thousands of users this can
    # overload Tornado, especially if much of the realm comes online at
    # the same time.
    #
    # The live updates also become less useful as organizations grow
    # (one is much less likely to be paying attention to the sidebar),
    # so beyond a limit we stop sending them at all.
    over_recipient_limit = (
        len(recipient_ids) > settings.USER_LIMIT_FOR_SENDING_PRESENCE_UPDATE_EVENTS
    )
    if over_recipient_limit and not force_send_update:
        return

    # Timestamps missing from the presence row are resolved by the
    # helper, which is also handed the user's date_joined.
    active_at = user_presence_datetime_with_date_joined_default(
        presence.last_active_time, user_profile.date_joined
    )
    connected_at = user_presence_datetime_with_date_joined_default(
        presence.last_connected_time, user_profile.date_joined
    )

    # The mobile app handles these events, so the old (legacy) format
    # must be used. The event format should also account for the
    # slim_presence API parameter once that becomes possible.
    legacy_presence = format_legacy_presence_dict(active_at, connected_at)
    event = {
        "type": "presence",
        "email": user_profile.email,
        "user_id": user_profile.id,
        "server_timestamp": time.time(),
        "presence": {legacy_presence["client"]: legacy_presence},
    }
    send_event(user_profile.realm, event, recipient_ids)
|
|
|
|
|
|
def consolidate_client(client: Client) -> Client:
    """Return the client that presence should be attributed to.

    The web app reports its client as 'website', while the desktop app
    reports 'ZulipDesktop' because it sets a custom user agent. Both
    should count as web users, so ZulipDesktop is aliased to the
    'website' client; any other client is returned unchanged.
    """
    if client.name not in ["ZulipDesktop"]:
        return client

    # Alias ZulipDesktop to website.
    return get_client("website")
|
|
|
|
|
|
# This function takes a very hot lock on the PresenceSequence row for the
# user's realm. Every presence update in the realm competes for that lock,
# so it must be held as briefly as possible. durable=True guarantees we are
# not nested inside a larger transaction, which could stay open longer than
# we'd like while still holding the lock.
@transaction.atomic(durable=True)
def do_update_user_presence(
    user_profile: UserProfile,
    client: Client,
    log_time: datetime,
    status: int,
    *,
    force_send_update: bool = False,
) -> None:
    """Record a presence report for user_profile at log_time.

    Creates the user's UserPresence row if it does not exist yet,
    otherwise refreshes last_connected_time / last_active_time when
    they are old enough to be worth rewriting. Whenever the row is
    written, it is stamped with a fresh last_update_id taken from the
    realm's PresenceSequence row. After commit, a live presence event
    is scheduled when force_send_update is set, or when presence is
    enabled for the realm and the user's row was just created or the
    user has just become online.

    Setting last_update_id needs careful handling; see the
    PresenceSequence model and the comments below for details.
    """
    client = consolidate_client(client)

    reported_active = status == UserPresence.LEGACY_STATUS_ACTIVE_INT

    # Values for a brand-new UserPresence row. Receiving a presence
    # update means the user at least connected, so last_connected_time
    # is always set; last_active_time is only set for an active status.
    new_row_values = {
        "last_active_time": log_time if reported_active else None,
        "last_connected_time": log_time,
        "realm_id": user_profile.realm_id,
    }

    creating = False
    try:
        presence = UserPresence.objects.select_for_update().get(user_profile=user_profile)
    except UserPresence.DoesNotExist:
        # We can't write the row until we know the next last_update_id,
        # and we don't want to hold the PresenceSequence lock for long,
        # so that is deferred to the last moment. For now the object
        # only exists in memory.
        presence = UserPresence(**new_row_values, user_profile=user_profile)
        creating = True

    # A missing timestamp is treated as a large delta, so a user who
    # was never active is always considered newly online.
    never_seen_delta = timedelta(days=1)
    since_last_active = (
        never_seen_delta
        if presence.last_active_time is None
        else log_time - presence.last_active_time
    )
    since_last_connected = (
        never_seen_delta
        if presence.last_connected_time is None
        else log_time - presence.last_connected_time
    )

    assert (3 * settings.PRESENCE_PING_INTERVAL_SECS + 20) <= settings.OFFLINE_THRESHOLD_SECS

    # Decide whether the user is newly online, in which case we consider
    # pushing an immediate presence event rather than waiting for other
    # clients to poll. These events add load, so we only do it if the
    # user has missed a couple of regular presence check-ins (their
    # state is at least 2 * PRESENCE_PING_INTERVAL_SECS + 10 old) and is
    # at risk of being shown as offline before the next regular check-in
    # (at least OFFLINE_THRESHOLD_SECS - PRESENCE_PING_INTERVAL_SECS - 10).
    # These two values happen to be the same in the default configuration.
    newly_online_threshold = timedelta(
        seconds=settings.OFFLINE_THRESHOLD_SECS - settings.PRESENCE_PING_INTERVAL_SECS - 10
    )
    became_online = reported_active and since_last_active > newly_online_threshold

    changed_columns = []
    if not creating:
        min_rewrite_gap = timedelta(seconds=settings.PRESENCE_UPDATE_MIN_FREQ_SECONDS)

        # Avoid rewriting last_connected_time several times per minute
        # when multiple browser windows are connected. The comparison
        # also prevents "updating" the timestamp when we already have
        # newer presence than the reported log_time.
        if since_last_connected > min_rewrite_gap:
            presence.last_connected_time = log_time
            changed_columns.append("last_connected_time")

        if reported_active and since_last_active > min_rewrite_gap:
            presence.last_active_time = log_time
            changed_columns.append("last_active_time")
            if presence.last_connected_time is None or log_time > presence.last_connected_time:
                # Keep last_connected_time >= last_active_time.
                presence.last_connected_time = log_time
                changed_columns.append("last_connected_time")

    # WARNING: Delicate, performance-sensitive block.
    #
    # Now we determine last_update_id and write the presence row. This
    # briefly takes the crucial lock on the realm's PresenceSequence
    # row. It is done in a single SQL query to avoid unnecessary
    # overhead, in particular database round-trips, and deliberately as
    # the very last step before the transaction commits, so that the
    # lock is held for the shortest possible time.
    #
    # The lock is not acquired explicitly (e.g. SELECT FOR UPDATE); we
    # rely on the UPDATE statement taking an implicit row lock.
    #
    # Equivalent Python code:
    #   if creating or len(update_fields) > 0:
    #       presence_sequence = PresenceSequence.objects.select_for_update().get(realm_id=user_profile.realm_id)
    #       new_last_update_id = presence_sequence.last_update_id + 1
    #       presence_sequence.last_update_id = new_last_update_id
    #       if creating:
    #           presence.last_update_id = new_last_update_id
    #           presence.save()
    #       elif len(update_fields) > 0:
    #           presence.last_update_id = new_last_update_id
    #           presence.save(update_fields=[*update_fields, "last_update_id"])
    #       presence_sequence.save(update_fields=["last_update_id"])
    if creating or changed_columns:
        bump_sequence = sql.SQL("""
            WITH new_last_update_id AS (
                UPDATE zerver_presencesequence
                SET last_update_id = last_update_id + 1
                WHERE realm_id = {realm_id}
                RETURNING last_update_id
            )
        """).format(realm_id=sql.Literal(user_profile.realm_id))

        if creating:
            # There's a small chance of a race where another process has
            # already created a row for this user. Given how close in
            # time the two events are, there is no clear reason to
            # prefer one over the other, so we take the most direct
            # approach of DO UPDATE: the last event emitted (ours, since
            # we're the slower process) then matches the stored state.
            # TODO: Might be worth changing this to DO NOTHING instead,
            # with a bit of extra logic to skip emitting an event in
            # such a scenario.
            write_presence = sql.SQL("""
                INSERT INTO zerver_userpresence (user_profile_id, last_active_time, last_connected_time, realm_id, last_update_id)
                VALUES ({user_profile_id}, {last_active_time}, {last_connected_time}, {realm_id}, (SELECT last_update_id FROM new_last_update_id))
                ON CONFLICT (user_profile_id) DO UPDATE SET
                    last_active_time = EXCLUDED.last_active_time,
                    last_connected_time = EXCLUDED.last_connected_time,
                    realm_id = EXCLUDED.realm_id,
                    last_update_id = EXCLUDED.last_update_id;
            """).format(
                user_profile_id=sql.Literal(user_profile.id),
                last_active_time=sql.Literal(presence.last_active_time),
                last_connected_time=sql.Literal(presence.last_connected_time),
                realm_id=sql.Literal(user_profile.realm_id),
            )
        else:
            assert len(changed_columns) > 0
            column_assignments = sql.SQL(", ").join(
                [
                    sql.SQL("{field} = {value} ").format(
                        field=sql.Identifier(column),
                        value=sql.Literal(getattr(presence, column)),
                    )
                    for column in changed_columns
                ]
            )
            write_presence = sql.SQL("""
                UPDATE zerver_userpresence
                SET {update_fields_segment}, last_update_id = (SELECT last_update_id FROM new_last_update_id)
                WHERE id = {presence_id}
            """).format(
                update_fields_segment=column_assignments,
                presence_id=sql.Literal(presence.id),
            )

        query = bump_sequence + write_presence
        with connection.cursor() as cursor:
            cursor.execute(query)

    if force_send_update or (
        not user_profile.realm.presence_disabled and (creating or became_online)
    ):
        # The transaction.on_commit is done here, rather than inside
        # send_presence_changed, to keep presence transactions brief;
        # the active_user_ids call there is more expensive than this
        # whole function.
        def notify_presence_changed() -> None:
            send_presence_changed(user_profile, presence, force_send_update=force_send_update)

        transaction.on_commit(notify_presence_changed)
|
|
|
|
|
|
def update_user_presence(
    user_profile: UserProfile,
    client: Client,
    log_time: datetime,
    status: int,
    new_user_input: bool,
) -> None:
    """Publish a presence report to the "user_presence" queue.

    The queued event carries the user's id, the reported status, the
    report time as a timestamp, and the client's name. When
    new_user_input is true, the user's activity interval is also
    updated for log_time.
    """
    queue_json_publish(
        "user_presence",
        {
            "user_profile_id": user_profile.id,
            "status": status,
            "time": datetime_to_timestamp(log_time),
            "client": client.name,
        },
    )

    if not new_user_input:
        return
    update_user_activity_interval(user_profile, log_time)
|