"""Helpers that assemble billing, sponsorship and mobile push data for the support views."""

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, TypedDict, Union
from urllib.parse import urlencode, urljoin, urlunsplit

from django.conf import settings
from django.db.models import Sum
from django.urls import reverse
from django.utils.timezone import now as timezone_now

from corporate.lib.stripe import (
    BillingSession,
    PushNotificationsEnabledStatus,
    RealmBillingSession,
    RemoteRealmBillingSession,
    RemoteServerBillingSession,
    get_configured_fixed_price_plan_offer,
    get_price_per_license,
    get_push_status_for_remote_request,
    start_of_next_billing_cycle,
)
from corporate.models import (
    Customer,
    CustomerPlan,
    CustomerPlanOffer,
    ZulipSponsorshipRequest,
    get_current_plan_by_customer,
)
from zerver.models import Realm
from zerver.models.realms import get_org_type_display_name, get_realm
from zilencer.lib.remote_counts import MissingDataError
from zilencer.models import (
    RemoteCustomerUserCount,
    RemoteInstallationCount,
    RemotePushDeviceToken,
    RemoteRealm,
    RemoteRealmCount,
    RemoteZulipServer,
    RemoteZulipServerAuditLog,
    get_remote_realm_guest_and_non_guest_count,
    get_remote_server_guest_and_non_guest_count,
    has_stale_audit_log,
)

# Warning attached to PlanData when the number of used licenses cannot be
# reported (stale audit log data, or MissingDataError while counting).
USER_DATA_STALE_WARNING = "Recent audit log data missing: No information for used licenses"


class SponsorshipRequestDict(TypedDict):
    """Fields of the latest sponsorship request, as built by
    get_customer_sponsorship_data."""

    org_type: str
    org_website: str
    org_description: str
    total_users: str
    paid_users: str
    paid_users_description: str
    requested_plan: str


@dataclass
class SponsorshipData:
    """Sponsorship and discount settings of a customer.

    The defaults describe a billing entity without any Customer object.
    """

    sponsorship_pending: bool = False
    monthly_discounted_price: Optional[int] = None
    annual_discounted_price: Optional[int] = None
    original_monthly_plan_price: Optional[int] = None
    original_annual_plan_price: Optional[int] = None
    minimum_licenses: Optional[int] = None
    required_plan_tier: Optional[int] = None
    latest_sponsorship_request: Optional[SponsorshipRequestDict] = None


@dataclass
class NextPlanData:
    """The plan (or plan offer) that follows the current one, with its estimated revenue."""

    plan: Union["CustomerPlan", "CustomerPlanOffer", None] = None
    estimated_revenue: Optional[int] = None


@dataclass
class PlanData:
    """Current and upcoming plan information for one customer."""

    customer: Optional["Customer"] = None
    current_plan: Optional["CustomerPlan"] = None
    next_plan: Union["CustomerPlan", "CustomerPlanOffer", None] = None
    licenses: Optional[int] = None
    licenses_used: Optional[int] = None
    next_billing_cycle_start: Optional[datetime] = None
    is_legacy_plan: bool = False
    has_fixed_price: bool = False
    is_current_plan_billable: bool = False
    stripe_customer_url: Optional[str] = None
    warning: Optional[str] = None
    annual_recurring_revenue: Optional[int] = None
    estimated_next_plan_revenue: Optional[int] = None


@dataclass
class MobilePushData:
    """Mobile push notification statistics for a remote server or remote realm."""

    total_mobile_users: int
    push_notification_status: PushNotificationsEnabledStatus
    # Only computed for remote servers; None for remote realms.
    uncategorized_mobile_users: Optional[int] = None
    mobile_pushes_forwarded: Optional[int] = None
    # "%Y-%m-%d" date string, or the literal string "None" when no
    # forwarded-push count row exists.
    last_mobile_push_sent: str = ""


@dataclass
class RemoteSupportData:
    """Everything collected for a remote server or remote realm by
    get_data_for_remote_support_view."""

    date_created: datetime
    has_stale_audit_log: bool
    plan_data: PlanData
    sponsorship_data: SponsorshipData
    user_data: RemoteCustomerUserCount
    mobile_push_data: MobilePushData


@dataclass
class CloudSupportData:
    """Everything collected for a realm by get_data_for_cloud_support_view."""

    plan_data: PlanData
    sponsorship_data: SponsorshipData


def get_stripe_customer_url(stripe_id: str) -> str:
    """Return the Stripe dashboard URL for the given Stripe customer ID."""
    return f"https://dashboard.stripe.com/customers/{stripe_id}"  # nocoverage


def get_realm_support_url(realm: Realm) -> str:
    """Return the support page URL, on the staff realm, that searches for ``realm``.

    The URL is the "support" route resolved against the URL of the realm
    named by settings.STAFF_SUBDOMAIN, with ``q=<realm.string_id>`` as the
    query string.
    """
    support_realm_url = get_realm(settings.STAFF_SUBDOMAIN).url
    # Relative reference (path + query only); urljoin supplies scheme and host.
    relative_support_url = urlunsplit(
        ("", "", reverse("support"), urlencode({"q": realm.string_id}), "")
    )
    return urljoin(support_realm_url, relative_support_url)


def get_customer_sponsorship_data(customer: Customer) -> SponsorshipData:
    """Collect sponsorship and discount information for ``customer``.

    Falsy discounted prices (None or 0) are reported as None.  The original
    per-license prices are looked up only when the customer has a required
    plan tier, and the latest sponsorship request (highest id) is included
    only while sponsorship is pending.
    """
    pending = customer.sponsorship_pending
    plan_tier = customer.required_plan_tier

    original_monthly_plan_price = None
    original_annual_plan_price = None
    if plan_tier is not None:
        original_monthly_plan_price = get_price_per_license(
            plan_tier, CustomerPlan.BILLING_SCHEDULE_MONTHLY
        )
        original_annual_plan_price = get_price_per_license(
            plan_tier, CustomerPlan.BILLING_SCHEDULE_ANNUAL
        )

    sponsorship_request = None
    if pending:
        last_sponsorship_request = (
            ZulipSponsorshipRequest.objects.filter(customer=customer).order_by("id").last()
        )
        if last_sponsorship_request is not None:
            org_type_name = get_org_type_display_name(last_sponsorship_request.org_type)
            # A missing (None) or empty website is replaced by a placeholder.
            website = last_sponsorship_request.org_website or "No website submitted"
            sponsorship_request = SponsorshipRequestDict(
                org_type=org_type_name,
                org_website=website,
                org_description=last_sponsorship_request.org_description,
                total_users=last_sponsorship_request.expected_total_users,
                paid_users=last_sponsorship_request.paid_users_count,
                paid_users_description=last_sponsorship_request.paid_users_description,
                requested_plan=last_sponsorship_request.requested_plan,
            )

    return SponsorshipData(
        sponsorship_pending=pending,
        # `or None` keeps the truthiness check: a falsy price is reported as None.
        monthly_discounted_price=customer.monthly_discounted_price or None,
        annual_discounted_price=customer.annual_discounted_price or None,
        original_monthly_plan_price=original_monthly_plan_price,
        original_annual_plan_price=original_annual_plan_price,
        minimum_licenses=customer.minimum_licenses,
        required_plan_tier=plan_tier,
        latest_sponsorship_request=sponsorship_request,
    )


def get_annual_invoice_count(billing_schedule: int) -> int:
    """Return the number of invoices per year: 12 for a monthly schedule, otherwise 1."""
    if billing_schedule == CustomerPlan.BILLING_SCHEDULE_MONTHLY:
        return 12
    else:  # nocoverage
        return 1


def get_next_plan_data(
    billing_session: BillingSession,
    customer: Customer,
    current_plan: Optional[CustomerPlan] = None,
) -> NextPlanData:
    """Determine the customer's next plan and estimate its revenue.

    A configured fixed-price plan offer for the customer's required plan
    tier takes precedence; otherwise the next plan is whatever
    ``billing_session.get_next_plan`` returns for ``current_plan``.

    The estimated revenue is the fixed price when the next plan has one.
    Otherwise, when there is a current plan, it is price per license times
    licenses at next renewal times the annual invoice count, or 0 when the
    current plan reports no licenses at next renewal.  In all other cases
    the estimate is left as None.
    """
    plan_offer: Optional[CustomerPlanOffer] = None

    # A customer can have a CustomerPlanOffer with or without a current plan.
    if customer.required_plan_tier:
        plan_offer = get_configured_fixed_price_plan_offer(customer, customer.required_plan_tier)

    if plan_offer is not None:
        next_plan_data = NextPlanData(plan=plan_offer)
    elif current_plan is not None:
        next_plan_data = NextPlanData(plan=billing_session.get_next_plan(current_plan))
    else:
        next_plan_data = NextPlanData()

    next_plan = next_plan_data.plan
    if next_plan is not None:
        if next_plan.fixed_price is not None:
            next_plan_data.estimated_revenue = next_plan.fixed_price
        elif current_plan is not None:
            licenses_at_next_renewal = current_plan.licenses_at_next_renewal()
            if licenses_at_next_renewal is not None:
                assert type(next_plan) is CustomerPlan
                assert next_plan.price_per_license is not None
                invoice_count = get_annual_invoice_count(next_plan.billing_schedule)
                next_plan_data.estimated_revenue = (
                    next_plan.price_per_license * licenses_at_next_renewal * invoice_count
                )
            else:
                next_plan_data.estimated_revenue = 0  # nocoverage

    return next_plan_data


def get_plan_data_for_support_view(
    billing_session: BillingSession, user_count: Optional[int] = None, stale_user_data: bool = False
) -> PlanData:
    """Build the PlanData shown in the support view for ``billing_session``.

    ``user_count``, when given, is used as the number of licenses in use
    instead of querying the billing session.  ``stale_user_data`` suppresses
    the used-license count entirely and sets USER_DATA_STALE_WARNING.

    Note that this calls ``make_end_of_cycle_updates_if_needed`` on the
    current plan, so it is not a purely read-only operation.
    """
    customer = billing_session.get_customer()
    plan = None
    if customer is not None:
        plan = get_current_plan_by_customer(customer)
    plan_data = PlanData(
        customer=customer,
        current_plan=plan,
    )

    if plan is not None:
        new_plan, last_ledger_entry = billing_session.make_end_of_cycle_updates_if_needed(
            plan, timezone_now()
        )
        if last_ledger_entry is not None:
            if new_plan is not None:
                plan_data.current_plan = new_plan  # nocoverage
            plan_data.licenses = last_ledger_entry.licenses
        assert plan_data.current_plan is not None  # for mypy

        # If we already have user count data, we use that
        # instead of querying the database again to get
        # the number of currently used licenses.
        if stale_user_data:
            plan_data.warning = USER_DATA_STALE_WARNING
        elif user_count is None:
            try:
                plan_data.licenses_used = billing_session.current_count_for_billed_licenses()
            except MissingDataError:  # nocoverage
                plan_data.warning = USER_DATA_STALE_WARNING
        else:  # nocoverage
            assert user_count is not None
            plan_data.licenses_used = user_count

        if plan_data.current_plan.status in (
            CustomerPlan.FREE_TRIAL,
            CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL,
        ):  # nocoverage
            # For these free-trial statuses the plan's next invoice date is
            # reported as the start of the next billing cycle.
            assert plan_data.current_plan.next_invoice_date is not None
            plan_data.next_billing_cycle_start = plan_data.current_plan.next_invoice_date
        else:
            plan_data.next_billing_cycle_start = start_of_next_billing_cycle(
                plan_data.current_plan, timezone_now()
            )

        plan_data.is_legacy_plan = (
            plan_data.current_plan.tier == CustomerPlan.TIER_SELF_HOSTED_LEGACY
        )
        plan_data.has_fixed_price = plan_data.current_plan.fixed_price is not None
        plan_data.is_current_plan_billable = billing_session.check_plan_tier_is_billable(
            plan_tier=plan_data.current_plan.tier
        )
        if last_ledger_entry is not None:
            plan_data.annual_recurring_revenue = (
                billing_session.get_annual_recurring_revenue_for_support_data(
                    plan_data.current_plan, last_ledger_entry
                )
            )
        else:
            plan_data.annual_recurring_revenue = 0  # nocoverage

    # Check for a non-active/scheduled CustomerPlan or CustomerPlanOffer
    if customer is not None:
        next_plan_data = get_next_plan_data(billing_session, customer, plan_data.current_plan)
        plan_data.next_plan = next_plan_data.plan
        plan_data.estimated_next_plan_revenue = next_plan_data.estimated_revenue

    # If customer has a stripe ID, add link to stripe customer dashboard
    if customer is not None and customer.stripe_customer_id is not None:
        plan_data.stripe_customer_url = get_stripe_customer_url(
            customer.stripe_customer_id
        )  # nocoverage

    return plan_data


def _format_latest_push_forwarded_day(
    latest_count: Union[RemoteInstallationCount, RemoteRealmCount, None],
) -> str:
    """Format the start of the day interval covered by ``latest_count``.

    Returns the literal string "None" when there is no count row.
    """
    if latest_count is not None:  # nocoverage
        # mobile_pushes_forwarded is a CountStat with a day frequency,
        # so we want to show the start of the latest day interval.
        return (latest_count.end_time - timedelta(days=1)).strftime("%Y-%m-%d")
    return "None"


def get_mobile_push_data(remote_entity: Union[RemoteZulipServer, RemoteRealm]) -> MobilePushData:
    """Gather mobile push statistics for a remote server or a remote realm.

    Mobile users are counted as push device tokens distinct on
    (user_id, user_uuid).  Forwarded pushes are the summed
    "mobile_pushes_forwarded::day" values whose end_time lies within the
    last seven days.  For a remote server, users whose token has no remote
    realm are additionally reported as uncategorized.
    """
    if isinstance(remote_entity, RemoteZulipServer):
        total_users = (
            RemotePushDeviceToken.objects.filter(server=remote_entity)
            .distinct("user_id", "user_uuid")
            .count()
        )
        uncategorized_users = (
            RemotePushDeviceToken.objects.filter(server=remote_entity, remote_realm__isnull=True)
            .distinct("user_id", "user_uuid")
            .count()
        )
        mobile_pushes = RemoteInstallationCount.objects.filter(
            server=remote_entity,
            property="mobile_pushes_forwarded::day",
            end_time__gte=timezone_now() - timedelta(days=7),
        ).aggregate(total_forwarded=Sum("value", default=0))
        # NOTE(review): no explicit ordering here, so `.last()` relies on the
        # queryset's default ordering (primary key unless the model's Meta
        # defines one) -- confirm that this is the row with the newest end_time.
        latest_remote_server_push_forwarded_count = RemoteInstallationCount.objects.filter(
            server=remote_entity,
            property="mobile_pushes_forwarded::day",
        ).last()
        push_forwarded_interval_start = _format_latest_push_forwarded_day(
            latest_remote_server_push_forwarded_count
        )
        push_notification_status = get_push_status_for_remote_request(
            remote_server=remote_entity, remote_realm=None
        )
        return MobilePushData(
            total_mobile_users=total_users,
            push_notification_status=push_notification_status,
            uncategorized_mobile_users=uncategorized_users,
            mobile_pushes_forwarded=mobile_pushes["total_forwarded"],
            last_mobile_push_sent=push_forwarded_interval_start,
        )

    assert isinstance(remote_entity, RemoteRealm)
    mobile_users = (
        RemotePushDeviceToken.objects.filter(remote_realm=remote_entity)
        .distinct("user_id", "user_uuid")
        .count()
    )
    mobile_pushes = RemoteRealmCount.objects.filter(
        remote_realm=remote_entity,
        property="mobile_pushes_forwarded::day",
        end_time__gte=timezone_now() - timedelta(days=7),
    ).aggregate(total_forwarded=Sum("value", default=0))
    # NOTE(review): same implicit-ordering caveat as for the server branch above.
    latest_remote_realm_push_forwarded_count = RemoteRealmCount.objects.filter(
        remote_realm=remote_entity,
        property="mobile_pushes_forwarded::day",
    ).last()
    push_forwarded_interval_start = _format_latest_push_forwarded_day(
        latest_remote_realm_push_forwarded_count
    )
    push_notification_status = get_push_status_for_remote_request(
        remote_entity.server, remote_entity
    )
    return MobilePushData(
        total_mobile_users=mobile_users,
        push_notification_status=push_notification_status,
        uncategorized_mobile_users=None,
        mobile_pushes_forwarded=mobile_pushes["total_forwarded"],
        last_mobile_push_sent=push_forwarded_interval_start,
    )


def _get_sponsorship_data_for_plan_data(plan_data: PlanData) -> SponsorshipData:
    """Return the customer's sponsorship data, or defaults when there is no customer."""
    if plan_data.customer is not None:
        return get_customer_sponsorship_data(plan_data.customer)
    return SponsorshipData()


def get_data_for_remote_support_view(billing_session: BillingSession) -> RemoteSupportData:
    """Collect plan, sponsorship, user count and mobile push data for a remote
    server or remote realm billing session.

    ``billing_session`` must be a RemoteServerBillingSession or a
    RemoteRealmBillingSession.  For a server, the creation date comes from
    its REMOTE_SERVER_CREATED audit log entry; for a realm, from
    ``realm_date_created``.
    """
    if isinstance(billing_session, RemoteServerBillingSession):
        user_data = get_remote_server_guest_and_non_guest_count(billing_session.remote_server.id)
        stale_audit_log_data = has_stale_audit_log(billing_session.remote_server)
        date_created = RemoteZulipServerAuditLog.objects.get(
            event_type=RemoteZulipServerAuditLog.REMOTE_SERVER_CREATED,
            server__id=billing_session.remote_server.id,
        ).event_time
        mobile_data = get_mobile_push_data(billing_session.remote_server)
    else:
        assert isinstance(billing_session, RemoteRealmBillingSession)
        user_data = get_remote_realm_guest_and_non_guest_count(billing_session.remote_realm)
        stale_audit_log_data = has_stale_audit_log(billing_session.remote_realm.server)
        date_created = billing_session.remote_realm.realm_date_created
        mobile_data = get_mobile_push_data(billing_session.remote_realm)

    # Guests and non-guests together are passed on as the used-license count.
    user_count = user_data.guest_user_count + user_data.non_guest_user_count
    plan_data = get_plan_data_for_support_view(billing_session, user_count, stale_audit_log_data)
    sponsorship_data = _get_sponsorship_data_for_plan_data(plan_data)

    return RemoteSupportData(
        date_created=date_created,
        has_stale_audit_log=stale_audit_log_data,
        plan_data=plan_data,
        sponsorship_data=sponsorship_data,
        user_data=user_data,
        mobile_push_data=mobile_data,
    )


def get_data_for_cloud_support_view(billing_session: BillingSession) -> CloudSupportData:
    """Collect plan and sponsorship data for a realm billing session.

    ``billing_session`` must be a RealmBillingSession.
    """
    assert isinstance(billing_session, RealmBillingSession)
    plan_data = get_plan_data_for_support_view(billing_session)
    sponsorship_data = _get_sponsorship_data_for_plan_data(plan_data)

    return CloudSupportData(
        plan_data=plan_data,
        sponsorship_data=sponsorship_data,
    )