zulip/zerver/lib/user_groups.py

287 lines
11 KiB
Python

from typing import Dict, Iterable, List, Sequence, TypedDict
from django.db import transaction
from django.db.models import F, QuerySet
from django.utils.translation import gettext as _
from django_cte import With
from django_stubs_ext import ValuesQuerySet
from zerver.lib.exceptions import JsonableError
from zerver.models import GroupGroupMembership, Realm, UserGroup, UserGroupMembership, UserProfile
class UserGroupDict(TypedDict):
id: int
name: str
description: str
members: List[int]
direct_subgroup_ids: List[int]
is_system_group: bool
def access_user_group_by_id(
user_group_id: int, user_profile: UserProfile, *, for_read: bool = False
) -> UserGroup:
try:
user_group = UserGroup.objects.get(id=user_group_id, realm=user_profile.realm)
if for_read and not user_profile.is_guest:
# Everyone is allowed to read a user group and check who
# are its members. Guests should be unable to reach this
# code path, since they can't access user groups API
# endpoints, but we check for guests here for defense in
# depth.
return user_group
if user_group.is_system_group:
raise JsonableError(_("Insufficient permission"))
group_member_ids = get_user_group_direct_member_ids(user_group)
if (
not user_profile.is_realm_admin
and not user_profile.is_moderator
and user_profile.id not in group_member_ids
):
raise JsonableError(_("Insufficient permission"))
except UserGroup.DoesNotExist:
raise JsonableError(_("Invalid user group"))
return user_group
def access_user_groups_as_potential_subgroups(
user_group_ids: Sequence[int], acting_user: UserProfile
) -> List[UserGroup]:
user_groups = UserGroup.objects.filter(id__in=user_group_ids, realm=acting_user.realm)
valid_group_ids = [group.id for group in user_groups]
invalid_group_ids = [group_id for group_id in user_group_ids if group_id not in valid_group_ids]
if invalid_group_ids:
raise JsonableError(_("Invalid user group ID: {}").format(invalid_group_ids[0]))
return list(user_groups)
def access_user_group_for_setting(
user_group_id: int,
user_profile: UserProfile,
*,
setting_name: str,
require_system_group: bool = False,
allow_internet_group: bool = False,
allow_owners_group: bool = False,
allow_nobody_group: bool = True,
) -> UserGroup:
user_group = access_user_group_by_id(user_group_id, user_profile, for_read=True)
if require_system_group and not user_group.is_system_group:
raise JsonableError(_("'{}' must be a system user group.").format(setting_name))
if not allow_internet_group and user_group.name == UserGroup.EVERYONE_ON_INTERNET_GROUP_NAME:
raise JsonableError(
_("'{}' setting cannot be set to '@role:internet' group.").format(setting_name)
)
if not allow_owners_group and user_group.name == UserGroup.OWNERS_GROUP_NAME:
raise JsonableError(
_("'{}' setting cannot be set to '@role:owners' group.").format(setting_name)
)
if not allow_nobody_group and user_group.name == UserGroup.NOBODY_GROUP_NAME:
raise JsonableError(
_("'{}' setting cannot be set to '@role:nobody' group.").format(setting_name)
)
return user_group
def user_groups_in_realm_serialized(realm: Realm) -> List[UserGroupDict]:
"""This function is used in do_events_register code path so this code
should be performant. We need to do 2 database queries because
Django's ORM doesn't properly support the left join between
UserGroup and UserGroupMembership that we need.
"""
realm_groups = UserGroup.objects.filter(realm=realm)
group_dicts: Dict[int, UserGroupDict] = {}
for user_group in realm_groups:
group_dicts[user_group.id] = dict(
id=user_group.id,
name=user_group.name,
description=user_group.description,
members=[],
direct_subgroup_ids=[],
is_system_group=user_group.is_system_group,
)
membership = UserGroupMembership.objects.filter(user_group__realm=realm).values_list(
"user_group_id", "user_profile_id"
)
for user_group_id, user_profile_id in membership:
group_dicts[user_group_id]["members"].append(user_profile_id)
group_membership = GroupGroupMembership.objects.filter(subgroup__realm=realm).values_list(
"subgroup_id", "supergroup_id"
)
for subgroup_id, supergroup_id in group_membership:
group_dicts[supergroup_id]["direct_subgroup_ids"].append(subgroup_id)
for group_dict in group_dicts.values():
group_dict["members"] = sorted(group_dict["members"])
group_dict["direct_subgroup_ids"] = sorted(group_dict["direct_subgroup_ids"])
return sorted(group_dicts.values(), key=lambda group_dict: group_dict["id"])
def get_direct_user_groups(user_profile: UserProfile) -> List[UserGroup]:
return list(user_profile.direct_groups.all())
def get_user_group_direct_member_ids(
user_group: UserGroup,
) -> ValuesQuerySet[UserGroupMembership, int]:
return UserGroupMembership.objects.filter(user_group=user_group).values_list(
"user_profile_id", flat=True
)
def get_user_group_direct_members(user_group: UserGroup) -> QuerySet[UserProfile]:
return user_group.direct_members.all()
def get_direct_memberships_of_users(user_group: UserGroup, members: List[UserProfile]) -> List[int]:
return list(
UserGroupMembership.objects.filter(
user_group=user_group, user_profile__in=members
).values_list("user_profile_id", flat=True)
)
# These recursive lookups use standard PostgreSQL common table
# expression (CTE) queries. These queries use the django-cte library,
# because upstream Django does not yet support CTE.
#
# https://www.postgresql.org/docs/current/queries-with.html
# https://pypi.org/project/django-cte/
# https://code.djangoproject.com/ticket/28919
def get_recursive_subgroups(user_group: UserGroup) -> QuerySet[UserGroup]:
cte = With.recursive(
lambda cte: UserGroup.objects.filter(id=user_group.id)
.values(group_id=F("id"))
.union(cte.join(UserGroup, direct_supergroups=cte.col.group_id).values(group_id=F("id")))
)
return cte.join(UserGroup, id=cte.col.group_id).with_cte(cte)
def get_recursive_group_members(user_group: UserGroup) -> QuerySet[UserProfile]:
return UserProfile.objects.filter(direct_groups__in=get_recursive_subgroups(user_group))
def get_recursive_membership_groups(user_profile: UserProfile) -> QuerySet[UserGroup]:
cte = With.recursive(
lambda cte: user_profile.direct_groups.values(group_id=F("id")).union(
cte.join(UserGroup, direct_subgroups=cte.col.group_id).values(group_id=F("id"))
)
)
return cte.join(UserGroup, id=cte.col.group_id).with_cte(cte)
def is_user_in_group(
user_group: UserGroup, user: UserProfile, *, direct_member_only: bool = False
) -> bool:
if direct_member_only:
return get_user_group_direct_members(user_group=user_group).filter(id=user.id).exists()
return get_recursive_group_members(user_group=user_group).filter(id=user.id).exists()
def get_user_group_member_ids(
user_group: UserGroup, *, direct_member_only: bool = False
) -> List[int]:
if direct_member_only:
member_ids: Iterable[int] = get_user_group_direct_member_ids(user_group)
else:
member_ids = get_recursive_group_members(user_group).values_list("id", flat=True)
return list(member_ids)
def get_subgroup_ids(user_group: UserGroup, *, direct_subgroup_only: bool = False) -> List[int]:
if direct_subgroup_only:
subgroup_ids = user_group.direct_subgroups.all().values_list("id", flat=True)
else:
subgroup_ids = (
get_recursive_subgroups(user_group)
.exclude(id=user_group.id)
.values_list("id", flat=True)
)
return list(subgroup_ids)
@transaction.atomic(savepoint=False)
def create_system_user_groups_for_realm(realm: Realm) -> Dict[int, UserGroup]:
"""Any changes to this function likely require a migration to adjust
existing realms. See e.g. migration 0382_create_role_based_system_groups.py,
which is a copy of this function from when we introduced system groups.
"""
role_system_groups_dict: Dict[int, UserGroup] = {}
for role in UserGroup.SYSTEM_USER_GROUP_ROLE_MAP:
user_group_params = UserGroup.SYSTEM_USER_GROUP_ROLE_MAP[role]
user_group = UserGroup(
name=user_group_params["name"],
description=user_group_params["description"],
realm=realm,
is_system_group=True,
)
role_system_groups_dict[role] = user_group
full_members_system_group = UserGroup(
name=UserGroup.FULL_MEMBERS_GROUP_NAME,
description="Members of this organization, not including new accounts and guests",
realm=realm,
is_system_group=True,
)
everyone_on_internet_system_group = UserGroup(
name=UserGroup.EVERYONE_ON_INTERNET_GROUP_NAME,
description="Everyone on the Internet",
realm=realm,
is_system_group=True,
)
nobody_system_group = UserGroup(
name=UserGroup.NOBODY_GROUP_NAME,
description="Nobody",
realm=realm,
is_system_group=True,
)
# Order of this list here is important to create correct GroupGroupMembership objects
system_user_groups_list = [
nobody_system_group,
role_system_groups_dict[UserProfile.ROLE_REALM_OWNER],
role_system_groups_dict[UserProfile.ROLE_REALM_ADMINISTRATOR],
role_system_groups_dict[UserProfile.ROLE_MODERATOR],
full_members_system_group,
role_system_groups_dict[UserProfile.ROLE_MEMBER],
role_system_groups_dict[UserProfile.ROLE_GUEST],
everyone_on_internet_system_group,
]
UserGroup.objects.bulk_create(system_user_groups_list)
subgroup_objects = []
# "Nobody" system group is not a subgroup of any user group, since it is already empty.
subgroup, remaining_groups = system_user_groups_list[1], system_user_groups_list[2:]
for supergroup in remaining_groups:
subgroup_objects.append(GroupGroupMembership(subgroup=subgroup, supergroup=supergroup))
subgroup = supergroup
GroupGroupMembership.objects.bulk_create(subgroup_objects)
return role_system_groups_dict
def get_system_user_group_for_user(user_profile: UserProfile) -> UserGroup:
system_user_group_name = UserGroup.SYSTEM_USER_GROUP_ROLE_MAP[user_profile.role]["name"]
system_user_group = UserGroup.objects.get(
name=system_user_group_name, realm=user_profile.realm, is_system_group=True
)
return system_user_group