# -*- coding: utf-8 -*-
"""Tests for URL preview embedding: oEmbed lookup, HTML parsers and the
``embed_links`` queue worker."""

# NOTE(review): this file reached review with its line structure collapsed
# and, apparently, with HTML markup stripped out of several string literals
# (HTML fixtures and expected rendered-content strings).  The layout below is
# restored; the affected literals are kept exactly as they appeared and are
# flagged individually.  Restore them from version control before relying on
# these tests.

import mock
import ujson
from typing import Any
from requests.exceptions import ConnectionError

from django.test import override_settings

from zerver.models import Recipient, Message
from zerver.lib.actions import queue_json_publish
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import MockPythonResponse
from zerver.worker.queue_processors import FetchLinksEmbedData
from zerver.lib.url_preview.preview import (
    get_link_embed_data, link_embed_data_from_cache)
from zerver.lib.url_preview.oembed import get_oembed_data
from zerver.lib.url_preview.parsers import (
    OpenGraphParser, GenericParser)
from zerver.lib.cache import cache_set, NotFoundInCache, preview_url_cache_key


# Cache configuration swapped in (via self.settings(CACHES=...)) by the tests
# that run the queue worker or touch the 'database' cache directly.
TEST_CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'default',
    },
    'database': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'LOCATION': 'url-preview',
    }
}


@override_settings(INLINE_URL_EMBED_PREVIEW=True)
class OembedTestCase(ZulipTestCase):
    """Exercises get_oembed_data() with pyoembed's HTTP call mocked out."""

    @mock.patch('pyoembed.requests.get')
    def test_present_provider(self, get: Any) -> None:
        """A successful JSON response yields a dict carrying the title."""
        get.return_value = response = mock.Mock()
        response.headers = {'content-type': 'application/json'}
        response.ok = True
        response_data = {
            'type': 'rich',
            'thumbnail_url': 'https://scontent.cdninstagram.com/t51.2885-15/n.jpg',
            'thumbnail_width': 640,
            'thumbnail_height': 426,
            'title': 'NASA',
            # NOTE(review): markup around "test" looks stripped; the value
            # is not asserted on below.
            'html': '\ntest\n',
            'version': '1.0',
            'width': 658,
            'height': None}
        response.text = ujson.dumps(response_data)
        url = 'http://instagram.com/p/BLtI2WdAymy'
        data = get_oembed_data(url)
        self.assertIsInstance(data, dict)
        self.assertIn('title', data)
        assert data is not None  # allow mypy to infer data is indexable
        self.assertEqual(data['title'], response_data['title'])

    @mock.patch('pyoembed.requests.get')
    def test_error_request(self, get: Any) -> None:
        """A response with ok=False makes get_oembed_data() return None."""
        get.return_value = response = mock.Mock()
        response.ok = False
        url = 'http://instagram.com/p/BLtI2WdAymy'
        data = get_oembed_data(url)
        self.assertIsNone(data)


class OpenGraphParserTestCase(ZulipTestCase):
    """Checks OpenGraphParser.extract_data() on an HTML fixture."""

    def test_page_with_og(self) -> None:
        """The parser is expected to report the page's title and description."""
        # NOTE(review): the fixture's markup appears to have been stripped;
        # as it stands it cannot produce the 'The Rock' values asserted below.
        html = """ """

        parser = OpenGraphParser(html)
        result = parser.extract_data()
        self.assertIn('title', result)
        self.assertEqual(result['title'], 'The Rock')
        self.assertEqual(result.get('description'), 'The Rock film')


class GenericParserTestCase(ZulipTestCase):
    """Checks GenericParser.extract_data() on HTML fixtures."""

    def test_parser(self) -> None:
        """The parser is expected to report a title and a description."""
        # NOTE(review): fixture markup appears stripped (no title element
        # survives, yet 'Test title' is asserted below).
        html = """Description text\n"""
        parser = GenericParser(html)
        result = parser.extract_data()
        self.assertEqual(result.get('title'), 'Test title')
        self.assertEqual(result.get('description'), 'Description text')

    def test_extract_image(self) -> None:
        # NOTE(review): only this fixture fragment survives; the rest of this
        # test (parser call and assertions) is not present in the visible
        # source and has not been reconstructed.
        html = """Description text\nDescription text\nDescription text\n"""

    # NOTE(review): the methods below read self.open_graph_html, which is not
    # defined anywhere in the visible source.  Presumably a class header and
    # that fixture were lost along with the stripped markup above -- confirm
    # against version control and move these methods to their proper class.

    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
    def test_edit_message_history(self) -> None:
        """Editing a message to contain a URL queues an embed_links event,
        and processing that event renders the embedded link."""
        email = self.example_email('hamlet')
        self.login(email)
        msg_id = self.send_stream_message(email, "Scotland",
                                          topic_name="editing", content="original")

        url = 'http://test.org/'
        # Serve the fixture for `url` and a 404 for anything else.
        response = MockPythonResponse(self.open_graph_html, 200)
        mocked_response = mock.Mock(
            side_effect=lambda k: {url: response}.get(k, MockPythonResponse('', 404)))

        with mock.patch('zerver.views.messages.queue_json_publish') as patched:
            result = self.client_patch("/json/messages/" + str(msg_id), {
                'message_id': msg_id, 'content': url,
            })
            self.assert_json_success(result)
            patched.assert_called_once()
            queue = patched.call_args[0][0]
            self.assertEqual(queue, "embed_links")
            event = patched.call_args[0][1]

        with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
            with mock.patch('requests.get', mocked_response):
                FetchLinksEmbedData().consume(event)

        # NOTE(review): expected markup appears stripped; .format(url) has no
        # placeholder left to fill.
        embedded_link = 'The Rock'.format(url)
        msg = Message.objects.select_related("sender").get(id=msg_id)
        self.assertIn(embedded_link, msg.rendered_content)

    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
    def _send_message_with_test_org_url(self, sender_email: str, queue_should_run: bool=True,
                                        relative_url: bool=False) -> Message:
        """Send a personal message containing http://test.org/ to cordelia
        and, if an embed_links event was queued, process it.

        sender_email: address of the sending user.
        queue_should_run: whether an embed_links event is expected to be
            queued; when False the message is returned without running the
            queue processor.
        relative_url: when True, 'http://ia.media-imdb.com' is removed from
            the served fixture before it is handed to the worker.

        Returns the Message re-read from the database.
        """
        url = 'http://test.org/'
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
            msg_id = self.send_personal_message(
                sender_email,
                self.example_email('cordelia'),
                content=url,
            )
            if queue_should_run:
                patched.assert_called_once()
                queue = patched.call_args[0][0]
                self.assertEqual(queue, "embed_links")
                event = patched.call_args[0][1]
            else:
                patched.assert_not_called()
                # If nothing was put in the queue, we don't need to
                # run the queue processor or any of the following code
                return Message.objects.select_related("sender").get(id=msg_id)

        # Verify the initial message doesn't have the embedded links rendered
        msg = Message.objects.select_related("sender").get(id=msg_id)
        # NOTE(review): expected markup appears stripped from this literal.
        self.assertNotIn(
            'The Rock'.format(url),
            msg.rendered_content)

        # Mock the network request result so the test can be fast without Internet
        page_html = self.open_graph_html
        if relative_url:
            page_html = page_html.replace('http://ia.media-imdb.com', '')
        response = MockPythonResponse(page_html, 200)
        mocked_response = mock.Mock(
            side_effect=lambda k: {url: response}.get(k, MockPythonResponse('', 404)))

        # Run the queue processor to potentially rerender things
        with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
            with mock.patch('requests.get', mocked_response):
                FetchLinksEmbedData().consume(event)

        msg = Message.objects.select_related("sender").get(id=msg_id)
        return msg

    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
    def test_message_update_race_condition(self) -> None:
        """An embed_links event created for content that has since been
        edited must not render; the event for the edited content must."""
        email = self.example_email('hamlet')
        self.login(email)
        original_url = 'http://test.org/'
        edited_url = 'http://edited.org/'
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
            msg_id = self.send_stream_message(email, "Scotland",
                                              topic_name="foo", content=original_url)
            patched.assert_called_once()
            queue = patched.call_args[0][0]
            self.assertEqual(queue, "embed_links")
            event = patched.call_args[0][1]

        def wrapped_queue_json_publish(*args: Any, **kwargs: Any) -> None:
            # Mock the network request result so the test can be fast without Internet
            response = MockPythonResponse(self.open_graph_html, 200)
            mocked_response_original = mock.Mock(
                side_effect=lambda k: {original_url: response}.get(k, MockPythonResponse('', 404)))
            mocked_response_edited = mock.Mock(
                side_effect=lambda k: {edited_url: response}.get(k, MockPythonResponse('', 404)))

            with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
                with mock.patch('requests.get', mocked_response_original):
                    # Run the queue processor. This will simulate the event for original_url being
                    # processed after the message has been edited.
                    FetchLinksEmbedData().consume(event)
                    msg = Message.objects.select_related("sender").get(id=msg_id)
                    # The content of the message has changed since the event for original_url has been created,
                    # it should not be rendered. Another, up-to-date event will have been sent (edited_url).
                    # NOTE(review): expected markup appears stripped from this literal.
                    self.assertNotIn('The Rock'.format(original_url),
                                     msg.rendered_content)
                    # NOTE(review): mocked_response_edited is not installed
                    # as requests.get until the block below, so this check
                    # passes trivially -- confirm whether
                    # mocked_response_original was intended.
                    mocked_response_edited.assert_not_called()

            with self.settings(TEST_SUITE=False, CACHES=TEST_CACHES):
                with mock.patch('requests.get', mocked_response_edited):
                    # Now proceed with the original queue_json_publish and call the
                    # up-to-date event for edited_url.
                    queue_json_publish(*args, **kwargs)
                    msg = Message.objects.select_related("sender").get(id=msg_id)
                    # NOTE(review): expected markup appears stripped from this literal.
                    self.assertIn('The Rock'.format(edited_url),
                                  msg.rendered_content)

        with mock.patch('zerver.views.messages.queue_json_publish',
                        wraps=wrapped_queue_json_publish):
            result = self.client_patch("/json/messages/" + str(msg_id), {
                'message_id': msg_id, 'content': edited_url,
            })
            self.assert_json_success(result)

    def test_get_link_embed_data(self) -> None:
        """Messages from humans get the embedded link; messages from the
        webhook bot do not."""
        url = 'http://test.org/'
        # NOTE(review): expected markup appears stripped from this literal.
        embedded_link = 'The Rock'.format(url)

        # When humans send, we should get embedded content.
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('hamlet'))
        self.assertIn(embedded_link, msg.rendered_content)

        # We don't want embedded content for bots.
        msg = self._send_message_with_test_org_url(sender_email='webhook-bot@zulip.com',
                                                   queue_should_run=False)
        self.assertNotIn(embedded_link, msg.rendered_content)

        # Try another human to make sure bot failure was due to the
        # bot sending the message and not some other reason.
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'))
        self.assertIn(embedded_link, msg.rendered_content)

    def test_inline_url_embed_preview(self) -> None:
        """Rendered content carries the preview until the realm's
        inline_url_embed_preview flag is switched off."""
        # NOTE(review): both expected strings appear to have lost their markup.
        with_preview = '\n '
        without_preview = ''
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('hamlet'))
        self.assertEqual(msg.rendered_content, with_preview)

        realm = msg.get_realm()
        realm.inline_url_embed_preview = False
        realm.save()

        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'),
                                                   queue_should_run=False)
        self.assertEqual(msg.rendered_content, without_preview)

    @override_settings(INLINE_URL_EMBED_PREVIEW=True)
    def test_inline_relative_url_embed_preview(self) -> None:
        # Relative urls should not be sent for url preview.
        with mock.patch('zerver.lib.actions.queue_json_publish') as patched:
            self.send_personal_message(
                self.example_email('prospero'),
                self.example_email('cordelia'),
                content="http://zulip.testserver/api/",
            )
            patched.assert_not_called()

    def test_inline_url_embed_preview_with_relative_image_url(self) -> None:
        """Preview rendering when the served page's image URL is relative."""
        # NOTE(review): expected string appears to have lost its markup.
        with_preview_relative = '\n '
        # Try case where the opengraph image is a relative url.
        msg = self._send_message_with_test_org_url(sender_email=self.example_email('prospero'),
                                                   relative_url=True)
        self.assertEqual(msg.rendered_content, with_preview_relative)

    def test_http_error_get_data(self) -> None:
        """A ConnectionError while fetching is logged exactly once via
        logging.error, and the message's rendered content is then checked."""
        url = 'http://test.org/'
        msg_id = self.send_personal_message(
            self.example_email('hamlet'),
            self.example_email('cordelia'),
            content=url,
        )
        msg = Message.objects.select_related("sender").get(id=msg_id)
        event = {
            'message_id': msg_id,
            'urls': [url],
            'message_realm_id': msg.sender.realm_id,
            'message_content': url}
        with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES):
            with mock.patch('requests.get', mock.Mock(side_effect=ConnectionError())):
                with mock.patch('logging.error') as error_mock:
                    FetchLinksEmbedData().consume(event)
        self.assertEqual(error_mock.call_count, 1)
        msg = Message.objects.get(id=msg_id)
        # NOTE(review): expected string appears to have lost its markup.
        self.assertEqual(
            '',
            msg.rendered_content)

    def test_invalid_link(self) -> None:
        """get_link_embed_data() returns None for strings that are not valid links."""
        with self.settings(INLINE_URL_EMBED_PREVIEW=True, TEST_SUITE=False, CACHES=TEST_CACHES):
            self.assertIsNone(get_link_embed_data('com.notvalidlink'))
            self.assertIsNone(get_link_embed_data(u'μένει.com.notvalidlink'))

    def test_link_embed_data_from_cache(self) -> None:
        """link_embed_data_from_cache() raises NotFoundInCache on a miss and
        returns the value stored under preview_url_cache_key(url) on a hit."""
        url = 'http://test.org/'
        link_embed_data = 'test data'

        with self.assertRaises(NotFoundInCache):
            link_embed_data_from_cache(url)

        with self.settings(CACHES=TEST_CACHES):
            key = preview_url_cache_key(url)
            cache_set(key, link_embed_data, 'database')
            self.assertEqual(link_embed_data, link_embed_data_from_cache(url))