zulip/corporate/views/billing_page.py

290 lines
11 KiB
Python
Raw Normal View History

import logging
from typing import Any, Dict, Optional
import stripe
from django.conf import settings
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import reverse
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from corporate.lib.stripe import (
cents_to_dollar_string,
do_change_plan_status,
downgrade_at_the_end_of_billing_cycle,
downgrade_now_without_creating_additional_invoices,
get_latest_seat_count,
make_end_of_cycle_updates_if_needed,
renewal_amount,
start_of_next_billing_cycle,
stripe_get_customer,
update_license_ledger_for_manual_plan,
validate_licenses,
)
from corporate.models import (
CustomerPlan,
get_current_plan_by_customer,
get_current_plan_by_realm,
get_customer_by_realm,
)
from zerver.decorator import require_billing_access, zulip_login_required
from zerver.lib.exceptions import JsonableError
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.validator import check_bool, check_int, check_int_in
from zerver.models import Realm, UserProfile
billing_logger = logging.getLogger("corporate.stripe")
# Should only be called if the customer is being charged automatically
def payment_method_string(stripe_customer: stripe.Customer) -> str:
default_payment_method = stripe_customer.invoice_settings.default_payment_method
if default_payment_method is None:
return _("No payment method on file")
if default_payment_method.type == "card":
return _("{brand} ending in {last4}").format(
brand=default_payment_method.card.brand,
last4=default_payment_method.card.last4,
)
# There might be one-off stuff we do for a particular customer that
# would land them here. E.g. by default we don't support ACH for
# automatic payments, but in theory we could add it for a customer via
# the Stripe dashboard.
return _("Unknown payment method. Please contact {email}.").format(
email=settings.ZULIP_ADMINISTRATOR,
) # nocoverage
def add_sponsorship_info_to_context(context: Dict[str, Any], user_profile: UserProfile) -> None:
def key_helper(d: Any) -> int:
return d[1]["display_order"]
context.update(
realm_org_type=user_profile.realm.org_type,
sorted_org_types=sorted(
(
[org_type_name, org_type]
for (org_type_name, org_type) in Realm.ORG_TYPES.items()
if not org_type.get("hidden")
),
key=key_helper,
),
)
@zulip_login_required
@has_request_variables
def sponsorship_request(request: HttpRequest) -> HttpResponse:
user = request.user
assert user.is_authenticated
context: Dict[str, Any] = {}
customer = get_customer_by_realm(user.realm)
if customer is not None and customer.sponsorship_pending:
context["is_sponsorship_pending"] = True
if user.realm.plan_type == user.realm.PLAN_TYPE_STANDARD_FREE:
context["is_sponsored"] = True
add_sponsorship_info_to_context(context, user)
return render(request, "corporate/sponsorship.html", context=context)
@zulip_login_required
@has_request_variables
def billing_home(
request: HttpRequest, onboarding: bool = REQ(default=False, json_validator=check_bool)
) -> HttpResponse:
user = request.user
assert user.is_authenticated
context: Dict[str, Any] = {
"admin_access": user.has_billing_access,
"has_active_plan": False,
}
if not user.has_billing_access:
return render(request, "corporate/billing.html", context=context)
if user.realm.plan_type == user.realm.PLAN_TYPE_STANDARD_FREE:
context["is_sponsored"] = True
return HttpResponseRedirect(reverse("sponsorship_request"))
customer = get_customer_by_realm(user.realm)
if (
customer is not None and customer.sponsorship_pending
) or user.realm.plan_type == user.realm.PLAN_TYPE_STANDARD_FREE:
return HttpResponseRedirect(reverse("sponsorship_request"))
if customer is None:
from corporate.views.upgrade import initial_upgrade
return HttpResponseRedirect(reverse(initial_upgrade))
if not CustomerPlan.objects.filter(customer=customer).exists():
from corporate.views.upgrade import initial_upgrade
return HttpResponseRedirect(reverse(initial_upgrade))
plan = get_current_plan_by_customer(customer)
if plan is not None:
now = timezone_now()
new_plan, last_ledger_entry = make_end_of_cycle_updates_if_needed(plan, now)
if last_ledger_entry is not None:
if new_plan is not None: # nocoverage
plan = new_plan
assert plan is not None # for mypy
downgrade_at_end_of_cycle = plan.status == CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE
switch_to_annual_at_end_of_cycle = (
plan.status == CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE
)
licenses = last_ledger_entry.licenses
licenses_at_next_renewal = last_ledger_entry.licenses_at_next_renewal
seat_count = get_latest_seat_count(user.realm)
# Should do this in JavaScript, using the user's time zone
if plan.is_free_trial():
assert plan.next_invoice_date is not None
renewal_date = "{dt:%B} {dt.day}, {dt.year}".format(dt=plan.next_invoice_date)
else:
renewal_date = "{dt:%B} {dt.day}, {dt.year}".format(
dt=start_of_next_billing_cycle(plan, now)
)
renewal_cents = renewal_amount(plan, now)
charge_automatically = plan.charge_automatically
assert customer.stripe_customer_id is not None # for mypy
stripe_customer = stripe_get_customer(customer.stripe_customer_id)
if charge_automatically:
payment_method = payment_method_string(stripe_customer)
else:
payment_method = "Billed by invoice"
context.update(
plan_name=plan.name,
has_active_plan=True,
free_trial=plan.is_free_trial(),
downgrade_at_end_of_cycle=downgrade_at_end_of_cycle,
automanage_licenses=plan.automanage_licenses,
switch_to_annual_at_end_of_cycle=switch_to_annual_at_end_of_cycle,
licenses=licenses,
licenses_at_next_renewal=licenses_at_next_renewal,
seat_count=seat_count,
renewal_date=renewal_date,
renewal_amount=cents_to_dollar_string(renewal_cents),
payment_method=payment_method,
charge_automatically=charge_automatically,
stripe_email=stripe_customer.email,
CustomerPlan=CustomerPlan,
onboarding=onboarding,
)
return render(request, "corporate/billing.html", context=context)
@require_billing_access
@has_request_variables
def update_plan(
request: HttpRequest,
user: UserProfile,
status: Optional[int] = REQ(
"status",
json_validator=check_int_in(
[
CustomerPlan.ACTIVE,
CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE,
CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE,
CustomerPlan.ENDED,
]
),
default=None,
),
licenses: Optional[int] = REQ("licenses", json_validator=check_int, default=None),
licenses_at_next_renewal: Optional[int] = REQ(
"licenses_at_next_renewal", json_validator=check_int, default=None
),
) -> HttpResponse:
plan = get_current_plan_by_realm(user.realm)
assert plan is not None # for mypy
new_plan, last_ledger_entry = make_end_of_cycle_updates_if_needed(plan, timezone_now())
if new_plan is not None:
raise JsonableError(
_("Unable to update the plan. The plan has been expired and replaced with a new plan.")
)
if last_ledger_entry is None:
raise JsonableError(_("Unable to update the plan. The plan has ended."))
if status is not None:
if status == CustomerPlan.ACTIVE:
assert plan.status == CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE
do_change_plan_status(plan, status)
elif status == CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE:
assert plan.status == CustomerPlan.ACTIVE
downgrade_at_the_end_of_billing_cycle(user.realm)
elif status == CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE:
assert plan.billing_schedule == CustomerPlan.MONTHLY
assert plan.status == CustomerPlan.ACTIVE
assert plan.fixed_price is None
do_change_plan_status(plan, status)
elif status == CustomerPlan.ENDED:
assert plan.is_free_trial()
downgrade_now_without_creating_additional_invoices(user.realm)
return json_success(request)
if licenses is not None:
if plan.automanage_licenses:
raise JsonableError(
_(
"Unable to update licenses manually. Your plan is on automatic license management."
)
)
if last_ledger_entry.licenses == licenses:
raise JsonableError(
_(
"Your plan is already on {licenses} licenses in the current billing period."
).format(licenses=licenses)
)
if last_ledger_entry.licenses > licenses:
raise JsonableError(
_("You cannot decrease the licenses in the current billing period.")
)
validate_licenses(
plan.charge_automatically,
licenses,
get_latest_seat_count(user.realm),
plan.customer.exempt_from_license_number_check,
)
update_license_ledger_for_manual_plan(plan, timezone_now(), licenses=licenses)
return json_success(request)
if licenses_at_next_renewal is not None:
if plan.automanage_licenses:
raise JsonableError(
_(
"Unable to update licenses manually. Your plan is on automatic license management."
)
)
if last_ledger_entry.licenses_at_next_renewal == licenses_at_next_renewal:
raise JsonableError(
_(
"Your plan is already scheduled to renew with {licenses_at_next_renewal} licenses."
).format(licenses_at_next_renewal=licenses_at_next_renewal)
)
validate_licenses(
plan.charge_automatically,
licenses_at_next_renewal,
get_latest_seat_count(user.realm),
plan.customer.exempt_from_license_number_check,
)
update_license_ledger_for_manual_plan(
plan, timezone_now(), licenses_at_next_renewal=licenses_at_next_renewal
)
return json_success(request)
raise JsonableError(_("Nothing to change."))