zulip/zerver/views/invite.py

256 lines
9.3 KiB
Python
Raw Normal View History

import re
from typing import Annotated
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from pydantic import Json
from confirmation import settings as confirmation_settings
from zerver.actions.invites import (
do_create_multiuse_invite_link,
do_get_invites_controlled_by_user,
do_invite_users,
do_revoke_multi_use_invite,
do_revoke_user_invite,
do_send_user_invite_email,
)
from zerver.decorator import require_member_or_admin
from zerver.lib.exceptions import InvitationError, JsonableError, OrganizationOwnerRequiredError
from zerver.lib.response import json_success
from zerver.lib.streams import access_stream_by_id
from zerver.lib.typed_endpoint import ApiParamConfig, PathOnly, typed_endpoint
from zerver.lib.typed_endpoint_validators import check_int_in_validator
from zerver.models import MultiuseInvite, PreregistrationUser, Stream, UserProfile
# Convert settings.INVITATION_LINK_VALIDITY_DAYS from days into minutes
# (24 hours * 60 minutes); the result is the default value of the
# invite_expires_in_minutes parameter of the invitation endpoints in this module.
# Because mypy fails to correctly infer the type of the validator, we want this constant
# to be annotated as `int | None` to avoid a mypy error when using it as the default value.
# https://github.com/python/mypy/issues/13234
INVITATION_LINK_VALIDITY_MINUTES: int | None = 24 * 60 * settings.INVITATION_LINK_VALIDITY_DAYS
def check_role_based_permissions(
    invited_as: int, user_profile: UserProfile, *, require_admin: bool
) -> None:
    """Check that ``user_profile`` may act on an invitation with role ``invited_as``.

    Returns ``None`` when every check passes.

    Raises:
        OrganizationOwnerRequiredError: ``invited_as`` is the REALM_OWNER
            invitation role and ``user_profile.is_realm_owner`` is false.
        JsonableError: ``require_admin`` is true and
            ``user_profile.is_realm_admin`` is false.
    """
    targets_owner_role = invited_as == PreregistrationUser.INVITE_AS["REALM_OWNER"]
    if targets_owner_role and not user_profile.is_realm_owner:
        raise OrganizationOwnerRequiredError

    # The administrator check only applies when the caller asked for it.
    if not require_admin:
        return
    if not user_profile.is_realm_admin:
        raise JsonableError(_("Must be an organization administrator"))
@require_member_or_admin
@typed_endpoint
def invite_users_backend(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    invitee_emails_raw: Annotated[str, ApiParamConfig("invitee_emails")],
    invite_expires_in_minutes: Json[int | None] = INVITATION_LINK_VALIDITY_MINUTES,
    invite_as: Annotated[
        Json[int],
        check_int_in_validator(list(PreregistrationUser.INVITE_AS.values())),
    ] = PreregistrationUser.INVITE_AS["MEMBER"],
    notify_referrer_on_join: Json[bool] = True,
    stream_ids: Json[list[int]],
    include_realm_default_subscriptions: Json[bool] = False,
) -> HttpResponse:
    """Validate an email-invitation request and pass it to ``do_invite_users``.

    The checks run in this order: permission to invite by email, role-based
    permission for ``invite_as``, a non-empty ``invitee_emails_raw``, access
    to every channel in ``stream_ids``, and permission to subscribe other
    users when at least one channel was requested.

    Raises:
        JsonableError: any of the checks above fails.
        OrganizationOwnerRequiredError: propagated from
            ``check_role_based_permissions``.
        InvitationError: ``do_invite_users`` returned a non-empty list of
            skipped addresses; raised with ``sent_invitations=True``.
    """
    if not user_profile.can_invite_users_by_email():
        # Guest users are not handled here; the decorator above
        # takes care of that case.
        raise JsonableError(_("Insufficient permission"))

    # Roles that need an administrator to invite. Owners can only be
    # invited by owners, which check_role_based_permissions verifies
    # with separate logic.
    elevated_roles = (
        PreregistrationUser.INVITE_AS["REALM_OWNER"],
        PreregistrationUser.INVITE_AS["REALM_ADMIN"],
        PreregistrationUser.INVITE_AS["MODERATOR"],
    )
    check_role_based_permissions(
        invite_as, user_profile, require_admin=invite_as in elevated_roles
    )

    if not invitee_emails_raw:
        raise JsonableError(_("You must specify at least one email address."))
    invitee_emails = get_invitee_emails_set(invitee_emails_raw)

    streams: list[Stream] = []
    for stream_id in stream_ids:
        try:
            # Only the stream is needed; the second tuple element is unused.
            # (Not named `_`, which is gettext in this module.)
            stream, _subscription = access_stream_by_id(user_profile, stream_id)
        except JsonableError:
            raise JsonableError(
                _("Invalid channel ID {channel_id}. No invites were sent.").format(
                    channel_id=stream_id
                )
            )
        else:
            streams.append(stream)

    if streams and not user_profile.can_subscribe_other_users():
        raise JsonableError(_("You do not have permission to subscribe other users to channels."))

    skipped = do_invite_users(
        user_profile,
        invitee_emails,
        streams,
        notify_referrer_on_join,
        invite_expires_in_minutes=invite_expires_in_minutes,
        include_realm_default_subscriptions=include_realm_default_subscriptions,
        invite_as=invite_as,
    )
    if not skipped:
        return json_success(request)

    raise InvitationError(
        _(
            "Some of those addresses are already using Zulip, "
            "so we didn't send them an invitation. We did send "
            "invitations to everyone else!"
        ),
        skipped,
        sent_invitations=True,
    )
def get_invitee_emails_set(invitee_emails_raw: str) -> set[str]:
    """Split a raw invitee string into a set of stripped addresses.

    The input is split on commas and newlines. When an entry contains an
    angle-bracketed part (``Name <addr>``), only the text between the
    brackets is kept. Each result is stripped of surrounding whitespace.
    Entries that end up empty are kept, as the empty string.
    """
    addresses: set[str] = set()
    for entry in re.split(r"[,\n]", invitee_emails_raw):
        bracketed = re.search(r"<(?P<email>.*)>", entry)
        address = bracketed.group("email") if bracketed else entry
        addresses.add(address.strip())
    return addresses
@require_member_or_admin
def get_user_invites(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
    """Return the result of ``do_get_invites_controlled_by_user`` under the ``invites`` key."""
    invites = do_get_invites_controlled_by_user(user_profile)
    return json_success(request, data={"invites": invites})
@require_member_or_admin
@typed_endpoint
def revoke_user_invite(
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
) -> HttpResponse:
    """Revoke the ``PreregistrationUser`` invitation whose id is ``invite_id``.

    Raises:
        JsonableError: the invitation does not exist or belongs to a
            different realm than ``user_profile`` (same message in both
            cases), or the role check fails.
        OrganizationOwnerRequiredError: propagated from
            ``check_role_based_permissions``.
    """
    try:
        invitation = PreregistrationUser.objects.get(id=invite_id)
    except PreregistrationUser.DoesNotExist:
        raise JsonableError(_("No such invitation"))

    if invitation.realm != user_profile.realm:
        raise JsonableError(_("No such invitation"))

    # The role check is skipped when the requester created the invitation.
    created_by_requester = invitation.referred_by_id == user_profile.id
    if not created_by_requester:
        check_role_based_permissions(invitation.invited_as, user_profile, require_admin=True)

    do_revoke_user_invite(invitation)
    return json_success(request)
@require_member_or_admin
@typed_endpoint
def revoke_multiuse_invite(
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
) -> HttpResponse:
    """Revoke the ``MultiuseInvite`` whose id is ``invite_id``.

    Raises:
        JsonableError: the invite does not exist or belongs to a different
            realm than ``user_profile`` (same message in both cases), the
            role check fails, or the invite's status is already
            ``STATUS_REVOKED``.
        OrganizationOwnerRequiredError: propagated from
            ``check_role_based_permissions``.
    """
    try:
        multiuse_invite = MultiuseInvite.objects.get(id=invite_id)
    except MultiuseInvite.DoesNotExist:
        raise JsonableError(_("No such invitation"))

    if multiuse_invite.realm != user_profile.realm:
        raise JsonableError(_("No such invitation"))

    # The role check is skipped when the requester created the invite.
    created_by_requester = multiuse_invite.referred_by_id == user_profile.id
    if not created_by_requester:
        check_role_based_permissions(multiuse_invite.invited_as, user_profile, require_admin=True)

    # Checked after the permission checks, so the revoked state is only
    # reported to a requester who passed them.
    already_revoked = multiuse_invite.status == confirmation_settings.STATUS_REVOKED
    if already_revoked:
        raise JsonableError(_("Invitation has already been revoked"))

    do_revoke_multi_use_invite(multiuse_invite)
    return json_success(request)
@require_member_or_admin
@typed_endpoint
def resend_user_invite_email(
    request: HttpRequest, user_profile: UserProfile, *, invite_id: PathOnly[int]
) -> HttpResponse:
    """Call ``do_send_user_invite_email`` for the invitation ``invite_id``.

    The event time passed along is the current time from ``timezone_now()``.

    Raises:
        JsonableError: the invitation does not exist, has no referrer, or
            its referrer is in a different realm than ``user_profile``
            (same message in all three cases), or the role check fails.
        OrganizationOwnerRequiredError: propagated from
            ``check_role_based_permissions``.
    """
    try:
        invitation = PreregistrationUser.objects.get(id=invite_id)
    except PreregistrationUser.DoesNotExist:
        raise JsonableError(_("No such invitation"))

    # Structurally, any invitation the user can actually access should
    # have a referred_by set for the user who created it.
    referrer = invitation.referred_by
    if referrer is None or referrer.realm != user_profile.realm:
        raise JsonableError(_("No such invitation"))

    # The role check is skipped when the requester created the invitation.
    created_by_requester = invitation.referred_by_id == user_profile.id
    if not created_by_requester:
        check_role_based_permissions(invitation.invited_as, user_profile, require_admin=True)

    do_send_user_invite_email(invitation, event_time=timezone_now())
    return json_success(request)
@require_member_or_admin
@typed_endpoint
def generate_multiuse_invite_backend(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    invite_expires_in_minutes: Json[int | None] = INVITATION_LINK_VALIDITY_MINUTES,
    invite_as: Annotated[
        Json[int],
        check_int_in_validator(list(PreregistrationUser.INVITE_AS.values())),
    ] = PreregistrationUser.INVITE_AS["MEMBER"],
    stream_ids: Json[list[int]] | None = None,
    include_realm_default_subscriptions: Json[bool] = False,
) -> HttpResponse:
    """Validate a multiuse-invite request and return the new link.

    The checks run in this order: permission to create multiuse invites,
    role-based permission for ``invite_as``, access to every channel in
    ``stream_ids`` (treated as empty when omitted), and permission to
    subscribe other users when at least one channel was requested. The
    value returned by ``do_create_multiuse_invite_link`` is sent back under
    the ``invite_link`` key.

    Raises:
        JsonableError: any of the checks above fails.
        OrganizationOwnerRequiredError: propagated from
            ``check_role_based_permissions``.
    """
    requested_stream_ids: list[int] = [] if stream_ids is None else stream_ids

    if not user_profile.can_create_multiuse_invite_to_realm():
        # Guest users are not handled here; the decorator above
        # takes care of that case.
        raise JsonableError(_("Insufficient permission"))

    # Roles that need an administrator to invite. Owners can only be
    # invited by owners, which check_role_based_permissions verifies
    # with separate logic.
    elevated_roles = (
        PreregistrationUser.INVITE_AS["REALM_OWNER"],
        PreregistrationUser.INVITE_AS["REALM_ADMIN"],
        PreregistrationUser.INVITE_AS["MODERATOR"],
    )
    check_role_based_permissions(
        invite_as, user_profile, require_admin=invite_as in elevated_roles
    )

    streams: list[Stream] = []
    for stream_id in requested_stream_ids:
        try:
            # Only the stream is needed; the second tuple element is unused.
            # (Not named `_`, which is gettext in this module.)
            stream, _subscription = access_stream_by_id(user_profile, stream_id)
        except JsonableError:
            raise JsonableError(
                _("Invalid channel ID {channel_id}. No invites were sent.").format(
                    channel_id=stream_id
                )
            )
        else:
            streams.append(stream)

    if streams and not user_profile.can_subscribe_other_users():
        raise JsonableError(_("You do not have permission to subscribe other users to channels."))

    invite_link = do_create_multiuse_invite_link(
        user_profile,
        invite_as,
        invite_expires_in_minutes,
        include_realm_default_subscriptions,
        streams,
    )
    return json_success(request, data={"invite_link": invite_link})