zerver/lib: Change use of typing.Text to str.

This commit is contained in:
Aditya Bansal 2018-05-10 22:43:36 +05:30 committed by Tim Abbott
parent 2f3b2fbf59
commit 1f9244e060
43 changed files with 233 additions and 241 deletions

View File

@ -1,5 +1,5 @@
from typing import Iterable, List, Optional, Sequence, Text
from typing import Iterable, List, Optional, Sequence
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext as _
@ -11,7 +11,7 @@ from zerver.models import (
get_user_including_cross_realm,
)
def user_profiles_from_unvalidated_emails(emails: Iterable[Text], realm: Realm) -> List[UserProfile]:
def user_profiles_from_unvalidated_emails(emails: Iterable[str], realm: Realm) -> List[UserProfile]:
user_profiles = [] # type: List[UserProfile]
for email in emails:
try:
@ -21,7 +21,7 @@ def user_profiles_from_unvalidated_emails(emails: Iterable[Text], realm: Realm)
user_profiles.append(user_profile)
return user_profiles
def get_user_profiles(emails: Iterable[Text], realm: Realm) -> List[UserProfile]:
def get_user_profiles(emails: Iterable[str], realm: Realm) -> List[UserProfile]:
try:
return user_profiles_from_unvalidated_emails(emails, realm)
except ValidationError as e:
@ -42,8 +42,8 @@ class Addressee:
# This should be treated as an immutable class.
def __init__(self, msg_type: str,
user_profiles: Optional[Sequence[UserProfile]]=None,
stream_name: Optional[Text]=None,
topic: Optional[Text]=None) -> None:
stream_name: Optional[str]=None,
topic: Optional[str]=None) -> None:
assert(msg_type in ['stream', 'private'])
self._msg_type = msg_type
self._user_profiles = user_profiles
@ -63,21 +63,21 @@ class Addressee:
assert(self.is_private())
return self._user_profiles # type: ignore # assertion protects us
def stream_name(self) -> Text:
def stream_name(self) -> str:
assert(self.is_stream())
assert(self._stream_name is not None)
return self._stream_name
def topic(self) -> Text:
def topic(self) -> str:
assert(self.is_stream())
assert(self._topic is not None)
return self._topic
@staticmethod
def legacy_build(sender: UserProfile,
message_type_name: Text,
message_to: Sequence[Text],
topic_name: Text,
message_type_name: str,
message_to: Sequence[str],
topic_name: str,
realm: Optional[Realm]=None) -> 'Addressee':
# For legacy reason message_to used to be either a list of
@ -110,7 +110,7 @@ class Addressee:
raise JsonableError(_("Invalid message type"))
@staticmethod
def for_stream(stream_name: Text, topic: Text) -> 'Addressee':
def for_stream(stream_name: str, topic: str) -> 'Addressee':
if topic is None:
raise JsonableError(_("Missing topic"))
topic = topic.strip()
@ -123,7 +123,7 @@ class Addressee:
)
@staticmethod
def for_private(emails: Sequence[Text], realm: Realm) -> 'Addressee':
def for_private(emails: Sequence[str], realm: Realm) -> 'Addressee':
user_profiles = get_user_profiles(emails, realm)
return Addressee(
msg_type='private',

View File

@ -3,20 +3,20 @@ from django.db.models import Q
from zerver.models import UserProfile, Realm
from zerver.lib.cache import cache_with_key, realm_alert_words_cache_key
import ujson
from typing import Dict, Iterable, List, Text
from typing import Dict, Iterable, List
@cache_with_key(realm_alert_words_cache_key, timeout=3600*24)
def alert_words_in_realm(realm: Realm) -> Dict[int, List[Text]]:
def alert_words_in_realm(realm: Realm) -> Dict[int, List[str]]:
users_query = UserProfile.objects.filter(realm=realm, is_active=True)
alert_word_data = users_query.filter(~Q(alert_words=ujson.dumps([]))).values('id', 'alert_words')
all_user_words = dict((elt['id'], ujson.loads(elt['alert_words'])) for elt in alert_word_data)
user_ids_with_words = dict((user_id, w) for (user_id, w) in all_user_words.items() if len(w))
return user_ids_with_words
def user_alert_words(user_profile: UserProfile) -> List[Text]:
def user_alert_words(user_profile: UserProfile) -> List[str]:
return ujson.loads(user_profile.alert_words)
def add_user_alert_words(user_profile: UserProfile, alert_words: Iterable[Text]) -> List[Text]:
def add_user_alert_words(user_profile: UserProfile, alert_words: Iterable[str]) -> List[str]:
words = user_alert_words(user_profile)
new_words = [w for w in alert_words if w not in words]
@ -26,7 +26,7 @@ def add_user_alert_words(user_profile: UserProfile, alert_words: Iterable[Text])
return words
def remove_user_alert_words(user_profile: UserProfile, alert_words: Iterable[Text]) -> List[Text]:
def remove_user_alert_words(user_profile: UserProfile, alert_words: Iterable[str]) -> List[str]:
words = user_alert_words(user_profile)
words = [w for w in words if w not in alert_words]
@ -34,6 +34,6 @@ def remove_user_alert_words(user_profile: UserProfile, alert_words: Iterable[Tex
return words
def set_user_alert_words(user_profile: UserProfile, alert_words: List[Text]) -> None:
def set_user_alert_words(user_profile: UserProfile, alert_words: List[str]) -> None:
user_profile.alert_words = ujson.dumps(alert_words)
user_profile.save(update_fields=['alert_words'])

View File

@ -3,14 +3,14 @@ from django.conf import settings
if False:
from zerver.models import UserProfile
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from zerver.lib.avatar_hash import gravatar_hash, user_avatar_path_from_ids
from zerver.lib.upload import upload_backend, MEDIUM_AVATAR_SIZE
from zerver.models import UserProfile
import urllib
def avatar_url(user_profile: UserProfile, medium: bool=False, client_gravatar: bool=False) -> Text:
def avatar_url(user_profile: UserProfile, medium: bool=False, client_gravatar: bool=False) -> str:
return get_avatar_field(
user_id=user_profile.id,
@ -22,7 +22,7 @@ def avatar_url(user_profile: UserProfile, medium: bool=False, client_gravatar: b
client_gravatar=client_gravatar,
)
def avatar_url_from_dict(userdict: Dict[str, Any], medium: bool=False) -> Text:
def avatar_url_from_dict(userdict: Dict[str, Any], medium: bool=False) -> str:
'''
DEPRECATED: We should start using
get_avatar_field to populate users,
@ -41,11 +41,11 @@ def avatar_url_from_dict(userdict: Dict[str, Any], medium: bool=False) -> Text:
def get_avatar_field(user_id: int,
realm_id: int,
email: Text,
avatar_source: Text,
email: str,
avatar_source: str,
avatar_version: int,
medium: bool,
client_gravatar: bool) -> Optional[Text]:
client_gravatar: bool) -> Optional[str]:
'''
Most of the parameters to this function map to fields
by the same name in UserProfile (avatar_source, realm_id,
@ -88,12 +88,12 @@ def get_avatar_field(user_id: int,
url += '&version=%d' % (avatar_version,)
return url
def get_gravatar_url(email: Text, avatar_version: int, medium: bool=False) -> Text:
def get_gravatar_url(email: str, avatar_version: int, medium: bool=False) -> str:
url = _get_unversioned_gravatar_url(email, medium)
url += '&version=%d' % (avatar_version,)
return url
def _get_unversioned_gravatar_url(email: Text, medium: bool) -> Text:
def _get_unversioned_gravatar_url(email: str, medium: bool) -> str:
if settings.ENABLE_GRAVATAR:
gravitar_query_suffix = "&s=%s" % (MEDIUM_AVATAR_SIZE,) if medium else ""
hash_key = gravatar_hash(email)
@ -101,17 +101,17 @@ def _get_unversioned_gravatar_url(email: Text, medium: bool) -> Text:
return settings.DEFAULT_AVATAR_URI+'?x=x'
def _get_unversioned_avatar_url(user_profile_id: int,
avatar_source: Text,
avatar_source: str,
realm_id: int,
email: Optional[Text]=None,
medium: bool=False) -> Text:
email: Optional[str]=None,
medium: bool=False) -> str:
if avatar_source == 'U':
hash_key = user_avatar_path_from_ids(user_profile_id, realm_id)
return upload_backend.get_avatar_url(hash_key, medium=medium)
assert email is not None
return _get_unversioned_gravatar_url(email, medium)
def absolute_avatar_url(user_profile: UserProfile) -> Text:
def absolute_avatar_url(user_profile: UserProfile) -> str:
"""Absolute URLs are used to simplify logic for applications that
won't be served by browsers, such as rendering GCM notifications."""
return urllib.parse.urljoin(user_profile.realm.uri, avatar_url(user_profile))

View File

@ -1,6 +1,5 @@
from django.conf import settings
from typing import Text
from zerver.lib.utils import make_safe_digest
@ -8,7 +7,7 @@ from zerver.models import UserProfile
import hashlib
def gravatar_hash(email: Text) -> Text:
def gravatar_hash(email: str) -> str:
"""Compute the Gravatar hash for an email address."""
# Non-ASCII characters aren't permitted by the currently active e-mail
# RFCs. However, the IETF has published https://tools.ietf.org/html/rfc4952,
@ -17,7 +16,7 @@ def gravatar_hash(email: Text) -> Text:
# not error out on it.
return make_safe_digest(email.lower(), hashlib.md5)
def user_avatar_hash(uid: Text) -> Text:
def user_avatar_hash(uid: str) -> str:
# WARNING: If this method is changed, you may need to do a migration
# similar to zerver/migrations/0060_move_avatars_to_be_uid_based.py .
@ -28,12 +27,12 @@ def user_avatar_hash(uid: Text) -> Text:
user_key = uid + settings.AVATAR_SALT
return make_safe_digest(user_key, hashlib.sha1)
def user_avatar_path(user_profile: UserProfile) -> Text:
def user_avatar_path(user_profile: UserProfile) -> str:
# WARNING: If this method is changed, you may need to do a migration
# similar to zerver/migrations/0060_move_avatars_to_be_uid_based.py .
return user_avatar_path_from_ids(user_profile.id, user_profile.realm_id)
def user_avatar_path_from_ids(user_profile_id: int, realm_id: int) -> Text:
def user_avatar_path_from_ids(user_profile_id: int, realm_id: int) -> str:
user_id_hash = user_avatar_hash(str(user_profile_id))
return '%s/%s' % (str(realm_id), user_id_hash)

View File

@ -4,7 +4,7 @@ from django.db.models.query import F
from django.db.models.functions import Length
from zerver.models import BotConfigData, UserProfile
from typing import Text, List, Dict, Optional
from typing import List, Dict, Optional
from collections import defaultdict
@ -16,22 +16,22 @@ import importlib
class ConfigError(Exception):
pass
def get_bot_config(bot_profile: UserProfile) -> Dict[Text, Text]:
def get_bot_config(bot_profile: UserProfile) -> Dict[str, str]:
entries = BotConfigData.objects.filter(bot_profile=bot_profile)
if not entries:
raise ConfigError("No config data available.")
return {entry.key: entry.value for entry in entries}
def get_bot_configs(bot_profile_ids: List[int]) -> Dict[int, Dict[Text, Text]]:
def get_bot_configs(bot_profile_ids: List[int]) -> Dict[int, Dict[str, str]]:
if not bot_profile_ids:
return {}
entries = BotConfigData.objects.filter(bot_profile_id__in=bot_profile_ids)
entries_by_uid = defaultdict(dict) # type: Dict[int, Dict[Text, Text]]
entries_by_uid = defaultdict(dict) # type: Dict[int, Dict[str, str]]
for entry in entries:
entries_by_uid[entry.bot_profile_id].update({entry.key: entry.value})
return entries_by_uid
def get_bot_config_size(bot_profile: UserProfile, key: Optional[Text]=None) -> int:
def get_bot_config_size(bot_profile: UserProfile, key: Optional[str]=None) -> int:
if key is None:
return BotConfigData.objects.filter(bot_profile=bot_profile) \
.annotate(key_size=Length('key'), value_size=Length('value')) \
@ -42,7 +42,7 @@ def get_bot_config_size(bot_profile: UserProfile, key: Optional[Text]=None) -> i
except BotConfigData.DoesNotExist:
return 0
def set_bot_config(bot_profile: UserProfile, key: Text, value: Text) -> None:
def set_bot_config(bot_profile: UserProfile, key: str, value: str) -> None:
config_size_limit = settings.BOT_CONFIG_SIZE_LIMIT
old_entry_size = get_bot_config_size(bot_profile, key)
new_entry_size = len(key) + len(value)

View File

@ -18,7 +18,7 @@ import configparser
if False:
from mypy_extensions import NoReturn
from typing import Any, Optional, List, Dict, Text
from typing import Any, Optional, List, Dict
from types import ModuleType
our_dir = os.path.dirname(os.path.abspath(__file__))
@ -45,16 +45,16 @@ class StateHandler:
self.marshal = lambda obj: json.dumps(obj)
self.demarshal = lambda obj: json.loads(obj)
def get(self, key: Text) -> Text:
def get(self, key: str) -> str:
return self.demarshal(get_bot_storage(self.user_profile, key))
def put(self, key: Text, value: Text) -> None:
def put(self, key: str, value: str) -> None:
set_bot_storage(self.user_profile, [(key, self.marshal(value))])
def remove(self, key: Text) -> None:
def remove(self, key: str) -> None:
remove_bot_storage(self.user_profile, [key])
def contains(self, key: Text) -> bool:
def contains(self, key: str) -> bool:
return is_key_in_bot_storage(self.user_profile, key)
class EmbeddedBotQuitException(Exception):
@ -109,7 +109,7 @@ class EmbeddedBotHandler:
))
# The bot_name argument exists only to comply with ExternalBotHandler.get_config_info().
def get_config_info(self, bot_name: str, optional: bool=False) -> Dict[Text, Text]:
def get_config_info(self, bot_name: str, optional: bool=False) -> Dict[str, str]:
try:
return get_bot_config(self.user_profile)
except ConfigError:

View File

@ -4,18 +4,18 @@ from django.db.models.query import F
from django.db.models.functions import Length
from zerver.models import BotStorageData, UserProfile, Length
from typing import Text, Optional, List, Tuple
from typing import Optional, List, Tuple
class StateError(Exception):
pass
def get_bot_storage(bot_profile: UserProfile, key: Text) -> Text:
def get_bot_storage(bot_profile: UserProfile, key: str) -> str:
try:
return BotStorageData.objects.get(bot_profile=bot_profile, key=key).value
except BotStorageData.DoesNotExist:
raise StateError("Key does not exist.")
def get_bot_storage_size(bot_profile: UserProfile, key: Optional[Text]=None) -> int:
def get_bot_storage_size(bot_profile: UserProfile, key: Optional[str]=None) -> int:
if key is None:
return BotStorageData.objects.filter(bot_profile=bot_profile) \
.annotate(key_size=Length('key'), value_size=Length('value')) \
@ -44,14 +44,14 @@ def set_bot_storage(bot_profile: UserProfile, entries: List[Tuple[str, str]]) ->
BotStorageData.objects.update_or_create(bot_profile=bot_profile, key=key,
defaults={'value': value})
def remove_bot_storage(bot_profile: UserProfile, keys: List[Text]) -> None:
def remove_bot_storage(bot_profile: UserProfile, keys: List[str]) -> None:
queryset = BotStorageData.objects.filter(bot_profile=bot_profile, key__in=keys)
if len(queryset) < len(keys):
raise StateError("Key does not exist.")
queryset.delete()
def is_key_in_bot_storage(bot_profile: UserProfile, key: Text) -> bool:
def is_key_in_bot_storage(bot_profile: UserProfile, key: str) -> bool:
return BotStorageData.objects.filter(bot_profile=bot_profile, key=key).exists()
def get_keys_in_bot_storage(bot_profile: UserProfile) -> List[Text]:
def get_keys_in_bot_storage(bot_profile: UserProfile) -> List[str]:
return list(BotStorageData.objects.filter(bot_profile=bot_profile).values_list('key', flat=True))

View File

@ -82,7 +82,7 @@ import markdown
from django.utils.html import escape
from markdown.extensions.codehilite import CodeHilite, CodeHiliteExtension
from zerver.lib.tex import render_tex
from typing import Any, Dict, Iterable, List, MutableSequence, Optional, Tuple, Union, Text
from typing import Any, Dict, Iterable, List, MutableSequence, Optional, Tuple, Union
# Global vars
FENCE_RE = re.compile("""
@ -132,13 +132,13 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
self.checked_for_codehilite = False
self.codehilite_conf = {} # type: Dict[str, List[Any]]
def run(self, lines: Iterable[Text]) -> List[Text]:
def run(self, lines: Iterable[str]) -> List[str]:
""" Match and store Fenced Code Blocks in the HtmlStash. """
output = [] # type: List[Text]
output = [] # type: List[str]
class BaseHandler:
def handle_line(self, line: Text) -> None:
def handle_line(self, line: str) -> None:
raise NotImplementedError()
def done(self) -> None:
@ -153,7 +153,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
def pop() -> None:
handlers.pop()
def check_for_new_fence(output: MutableSequence[Text], line: Text) -> None:
def check_for_new_fence(output: MutableSequence[str], line: str) -> None:
m = FENCE_RE.match(line)
if m:
fence = m.group('fence')
@ -164,16 +164,16 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
output.append(line)
class OuterHandler(BaseHandler):
def __init__(self, output: MutableSequence[Text]) -> None:
def __init__(self, output: MutableSequence[str]) -> None:
self.output = output
def handle_line(self, line: Text) -> None:
def handle_line(self, line: str) -> None:
check_for_new_fence(self.output, line)
def done(self) -> None:
pop()
def generic_handler(output: MutableSequence[Text], fence: Text, lang: Text) -> BaseHandler:
def generic_handler(output: MutableSequence[str], fence: str, lang: str) -> BaseHandler:
if lang in ('quote', 'quoted'):
return QuoteHandler(output, fence)
elif lang in ('math', 'tex', 'latex'):
@ -182,13 +182,13 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return CodeHandler(output, fence, lang)
class CodeHandler(BaseHandler):
def __init__(self, output: MutableSequence[Text], fence: Text, lang: Text) -> None:
def __init__(self, output: MutableSequence[str], fence: str, lang: str) -> None:
self.output = output
self.fence = fence
self.lang = lang
self.lines = [] # type: List[Text]
self.lines = [] # type: List[str]
def handle_line(self, line: Text) -> None:
def handle_line(self, line: str) -> None:
if line.rstrip() == self.fence:
self.done()
else:
@ -205,12 +205,12 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
pop()
class QuoteHandler(BaseHandler):
def __init__(self, output: MutableSequence[Text], fence: Text) -> None:
def __init__(self, output: MutableSequence[str], fence: str) -> None:
self.output = output
self.fence = fence
self.lines = [] # type: List[Text]
self.lines = [] # type: List[str]
def handle_line(self, line: Text) -> None:
def handle_line(self, line: str) -> None:
if line.rstrip() == self.fence:
self.done()
else:
@ -226,12 +226,12 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
pop()
class TexHandler(BaseHandler):
def __init__(self, output: MutableSequence[Text], fence: Text) -> None:
def __init__(self, output: MutableSequence[str], fence: str) -> None:
self.output = output
self.fence = fence
self.lines = [] # type: List[Text]
self.lines = [] # type: List[str]
def handle_line(self, line: Text) -> None:
def handle_line(self, line: str) -> None:
if line.rstrip() == self.fence:
self.done()
else:
@ -263,7 +263,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
output.append('')
return output
def format_code(self, lang: Text, text: Text) -> Text:
def format_code(self, lang: str, text: str) -> str:
if lang:
langclass = LANG_TAG % (lang,)
else:
@ -296,7 +296,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return code
def format_quote(self, text: Text) -> Text:
def format_quote(self, text: str) -> str:
paragraphs = text.split("\n\n")
quoted_paragraphs = []
for paragraph in paragraphs:
@ -304,7 +304,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
quoted_paragraphs.append("\n".join("> " + line for line in lines if line != ''))
return "\n\n".join(quoted_paragraphs)
def format_tex(self, text: Text) -> Text:
def format_tex(self, text: str) -> str:
paragraphs = text.split("\n\n")
tex_paragraphs = []
for paragraph in paragraphs:
@ -316,10 +316,10 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
escape(paragraph) + '</span>')
return "\n\n".join(tex_paragraphs)
def placeholder(self, code: Text) -> Text:
def placeholder(self, code: str) -> str:
return self.markdown.htmlStash.store(code, safe=True)
def _escape(self, txt: Text) -> Text:
def _escape(self, txt: str) -> str:
""" basic html escaping """
txt = txt.replace('&', '&amp;')
txt = txt.replace('<', '&lt;')

View File

@ -1,6 +1,6 @@
import re
import markdown
from typing import Any, Dict, List, Optional, Union, Text
from typing import Any, Dict, List, Optional, Union
from typing.re import Match
from markdown.preprocessors import Preprocessor
@ -91,7 +91,7 @@ class Setting(Preprocessor):
done = True
return lines
def handleMatch(self, match: Match[Text]) -> Text:
def handleMatch(self, match: Match[str]) -> str:
setting_identifier = match.group('setting_identifier')
setting_type_name = link_mapping[setting_identifier][0]
setting_name = link_mapping[setting_identifier][1]

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
import ujson
@ -221,7 +221,7 @@ EMOJI_TWEET = """{
]
}"""
def twitter(tweet_id: Text) -> Optional[Dict[Text, Any]]:
def twitter(tweet_id: str) -> Optional[Dict[str, Any]]:
if tweet_id in ["112652479837110273", "287977969287315456", "287977969287315457"]:
return ujson.loads(NORMAL_TWEET)
elif tweet_id == "287977969287315458":

View File

@ -3,11 +3,9 @@ import codecs
import hashlib
import hmac
from typing import Text
# Encodes the provided URL using the same algorithm used by the camo
# caching https image proxy
def get_camo_url(url: Text) -> Text:
def get_camo_url(url: str) -> str:
# Only encode the url if Camo is enabled
if settings.CAMO_URI == '':
return url

View File

@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Text
from typing import Any, Dict, List, Optional
# This file is adapted from samples/shellinabox/ssh-krb-wrapper in
# https://github.com/davidben/webathena, which has the following
@ -82,8 +82,8 @@ def der_encode_uint32(val: int) -> bytes:
raise ValueError("Bad value")
return der_encode_integer(val)
def der_encode_string(val: Text) -> bytes:
if not isinstance(val, Text):
def der_encode_string(val: str) -> bytes:
if not isinstance(val, str):
raise TypeError("unicode")
return der_encode_tlv(0x1b, val.encode("utf-8"))

View File

@ -3,11 +3,11 @@ import time
from psycopg2.extensions import cursor, connection
from typing import Callable, Optional, Iterable, Any, Dict, List, Union, TypeVar, \
Mapping, Text
Mapping
from zerver.lib.str_utils import NonBinaryStr
CursorObj = TypeVar('CursorObj', bound=cursor)
ParamsT = Union[Iterable[Any], Mapping[Text, Any]]
ParamsT = Union[Iterable[Any], Mapping[str, Any]]
# Similar to the tracking done in Django's CursorDebugWrapper, but done at the
# psycopg2 cursor level so it works with SQLAlchemy.

View File

@ -1,4 +1,4 @@
from typing import Any, Callable, Dict, Iterable, List, Set, Tuple, Text
from typing import Any, Callable, Dict, Iterable, List, Set, Tuple
from collections import defaultdict
import datetime
@ -88,8 +88,8 @@ def gather_hot_conversations(user_profile: UserProfile, stream_messages: QuerySe
# Returns a list of dictionaries containing the templating
# information for each hot conversation.
conversation_length = defaultdict(int) # type: Dict[Tuple[int, Text], int]
conversation_diversity = defaultdict(set) # type: Dict[Tuple[int, Text], Set[Text]]
conversation_length = defaultdict(int) # type: Dict[Tuple[int, str], int]
conversation_diversity = defaultdict(set) # type: Dict[Tuple[int, str], Set[str]]
for user_message in stream_messages:
if not user_message.message.sent_by_human():
# Don't include automated messages in the count.
@ -143,7 +143,7 @@ def gather_hot_conversations(user_profile: UserProfile, stream_messages: QuerySe
hot_conversation_render_payloads.append(teaser_data)
return hot_conversation_render_payloads
def gather_new_users(user_profile: UserProfile, threshold: datetime.datetime) -> Tuple[int, List[Text]]:
def gather_new_users(user_profile: UserProfile, threshold: datetime.datetime) -> Tuple[int, List[str]]:
# Gather information on users in the realm who have recently
# joined.
if user_profile.realm.is_zephyr_mirror_realm:
@ -157,7 +157,7 @@ def gather_new_users(user_profile: UserProfile, threshold: datetime.datetime) ->
return len(user_names), user_names
def gather_new_streams(user_profile: UserProfile,
threshold: datetime.datetime) -> Tuple[int, Dict[str, List[Text]]]:
threshold: datetime.datetime) -> Tuple[int, Dict[str, List[str]]]:
if user_profile.can_access_public_streams():
new_streams = list(get_active_streams(user_profile.realm).filter(
invite_only=False, date_created__gt=threshold))
@ -177,7 +177,7 @@ def gather_new_streams(user_profile: UserProfile,
return len(new_streams), {"html": streams_html, "plain": streams_plain}
def enough_traffic(unread_pms: Text, hot_conversations: Text, new_streams: int, new_users: int) -> bool:
def enough_traffic(unread_pms: str, hot_conversations: str, new_streams: int, new_users: int) -> bool:
if unread_pms or hot_conversations:
# If you have any unread traffic, good enough.
return True

View File

@ -2,9 +2,8 @@ from django.core.exceptions import ValidationError
from django.utils.translation import ugettext as _
import re
from typing import Text
def validate_domain(domain: Text) -> None:
def validate_domain(domain: str) -> None:
if domain is None or len(domain) == 0:
raise ValidationError(_("Domain can't be empty."))
if '.' not in domain:

View File

@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional, Text, Union
from typing import Any, Dict, List, Optional, Union
import logging
import re
@ -28,7 +28,7 @@ talon.init()
logger = logging.getLogger(__name__)
def redact_stream(error_message: Text) -> Text:
def redact_stream(error_message: str) -> str:
domain = settings.EMAIL_GATEWAY_PATTERN.rsplit('@')[-1]
stream_match = re.search('\\b(.*?)@' + domain, error_message)
if stream_match:
@ -36,7 +36,7 @@ def redact_stream(error_message: Text) -> Text:
return error_message.replace(stream_name, "X" * len(stream_name))
return error_message
def report_to_zulip(error_message: Text) -> None:
def report_to_zulip(error_message: str) -> None:
if settings.ERROR_BOT is None:
return
error_bot = get_system_bot(settings.ERROR_BOT)
@ -44,7 +44,7 @@ def report_to_zulip(error_message: Text) -> None:
send_zulip(settings.ERROR_BOT, error_stream, "email mirror error",
"""~~~\n%s\n~~~""" % (error_message,))
def log_and_report(email_message: message.Message, error_message: Text, debug_info: Dict[str, Any]) -> None:
def log_and_report(email_message: message.Message, error_message: str, debug_info: Dict[str, Any]) -> None:
scrubbed_error = u"Sender: %s\n%s" % (email_message.get("From"),
redact_stream(error_message))
@ -65,22 +65,22 @@ def log_and_report(email_message: message.Message, error_message: Text, debug_in
redis_client = get_redis_client()
def missed_message_redis_key(token: Text) -> Text:
def missed_message_redis_key(token: str) -> str:
return 'missed_message:' + token
def is_missed_message_address(address: Text) -> bool:
def is_missed_message_address(address: str) -> bool:
msg_string = get_email_gateway_message_string_from_address(address)
return is_mm_32_format(msg_string)
def is_mm_32_format(msg_string: Optional[Text]) -> bool:
def is_mm_32_format(msg_string: Optional[str]) -> bool:
'''
Missed message strings are formatted with a little "mm" prefix
followed by a randomly generated 32-character string.
'''
return msg_string is not None and msg_string.startswith('mm') and len(msg_string) == 34
def get_missed_message_token_from_address(address: Text) -> Text:
def get_missed_message_token_from_address(address: str) -> str:
msg_string = get_email_gateway_message_string_from_address(address)
if msg_string is None:
@ -125,7 +125,7 @@ def create_missed_message_address(user_profile: UserProfile, message: Message) -
return settings.EMAIL_GATEWAY_PATTERN % (address,)
def mark_missed_message_address_as_used(address: Text) -> None:
def mark_missed_message_address_as_used(address: str) -> None:
token = get_missed_message_token_from_address(address)
key = missed_message_redis_key(token)
with redis_client.pipeline() as pipeline:
@ -136,7 +136,7 @@ def mark_missed_message_address_as_used(address: Text) -> None:
redis_client.delete(key)
raise ZulipEmailForwardError('Missed message address has already been used')
def construct_zulip_body(message: message.Message, realm: Realm) -> Text:
def construct_zulip_body(message: message.Message, realm: Realm) -> str:
body = extract_body(message)
# Remove null characters, since Zulip will reject
body = body.replace("\x00", "")
@ -147,7 +147,7 @@ def construct_zulip_body(message: message.Message, realm: Realm) -> Text:
body = '(No email body)'
return body
def send_to_missed_message_address(address: Text, message: message.Message) -> None:
def send_to_missed_message_address(address: str, message: message.Message) -> None:
token = get_missed_message_token_from_address(address)
key = missed_message_redis_key(token)
result = redis_client.hmget(key, 'user_profile_id', 'recipient_id', 'subject')
@ -189,7 +189,7 @@ def send_to_missed_message_address(address: Text, message: message.Message) -> N
class ZulipEmailForwardError(Exception):
pass
def send_zulip(sender: Text, stream: Stream, topic: Text, content: Text) -> None:
def send_zulip(sender: str, stream: Stream, topic: str, content: str) -> None:
internal_send_message(
stream.realm,
sender,
@ -199,14 +199,14 @@ def send_zulip(sender: Text, stream: Stream, topic: Text, content: Text) -> None
content[:2000],
email_gateway=True)
def valid_stream(stream_name: Text, token: Text) -> bool:
def valid_stream(stream_name: str, token: str) -> bool:
try:
stream = Stream.objects.get(email_token=token)
return stream.name.lower() == stream_name.lower()
except Stream.DoesNotExist:
return False
def get_message_part_by_type(message: message.Message, content_type: Text) -> Optional[Text]:
def get_message_part_by_type(message: message.Message, content_type: str) -> Optional[str]:
charsets = message.get_charsets()
for idx, part in enumerate(message.walk()):
@ -217,7 +217,7 @@ def get_message_part_by_type(message: message.Message, content_type: Text) -> Op
return content.decode(charsets[idx], errors="ignore")
return None
def extract_body(message: message.Message) -> Text:
def extract_body(message: message.Message) -> str:
# If the message contains a plaintext version of the body, use
# that.
plaintext_content = get_message_part_by_type(message, "text/plain")
@ -231,7 +231,7 @@ def extract_body(message: message.Message) -> Text:
raise ZulipEmailForwardError("Unable to find plaintext or HTML message body")
def filter_footer(text: Text) -> Text:
def filter_footer(text: str) -> str:
# Try to filter out obvious footers.
possible_footers = [line for line in text.split("\n") if line.strip().startswith("--")]
if len(possible_footers) != 1:
@ -241,7 +241,7 @@ def filter_footer(text: Text) -> Text:
return text.partition("--")[0].strip()
def extract_and_upload_attachments(message: message.Message, realm: Realm) -> Text:
def extract_and_upload_attachments(message: message.Message, realm: Realm) -> str:
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT)
attachment_links = []
@ -268,7 +268,7 @@ def extract_and_upload_attachments(message: message.Message, realm: Realm) -> Te
return "\n".join(attachment_links)
def extract_and_validate(email: Text) -> Stream:
def extract_and_validate(email: str) -> Stream:
temp = decode_email_address(email)
if temp is None:
raise ZulipEmailForwardError("Malformed email recipient " + email)
@ -279,12 +279,12 @@ def extract_and_validate(email: Text) -> Stream:
return Stream.objects.get(email_token=token)
def find_emailgateway_recipient(message: message.Message) -> Text:
def find_emailgateway_recipient(message: message.Message) -> str:
# We can't use Delivered-To; if there is a X-Gm-Original-To
# it is more accurate, so try to find the most-accurate
# recipient list in descending priority order
recipient_headers = ["X-Gm-Original-To", "Delivered-To", "To"]
recipients = [] # type: List[Union[Text, Header]]
recipients = [] # type: List[Union[str, Header]]
for recipient_header in recipient_headers:
r = message.get_all(recipient_header, None)
if r:
@ -299,7 +299,7 @@ def find_emailgateway_recipient(message: message.Message) -> Text:
raise ZulipEmailForwardError("Missing recipient in mirror email")
def process_stream_message(to: Text, subject: Text, message: message.Message,
def process_stream_message(to: str, subject: str, message: message.Message,
debug_info: Dict[str, Any]) -> None:
stream = extract_and_validate(to)
body = construct_zulip_body(message, stream.realm)
@ -308,12 +308,12 @@ def process_stream_message(to: Text, subject: Text, message: message.Message,
logger.info("Successfully processed email to %s (%s)" % (
stream.name, stream.realm.string_id))
def process_missed_message(to: Text, message: message.Message, pre_checked: bool) -> None:
def process_missed_message(to: str, message: message.Message, pre_checked: bool) -> None:
if not pre_checked:
mark_missed_message_address_as_used(to)
send_to_missed_message_address(to, message)
def process_message(message: message.Message, rcpt_to: Optional[Text]=None, pre_checked: bool=False) -> None:
def process_message(message: message.Message, rcpt_to: Optional[str]=None, pre_checked: bool=False) -> None:
subject_header = str(message.get("Subject", "")).strip()
if subject_header == "":
subject_header = "(no topic)"
@ -344,7 +344,7 @@ def process_message(message: message.Message, rcpt_to: Optional[Text]=None, pre_
log_and_report(message, str(e), debug_info)
def mirror_email_message(data: Dict[Text, Text]) -> Dict[str, str]:
def mirror_email_message(data: Dict[str, str]) -> Dict[str, str]:
rcpt_to = data['recipient']
if is_missed_message_address(rcpt_to):
try:

View File

@ -5,7 +5,7 @@ import ujson
from django.conf import settings
from django.utils.translation import ugettext as _
from typing import Optional, Text, Tuple
from typing import Optional, Tuple
from zerver.lib.request import JsonableError
from zerver.lib.upload import upload_backend
@ -34,7 +34,7 @@ emoticon_regex = ('(?<![^{0}])(?P<emoticon>('.format(terminal_symbols)
+ '))(?![^{0}])'.format(terminal_symbols))
# Translates emoticons to their colon syntax, e.g. `:smiley:`.
def translate_emoticons(text: Text) -> Text:
def translate_emoticons(text: str) -> str:
translated = text
for emoticon in EMOTICON_CONVERSIONS:
@ -48,7 +48,7 @@ with open(NAME_TO_CODEPOINT_PATH) as fp:
with open(CODEPOINT_TO_NAME_PATH) as fp:
codepoint_to_name = ujson.load(fp)
def emoji_name_to_emoji_code(realm: Realm, emoji_name: Text) -> Tuple[Text, Text]:
def emoji_name_to_emoji_code(realm: Realm, emoji_name: str) -> Tuple[str, str]:
realm_emojis = realm.get_active_emoji()
realm_emoji = realm_emojis.get(emoji_name)
if realm_emoji is not None:
@ -59,7 +59,7 @@ def emoji_name_to_emoji_code(realm: Realm, emoji_name: Text) -> Tuple[Text, Text
return name_to_codepoint[emoji_name], Reaction.UNICODE_EMOJI
raise JsonableError(_("Emoji '%s' does not exist" % (emoji_name,)))
def check_valid_emoji(realm: Realm, emoji_name: Text) -> None:
def check_valid_emoji(realm: Realm, emoji_name: str) -> None:
emoji_name_to_emoji_code(realm, emoji_name)
def check_emoji_request(realm: Realm, emoji_name: str, emoji_code: str,
@ -89,7 +89,7 @@ def check_emoji_request(realm: Realm, emoji_name: str, emoji_code: str,
# The above are the only valid emoji types
raise JsonableError(_("Invalid emoji type."))
def check_emoji_admin(user_profile: UserProfile, emoji_name: Optional[Text]=None) -> None:
def check_emoji_admin(user_profile: UserProfile, emoji_name: Optional[str]=None) -> None:
"""Raises an exception if the user cannot administer the target realm
emoji name in their organization."""
@ -113,15 +113,15 @@ def check_emoji_admin(user_profile: UserProfile, emoji_name: Optional[Text]=None
if not user_profile.is_realm_admin and not current_user_is_author:
raise JsonableError(_("Must be an organization administrator or emoji author"))
def check_valid_emoji_name(emoji_name: Text) -> None:
def check_valid_emoji_name(emoji_name: str) -> None:
if re.match('^[0-9a-z.\-_]+(?<![.\-_])$', emoji_name):
return
raise JsonableError(_("Invalid characters in emoji name"))
def get_emoji_url(emoji_file_name: Text, realm_id: int) -> Text:
def get_emoji_url(emoji_file_name: str, realm_id: int) -> str:
return upload_backend.get_emoji_url(emoji_file_name, realm_id)
def get_emoji_file_name(emoji_file_name: Text, emoji_id: int) -> Text:
def get_emoji_file_name(emoji_file_name: str, emoji_id: int) -> str:
_, image_ext = os.path.splitext(emoji_file_name)
return ''.join((str(emoji_id), image_ext))

View File

@ -7,7 +7,7 @@ from django.conf import settings
from django.core.mail import mail_admins
from django.http import HttpResponse
from django.utils.translation import ugettext as _
from typing import Any, Dict, Optional, Text
from typing import Any, Dict, Optional
from zerver.models import get_system_bot
from zerver.lib.actions import internal_send_message
@ -136,7 +136,7 @@ def email_server_error(report: Dict[str, Any]) -> None:
mail_admins(format_subject(subject), message, fail_silently=True)
def do_report_error(deployment_name: Text, type: Text, report: Dict[str, Any]) -> HttpResponse:
def do_report_error(deployment_name: str, type: str, report: Dict[str, Any]) -> HttpResponse:
report['deployment'] = deployment_name
if type == 'browser':
notify_browser_error(report)

View File

@ -2,7 +2,7 @@
import time
import logging
from typing import Callable, List, TypeVar, Text
from typing import Callable, List, TypeVar
from psycopg2.extensions import cursor
CursorObj = TypeVar('CursorObj', bound=cursor)
@ -20,7 +20,7 @@ migration runs.
logger = logging.getLogger('zulip.fix_unreads')
logger.setLevel(logging.WARNING)
def build_topic_mute_checker(cursor: CursorObj, user_profile: UserProfile) -> Callable[[int, Text], bool]:
def build_topic_mute_checker(cursor: CursorObj, user_profile: UserProfile) -> Callable[[int, str], bool]:
'''
This function is similar to the function of the same name
in zerver/lib/topic_mutes.py, but it works without the ORM,
@ -43,7 +43,7 @@ def build_topic_mute_checker(cursor: CursorObj, user_profile: UserProfile) -> Ca
for (recipient_id, topic_name) in rows
}
def is_muted(recipient_id: int, topic: Text) -> bool:
def is_muted(recipient_id: int, topic: str) -> bool:
return (recipient_id, topic.lower()) in tups
return is_muted

View File

@ -1,7 +1,7 @@
import itertools
import ujson
import random
from typing import List, Dict, Any, Text, Optional
from typing import List, Dict, Any, Optional
def load_config() -> Dict[str, Any]:
with open("zerver/tests/fixtures/config.generate_data.json", "r") as infile:

View File

@ -3,7 +3,7 @@ from django.utils.translation import ugettext as _
from zerver.models import UserProfile, UserHotspot
from typing import List, Text, Dict
from typing import List, Dict
ALL_HOTSPOTS = {
'intro_reply': {
@ -26,7 +26,7 @@ ALL_HOTSPOTS = {
'description': _('Click here to start a new conversation. Pick a topic '
'(2-3 words is best), and give it a go!'),
},
} # type: Dict[str, Dict[str, Text]]
} # type: Dict[str, Dict[str, str]]
def get_next_hotspots(user: UserProfile) -> List[Dict[str, object]]:
# For manual testing, it can be convenient to set

View File

@ -4,10 +4,10 @@ from django.conf import settings
import hashlib
import base64
from typing import Optional, Text
from typing import Optional
def initial_password(email: Text) -> Optional[Text]:
def initial_password(email: str) -> Optional[str]:
"""Given an email address, returns the initial password for that account, as
created by populate_db."""

View File

@ -1,10 +1,10 @@
from typing import Any, Callable, Dict, List, Tuple, Text
from typing import Any, Callable, Dict, List, Tuple
from django.db.models.query import QuerySet
import re
import time
def create_index_if_not_exist(index_name: Text, table_name: Text, column_string: Text,
where_clause: Text) -> Text:
def create_index_if_not_exist(index_name: str, table_name: str, column_string: str,
where_clause: str) -> str:
#
# FUTURE TODO: When we no longer need to support postgres 9.3 for Trusty,
# we can use "IF NOT EXISTS", which is part of postgres 9.5

View File

@ -1,7 +1,6 @@
from typing import Text
from disposable_email_domains import blacklist
def is_reserved_subdomain(subdomain: Text) -> bool:
def is_reserved_subdomain(subdomain: str) -> bool:
if subdomain in ZULIP_RESERVED_SUBDOMAINS:
return True
if subdomain[-1] == 's' and subdomain[:-1] in ZULIP_RESERVED_SUBDOMAINS:
@ -12,7 +11,7 @@ def is_reserved_subdomain(subdomain: Text) -> bool:
return True
return False
def is_disposable_domain(domain: Text) -> bool:
def is_disposable_domain(domain: str) -> bool:
return domain.lower() in DISPOSABLE_DOMAINS
ZULIP_RESERVED_SUBDOMAINS = frozenset([

View File

@ -1,16 +1,16 @@
from zerver.lib.request import JsonableError
from django.utils.translation import ugettext as _
from typing import Any, Callable, Iterable, Mapping, Sequence, Text
from typing import Any, Callable, Iterable, Mapping, Sequence
def check_supported_events_narrow_filter(narrow: Iterable[Sequence[Text]]) -> None:
def check_supported_events_narrow_filter(narrow: Iterable[Sequence[str]]) -> None:
for element in narrow:
operator = element[0]
if operator not in ["stream", "topic", "sender", "is"]:
raise JsonableError(_("Operator %s not supported.") % (operator,))
def build_narrow_filter(narrow: Iterable[Sequence[Text]]) -> Callable[[Mapping[str, Any]], bool]:
def build_narrow_filter(narrow: Iterable[Sequence[str]]) -> Callable[[Mapping[str, Any]], bool]:
"""Changes to this function should come with corresponding changes to
BuildNarrowFilterTest."""
check_supported_events_narrow_filter(narrow)

View File

@ -1,15 +1,13 @@
from django.conf import settings
from typing import Text
from zerver.lib.avatar_hash import gravatar_hash, user_avatar_hash
from zerver.lib.upload import upload_backend
from zerver.models import Realm
def realm_icon_url(realm: Realm) -> Text:
def realm_icon_url(realm: Realm) -> str:
return get_realm_icon_url(realm)
def get_realm_icon_url(realm: Realm) -> Text:
def get_realm_icon_url(realm: Realm) -> str:
if realm.icon_source == 'U':
return upload_backend.get_realm_icon_url(realm.id, realm.icon_version)
elif settings.ENABLE_GRAVATAR:

View File

@ -2,13 +2,13 @@
from django.http import HttpResponse, HttpResponseNotAllowed
import ujson
from typing import Optional, Any, Dict, List, Text
from typing import Optional, Any, Dict, List
from zerver.lib.exceptions import JsonableError
class HttpResponseUnauthorized(HttpResponse):
status_code = 401
def __init__(self, realm: Text, www_authenticate: Optional[Text]=None) -> None:
def __init__(self, realm: str, www_authenticate: Optional[str]=None) -> None:
HttpResponse.__init__(self)
if www_authenticate is None:
self["WWW-Authenticate"] = 'Basic realm="%s"' % (realm,)
@ -17,21 +17,21 @@ class HttpResponseUnauthorized(HttpResponse):
else:
raise AssertionError("Invalid www_authenticate value!")
def json_unauthorized(message: Text, www_authenticate: Optional[Text]=None) -> HttpResponse:
def json_unauthorized(message: str, www_authenticate: Optional[str]=None) -> HttpResponse:
resp = HttpResponseUnauthorized("zulip", www_authenticate=www_authenticate)
resp.content = (ujson.dumps({"result": "error",
"msg": message}) + "\n").encode()
return resp
def json_method_not_allowed(methods: List[Text]) -> HttpResponseNotAllowed:
def json_method_not_allowed(methods: List[str]) -> HttpResponseNotAllowed:
resp = HttpResponseNotAllowed(methods)
resp.content = ujson.dumps({"result": "error",
"msg": "Method Not Allowed",
"allowed_methods": methods}).encode()
return resp
def json_response(res_type: Text="success",
msg: Text="",
def json_response(res_type: str="success",
msg: str="",
data: Optional[Dict[str, Any]]=None,
status: int=200) -> HttpResponse:
content = {"result": res_type, "msg": msg}
@ -56,5 +56,5 @@ def json_response_from_error(exception: JsonableError) -> HttpResponse:
data=exception.data,
status=exception.http_status_code)
def json_error(msg: Text, data: Optional[Dict[str, Any]]=None, status: int=400) -> HttpResponse:
def json_error(msg: str, data: Optional[Dict[str, Any]]=None, status: int=400) -> HttpResponse:
return json_response(res_type="error", msg=msg, data=data, status=status)

View File

@ -6,13 +6,13 @@ from django.contrib.auth import SESSION_KEY, get_user_model
from django.contrib.sessions.models import Session
from django.utils.timezone import now as timezone_now
from importlib import import_module
from typing import List, Mapping, Optional, Text
from typing import List, Mapping, Optional
from zerver.models import Realm, UserProfile, get_user_profile_by_id
session_engine = import_module(settings.SESSION_ENGINE)
def get_session_dict_user(session_dict: Mapping[Text, int]) -> Optional[int]:
def get_session_dict_user(session_dict: Mapping[str, int]) -> Optional[int]:
# Compare django.contrib.auth._get_user_session_key
try:
return get_user_model()._meta.pk.to_python(session_dict[SESSION_KEY])

View File

@ -8,7 +8,7 @@ Currently we have strings of 3 semantic types:
1. text strings: These strings are used to represent all textual data,
like people's names, stream names, content of messages, etc.
These strings can contain non-ASCII characters, so its type should be
typing.Text (which is `str` in python 3 and `unicode` in python 2).
typing.str (which is `str` in python 3 and `unicode` in python 2).
2. binary strings: These strings are used to represent binary data.
This should be of type `bytes`
@ -29,34 +29,34 @@ force_text and force_bytes.
It is recommended to use the utility functions for other string conversions.
"""
from typing import Any, Dict, Mapping, Union, TypeVar, Text
from typing import Any, Dict, Mapping, Union, TypeVar
NonBinaryStr = TypeVar('NonBinaryStr', str, Text)
NonBinaryStr = TypeVar('NonBinaryStr', str, str)
# This is used to represent text or native strings
def force_text(s: Union[Text, bytes], encoding: str='utf-8') -> Text:
def force_text(s: Union[str, bytes], encoding: str='utf-8') -> str:
"""converts a string to a text string"""
if isinstance(s, Text):
if isinstance(s, str):
return s
elif isinstance(s, bytes):
return s.decode(encoding)
else:
raise TypeError("force_text expects a string type")
def force_bytes(s: Union[Text, bytes], encoding: str='utf-8') -> bytes:
def force_bytes(s: Union[str, bytes], encoding: str='utf-8') -> bytes:
"""converts a string to binary string"""
if isinstance(s, bytes):
return s
elif isinstance(s, Text):
elif isinstance(s, str):
return s.encode(encoding)
else:
raise TypeError("force_bytes expects a string type")
def force_str(s: Union[Text, bytes], encoding: str='utf-8') -> str:
def force_str(s: Union[str, bytes], encoding: str='utf-8') -> str:
"""converts a string to a native string"""
if isinstance(s, str):
return s
elif isinstance(s, Text):
elif isinstance(s, str):
return s.encode(encoding)
elif isinstance(s, bytes):
return s.decode(encoding)
@ -67,11 +67,11 @@ class ModelReprMixin:
"""
This mixin provides a python 2 and 3 compatible way of handling string representation of a model.
When declaring a model, inherit this mixin before django.db.models.Model.
Define __unicode__ on your model which returns a typing.Text object.
Define __unicode__ on your model which returns a str object.
This mixin will automatically define __str__ and __repr__.
"""
def __unicode__(self) -> Text:
def __unicode__(self) -> str:
# Originally raised an exception, but Django (e.g. the ./manage.py shell)
# was catching the exception and not displaying any sort of error
return "Implement __unicode__ in your subclass of ModelReprMixin!"

View File

@ -3,11 +3,11 @@
from django.conf import settings
from django.http import HttpRequest
import re
from typing import Optional, Text
from typing import Optional
from zerver.models import get_realm, Realm, UserProfile
def get_subdomain(request: HttpRequest) -> Text:
def get_subdomain(request: HttpRequest) -> str:
# The HTTP spec allows, but doesn't require, a client to omit the
# port in the `Host` header if it's "the default port for the
@ -41,7 +41,7 @@ def get_subdomain(request: HttpRequest) -> Text:
def is_subdomain_root_or_alias(request: HttpRequest) -> bool:
return get_subdomain(request) == Realm.SUBDOMAIN_FOR_ROOT_DOMAIN
def user_matches_subdomain(realm_subdomain: Optional[Text], user_profile: UserProfile) -> bool:
def user_matches_subdomain(realm_subdomain: Optional[str], user_profile: UserProfile) -> bool:
if realm_subdomain is None:
return True # nocoverage # This state may no longer be possible.
return user_profile.realm.subdomain == realm_subdomain

View File

@ -3,7 +3,7 @@ import os
import re
import hashlib
import sys
from typing import Any, List, Optional, Text
from typing import Any, List, Optional
from importlib import import_module
from io import StringIO
@ -20,7 +20,7 @@ from scripts.lib.zulip_tools import get_dev_uuid_var_path
UUID_VAR_DIR = get_dev_uuid_var_path()
FILENAME_SPLITTER = re.compile('[\W\-_]')
def database_exists(database_name: Text, **options: Any) -> bool:
def database_exists(database_name: str, **options: Any) -> bool:
db = options.get('database', DEFAULT_DB_ALIAS)
try:
connection = connections[db]
@ -59,7 +59,7 @@ def get_migration_status(**options: Any) -> str:
output = out.read()
return re.sub('\x1b\[(1|0)m', '', output)
def are_migrations_the_same(migration_file: Text, **options: Any) -> bool:
def are_migrations_the_same(migration_file: str, **options: Any) -> bool:
if not os.path.exists(migration_file):
return False

View File

@ -3,9 +3,9 @@ import logging
import os
import subprocess
from django.conf import settings
from typing import Optional, Text
from typing import Optional
def render_tex(tex: Text, is_inline: bool=True) -> Optional[Text]:
def render_tex(tex: str, is_inline: bool=True) -> Optional[str]:
"""Render a TeX string into HTML using KaTeX
Returns the HTML string, or None if there was some error in the TeX syntax

View File

@ -1,10 +1,10 @@
from typing import Text, List
from typing import List
import pytz
def get_all_timezones() -> List[Text]:
def get_all_timezones() -> List[str]:
return sorted(pytz.all_timezones)
def get_timezone(tz: Text) -> pytz.datetime.tzinfo:
def get_timezone(tz: str) -> pytz.datetime.tzinfo:
return pytz.timezone(tz)

View File

@ -2,7 +2,7 @@
import sys
import functools
from typing import Any, Callable, IO, Mapping, Sequence, TypeVar, Text
from typing import Any, Callable, IO, Mapping, Sequence, TypeVar
def get_mapping_type_str(x: Mapping[Any, Any]) -> str:
container_type = type(x).__name__
@ -44,7 +44,7 @@ def get_sequence_type_str(x: Sequence[Any]) -> str:
else:
return '%s([%s, ...])' % (container_type, elem_type)
expansion_blacklist = [Text, bytes]
expansion_blacklist = [str, bytes]
def get_type_str(x: Any) -> str:
if x is None:

View File

@ -3,17 +3,17 @@ import re
import os
import sourcemap
from typing import Dict, List, Text
from typing import Dict, List
class SourceMap:
'''Map (line, column) pairs from generated to source file.'''
def __init__(self, sourcemap_dirs: List[Text]) -> None:
def __init__(self, sourcemap_dirs: List[str]) -> None:
self._dirs = sourcemap_dirs
self._indices = {} # type: Dict[Text, sourcemap.SourceMapDecoder]
self._indices = {} # type: Dict[str, sourcemap.SourceMapDecoder]
def _index_for(self, minified_src: Text) -> sourcemap.SourceMapDecoder:
def _index_for(self, minified_src: str) -> sourcemap.SourceMapDecoder:
'''Return the source map index for minified_src, loading it if not
already loaded.'''
if minified_src not in self._indices:
@ -26,8 +26,8 @@ class SourceMap:
return self._indices[minified_src]
def annotate_stacktrace(self, stacktrace: Text) -> Text:
out = '' # type: Text
def annotate_stacktrace(self, stacktrace: str) -> str:
out = '' # type: str
for ln in stacktrace.splitlines():
out += ln + '\n'
match = re.search(r'/static/(?:webpack-bundles|min)/(.+)(\.[\.0-9a-f]+)\.js:(\d+):(\d+)', ln)

View File

@ -1,8 +1,8 @@
from typing import Optional, Text, Dict, Any
from typing import Optional, Dict, Any
from pyoembed import oEmbed, PyOembedException
def get_oembed_data(url: Text,
def get_oembed_data(url: str,
maxwidth: Optional[int]=640,
maxheight: Optional[int]=480) -> Optional[Dict[Any, Any]]:
try:

View File

@ -1,9 +1,9 @@
from typing import Any, Text
from typing import Any
from bs4 import BeautifulSoup
class BaseParser:
def __init__(self, html_source: Text) -> None:
def __init__(self, html_source: str) -> None:
self._soup = BeautifulSoup(html_source, "lxml")
def extract_data(self) -> Any:

View File

@ -1,15 +1,15 @@
from typing import Dict, Optional, Text
from typing import Dict, Optional
from zerver.lib.url_preview.parsers.base import BaseParser
class GenericParser(BaseParser):
def extract_data(self) -> Dict[str, Optional[Text]]:
def extract_data(self) -> Dict[str, Optional[str]]:
return {
'title': self._get_title(),
'description': self._get_description(),
'image': self._get_image()}
def _get_title(self) -> Optional[Text]:
def _get_title(self) -> Optional[str]:
soup = self._soup
if (soup.title and soup.title.text != ''):
return soup.title.text
@ -17,7 +17,7 @@ class GenericParser(BaseParser):
return soup.h1.text
return None
def _get_description(self) -> Optional[Text]:
def _get_description(self) -> Optional[str]:
soup = self._soup
meta_description = soup.find('meta', attrs={'name': 'description'})
if (meta_description and meta_description['content'] != ''):
@ -32,7 +32,7 @@ class GenericParser(BaseParser):
return first_p.string
return None
def _get_image(self) -> Optional[Text]:
def _get_image(self) -> Optional[str]:
"""
Finding a first image after the h1 header.
Presumably it will be the main image.

View File

@ -1,10 +1,10 @@
import re
from typing import Dict, Text
from typing import Dict
from .base import BaseParser
class OpenGraphParser(BaseParser):
def extract_data(self) -> Dict[str, Text]:
def extract_data(self) -> Dict[str, str]:
meta = self._soup.findAll('meta')
content = {}
for tag in meta:

View File

@ -1,7 +1,7 @@
import re
import logging
import traceback
from typing import Any, Optional, Text, Dict
from typing import Any, Optional, Dict
from typing.re import Match
import requests
from zerver.lib.cache import cache_with_key, get_cache_with_key
@ -20,16 +20,16 @@ link_regex = re.compile(
r'(?:/?|[/?]\S+)$', re.IGNORECASE)
def is_link(url: Text) -> Match[Text]:
def is_link(url: str) -> Match[str]:
return link_regex.match(smart_text(url))
def cache_key_func(url: Text) -> Text:
def cache_key_func(url: str) -> str:
return url
@cache_with_key(cache_key_func, cache_name=CACHE_NAME, with_statsd_key="urlpreview_data")
def get_link_embed_data(url: Text,
def get_link_embed_data(url: str,
maxwidth: Optional[int]=640,
maxheight: Optional[int]=480) -> Optional[Dict[Any, Any]]:
if not is_link(url):
@ -59,5 +59,5 @@ def get_link_embed_data(url: Text,
@get_cache_with_key(cache_key_func, cache_name=CACHE_NAME)
def link_embed_data_from_cache(url: Text, maxwidth: Optional[int]=640, maxheight: Optional[int]=480) -> Any:
def link_embed_data_from_cache(url: str, maxwidth: Optional[int]=640, maxheight: Optional[int]=480) -> Any:
return

View File

@ -5,7 +5,7 @@ from django.db import transaction
from django.utils.translation import ugettext as _
from zerver.lib.exceptions import JsonableError
from zerver.models import UserProfile, Realm, UserGroupMembership, UserGroup
from typing import Dict, Iterable, List, Text, Tuple, Any
from typing import Dict, Iterable, List, Tuple, Any
def access_user_group_by_id(user_group_id: int, user_profile: UserProfile) -> UserGroup:
try:
@ -22,7 +22,7 @@ def user_groups_in_realm(realm: Realm) -> List[UserGroup]:
user_groups = UserGroup.objects.filter(realm=realm)
return list(user_groups)
def user_groups_in_realm_serialized(realm: Realm) -> List[Dict[Text, Any]]:
def user_groups_in_realm_serialized(realm: Realm) -> List[Dict[str, Any]]:
"""This function is used in do_events_register code path so this code
should be performant. We need to do 2 database queries because
Django's ORM doesn't properly support the left join between
@ -67,8 +67,8 @@ def check_remove_user_from_user_group(user_profile: UserProfile, user_group: Use
except Exception:
return False
def create_user_group(name: Text, members: List[UserProfile], realm: Realm,
description: Text='') -> UserGroup:
def create_user_group(name: str, members: List[UserProfile], realm: Realm,
description: str='') -> UserGroup:
with transaction.atomic():
user_group = UserGroup.objects.create(name=name, realm=realm,
description=description)

View File

@ -1,4 +1,4 @@
from typing import Optional, Any, Dict, List, Text, Tuple
from typing import Optional, Any, Dict, List, Tuple
from collections import defaultdict
SUBJECT_WITH_BRANCH_TEMPLATE = '{repo} / {branch}'
SUBJECT_WITH_PR_OR_ISSUE_INFO_TEMPLATE = '{repo} / {type} #{id} {title}'
@ -49,10 +49,10 @@ TAG_WITH_URL_TEMPLATE = "[{tag_name}]({tag_url})"
TAG_WITHOUT_URL_TEMPLATE = "{tag_name}"
def get_push_commits_event_message(user_name: Text, compare_url: Optional[Text],
branch_name: Text, commits_data: List[Dict[str, Any]],
def get_push_commits_event_message(user_name: str, compare_url: Optional[str],
branch_name: str, commits_data: List[Dict[str, Any]],
is_truncated: Optional[bool]=False,
deleted: Optional[bool]=False) -> Text:
deleted: Optional[bool]=False) -> str:
if not commits_data and deleted:
return PUSH_DELETE_BRANCH_MESSAGE_TEMPLATE.format(
user_name=user_name,
@ -99,7 +99,7 @@ def get_push_commits_event_message(user_name: Text, compare_url: Optional[Text],
commits_data=get_commits_content(commits_data, is_truncated),
).rstrip()
def get_force_push_commits_event_message(user_name: Text, url: Text, branch_name: Text, head: Text) -> Text:
def get_force_push_commits_event_message(user_name: str, url: str, branch_name: str, head: str) -> str:
return FORCE_PUSH_COMMITS_MESSAGE_TEMPLATE.format(
user_name=user_name,
url=url,
@ -107,23 +107,23 @@ def get_force_push_commits_event_message(user_name: Text, url: Text, branch_name
head=head
)
def get_create_branch_event_message(user_name: Text, url: Text, branch_name: Text) -> Text:
def get_create_branch_event_message(user_name: str, url: str, branch_name: str) -> str:
return CREATE_BRANCH_MESSAGE_TEMPLATE.format(
user_name=user_name,
url=url,
branch_name=branch_name,
)
def get_remove_branch_event_message(user_name: Text, branch_name: Text) -> Text:
def get_remove_branch_event_message(user_name: str, branch_name: str) -> str:
return REMOVE_BRANCH_MESSAGE_TEMPLATE.format(
user_name=user_name,
branch_name=branch_name,
)
def get_pull_request_event_message(user_name: Text, action: Text, url: Text, number: Optional[int]=None,
target_branch: Optional[Text]=None, base_branch: Optional[Text]=None,
message: Optional[Text]=None, assignee: Optional[Text]=None,
type: Optional[Text]='PR') -> Text:
def get_pull_request_event_message(user_name: str, action: str, url: str, number: Optional[int]=None,
target_branch: Optional[str]=None, base_branch: Optional[str]=None,
message: Optional[str]=None, assignee: Optional[str]=None,
type: Optional[str]='PR') -> str:
main_message = PULL_REQUEST_OR_ISSUE_MESSAGE_TEMPLATE.format(
user_name=user_name,
action=action,
@ -143,18 +143,18 @@ def get_pull_request_event_message(user_name: Text, action: Text, url: Text, num
main_message += '\n' + CONTENT_MESSAGE_TEMPLATE.format(message=message)
return main_message.rstrip()
def get_setup_webhook_message(integration: Text, user_name: Optional[Text]=None) -> Text:
def get_setup_webhook_message(integration: str, user_name: Optional[str]=None) -> str:
content = SETUP_MESSAGE_TEMPLATE.format(integration=integration)
if user_name:
content += SETUP_MESSAGE_USER_PART.format(user_name=user_name)
return content
def get_issue_event_message(user_name: Text,
action: Text,
url: Text,
def get_issue_event_message(user_name: str,
action: str,
url: str,
number: Optional[int]=None,
message: Optional[Text]=None,
assignee: Optional[Text]=None) -> Text:
message: Optional[str]=None,
assignee: Optional[str]=None) -> str:
return get_pull_request_event_message(
user_name,
action,
@ -165,10 +165,10 @@ def get_issue_event_message(user_name: Text,
type='Issue'
)
def get_push_tag_event_message(user_name: Text,
tag_name: Text,
tag_url: Optional[Text]=None,
action: Optional[Text]='pushed') -> Text:
def get_push_tag_event_message(user_name: str,
tag_name: str,
tag_url: Optional[str]=None,
action: Optional[str]='pushed') -> str:
if tag_url:
tag_part = TAG_WITH_URL_TEMPLATE.format(tag_name=tag_name, tag_url=tag_url)
else:
@ -179,11 +179,11 @@ def get_push_tag_event_message(user_name: Text,
tag=tag_part
)
def get_commits_comment_action_message(user_name: Text,
action: Text,
commit_url: Text,
sha: Text,
message: Optional[Text]=None) -> Text:
def get_commits_comment_action_message(user_name: str,
action: str,
commit_url: str,
sha: str,
message: Optional[str]=None) -> str:
content = COMMITS_COMMENT_MESSAGE_TEMPLATE.format(
user_name=user_name,
action=action,
@ -196,7 +196,7 @@ def get_commits_comment_action_message(user_name: Text,
)
return content
def get_commits_content(commits_data: List[Dict[str, Any]], is_truncated: Optional[bool]=False) -> Text:
def get_commits_content(commits_data: List[Dict[str, Any]], is_truncated: Optional[bool]=False) -> str:
commits_content = ''
for commit in commits_data[:COMMITS_LIMIT]:
commits_content += COMMIT_ROW_TEMPLATE.format(
@ -215,7 +215,7 @@ def get_commits_content(commits_data: List[Dict[str, Any]], is_truncated: Option
).replace(' ', ' ')
return commits_content.rstrip()
def get_short_sha(sha: Text) -> Text:
def get_short_sha(sha: str) -> str:
return sha[:7]
def get_all_committers(commits_data: List[Dict[str, Any]]) -> List[Tuple[str, int]]:

View File

@ -1,4 +1,3 @@
from typing import Text
from django.utils.translation import ugettext as _
@ -8,9 +7,9 @@ class BadEventQueueIdError(JsonableError):
code = ErrorCode.BAD_EVENT_QUEUE_ID
data_fields = ['queue_id']
def __init__(self, queue_id: Text) -> None:
self.queue_id = queue_id # type: Text
def __init__(self, queue_id: str) -> None:
self.queue_id = queue_id # type: str
@staticmethod
def msg_format() -> Text:
def msg_format() -> str:
return _("Bad event queue id: {queue_id}")