management: Add new library for specifying realms.

This new library is intended to make it easy for management commands
to access a realm or a user in a realm without having to duplicate any
of the annoying parsing/extraction code.
This commit is contained in:
Tim Abbott 2017-07-07 11:35:31 -07:00
parent a9d52052f0
commit 982667acf5
5 changed files with 107 additions and 28 deletions

67
zerver/lib/management.py Normal file
View File

@ -0,0 +1,67 @@
# Library code for use in management commands
from __future__ import absolute_import
from __future__ import print_function
from argparse import ArgumentParser
from django.core.exceptions import MultipleObjectsReturned
from django.core.management.base import BaseCommand, CommandError
from typing import Any, Dict, Optional, Text
from zerver.models import get_realm, Realm, UserProfile
def is_integer_string(val):
# type: (str) -> bool
try:
int(val)
return True
except ValueError:
return False
class ZulipBaseCommand(BaseCommand):
def add_realm_args(self, parser, required=False):
# type: (ArgumentParser, bool) -> None
parser.add_argument(
'-r', '--realm',
dest='realm_id',
required=required,
type=str,
help='The numeric or string ID (subdomain) of the Zulip organization to modify.')
def get_realm(self, options):
# type: (Dict[str, Any]) -> Optional[Realm]
val = options["realm_id"]
if val is None:
return None
# If they specified a realm argument, we need to ensure the
# realm exists. We allow two formats: the numeric ID for the
# realm and the string ID of the realm.
try:
if is_integer_string(val):
return Realm.objects.get(id=val)
return get_realm(val)
except Realm.DoesNotExist:
raise CommandError("The is no realm with id '%s'. Aborting." %
(options["realm_id"],))
def get_user(self, email, realm):
# type: (Text, Optional[Realm]) -> UserProfile
# If a realm is specified, try to find the user there, and
# throw an error if they don't exist.
if realm is not None:
try:
return UserProfile.objects.select_related().get(email__iexact=email.strip(), realm=realm)
except UserProfile.DoesNotExist:
raise CommandError("The realm '%s' does not contain a user with email '%s'" % (realm, email))
# Realm is None in the remaining code path. Here, we
# optimistically try to see if there is exactly one user with
# that email; if so, we'll return it.
try:
return UserProfile.objects.select_related().get(email__iexact=email.strip())
except MultipleObjectsReturned:
raise CommandError("This Zulip server contains multiple users with that email " +
"(in different realms); please pass `--realm` to specify which one to modify.")
except UserProfile.DoesNotExist:
raise CommandError("This Zulip server does not contain a user with email '%s'" % (email,))

View File

@ -8,18 +8,15 @@ from typing import Any
from django.core.management.base import BaseCommand, CommandParser
from zerver.lib.actions import create_stream_if_needed, bulk_add_subscriptions
from zerver.lib.management import ZulipBaseCommand
from zerver.models import UserProfile, get_realm, get_user_profile_by_email
class Command(BaseCommand):
class Command(ZulipBaseCommand):
help = """Add some or all users in a realm to a set of streams."""
def add_arguments(self, parser):
# type: (CommandParser) -> None
parser.add_argument(
'-r', '--realm',
dest='string_id',
type=str,
help='The name of the realm in which you are adding people to streams.')
self.add_realm_args(parser)
parser.add_argument(
'-s', '--streams',
@ -42,13 +39,13 @@ class Command(BaseCommand):
def handle(self, **options):
# type: (**Any) -> None
if options["string_id"] is None or options["streams"] is None or \
if options["streams"] is None or \
(options["users"] is None and options["all_users"] is None):
self.print_help("./manage.py", "add_users_to_streams")
exit(1)
stream_names = set([stream.strip() for stream in options["streams"].split(",")])
realm = get_realm(options["string_id"])
realm = self.get_realm(options)
if options["all_users"]:
user_profiles = UserProfile.objects.filter(realm=realm)
@ -56,7 +53,7 @@ class Command(BaseCommand):
emails = set([email.strip() for email in options["users"].split(",")])
user_profiles = []
for email in emails:
user_profiles.append(get_user_profile_by_email(email))
user_profiles.append(self.get_user(email, realm))
for stream_name in set(stream_names):
for user_profile in user_profiles:

View File

@ -4,17 +4,17 @@ from __future__ import print_function
import sys
from argparse import ArgumentParser
from django.core.management.base import BaseCommand
from typing import Any
from zerver.lib.actions import do_change_user_email
from zerver.models import UserProfile, get_user_for_mgmt
from zerver.lib.management import ZulipBaseCommand
class Command(BaseCommand):
class Command(ZulipBaseCommand):
help = """Change the email address for a user."""
def add_arguments(self, parser):
# type: (ArgumentParser) -> None
self.add_realm_args(parser)
parser.add_argument('old_email', metavar='<old email>', type=str,
help='email address to change')
parser.add_argument('new_email', metavar='<new email>', type=str,
@ -24,10 +24,8 @@ class Command(BaseCommand):
# type: (*Any, **str) -> None
old_email = options['old_email']
new_email = options['new_email']
try:
user_profile = get_user_for_mgmt(old_email)
except UserProfile.DoesNotExist:
print("Old e-mail doesn't exist in the system.")
sys.exit(1)
realm = self.get_realm(options)
user_profile = self.get_user(old_email, realm)
do_change_user_email(user_profile, new_email)

View File

@ -11,7 +11,7 @@ from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, UserManager, \
PermissionsMixin
import django.contrib.auth
from django.core.exceptions import ValidationError, MultipleObjectsReturned
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator, MinLengthValidator, \
RegexValidator
from django.dispatch import receiver
@ -1414,16 +1414,6 @@ def get_user(email, realm):
# type: (Text, Realm) -> UserProfile
return UserProfile.objects.select_related().get(email__iexact=email.strip(), realm=realm)
def get_user_for_mgmt(email, realm=None):
# type: (Text, Optional[Realm]) -> UserProfile
if realm is not None:
return get_user(email, realm)
try:
return UserProfile.objects.select_related().get(email__iexact=email.strip())
except MultipleObjectsReturned:
logging.warning("Please specify the realm as this email is a member of multiple realms.")
sys.exit(1)
@cache_with_key(bot_profile_cache_key, timeout=3600*24*7)
def get_system_bot(email):
# type: (Text) -> UserProfile

View File

@ -10,11 +10,38 @@ from six.moves import map, filter
from django.conf import settings
from django.core.management import call_command
from django.test import TestCase
from zerver.lib.management import ZulipBaseCommand, CommandError
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import stdout_suppressed
from zerver.models import get_realm
from confirmation.models import RealmCreationKey, generate_realm_creation_url
class TestZulipBaseCommand(ZulipTestCase):
def test_get_realm(self):
# type: () -> None
command = ZulipBaseCommand()
self.assertEqual(command.get_realm(dict(realm_id='zulip')), get_realm("zulip"))
self.assertEqual(command.get_realm(dict(realm_id=None)), None)
self.assertEqual(command.get_realm(dict(realm_id='1')), get_realm("zulip"))
with self.assertRaisesRegex(CommandError, "The is no realm with id"):
command.get_realm(dict(realm_id='17'))
def test_get_user(self):
# type: () -> None
command = ZulipBaseCommand()
zulip_realm = get_realm("zulip")
mit_realm = get_realm("zephyr")
user_profile = self.example_user("hamlet")
email = user_profile.email
self.assertEqual(command.get_user(email, zulip_realm), user_profile)
self.assertEqual(command.get_user(email, None), user_profile)
with self.assertRaisesRegex(CommandError, "The realm '<Realm: zephyr 2>' does not contain a user with email"):
self.assertEqual(command.get_user(email, mit_realm), user_profile)
with self.assertRaisesRegex(CommandError, "server does not contain a user with email"):
self.assertEqual(command.get_user('invalid_email@example.com', None), user_profile)
# TODO: Add a test for the MultipleObjectsReturned case once we make that possible.
class TestCommandsCanStart(TestCase):
def setUp(self):