from typing import Any, Callable, Dict, Optional from unittest import mock import orjson from django.test import override_settings from django.utils.html import escape from requests.exceptions import ConnectionError from zerver.lib.actions import queue_json_publish from zerver.lib.cache import NotFoundInCache, cache_set, preview_url_cache_key from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import MockPythonResponse, mock_queue_publish from zerver.lib.url_preview.oembed import get_oembed_data, strip_cdata from zerver.lib.url_preview.parsers import GenericParser, OpenGraphParser from zerver.lib.url_preview.preview import get_link_embed_data, link_embed_data_from_cache from zerver.models import Message, Realm, UserProfile from zerver.worker.queue_processors import FetchLinksEmbedData TEST_CACHES = { 'default': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'LOCATION': 'default', }, 'database': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'LOCATION': 'url-preview', }, 'in-memory': { 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'LOCATION': 'url-preview', }, } @override_settings(INLINE_URL_EMBED_PREVIEW=True) class OembedTestCase(ZulipTestCase): @mock.patch('pyoembed.requests.get') def test_present_provider(self, get: Any) -> None: get.return_value = response = mock.Mock() response.headers = {'content-type': 'application/json'} response.ok = True response_data = { 'type': 'rich', 'thumbnail_url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg', 'thumbnail_width': 640, 'thumbnail_height': 426, 'title': 'NASA', 'html': '
test
', 'version': '1.0', 'width': 658, 'height': 400} response.text = orjson.dumps(response_data).decode() url = 'http://instagram.com/p/BLtI2WdAymy' data = get_oembed_data(url) self.assertIsInstance(data, dict) self.assertIn('title', data) assert data is not None # allow mypy to infer data is indexable self.assertEqual(data['title'], response_data['title']) @mock.patch('pyoembed.requests.get') def test_photo_provider(self, get: Any) -> None: get.return_value = response = mock.Mock() response.headers = {'content-type': 'application/json'} response.ok = True response_data = { 'type': 'photo', 'thumbnail_url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg', 'url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg', 'thumbnail_width': 640, 'thumbnail_height': 426, 'title': 'NASA', 'html': 'test
', 'version': '1.0', 'width': 658, 'height': 400} response.text = orjson.dumps(response_data).decode() url = 'http://imgur.com/photo/158727223' data = get_oembed_data(url) self.assertIsInstance(data, dict) self.assertIn('title', data) assert data is not None # allow mypy to infer data is indexable self.assertEqual(data['title'], response_data['title']) self.assertTrue(data['oembed']) @mock.patch('pyoembed.requests.get') def test_video_provider(self, get: Any) -> None: get.return_value = response = mock.Mock() response.headers = {'content-type': 'application/json'} response.ok = True response_data = { 'type': 'video', 'thumbnail_url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg', 'thumbnail_width': 640, 'thumbnail_height': 426, 'title': 'NASA', 'html': 'test
', 'version': '1.0', 'width': 658, 'height': 400} response.text = orjson.dumps(response_data).decode() url = 'http://blip.tv/video/158727223' data = get_oembed_data(url) self.assertIsInstance(data, dict) self.assertIn('title', data) assert data is not None # allow mypy to infer data is indexable self.assertEqual(data['title'], response_data['title']) @mock.patch('pyoembed.requests.get') def test_error_request(self, get: Any) -> None: get.return_value = response = mock.Mock() response.ok = False url = 'http://instagram.com/p/BLtI2WdAymy' data = get_oembed_data(url) self.assertIsNone(data) @mock.patch('pyoembed.requests.get') def test_invalid_json_in_response(self, get: Any) -> None: get.return_value = response = mock.Mock() response.headers = {'content-type': 'application/json'} response.ok = True response.text = '{invalid json}' url = 'http://instagram.com/p/BLtI2WdAymy' data = get_oembed_data(url) self.assertIsNone(data) def test_oembed_html(self) -> None: html = '' stripped_html = strip_cdata(html) self.assertEqual(html, stripped_html) def test_autodiscovered_oembed_xml_format_html(self) -> None: iframe_content = '' html = f'' stripped_html = strip_cdata(html) self.assertEqual(iframe_content, stripped_html) class OpenGraphParserTestCase(ZulipTestCase): def test_page_with_og(self) -> None: html = """ """ parser = OpenGraphParser(html) result = parser.extract_data() self.assertIn('title', result) self.assertEqual(result['title'], 'The Rock') self.assertEqual(result.get('description'), 'The Rock film') def test_page_with_evil_og_tags(self) -> None: html = """ """ parser = OpenGraphParser(html) result = parser.extract_data() self.assertIn('title', result) self.assertEqual(result['title'], 'The Rock') self.assertEqual(result.get('description'), 'The Rock film') self.assertEqual(result.get('oembed'), None) self.assertEqual(result.get('html'), None) class GenericParserTestCase(ZulipTestCase): def test_parser(self) -> None: html = """Description text
""" parser = GenericParser(html) result = parser.extract_data() self.assertEqual(result.get('title'), 'Test title') self.assertEqual(result.get('description'), 'Description text') def test_extract_image(self) -> None: html = """Description text
Description text
Description text
""" def setUp(self) -> None: super().setUp() Realm.objects.all().update(inline_url_embed_preview=True) @classmethod def create_mock_response(cls, url: str, relative_url: bool=False, headers: Optional[Dict[str, str]]=None, html: Optional[str]=None) -> Callable[..., MockPythonResponse]: if html is None: html = cls.open_graph_html if relative_url is True: html = html.replace('http://ia.media-imdb.com', '') response = MockPythonResponse(html, 200, headers) return lambda k, **kwargs: {url: response}.get(k, MockPythonResponse('', 404, headers)) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_edit_message_history(self) -> None: user = self.example_user('hamlet') self.login_user(user) msg_id = self.send_stream_message(user, "Scotland", topic_name="editing", content="original") url = 'http://test.org/' mocked_response = mock.Mock(side_effect=self.create_mock_response(url)) with mock_queue_publish('zerver.views.message_edit.queue_json_publish') as patched: result = self.client_patch("/json/messages/" + str(msg_id), { 'message_id': msg_id, 'content': url, }) self.assert_json_success(result) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) embedded_link = f'The Rock' msg = Message.objects.select_related("sender").get(id=msg_id) self.assertIn(embedded_link, msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def _send_message_with_test_org_url(self, sender: UserProfile, queue_should_run: bool=True, relative_url: bool=False) -> Message: url = 'http://test.org/' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_personal_message( sender, self.example_user('cordelia'), content=url, ) if queue_should_run: patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] else: patched.assert_not_called() # If we nothing was put in the queue, we don't need to # run the queue processor or any of the following code return Message.objects.select_related("sender").get(id=msg_id) # Verify the initial message doesn't have the embedded links rendered msg = Message.objects.select_related("sender").get(id=msg_id) self.assertNotIn( f'The Rock', msg.rendered_content) # Mock the network request result so the test can be fast without Internet mocked_response = mock.Mock(side_effect=self.create_mock_response(url, relative_url=relative_url)) # Run the queue processor to potentially rerender things with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) msg = Message.objects.select_related("sender").get(id=msg_id) return msg @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_message_update_race_condition(self) -> None: user = self.example_user('hamlet') self.login_user(user) original_url = 'http://test.org/' edited_url = 'http://edited.org/' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_stream_message(user, "Scotland", topic_name="foo", content=original_url) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] def wrapped_queue_json_publish(*args: Any, **kwargs: Any) -> None: # Mock the network request result so the test can be fast without Internet mocked_response_original = mock.Mock(side_effect=self.create_mock_response(original_url)) mocked_response_edited = mock.Mock(side_effect=self.create_mock_response(edited_url)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response_original), self.assertLogs(level='INFO') as info_logs: # Run the queue processor. This will simulate the event for original_url being # processed after the message has been edited. FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) msg = Message.objects.select_related("sender").get(id=msg_id) # The content of the message has changed since the event for original_url has been created, # it should not be rendered. Another, up-to-date event will have been sent (edited_url). self.assertNotIn(f'The Rock', msg.rendered_content) mocked_response_edited.assert_not_called() with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response_edited), self.assertLogs(level='INFO') as info_logs: # Now proceed with the original queue_json_publish and call the # up-to-date event for edited_url. queue_json_publish(*args, **kwargs) msg = Message.objects.select_related("sender").get(id=msg_id) self.assertIn(f'The Rock', msg.rendered_content) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://edited.org/: ' in info_logs.output[0] ) with mock_queue_publish('zerver.views.message_edit.queue_json_publish', wraps=wrapped_queue_json_publish): result = self.client_patch("/json/messages/" + str(msg_id), { 'message_id': msg_id, 'content': edited_url, }) self.assert_json_success(result) def test_get_link_embed_data(self) -> None: url = 'http://test.org/' embedded_link = f'The Rock' # When humans send, we should get embedded content. msg = self._send_message_with_test_org_url(sender=self.example_user('hamlet')) self.assertIn(embedded_link, msg.rendered_content) # We don't want embedded content for bots. msg = self._send_message_with_test_org_url(sender=self.example_user('webhook_bot'), queue_should_run=False) self.assertNotIn(embedded_link, msg.rendered_content) # Try another human to make sure bot failure was due to the # bot sending the message and not some other reason. msg = self._send_message_with_test_org_url(sender=self.example_user('prospero')) self.assertIn(embedded_link, msg.rendered_content) def test_inline_url_embed_preview(self) -> None: with_preview = '\n ' without_preview = '' msg = self._send_message_with_test_org_url(sender=self.example_user('hamlet')) self.assertEqual(msg.rendered_content, with_preview) realm = msg.get_realm() setattr(realm, 'inline_url_embed_preview', False) realm.save() msg = self._send_message_with_test_org_url(sender=self.example_user('prospero'), queue_should_run=False) self.assertEqual(msg.rendered_content, without_preview) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_inline_relative_url_embed_preview(self) -> None: # Relative urls should not be sent for url preview. with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: self.send_personal_message( self.example_user('prospero'), self.example_user('cordelia'), content="http://zulip.testserver/api/", ) patched.assert_not_called() def test_inline_url_embed_preview_with_relative_image_url(self) -> None: with_preview_relative = '\n ' # Try case where the opengraph image is a relative url. msg = self._send_message_with_test_org_url(sender=self.example_user('prospero'), relative_url=True) self.assertEqual(msg.rendered_content, with_preview_relative) def test_http_error_get_data(self) -> None: url = 'http://test.org/' msg_id = self.send_personal_message( self.example_user('hamlet'), self.example_user('cordelia'), content=url, ) msg = Message.objects.select_related("sender").get(id=msg_id) event = { 'message_id': msg_id, 'urls': [url], 'message_realm_id': msg.sender.realm_id, 'message_content': url} with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mock.Mock(side_effect=ConnectionError())), \ self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) msg = Message.objects.get(id=msg_id) self.assertEqual( '', msg.rendered_content) def test_invalid_link(self) -> None: with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES): self.assertIsNone(get_link_embed_data('com.notvalidlink')) self.assertIsNone(get_link_embed_data('μένει.com.notvalidlink')) def test_link_embed_data_from_cache(self) -> None: url = 'http://test.org/' link_embed_data = 'test data' with self.assertRaises(NotFoundInCache): link_embed_data_from_cache(url) with self.settings(CACHES=TEST_CACHES): key = preview_url_cache_key(url) cache_set(key, link_embed_data, 'database') self.assertEqual(link_embed_data, link_embed_data_from_cache(url)) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_link_preview_non_html_data(self) -> None: user = self.example_user('hamlet') self.login_user(user) url = 'http://test.org/audio.mp3' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_stream_message(user, "Scotland", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] headers = {'content-type': 'application/octet-stream'} mocked_response = mock.Mock(side_effect=self.create_mock_response(url, headers=headers)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) cached_data = link_embed_data_from_cache(url) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/audio.mp3: ' in info_logs.output[0] ) self.assertIsNone(cached_data) msg = Message.objects.select_related("sender").get(id=msg_id) self.assertEqual( (''), msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_link_preview_no_open_graph_image(self) -> None: user = self.example_user('hamlet') self.login_user(user) url = 'http://test.org/foo.html' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_stream_message(user, "Scotland", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] # HTML without the og:image metadata html = '\n'.join(line for line in self.open_graph_html.splitlines() if 'og:image' not in line) mocked_response = mock.Mock(side_effect=self.create_mock_response(url, html=html)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) cached_data = link_embed_data_from_cache(url) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/foo.html: ' in info_logs.output[0] ) self.assertIn('title', cached_data) self.assertNotIn('image', cached_data) msg = Message.objects.select_related("sender").get(id=msg_id) self.assertEqual( (''), msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_link_preview_open_graph_image_missing_content(self) -> None: user = self.example_user('hamlet') self.login_user(user) url = 'http://test.org/foo.html' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_stream_message(user, "Scotland", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] # HTML without the og:image metadata html = '\n'.join(line if 'og:image' not in line else '' for line in self.open_graph_html.splitlines()) mocked_response = mock.Mock(side_effect=self.create_mock_response(url, html=html)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) cached_data = link_embed_data_from_cache(url) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/foo.html: ' in info_logs.output[0] ) self.assertIn('title', cached_data) self.assertNotIn('image', cached_data) msg = Message.objects.select_related("sender").get(id=msg_id) self.assertEqual( (''), msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_link_preview_no_content_type_header(self) -> None: user = self.example_user('hamlet') self.login_user(user) url = 'http://test.org/' with mock_queue_publish('zerver.lib.actions.queue_json_publish') as patched: msg_id = self.send_stream_message(user, "Scotland", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] self.assertEqual(queue, "embed_links") event = patched.call_args[0][1] headers = {'content-type': ''} # No content type header mocked_response = mock.Mock(side_effect=self.create_mock_response(url, headers=headers)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) data = link_embed_data_from_cache(url) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) self.assertIn('title', data) self.assertIn('image', data) msg = Message.objects.select_related("sender").get(id=msg_id) self.assertIn(data['title'], msg.rendered_content) self.assertIn(data['image'], msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_valid_content_type_error_get_data(self) -> None: url = 'http://test.org/' with mock_queue_publish('zerver.lib.actions.queue_json_publish'): msg_id = self.send_personal_message( self.example_user('hamlet'), self.example_user('cordelia'), content=url, ) msg = Message.objects.select_related("sender").get(id=msg_id) event = { 'message_id': msg_id, 'urls': [url], 'message_realm_id': msg.sender.realm_id, 'message_content': url} with mock.patch('zerver.lib.url_preview.preview.get_oembed_data', side_effect=lambda *args, **kwargs: None): with mock.patch('zerver.lib.url_preview.preview.valid_content_type', side_effect=lambda k: True): with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mock.Mock(side_effect=ConnectionError())), \ self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) with self.assertRaises(NotFoundInCache): link_embed_data_from_cache(url) msg.refresh_from_db() self.assertEqual( '', msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_invalid_url(self) -> None: url = 'http://test.org/' error_url = 'http://test.org/x' with mock_queue_publish('zerver.lib.actions.queue_json_publish'): msg_id = self.send_personal_message( self.example_user('hamlet'), self.example_user('cordelia'), content=error_url, ) msg = Message.objects.select_related("sender").get(id=msg_id) event = { 'message_id': msg_id, 'urls': [error_url], 'message_realm_id': msg.sender.realm_id, 'message_content': error_url} mocked_response = mock.Mock(side_effect=self.create_mock_response(url)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/x: ' in info_logs.output[0] ) cached_data = link_embed_data_from_cache(error_url) # FIXME: Should we really cache this, especially without cache invalidation? self.assertIsNone(cached_data) msg.refresh_from_db() self.assertEqual( '', msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_safe_oembed_html_url(self) -> None: url = 'http://test.org/' with mock_queue_publish('zerver.lib.actions.queue_json_publish'): msg_id = self.send_personal_message( self.example_user('hamlet'), self.example_user('cordelia'), content=url, ) msg = Message.objects.select_related("sender").get(id=msg_id) event = { 'message_id': msg_id, 'urls': [url], 'message_realm_id': msg.sender.realm_id, 'message_content': url} mocked_data = {'html': f'', 'oembed': True, 'type': 'video', 'image': f'{url}/image.png'} mocked_response = mock.Mock(side_effect=self.create_mock_response(url)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: with mock.patch('zerver.lib.url_preview.preview.get_oembed_data', lambda *args, **kwargs: mocked_data): FetchLinksEmbedData().consume(event) data = link_embed_data_from_cache(url) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for http://test.org/: ' in info_logs.output[0] ) self.assertEqual(data, mocked_data) msg.refresh_from_db() self.assertIn('a data-id="{}"'.format(escape(mocked_data['html'])), msg.rendered_content) @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_youtube_url_title_replaces_url(self) -> None: url = 'https://www.youtube.com/watch?v=eSJTXC7Ixgg' with mock_queue_publish('zerver.lib.actions.queue_json_publish'): msg_id = self.send_personal_message( self.example_user('hamlet'), self.example_user('cordelia'), content=url, ) msg = Message.objects.select_related("sender").get(id=msg_id) event = { 'message_id': msg_id, 'urls': [url], 'message_realm_id': msg.sender.realm_id, 'message_content': url} mocked_data = {'title': 'Clearer Code at Scale - Static Types at Zulip and Dropbox'} mocked_response = mock.Mock(side_effect=self.create_mock_response(url)) with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES): with mock.patch('requests.get', mocked_response), self.assertLogs(level='INFO') as info_logs: with mock.patch('zerver.lib.markdown.link_preview.link_embed_data_from_cache', lambda *args, **kwargs: mocked_data): FetchLinksEmbedData().consume(event) self.assertTrue( 'INFO:root:Time spent on get_link_embed_data for https://www.youtube.com/watch?v=eSJTXC7Ixgg:' in info_logs.output[0] ) msg.refresh_from_db() expected_content = 'YouTube - Clearer Code at Scale - Static Types at Zulip and Dropbox
\n ' self.assertEqual(expected_content, msg.rendered_content)