markdown: Stream and parse incrementally in fetch_open_graph_image.

This way we can stop reading as soon as we get to the body.  Also,
send an Accept header, check that the request was actually successful,
use lxml.etree.iterparse instead of a broken hand-rolled state
machine, and support XHTML, all for negative 28 lines of code.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2021-08-04 01:10:20 -07:00 committed by Tim Abbott
parent d384ec55e2
commit 806494da06
1 changed files with 36 additions and 64 deletions

View File

@ -1,5 +1,6 @@
# Zulip's main Markdown implementation. See docs/subsystems/markdown.md for
# detailed documentation on our Markdown syntax.
import cgi
import datetime
import html
import logging
@ -30,6 +31,7 @@ from xml.etree.ElementTree import Element, SubElement
import ahocorasick
import dateutil.parser
import dateutil.tz
import lxml.etree
import markdown
import markdown.blockprocessors
import markdown.inlinepatterns
@ -473,75 +475,45 @@ def fetch_tweet_data(tweet_id: str) -> Optional[Dict[str, Any]]:
return res
HEAD_START_RE = re.compile("^head[ >]")
HEAD_END_RE = re.compile("^/head[ >]")
META_START_RE = re.compile("^meta[ >]")
META_END_RE = re.compile("^/meta[ >]")
def fetch_open_graph_image(url: str) -> Optional[Dict[str, Any]]:
in_head = False
# HTML will auto close meta tags, when we start the next tag add
# a closing tag if it has not been closed yet.
last_closed = True
head = []
# TODO: What if response content is huge? Should we get headers first?
og = {"image": None, "title": None, "desc": None}
try:
content = requests.get(url, timeout=1).text
with requests.get(
url, headers={"Accept": "text/html,application/xhtml+xml"}, stream=True, timeout=1
) as res:
if res.status_code != requests.codes.ok:
return None
mimetype, options = cgi.parse_header(res.headers["Content-Type"])
if mimetype not in ("text/html", "application/xhtml+xml"):
return None
html = mimetype == "text/html"
res.raw.decode_content = True
for event, element in lxml.etree.iterparse(
res.raw, events=("start",), no_network=True, remove_comments=True, html=html
):
parent = element.getparent()
if parent is not None:
# Reduce memory usage.
parent.text = None
parent.remove(element)
if element.tag in ("body", "{http://www.w3.org/1999/xhtml}body"):
break
elif element.tag in ("meta", "{http://www.w3.org/1999/xhtml}meta"):
if element.get("property") == "og:image":
og["image"] = element.get("content")
elif element.get("property") == "og:title":
og["title"] = element.get("content")
elif element.get("property") == "og:description":
og["desc"] = element.get("content")
except requests.RequestException:
return None
# Extract the head and meta tags
# All meta tags are self closing, have no children or are closed
# automatically.
for part in content.split("<"):
if not in_head and HEAD_START_RE.match(part):
# Started the head node output it to have a document root
in_head = True
head.append("<head>")
elif in_head and HEAD_END_RE.match(part):
# Found the end of the head close any remaining tag then stop
# processing
in_head = False
if not last_closed:
last_closed = True
head.append("</meta>")
head.append("</head>")
break
elif in_head and META_START_RE.match(part):
# Found a meta node copy it
if not last_closed:
head.append("</meta>")
last_closed = True
head.append("<")
head.append(part)
if "/>" not in part:
last_closed = False
elif in_head and META_END_RE.match(part):
# End of a meta node just copy it to close the tag
head.append("<")
head.append(part)
last_closed = True
try:
doc = etree.fromstring("".join(head))
except etree.ParseError:
return None
og_image = doc.find('meta[@property="og:image"]')
og_title = doc.find('meta[@property="og:title"]')
og_desc = doc.find('meta[@property="og:description"]')
title = None
desc = None
if og_image is not None:
image = og_image.get("content")
else:
return None
if og_title is not None:
title = og_title.get("content")
if og_desc is not None:
desc = og_desc.get("content")
return {"image": image, "title": title, "desc": desc}
return None if og["image"] is None else og
def get_tweet_id(url: str) -> Optional[str]: