2018-09-04 20:23:44 +02:00
|
|
|
from typing import Dict, List, Optional, Union, cast
|
2017-02-08 04:39:55 +01:00
|
|
|
|
2017-11-16 02:28:50 +01:00
|
|
|
from django.db.models.query import QuerySet
|
2017-02-08 04:39:55 +01:00
|
|
|
from django.utils.translation import ugettext as _
|
2018-06-19 10:55:56 +02:00
|
|
|
from django.conf import settings
|
2017-02-08 04:39:55 +01:00
|
|
|
|
2018-04-04 18:38:37 +02:00
|
|
|
from zerver.lib.cache import generic_bulk_cached_fetch, user_profile_cache_key_id, \
|
|
|
|
user_profile_by_id_cache_key
|
2017-02-08 04:39:55 +01:00
|
|
|
from zerver.lib.request import JsonableError
|
2018-06-19 10:55:56 +02:00
|
|
|
from zerver.lib.avatar import avatar_url
|
2017-11-01 10:04:16 +01:00
|
|
|
from zerver.models import UserProfile, Service, Realm, \
|
2018-09-04 20:23:44 +02:00
|
|
|
get_user_profile_by_id, query_for_ids, get_user_profile_by_id_in_realm, \
|
|
|
|
CustomProfileField
|
2017-02-08 04:39:55 +01:00
|
|
|
|
2018-02-13 11:47:40 +01:00
|
|
|
from zulip_bots.custom_exceptions import ConfigValidationError
|
|
|
|
|
2018-05-11 01:40:23 +02:00
|
|
|
def check_full_name(full_name_raw: str) -> str:
    """Validate a user-supplied full name and return it stripped.

    Raises JsonableError when the stripped name is longer than
    UserProfile.MAX_NAME_LENGTH, shorter than UserProfile.MIN_NAME_LENGTH,
    or contains any character listed in UserProfile.NAME_INVALID_CHARS.
    """
    full_name = full_name_raw.strip()
    name_length = len(full_name)

    if name_length > UserProfile.MAX_NAME_LENGTH:
        raise JsonableError(_("Name too long!"))
    if name_length < UserProfile.MIN_NAME_LENGTH:
        raise JsonableError(_("Name too short!"))

    # Any overlap between the name's characters and the forbidden set
    # makes the name invalid.
    forbidden_used = set(full_name).intersection(UserProfile.NAME_INVALID_CHARS)
    if forbidden_used:
        raise JsonableError(_("Invalid characters in name!"))

    return full_name
|
|
|
|
|
2018-10-25 02:01:34 +02:00
|
|
|
# NOTE: We don't try to absolutely prevent 2 bots from having the same
|
|
|
|
# name (e.g. you can get there by reactivating a deactivated bot after
|
|
|
|
# making a new bot with the same name). This is just a check designed
|
|
|
|
# to make it unlikely to happen by accident.
|
bots: Prevent bots from having duplicate full names.
Bots are not allowed to use the same name as
other users in the realm (either bot or human).
This is kind of a big commit, but I wanted to
combine the post/patch (aka add/edit) checks
into one commit, since it's a change in policy
that affects both codepaths.
A lot of the noise is in tests. We had good
coverage on the previous code, including some places
like event testing where we were expediently
not bothering to use different names for
different bots in some longer tests. And then
of course I test some new scenarios that are relevant
with the new policy.
There are two new functions:
check_bot_name_available:
very simple Django query
check_change_bot_full_name:
this diverges from the 3-line
check_change_full_name, where the latter
is still used for the "humans" use case
And then we just call those in appropriate places.
Note that there is still a loophole here
where you can get two bots with the same
name if you reactivate a bot named Fred
that was inactive when the second bot named
Fred was created. Also, we don't attempt
to fix historical data. So this commit
shouldn't be considered any kind of lockdown,
it's just meant to keep people from
inadvertently creating two bots of the same
name where they don't intend to. For more
context, we are continuing to allow two
human users in the same realm to have the
same full name, and our code should generally
be tolerant of that possibility. (A good
example is our new mention syntax, which disambiguates
same-named people using ids.)
It's also worth noting that our web app client
doesn't try to scrub full_name from its payload in
situations where the user has actually only modified other
fields in the "Edit bot" UI. Starting here
we just handle this on the server, since it's
easy to fix there, and even if we fixed it in the web
app, there's no guarantee that other clients won't be
just as brute force. It wasn't exactly broken before,
but we'd needlessly write rows to audit tables.
Fixes #10509
2018-09-27 19:25:18 +02:00
|
|
|
def check_bot_name_available(realm_id: int, full_name: str) -> None:
    """Raise JsonableError if an active user in the realm already has this name.

    The name is stripped before the lookup.  The query matches any active
    UserProfile row in the realm; it does not filter on whether that row
    is a bot.
    """
    active_same_name = UserProfile.objects.filter(
        realm_id=realm_id,
        full_name=full_name.strip(),
        is_active=True,
    )

    if active_same_name.exists():
        raise JsonableError(_("Name is already in use!"))
|
|
|
|
|
2018-05-11 01:40:23 +02:00
|
|
|
def check_short_name(short_name_raw: str) -> str:
    """Return the stripped short name.

    Raises JsonableError when nothing is left after stripping whitespace.
    """
    short_name = short_name_raw.strip()
    if not short_name:
        raise JsonableError(_("Bad name or username"))
    return short_name
|
|
|
|
|
2018-02-13 11:47:40 +01:00
|
|
|
def check_valid_bot_config(service_name: str, config_data: Dict[str, str]) -> None:
    """Run the bot handler's optional `validate_config` hook on config_data.

    Handlers without a `validate_config` attribute are accepted as-is.
    Raises JsonableError with a generic message when a
    ConfigValidationError is raised inside the try block.
    """
    try:
        # Function-scope import, kept as in the original.
        # NOTE(review): presumably avoids an import cycle -- confirm.
        from zerver.lib.bot_lib import get_bot_handler

        handler = get_bot_handler(service_name)
        if hasattr(handler, 'validate_config'):
            handler.validate_config(config_data)
    except ConfigValidationError:
        # The exception provides a specific error message, but that
        # message is not tagged translatable, because it is
        # triggered in the external zulip_bots package.
        # TODO: Think of some clever way to provide a more specific
        # error message.
        raise JsonableError(_("Invalid configuration data!"))
|
|
|
|
|
2018-06-01 17:25:15 +02:00
|
|
|
# Adds an outgoing webhook or embedded bot service.
|
|
|
|
def add_service(name: str, user_profile: UserProfile, base_url: Optional[str]=None,
                interface: Optional[int]=None, token: Optional[str]=None) -> None:
    """Create a Service row for user_profile from the given fields.

    All arguments are passed straight through to Service.objects.create.
    """
    service_fields = dict(
        name=name,
        user_profile=user_profile,
        base_url=base_url,
        interface=interface,
        token=token,
    )
    Service.objects.create(**service_fields)
|
|
|
|
|
2018-01-29 16:10:54 +01:00
|
|
|
def check_bot_creation_policy(user_profile: UserProfile, bot_type: int) -> None:
    """Enforce the realm's bot creation policy for user_profile.

    Returns silently when creation is allowed; raises JsonableError when
    the realm's policy requires an administrator for this bot_type.
    """
    # Realm administrators can always add bots.
    if user_profile.is_realm_admin:
        return

    policy = user_profile.realm.bot_creation_policy
    if policy == Realm.BOT_CREATION_EVERYONE:
        return

    # Non-admins are rejected either for every bot type (admins-only
    # policy) or only for DEFAULT_BOT bots (limit-generic-bots policy).
    if policy == Realm.BOT_CREATION_ADMINS_ONLY or (
            policy == Realm.BOT_CREATION_LIMIT_GENERIC_BOTS and
            bot_type == UserProfile.DEFAULT_BOT):
        raise JsonableError(_("Must be an organization administrator"))
|
|
|
|
|
2017-11-24 16:24:24 +01:00
|
|
|
def check_valid_bot_type(user_profile: UserProfile, bot_type: int) -> None:
    """Raise JsonableError unless bot_type is in user_profile.allowed_bot_types."""
    if bot_type in user_profile.allowed_bot_types:
        return
    raise JsonableError(_('Invalid bot type'))
|
2017-07-03 18:35:12 +02:00
|
|
|
|
2018-06-01 17:47:50 +02:00
|
|
|
def check_valid_interface_type(interface_type: Optional[int]) -> None:
    """Raise JsonableError unless interface_type is in Service.ALLOWED_INTERFACE_TYPES."""
    if interface_type in Service.ALLOWED_INTERFACE_TYPES:
        return
    raise JsonableError(_('Invalid interface type'))
|
2017-11-01 10:04:16 +01:00
|
|
|
|
2017-11-16 02:28:50 +01:00
|
|
|
def bulk_get_users(emails: List[str], realm: Optional[Realm],
                   base_query: Optional['QuerySet[UserProfile]']=None) -> Dict[str, UserProfile]:
    """Fetch many users by email (case-insensitively) through the bulk cache.

    emails: the email addresses to look up; they are lowercased before
        being handed to the cache layer.
    realm: required when base_query is None; the lookup is then limited
        to active users of this realm.
    base_query: optional pre-built queryset to search instead of the
        default per-realm one (see the WARNING below).

    Returns whatever generic_bulk_cached_fetch returns for these
    arguments (presumably a dict keyed by lowercased email -- confirm
    against that helper).
    """
    if base_query is None:
        assert realm is not None
        query = UserProfile.objects.filter(realm=realm, is_active=True)
        realm_id = realm.id
    else:
        # WARNING: Currently, this code path only really supports one
        # version of `base_query` being used (because otherwise,
        # they'll share the cache, which can screw up the filtering).
        # If you're using this flow, you'll need to re-do any filters
        # in base_query in the code itself; base_query is just a perf
        # optimization.
        query = base_query
        realm_id = 0

    def fetch_users_by_email(emails: List[str]) -> List[UserProfile]:
        # This should be just
        #
        # UserProfile.objects.select_related("realm").filter(email__iexact__in=emails,
        #                                                    realm=realm)
        #
        # But chaining __in and __iexact doesn't work with Django's
        # ORM, so we have the following hack to construct the relevant where clause
        if len(emails) == 0:
            return []

        # One "UPPER(%s)" placeholder per email; the addresses themselves
        # are passed separately via `params`, never interpolated into SQL.
        upper_list = ", ".join(["UPPER(%s)"] * len(emails))
        where_clause = "UPPER(zerver_userprofile.email::text) IN (%s)" % (upper_list,)
        return query.select_related("realm").extra(
            where=[where_clause],
            params=emails)

    return generic_bulk_cached_fetch(
        # Use a separate cache key to protect us from conflicts with
        # the get_user cache.
        lambda email: 'bulk_get_users:' + user_profile_cache_key_id(email, realm_id),
        fetch_users_by_email,
        [email.lower() for email in emails],
        id_fetcher=lambda user_profile: user_profile.email.lower()
    )
|
|
|
|
|
2017-11-01 10:04:16 +01:00
|
|
|
def user_ids_to_users(user_ids: List[int], realm: Realm) -> List[UserProfile]:
    """Resolve user_ids to UserProfile objects, all of which must be in realm.

    Profiles are loaded through generic_bulk_cached_fetch, keyed by
    user_profile_by_id_cache_key.

    Raises JsonableError("Invalid user ID: <id>") for the first requested
    id that was not found, or for the first found profile whose realm
    differs from `realm`.
    """
    # TODO: Consider adding a flag to control whether deactivated
    # users should be included.

    def fetch_users_by_id(user_ids: List[int]) -> List[UserProfile]:
        if len(user_ids) == 0:
            return []

        return list(UserProfile.objects.filter(id__in=user_ids).select_related())

    user_profiles_by_id = generic_bulk_cached_fetch(
        cache_key_function=user_profile_by_id_cache_key,
        query_function=fetch_users_by_id,
        object_ids=user_ids
    )  # type: Dict[int, UserProfile]

    found_user_ids = user_profiles_by_id.keys()
    missed_user_ids = [user_id for user_id in user_ids if user_id not in found_user_ids]
    if missed_user_ids:
        # Translate the template first and interpolate afterwards; looking
        # up an already-formatted string would never match a translation.
        raise JsonableError(_("Invalid user ID: %s") % (missed_user_ids[0],))

    user_profiles = list(user_profiles_by_id.values())
    for user_profile in user_profiles:
        if user_profile.realm != realm:
            # Same ordering as above: gettext lookup, then interpolation.
            raise JsonableError(_("Invalid user ID: %s") % (user_profile.id,))
    return user_profiles
|
2018-05-28 20:42:31 +02:00
|
|
|
|
|
|
|
def access_bot_by_id(user_profile: UserProfile, user_id: int) -> UserProfile:
    """Return the bot with id user_id in user_profile's realm.

    Raises JsonableError("No such bot") when no such user exists in the
    realm or the user is not a bot, and
    JsonableError("Insufficient permission") when
    user_profile.can_admin_user(bot) is false.
    """
    try:
        bot = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
    except UserProfile.DoesNotExist:
        raise JsonableError(_("No such bot"))

    if not bot.is_bot:
        raise JsonableError(_("No such bot"))

    if user_profile.can_admin_user(bot):
        return bot
    raise JsonableError(_("Insufficient permission"))
|
2018-06-04 07:04:19 +02:00
|
|
|
|
|
|
|
def access_user_by_id(user_profile: UserProfile, user_id: int,
                      allow_deactivated: bool=False, allow_bots: bool=False) -> UserProfile:
    """Return the user with id user_id in user_profile's realm.

    allow_deactivated: when False (the default), deactivated users are
        rejected with "User is deactivated".
    allow_bots: when False (the default), bots are rejected with
        "No such user".

    Raises JsonableError for each rejection above, when no such user
    exists in the realm ("No such user"), or when
    user_profile.can_admin_user(user) is false ("Insufficient permission").
    """
    try:
        found_user = get_user_profile_by_id_in_realm(user_id, user_profile.realm)
    except UserProfile.DoesNotExist:
        raise JsonableError(_("No such user"))

    if found_user.is_bot and not allow_bots:
        raise JsonableError(_("No such user"))

    if not found_user.is_active and not allow_deactivated:
        raise JsonableError(_("User is deactivated"))

    if user_profile.can_admin_user(found_user):
        return found_user
    raise JsonableError(_("Insufficient permission"))
|
2018-06-19 10:55:56 +02:00
|
|
|
|
|
|
|
def get_accounts_for_email(email: str) -> List[Dict[str, Optional[str]]]:
    """List the active, non-bot accounts in non-deactivated realms for email.

    The email is stripped and matched case-insensitively.  Always returns
    an empty list when settings.PRODUCTION is set.
    """
    if settings.PRODUCTION:  # nocoverage
        return []

    matching_profiles = UserProfile.objects.select_related('realm').filter(
        email__iexact=email.strip(),
        is_active=True,
        is_bot=False,
        realm__deactivated=False)

    accounts = []  # type: List[Dict[str, Optional[str]]]
    for profile in matching_profiles:
        accounts.append({
            "realm_name": profile.realm.name,
            "string_id": profile.realm.string_id,
            "full_name": profile.full_name,
            "avatar": avatar_url(profile),
        })
    return accounts
|
2018-08-01 10:53:40 +02:00
|
|
|
|
|
|
|
def get_api_key(user_profile: UserProfile) -> str:
    """Return the API key stored on user_profile."""
    api_key = user_profile.api_key
    return api_key
|
|
|
|
|
|
|
|
def get_all_api_keys(user_profile: UserProfile) -> List[str]:
    """Return every API key of user_profile as a list."""
    # Users can only have one API key for now
    api_keys = [user_profile.api_key]
    return api_keys
|
2018-09-04 20:23:44 +02:00
|
|
|
|
|
|
|
def validate_user_custom_profile_data(realm_id: int,
                                      profile_data: List[Dict[str, Union[int, str, List[int]]]]) -> None:
    """Validate every submitted custom profile value against its field type.

    Each item of profile_data must carry an 'id' (the CustomProfileField
    id) and a 'value'.

    Raises JsonableError when a field id does not exist or when the
    field's validator returns a non-None result (that result becomes the
    error).  Raises AssertionError for a field type with no known
    validator.
    """
    for entry in profile_data:
        field_id = entry['id']
        try:
            # NOTE(review): this lookup is by id only and does not filter
            # on realm_id -- confirm that is intended.
            field = CustomProfileField.objects.get(id=field_id)
        except CustomProfileField.DoesNotExist:
            raise JsonableError(_('Field id {id} not found.').format(id=field_id))

        simple_validators = CustomProfileField.FIELD_VALIDATORS
        field_type = field.field_type
        var_name = '{}'.format(field.name)
        value = entry['value']

        if field_type in simple_validators:
            check = simple_validators[field_type]
            error = check(var_name, value)
        elif field_type == CustomProfileField.CHOICE:
            # Choice validators additionally receive field.field_data.
            check_choice = CustomProfileField.CHOICE_FIELD_VALIDATORS[field_type]
            error = check_choice(var_name, field.field_data, value)
        elif field_type == CustomProfileField.USER:
            # User validators take the realm id and the value cast to a
            # list of ints, plus a literal False third argument.
            check_user = CustomProfileField.USER_FIELD_VALIDATORS[field_type]
            error = check_user(realm_id, cast(List[int], value), False)
        else:
            raise AssertionError("Invalid field type")

        # Validators return None on success and a value to raise otherwise.
        if error is not None:
            raise JsonableError(error)
|