mirror of https://github.com/zulip/zulip.git
89 lines
3.8 KiB
Python
89 lines
3.8 KiB
Python
from typing import Any, Dict, List
|
|
|
|
from django.db import transaction
|
|
from django.utils.translation import ugettext as _
|
|
|
|
from zerver.lib.exceptions import JsonableError
|
|
from zerver.models import Realm, UserGroup, UserGroupMembership, UserProfile
|
|
|
|
|
|
def access_user_group_by_id(user_group_id: int, user_profile: UserProfile) -> UserGroup:
    """Fetch a user group from the acting user's realm, enforcing access.

    The group is looked up by id within ``user_profile.realm``.  Access is
    granted to realm administrators and to members of the group.

    Args:
        user_group_id: id of the group to look up.
        user_profile: the user on whose behalf the group is accessed.

    Returns:
        The matching UserGroup.

    Raises:
        JsonableError: if no group with that id exists in the realm, or if
            the user is neither a realm administrator nor a group member.
    """
    # Keep the try body down to the one call that can raise DoesNotExist.
    try:
        user_group = UserGroup.objects.get(id=user_group_id, realm=user_profile.realm)
    except UserGroup.DoesNotExist:
        raise JsonableError(_("Invalid user group"))

    # Administrators are always allowed, so the membership lookup is only
    # needed for everyone else.
    if user_profile.is_realm_admin:
        return user_group

    # A single EXISTS query instead of loading every membership row.
    is_member = UserGroupMembership.objects.filter(
        user_group=user_group, user_profile=user_profile).exists()
    if not is_member:
        raise JsonableError(
            _("Only group members and organization administrators can administer this group."))
    return user_group
|
|
|
|
def user_groups_in_realm(realm: Realm) -> List[UserGroup]:
    """Return all UserGroup objects belonging to ``realm`` as a list."""
    return list(UserGroup.objects.filter(realm=realm))
|
|
|
|
def user_groups_in_realm_serialized(realm: Realm) -> List[Dict[str, Any]]:
    """Return every user group in ``realm`` as a list of plain dicts.

    Each dict has the keys ``id``, ``name``, ``description`` and
    ``members`` (a sorted list of user profile ids).  The returned list is
    sorted by group id.

    This function is used in do_events_register code path so this code
    should be performant. We need to do 2 database queries because
    Django's ORM doesn't properly support the left join between
    UserGroup and UserGroupMembership that we need.
    """
    # Query 1: the groups themselves, keyed by group id so that the
    # membership rows fetched below can be attached to the right group.
    group_dicts: Dict[int, Dict[str, Any]] = {
        user_group.id: dict(
            id=user_group.id,
            name=user_group.name,
            description=user_group.description,
            members=[],
        )
        for user_group in UserGroup.objects.filter(realm=realm)
    }

    # Query 2: (group id, user id) pairs for every membership in the realm.
    membership = UserGroupMembership.objects.filter(user_group__realm=realm).values_list(
        'user_group_id', 'user_profile_id')
    for (user_group_id, user_profile_id) in membership:
        group_dicts[user_group_id]['members'].append(user_profile_id)

    # The membership query carries no ordering, so sort for stable output.
    for group_dict in group_dicts.values():
        group_dict['members'].sort()

    return sorted(group_dicts.values(), key=lambda group_dict: group_dict['id'])
|
|
|
|
def get_user_groups(user_profile: UserProfile) -> List[UserGroup]:
    """Return the groups reachable through ``user_profile.usergroup_set`` as a list."""
    groups = user_profile.usergroup_set.all()
    return list(groups)
|
|
|
|
def check_add_user_to_user_group(user_profile: UserProfile, user_group: UserGroup) -> bool:
    """Ensure ``user_profile`` has a membership row in ``user_group``.

    Returns True when a new membership row was created, and False when
    one already existed.
    """
    # get_or_create returns (object, created); only the flag is needed.
    result = UserGroupMembership.objects.get_or_create(
        user_group=user_group,
        user_profile=user_profile,
    )
    was_created = result[1]
    return was_created
|
|
|
|
def remove_user_from_user_group(user_profile: UserProfile, user_group: UserGroup) -> int:
    """Delete the membership of ``user_profile`` in ``user_group``.

    Returns:
        The first element of the tuple returned by ``QuerySet.delete()``,
        i.e. the number of objects deleted (0 when the user was not a
        member).
    """
    # delete() returns (total_deleted, per_model_counts).  The second element
    # is bound to a descriptive throwaway name rather than `_`, because `_`
    # is this module's gettext alias and should not be shadowed.
    num_deleted, _per_model_counts = UserGroupMembership.objects.filter(
        user_profile=user_profile, user_group=user_group).delete()
    return num_deleted
|
|
|
|
def check_remove_user_from_user_group(user_profile: UserProfile, user_group: UserGroup) -> bool:
    """Remove ``user_profile`` from ``user_group`` on a best-effort basis.

    Returns:
        True if at least one membership row was deleted; False if nothing
        was deleted or if the removal raised an exception.
    """
    import logging  # function-scope import: only needed on the failure path

    # Only the removal call is guarded; the bool() conversion cannot fail.
    try:
        num_deleted = remove_user_from_user_group(user_profile, user_group)
    except Exception:
        # Preserve the best-effort contract (callers get False), but record
        # the traceback so the failure is not silently lost.
        logging.getLogger(__name__).exception(
            "Failed to remove user from user group")
        return False
    return bool(num_deleted)
|
|
|
|
def create_user_group(name: str, members: List[UserProfile], realm: Realm,
                      description: str='') -> UserGroup:
    """Create a user group in ``realm`` together with its initial members.

    The group row and one membership row per entry in ``members`` are
    written inside a single ``transaction.atomic()`` block.  Returns the
    newly created UserGroup.
    """
    with transaction.atomic():
        new_group = UserGroup.objects.create(
            name=name,
            realm=realm,
            description=description,
        )
        # Build all membership rows first, then insert them in one call.
        memberships = [
            UserGroupMembership(user_profile=member, user_group=new_group)
            for member in members
        ]
        UserGroupMembership.objects.bulk_create(memberships)
    return new_group
|
|
|
|
def get_user_group_members(user_group: UserGroup) -> List[int]:
    """Return the user profile ids of the members of ``user_group``.

    Despite the name, the result is a list of ids, not UserProfile objects.
    """
    # Read the foreign-key column directly so that no UserProfile row has to
    # be fetched per membership.
    return list(UserGroupMembership.objects.filter(
        user_group=user_group).values_list('user_profile_id', flat=True))
|
|
|
|
def get_memberships_of_users(user_group: UserGroup, members: List[UserProfile]) -> List[int]:
    """Return the ids of those users in ``members`` who belong to ``user_group``."""
    matching = UserGroupMembership.objects.filter(
        user_group=user_group,
        user_profile__in=members,
    )
    member_ids = matching.values_list('user_profile_id', flat=True)
    return list(member_ids)
|