zulip/zerver/lib/home.py

217 lines
7.3 KiB
Python
Raw Normal View History

import calendar
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from django.conf import settings
from django.http import HttpRequest
from django.utils import translation
from two_factor.utils import default_device
from zerver.lib.events import do_events_register
from zerver.lib.i18n import (
get_language_list,
get_language_list_for_templates,
get_language_name,
get_language_translation_data,
)
from zerver.models import Message, Realm, Stream, UserProfile
from zerver.views.message_flags import get_latest_update_message_flag_activity
@dataclass
class BillingInfo:
show_billing: bool
show_plans: bool
@dataclass
class UserPermissionInfo:
color_scheme: int
is_guest: bool
is_realm_admin: bool
is_realm_owner: bool
show_webathena: bool
def get_furthest_read_time(user_profile: Optional[UserProfile]) -> Optional[float]:
if user_profile is None:
return time.time()
user_activity = get_latest_update_message_flag_activity(user_profile)
if user_activity is None:
return None
return calendar.timegm(user_activity.last_visit.utctimetuple())
def get_bot_types(user_profile: Optional[UserProfile]) -> List[Dict[str, object]]:
bot_types: List[Dict[str, object]] = []
if user_profile is None: # nocoverage
return bot_types
for type_id, name in UserProfile.BOT_TYPES.items():
bot_types.append(
dict(
type_id=type_id,
name=name,
allowed=type_id in user_profile.allowed_bot_types,
)
)
return bot_types
def get_billing_info(user_profile: UserProfile) -> BillingInfo:
show_billing = False
show_plans = False
if settings.CORPORATE_ENABLED and user_profile is not None:
if user_profile.has_billing_access:
from corporate.models import CustomerPlan, get_customer_by_realm
customer = get_customer_by_realm(user_profile.realm)
if customer is not None:
if customer.sponsorship_pending:
show_billing = True
elif CustomerPlan.objects.filter(customer=customer).exists():
show_billing = True
if not user_profile.is_guest and user_profile.realm.plan_type == Realm.LIMITED:
show_plans = True
return BillingInfo(show_billing=show_billing, show_plans=show_plans)
def get_user_permission_info(user_profile: Optional[UserProfile]) -> UserPermissionInfo:
if user_profile is not None:
return UserPermissionInfo(
color_scheme=user_profile.color_scheme,
is_guest=user_profile.is_guest,
is_realm_owner=user_profile.is_realm_owner,
is_realm_admin=user_profile.is_realm_admin,
show_webathena=user_profile.realm.webathena_enabled,
)
else: # nocoverage
return UserPermissionInfo(
color_scheme=UserProfile.COLOR_SCHEME_AUTOMATIC,
is_guest=False,
is_realm_admin=False,
is_realm_owner=False,
show_webathena=False,
)
def build_page_params_for_home_page_load(
request: HttpRequest,
user_profile: UserProfile,
realm: Realm,
insecure_desktop_app: bool,
has_mobile_devices: bool,
narrow: List[List[str]],
narrow_stream: Optional[Stream],
narrow_topic: Optional[str],
first_in_realm: bool,
prompt_for_invites: bool,
needs_tutorial: bool,
) -> Tuple[int, Dict[str, Any]]:
"""
This function computes page_params for when we load the home page.
The page_params data structure gets sent to the client.
"""
client_capabilities = {
"notification_settings_null": True,
"bulk_message_deletion": True,
"user_avatar_url_field_optional": True,
}
register_ret = do_events_register(
user_profile,
request.client,
apply_markdown=True,
client_gravatar=True,
slim_presence=True,
client_capabilities=client_capabilities,
narrow=narrow,
)
furthest_read_time = get_furthest_read_time(user_profile)
# We pick a language for the user as follows:
# * First priority is the language in the URL, for debugging.
# * If not in the URL, we use the language from the user's settings.
request_language = translation.get_language_from_path(request.path_info)
if request_language is None:
request_language = register_ret['default_language']
translation.activate(request_language)
# We also save the language to the user's session, so that
# something reasonable will happen in logged-in portico pages.
request.session[translation.LANGUAGE_SESSION_KEY] = translation.get_language()
two_fa_enabled = (
settings.TWO_FACTOR_AUTHENTICATION_ENABLED and user_profile is not None
)
# Pass parameters to the client-side JavaScript code.
# These end up in a global JavaScript Object named 'page_params'.
page_params = dict(
# Server settings.
debug_mode=settings.DEBUG,
test_suite=settings.TEST_SUITE,
poll_timeout=settings.POLL_TIMEOUT,
insecure_desktop_app=insecure_desktop_app,
login_page=settings.HOME_NOT_LOGGED_IN,
root_domain_uri=settings.ROOT_DOMAIN_URI,
save_stacktraces=settings.SAVE_FRONTEND_STACKTRACES,
warn_no_email=settings.WARN_NO_EMAIL,
search_pills_enabled=settings.SEARCH_PILLS_ENABLED,
# Misc. extra data.
initial_servertime=time.time(), # Used for calculating relative presence age
default_language_name=get_language_name(register_ret["default_language"]),
language_list_dbl_col=get_language_list_for_templates(
register_ret["default_language"]
),
language_list=get_language_list(),
needs_tutorial=needs_tutorial,
first_in_realm=first_in_realm,
prompt_for_invites=prompt_for_invites,
furthest_read_time=furthest_read_time,
has_mobile_devices=has_mobile_devices,
bot_types=get_bot_types(user_profile),
two_fa_enabled=two_fa_enabled,
# Adding two_fa_enabled as condition saves us 3 queries when
# 2FA is not enabled.
two_fa_enabled_user=two_fa_enabled and bool(default_device(user_profile)),
)
undesired_register_ret_fields = [
"streams",
]
for field_name in set(register_ret.keys()) - set(undesired_register_ret_fields):
page_params[field_name] = register_ret[field_name]
if narrow_stream is not None:
# In narrow_stream context, initial pointer is just latest message
recipient = narrow_stream.recipient
try:
max_message_id = (
Message.objects.filter(recipient=recipient)
.order_by("id")
.reverse()[0]
.id
)
except IndexError:
max_message_id = -1
page_params["narrow_stream"] = narrow_stream.name
if narrow_topic is not None:
page_params["narrow_topic"] = narrow_topic
page_params["narrow"] = [
dict(operator=term[0], operand=term[1]) for term in narrow
]
page_params["max_message_id"] = max_message_id
page_params["enable_desktop_notifications"] = False
page_params["translation_data"] = get_language_translation_data(request_language)
return register_ret["queue_id"], page_params