zerver/lib/actions.py: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal 2018-05-11 05:31:29 +05:30 committed by Tim Abbott
parent e9f87671e2
commit 97b7075dd2
1 changed files with 132 additions and 132 deletions

View File

@ -1,6 +1,6 @@
from typing import (
AbstractSet, Any, AnyStr, Callable, Dict, Iterable, List, Mapping, MutableMapping,
Optional, Sequence, Set, Text, Tuple, TypeVar, Union, cast
Optional, Sequence, Set, Tuple, TypeVar, Union, cast
)
from mypy_extensions import TypedDict
@ -142,7 +142,7 @@ from operator import itemgetter
# This will be used to type annotate parameters in a function if the function
# works on both str and unicode in python 2 but in python 3 it only works on str.
SizedTextIterable = Union[Sequence[Text], AbstractSet[Text]]
SizedTextIterable = Union[Sequence[str], AbstractSet[str]]
STREAM_ASSIGNMENT_COLORS = [
"#76ce90", "#fae589", "#a6c7e5", "#e79ab5",
@ -257,7 +257,7 @@ def get_topic_history_for_stream(user_profile: UserProfile,
return history
def send_signup_message(sender: UserProfile, admin_realm_signup_notifications_stream: Text,
def send_signup_message(sender: UserProfile, admin_realm_signup_notifications_stream: str,
user_profile: UserProfile, internal: bool=False,
realm: Optional[Realm]=None) -> None:
if internal:
@ -433,7 +433,7 @@ def notify_created_user(user_profile: UserProfile) -> None:
send_event(event, active_user_ids(user_profile.realm_id))
def created_bot_event(user_profile: UserProfile) -> Dict[str, Any]:
def stream_name(stream: Optional[Stream]) -> Optional[Text]:
def stream_name(stream: Optional[Stream]) -> Optional[str]:
if not stream:
return None
return stream.name
@ -466,17 +466,17 @@ def notify_created_bot(user_profile: UserProfile) -> None:
event = created_bot_event(user_profile)
send_event(event, bot_owner_user_ids(user_profile))
def create_users(realm: Realm, name_list: Iterable[Tuple[Text, Text]], bot_type: int=None) -> None:
def create_users(realm: Realm, name_list: Iterable[Tuple[str, str]], bot_type: int=None) -> None:
user_set = set()
for full_name, email in name_list:
short_name = email_to_username(email)
user_set.add((email, full_name, short_name, True))
bulk_create_users(realm, user_set, bot_type)
def do_create_user(email: Text, password: Optional[Text], realm: Realm, full_name: Text,
short_name: Text, is_realm_admin: bool=False, bot_type: Optional[int]=None,
bot_owner: Optional[UserProfile]=None, tos_version: Optional[Text]=None,
timezone: Text="", avatar_source: Text=UserProfile.AVATAR_FROM_GRAVATAR,
def do_create_user(email: str, password: Optional[str], realm: Realm, full_name: str,
short_name: str, is_realm_admin: bool=False, bot_type: Optional[int]=None,
bot_owner: Optional[UserProfile]=None, tos_version: Optional[str]=None,
timezone: str="", avatar_source: str=UserProfile.AVATAR_FROM_GRAVATAR,
default_sending_stream: Optional[Stream]=None,
default_events_register_stream: Optional[Stream]=None,
default_all_public_streams: bool=None,
@ -756,7 +756,7 @@ def do_deactivate_stream(stream: Stream, log: bool=True) -> None:
streams=[stream_dict])
send_event(event, affected_user_ids)
def do_change_user_email(user_profile: UserProfile, new_email: Text) -> None:
def do_change_user_email(user_profile: UserProfile, new_email: str) -> None:
delete_user_profile_caches([user_profile])
user_profile.email = new_email
@ -771,7 +771,7 @@ def do_change_user_email(user_profile: UserProfile, new_email: Text) -> None:
modified_user=user_profile, event_type='user_email_changed',
event_time=event_time)
def do_start_email_change_process(user_profile: UserProfile, new_email: Text) -> None:
def do_start_email_change_process(user_profile: UserProfile, new_email: str) -> None:
old_email = user_profile.email
user_profile.email = new_email
obj = EmailChangeStatus.objects.create(new_email=new_email, old_email=old_email,
@ -817,8 +817,8 @@ def compute_mit_user_fullname(email: NonBinaryStr) -> NonBinaryStr:
@cache_with_key(lambda realm, email, f: user_profile_by_email_cache_key(email),
timeout=3600*24*7)
def create_mirror_user_if_needed(realm: Realm, email: Text,
email_to_fullname: Callable[[Text], Text]) -> UserProfile:
def create_mirror_user_if_needed(realm: Realm, email: str,
email_to_fullname: Callable[[str], str]) -> UserProfile:
try:
return get_user(email, realm)
except UserProfile.DoesNotExist:
@ -841,11 +841,11 @@ def send_welcome_bot_response(message: MutableMapping[str, Any]) -> None:
"skills. Or, try clicking on some of the stream names to your left!")
def render_incoming_message(message: Message,
content: Text,
content: str,
user_ids: Set[int],
realm: Realm,
mention_data: Optional[bugdown.MentionData]=None,
email_gateway: Optional[bool]=False) -> Text:
email_gateway: Optional[bool]=False) -> str:
realm_alert_words = alert_words_in_realm(realm)
try:
rendered_content = render_markdown(
@ -1127,7 +1127,7 @@ def do_send_messages(messages_maybe_none: Sequence[Optional[MutableMapping[str,
new_messages.append(message)
messages = new_messages
links_for_embed = set() # type: Set[Text]
links_for_embed = set() # type: Set[str]
# For consistency, changes to the default values for these gets should also be applied
# to the default args in do_send_message
for message in messages:
@ -1425,7 +1425,7 @@ def bulk_insert_ums(ums: List[UserMessageLite]) -> None:
cursor.execute(query)
def notify_reaction_update(user_profile: UserProfile, message: Message,
reaction: Reaction, op: Text) -> None:
reaction: Reaction, op: str) -> None:
user_dict = {'user_id': user_profile.id,
'email': user_profile.email,
'full_name': user_profile.full_name}
@ -1455,7 +1455,7 @@ def notify_reaction_update(user_profile: UserProfile, message: Message,
ums = UserMessage.objects.filter(message=message.id)
send_event(event, [um.user_profile_id for um in ums])
def do_add_reaction_legacy(user_profile: UserProfile, message: Message, emoji_name: Text) -> None:
def do_add_reaction_legacy(user_profile: UserProfile, message: Message, emoji_name: str) -> None:
(emoji_code, reaction_type) = emoji_name_to_emoji_code(user_profile.realm, emoji_name)
reaction = Reaction(user_profile=user_profile, message=message,
emoji_name=emoji_name, emoji_code=emoji_code,
@ -1463,7 +1463,7 @@ def do_add_reaction_legacy(user_profile: UserProfile, message: Message, emoji_na
reaction.save()
notify_reaction_update(user_profile, message, reaction, "add")
def do_remove_reaction_legacy(user_profile: UserProfile, message: Message, emoji_name: Text) -> None:
def do_remove_reaction_legacy(user_profile: UserProfile, message: Message, emoji_name: str) -> None:
reaction = Reaction.objects.filter(user_profile=user_profile,
message=message,
emoji_name=emoji_name).get()
@ -1471,7 +1471,7 @@ def do_remove_reaction_legacy(user_profile: UserProfile, message: Message, emoji
notify_reaction_update(user_profile, message, reaction, "remove")
def do_add_reaction(user_profile: UserProfile, message: Message,
emoji_name: Text, emoji_code: Text, reaction_type: Text) -> None:
emoji_name: str, emoji_code: str, reaction_type: str) -> None:
reaction = Reaction(user_profile=user_profile, message=message,
emoji_name=emoji_name, emoji_code=emoji_code,
reaction_type=reaction_type)
@ -1479,7 +1479,7 @@ def do_add_reaction(user_profile: UserProfile, message: Message,
notify_reaction_update(user_profile, message, reaction, "add")
def do_remove_reaction(user_profile: UserProfile, message: Message,
emoji_code: Text, reaction_type: Text) -> None:
emoji_code: str, reaction_type: str) -> None:
reaction = Reaction.objects.filter(user_profile=user_profile,
message=message,
emoji_code=emoji_code,
@ -1506,16 +1506,16 @@ def do_send_typing_notification(notification: Dict[str, Any]) -> None:
# check_send_typing_notification:
# Checks the typing notification and sends it
def check_send_typing_notification(sender: UserProfile, notification_to: Sequence[Text],
operator: Text) -> None:
def check_send_typing_notification(sender: UserProfile, notification_to: Sequence[str],
operator: str) -> None:
typing_notification = check_typing_notification(sender, notification_to, operator)
do_send_typing_notification(typing_notification)
# check_typing_notification:
# Returns typing notification ready for sending with do_send_typing_notification on success
# or the error message (string) on error.
def check_typing_notification(sender: UserProfile, notification_to: Sequence[Text],
operator: Text) -> Dict[str, Any]:
def check_typing_notification(sender: UserProfile, notification_to: Sequence[str],
operator: str) -> Dict[str, Any]:
if len(notification_to) == 0:
raise JsonableError(_('Missing parameter: \'to\' (recipient)'))
elif operator not in ('start', 'stop'):
@ -1531,7 +1531,7 @@ def check_typing_notification(sender: UserProfile, notification_to: Sequence[Tex
raise ValueError('Forbidden recipient type')
return {'sender': sender, 'recipient': recipient, 'op': operator}
def stream_welcome_message(stream: Stream) -> Text:
def stream_welcome_message(stream: Stream) -> str:
content = _('Welcome to #**%s**.') % (stream.name,)
if stream.description:
@ -1584,11 +1584,11 @@ def get_default_value_for_history_public_to_subscribers(
return history_public_to_subscribers
def create_stream_if_needed(realm: Realm,
stream_name: Text,
stream_name: str,
*,
invite_only: bool=False,
history_public_to_subscribers: Optional[bool]=None,
stream_description: Text="") -> Tuple[Stream, bool]:
stream_description: str="") -> Tuple[Stream, bool]:
history_public_to_subscribers = get_default_value_for_history_public_to_subscribers(
realm, invite_only, history_public_to_subscribers)
@ -1615,9 +1615,9 @@ def create_stream_if_needed(realm: Realm,
return stream, created
def ensure_stream(realm: Realm,
stream_name: Text,
stream_name: str,
invite_only: bool=False,
stream_description: Text="") -> Stream:
stream_description: str="") -> Stream:
return create_stream_if_needed(realm, stream_name,
invite_only=invite_only,
stream_description=stream_description)[0]
@ -1694,7 +1694,7 @@ def validate_recipient_user_profiles(user_profiles: List[UserProfile],
return recipient_profile_ids
def recipient_for_emails(emails: Iterable[Text], not_forged_mirror_message: bool,
def recipient_for_emails(emails: Iterable[str], not_forged_mirror_message: bool,
forwarder_user_profile: Optional[UserProfile],
sender: UserProfile) -> Recipient:
@ -1738,7 +1738,7 @@ def already_sent_mirrored_message_id(message: Message) -> Optional[int]:
return messages[0].id
return None
def extract_recipients(s: Union[str, Iterable[Text]]) -> List[Text]:
def extract_recipients(s: Union[str, Iterable[str]]) -> List[str]:
# We try to accept multiple incoming formats for recipients.
# See test_extract_recipients() for examples of what we allow.
try:
@ -1759,15 +1759,15 @@ def extract_recipients(s: Union[str, Iterable[Text]]) -> List[Text]:
recipients = [recipient.strip() for recipient in recipients]
return list(set(recipient for recipient in recipients if recipient))
def check_send_stream_message(sender: UserProfile, client: Client, stream_name: Text,
topic: Text, body: Text) -> int:
def check_send_stream_message(sender: UserProfile, client: Client, stream_name: str,
topic: str, body: str) -> int:
addressee = Addressee.for_stream(stream_name, topic)
message = check_message(sender, client, addressee, body)
return do_send_messages([message])[0]
def check_send_private_message(sender: UserProfile, client: Client,
receiving_user: UserProfile, body: Text) -> int:
receiving_user: UserProfile, body: str) -> int:
addressee = Addressee.for_user_profile(receiving_user)
message = check_message(sender, client, addressee, body)
@ -1775,13 +1775,13 @@ def check_send_private_message(sender: UserProfile, client: Client,
# check_send_message:
# Returns the id of the sent message. Has same argspec as check_message.
def check_send_message(sender: UserProfile, client: Client, message_type_name: Text,
message_to: Sequence[Text], topic_name: Optional[Text],
message_content: Text, realm: Optional[Realm]=None,
def check_send_message(sender: UserProfile, client: Client, message_type_name: str,
message_to: Sequence[str], topic_name: Optional[str],
message_content: str, realm: Optional[Realm]=None,
forged: bool=False, forged_timestamp: Optional[float]=None,
forwarder_user_profile: Optional[UserProfile]=None,
local_id: Optional[Text]=None,
sender_queue_id: Optional[Text]=None) -> int:
local_id: Optional[str]=None,
sender_queue_id: Optional[str]=None) -> int:
addressee = Addressee.legacy_build(
sender,
@ -1795,9 +1795,9 @@ def check_send_message(sender: UserProfile, client: Client, message_type_name: T
return do_send_messages([message])[0]
def check_schedule_message(sender: UserProfile, client: Client,
message_type_name: Text, message_to: Sequence[Text],
topic_name: Optional[Text], message_content: Text,
delivery_type: Text, deliver_at: datetime.datetime,
message_type_name: str, message_to: Sequence[str],
topic_name: Optional[str], message_content: str,
delivery_type: str, deliver_at: datetime.datetime,
realm: Optional[Realm]=None,
forwarder_user_profile: Optional[UserProfile]=None
) -> int:
@ -1814,7 +1814,7 @@ def check_schedule_message(sender: UserProfile, client: Client,
message['delivery_type'] = delivery_type
return do_schedule_messages([message])[0]
def check_stream_name(stream_name: Text) -> None:
def check_stream_name(stream_name: str) -> None:
if stream_name.strip() == "":
raise JsonableError(_("Invalid stream name '%s'" % (stream_name)))
if len(stream_name) > Stream.MAX_NAME_LENGTH:
@ -1823,7 +1823,7 @@ def check_stream_name(stream_name: Text) -> None:
if ord(i) == 0:
raise JsonableError(_("Stream name '%s' contains NULL (0x00) characters." % (stream_name)))
def check_default_stream_group_name(group_name: Text) -> None:
def check_default_stream_group_name(group_name: str) -> None:
if group_name.strip() == "":
raise JsonableError(_("Invalid default stream group name '%s'" % (group_name)))
if len(group_name) > DefaultStreamGroup.MAX_NAME_LENGTH:
@ -1836,7 +1836,7 @@ def check_default_stream_group_name(group_name: Text) -> None:
def send_rate_limited_pm_notification_to_bot_owner(sender: UserProfile,
realm: Realm,
content: Text) -> None:
content: str) -> None:
"""
Sends a PM error notification to a bot's owner if one hasn't already
been sent in the last 5 minutes.
@ -1871,7 +1871,7 @@ def send_rate_limited_pm_notification_to_bot_owner(sender: UserProfile,
def send_pm_if_empty_stream(sender: UserProfile,
stream: Optional[Stream],
stream_name: Text,
stream_name: str,
realm: Realm) -> None:
"""If a bot sends a message to a stream that doesn't exist or has no
subscribers, sends a notification to the bot owner (if not a
@ -1900,11 +1900,11 @@ def send_pm_if_empty_stream(sender: UserProfile,
# check_message:
# Returns message ready for sending with do_send_message on success or the error message (string) on error.
def check_message(sender: UserProfile, client: Client, addressee: Addressee,
message_content_raw: Text, realm: Optional[Realm]=None, forged: bool=False,
message_content_raw: str, realm: Optional[Realm]=None, forged: bool=False,
forged_timestamp: Optional[float]=None,
forwarder_user_profile: Optional[UserProfile]=None,
local_id: Optional[Text]=None,
sender_queue_id: Optional[Text]=None) -> Dict[str, Any]:
local_id: Optional[str]=None,
sender_queue_id: Optional[str]=None) -> Dict[str, Any]:
stream = None
message_content = message_content_raw.rstrip()
@ -2005,7 +2005,7 @@ def check_message(sender: UserProfile, client: Client, addressee: Addressee,
def _internal_prep_message(realm: Realm,
sender: UserProfile,
addressee: Addressee,
content: Text) -> Optional[Dict[str, Any]]:
content: str) -> Optional[Dict[str, Any]]:
"""
Create a message object and checks it, but doesn't send it or save it to the database.
The internal function that calls this can therefore batch send a bunch of created
@ -2031,8 +2031,8 @@ def _internal_prep_message(realm: Realm,
return None
def internal_prep_stream_message(realm: Realm, sender: UserProfile,
stream_name: Text, topic: Text,
content: Text) -> Optional[Dict[str, Any]]:
stream_name: str, topic: str,
content: str) -> Optional[Dict[str, Any]]:
"""
See _internal_prep_message for details of how this works.
"""
@ -2048,7 +2048,7 @@ def internal_prep_stream_message(realm: Realm, sender: UserProfile,
def internal_prep_private_message(realm: Realm,
sender: UserProfile,
recipient_user: UserProfile,
content: Text) -> Optional[Dict[str, Any]]:
content: str) -> Optional[Dict[str, Any]]:
"""
See _internal_prep_message for details of how this works.
"""
@ -2061,8 +2061,8 @@ def internal_prep_private_message(realm: Realm,
content=content,
)
def internal_send_message(realm: Realm, sender_email: Text, recipient_type_name: str,
recipients: Text, topic_name: Text, content: Text,
def internal_send_message(realm: Realm, sender_email: str, recipient_type_name: str,
recipients: str, topic_name: str, content: str,
email_gateway: Optional[bool]=False) -> None:
"""internal_send_message should only be used where `sender_email` is a
system bot."""
@ -2094,7 +2094,7 @@ def internal_send_message(realm: Realm, sender_email: Text, recipient_type_name:
def internal_send_private_message(realm: Realm,
sender: UserProfile,
recipient_user: UserProfile,
content: Text) -> None:
content: str) -> None:
message = internal_prep_private_message(realm, sender, recipient_user, content)
if message is None:
return
@ -2120,11 +2120,11 @@ def internal_send_huddle_message(realm: Realm, sender: UserProfile, emails: List
return
do_send_messages([message])
def pick_color(user_profile: UserProfile) -> Text:
def pick_color(user_profile: UserProfile) -> str:
subs = get_stream_subscriptions_for_user(user_profile).filter(active=True)
return pick_color_helper(user_profile, subs)
def pick_color_helper(user_profile: UserProfile, subs: Iterable[Subscription]) -> Text:
def pick_color_helper(user_profile: UserProfile, subs: Iterable[Subscription]) -> str:
# These colors are shared with the palette in subs.js.
used_colors = [sub.color for sub in subs if sub.active]
available_colors = [s for s in STREAM_ASSIGNMENT_COLORS if s not in used_colors]
@ -2266,12 +2266,12 @@ def get_subscribers(stream: Stream,
return [subscription.user_profile for subscription in subscriptions]
def get_subscriber_emails(stream: Stream,
requesting_user: Optional[UserProfile]=None) -> List[Text]:
requesting_user: Optional[UserProfile]=None) -> List[str]:
subscriptions_query = get_subscribers_query(stream, requesting_user)
subscriptions = subscriptions_query.values('user_profile__email')
return [subscription['user_profile__email'] for subscription in subscriptions]
def maybe_get_subscriber_emails(stream: Stream, user_profile: UserProfile) -> List[Text]:
def maybe_get_subscriber_emails(stream: Stream, user_profile: UserProfile) -> List[str]:
""" Alternate version of get_subscriber_emails that takes a Stream object only
(not a name), and simply returns an empty list if unable to get a real
subscriber list (because we're on the MIT realm). """
@ -2675,7 +2675,7 @@ def bulk_remove_subscriptions(users: Iterable[UserProfile],
not_subscribed,
)
def log_subscription_property_change(user_email: Text, stream_name: Text, property: Text,
def log_subscription_property_change(user_email: str, stream_name: str, property: str,
value: Any) -> None:
event = {'type': 'subscription_property',
'property': property,
@ -2685,7 +2685,7 @@ def log_subscription_property_change(user_email: Text, stream_name: Text, proper
log_event(event)
def do_change_subscription_property(user_profile: UserProfile, sub: Subscription,
stream: Stream, property_name: Text, value: Any
stream: Stream, property_name: str, value: Any
) -> None:
setattr(sub, property_name, value)
sub.save(update_fields=[property_name])
@ -2701,7 +2701,7 @@ def do_change_subscription_property(user_profile: UserProfile, sub: Subscription
name=stream.name)
send_event(event, [user_profile.id])
def do_change_password(user_profile: UserProfile, password: Text, commit: bool=True,
def do_change_password(user_profile: UserProfile, password: str, commit: bool=True,
hashed_password: bool=False) -> None:
if hashed_password:
# This is a hashed password, not the password itself.
@ -2715,7 +2715,7 @@ def do_change_password(user_profile: UserProfile, password: Text, commit: bool=T
modified_user=user_profile, event_type='user_change_password',
event_time=event_time)
def do_change_full_name(user_profile: UserProfile, full_name: Text,
def do_change_full_name(user_profile: UserProfile, full_name: str,
acting_user: Optional[UserProfile]) -> None:
old_name = user_profile.full_name
user_profile.full_name = full_name
@ -2733,8 +2733,8 @@ def do_change_full_name(user_profile: UserProfile, full_name: Text,
send_event(dict(type='realm_bot', op='update', bot=payload),
bot_owner_user_ids(user_profile))
def check_change_full_name(user_profile: UserProfile, full_name_raw: Text,
acting_user: UserProfile) -> Text:
def check_change_full_name(user_profile: UserProfile, full_name_raw: str,
acting_user: UserProfile) -> str:
"""Verifies that the user's proposed full name is valid. The caller
is responsible for checking check permissions. Returns the new
full name, which may differ from what was passed in (because this
@ -2788,7 +2788,7 @@ def do_change_bot_owner(user_profile: UserProfile, bot_owner: UserProfile,
)),
update_users)
def do_change_tos_version(user_profile: UserProfile, tos_version: Text) -> None:
def do_change_tos_version(user_profile: UserProfile, tos_version: str) -> None:
user_profile.tos_version = tos_version
user_profile.save(update_fields=["tos_version"])
event_time = timezone_now()
@ -2813,7 +2813,7 @@ def do_regenerate_api_key(user_profile: UserProfile, acting_user: UserProfile) -
)),
bot_owner_user_ids(user_profile))
def do_change_avatar_fields(user_profile: UserProfile, avatar_source: Text) -> None:
def do_change_avatar_fields(user_profile: UserProfile, avatar_source: str) -> None:
user_profile.avatar_source = avatar_source
user_profile.avatar_version += 1
user_profile.save(update_fields=["avatar_source", "avatar_version"])
@ -2846,7 +2846,7 @@ def do_change_avatar_fields(user_profile: UserProfile, avatar_source: Text) -> N
active_user_ids(user_profile.realm_id))
def do_change_icon_source(realm: Realm, icon_source: Text, log: bool=True) -> None:
def do_change_icon_source(realm: Realm, icon_source: str, log: bool=True) -> None:
realm.icon_source = icon_source
realm.icon_version += 1
realm.save(update_fields=["icon_source", "icon_version"])
@ -2885,7 +2885,7 @@ def do_change_default_sending_stream(user_profile: UserProfile, stream: Optional
'stream': str(stream)})
if user_profile.is_bot:
if stream:
stream_name = stream.name # type: Optional[Text]
stream_name = stream.name # type: Optional[str]
else:
stream_name = None
send_event(dict(type='realm_bot',
@ -2909,7 +2909,7 @@ def do_change_default_events_register_stream(user_profile: UserProfile,
'stream': str(stream)})
if user_profile.is_bot:
if stream:
stream_name = stream.name # type: Optional[Text]
stream_name = stream.name # type: Optional[str]
else:
stream_name = None
send_event(dict(type='realm_bot',
@ -2974,7 +2974,7 @@ def do_change_stream_web_public(stream: Stream, is_web_public: bool) -> None:
stream.is_web_public = is_web_public
stream.save(update_fields=['is_web_public'])
def do_rename_stream(stream: Stream, new_name: Text, log: bool=True) -> Dict[str, Text]:
def do_rename_stream(stream: Stream, new_name: str, log: bool=True) -> Dict[str, str]:
old_name = stream.name
stream.name = new_name
stream.save(update_fields=["name"])
@ -3030,7 +3030,7 @@ def do_rename_stream(stream: Stream, new_name: Text, log: bool=True) -> Dict[str
# email forwarding address to display the correctly-escaped new name.
return {"email_address": new_email}
def do_change_stream_description(stream: Stream, new_description: Text) -> None:
def do_change_stream_description(stream: Stream, new_description: str) -> None:
stream.description = new_description
stream.save(update_fields=['description'])
@ -3044,7 +3044,7 @@ def do_change_stream_description(stream: Stream, new_description: Text) -> None:
)
send_event(event, can_access_stream_user_ids(stream))
def do_create_realm(string_id: Text, name: Text, restricted_to_domain: Optional[bool]=None,
def do_create_realm(string_id: str, name: str, restricted_to_domain: Optional[bool]=None,
invite_required: Optional[bool]=None, org_type: Optional[int]=None
) -> Realm:
existing_realm = get_realm(string_id)
@ -3119,7 +3119,7 @@ def do_change_enter_sends(user_profile: UserProfile, enter_sends: bool) -> None:
def do_set_user_display_setting(user_profile: UserProfile,
setting_name: str,
setting_value: Union[bool, Text]) -> None:
setting_value: Union[bool, str]) -> None:
property_type = UserProfile.property_types[setting_name]
assert isinstance(setting_value, property_type)
setattr(user_profile, setting_name, setting_value)
@ -3154,7 +3154,7 @@ def lookup_default_stream_groups(default_stream_group_names: List[str],
default_stream_groups.append(default_stream_group)
return default_stream_groups
def set_default_streams(realm: Realm, stream_dict: Dict[Text, Dict[Text, Any]]) -> None:
def set_default_streams(realm: Realm, stream_dict: Dict[str, Dict[str, Any]]) -> None:
DefaultStream.objects.filter(realm=realm).delete()
stream_names = []
for name, options in stream_dict.items():
@ -3200,8 +3200,8 @@ def do_remove_default_stream(stream: Stream) -> None:
DefaultStream.objects.filter(realm_id=realm_id, stream_id=stream_id).delete()
notify_default_streams(realm_id)
def do_create_default_stream_group(realm: Realm, group_name: Text,
description: Text, streams: List[Stream]) -> None:
def do_create_default_stream_group(realm: Realm, group_name: str,
description: str, streams: List[Stream]) -> None:
default_streams = get_default_streams_for_realm(realm.id)
for stream in streams:
if stream in default_streams:
@ -3249,7 +3249,7 @@ def do_remove_streams_from_default_stream_group(realm: Realm, group: DefaultStre
notify_default_stream_groups(realm)
def do_change_default_stream_group_name(realm: Realm, group: DefaultStreamGroup,
new_group_name: Text) -> None:
new_group_name: str) -> None:
if group.name == new_group_name:
raise JsonableError(_("This default stream group is already named '%s'") % (new_group_name,))
@ -3261,7 +3261,7 @@ def do_change_default_stream_group_name(realm: Realm, group: DefaultStreamGroup,
notify_default_stream_groups(realm)
def do_change_default_stream_group_description(realm: Realm, group: DefaultStreamGroup,
new_description: Text) -> None:
new_description: str) -> None:
group.description = new_description
group.save()
notify_default_stream_groups(realm)
@ -3315,7 +3315,7 @@ def do_update_user_activity_interval(user_profile: UserProfile,
@statsd_increment('user_activity')
def do_update_user_activity(user_profile: UserProfile,
client: Client,
query: Text,
query: str,
log_time: datetime.datetime) -> None:
(activity, created) = UserActivity.objects.get_or_create(
user_profile = user_profile,
@ -3456,7 +3456,7 @@ def do_mark_all_as_read(user_profile: UserProfile) -> int:
def do_mark_stream_messages_as_read(user_profile: UserProfile,
stream: Optional[Stream],
topic_name: Optional[Text]=None) -> int:
topic_name: Optional[str]=None) -> int:
log_statsd_event('mark_stream_as_read')
msgs = UserMessage.objects.filter(
@ -3492,8 +3492,8 @@ def do_mark_stream_messages_as_read(user_profile: UserProfile,
return count
def do_update_message_flags(user_profile: UserProfile,
operation: Text,
flag: Text,
operation: str,
flag: str,
messages: Optional[Sequence[int]]) -> int:
flagattr = getattr(UserMessage.flags, flag)
@ -3545,15 +3545,15 @@ def subscribed_to_stream(user_profile: UserProfile, stream_id: int) -> bool:
except Subscription.DoesNotExist:
return False
def truncate_content(content: Text, max_length: int, truncation_message: Text) -> Text:
def truncate_content(content: str, max_length: int, truncation_message: str) -> str:
if len(content) > max_length:
content = content[:max_length - len(truncation_message)] + truncation_message
return content
def truncate_body(body: Text) -> Text:
def truncate_body(body: str) -> str:
return truncate_content(body, MAX_MESSAGE_LENGTH, "...")
def truncate_topic(topic: Text) -> Text:
def truncate_topic(topic: str) -> str:
return truncate_content(topic, MAX_SUBJECT_LENGTH, "...")
MessageUpdateUserInfoResult = TypedDict('MessageUpdateUserInfoResult', {
@ -3638,8 +3638,8 @@ def update_to_dict_cache(changed_messages: List[Message]) -> List[int]:
@transaction.atomic
def do_update_embedded_data(user_profile: UserProfile,
message: Message,
content: Optional[Text],
rendered_content: Optional[Text]) -> None:
content: Optional[str],
rendered_content: Optional[str]) -> None:
event = {
'type': 'update_message',
'sender': user_profile.email,
@ -3669,9 +3669,9 @@ def do_update_embedded_data(user_profile: UserProfile,
# We use transaction.atomic to support select_for_update in the attachment codepath.
@transaction.atomic
def do_update_message(user_profile: UserProfile, message: Message, topic_name: Optional[Text],
propagate_mode: str, content: Optional[Text],
rendered_content: Optional[Text], prior_mention_user_ids: Set[int],
def do_update_message(user_profile: UserProfile, message: Message, topic_name: Optional[str],
propagate_mode: str, content: Optional[str],
rendered_content: Optional[str], prior_mention_user_ids: Set[int],
mention_user_ids: Set[int]) -> int:
event = {'type': 'update_message',
# TODO: We probably want to remove the 'sender' field
@ -3867,10 +3867,10 @@ def get_average_weekly_stream_traffic(stream_id: int, stream_date_created: datet
def is_old_stream(stream_date_created: datetime.datetime) -> bool:
return (datetime.date.today() - stream_date_created.date()).days >= 7
def encode_email_address(stream: Stream) -> Text:
def encode_email_address(stream: Stream) -> str:
return encode_email_address_helper(stream.name, stream.email_token)
def encode_email_address_helper(name: Text, email_token: Text) -> Text:
def encode_email_address_helper(name: str, email_token: str) -> str:
# Some deployments may not use the email gateway
if settings.EMAIL_GATEWAY_PATTERN == '':
return ''
@ -3887,7 +3887,7 @@ def encode_email_address_helper(name: Text, email_token: Text) -> Text:
encoded_token = "%s+%s" % (encoded_name, email_token)
return settings.EMAIL_GATEWAY_PATTERN % (encoded_token,)
def get_email_gateway_message_string_from_address(address: Text) -> Optional[Text]:
def get_email_gateway_message_string_from_address(address: str) -> Optional[str]:
pattern_parts = [re.escape(part) for part in settings.EMAIL_GATEWAY_PATTERN.split('%s')]
if settings.EMAIL_GATEWAY_EXTRA_PATTERN_HACK:
# Accept mails delivered to any Zulip server
@ -3902,7 +3902,7 @@ def get_email_gateway_message_string_from_address(address: Text) -> Optional[Tex
return msg_string
def decode_email_address(email: Text) -> Optional[Tuple[Text, Text]]:
def decode_email_address(email: str) -> Optional[Tuple[str, str]]:
# Perform the reverse of encode_email_address. Returns a tuple of (streamname, email_token)
msg_string = get_email_gateway_message_string_from_address(email)
@ -4135,7 +4135,7 @@ def filter_presence_idle_user_ids(user_ids: Set[int]) -> List[int]:
idle_user_ids = user_ids - active_user_ids
return sorted(list(idle_user_ids))
def get_status_dict(requesting_user_profile: UserProfile) -> Dict[Text, Dict[Text, Dict[str, Any]]]:
def get_status_dict(requesting_user_profile: UserProfile) -> Dict[str, Dict[str, Dict[str, Any]]]:
if requesting_user_profile.realm.presence_disabled:
# Return an empty dict if presence is disabled in this realm
return defaultdict(dict)
@ -4169,11 +4169,11 @@ def do_send_confirmation_email(invitee: PreregistrationUser,
send_email('zerver/emails/invitation', to_email=invitee.email, from_name=from_name,
from_address=FromAddress.NOREPLY, context=context)
def email_not_system_bot(email: Text) -> None:
def email_not_system_bot(email: str) -> None:
if is_cross_realm_bot_email(email):
raise ValidationError('%s is an email address reserved for system bots' % (email,))
def validate_email_for_realm(target_realm: Realm, email: Text) -> None:
def validate_email_for_realm(target_realm: Realm, email: str) -> None:
email_not_system_bot(email)
try:
@ -4189,7 +4189,7 @@ def validate_email_for_realm(target_realm: Realm, email: Text) -> None:
# Other users should not already exist at all.
raise ValidationError('%s already has an account' % (email,))
def validate_email(user_profile: UserProfile, email: Text) -> Tuple[Optional[str], Optional[str]]:
def validate_email(user_profile: UserProfile, email: str) -> Tuple[Optional[str], Optional[str]]:
try:
validators.validate_email(email)
except ValidationError:
@ -4213,9 +4213,9 @@ class InvitationError(JsonableError):
code = ErrorCode.INVITATION_FAILED
data_fields = ['errors', 'sent_invitations']
def __init__(self, msg: Text, errors: List[Tuple[Text, str]], sent_invitations: bool) -> None:
self._msg = msg # type: Text
self.errors = errors # type: List[Tuple[Text, str]]
def __init__(self, msg: str, errors: List[Tuple[str, str]], sent_invitations: bool) -> None:
self._msg = msg # type: str
self.errors = errors # type: List[Tuple[str, str]]
self.sent_invitations = sent_invitations # type: bool
def estimate_recent_invites(realms: Iterable[Realm], *, days: int) -> int:
@ -4270,9 +4270,9 @@ def do_invite_users(user_profile: UserProfile,
"Ask an organization admin, or a more experienced user."),
[], sent_invitations=False)
validated_emails = [] # type: List[Text]
errors = [] # type: List[Tuple[Text, str]]
skipped = [] # type: List[Tuple[Text, str]]
validated_emails = [] # type: List[str]
errors = [] # type: List[Tuple[str, str]]
skipped = [] # type: List[Tuple[str, str]]
for email in invitee_emails:
if email == '':
continue
@ -4383,7 +4383,7 @@ def notify_realm_emoji(realm: Realm) -> None:
send_event(event, active_user_ids(realm.id))
def check_add_realm_emoji(realm: Realm,
name: Text,
name: str,
author: UserProfile,
image_file: File) -> Optional[RealmEmoji]:
realm_emoji = RealmEmoji(realm=realm, name=name, author=author)
@ -4405,25 +4405,25 @@ def check_add_realm_emoji(realm: Realm,
notify_realm_emoji(realm_emoji.realm)
return realm_emoji
def do_remove_realm_emoji(realm: Realm, name: Text) -> None:
def do_remove_realm_emoji(realm: Realm, name: str) -> None:
emoji = RealmEmoji.objects.get(realm=realm, name=name, deactivated=False)
emoji.deactivated = True
emoji.save(update_fields=['deactivated'])
notify_realm_emoji(realm)
def notify_alert_words(user_profile: UserProfile, words: Iterable[Text]) -> None:
def notify_alert_words(user_profile: UserProfile, words: Iterable[str]) -> None:
event = dict(type="alert_words", alert_words=words)
send_event(event, [user_profile.id])
def do_add_alert_words(user_profile: UserProfile, alert_words: Iterable[Text]) -> None:
def do_add_alert_words(user_profile: UserProfile, alert_words: Iterable[str]) -> None:
words = add_user_alert_words(user_profile, alert_words)
notify_alert_words(user_profile, words)
def do_remove_alert_words(user_profile: UserProfile, alert_words: Iterable[Text]) -> None:
def do_remove_alert_words(user_profile: UserProfile, alert_words: Iterable[str]) -> None:
words = remove_user_alert_words(user_profile, alert_words)
notify_alert_words(user_profile, words)
def do_set_alert_words(user_profile: UserProfile, alert_words: List[Text]) -> None:
def do_set_alert_words(user_profile: UserProfile, alert_words: List[str]) -> None:
set_user_alert_words(user_profile, alert_words)
notify_alert_words(user_profile, alert_words)
@ -4451,7 +4451,7 @@ def notify_realm_filters(realm: Realm) -> None:
# RegExp syntax. In addition to JS-compatible syntax, the following features are available:
# * Named groups will be converted to numbered groups automatically
# * Inline-regex flags will be stripped, and where possible translated to RegExp-wide flags
def do_add_realm_filter(realm: Realm, pattern: Text, url_format_string: Text) -> int:
def do_add_realm_filter(realm: Realm, pattern: str, url_format_string: str) -> int:
pattern = pattern.strip()
url_format_string = url_format_string.strip()
realm_filter = RealmFilter(
@ -4463,7 +4463,7 @@ def do_add_realm_filter(realm: Realm, pattern: Text, url_format_string: Text) ->
return realm_filter.id
def do_remove_realm_filter(realm: Realm, pattern: Optional[Text]=None,
def do_remove_realm_filter(realm: Realm, pattern: Optional[str]=None,
id: Optional[int]=None) -> None:
if pattern is not None:
RealmFilter.objects.get(realm=realm, pattern=pattern).delete()
@ -4471,11 +4471,11 @@ def do_remove_realm_filter(realm: Realm, pattern: Optional[Text]=None,
RealmFilter.objects.get(realm=realm, pk=id).delete()
notify_realm_filters(realm)
def get_emails_from_user_ids(user_ids: Sequence[int]) -> Dict[int, Text]:
def get_emails_from_user_ids(user_ids: Sequence[int]) -> Dict[int, str]:
# We may eventually use memcached to speed this up, but the DB is fast.
return UserProfile.emails_from_ids(user_ids)
def do_add_realm_domain(realm: Realm, domain: Text, allow_subdomains: bool) -> (RealmDomain):
def do_add_realm_domain(realm: Realm, domain: str, allow_subdomains: bool) -> (RealmDomain):
realm_domain = RealmDomain.objects.create(realm=realm, domain=domain,
allow_subdomains=allow_subdomains)
event = dict(type="realm_domains", op="add",
@ -4605,7 +4605,7 @@ def do_delete_old_unclaimed_attachments(weeks_ago: int) -> None:
delete_message_image(attachment.path_id)
attachment.delete()
def check_attachment_reference_change(prev_content: Text, message: Message) -> None:
def check_attachment_reference_change(prev_content: str, message: Message) -> None:
new_content = message.content
prev_attachments = set(attachment_url_re.findall(prev_content))
new_attachments = set(attachment_url_re.findall(new_content))
@ -4630,8 +4630,8 @@ def notify_realm_custom_profile_fields(realm: Realm, operation: str) -> None:
fields=[f.as_dict() for f in fields])
send_event(event, active_user_ids(realm.id))
def try_add_realm_custom_profile_field(realm: Realm, name: Text, field_type: int,
hint: Text='',
def try_add_realm_custom_profile_field(realm: Realm, name: str, field_type: int,
hint: str='',
field_data: ProfileFieldData=None) -> CustomProfileField:
field = CustomProfileField(realm=realm, name=name, field_type=field_type)
field.hint = hint
@ -4653,7 +4653,7 @@ def do_remove_realm_custom_profile_field(realm: Realm, field: CustomProfileField
notify_realm_custom_profile_fields(realm, 'delete')
def try_update_realm_custom_profile_field(realm: Realm, field: CustomProfileField,
name: Text, hint: Text='',
name: str, hint: str='',
field_data: ProfileFieldData=None) -> None:
field.name = name
field.hint = hint
@ -4674,7 +4674,7 @@ def try_reorder_realm_custom_profile_fields(realm: Realm, order: List[int]) -> N
notify_realm_custom_profile_fields(realm, 'update')
def do_update_user_custom_profile_data(user_profile: UserProfile,
data: List[Dict[str, Union[int, Text]]]) -> None:
data: List[Dict[str, Union[int, str]]]) -> None:
with transaction.atomic():
update_or_create = CustomProfileFieldValue.objects.update_or_create
for field in data:
@ -4697,8 +4697,8 @@ def do_send_create_user_group_event(user_group: UserGroup, members: List[UserPro
)
send_event(event, active_user_ids(user_group.realm_id))
def check_add_user_group(realm: Realm, name: Text, initial_members: List[UserProfile],
description: Text) -> None:
def check_add_user_group(realm: Realm, name: str, initial_members: List[UserProfile],
description: str) -> None:
try:
user_group = create_user_group(name, initial_members, realm, description=description)
do_send_create_user_group_event(user_group, initial_members)
@ -4709,19 +4709,19 @@ def do_send_user_group_update_event(user_group: UserGroup, data: Dict[str, Any])
event = dict(type="user_group", op='update', group_id=user_group.id, data=data)
send_event(event, active_user_ids(user_group.realm_id))
def do_update_user_group_name(user_group: UserGroup, name: Text) -> None:
def do_update_user_group_name(user_group: UserGroup, name: str) -> None:
user_group.name = name
user_group.save(update_fields=['name'])
do_send_user_group_update_event(user_group, dict(name=name))
def do_update_user_group_description(user_group: UserGroup, description: Text) -> None:
def do_update_user_group_description(user_group: UserGroup, description: str) -> None:
user_group.description = description
user_group.save(update_fields=['description'])
do_send_user_group_update_event(user_group, dict(description=description))
def do_update_outgoing_webhook_service(bot_profile: UserProfile,
service_interface: int,
service_payload_url: Text) -> None:
service_payload_url: str) -> None:
# TODO: First service is chosen because currently one bot can only have one service.
# Update this once multiple services are supported.
service = get_bot_services(bot_profile.id)[0]
@ -4739,7 +4739,7 @@ def do_update_outgoing_webhook_service(bot_profile: UserProfile,
bot_owner_user_ids(bot_profile))
def do_update_bot_config_data(bot_profile: UserProfile,
config_data: Dict[Text, Text]) -> None:
config_data: Dict[str, str]) -> None:
for key, value in config_data.items():
set_bot_config(bot_profile, key, value)
updated_config_data = get_bot_config(bot_profile)
@ -4755,7 +4755,7 @@ def do_update_bot_config_data(bot_profile: UserProfile,
def get_service_dicts_for_bot(user_profile_id: str) -> List[Dict[str, Any]]:
user_profile = get_user_profile_by_id(user_profile_id)
services = get_bot_services(user_profile_id)
service_dicts = [] # type: List[Dict[Text, Any]]
service_dicts = [] # type: List[Dict[str, Any]]
if user_profile.bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
service_dicts = [{'base_url': service.base_url,
'interface': service.interface,
@ -4782,12 +4782,12 @@ def get_service_dicts_for_bots(bot_dicts: List[Dict[str, Any]],
if bot_dict['bot_type'] == UserProfile.EMBEDDED_BOT]
embedded_bot_configs = get_bot_configs(embedded_bot_ids)
service_dicts_by_uid = {} # type: Dict[int, List[Dict[Text, Any]]]
service_dicts_by_uid = {} # type: Dict[int, List[Dict[str, Any]]]
for bot_dict in bot_dicts:
bot_profile_id = bot_dict["id"]
bot_type = bot_dict["bot_type"]
services = bot_services_by_uid[bot_profile_id]
service_dicts = [] # type: List[Dict[Text, Any]]
service_dicts = [] # type: List[Dict[str, Any]]
if bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
service_dicts = [{'base_url': service.base_url,
'interface': service.interface,
@ -4825,7 +4825,7 @@ def get_owned_bot_dicts(user_profile: UserProfile,
}
for botdict in result]
def do_send_user_group_members_update_event(event_name: Text,
def do_send_user_group_members_update_event(event_name: str,
user_group: UserGroup,
user_ids: List[int]) -> None:
event = dict(type="user_group",