diff --git a/zerver/lib/markdown/__init__.py b/zerver/lib/markdown/__init__.py index cf3d667386..d2f9f051ed 100644 --- a/zerver/lib/markdown/__init__.py +++ b/zerver/lib/markdown/__init__.py @@ -1,5 +1,6 @@ # Zulip's main Markdown implementation. See docs/subsystems/markdown.md for # detailed documentation on our Markdown syntax. +import cgi import datetime import html import logging @@ -30,6 +31,7 @@ from xml.etree.ElementTree import Element, SubElement import ahocorasick import dateutil.parser import dateutil.tz +import lxml.etree import markdown import markdown.blockprocessors import markdown.inlinepatterns @@ -473,75 +475,45 @@ def fetch_tweet_data(tweet_id: str) -> Optional[Dict[str, Any]]: return res -HEAD_START_RE = re.compile("^head[ >]") -HEAD_END_RE = re.compile("^/head[ >]") -META_START_RE = re.compile("^meta[ >]") -META_END_RE = re.compile("^/meta[ >]") - - def fetch_open_graph_image(url: str) -> Optional[Dict[str, Any]]: - in_head = False - # HTML will auto close meta tags, when we start the next tag add - # a closing tag if it has not been closed yet. - last_closed = True - head = [] - # TODO: What if response content is huge? Should we get headers first? + og = {"image": None, "title": None, "desc": None} + try: - content = requests.get(url, timeout=1).text + with requests.get( + url, headers={"Accept": "text/html,application/xhtml+xml"}, stream=True, timeout=1 + ) as res: + if res.status_code != requests.codes.ok: + return None + + mimetype, options = cgi.parse_header(res.headers["Content-Type"]) + if mimetype not in ("text/html", "application/xhtml+xml"): + return None + html = mimetype == "text/html" + + res.raw.decode_content = True + for event, element in lxml.etree.iterparse( + res.raw, events=("start",), no_network=True, remove_comments=True, html=html + ): + parent = element.getparent() + if parent is not None: + # Reduce memory usage. + parent.text = None + parent.remove(element) + + if element.tag in ("body", "{http://www.w3.org/1999/xhtml}body"): + break + elif element.tag in ("meta", "{http://www.w3.org/1999/xhtml}meta"): + if element.get("property") == "og:image": + og["image"] = element.get("content") + elif element.get("property") == "og:title": + og["title"] = element.get("content") + elif element.get("property") == "og:description": + og["desc"] = element.get("content") + except requests.RequestException: return None - # Extract the head and meta tags - # All meta tags are self closing, have no children or are closed - # automatically. - for part in content.split("<"): - if not in_head and HEAD_START_RE.match(part): - # Started the head node output it to have a document root - in_head = True - head.append("") - elif in_head and HEAD_END_RE.match(part): - # Found the end of the head close any remaining tag then stop - # processing - in_head = False - if not last_closed: - last_closed = True - head.append("") - head.append("") - break - elif in_head and META_START_RE.match(part): - # Found a meta node copy it - if not last_closed: - head.append("") - last_closed = True - head.append("<") - head.append(part) - if "/>" not in part: - last_closed = False - - elif in_head and META_END_RE.match(part): - # End of a meta node just copy it to close the tag - head.append("<") - head.append(part) - last_closed = True - - try: - doc = etree.fromstring("".join(head)) - except etree.ParseError: - return None - og_image = doc.find('meta[@property="og:image"]') - og_title = doc.find('meta[@property="og:title"]') - og_desc = doc.find('meta[@property="og:description"]') - title = None - desc = None - if og_image is not None: - image = og_image.get("content") - else: - return None - if og_title is not None: - title = og_title.get("content") - if og_desc is not None: - desc = og_desc.get("content") - return {"image": image, "title": title, "desc": desc} + return None if og["image"] is None else og def get_tweet_id(url: str) -> Optional[str]: