zulip/zerver/lib/url_preview/preview.py

108 lines
4.2 KiB
Python

import functools
import re
from typing import Any, Callable, Dict, Optional
from typing.re import Match

import magic
import requests
from django.conf import settings
from django.utils.encoding import smart_text

from version import ZULIP_VERSION
from zerver.lib.cache import cache_with_key, get_cache_with_key, preview_url_cache_key
from zerver.lib.pysa import mark_sanitized
from zerver.lib.url_preview.oembed import get_oembed_data
from zerver.lib.url_preview.parsers import GenericParser, OpenGraphParser
# FIXME: Should production use a database cache or memcached? And what should
# happen when a site changes its Open Graph data after it has been cached?
# Development uses an in-memory cache so that this code is easy to work on.
CACHE_NAME = "in-memory" if settings.DEVELOPMENT else "database"

# Based on django.core.validators.URLValidator, with ftp support removed.
link_regex = re.compile(
    r'^(?:http)s?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$',
    flags=re.IGNORECASE,
)

# Present a Chrome-like User-Agent, since some sites refuse to work on old
# browsers; the Zulip version and external host are embedded in the string.
ZULIP_URL_PREVIEW_USER_AGENT = (
    'Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; ZulipURLPreview/{version}; '
    '+{external_host}) Chrome/81.0.4044.113 Safari/537.36'
).format(external_host=settings.EXTERNAL_HOST, version=ZULIP_VERSION)

# FIXME: This header and timeout are not used by pyoembed, when trying to autodiscover!
# Both values are passed to every requests.get() call made by this module.
HEADERS = {'User-Agent': ZULIP_URL_PREVIEW_USER_AGENT}
TIMEOUT = 15
def is_link(url: str) -> Optional[Match[str]]:
    """Match *url* against the module's http(s) URL pattern, ``link_regex``.

    The value is coerced to text with ``smart_text`` before matching.

    Returns:
        The regex match object when the string matches ``link_regex``,
        otherwise ``None``. ``re.match`` yields ``None`` on failure, which
        is why the return type is ``Optional`` and why callers can use the
        result directly in a truth test.
    """
    return link_regex.match(smart_text(url))
def guess_mimetype_from_content(response: requests.Response) -> str:
    """Sniff the MIME type of *response* from the start of its body.

    Only the first chunk produced by ``response.iter_content(1000)`` is
    inspected; if the body yields no chunk at all, an empty buffer is
    handed to libmagic instead.

    Returns:
        The MIME type reported by ``magic.Magic(mime=True)`` for that buffer.
    """
    detector = magic.Magic(mime=True)
    # next() with a default replaces the explicit StopIteration handling.
    first_chunk = next(response.iter_content(1000), '')
    return detector.from_buffer(first_chunk)
def valid_content_type(url: str) -> bool:
    """Return whether fetching *url* yields something that looks like HTML.

    The URL is requested with ``stream=True`` so that only the headers (and,
    when needed, the first chunk of the body) are downloaded.

    Returns:
        ``False`` when the request raises ``requests.RequestException`` or
        the response status is not OK; otherwise ``True`` only if the
        effective content type starts with ``text/html``.
    """
    try:
        response = requests.get(url, stream=True, headers=HEADERS, timeout=TIMEOUT)
    except requests.RequestException:
        return False

    # With stream=True the connection is not released until the body is fully
    # consumed or the response is closed.  We never read the whole body here,
    # so close the response explicitly on every exit path.
    with response:
        if not response.ok:
            return False

        content_type = response.headers.get('content-type')
        # Be accommodating of bad servers: assume content may be html if no
        # content-type header is present.
        if not content_type or content_type.startswith('text/html'):
            # Verify that the content is actually HTML if the server claims it is.
            content_type = guess_mimetype_from_content(response)
        return content_type.startswith('text/html')
def catch_network_errors(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator that turns ``requests`` network failures into a ``None`` result.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        *func* and returns its result.  If *func* raises
        ``requests.exceptions.RequestException`` the wrapper returns
        ``None`` instead; any other exception propagates unchanged.
    """
    # functools.wraps keeps the wrapped function's __name__, __doc__, etc.,
    # so stacked decorators and debugging tools still see the real function.
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except requests.exceptions.RequestException:
            # Deliberate best-effort: a failed fetch simply means "no data".
            return None

    return wrapper
@catch_network_errors
@cache_with_key(preview_url_cache_key, cache_name=CACHE_NAME, with_statsd_key="urlpreview_data")
def get_link_embed_data(url: str,
                        maxwidth: int=640,
                        maxheight: int=480) -> Optional[Dict[str, Any]]:
    """Collect preview ("embed") metadata for *url*.

    Two mechanisms are used:
      1. oEmbed data, if found, for photo and video "type" sites.
      2. Otherwise, a combination of Open Graph tags and generic meta tags.

    Args:
        url: The link to build a preview for.
        maxwidth: Forwarded to ``get_oembed_data``.
        maxheight: Forwarded to ``get_oembed_data``.

    Returns:
        ``None`` when *url* does not match ``link_regex`` or does not serve
        HTML; otherwise a dict.  If the oEmbed lookup produced a truthy
        ``'oembed'`` entry that dict is returned as is.  Otherwise missing
        ``'title'``, ``'description'`` and ``'image'`` keys are filled in,
        preferring Open Graph values over generic ones.  Because of the
        ``catch_network_errors`` decorator, a
        ``requests.exceptions.RequestException`` raised here also results
        in ``None``.
    """
    if not is_link(url):
        return None

    if not valid_content_type(url):
        return None

    data = get_oembed_data(url, maxwidth=maxwidth, maxheight=maxheight) or {}
    if data.get('oembed'):
        return data

    # stream=True keeps the connection checked out until the response is
    # closed, so use the response as a context manager to release it.
    with requests.get(mark_sanitized(url), stream=True,
                      headers=HEADERS, timeout=TIMEOUT) as response:
        if response.ok:
            html = response.text
            # Parser order defines priority: values already present in
            # `data` win, then Open Graph, then the generic parser.
            for parser_class in (OpenGraphParser, GenericParser):
                # Guard both parsers against a None result, not just the
                # generic one.
                parsed = parser_class(html).extract_data() or {}
                for key in ['title', 'description', 'image']:
                    if not data.get(key) and parsed.get(key):
                        data[key] = parsed[key]
    return data
@get_cache_with_key(preview_url_cache_key, cache_name=CACHE_NAME)
def link_embed_data_from_cache(url: str, maxwidth: int=640, maxheight: int=480) -> Any:
    """Cache-lookup counterpart of ``get_link_embed_data``.

    The body itself produces nothing; it uses the same
    ``preview_url_cache_key`` and ``CACHE_NAME`` as ``get_link_embed_data``.
    NOTE(review): presumably the ``get_cache_with_key`` decorator serves
    the cached value and this body is only a placeholder -- confirm
    against ``zerver.lib.cache``.
    """
    return None