from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import TYPE_CHECKING, Any, Optional
from unittest import mock
import orjson
import time_machine
from django.utils.timezone import now as timezone_now
from typing_extensions import override
from corporate.lib.stripe import RealmBillingSession, RemoteRealmBillingSession, add_months
from corporate.models import (
Customer,
CustomerPlan,
LicenseLedger,
SponsoredPlanTypes,
ZulipSponsorshipRequest,
get_current_plan_by_customer,
get_customer_by_realm,
)
from zerver.actions.invites import do_create_multiuse_invite_link
from zerver.actions.realm_settings import do_change_realm_org_type, do_send_realm_reactivation_email
from zerver.actions.user_settings import do_change_user_setting
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import reset_email_visibility_to_everyone_in_zulip_realm
from zerver.models import MultiuseInvite, PreregistrationUser, Realm, UserMessage, UserProfile
from zerver.models.realms import OrgTypeEnum, get_org_type_display_name, get_realm
from zilencer.lib.remote_counts import MissingDataError
if TYPE_CHECKING:
from django.test.client import _MonkeyPatchedWSGIResponse as TestHttpResponse
import uuid
from zilencer.models import RemoteRealm, RemoteZulipServer, RemoteZulipServerAuditLog
class TestRemoteServerSupportEndpoint(ZulipTestCase):
@override
def setUp(self) -> None:
    """Build the remote-server fixtures shared by the tests in this class.

    Creates five RemoteZulipServer rows (zulip-0 .. zulip-4), each with a
    REMOTE_SERVER_CREATED audit-log entry. Servers 2-4 each get one
    RemoteRealm; zulip-0 is then marked deactivated. realm-name-2 and
    realm-name-3 receive sponsorship requests, and realm-name-4 gets a
    legacy plan plus a never-started Basic plan.
    """

    def add_sponsorship_request(
        name: str, org_type: int, website: str, paid_users: str, plan: str
    ) -> None:
        # Create a sponsorship-pending customer for the named remote realm
        # and attach a sponsorship request to it.
        pending_customer = Customer.objects.create(
            remote_realm=RemoteRealm.objects.get(name=name),
            sponsorship_pending=True,
        )
        ZulipSponsorshipRequest.objects.create(
            customer=pending_customer,
            org_type=org_type,
            org_website=website,
            org_description="We help people.",
            expected_total_users="20-35",
            paid_users_count=paid_users,
            paid_users_description="",
            requested_plan=plan,
        )

    def add_legacy_plan_and_upgrade(name: str) -> None:
        # Put the named remote realm on a legacy plan starting 2050-01-01 and
        # create the plan that follows it, anchored at 2050-02-01.
        legacy_start = datetime(2050, 1, 1, tzinfo=timezone.utc)
        upgrade_start = datetime(2050, 2, 1, tzinfo=timezone.utc)

        session = RemoteRealmBillingSession(RemoteRealm.objects.get(name=name))
        session.migrate_customer_to_legacy_plan(legacy_start, upgrade_start)
        customer = session.get_customer()
        assert customer is not None
        legacy_plan = session.get_remote_server_legacy_plan(customer)
        assert legacy_plan is not None
        assert legacy_plan.end_date is not None

        # Set the renewal license count on the highest-id ledger entry of
        # the legacy plan.
        newest_entry = LicenseLedger.objects.filter(plan=legacy_plan).order_by("-id").first()
        assert newest_entry is not None
        newest_entry.licenses_at_next_renewal = 10
        newest_entry.save(update_fields=["licenses_at_next_renewal"])

        legacy_plan.status = CustomerPlan.SWITCH_PLAN_TIER_AT_PLAN_END
        legacy_plan.save(update_fields=["status"])

        CustomerPlan.objects.create(
            customer=customer,
            next_invoice_date=upgrade_start,
            automanage_licenses=True,
            charge_automatically=False,
            price_per_license=100,
            discount=customer.default_discount,
            billing_cycle_anchor=upgrade_start,
            billing_schedule=CustomerPlan.BILLING_SCHEDULE_MONTHLY,
            tier=CustomerPlan.TIER_SELF_HOSTED_BASIC,
            status=CustomerPlan.NEVER_STARTED,
        )

    super().setUp()

    # Set up some initial example data.
    for server_index in range(5):
        hostname = f"zulip-{server_index}.example.com"
        remote_server = RemoteZulipServer.objects.create(
            hostname=hostname, contact_email=f"admin@{hostname}", uuid=uuid.uuid4()
        )
        RemoteZulipServerAuditLog.objects.create(
            event_type=RemoteZulipServerAuditLog.REMOTE_SERVER_CREATED,
            server=remote_server,
            event_time=remote_server.last_updated,
        )

        # We want at least one RemoteZulipServer that has no RemoteRealm
        # as an example of a pre-8.0 release registered remote server.
        if server_index <= 1:
            continue
        RemoteRealm.objects.create(
            server=remote_server,
            uuid=uuid.uuid4(),
            host=f"realm-host-{server_index}",
            name=f"realm-name-{server_index}",
            realm_date_created=datetime(2023, 12, 1, tzinfo=timezone.utc),
        )

    # Add a deactivated server, which should be excluded from search results.
    deactivated_server = RemoteZulipServer.objects.get(hostname="zulip-0.example.com")
    deactivated_server.deactivated = True
    deactivated_server.save(update_fields=["deactivated"])

    # Add example sponsorship request data
    sponsorship_fixtures = (
        (
            "realm-name-2",
            OrgTypeEnum.Community.value,
            "",
            "None",
            SponsoredPlanTypes.BUSINESS.value,
        ),
        (
            "realm-name-3",
            OrgTypeEnum.OpenSource.value,
            "example.org",
            "",
            SponsoredPlanTypes.COMMUNITY.value,
        ),
    )
    for realm_name, org_type, website, paid_users, plan in sponsorship_fixtures:
        add_sponsorship_request(
            name=realm_name,
            org_type=org_type,
            website=website,
            paid_users=paid_users,
            plan=plan,
        )

    # Add expected legacy customer and plan data
    add_legacy_plan_and_upgrade(name="realm-name-4")
def test_search(self) -> None:
# Exercises the /activity/remote/support page: first the access check, then
# one search per fixture server created in setUp, asserting on the text of
# each response.
# NOTE(review): several expected-string literals in this method are split
# across physical lines as shown here; confirm they match the markup the
# support template actually renders.

# Helper: assert the server-level summary fields for `hostname` are present
# in `html_response`.
def assert_server_details_in_response(
html_response: "TestHttpResponse", hostname: str
) -> None:
self.assert_in_success_response(
[
f"
{hostname}
",
f"Contact email: admin@{hostname}",
"Date created:",
"UUID:",
"Zulip version:",
"Plan type: Self-managed
",
"Non-guest user count: 0
",
"Guest user count: 0
",
],
html_response,
)
# NOTE(review): this negative check reads `result` from the enclosing test
# scope rather than the `html_response` parameter (assuming the call belongs
# to this helper's body, since it precedes the first assignment of `result`).
# Every caller in this method passes `result`, so the outcome is the same
# today; consider using `html_response` here.
self.assert_not_in_success_response(["zulip-0.example.com
"], result)
# Helper: assert the remote-realm fields for the realm `name` / `host` are
# present in `html_response`.
def assert_realm_details_in_response(
html_response: "TestHttpResponse", name: str, host: str
) -> None:
self.assert_in_success_response(
[
f"{name}
",
f"Remote realm host: {host}
",
"Date created: 01 December 2023",
"Org type: Unspecified
",
"Has remote realms: True
",
],
html_response,
)
# NOTE(review): same pattern as above; `result` is the enclosing-scope
# variable, not the `html_response` parameter. Confirm and consider switching.
self.assert_not_in_success_response(["zulip-1.example.com
"], result)
# Helper: checks for zulip-1.example.com, which setUp creates without any
# RemoteRealm (realms are only created for i > 1).
def check_remote_server_with_no_realms(result: "TestHttpResponse") -> None:
assert_server_details_in_response(result, "zulip-1.example.com")
self.assert_not_in_success_response(
["zulip-2.example.com
", "Remote realm host:"], result
)
self.assert_in_success_response(["Has remote realms: False
"], result)
# Helper: expected rendering of the sponsorship request that setUp attaches
# to realm-name-2 (website="", paid_users="None", Business plan).
def check_sponsorship_request_no_website(result: "TestHttpResponse") -> None:
self.assert_in_success_response(
[
"Organization type: Community",
"Organization website: No website submitted",
"Paid users: None",
"Requested plan: Business",
"Organization description: We help people.",
"Estimated total users: 20-35",
"Description of paid users: ",
],
result,
)
# Helper: expected rendering of the sponsorship request that setUp attaches
# to realm-name-3 (website="example.org", paid_users="", Community plan).
def check_sponsorship_request_with_website(result: "TestHttpResponse") -> None:
self.assert_in_success_response(
[
"Organization type: Open-source project",
"Organization website: example.org",
"Paid users: ",
"Requested plan: Community",
"Organization description: We help people.",
"Estimated total users: 20-35",
"Description of paid users: ",
],
result,
)
# Helper: assert that none of the sponsorship-request detail strings appear.
def check_no_sponsorship_request(result: "TestHttpResponse") -> None:
self.assert_not_in_success_response(
[
"Organization description: We help people.",
"Estimated total users: 20-35",
"Description of paid users: ",
],
result,
)
# Helper: expected rendering of the two plans setUp creates for realm-name-4
# (legacy plan anchored 2050-01-01, next plan anchored 2050-02-01 with
# price_per_license=100, monthly billing and 10 licenses at next renewal).
def check_legacy_and_next_plan(result: "TestHttpResponse") -> None:
self.assert_in_success_response(
[
"📅 Current plan information:
",
"Plan name: Self-managed (legacy plan)
",
"Status: New plan scheduled
",
"End date: 01 February 2050
",
"⏱️ Next plan information:
",
"Plan name: Zulip Basic
",
"Status: Never started
",
"Start date: 01 February 2050
",
"Billing schedule: Monthly
",
"Price per license: $1.00
",
"Estimated billed licenses: 10
",
"Estimated annual revenue: $120.00
",
],
result,
)
# Logged in as cordelia, the page is expected to redirect (302) to /login/.
self.login("cordelia")
result = self.client_get("/activity/remote/support")
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], "/login/")
# Iago is the user with the appropriate permissions to access this page.
self.login("iago")
assert self.example_user("iago").is_staff
result = self.client_get("/activity/remote/support")
self.assert_in_success_response(
[
'input type="text" name="q" class="input-xxlarge search-query" placeholder="hostname or contact email"'
],
result,
)
# Server 0 was deactivated in setUp: a broad "example.com" query must list
# servers 1-4 but not zulip-0.
server = 0
result = self.client_get("/activity/remote/support", {"q": "example.com"})
self.assert_not_in_success_response([f"zulip-{server}.example.com
"], result)
for i in range(5):
if i != server:
self.assert_in_success_response([f"zulip-{i}.example.com
"], result)
# Server 1: registered server with no remote realms.
server = 1
result = self.client_get("/activity/remote/support", {"q": f"zulip-{server}.example.com"})
check_remote_server_with_no_realms(result)
# Server 2: realm with a sponsorship request lacking a website. Queried
# twice, with compute_max_monthly_messages patched first to return 1000 and
# then to raise MissingDataError.
server = 2
with mock.patch("analytics.views.support.compute_max_monthly_messages", return_value=1000):
result = self.client_get(
"/activity/remote/support", {"q": f"zulip-{server}.example.com"}
)
self.assert_in_success_response(["Max monthly messages: 1000"], result)
assert_server_details_in_response(result, f"zulip-{server}.example.com")
assert_realm_details_in_response(result, f"realm-name-{server}", f"realm-host-{server}")
check_sponsorship_request_no_website(result)
with mock.patch(
"analytics.views.support.compute_max_monthly_messages", side_effect=MissingDataError
):
result = self.client_get(
"/activity/remote/support", {"q": f"zulip-{server}.example.com"}
)
self.assert_in_success_response(
["Max monthly messages: Recent data missing"], result
)
assert_server_details_in_response(result, f"zulip-{server}.example.com")
assert_realm_details_in_response(result, f"realm-name-{server}", f"realm-host-{server}")
check_sponsorship_request_no_website(result)
# Server 3: realm with a sponsorship request that includes a website.
server = 3
result = self.client_get("/activity/remote/support", {"q": f"zulip-{server}.example.com"})
assert_server_details_in_response(result, f"zulip-{server}.example.com")
assert_realm_details_in_response(result, f"realm-name-{server}", f"realm-host-{server}")
check_sponsorship_request_with_website(result)
# Server 4: realm with no sponsorship request but with the legacy plan and
# the scheduled next plan.
server = 4
result = self.client_get("/activity/remote/support", {"q": f"zulip-{server}.example.com"})
assert_server_details_in_response(result, f"zulip-{server}.example.com")
assert_realm_details_in_response(result, f"realm-name-{server}", f"realm-host-{server}")
check_no_sponsorship_request(result)
check_legacy_and_next_plan(result)
class TestSupportEndpoint(ZulipTestCase):
def create_customer_and_plan(self, realm: Realm, monthly: bool = False) -> Customer:
    """Create a Customer for `realm` with a Cloud Standard plan and one ledger entry.

    The plan's billing cycle is anchored at 2016-01-02 UTC. With
    `monthly=True` it uses the monthly billing schedule, a price_per_license
    of 800, and a next invoice date one month after the anchor; otherwise
    it uses the annual schedule, 8000, and twelve months. A renewal
    LicenseLedger row with 10 licenses (now and at next renewal) is
    attached to the plan, stamped with the current time.

    Returns the newly created Customer.
    """
    anchor = datetime(2016, 1, 2, tzinfo=timezone.utc)
    if monthly:
        schedule, unit_price, cycle_months = CustomerPlan.BILLING_SCHEDULE_MONTHLY, 800, 1
    else:
        schedule, unit_price, cycle_months = CustomerPlan.BILLING_SCHEDULE_ANNUAL, 8000, 12

    new_customer = Customer.objects.create(realm=realm)
    new_plan = CustomerPlan.objects.create(
        customer=new_customer,
        billing_cycle_anchor=anchor,
        billing_schedule=schedule,
        tier=CustomerPlan.TIER_CLOUD_STANDARD,
        price_per_license=unit_price,
        next_invoice_date=add_months(anchor, cycle_months),
    )
    LicenseLedger.objects.create(
        licenses=10,
        licenses_at_next_renewal=10,
        event_time=timezone_now(),
        is_renewal=True,
        plan=new_plan,
    )
    return new_customer
def test_search(self) -> None:
reset_email_visibility_to_everyone_in_zulip_realm()
lear_user = self.lear_user("king")
lear_user.is_staff = True
lear_user.save(update_fields=["is_staff"])
lear_realm = get_realm("lear")
def assert_user_details_in_html_response(
html_response: "TestHttpResponse", full_name: str, email: str, role: str
) -> None:
self.assert_in_success_response(
[
'user\n',
f"{full_name}
",
f"Email: {email}",
"Is active: True
",
f"Role: {role}
",
],
html_response,
)
def create_invitation(
stream: str, invitee_email: str, realm: Optional[Realm] = None
) -> None:
invite_expires_in_minutes = 10 * 24 * 60
self.client_post(
"/json/invites",
{
"invitee_emails": [invitee_email],
"stream_ids": orjson.dumps([self.get_stream_id(stream, realm)]).decode(),
"invite_expires_in_minutes": invite_expires_in_minutes,
"invite_as": PreregistrationUser.INVITE_AS["MEMBER"],
},
subdomain=realm.string_id if realm is not None else "zulip",
)
def check_hamlet_user_query_result(result: "TestHttpResponse") -> None:
assert_user_details_in_html_response(
result, "King Hamlet", self.example_email("hamlet"), "Member"
)
self.assert_in_success_response(
[
f"Admins: {self.example_email('iago')}\n",
f"Owners: {self.example_email('desdemona')}\n",
'class="copy-button" data-copytext="{}">'.format(self.example_email("iago")),
'class="copy-button" data-copytext="{}">'.format(
self.example_email("desdemona")
),
],
result,
)
def check_lear_user_query_result(result: "TestHttpResponse") -> None:
assert_user_details_in_html_response(
result, lear_user.full_name, lear_user.email, "Member"
)
def check_othello_user_query_result(result: "TestHttpResponse") -> None:
assert_user_details_in_html_response(
result, "Othello, the Moor of Venice", self.example_email("othello"), "Member"
)
def check_polonius_user_query_result(result: "TestHttpResponse") -> None:
assert_user_details_in_html_response(
result, "Polonius", self.example_email("polonius"), "Guest"
)
def check_zulip_realm_query_result(result: "TestHttpResponse") -> None:
zulip_realm = get_realm("zulip")
first_human_user = zulip_realm.get_first_human_user()
assert first_human_user is not None
self.assert_in_success_response(
[
f"First human user: {first_human_user.delivery_email}\n",
f'",
'',
'',
'input type="number" name="discount" value="None"',
'',
'',
f'',
'',
'input type="number" name="discount" value="None"',
'',
'',
'scrub-realm-button">',
'data-string-id="lear"',
"Plan name: Zulip Cloud Standard",
"Status: Active",
"Billing schedule: Annual",
"Licenses: 2/10 (Manual)",
"Price per license: $80.00",
"Annual recurring revenue: $800.00",
"Next invoice date: 02 January 2017",
'