mypy: Convert more zerver/lib files to typing.Text.

This commit touches files in zerver/lib/bugdown/ and
zerver/lib/webhooks.
Authored by Mikebarson on 2016-12-05 06:05:33 +00:00; committed by Tim Abbott
parent: c51d2c3d8f
commit: 690d72d35f
4 changed files with 84 additions and 87 deletions
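
The conversion applied in the diffs below is mechanical: drop the six text_type import, add Text to the existing typing import, and spell the annotations in the # type: comments as Text instead of text_type. A minimal before/after sketch of the pattern, modeled on the unescape helper from bugdown with its body elided:

    # Before: Python 2/3 strings annotated via six's text_type
    from six import text_type

    def unescape(s):
        # type: (text_type) -> text_type
        pass  # body elided; unchanged by the commit

    # After: the same signature spelled with typing.Text
    from typing import Text

    def unescape(s):
        # type: (Text) -> Text
        pass  # body elided; unchanged by the commit

Since typing.Text is an alias for unicode on Python 2 and str on Python 3, the change affects only imports and annotations; runtime behavior is unchanged.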

View File

@ -1,7 +1,7 @@
from __future__ import absolute_import
# Zulip's main markdown implementation. See docs/markdown.md for
# detailed documentation on our markdown syntax.
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, TypeVar, Union, Text
from typing.re import Match
import markdown
@ -61,14 +61,14 @@ class BugdownRenderingException(Exception):
pass
def unescape(s):
# type: (text_type) -> (text_type)
# type: (Text) -> (Text)
if six.PY2:
return html_parser.HTMLParser().unescape(s)
else:
return html.unescape(s)
def list_of_tlds():
# type: () -> List[text_type]
# type: () -> List[Text]
# HACK we manually blacklist .py
blacklist = [u'PY\n', ]
@ -101,7 +101,7 @@ def walk_tree(root, processor, stop_after_first=False):
# height is not actually used
def add_a(root, url, link, height="", title=None, desc=None,
class_attr="message_inline_image", data_id=None):
# type: (Element, text_type, text_type, text_type, Optional[text_type], Optional[text_type], text_type, Optional[text_type]) -> None
# type: (Element, Text, Text, Text, Optional[Text], Optional[Text], Text, Optional[Text]) -> None
title = title if title is not None else url_filename(link)
title = title if title else ""
desc = desc if desc is not None else ""
@ -126,7 +126,7 @@ def add_a(root, url, link, height="", title=None, desc=None,
@cache_with_key(lambda tweet_id: tweet_id, cache_name="database", with_statsd_key="tweet_data")
def fetch_tweet_data(tweet_id):
# type: (text_type) -> Optional[Dict[text_type, Any]]
# type: (Text) -> Optional[Dict[Text, Any]]
if settings.TEST_SUITE:
from . import testing_mocks
res = testing_mocks.twitter(tweet_id)
@ -186,7 +186,7 @@ META_START_RE = re.compile(u'^meta[ >]')
META_END_RE = re.compile(u'^/meta[ >]')
def fetch_open_graph_image(url):
# type: (text_type) -> Optional[Dict[str, Any]]
# type: (Text) -> Optional[Dict[str, Any]]
in_head = False
# HTML will auto close meta tags, when we start the next tag add a closing tag if it has not been closed yet.
last_closed = True
@ -252,7 +252,7 @@ def fetch_open_graph_image(url):
return {'image': image, 'title': title, 'desc': desc}
def get_tweet_id(url):
# type: (text_type) -> Optional[text_type]
# type: (Text) -> Optional[Text]
parsed_url = urllib.parse.urlparse(url)
if not (parsed_url.netloc == 'twitter.com' or parsed_url.netloc.endswith('.twitter.com')):
return None
@ -289,7 +289,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
markdown.treeprocessors.Treeprocessor.__init__(self, md)
def is_image(self, url):
# type: (text_type) -> bool
# type: (Text) -> bool
if not settings.INLINE_IMAGE_PREVIEW:
return False
parsed_url = urllib.parse.urlparse(url)
@ -300,7 +300,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return False
def dropbox_image(self, url):
# type: (text_type) -> Optional[Dict]
# type: (Text) -> Optional[Dict]
# TODO: specify details of returned Dict
parsed_url = urllib.parse.urlparse(url)
if (parsed_url.netloc == 'dropbox.com' or parsed_url.netloc.endswith('.dropbox.com')):
@ -346,7 +346,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return None
def youtube_id(self, url):
# type: (text_type) -> Optional[text_type]
# type: (Text) -> Optional[Text]
if not settings.INLINE_IMAGE_PREVIEW:
return None
# Youtube video id extraction regular expression from http://pastebin.com/KyKAFv1s
@ -360,14 +360,14 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return match.group(2)
def youtube_image(self, url):
# type: (text_type) -> Optional[text_type]
# type: (Text) -> Optional[Text]
yt_id = self.youtube_id(url)
if yt_id is not None:
return "https://i.ytimg.com/vi/%s/default.jpg" % (yt_id,)
def twitter_text(self, text, urls, user_mentions, media):
# type: (text_type, List[Dict[text_type, text_type]], List[Dict[text_type, Any]], List[Dict[text_type, Any]]) -> Element
# type: (Text, List[Dict[Text, Text]], List[Dict[Text, Any]], List[Dict[Text, Any]]) -> Element
"""
Use data from the twitter API to turn links, mentions and media into A
tags.
@ -388,7 +388,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
Finally we add any remaining text to the last node.
"""
to_linkify = [] # type: List[Dict[text_type, Any]]
to_linkify = [] # type: List[Dict[Text, Any]]
# Build dicts for URLs
for url_data in urls:
short_url = url_data["url"]
@ -427,7 +427,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
p = current_node = markdown.util.etree.Element('p')
def set_text(text):
# type: (text_type) -> None
# type: (Text) -> None
"""
Helper to set the text or the tail of the current_node
"""
@ -453,7 +453,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return p
def twitter_link(self, url):
# type: (text_type) -> Optional[Element]
# type: (Text) -> Optional[Element]
tweet_id = get_tweet_id(url)
if tweet_id is None:
@ -463,7 +463,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
res = fetch_tweet_data(tweet_id)
if res is None:
return None
user = res['user'] # type: Dict[text_type, Any]
user = res['user'] # type: Dict[Text, Any]
tweet = markdown.util.etree.Element("div")
tweet.set("class", "twitter-tweet")
img_a = markdown.util.etree.SubElement(tweet, 'a')
@ -482,7 +482,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
text = unescape(res['text'])
urls = res.get('urls', [])
user_mentions = res.get('user_mentions', [])
media = res.get('media', []) # type: List[Dict[text_type, Any]]
media = res.get('media', []) # type: List[Dict[Text, Any]]
p = self.twitter_text(text, urls, user_mentions, media)
tweet.append(p)
@ -523,7 +523,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
return None
def get_url_data(self, e):
# type: (Element) -> Optional[Tuple[text_type, text_type]]
# type: (Element) -> Optional[Tuple[Text, Text]]
if e.tag == "a":
if e.text is not None:
return (e.get("href"), force_text(e.text))
@ -579,7 +579,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
class Avatar(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
img = markdown.util.etree.Element('img')
email_address = match.group('email')
img.set('class', 'message_body_gravatar')
@ -597,7 +597,7 @@ unicode_emoji_list = [os.path.splitext(os.path.basename(fn))[0] for fn in glob.g
def make_emoji(emoji_name, src, display_string):
# type: (text_type, text_type, text_type) -> Element
# type: (Text, Text, Text) -> Element
elt = markdown.util.etree.Element('img')
elt.set('src', src)
elt.set('class', 'emoji')
@ -607,7 +607,7 @@ def make_emoji(emoji_name, src, display_string):
class UnicodeEmoji(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
orig_syntax = match.group('syntax')
name = hex(ord(orig_syntax))[2:]
if name in unicode_emoji_list:
@ -618,11 +618,11 @@ class UnicodeEmoji(markdown.inlinepatterns.Pattern):
class Emoji(markdown.inlinepatterns.Pattern):
def handleMatch(self, match):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
orig_syntax = match.group("syntax")
name = orig_syntax[1:-1]
realm_emoji = {} # type: Dict[text_type, Dict[str, text_type]]
realm_emoji = {} # type: Dict[Text, Dict[str, Text]]
if db_data is not None:
realm_emoji = db_data['emoji']
@ -638,7 +638,7 @@ class StreamSubscribeButton(markdown.inlinepatterns.Pattern):
# This markdown extension has required javascript in
# static/js/custom_markdown.js
def handleMatch(self, match):
# type: (Match[text_type]) -> Element
# type: (Match[Text]) -> Element
stream_name = match.group('stream_name')
stream_name = stream_name.replace('\\)', ')').replace('\\\\', '\\')
@ -661,7 +661,7 @@ class ModalLink(markdown.inlinepatterns.Pattern):
"""
def handleMatch(self, match):
# type: (Match[text_type]) -> Element
# type: (Match[Text]) -> Element
relative_url = match.group('relative_url')
text = match.group('text')
@ -675,7 +675,7 @@ class ModalLink(markdown.inlinepatterns.Pattern):
upload_title_re = re.compile(u"^(https?://[^/]*)?(/user_uploads/\\d+)(/[^/]*)?/[^/]*/(?P<filename>[^/]*)$")
def url_filename(url):
# type: (text_type) -> text_type
# type: (Text) -> Text
"""Extract the filename if a URL is an uploaded file, or return the original URL"""
match = upload_title_re.match(url)
if match:
@ -692,7 +692,7 @@ def fixup_link(link, target_blank=True):
def sanitize_url(url):
# type: (text_type) -> text_type
# type: (Text) -> Text
"""
Sanitize a url against xss attacks.
See the docstring on markdown.inlinepatterns.LinkPattern.sanitize_url.
@ -747,7 +747,7 @@ def sanitize_url(url):
return urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
def url_to_a(url, text = None):
# type: (text_type, Optional[text_type]) -> Union[Element, text_type]
# type: (Text, Optional[Text]) -> Union[Element, Text]
a = markdown.util.etree.Element('a')
href = sanitize_url(url)
@ -764,7 +764,7 @@ def url_to_a(url, text = None):
class VerbosePattern(markdown.inlinepatterns.Pattern):
def __init__(self, pattern):
# type: (text_type) -> None
# type: (Text) -> None
markdown.inlinepatterns.Pattern.__init__(self, ' ')
# HACK: we just had python-markdown compile an empty regex.
@ -776,7 +776,7 @@ class VerbosePattern(markdown.inlinepatterns.Pattern):
class AutoLink(VerbosePattern):
def handleMatch(self, match):
# type: (Match[text_type]) -> ElementStringNone
# type: (Match[Text]) -> ElementStringNone
url = match.group('url')
return url_to_a(url)
@ -801,7 +801,7 @@ class BugdownUListPreprocessor(markdown.preprocessors.Preprocessor):
HANGING_ULIST_RE = re.compile(u'^.+\\n([ ]{0,3}[*][ ]+.*)', re.MULTILINE)
def run(self, lines):
# type: (List[text_type]) -> List[text_type]
# type: (List[Text]) -> List[Text]
""" Insert a newline between a paragraph and ulist if missing """
inserts = 0
fence = None
@ -829,7 +829,7 @@ class LinkPattern(markdown.inlinepatterns.Pattern):
""" Return a link element from the given match. """
def handleMatch(self, m):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
href = m.group(9)
if not href:
return None
@ -847,7 +847,7 @@ class LinkPattern(markdown.inlinepatterns.Pattern):
return el
def prepare_realm_pattern(source):
# type: (text_type) -> text_type
# type: (Text) -> Text
""" Augment a realm filter so it only matches after start-of-string,
whitespace, or opening delimiters, won't match if there are word
characters directly after, and saves what was matched as "name". """
@ -859,19 +859,19 @@ class RealmFilterPattern(markdown.inlinepatterns.Pattern):
""" Applied a given realm filter to the input """
def __init__(self, source_pattern, format_string, markdown_instance=None):
# type: (text_type, text_type, Optional[markdown.Markdown]) -> None
# type: (Text, Text, Optional[markdown.Markdown]) -> None
self.pattern = prepare_realm_pattern(source_pattern)
self.format_string = format_string
markdown.inlinepatterns.Pattern.__init__(self, self.pattern, markdown_instance)
def handleMatch(self, m):
# type: (Match[text_type]) -> Union[Element, text_type]
# type: (Match[Text]) -> Union[Element, Text]
return url_to_a(self.format_string % m.groupdict(),
m.group("name"))
class UserMentionPattern(markdown.inlinepatterns.Pattern):
def find_user_for_mention(self, name):
# type: (text_type) -> Tuple[bool, Dict[str, Any]]
# type: (Text) -> Tuple[bool, Dict[str, Any]]
if db_data is None:
return (False, None)
@ -885,7 +885,7 @@ class UserMentionPattern(markdown.inlinepatterns.Pattern):
return (False, user)
def handleMatch(self, m):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
name = m.group(2) or m.group(3)
if current_message:
@ -911,14 +911,14 @@ class UserMentionPattern(markdown.inlinepatterns.Pattern):
class StreamPattern(VerbosePattern):
def find_stream_by_name(self, name):
# type: (Match[text_type]) -> Dict[str, Any]
# type: (Match[Text]) -> Dict[str, Any]
if db_data is None:
return None
stream = db_data['stream_names'].get(name)
return stream
def handleMatch(self, m):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
name = m.group('stream_name')
if current_message:
@ -940,7 +940,7 @@ class StreamPattern(VerbosePattern):
class AlertWordsNotificationProcessor(markdown.preprocessors.Preprocessor):
def run(self, lines):
# type: (Iterable[text_type]) -> Iterable[text_type]
# type: (Iterable[Text]) -> Iterable[Text]
if current_message and db_data is not None:
# We check for custom alert words here, the set of which are
# dependent on which users may see this message.
@ -972,7 +972,7 @@ class AlertWordsNotificationProcessor(markdown.preprocessors.Preprocessor):
# might be worth sending a version of this change upstream.
class AtomicLinkPattern(LinkPattern):
def handleMatch(self, m):
# type: (Match[text_type]) -> Optional[Element]
# type: (Match[Text]) -> Optional[Element]
ret = LinkPattern.handleMatch(self, m)
if ret is None:
return None
@ -982,7 +982,7 @@ class AtomicLinkPattern(LinkPattern):
class Bugdown(markdown.Extension):
def __init__(self, *args, **kwargs):
# type: (*Any, **Union[bool, None, text_type]) -> None
# type: (*Any, **Union[bool, None, Text]) -> None
# define default configs
self.config = {
"realm_filters": [kwargs['realm_filters'], "Realm-specific filters for domain"],
@ -1140,7 +1140,7 @@ class Bugdown(markdown.Extension):
del md.parser.blockprocessors[k]
md_engines = {}
realm_filter_data = {} # type: Dict[text_type, List[Tuple[text_type, text_type, int]]]
realm_filter_data = {} # type: Dict[Text, List[Tuple[Text, Text, int]]]
class EscapeHtml(markdown.Extension):
def extendMarkdown(self, md, md_globals):
@ -1149,7 +1149,7 @@ class EscapeHtml(markdown.Extension):
del md.inlinePatterns['html']
def make_md_engine(key, opts):
# type: (text_type, Dict[str, Any]) -> None
# type: (Text, Dict[str, Any]) -> None
md_engines[key] = markdown.Markdown(
output_format = 'html',
extensions = [
@ -1165,9 +1165,9 @@ def make_md_engine(key, opts):
realm=opts["realm"][0])])
def subject_links(domain, subject):
# type: (text_type, text_type) -> List[text_type]
# type: (Text, Text) -> List[Text]
from zerver.models import get_realm, RealmFilter, realm_filters_for_domain
matches = [] # type: List[text_type]
matches = [] # type: List[Text]
realm_filters = realm_filters_for_domain(domain)
@ -1178,7 +1178,7 @@ def subject_links(domain, subject):
return matches
def make_realm_filters(domain, filters):
# type: (text_type, List[Tuple[text_type, text_type, int]]) -> None
# type: (Text, List[Tuple[Text, Text, int]]) -> None
global md_engines, realm_filter_data
if domain in md_engines:
del md_engines[domain]
@ -1190,7 +1190,7 @@ def make_realm_filters(domain, filters):
"realm": [domain, "Realm name"]})
def maybe_update_realm_filters(domain):
# type: (Optional[text_type]) -> None
# type: (Optional[Text]) -> None
from zerver.models import realm_filters_for_domain, all_realm_filters
# If domain is None, load all filters
@ -1215,7 +1215,7 @@ def maybe_update_realm_filters(domain):
# codes, which can do surprisingly nasty things.
_privacy_re = re.compile(u'\\w', flags=re.UNICODE)
def _sanitize_for_log(content):
# type: (text_type) -> text_type
# type: (Text) -> Text
return repr(_privacy_re.sub('x', content))
@ -1226,7 +1226,7 @@ current_message = None # type: Optional[Message]
# We avoid doing DB queries in our markdown thread to avoid the overhead of
# opening a new DB connection. These connections tend to live longer than the
# threads themselves, as well.
db_data = None # type: Dict[text_type, Any]
db_data = None # type: Dict[Text, Any]
def log_bugdown_error(msg):
# type: (str) -> None
@ -1237,7 +1237,7 @@ def log_bugdown_error(msg):
logging.getLogger('').error(msg)
def do_convert(content, realm_domain=None, message=None, possible_words=None):
# type: (text_type, Optional[text_type], Optional[Message], Optional[Set[text_type]]) -> Optional[text_type]
# type: (Text, Optional[Text], Optional[Message], Optional[Set[Text]]) -> Optional[Text]
"""Convert Markdown to HTML, with Zulip-specific settings and hacks."""
from zerver.models import get_active_user_dicts_in_realm, get_active_streams, UserProfile
@ -1264,7 +1264,7 @@ def do_convert(content, realm_domain=None, message=None, possible_words=None):
realm_streams = get_active_streams(message.get_realm()).values('id', 'name')
if possible_words is None:
possible_words = set() # Set[text_type]
possible_words = set() # Set[Text]
db_data = {'possible_words': possible_words,
'full_names': dict((user['full_name'].lower(), user) for user in realm_users),
@ -1323,7 +1323,7 @@ def bugdown_stats_finish():
bugdown_total_time += (time.time() - bugdown_time_start)
def convert(content, realm_domain=None, message=None, possible_words=None):
# type: (text_type, Optional[text_type], Optional[Message], Optional[Set[text_type]]) -> Optional[text_type]
# type: (Text, Optional[Text], Optional[Message], Optional[Set[Text]]) -> Optional[Text]
bugdown_stats_start()
ret = do_convert(content, realm_domain, message, possible_words)
bugdown_stats_finish()

View File

@ -64,8 +64,7 @@ Dependencies:
import re
import markdown
from markdown.extensions.codehilite import CodeHilite, CodeHiliteExtension
from six import text_type
from typing import Any, Dict, Iterable, List, MutableSequence, Optional, Tuple, Union
from typing import Any, Dict, Iterable, List, MutableSequence, Optional, Tuple, Union, Text
# Global vars
FENCE_RE = re.compile(u"""
@ -118,14 +117,14 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
self.codehilite_conf = {} # type: Dict[str, List[Any]]
def run(self, lines):
# type: (Iterable[text_type]) -> List[text_type]
# type: (Iterable[Text]) -> List[Text]
""" Match and store Fenced Code Blocks in the HtmlStash. """
output = [] # type: List[text_type]
output = [] # type: List[Text]
class BaseHandler(object):
def handle_line(self, line):
# type: (text_type) -> None
# type: (Text) -> None
raise NotImplementedError()
def done(self):
@ -144,7 +143,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
handlers.pop()
def check_for_new_fence(output, line):
# type: (MutableSequence[text_type], text_type) -> None
# type: (MutableSequence[Text], Text) -> None
m = FENCE_RE.match(line)
if m:
fence = m.group('fence')
@ -156,11 +155,11 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
class OuterHandler(BaseHandler):
def __init__(self, output):
# type: (MutableSequence[text_type]) -> None
# type: (MutableSequence[Text]) -> None
self.output = output
def handle_line(self, line):
# type: (text_type) -> None
# type: (Text) -> None
check_for_new_fence(self.output, line)
def done(self):
@ -168,7 +167,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
pop()
def generic_handler(output, fence, lang):
# type: (MutableSequence[text_type], text_type, text_type) -> BaseHandler
# type: (MutableSequence[Text], Text, Text) -> BaseHandler
if lang in ('quote', 'quoted'):
return QuoteHandler(output, fence)
else:
@ -176,13 +175,13 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
class QuoteHandler(BaseHandler):
def __init__(self, output, fence):
# type: (MutableSequence[text_type], text_type) -> None
# type: (MutableSequence[Text], Text) -> None
self.output = output
self.fence = fence
self.lines = [] # type: List[text_type]
self.lines = [] # type: List[Text]
def handle_line(self, line):
# type: (text_type) -> None
# type: (Text) -> None
if line.rstrip() == self.fence:
self.done()
else:
@ -200,14 +199,14 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
class CodeHandler(BaseHandler):
def __init__(self, output, fence, lang):
# type: (MutableSequence[text_type], text_type, text_type) -> None
# type: (MutableSequence[Text], Text, Text) -> None
self.output = output
self.fence = fence
self.lang = lang
self.lines = [] # type: List[text_type]
self.lines = [] # type: List[Text]
def handle_line(self, line):
# type: (text_type) -> None
# type: (Text) -> None
if line.rstrip() == self.fence:
self.done()
else:
@ -241,7 +240,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return output
def format_code(self, lang, text):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
if lang:
langclass = LANG_TAG % (lang,)
else:
@ -275,7 +274,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return code
def format_quote(self, text):
# type: (text_type) -> text_type
# type: (Text) -> Text
paragraphs = text.split("\n\n")
quoted_paragraphs = []
for paragraph in paragraphs:
@ -284,11 +283,11 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
return "\n\n".join(quoted_paragraphs)
def placeholder(self, code):
# type: (text_type) -> text_type
# type: (Text) -> Text
return self.markdown.htmlStash.store(code, safe=True)
def _escape(self, txt):
# type: (text_type) -> text_type
# type: (Text) -> Text
""" basic html escaping """
txt = txt.replace('&', '&amp;')
txt = txt.replace('<', '&lt;')
@ -298,7 +297,7 @@ class FencedBlockPreprocessor(markdown.preprocessors.Preprocessor):
def makeExtension(*args, **kwargs):
# type: (*Any, **Union[bool, None, text_type]) -> FencedCodeExtension
# type: (*Any, **Union[bool, None, Text]) -> FencedCodeExtension
return FencedCodeExtension(*args, **kwargs)
if __name__ == "__main__":

View File

@ -1,8 +1,7 @@
from __future__ import absolute_import
from __future__ import unicode_literals
from six import text_type
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Text
import ujson
@ -183,7 +182,7 @@ MEDIA_TWEET = """{
def twitter(tweet_id):
# type: (text_type) -> Optional[Dict[text_type, Any]]
# type: (Text) -> Optional[Dict[Text, Any]]
if tweet_id in ["112652479837110273", "287977969287315456", "287977969287315457"]:
return ujson.loads(NORMAL_TWEET)
elif tweet_id == "287977969287315458":

View File

@ -1,5 +1,4 @@
from six import text_type
from typing import Optional, Any
from typing import Optional, Any, Text
SUBJECT_WITH_BRANCH_TEMPLATE = u'{repo} / {branch}'
SUBJECT_WITH_PR_OR_ISSUE_INFO_TEMPLATE = u'{repo} / {type} #{id} {title}'
@ -33,7 +32,7 @@ TAG_WITH_URL_TEMPLATE = u"[{tag_name}]({tag_url})"
TAG_WITHOUT_URL_TEMPLATE = u"{tag_name}"
def get_push_commits_event_message(user_name, compare_url, branch_name, commits_data, is_truncated=False):
# type: (text_type, Optional[text_type], text_type, List[Dict[str, Any]], Optional[bool]) -> text_type
# type: (Text, Optional[Text], Text, List[Dict[str, Any]], Optional[bool]) -> Text
if compare_url:
pushed_text_message = PUSH_PUSHED_TEXT_WITH_URL.format(compare_url=compare_url)
else:
@ -47,7 +46,7 @@ def get_push_commits_event_message(user_name, compare_url, branch_name, commits_
).rstrip()
def get_force_push_commits_event_message(user_name, url, branch_name, head):
# type: (text_type, text_type, text_type, text_type) -> text_type
# type: (Text, Text, Text, Text) -> Text
return FORCE_PUSH_COMMITS_MESSAGE_TEMPLATE.format(
user_name=user_name,
url=url,
@ -56,7 +55,7 @@ def get_force_push_commits_event_message(user_name, url, branch_name, head):
)
def get_remove_branch_event_message(user_name, branch_name):
# type: (text_type, text_type) -> text_type
# type: (Text, Text) -> Text
return REMOVE_BRANCH_MESSAGE_TEMPLATE.format(
user_name=user_name,
branch_name=branch_name,
@ -67,7 +66,7 @@ def get_pull_request_event_message(
target_branch=None, base_branch=None,
message=None, assignee=None, type='PR'
):
# type: (text_type, text_type, text_type, Optional[int], Optional[text_type], Optional[text_type], Optional[text_type], Optional[text_type], Optional[text_type]) -> text_type
# type: (Text, Text, Text, Optional[int], Optional[Text], Optional[Text], Optional[Text], Optional[Text], Optional[Text]) -> Text
main_message = PULL_REQUEST_OR_ISSUE_MESSAGE_TEMPLATE.format(
user_name=user_name,
action=action,
@ -88,7 +87,7 @@ def get_pull_request_event_message(
return main_message.rstrip()
def get_issue_event_message(user_name, action, url, number=None, message=None, assignee=None):
# type: (text_type, text_type, text_type, Optional[int], Optional[text_type], Optional[text_type]) -> text_type
# type: (Text, Text, Text, Optional[int], Optional[Text], Optional[Text]) -> Text
return get_pull_request_event_message(
user_name,
action,
@ -100,7 +99,7 @@ def get_issue_event_message(user_name, action, url, number=None, message=None, a
)
def get_push_tag_event_message(user_name, tag_name, tag_url=None, action='pushed'):
# type: (text_type, text_type, Optional[text_type], Optional[text_type]) -> text_type
# type: (Text, Text, Optional[Text], Optional[Text]) -> Text
if tag_url:
tag_part = TAG_WITH_URL_TEMPLATE.format(tag_name=tag_name, tag_url=tag_url)
else:
@ -112,7 +111,7 @@ def get_push_tag_event_message(user_name, tag_name, tag_url=None, action='pushed
)
def get_commits_comment_action_message(user_name, action, commit_url, sha, message=None):
# type: (text_type, text_type, text_type, text_type, Optional[text_type]) -> text_type
# type: (Text, Text, Text, Text, Optional[Text]) -> Text
content = COMMITS_COMMENT_MESSAGE_TEMPLATE.format(
user_name=user_name,
action=action,
@ -126,7 +125,7 @@ def get_commits_comment_action_message(user_name, action, commit_url, sha, messa
return content
def get_commits_content(commits_data, is_truncated=False):
# type: (List[Dict[str, Any]], Optional[bool]) -> text_type
# type: (List[Dict[str, Any]], Optional[bool]) -> Text
commits_content = u''
for commit in commits_data[:COMMITS_LIMIT]:
commits_content += COMMIT_ROW_TEMPLATE.format(
@ -146,5 +145,5 @@ def get_commits_content(commits_data, is_truncated=False):
return commits_content.rstrip()
def get_short_sha(sha):
# type: (text_type) -> text_type
# type: (Text) -> Text
return sha[:7]