import re from unittest.mock import patch import pyvips from zerver.actions.message_delete import do_delete_messages from zerver.actions.message_send import check_message, do_send_messages from zerver.lib.addressee import Addressee from zerver.lib.camo import get_camo_url from zerver.lib.markdown import render_message_markdown from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import get_test_image_file, read_test_image_file from zerver.lib.thumbnail import ThumbnailFormat from zerver.lib.upload import upload_message_attachment from zerver.models import ( ArchivedAttachment, ArchivedMessage, Attachment, Client, ImageAttachment, Message, ) from zerver.models.clients import get_client from zerver.worker.thumbnail import ensure_thumbnails class MarkdownThumbnailTest(ZulipTestCase): def upload_image(self, image_name: str) -> str: self.login("othello") with get_test_image_file(image_name) as image_file: response = self.assert_json_success( self.client_post("/json/user_uploads", {"file": image_file}) ) return re.sub(r"/user_uploads/", "", response["url"]) def upload_and_thumbnail_image(self, image_name: str) -> str: with self.captureOnCommitCallbacks(execute=True): # Running captureOnCommitCallbacks includes inserting into # the Rabbitmq queue, which in testing means we # immediately run the worker for it, producing the thumbnails. return self.upload_image(image_name) def assert_message_content_is( self, message_id: int, rendered_content: str, user_name: str = "othello" ) -> None: sender_user_profile = self.example_user(user_name) result = self.assert_json_success( self.api_get(sender_user_profile, f"/api/v1/messages/{message_id}") ) self.assertEqual(result["message"]["content"], rendered_content) def send_message_content( self, content: str, do_thumbnail: bool = False, user_name: str = "othello" ) -> int: sender_user_profile = self.example_user(user_name) return self.send_stream_message( sender=sender_user_profile, stream_name="Verona", content=content, skip_capture_on_commit_callbacks=not do_thumbnail, ) def test_uploads_preview_order(self) -> None: image_names = ["img.jpg", "img.png", "img.gif"] path_ids = [self.upload_and_thumbnail_image(image_name) for image_name in image_names] content = ( f"Test 1\n[{image_names[0]}](/user_uploads/{path_ids[0]}) \n\n" f"Next image\n[{image_names[1]}](/user_uploads/{path_ids[1]}) \n\n" f"Another screenshot\n[{image_names[2]}](/user_uploads/{path_ids[2]})" ) sender_user_profile = self.example_user("othello") msg = Message( sender=sender_user_profile, sending_client=get_client("test"), realm=sender_user_profile.realm, ) converted = render_message_markdown(msg, content) self.assertEqual( converted.rendered_content, ( "

Test 1
\n" f'{image_names[0]}

\n' f'
' f'
' "

Next image
\n" f'{image_names[1]}

\n' f'
' f'
' "

Another screenshot
\n" f'{image_names[2]}

\n' f'
' f'
' ), ) def test_thumbnail_code_block(self) -> None: url = "http://example.com/image.png" path_id = self.upload_and_thumbnail_image("img.png") # We have a path_id of an image in the message content, so we # will prefetch the thumbnail metadata -- but not insert it. sender_user_profile = self.example_user("othello") msg = Message( sender=sender_user_profile, sending_client=get_client("test"), realm=sender_user_profile.realm, ) converted = render_message_markdown(msg, f"{url}\n```\n/user_uploads/{path_id}\n```") self.assertEqual( converted.rendered_content, ( f'
' f'
/user_uploads/{path_id}\n'
                "
" ), ) def test_thumbnail_after_send(self) -> None: with self.captureOnCommitCallbacks(execute=True): path_id = self.upload_image("img.png") content = f"[image](/user_uploads/{path_id})" expected = ( f'

image

\n' f'
' '
' ) message_id = self.send_message_content(content) self.assert_message_content_is(message_id, expected) # Exit the block and run thumbnailing expected = ( f'

image

\n' f'
' f'
' ) self.assert_message_content_is(message_id, expected) def test_thumbnail_escaping(self) -> None: self.login("othello") with self.captureOnCommitCallbacks(execute=True): url = upload_message_attachment( "I am 95% ± 5% certain!", "image/png", read_test_image_file("img.png"), self.example_user("othello"), )[0] path_id = re.sub(r"/user_uploads/", "", url) self.assertTrue(ImageAttachment.objects.filter(path_id=path_id).exists()) message_id = self.send_message_content(f"[I am 95% ± 5% certain!](/user_uploads/{path_id})") expected = ( f'

I am 95% ± 5% certain!

\n' f'
' f'
' ) self.assert_message_content_is(message_id, expected) def test_thumbnail_repeated(self) -> None: # We currently have no way to generate a thumbnailing event # for the worker except during upload, meaning that we will # never repeat a ImageAttachment thumbnailing. However, the # code supports it, so test it. # Thumbnail with one set of sizes with self.thumbnail_formats( ThumbnailFormat("webp", 100, 75, animated=True), ThumbnailFormat("webp", 100, 75, animated=False), ): path_id = self.upload_and_thumbnail_image("animated_unequal_img.gif") content = f"[animated_unequal_img.gif](/user_uploads/{path_id})" expected = ( f'

animated_unequal_img.gif

\n' f'
' '
' ) message_id = self.send_message_content(content, do_thumbnail=True) self.assert_message_content_is(message_id, expected) self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 2) # Re-thumbnail with a non-overlapping set of sizes with self.thumbnail_formats(ThumbnailFormat("jpg", 100, 75, animated=False)): ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id)) # We generate a new size but leave the old ones self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 3) # And the contents are not updated to the new size self.assert_message_content_is(message_id, expected) def test_thumbnail_sequential_edits(self) -> None: first_path_id = self.upload_image("img.png") second_path_id = self.upload_image("img.jpg") message_id = self.send_message_content( f"[first image](/user_uploads/{first_path_id})\n[second image](/user_uploads/{second_path_id})", do_thumbnail=False, ) self.assert_message_content_is( message_id, ( f'

first image
\n' f'second image

\n' f'
' '
' f'
' '
' ), ) # Complete thumbnailing the second image first -- replacing only that spinner ensure_thumbnails(ImageAttachment.objects.get(path_id=second_path_id)) self.assert_message_content_is( message_id, ( f'

first image
\n' f'second image

\n' f'
' '
' f'
' f'
' ), ) # Finish the other thumbnail ensure_thumbnails(ImageAttachment.objects.get(path_id=first_path_id)) self.assert_message_content_is( message_id, ( f'

first image
\n' f'second image

\n' f'
' f'
' f'
' f'
' ), ) def test_thumbnail_of_deleted(self) -> None: sender_user_profile = self.example_user("othello") path_id = self.upload_image("img.png") message_id = self.send_message_content(f"[image](/user_uploads/{path_id})") # Delete the message do_delete_messages( sender_user_profile.realm, [Message.objects.get(id=message_id)], acting_user=None ) # There is still an ImageAttachment row self.assertFalse(Attachment.objects.filter(path_id=path_id).exists()) self.assertTrue(ArchivedAttachment.objects.filter(path_id=path_id).exists()) self.assertTrue(ImageAttachment.objects.filter(path_id=path_id).exists()) # Completing rendering after it is deleted should work, and # update the rendered content in the archived message ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id)) expected = ( f'

image

\n' f'
' f'
' ) self.assertEqual( ArchivedMessage.objects.get(id=message_id).rendered_content, expected, ) # See test_delete_unclaimed_attachments for tests of the # archiving process itself, and how it interacts with # thumbnails. def test_thumbnail_bad_image(self) -> None: """Test what happens if the file looks fine, but resizing later fails""" path_id = self.upload_image("img.png") message_id = self.send_message_content(f"[image](/user_uploads/{path_id})") self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 0) # If the image is found to be bad, we remove all trace of the preview with ( patch.object( pyvips.Image, "thumbnail_buffer", side_effect=pyvips.Error("some bad error") ) as thumb_mock, self.assertLogs("zerver.worker.thumbnail", "ERROR") as thumbnail_logs, ): ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id)) thumb_mock.assert_called_once() self.assert_length(thumbnail_logs.output, 1) self.assertTrue( thumbnail_logs.output[0].startswith("ERROR:zerver.worker.thumbnail:some bad error") ) self.assertFalse(ImageAttachment.objects.filter(path_id=path_id).exists()) self.assert_message_content_is( message_id, f'

image

' ) def test_thumbnail_multiple_messages(self) -> None: sender_user_profile = self.example_user("othello") path_id = self.upload_image("img.png") channel_message_id = self.send_message_content(f"A public [image](/user_uploads/{path_id})") private_message_id = self.send_personal_message( from_user=sender_user_profile, to_user=self.example_user("hamlet"), content=f"This [image](/user_uploads/{path_id}) is private", skip_capture_on_commit_callbacks=True, ) placeholder = ( f'
' '
' ) self.assert_message_content_is( channel_message_id, f'

A public image

\n{placeholder}', ) self.assert_message_content_is( private_message_id, f'

This image is private

\n{placeholder}', ) with ( patch.object( pyvips.Image, "thumbnail_buffer", wraps=pyvips.Image.thumbnail_buffer ) as thumb_mock, self.thumbnail_formats( ThumbnailFormat("webp", 100, 75, animated=False), ThumbnailFormat("webp", 200, 150, animated=False), ), ): ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id)) # Called once per format self.assertEqual(thumb_mock.call_count, 2) rendered_thumb = ( f'
' f'
' ) self.assert_message_content_is( channel_message_id, f'

A public image

\n{rendered_thumb}', ) self.assert_message_content_is( private_message_id, f'

This image is private

\n{rendered_thumb}', ) def test_thumbnail_race(self) -> None: """Test what happens when thumbnailing happens between rendering and sending""" path_id = self.upload_image("img.png") self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 0) # Render, but do not send, the message referencing the image. # This will render as a spinner, since the thumbnail has not # been generated yet. send_request = check_message( self.example_user("othello"), Client.objects.get_or_create(name="test suite")[0], Addressee.for_stream_name("Verona", "test"), f"[image](/user_uploads/{path_id})", ) expected = ( f'

image

\n' f'
' '
' ) self.assertEqual(send_request.message.rendered_content, expected) # Thumbnail the image. The message does not exist yet, so # nothing is re-written. ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id)) # Send the message; this should re-check the ImageAttachment # data, find the thumbnails, and update the rendered_content # to no longer contain a spinner. message_id = do_send_messages([send_request])[0].message_id rendered_thumb = ( f'
' f'
' ) self.assert_message_content_is( message_id, f'

image

\n{rendered_thumb}' ) def test_thumbnail_historical_image(self) -> None: # Note that this is outside the captureOnCommitCallbacks, so # we don't actually run thumbnailing for it. This results in # a ImageAttachment row but no thumbnails, which matches the # state of backfilled previously-uploaded images. path_id = self.upload_image("img.png") with self.captureOnCommitCallbacks(execute=True): message_id = self.send_message_content(f"An [image](/user_uploads/{path_id})") content = f"[image](/user_uploads/{path_id})" expected = ( f'

image

\n' f'
' '
' ) message_id = self.send_message_content(content) self.assert_message_content_is(message_id, expected) # Exiting the block should have run the thumbnailing that was # enqueued when rendering the message. expected = ( f'

image

\n' f'
' f'
' ) self.assert_message_content_is(message_id, expected)