# Zulip's main Markdown implementation. See docs/subsystems/markdown.md for # detailed documentation on our Markdown syntax. import html import logging import mimetypes import re import time from collections import deque from collections.abc import Callable from dataclasses import dataclass from datetime import datetime, timezone from email.message import EmailMessage from functools import lru_cache from re import Match, Pattern from typing import Any, Generic, Optional, TypeAlias, TypedDict, TypeVar, cast from urllib.parse import parse_qs, quote, urljoin, urlsplit, urlunsplit from xml.etree.ElementTree import Element, SubElement import ahocorasick import dateutil.parser import dateutil.tz import lxml.etree import markdown import markdown.blockprocessors import markdown.inlinepatterns import markdown.postprocessors import markdown.preprocessors import markdown.treeprocessors import markdown.util import re2 import regex import requests import uri_template import urllib3.exceptions from django.conf import settings from markdown.blockparser import BlockParser from markdown.extensions import codehilite, nl2br, sane_lists, tables from tlds import tld_set from typing_extensions import Self, override from zerver.lib import mention from zerver.lib.cache import cache_with_key from zerver.lib.camo import get_camo_url from zerver.lib.emoji import EMOTICON_RE, codepoint_to_name, name_to_codepoint, translate_emoticons from zerver.lib.emoji_utils import emoji_to_hex_codepoint, unqualify_emoji from zerver.lib.exceptions import MarkdownRenderingError from zerver.lib.markdown import fenced_code from zerver.lib.markdown.fenced_code import FENCE_RE from zerver.lib.mention import ( BEFORE_MENTION_ALLOWED_REGEX, FullNameInfo, MentionBackend, MentionData, ) from zerver.lib.outgoing_http import OutgoingSession from zerver.lib.subdomains import is_static_or_current_realm_url from zerver.lib.tex import render_tex from zerver.lib.thumbnail import ( MarkdownImageMetadata, get_user_upload_previews, rewrite_thumbnailed_images, ) from zerver.lib.timeout import unsafe_timeout from zerver.lib.timezone import common_timezones from zerver.lib.types import LinkifierDict from zerver.lib.url_encoding import encode_stream, hash_util_encode from zerver.lib.url_preview.types import UrlEmbedData, UrlOEmbedData from zerver.models import Message, Realm from zerver.models.linkifiers import linkifiers_for_realm from zerver.models.realm_emoji import EmojiInfo, get_name_keyed_dict_for_active_realm_emoji ReturnT = TypeVar("ReturnT") # Taken from # https://html.spec.whatwg.org/multipage/system-state.html#safelisted-scheme html_safelisted_schemes = ( "bitcoin", "geo", "im", "irc", "ircs", "magnet", "mailto", "matrix", "mms", "news", "nntp", "openpgp4fpr", "sip", "sms", "smsto", "ssh", "tel", "urn", "webcal", "wtai", "xmpp", ) allowed_schemes = ("http", "https", "ftp", "file", *html_safelisted_schemes) class LinkInfo(TypedDict): parent: Element title: str | None index: int | None remove: Element | None @dataclass class MessageRenderingResult: rendered_content: str mentions_topic_wildcard: bool mentions_stream_wildcard: bool mentions_user_ids: set[int] mentions_user_group_ids: set[int] alert_words: set[str] links_for_preview: set[str] user_ids_with_alert_words: set[int] potential_attachment_path_ids: list[str] thumbnail_spinners: set[str] @dataclass class DbData: mention_data: MentionData realm_url: str realm_alert_words_automaton: ahocorasick.Automaton | None active_realm_emoji: dict[str, EmojiInfo] sent_by_bot: bool stream_names: dict[str, int] translate_emoticons: bool user_upload_previews: dict[str, MarkdownImageMetadata] # Format version of the Markdown rendering; stored along with rendered # messages so that we can efficiently determine what needs to be re-rendered version = 1 _T = TypeVar("_T") ElementStringNone: TypeAlias = Element | str | None EMOJI_REGEX = r"(?P:[\w\-\+]+:)" def verbose_compile(pattern: str) -> Pattern[str]: return re.compile( rf"^(.*?){pattern}(.*?)$", re.DOTALL | re.VERBOSE, ) STREAM_LINK_REGEX = rf""" {BEFORE_MENTION_ALLOWED_REGEX} # Start after whitespace or specified chars \#\*\* # and after hash sign followed by double asterisks (?P[^\*]+) # stream name can contain anything \*\* # ends by double asterisks """ @lru_cache(None) def get_compiled_stream_link_regex() -> Pattern[str]: # Not using verbose_compile as it adds ^(.*?) and # (.*?)$ which cause extra overhead of matching # pattern which is not required. # With new InlineProcessor these extra patterns # are not required. return re.compile( STREAM_LINK_REGEX, re.DOTALL | re.VERBOSE, ) STREAM_TOPIC_LINK_REGEX = rf""" {BEFORE_MENTION_ALLOWED_REGEX} # Start after whitespace or specified chars \#\*\* # and after hash sign followed by double asterisks (?P[^\*>]+) # stream name can contain anything except > > # > acts as separator (?P[^\*]+) # topic name can contain anything \*\* # ends by double asterisks """ @lru_cache(None) def get_compiled_stream_topic_link_regex() -> Pattern[str]: # Not using verbose_compile as it adds ^(.*?) and # (.*?)$ which cause extra overhead of matching # pattern which is not required. # With new InlineProcessor these extra patterns # are not required. return re.compile( STREAM_TOPIC_LINK_REGEX, re.DOTALL | re.VERBOSE, ) STREAM_TOPIC_MESSAGE_LINK_REGEX = rf""" {BEFORE_MENTION_ALLOWED_REGEX} # Start after whitespace or specified chars \#\*\* # and after hash sign followed by double asterisks (?P[^\*>]+) # stream name can contain anything except > > # > acts as separator (?P[^\*]+) # topic name can contain anything @ (?P\d+) # message id \*\* # ends by double asterisks """ @lru_cache(None) def get_compiled_stream_topic_message_link_regex() -> Pattern[str]: # Not using verbose_compile as it adds ^(.*?) and # (.*?)$ which cause extra overhead of matching # pattern which is not required. # With new InlineProcessor these extra patterns # are not required. return re.compile( STREAM_TOPIC_MESSAGE_LINK_REGEX, re.DOTALL | re.VERBOSE, ) @lru_cache(None) def get_web_link_regex() -> Pattern[str]: # We create this one time, but not at startup. So the # first message rendered in any process will have some # extra costs. It's roughly 75ms to run this code, so # caching the value is super important here. tlds = r"|".join(list_of_tlds()) # A link starts at a word boundary, and ends at space, punctuation, or end-of-input. # # We detect a URL either by the `https?://` or by building around the TLD. # In lieu of having a recursive regex (which python doesn't support) to match # arbitrary numbers of nested matching parenthesis, we manually build a regexp that # can match up to six # The inner_paren_contents chunk matches the innermore non-parenthesis-holding text, # and the paren_group matches text with, optionally, a matching set of parens inner_paren_contents = r"[^\s()\"]*" paren_group = r""" [^\s()\"]*? # Containing characters that won't end the URL (?: \( %s \) # and more characters in matched parens [^\s()\"]*? # followed by more characters )* # zero-or-more sets of paired parens """ nested_paren_chunk = paren_group for i in range(6): nested_paren_chunk %= (paren_group,) nested_paren_chunk %= (inner_paren_contents,) file_links = r"| (?:file://(/[^/ ]*)+/?)" if settings.ENABLE_FILE_LINKS else r"" REGEX = rf""" (? # Main group (?:(?: # Domain part https?://[\w.:@-]+? # If it has a protocol, anything goes. |(?: # Or, if not, be more strict to avoid false-positives (?:[\w-]+\.)+ # One or more domain components, separated by dots (?:{tlds}) # TLDs ) ) (?:/ # A path, beginning with / {nested_paren_chunk} # zero-to-6 sets of paired parens )?) # Path is optional | (?:[\w.-]+\@[\w.-]+\.[\w]+) # Email is separate, since it can't have a path {file_links} # File path start with file:///, enable by setting ENABLE_FILE_LINKS=True | (?:bitcoin:[13][a-km-zA-HJ-NP-Z1-9]{{25,34}}) # Bitcoin address pattern, see https://mokagio.github.io/tech-journal/2014/11/21/regex-bitcoin.html ) (?= # URL must be followed by (not included in group) [!:;\?\),\.\'\"\>]* # Optional punctuation characters (?:\Z|\s) # followed by whitespace or end of string ) """ return verbose_compile(REGEX) def clear_web_link_regex_for_testing() -> None: # The link regex never changes in production, but our tests # try out both sides of ENABLE_FILE_LINKS, so we need # a way to clear it. get_web_link_regex.cache_clear() markdown_logger = logging.getLogger() def rewrite_local_links_to_relative(db_data: DbData | None, link: str) -> str: """If the link points to a local destination (e.g. #narrow/...), generate a relative link that will open it in the current window. """ if db_data: realm_url_prefix = db_data.realm_url + "/" if link.startswith((realm_url_prefix + "#", realm_url_prefix + "user_uploads/")): return link.removeprefix(realm_url_prefix) return link def url_embed_preview_enabled( message: Message | None = None, realm: Realm | None = None, no_previews: bool = False ) -> bool: if not settings.INLINE_URL_EMBED_PREVIEW: return False if no_previews: return False if realm is None and message is not None: realm = message.get_realm() if realm is None: # realm can be None for odd use cases # like generating documentation or running # test code return True return realm.inline_url_embed_preview def image_preview_enabled( message: Message | None = None, realm: Realm | None = None, no_previews: bool = False ) -> bool: if not settings.INLINE_IMAGE_PREVIEW: return False if no_previews: return False if realm is None and message is not None: realm = message.get_realm() if realm is None: # realm can be None for odd use cases # like generating documentation or running # test code return True return realm.inline_image_preview def list_of_tlds() -> list[str]: # Skip a few overly-common false-positives from file extensions common_false_positives = {"java", "md", "mov", "py", "zip"} return sorted(tld_set - common_false_positives, key=len, reverse=True) def walk_tree( root: Element, processor: Callable[[Element], _T | None], stop_after_first: bool = False ) -> list[_T]: results = [] queue = deque([root]) while queue: currElement = queue.popleft() for child in currElement: queue.append(child) result = processor(child) if result is not None: results.append(result) if stop_after_first: return results return results @dataclass class ElementFamily: grandparent: Element | None parent: Element child: Element in_blockquote: bool T = TypeVar("T") class ResultWithFamily(Generic[T]): family: ElementFamily result: T def __init__(self, family: ElementFamily, result: T) -> None: self.family = family self.result = result class ElementPair: parent: Optional["ElementPair"] value: Element def __init__(self, parent: Optional["ElementPair"], value: Element) -> None: self.parent = parent self.value = value def walk_tree_with_family( root: Element, processor: Callable[[Element], _T | None], ) -> list[ResultWithFamily[_T]]: results = [] queue = deque([ElementPair(parent=None, value=root)]) while queue: currElementPair = queue.popleft() for child in currElementPair.value: queue.append(ElementPair(parent=currElementPair, value=child)) result = processor(child) if result is not None: if currElementPair.parent is not None: grandparent_element = currElementPair.parent grandparent: Element | None = grandparent_element.value else: grandparent = None family = ElementFamily( grandparent=grandparent, parent=currElementPair.value, child=child, in_blockquote=has_blockquote_ancestor(currElementPair), ) results.append( ResultWithFamily( family=family, result=result, ) ) return results def has_blockquote_ancestor(element_pair: ElementPair | None) -> bool: if element_pair is None: return False elif element_pair.value.tag == "blockquote": return True else: return has_blockquote_ancestor(element_pair.parent) @cache_with_key(lambda tweet_id: tweet_id, cache_name="database") def fetch_tweet_data(tweet_id: str) -> dict[str, Any] | None: # Twitter removed support for the v1 API that this integration # used. Given that, there's no point wasting time trying to make # network requests to Twitter. But we leave this function, because # existing cached renderings for Tweets is useful. We throw an # exception rather than returning `None` to avoid caching that the # link doesn't exist. raise NotImplementedError("Twitter desupported their v1 API") class OpenGraphSession(OutgoingSession): def __init__(self) -> None: super().__init__(role="markdown", timeout=1) def fetch_open_graph_image(url: str) -> dict[str, Any] | None: og: dict[str, str | None] = {"image": None, "title": None, "desc": None} try: with OpenGraphSession().get( url, headers={"Accept": "text/html,application/xhtml+xml"}, stream=True ) as res: if res.status_code != requests.codes.ok: return None m = EmailMessage() m["Content-Type"] = res.headers.get("Content-Type") mimetype = m.get_content_type() if mimetype not in ("text/html", "application/xhtml+xml"): return None html = mimetype == "text/html" res.raw.decode_content = True for event, element in lxml.etree.iterparse( res.raw, events=("start",), no_network=True, remove_comments=True, html=html ): parent = element.getparent() if parent is not None: # Reduce memory usage. parent.text = None parent.remove(element) if element.tag in ("body", "{http://www.w3.org/1999/xhtml}body"): break elif element.tag in ("meta", "{http://www.w3.org/1999/xhtml}meta"): if element.get("property") == "og:image": content = element.get("content") if content is not None: og["image"] = urljoin(res.url, content) elif element.get("property") == "og:title": og["title"] = element.get("content") elif element.get("property") == "og:description": og["desc"] = element.get("content") except (requests.RequestException, urllib3.exceptions.HTTPError): return None return None if og["image"] is None else og def get_tweet_id(url: str) -> str | None: parsed_url = urlsplit(url) if not (parsed_url.netloc == "twitter.com" or parsed_url.netloc.endswith(".twitter.com")): return None to_match = parsed_url.path # In old-style twitter.com/#!/wdaher/status/1231241234-style URLs, # we need to look at the fragment instead if parsed_url.path == "/" and len(parsed_url.fragment) > 5: to_match = parsed_url.fragment tweet_id_match = re.match( r"^!?/.*?/status(es)?/(?P\d{10,30})(/photo/[0-9])?/?$", to_match ) if not tweet_id_match: return None return tweet_id_match.group("tweetid") class InlineImageProcessor(markdown.treeprocessors.Treeprocessor): """ Rewrite inline img tags to serve external content via Camo. This rewrites all images, except ones that are served from the current realm or global STATIC_URL. This is to ensure that each realm only loads images that are hosted on that realm or by the global installation, avoiding information leakage to external domains or between realms. We need to disable proxying of images hosted on the same realm, because otherwise we will break images in /user_uploads/, which require authorization to view. """ def __init__(self, zmd: "ZulipMarkdown") -> None: super().__init__(zmd) self.zmd = zmd @override def run(self, root: Element) -> None: # Get all URLs from the blob found_imgs = walk_tree(root, lambda e: e if e.tag == "img" else None) for img in found_imgs: url = img.get("src") assert url is not None if is_static_or_current_realm_url(url, self.zmd.zulip_realm): # Don't rewrite images on our own site (e.g. emoji, user uploads). continue img.set("src", get_camo_url(url)) class InlineVideoProcessor(markdown.treeprocessors.Treeprocessor): """ Rewrite inline video tags to serve external content via Camo. This rewrites all video, except ones that are served from the current realm or global STATIC_URL. This is to ensure that each realm only loads videos that are hosted on that realm or by the global installation, avoiding information leakage to external domains or between realms. We need to disable proxying of videos hosted on the same realm, because otherwise we will break videos in /user_uploads/, which require authorization to view. """ def __init__(self, zmd: "ZulipMarkdown") -> None: super().__init__(zmd) self.zmd = zmd @override def run(self, root: Element) -> None: # Get all URLs from the blob found_videos = walk_tree(root, lambda e: e if e.tag == "video" else None) for video in found_videos: url = video.get("src") assert url is not None if is_static_or_current_realm_url(url, self.zmd.zulip_realm): # Don't rewrite videos on our own site (e.g. user uploads). continue # Pass down both camo generated URL and the original video URL to the client. # Camo URL is only used to generate preview of the video. When user plays the # video, we switch to the source url to fetch the video. This allows playing # the video with no load on our servers. video.set("src", get_camo_url(url)) video.set("data-video-original-url", url) class BacktickInlineProcessor(markdown.inlinepatterns.BacktickInlineProcessor): """Return a `` element containing the matching text.""" @override def handleMatch( # type: ignore[override] # https://github.com/python/mypy/issues/10197 self, m: Match[str], data: str ) -> tuple[Element | str | None, int | None, int | None]: # Let upstream's implementation do its job as it is, we'll # just replace the text to not strip the group because it # makes it impossible to put leading/trailing whitespace in # an inline code span. el, start, end = ret = super().handleMatch(m, data) if el is not None and m.group(3): assert isinstance(el, Element) # upstream's code here is: m.group(3).strip() rather than m.group(3). el.text = markdown.util.AtomicString(markdown.util.code_escape(m.group(3))) return ret # List from https://support.google.com/chromeos/bin/answer.py?hl=en&answer=183093 IMAGE_EXTENSIONS = [".bmp", ".gif", ".jpe", ".jpeg", ".jpg", ".png", ".webp"] class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor): TWITTER_MAX_IMAGE_HEIGHT = 400 TWITTER_MAX_TO_PREVIEW = 3 INLINE_PREVIEW_LIMIT_PER_MESSAGE = 24 def __init__(self, zmd: "ZulipMarkdown") -> None: super().__init__(zmd) self.zmd = zmd def add_a( self, root: Element, image_url: str, link: str, title: str | None = None, desc: str | None = None, class_attr: str = "message_inline_image", data_id: str | None = None, insertion_index: int | None = None, already_thumbnailed: bool = False, ) -> None: desc = desc if desc is not None else "" # Update message.has_image attribute. if "message_inline_image" in class_attr and self.zmd.zulip_message: self.zmd.zulip_message.has_image = True if insertion_index is not None: div = Element("div") root.insert(insertion_index, div) else: div = SubElement(root, "div") div.set("class", class_attr) a = SubElement(div, "a") a.set("href", link) if title is not None: a.set("title", title) if data_id is not None: a.set("data-id", data_id) img = SubElement(a, "img") if image_url.startswith("/user_uploads/") and self.zmd.zulip_db_data: path_id = image_url.removeprefix("/user_uploads/") # We should have pulled the preview data for this image # (even if that's "no preview yet") from the database # before rendering; is_image should have enforced that. assert path_id in self.zmd.zulip_db_data.user_upload_previews metadata = self.zmd.zulip_db_data.user_upload_previews[path_id] # Insert a placeholder image spinner. We post-process # this content (see rewrite_thumbnailed_images in # zerver.lib.thumbnail), looking specifically for this # tag, and may re-write it into the thumbnail URL if it # already exists when the message is sent. img.set("class", "image-loading-placeholder") img.set("src", "/static/images/loading/loader-black.svg") img.set( "data-original-dimensions", f"{metadata.original_width_px}x{metadata.original_height_px}", ) else: img.set("src", image_url) if class_attr == "message_inline_ref": summary_div = SubElement(div, "div") title_div = SubElement(summary_div, "div") title_div.set("class", "message_inline_image_title") title_div.text = title desc_div = SubElement(summary_div, "desc") desc_div.set("class", "message_inline_image_desc") def add_oembed_data(self, root: Element, link: str, extracted_data: UrlOEmbedData) -> None: if extracted_data.image is None: # Don't add an embed if an image is not found return if extracted_data.type == "photo": self.add_a( root, image_url=extracted_data.image, link=link, title=extracted_data.title, ) elif extracted_data.type == "video": self.add_a( root, image_url=extracted_data.image, link=link, title=extracted_data.title, desc=extracted_data.description, class_attr="embed-video message_inline_image", data_id=extracted_data.html, already_thumbnailed=True, ) def add_embed(self, root: Element, link: str, extracted_data: UrlEmbedData) -> None: if isinstance(extracted_data, UrlOEmbedData): self.add_oembed_data(root, link, extracted_data) return if extracted_data.image is None: # Don't add an embed if an image is not found return container = SubElement(root, "div") container.set("class", "message_embed") img_link = get_camo_url(extracted_data.image) img = SubElement(container, "a") img.set( "style", 'background-image: url("' + img_link.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\a ") + '")', ) img.set("href", link) img.set("class", "message_embed_image") data_container = SubElement(container, "div") data_container.set("class", "data-container") if extracted_data.title: title_elm = SubElement(data_container, "div") title_elm.set("class", "message_embed_title") a = SubElement(title_elm, "a") a.set("href", link) a.set("title", extracted_data.title) a.text = extracted_data.title if extracted_data.description: description_elm = SubElement(data_container, "div") description_elm.set("class", "message_embed_description") description_elm.text = extracted_data.description def get_actual_image_url(self, url: str) -> str: # Add specific per-site cases to convert image-preview URLs to image URLs. # See https://github.com/zulip/zulip/issues/4658 for more information parsed_url = urlsplit(url) if parsed_url.netloc == "github.com" or parsed_url.netloc.endswith(".github.com"): # https://github.com/zulip/zulip/blob/main/static/images/logo/zulip-icon-128x128.png -> # https://raw.githubusercontent.com/zulip/zulip/main/static/images/logo/zulip-icon-128x128.png split_path = parsed_url.path.split("/") if len(split_path) > 3 and split_path[3] == "blob": return urljoin( "https://raw.githubusercontent.com", "/".join(split_path[0:3] + split_path[4:]) ) return url def is_image(self, url: str) -> bool: if not self.zmd.image_preview_enabled: return False parsed_url = urlsplit(url) # remove HTML URLs which end with image extensions that cannot be shorted if parsed_url.netloc == "pasteboard.co": return False # Check against the previews we generated -- if we didn't have # a row for the ImageAttachment, then its header didn't parse # as a valid image type which libvips handles. if url.startswith("/user_uploads/") and self.zmd.zulip_db_data: path_id = url.removeprefix("/user_uploads/") return path_id in self.zmd.zulip_db_data.user_upload_previews return any(parsed_url.path.lower().endswith(ext) for ext in IMAGE_EXTENSIONS) def corrected_image_source(self, url: str) -> str | None: # This function adjusts any URLs from linx.li and # wikipedia.org to point to the actual image URL. It's # structurally very similar to dropbox_image, and possibly # should be rewritten to use open graph, but has some value. parsed_url = urlsplit(url) if parsed_url.netloc.lower().endswith(".wikipedia.org") and parsed_url.path.startswith( "/wiki/File:" ): # Redirecting from "/wiki/File:" to "/wiki/Special:FilePath/File:" # A possible alternative, that avoids the redirect after hitting "Special:" # is using the first characters of md5($filename) to generate the URL newpath = parsed_url.path.replace("/wiki/File:", "/wiki/Special:FilePath/File:", 1) return parsed_url._replace(path=newpath).geturl() if parsed_url.netloc == "linx.li": return "https://linx.li/s" + parsed_url.path return None def dropbox_image(self, url: str) -> dict[str, Any] | None: # TODO: The returned Dict could possibly be a TypedDict in future. parsed_url = urlsplit(url) if parsed_url.netloc == "dropbox.com" or parsed_url.netloc.endswith(".dropbox.com"): is_album = parsed_url.path.startswith("/sc/") or parsed_url.path.startswith("/photos/") # Only allow preview Dropbox shared links if not ( parsed_url.path.startswith("/s/") or parsed_url.path.startswith("/sh/") or is_album ): return None # Try to retrieve open graph protocol info for a preview # This might be redundant right now for shared links for images. # However, we might want to make use of title and description # in the future. If the actual image is too big, we might also # want to use the open graph image. image_info = fetch_open_graph_image(url) is_image = is_album or self.is_image(url) # If it is from an album or not an actual image file, # just use open graph image. if is_album or not is_image: # Failed to follow link to find an image preview so # use placeholder image and guess filename if image_info is None: return None image_info["is_image"] = is_image return image_info # Otherwise, try to retrieve the actual image. # This is because open graph image from Dropbox may have padding # and gifs do not work. # TODO: What if image is huge? Should we get headers first? if image_info is None: image_info = {} image_info["is_image"] = True image_info["image"] = parsed_url._replace(query="raw=1").geturl() return image_info return None def youtube_id(self, url: str) -> str | None: if not self.zmd.image_preview_enabled: return None id = None split_url = urlsplit(url) if split_url.scheme in ("http", "https"): if split_url.hostname in ( "m.youtube.com", "www.youtube.com", "www.youtube-nocookie.com", "youtube.com", "youtube-nocookie.com", ): query = parse_qs(split_url.query) if split_url.path in ("/watch", "/watch_popup") and "v" in query: id = query["v"][0] elif split_url.path == "/watch_videos" and "video_ids" in query: id = query["video_ids"][0].split(",", 1)[0] elif split_url.path.startswith(("/embed/", "/shorts/", "/v/")): id = split_url.path.split("/", 3)[2] elif split_url.hostname == "youtu.be" and split_url.path.startswith("/"): id = split_url.path.removeprefix("/") if id is not None and re.fullmatch(r"[0-9A-Za-z_-]+", id): return id return None def youtube_title(self, extracted_data: UrlEmbedData) -> str | None: if extracted_data.title is not None: return f"YouTube - {extracted_data.title}" return None def youtube_image(self, url: str) -> str | None: yt_id = self.youtube_id(url) if yt_id is not None: return f"https://i.ytimg.com/vi/{yt_id}/default.jpg" return None def vimeo_id(self, url: str) -> str | None: if not self.zmd.image_preview_enabled: return None # (http|https)?:\/\/(www\.)?vimeo.com\/(?:channels\/(?:\w+\/)?|groups\/([^\/]*)\/videos\/|)(\d+)(?:|\/\?) # If it matches, match.group('id') is the video id. vimeo_re = ( r"^((http|https)?:\/\/(www\.)?vimeo.com\/" r"(?:channels\/(?:\w+\/)?|groups\/" r"([^\/]*)\/videos\/|)(\d+)(?:|\/\?))$" ) match = re.match(vimeo_re, url) if match is None: return None return match.group(5) def vimeo_title(self, extracted_data: UrlEmbedData) -> str | None: if extracted_data.title is not None: return f"Vimeo - {extracted_data.title}" return None def twitter_text( self, text: str, urls: list[dict[str, str]], user_mentions: list[dict[str, Any]], media: list[dict[str, Any]], ) -> Element: """ Use data from the Twitter API to turn links, mentions and media into A tags. Also convert Unicode emojis to images. This works by using the URLs, user_mentions and media data from the twitter API and searching for Unicode emojis in the text using `POSSIBLE_EMOJI_RE`. The first step is finding the locations of the URLs, mentions, media and emoji in the text. For each match we build a dictionary with type, the start location, end location, the URL to link to, and the text(codepoint and title in case of emojis) to be used in the link(image in case of emojis). Next we sort the matches by start location. And for each we add the text from the end of the last link to the start of the current link to the output. The text needs to added to the text attribute of the first node (the P tag) or the tail the last link created. Finally we add any remaining text to the last node. """ to_process: list[dict[str, Any]] = [] # Build dicts for URLs for url_data in urls: to_process.extend( { "type": "url", "start": match.start(), "end": match.end(), "url": url_data["url"], "text": url_data["expanded_url"], } for match in re.finditer(re.escape(url_data["url"]), text, re.IGNORECASE) ) # Build dicts for mentions for user_mention in user_mentions: screen_name = user_mention["screen_name"] mention_string = "@" + screen_name to_process.extend( { "type": "mention", "start": match.start(), "end": match.end(), "url": "https://twitter.com/" + quote(screen_name), "text": mention_string, } for match in re.finditer(re.escape(mention_string), text, re.IGNORECASE) ) # Build dicts for media for media_item in media: short_url = media_item["url"] expanded_url = media_item["expanded_url"] to_process.extend( { "type": "media", "start": match.start(), "end": match.end(), "url": short_url, "text": expanded_url, } for match in re.finditer(re.escape(short_url), text, re.IGNORECASE) ) # Build dicts for emojis for match in POSSIBLE_EMOJI_RE.finditer(text): orig_syntax = match.group("syntax") codepoint = emoji_to_hex_codepoint(unqualify_emoji(orig_syntax)) if codepoint in codepoint_to_name: display_string = ":" + codepoint_to_name[codepoint] + ":" to_process.append( { "type": "emoji", "start": match.start(), "end": match.end(), "codepoint": codepoint, "title": display_string, } ) to_process.sort(key=lambda x: x["start"]) p = current_node = Element("p") def set_text(text: str) -> None: """ Helper to set the text or the tail of the current_node """ if current_node == p: current_node.text = text else: current_node.tail = text db_data: DbData | None = self.zmd.zulip_db_data current_index = 0 for item in to_process: # The text we want to link starts in already linked text skip it if item["start"] < current_index: continue # Add text from the end of last link to the start of the current # link set_text(text[current_index : item["start"]]) current_index = item["end"] if item["type"] != "emoji": elem = url_to_a(db_data, item["url"], item["text"]) assert isinstance(elem, Element) else: elem = make_emoji(item["codepoint"], item["title"]) current_node = elem p.append(elem) # Add any unused text set_text(text[current_index:]) return p def twitter_link(self, url: str) -> Element | None: tweet_id = get_tweet_id(url) if tweet_id is None: return None try: res = fetch_tweet_data(tweet_id) if res is None: return None user: dict[str, Any] = res["user"] tweet = Element("div") tweet.set("class", "twitter-tweet") img_a = SubElement(tweet, "a") img_a.set("href", url) profile_img = SubElement(img_a, "img") profile_img.set("class", "twitter-avatar") # For some reason, for, e.g. tweet 285072525413724161, # python-twitter does not give us a # profile_image_url_https, but instead puts that URL in # profile_image_url. So use _https if available, but fall # back gracefully. image_url = user.get("profile_image_url_https", user["profile_image_url"]) profile_img.set("src", image_url) text = html.unescape(res["full_text"]) urls = res.get("urls", []) user_mentions = res.get("user_mentions", []) media: list[dict[str, Any]] = res.get("media", []) p = self.twitter_text(text, urls, user_mentions, media) tweet.append(p) span = SubElement(tweet, "span") span.text = "- {} (@{})".format(user["name"], user["screen_name"]) # Add image previews for media_item in media: # Only photos have a preview image if media_item["type"] != "photo": continue # Find the image size that is smaller than # TWITTER_MAX_IMAGE_HEIGHT px tall or the smallest size_name_tuples = sorted( media_item["sizes"].items(), reverse=True, key=lambda x: x[1]["h"] ) for size_name, size in size_name_tuples: if size["h"] < self.TWITTER_MAX_IMAGE_HEIGHT: break media_url = "{}:{}".format(media_item["media_url_https"], size_name) img_div = SubElement(tweet, "div") img_div.set("class", "twitter-image") img_a = SubElement(img_div, "a") img_a.set("href", media_item["url"]) img = SubElement(img_a, "img") img.set("src", media_url) return tweet except NotImplementedError: return None except Exception: # We put this in its own try-except because it requires external # connectivity. If Twitter flakes out, we don't want to not-render # the entire message; we just want to not show the Twitter preview. markdown_logger.warning("Error building Twitter link", exc_info=True) return None def get_url_data(self, e: Element) -> tuple[str, str | None] | None: if e.tag == "a": url = e.get("href") assert url is not None return (url, e.text) return None def get_inlining_information( self, root: Element, found_url: ResultWithFamily[tuple[str, str | None]], ) -> LinkInfo: grandparent = found_url.family.grandparent parent = found_url.family.parent ahref_element = found_url.family.child (url, text) = found_url.result # url != text usually implies a named link, which we opt not to remove url_eq_text = text is None or url == text title = None if url_eq_text else text info: LinkInfo = { "parent": root, "title": title, "index": None, "remove": None, } if parent.tag == "li": info["parent"] = parent if not parent.text and not ahref_element.tail and url_eq_text: info["remove"] = ahref_element elif parent.tag == "p": assert grandparent is not None parent_index = None for index, uncle in enumerate(grandparent): if uncle is parent: parent_index = index break # Append to end of list of grandparent's children as normal info["parent"] = grandparent if ( len(parent) == 1 and (not parent.text or parent.text == "\n") and not ahref_element.tail and url_eq_text ): info["remove"] = parent if parent_index is not None: info["index"] = self.find_proper_insertion_index(grandparent, parent, parent_index) return info def handle_image_inlining( self, root: Element, found_url: ResultWithFamily[tuple[str, str | None]], ) -> None: info = self.get_inlining_information(root, found_url) (url, text) = found_url.result actual_url = self.get_actual_image_url(url) self.add_a( info["parent"], image_url=actual_url, link=url, title=info["title"], insertion_index=info["index"], ) if info["remove"] is not None: info["parent"].remove(info["remove"]) def handle_tweet_inlining( self, root: Element, found_url: ResultWithFamily[tuple[str, str | None]], twitter_data: Element, ) -> None: info = self.get_inlining_information(root, found_url) if info["index"] is not None: div = Element("div") root.insert(info["index"], div) else: div = SubElement(root, "div") div.set("class", "inline-preview-twitter") div.insert(0, twitter_data) def handle_youtube_url_inlining( self, root: Element, found_url: ResultWithFamily[tuple[str, str | None]], yt_image: str, ) -> None: info = self.get_inlining_information(root, found_url) (url, text) = found_url.result yt_id = self.youtube_id(url) self.add_a( info["parent"], image_url=yt_image, link=url, class_attr="youtube-video message_inline_image", data_id=yt_id, insertion_index=info["index"], already_thumbnailed=True, ) def find_proper_insertion_index( self, grandparent: Element, parent: Element, parent_index_in_grandparent: int ) -> int: # If there are several inline images from same paragraph, ensure that # they are in correct (and not opposite) order by inserting after last # inline image from paragraph 'parent' parent_links = [ele.attrib["href"] for ele in parent.iter(tag="a")] insertion_index = parent_index_in_grandparent while True: insertion_index += 1 if insertion_index >= len(grandparent): return insertion_index uncle = grandparent[insertion_index] inline_image_classes = { "message_inline_image", "message_inline_ref", "inline-preview-twitter", } if ( uncle.tag != "div" or "class" not in uncle.attrib or not (set(uncle.attrib["class"].split()) & inline_image_classes) ): return insertion_index uncle_link = uncle.find("a") assert uncle_link is not None if uncle_link.attrib["href"] not in parent_links: return insertion_index def is_video(self, url: str) -> bool: if not self.zmd.image_preview_enabled: return False url_type = mimetypes.guess_type(url)[0] # Support only video formats (containers) that are supported cross-browser and cross-device. As per # https://developer.mozilla.org/en-US/docs/Web/Media/Formats/Containers#index_of_media_container_formats_file_types # MP4 and WebM are the only formats that are widely supported. supported_mimetypes = ["video/mp4", "video/webm"] return url_type in supported_mimetypes def add_video( self, root: Element, url: str, title: str | None, class_attr: str = "message_inline_image message_inline_video", insertion_index: int | None = None, ) -> None: if insertion_index is not None: div = Element("div") root.insert(insertion_index, div) else: div = SubElement(root, "div") div.set("class", class_attr) # Add `a` tag so that the syntax of video matches with # other media types and clients don't get confused. a = SubElement(div, "a") a.set("href", url) if title: a.set("title", title) video = SubElement(a, "video") video.set("src", url) video.set("preload", "metadata") def handle_video_inlining( self, root: Element, found_url: ResultWithFamily[tuple[str, str | None]] ) -> None: info = self.get_inlining_information(root, found_url) url = found_url.result[0] self.add_video(info["parent"], url, info["title"], insertion_index=info["index"]) if info["remove"] is not None: info["parent"].remove(info["remove"]) @override def run(self, root: Element) -> None: # Get all URLs from the blob found_urls = walk_tree_with_family(root, self.get_url_data) unique_urls = {found_url.result[0] for found_url in found_urls} # Collect unique URLs which are not quoted as we don't do # inline previews for links inside blockquotes. unique_previewable_urls = { found_url.result[0] for found_url in found_urls if not found_url.family.in_blockquote } # Set has_link and similar flags whenever a message is processed by Markdown if self.zmd.zulip_message: self.zmd.zulip_message.has_link = len(found_urls) > 0 self.zmd.zulip_message.has_image = False # This is updated in self.add_a for url in unique_urls: # Due to rewrite_local_links_to_relative, we need to # handle both relative URLs beginning with # `/user_uploads` and beginning with `user_uploads`. # This urllib construction converts the latter into # the former. parsed_url = urlsplit(urljoin("/", url)) host = parsed_url.netloc if host != "" and ( self.zmd.zulip_realm is None or host != self.zmd.zulip_realm.host ): continue if not parsed_url.path.startswith("/user_uploads/"): continue path_id = parsed_url.path.removeprefix("/user_uploads/") self.zmd.zulip_rendering_result.potential_attachment_path_ids.append(path_id) if len(found_urls) == 0: return if len(unique_previewable_urls) > self.INLINE_PREVIEW_LIMIT_PER_MESSAGE: return processed_urls: set[str] = set() rendered_tweet_count = 0 for found_url in found_urls: (url, text) = found_url.result if url in unique_previewable_urls and url not in processed_urls: processed_urls.add(url) else: continue if self.is_video(url): self.handle_video_inlining(root, found_url) continue dropbox_image = self.dropbox_image(url) if dropbox_image is not None: class_attr = "message_inline_ref" is_image = dropbox_image["is_image"] if is_image: class_attr = "message_inline_image" # Not making use of title and description of images self.add_a( root, image_url=dropbox_image["image"], link=url, title=dropbox_image.get("title"), desc=dropbox_image.get("desc", ""), class_attr=class_attr, already_thumbnailed=True, ) continue if self.is_image(url): image_source = self.corrected_image_source(url) if image_source is not None: found_url = ResultWithFamily( family=found_url.family, result=(image_source, image_source), ) self.handle_image_inlining(root, found_url) continue netloc = urlsplit(url).netloc if netloc == "" or ( self.zmd.zulip_realm is not None and netloc == self.zmd.zulip_realm.host ): # We don't have a strong use case for doing URL preview for relative links. continue if get_tweet_id(url) is not None: if rendered_tweet_count >= self.TWITTER_MAX_TO_PREVIEW: # Only render at most one tweet per message continue twitter_data = self.twitter_link(url) if twitter_data is None: # This link is not actually a tweet known to twitter continue rendered_tweet_count += 1 self.handle_tweet_inlining(root, found_url, twitter_data) continue youtube = self.youtube_image(url) if youtube is not None: self.handle_youtube_url_inlining(root, found_url, youtube) # NOTE: We don't `continue` here, to allow replacing the URL with # the title, if INLINE_URL_EMBED_PREVIEW feature is enabled. # The entire preview would ideally be shown only if the feature # is enabled, but URL previews are a beta feature and YouTube # previews are pretty stable. db_data: DbData | None = self.zmd.zulip_db_data if db_data and db_data.sent_by_bot: continue if not self.zmd.url_embed_preview_enabled: continue if self.zmd.url_embed_data is None or url not in self.zmd.url_embed_data: self.zmd.zulip_rendering_result.links_for_preview.add(url) continue # Existing but being None means that we did process the # URL, but it was not valid to preview. extracted_data = self.zmd.url_embed_data[url] if extracted_data is None: continue if youtube is not None: title = self.youtube_title(extracted_data) if title is not None: if url == text: found_url.family.child.text = title else: found_url.family.child.text = text continue self.add_embed(root, url, extracted_data) if self.vimeo_id(url): title = self.vimeo_title(extracted_data) if title: if url == text: found_url.family.child.text = title else: found_url.family.child.text = text class CompiledInlineProcessor(markdown.inlinepatterns.InlineProcessor): def __init__(self, compiled_re: Pattern[str], zmd: "ZulipMarkdown") -> None: # This is similar to the superclass's small __init__ function, # but we skip the compilation step and let the caller give us # a compiled regex. self.compiled_re = compiled_re self.md = zmd self.zmd = zmd class Timestamp(markdown.inlinepatterns.Pattern): @override def handleMatch(self, match: Match[str]) -> Element | None: time_input_string = match.group("time") try: timestamp = dateutil.parser.parse(time_input_string, tzinfos=common_timezones) except (ValueError, OverflowError): try: timestamp = datetime.fromtimestamp(float(time_input_string), tz=timezone.utc) except ValueError: timestamp = None if not timestamp: error_element = Element("span") error_element.set("class", "timestamp-error") error_element.text = markdown.util.AtomicString( f"Invalid time format: {time_input_string}" ) return error_element # Use HTML5