import calendar import datetime import os import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import pytz from django.conf import settings from django.http import HttpRequest from django.utils import translation from django.utils.timezone import now as timezone_now from two_factor.utils import default_device from zerver.lib.events import do_events_register from zerver.lib.i18n import ( get_and_set_request_language, get_language_list, get_language_list_for_templates, get_language_name, get_language_translation_data, ) from zerver.models import Message, Realm, Stream, UserProfile from zerver.views.message_flags import get_latest_update_message_flag_activity @dataclass class BillingInfo: show_billing: bool show_plans: bool @dataclass class UserPermissionInfo: color_scheme: int is_guest: bool is_realm_admin: bool is_realm_owner: bool show_webathena: bool # LAST_SERVER_UPGRADE_TIME is the last time the server had a version deployed. if settings.PRODUCTION: # nocoverage timestamp = os.path.basename(os.path.abspath(settings.DEPLOY_ROOT)) LAST_SERVER_UPGRADE_TIME = datetime.datetime.strptime(timestamp, "%Y-%m-%d-%H-%M-%S").replace( tzinfo=pytz.utc ) else: LAST_SERVER_UPGRADE_TIME = timezone_now() def is_outdated_server(user_profile: Optional[UserProfile]) -> bool: # Release tarballs are unpacked via `tar -xf`, which means the # `mtime` on files in them is preserved from when the release # tarball was built. Checking this allows us to catch cases where # someone has upgraded in the last year but to a release more than # a year old. git_version_path = os.path.join(settings.DEPLOY_ROOT, "version.py") release_build_time = datetime.datetime.utcfromtimestamp( os.path.getmtime(git_version_path) ).replace(tzinfo=pytz.utc) version_no_newer_than = min(LAST_SERVER_UPGRADE_TIME, release_build_time) deadline = version_no_newer_than + datetime.timedelta( days=settings.SERVER_UPGRADE_NAG_DEADLINE_DAYS ) if user_profile is None or not user_profile.is_realm_admin: # Administrators get warned at the deadline; all users 30 days later. deadline = deadline + datetime.timedelta(days=30) if timezone_now() > deadline: return True return False def get_furthest_read_time(user_profile: Optional[UserProfile]) -> Optional[float]: if user_profile is None: return time.time() user_activity = get_latest_update_message_flag_activity(user_profile) if user_activity is None: return None return calendar.timegm(user_activity.last_visit.utctimetuple()) def get_bot_types(user_profile: Optional[UserProfile]) -> List[Dict[str, object]]: bot_types: List[Dict[str, object]] = [] if user_profile is None: return bot_types for type_id, name in UserProfile.BOT_TYPES.items(): bot_types.append( dict( type_id=type_id, name=name, allowed=type_id in user_profile.allowed_bot_types, ) ) return bot_types def promote_sponsoring_zulip_in_realm(realm: Realm) -> bool: if not settings.PROMOTE_SPONSORING_ZULIP: return False # If PROMOTE_SPONSORING_ZULIP is enabled, advertise sponsoring # Zulip in the gear menu of non-paying organizations. return realm.plan_type in [Realm.STANDARD_FREE, Realm.SELF_HOSTED] def get_billing_info(user_profile: UserProfile) -> BillingInfo: show_billing = False show_plans = False if settings.CORPORATE_ENABLED and user_profile is not None: if user_profile.has_billing_access: from corporate.models import CustomerPlan, get_customer_by_realm customer = get_customer_by_realm(user_profile.realm) if customer is not None: if customer.sponsorship_pending: show_billing = True elif CustomerPlan.objects.filter(customer=customer).exists(): show_billing = True if not user_profile.is_guest and user_profile.realm.plan_type == Realm.LIMITED: show_plans = True return BillingInfo( show_billing=show_billing, show_plans=show_plans, ) def get_user_permission_info(user_profile: Optional[UserProfile]) -> UserPermissionInfo: if user_profile is not None: return UserPermissionInfo( color_scheme=user_profile.color_scheme, is_guest=user_profile.is_guest, is_realm_owner=user_profile.is_realm_owner, is_realm_admin=user_profile.is_realm_admin, show_webathena=user_profile.realm.webathena_enabled, ) else: return UserPermissionInfo( color_scheme=UserProfile.COLOR_SCHEME_AUTOMATIC, is_guest=False, is_realm_admin=False, is_realm_owner=False, show_webathena=False, ) def build_page_params_for_home_page_load( request: HttpRequest, user_profile: Optional[UserProfile], realm: Realm, insecure_desktop_app: bool, has_mobile_devices: bool, narrow: List[List[str]], narrow_stream: Optional[Stream], narrow_topic: Optional[str], first_in_realm: bool, prompt_for_invites: bool, needs_tutorial: bool, ) -> Tuple[int, Dict[str, Any]]: """ This function computes page_params for when we load the home page. The page_params data structure gets sent to the client. """ client_capabilities = { "notification_settings_null": True, "bulk_message_deletion": True, "user_avatar_url_field_optional": True, "stream_typing_notifications": False, # Set this to True when frontend support is implemented. } if user_profile is not None: register_ret = do_events_register( user_profile, request.client, apply_markdown=True, client_gravatar=True, slim_presence=True, client_capabilities=client_capabilities, narrow=narrow, include_streams=False, ) else: # Since events for web_public_visitor is not implemented, we only fetch the data # at the time of request and don't register for any events. # TODO: Implement events for web_public_visitor. from zerver.lib.events import fetch_initial_state_data, post_process_state register_ret = fetch_initial_state_data( user_profile, realm=realm, event_types=None, queue_id=None, client_gravatar=False, user_avatar_url_field_optional=client_capabilities["user_avatar_url_field_optional"], slim_presence=False, include_subscribers=False, include_streams=False, ) post_process_state(user_profile, register_ret, False) furthest_read_time = get_furthest_read_time(user_profile) request_language = get_and_set_request_language( request, register_ret["default_language"], translation.get_language_from_path(request.path_info), ) two_fa_enabled = settings.TWO_FACTOR_AUTHENTICATION_ENABLED and user_profile is not None # Pass parameters to the client-side JavaScript code. # These end up in a JavaScript Object named 'page_params'. page_params = dict( # Server settings. debug_mode=settings.DEBUG, test_suite=settings.TEST_SUITE, poll_timeout=settings.POLL_TIMEOUT, insecure_desktop_app=insecure_desktop_app, server_needs_upgrade=is_outdated_server(user_profile), login_page=settings.HOME_NOT_LOGGED_IN, root_domain_uri=settings.ROOT_DOMAIN_URI, save_stacktraces=settings.SAVE_FRONTEND_STACKTRACES, warn_no_email=settings.WARN_NO_EMAIL, search_pills_enabled=settings.SEARCH_PILLS_ENABLED, # Only show marketing email settings if on Zulip Cloud enable_marketing_emails_enabled=settings.CORPORATE_ENABLED, # Misc. extra data. initial_servertime=time.time(), # Used for calculating relative presence age default_language_name=get_language_name(register_ret["default_language"]), language_list_dbl_col=get_language_list_for_templates(register_ret["default_language"]), language_list=get_language_list(), needs_tutorial=needs_tutorial, first_in_realm=first_in_realm, prompt_for_invites=prompt_for_invites, furthest_read_time=furthest_read_time, has_mobile_devices=has_mobile_devices, bot_types=get_bot_types(user_profile), two_fa_enabled=two_fa_enabled, # Adding two_fa_enabled as condition saves us 3 queries when # 2FA is not enabled. two_fa_enabled_user=two_fa_enabled and bool(default_device(user_profile)), is_web_public_visitor=user_profile is None, # There is no event queue for web_public_visitors since # events support for web_public_visitors is not implemented yet. no_event_queue=user_profile is None, ) for field_name in register_ret.keys(): page_params[field_name] = register_ret[field_name] if narrow_stream is not None: # In narrow_stream context, initial pointer is just latest message recipient = narrow_stream.recipient try: max_message_id = ( Message.objects.filter(recipient=recipient).order_by("id").reverse()[0].id ) except IndexError: max_message_id = -1 page_params["narrow_stream"] = narrow_stream.name if narrow_topic is not None: page_params["narrow_topic"] = narrow_topic page_params["narrow"] = [dict(operator=term[0], operand=term[1]) for term in narrow] page_params["max_message_id"] = max_message_id page_params["enable_desktop_notifications"] = False page_params["translation_data"] = get_language_translation_data(request_language) return register_ret["queue_id"], page_params