zulip/zerver/lib/presence.py

200 lines
6.8 KiB
Python

import datetime
import itertools
import time
from collections import defaultdict
from typing import Any, Dict, List, Set
from django.utils.timezone import now as timezone_now
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.models import PushDeviceToken, Realm, UserPresence, UserProfile, query_for_ids
def get_status_dicts_for_rows(all_rows: List[Dict[str, Any]],
                              mobile_user_ids: Set[int],
                              slim_presence: bool) -> Dict[str, Dict[str, Any]]:
    """Group presence rows by user and build one presence payload per user.

    all_rows: presence rows as dicts; each must carry 'user_profile__id'
        and 'timestamp', plus 'user_profile__email' when slim_presence
        is false.  The caller's list is not modified.
    mobile_user_ids: passed through unchanged to the per-user builder.
    slim_presence: when true, results are keyed by stringified user id
        and built by get_modern_user_info; otherwise they are keyed by
        email and built by get_legacy_user_info.

    Returns a dict mapping each user key to that user's info dict.
    """
    def chronological_per_user(row: Dict[str, Any]) -> Any:
        # Timestamps are datetimes with sub-second granularity, which
        # mostly matters for avoiding test flakes, but is also more
        # precise for real users.
        return (row['user_profile__id'], row['timestamp'])

    # The database could do this ordering for us, but sorting a copy
    # here means we never depend on the caller handing us sorted rows.
    ordered_rows = list(all_rows)
    ordered_rows.sort(key=chronological_per_user)

    if slim_presence:
        build_user_info = get_modern_user_info

        def user_key_for(row: Dict[str, Any]) -> str:
            # JSON would turn the id into a string anyway, and using
            # str here keeps the key type uniform for mypy.
            return str(row['user_profile__id'])
    else:
        build_user_info = get_legacy_user_info

        def user_key_for(row: Dict[str, Any]) -> str:
            return row['user_profile__email']

    # groupby only merges adjacent rows; the sort above places each
    # user's rows next to each other, oldest first.
    return {
        user_key: build_user_info(
            list(rows_for_user),
            mobile_user_ids=mobile_user_ids,
        )
        for user_key, rows_for_user in itertools.groupby(ordered_rows, user_key_for)
    }
def get_modern_user_info(presence_rows: List[Dict[str, Any]],
                         mobile_user_ids: Set[int]) -> Dict[str, Any]:
    """Build the slim presence payload for a single user.

    presence_rows: that user's rows, ordered oldest to newest; each
        row is read for its 'status' and 'timestamp' keys.
    mobile_user_ids: not read here; accepted so this function can be
        called the same way as get_legacy_user_info.

    Returns a dict holding 'active_timestamp' and/or 'idle_timestamp',
    each converted from the last row in the list with the matching
    status.  A key is left out entirely when no row has that status.
    """
    # Walk from the end of the list so the first match is the last
    # (most recent, given the expected ordering) row for that status.
    active_timestamp = next(
        (
            datetime_to_timestamp(row['timestamp'])
            for row in reversed(presence_rows)
            if row['status'] == UserPresence.ACTIVE
        ),
        None,
    )
    idle_timestamp = next(
        (
            datetime_to_timestamp(row['timestamp'])
            for row in reversed(presence_rows)
            if row['status'] == UserPresence.IDLE
        ),
        None,
    )

    # Be stingy about bandwidth: entries with None values are dropped
    # rather than sent.  JS code should just do a falsy check.
    candidates = (
        ('active_timestamp', active_timestamp),
        ('idle_timestamp', idle_timestamp),
    )
    return {name: value for name, value in candidates if value is not None}
def get_legacy_user_info(presence_rows: List[Dict[str, Any]],
                         mobile_user_ids: Set[int]) -> Dict[str, Any]:
    """Build the legacy presence payload for a single user.

    This format exists for legacy users of our API, including old
    versions of the mobile app.

    presence_rows: that user's rows, ordered oldest to newest.  Each
        row is read for 'client__name', 'status', 'timestamp',
        'user_profile__id' and
        'user_profile__enable_offline_push_notifications'.
    mobile_user_ids: ids of users that have push devices; used to
        compute each entry's 'pushable' value.

    Returns a dict with one entry per client name plus an 'aggregated'
    entry.  Raises IndexError if presence_rows is empty.
    """
    def describe(row: Dict[str, Any]) -> Dict[str, Any]:
        # Translate one raw row into the per-client shape of the API.
        name = row['client__name']
        status_text = UserPresence.status_to_string(row['status'])
        unix_time = datetime_to_timestamp(row['timestamp'])
        wants_push = row['user_profile__enable_offline_push_notifications']
        owns_push_device = row['user_profile__id'] in mobile_user_ids
        return {
            'client': name,
            'status': status_text,
            'timestamp': unix_time,
            'pushable': (wants_push and owns_push_device),
        }

    client_infos = [describe(row) for row in presence_rows]
    latest = client_infos[-1]

    # The word "aggregated" is possibly misleading: this entry is
    # really just the info of the last (most recent) row.
    result: Dict[str, Any] = {
        'aggregated': {
            'client': latest['client'],
            'status': latest['status'],
            'timestamp': latest['timestamp'],
        },
    }

    # Map client name -> info.  There should be only one row per
    # client, but to be safe, entries later in the list overwrite
    # earlier ones for the same client.
    result.update((entry['client'], entry) for entry in client_infos)
    return result
def get_presence_for_user(user_profile_id: int,
                          slim_presence: bool=False) -> Dict[str, Dict[str, Any]]:
    """Fetch the presence payload for a single user.

    user_profile_id: id used to filter both UserPresence rows and
        PushDeviceToken rows.
    slim_presence: forwarded to get_status_dicts_for_rows to choose
        the output format.

    Returns whatever get_status_dicts_for_rows builds from this user's
    presence rows.
    """
    row_fields = (
        'client__name',
        'status',
        'timestamp',
        'user_profile__email',
        'user_profile__id',
        'user_profile__enable_offline_push_notifications',
    )
    user_rows = list(
        UserPresence.objects.filter(user_profile_id=user_profile_id).values(*row_fields)
    )

    # TODO: Add a test for the push-device case, though this is low
    # priority, since we don't use mobile_user_ids yet.
    has_push_device = PushDeviceToken.objects.filter(user_id=user_profile_id).exists()
    mobile_user_ids: Set[int] = {user_profile_id} if has_push_device else set()

    return get_status_dicts_for_rows(user_rows, mobile_user_ids, slim_presence)
def get_status_dict_by_realm(realm_id: int, slim_presence: bool = False) -> Dict[str, Dict[str, Any]]:
    """Build the presence payload for every recently seen human user in a realm.

    realm_id: id of the realm whose UserPresence rows are queried.
    slim_presence: forwarded to get_status_dicts_for_rows to choose
        the output format.

    Only rows from the last two weeks that belong to active, non-bot
    users are considered.  Returns {} when no such rows exist;
    otherwise returns whatever get_status_dicts_for_rows builds.
    """
    two_weeks_ago = timezone_now() - datetime.timedelta(weeks=2)
    query = UserPresence.objects.filter(
        realm_id=realm_id,
        timestamp__gte=two_weeks_ago,
        user_profile__is_active=True,
        user_profile__is_bot=False,
    ).values(
        'client__name',
        'status',
        'timestamp',
        'user_profile__email',
        'user_profile__id',
        'user_profile__enable_offline_push_notifications',
    )
    presence_rows = list(query)

    # A user can contribute several rows (one per client), so collapse
    # repeated ids before handing them to query_for_ids; sorting keeps
    # the id list deterministic.  Presumably query_for_ids builds an
    # IN-style filter, for which duplicates only add length -- confirm
    # against its definition.
    user_profile_ids = sorted({
        presence_row['user_profile__id'] for presence_row in presence_rows
    })

    if not user_profile_ids:
        # This guard is necessary because query_for_ids throws an
        # exception if passed an empty list.  It triggers whenever the
        # filters above match no rows, e.g. when nobody in the realm
        # has a presence row from the last two weeks.
        return {}

    # Build the push-device query only once we know it will be used.
    mobile_query = PushDeviceToken.objects.distinct(
        'user_id',
    ).values_list(
        'user_id',
        flat=True,
    )
    mobile_query = query_for_ids(
        query=mobile_query,
        user_ids=user_profile_ids,
        field='user_id',
    )
    mobile_user_ids = set(mobile_query)

    return get_status_dicts_for_rows(presence_rows, mobile_user_ids, slim_presence)
def get_presences_for_realm(realm: Realm,
                            slim_presence: bool) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Return presence data for a realm, honoring its presence_disabled flag.

    realm: read for its presence_disabled and id attributes.
    slim_presence: forwarded to get_status_dict_by_realm.

    Returns an empty defaultdict(dict) when presence is disabled for
    the realm; otherwise the result of get_status_dict_by_realm.
    """
    if not realm.presence_disabled:
        return get_status_dict_by_realm(realm.id, slim_presence)

    # Presence is disabled in this realm, so report nothing.
    return defaultdict(dict)
def get_presence_response(requesting_user_profile: UserProfile,
                          slim_presence: bool) -> Dict[str, Any]:
    """Assemble the presence response for the requesting user's realm.

    requesting_user_profile: read for its realm attribute.
    slim_presence: forwarded to get_presences_for_realm.

    Returns a dict with 'presences' (the realm's presence data) and
    'server_timestamp' (time.time() sampled before the presence data
    is fetched).
    """
    # Sample the clock first so the timestamp precedes the fetch.
    fetched_at = time.time()
    realm_presences = get_presences_for_realm(
        requesting_user_profile.realm,
        slim_presence,
    )
    return {
        'presences': realm_presences,
        'server_timestamp': fetched_at,
    }