user_groups: Update code to check whether user can edit a user group.

Earlier there was only a realm level setting for configuring
who can edit user groups. A new group level setting is also added
for configuring who can manage that particular group.
Now, a user group can be edited by a user if it is allowed from
realm level setting or group level setting.

This commit make changes to also use group level setting
in determining whether a group can be edited by user or not.

Also, updated tests to use api_post and api_delete helpers instead
of using client_post and client_delete helpers with different users
being logged in.
This commit is contained in:
Ujjawal Modi 2023-07-17 13:13:11 +05:30 committed by Tim Abbott
parent 03220ba456
commit 423d5c42f6
5 changed files with 311 additions and 42 deletions

View File

@ -658,25 +658,6 @@ def require_member_or_admin(
return _wrapped_view_func
def require_user_group_edit_permission(
view_func: Callable[Concatenate[HttpRequest, UserProfile, ParamT], HttpResponse],
) -> Callable[Concatenate[HttpRequest, UserProfile, ParamT], HttpResponse]:
@require_member_or_admin
@wraps(view_func)
def _wrapped_view_func(
request: HttpRequest,
user_profile: UserProfile,
/,
*args: ParamT.args,
**kwargs: ParamT.kwargs,
) -> HttpResponse:
if not user_profile.can_edit_user_groups():
raise JsonableError(_("Insufficient permission"))
return view_func(request, user_profile, *args, **kwargs)
return _wrapped_view_func
def require_user_group_create_permission(
view_func: Callable[Concatenate[HttpRequest, UserProfile, ParamT], HttpResponse],
) -> Callable[Concatenate[HttpRequest, UserProfile, ParamT], HttpResponse]:

View File

@ -90,15 +90,20 @@ def has_user_group_access(
if user_group.is_system_group:
return False
can_edit_all_user_groups = user_profile.can_edit_all_user_groups()
group_member_ids = get_user_group_direct_member_ids(user_group)
if (
not user_profile.is_realm_admin
and not user_profile.is_moderator
and user_profile.id not in group_member_ids
):
return False
can_edit_all_user_groups = False
return True
if can_edit_all_user_groups:
return True
return is_user_in_group(user_group.can_manage_group, user_profile)
def access_user_group_by_id(

View File

@ -847,7 +847,7 @@ class UserProfile(AbstractBaseUser, PermissionsMixin, UserBaseSettings):
def can_create_user_groups(self) -> bool:
return self.has_permission("user_group_edit_policy")
def can_edit_user_groups(self) -> bool:
def can_edit_all_user_groups(self) -> bool:
return self.has_permission("user_group_edit_policy")
def can_move_messages_to_another_topic(self) -> bool:

View File

@ -11,8 +11,11 @@ from zerver.actions.create_realm import do_create_realm
from zerver.actions.realm_settings import do_set_realm_property
from zerver.actions.user_groups import (
add_subgroups_to_user_group,
bulk_add_members_to_user_groups,
bulk_remove_members_from_user_groups,
check_add_user_group,
create_user_group_in_database,
do_change_user_group_permission_setting,
promote_new_full_members,
)
from zerver.actions.users import do_deactivate_user
@ -28,6 +31,7 @@ from zerver.lib.user_groups import (
get_recursive_membership_groups,
get_recursive_strict_subgroups,
get_recursive_subgroups,
get_role_based_system_groups_dict,
get_subgroup_ids,
get_user_group_member_ids,
has_user_group_access,
@ -38,6 +42,7 @@ from zerver.lib.user_groups import (
from zerver.models import (
GroupGroupMembership,
NamedUserGroup,
Realm,
UserGroup,
UserGroupMembership,
UserProfile,
@ -628,9 +633,9 @@ class UserGroupAPITestCase(UserGroupTestCase):
NamedUserGroup.objects.filter(realm=user_profile.realm).count(),
)
def test_can_edit_user_groups(self) -> None:
def test_can_edit_all_user_groups(self) -> None:
def validation_func(user_profile: UserProfile) -> bool:
return user_profile.can_edit_user_groups()
return user_profile.can_edit_all_user_groups()
self.check_has_permission_policies("user_group_edit_policy", validation_func)
@ -1386,7 +1391,7 @@ class UserGroupAPITestCase(UserGroupTestCase):
othello.save()
check_create_user_group("othello")
def test_user_group_edit_policy_for_deleting_user_group(self) -> None:
def test_realm_level_policy_for_deleting_user_group(self) -> None:
hamlet = self.example_user("hamlet")
realm = hamlet.realm
@ -1465,7 +1470,95 @@ class UserGroupAPITestCase(UserGroupTestCase):
othello.save()
check_delete_user_group("othello")
def test_user_group_edit_policy_for_updating_user_groups(self) -> None:
def test_group_level_setting_for_deleting_user_groups(self) -> None:
othello = self.example_user("othello")
realm = othello.realm
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=hamlet
)
def check_delete_user_group(acting_user: str, error_msg: str | None = None) -> None:
user_group = NamedUserGroup.objects.get(name="support")
with transaction.atomic():
result = self.api_delete(
self.example_user(acting_user), f"/api/v1/user_groups/{user_group.id}"
)
if error_msg is None:
self.assert_json_success(result)
else:
self.assert_json_error(result, error_msg)
do_set_realm_property(
realm,
"user_group_edit_policy",
Realm.POLICY_NOBODY,
acting_user=None,
)
support_group = check_add_user_group(
get_realm("zulip"), "support", [othello], acting_user=None
)
# Default value of can_manage_group is "Nobody" system group.
check_delete_user_group("desdemona", "Insufficient permission")
check_delete_user_group("othello", "Insufficient permission")
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
support_group, "can_manage_group", owners_group, acting_user=None
)
check_delete_user_group("iago", "Insufficient permission")
check_delete_user_group("desdemona")
check_add_user_group(
get_realm("zulip"),
"support",
[othello],
"",
{"can_manage_group": system_group_dict[SystemGroups.MEMBERS]},
acting_user=None,
)
check_delete_user_group("polonius", "Not allowed for guest users")
check_delete_user_group("cordelia")
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
check_add_user_group(
get_realm("zulip"),
"support",
[othello],
"",
{"can_manage_group": setting_group},
acting_user=None,
)
check_delete_user_group("iago", "Insufficient permission")
check_delete_user_group("hamlet")
check_add_user_group(
get_realm("zulip"),
"support",
[othello],
"",
{"can_manage_group": setting_group},
acting_user=None,
)
check_delete_user_group("cordelia")
check_add_user_group(
get_realm("zulip"),
"support",
[othello],
"",
{"can_manage_group": setting_group},
acting_user=None,
)
check_delete_user_group("desdemona")
def test_realm_level_setting_for_updating_user_groups(self) -> None:
othello = self.example_user("othello")
self.login("othello")
params = {
@ -1482,7 +1575,6 @@ class UserGroupAPITestCase(UserGroupTestCase):
acting_user: str,
error_msg: str | None = None,
) -> None:
self.login(acting_user)
params = {
"name": new_name,
"description": new_description,
@ -1491,7 +1583,9 @@ class UserGroupAPITestCase(UserGroupTestCase):
self.assertNotEqual(user_group.name, new_name)
self.assertNotEqual(user_group.description, new_description)
result = self.client_patch(f"/json/user_groups/{user_group.id}", info=params)
result = self.api_patch(
self.example_user(acting_user), f"/api/v1/user_groups/{user_group.id}", info=params
)
if error_msg is None:
self.assert_json_success(result)
user_group.refresh_from_db()
@ -1568,17 +1662,105 @@ class UserGroupAPITestCase(UserGroupTestCase):
othello.save()
check_update_user_group("support", "Support team", "othello")
def test_user_group_edit_policy_for_updating_members(self) -> None:
def test_group_level_setting_for_updating_user_groups(self) -> None:
othello = self.example_user("othello")
iago = self.example_user("iago")
user_group = check_add_user_group(
get_realm("zulip"), "support", [othello, iago], acting_user=othello
)
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=hamlet
)
def check_update_user_group(
new_name: str,
new_description: str,
acting_user: str,
error_msg: str | None = None,
) -> None:
params = {
"name": new_name,
"description": new_description,
}
# Ensure that this update request is not a no-op.
self.assertNotEqual(user_group.name, new_name)
self.assertNotEqual(user_group.description, new_description)
result = self.api_patch(
self.example_user(acting_user), f"/api/v1/user_groups/{user_group.id}", info=params
)
if error_msg is None:
self.assert_json_success(result)
user_group.refresh_from_db()
self.assertEqual(user_group.name, new_name)
self.assertEqual(user_group.description, new_description)
else:
self.assert_json_error(result, error_msg)
realm = othello.realm
do_set_realm_property(
realm,
"user_group_edit_policy",
Realm.POLICY_NOBODY,
acting_user=None,
)
# Default value of can_manage_group is "Nobody" system group.
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group(
"help", "Troubleshooting team", "othello", "Insufficient permission"
)
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", owners_group, acting_user=None
)
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "desdemona")
user_group.can_manage_group = system_group_dict[SystemGroups.MEMBERS]
user_group.save()
check_update_user_group(
"support", "Support team", "polonius", "Not allowed for guest users"
)
check_update_user_group(
"support",
"Support team",
"cordelia",
)
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
do_change_user_group_permission_setting(
user_group, "can_manage_group", setting_group, acting_user=None
)
check_update_user_group("help", "Troubleshooting team", "iago", "Insufficient permission")
check_update_user_group("help", "Troubleshooting team", "hamlet")
check_update_user_group(
"support",
"Support team",
"cordelia",
)
check_update_user_group("help", "Troubleshooting team", "desdemona")
def test_realm_level_setting_for_updating_members(self) -> None:
user_group = self.create_user_group_for_test("support")
aaron = self.example_user("aaron")
othello = self.example_user("othello")
cordelia = self.example_user("cordelia")
def check_adding_members_to_group(acting_user: str, error_msg: str | None = None) -> None:
self.login(acting_user)
params = {"add": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [othello])
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [aaron, othello])
@ -1588,10 +1770,13 @@ class UserGroupAPITestCase(UserGroupTestCase):
def check_removing_members_from_group(
acting_user: str, error_msg: str | None = None
) -> None:
self.login(acting_user)
params = {"delete": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [aaron, othello])
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [othello])
@ -1675,6 +1860,108 @@ class UserGroupAPITestCase(UserGroupTestCase):
othello.save()
check_removing_members_from_group("othello")
def test_group_level_setting_for_updating_members(self) -> None:
othello = self.example_user("othello")
user_group = check_add_user_group(
get_realm("zulip"), "support", [othello], acting_user=None
)
hamlet = self.example_user("hamlet")
leadership_group = check_add_user_group(
get_realm("zulip"), "leadership", [hamlet], acting_user=None
)
aaron = self.example_user("aaron")
def check_adding_members_to_group(acting_user: str, error_msg: str | None = None) -> None:
params = {"add": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [aaron, othello])
else:
self.assert_json_error(result, error_msg)
def check_removing_members_from_group(
acting_user: str, error_msg: str | None = None
) -> None:
params = {"delete": orjson.dumps([aaron.id]).decode()}
self.assert_user_membership(user_group, [aaron, othello])
result = self.api_post(
self.example_user(acting_user),
f"/api/v1/user_groups/{user_group.id}/members",
info=params,
)
if error_msg is None:
self.assert_json_success(result)
self.assert_user_membership(user_group, [othello])
else:
self.assert_json_error(result, error_msg)
realm = get_realm("zulip")
do_set_realm_property(
realm,
"user_group_edit_policy",
Realm.POLICY_NOBODY,
acting_user=None,
)
# Default value of can_manage_group is "Nobody system group".
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("othello", "Insufficient permission")
# Add aaron to group so that we can test the removal.
bulk_add_members_to_user_groups([user_group], [aaron.id], acting_user=None)
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("othello", "Insufficient permission")
# Remove aaron from group to add them again in further tests.
bulk_remove_members_from_user_groups([user_group], [aaron.id], acting_user=None)
system_group_dict = get_role_based_system_groups_dict(realm)
owners_group = system_group_dict[SystemGroups.OWNERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", owners_group, acting_user=None
)
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("desdemona")
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("desdemona")
members_group = system_group_dict[SystemGroups.MEMBERS]
do_change_user_group_permission_setting(
user_group, "can_manage_group", members_group, acting_user=None
)
check_adding_members_to_group("polonius", "Not allowed for guest users")
check_adding_members_to_group("cordelia")
check_removing_members_from_group("polonius", "Not allowed for guest users")
check_removing_members_from_group("cordelia")
setting_group = self.create_or_update_anonymous_group_for_setting(
[self.example_user("cordelia")], [leadership_group, owners_group]
)
do_change_user_group_permission_setting(
user_group, "can_manage_group", setting_group, acting_user=None
)
check_adding_members_to_group("iago", "Insufficient permission")
check_adding_members_to_group("hamlet")
check_removing_members_from_group("iago", "Insufficient permission")
check_removing_members_from_group("hamlet")
check_adding_members_to_group("cordelia")
check_removing_members_from_group("cordelia")
check_adding_members_to_group("desdemona")
check_removing_members_from_group("desdemona")
def test_editing_system_user_groups(self) -> None:
desdemona = self.example_user("desdemona")
iago = self.example_user("iago")

View File

@ -17,11 +17,7 @@ from zerver.actions.user_groups import (
do_update_user_group_name,
remove_subgroups_from_user_group,
)
from zerver.decorator import (
require_member_or_admin,
require_user_group_create_permission,
require_user_group_edit_permission,
)
from zerver.decorator import require_member_or_admin, require_user_group_create_permission
from zerver.lib.exceptions import JsonableError
from zerver.lib.mention import MentionBackend, silent_mention_syntax_for_user
from zerver.lib.response import json_success
@ -101,7 +97,7 @@ def get_user_group(request: HttpRequest, user_profile: UserProfile) -> HttpRespo
@transaction.atomic(durable=True)
@require_user_group_edit_permission
@require_member_or_admin
@typed_endpoint
def edit_user_group(
request: HttpRequest,
@ -164,7 +160,7 @@ def edit_user_group(
return json_success(request)
@require_user_group_edit_permission
@require_member_or_admin
@typed_endpoint
def delete_user_group(
request: HttpRequest,
@ -180,7 +176,7 @@ def delete_user_group(
return json_success(request)
@require_user_group_edit_permission
@require_member_or_admin
@typed_endpoint
def update_user_group_backend(
request: HttpRequest,
@ -386,7 +382,7 @@ def remove_subgroups_from_group_backend(
return json_success(request)
@require_user_group_edit_permission
@require_member_or_admin
@typed_endpoint
def update_subgroups_of_user_group(
request: HttpRequest,