# zulip/zerver/tests/test_markdown_thumbnail.py
#
# Code-viewer metadata for this file: 385 lines, 18 KiB, Python.

import re
from unittest.mock import patch
import pyvips
from zerver.actions.message_delete import do_delete_messages
from zerver.actions.message_send import check_message, do_send_messages
from zerver.lib.addressee import Addressee
from zerver.lib.camo import get_camo_url
from zerver.lib.markdown import render_message_markdown
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import get_test_image_file, read_test_image_file
from zerver.lib.thumbnail import ThumbnailFormat
from zerver.lib.upload import upload_message_attachment
from zerver.models import (
ArchivedAttachment,
ArchivedMessage,
Attachment,
Client,
ImageAttachment,
Message,
)
from zerver.models.clients import get_client
from zerver.worker.thumbnail import ensure_thumbnails
class MarkdownThumbnailTest(ZulipTestCase):
    """Check how links to uploaded images are rendered in message content.

    Depending on whether thumbnailing has run for an image, the expected
    HTML in these tests is either a loading placeholder
    (``image-loading-placeholder``) or an ``<img>`` whose ``src`` points
    at a ``/user_uploads/thumbnail/...`` URL.
    """

    def upload_image(self, image_name: str) -> str:
        """Upload a test image as othello and return its path_id.

        The path_id is the URL from the upload response with the
        "/user_uploads/" portion removed.
        """
        self.login("othello")
        with get_test_image_file(image_name) as image_file:
            response = self.assert_json_success(
                self.client_post("/json/user_uploads", {"file": image_file})
            )
        return re.sub(r"/user_uploads/", "", response["url"])

    def upload_and_thumbnail_image(self, image_name: str) -> str:
        """Upload a test image with on-commit callbacks executed; return its path_id."""
        with self.captureOnCommitCallbacks(execute=True):
            # Running captureOnCommitCallbacks includes inserting into
            # the Rabbitmq queue, which in testing means we
            # immediately run the worker for it, producing the thumbnails.
            return self.upload_image(image_name)

    def assert_message_content_is(
        self, message_id: int, rendered_content: str, user_name: str = "othello"
    ) -> None:
        """Fetch the message through the API as `user_name` and compare its content.

        Asserts that the "content" field of the fetched message equals
        `rendered_content`.
        """
        sender_user_profile = self.example_user(user_name)
        result = self.assert_json_success(
            self.api_get(sender_user_profile, f"/api/v1/messages/{message_id}")
        )
        self.assertEqual(result["message"]["content"], rendered_content)

    def send_message_content(
        self, content: str, do_thumbnail: bool = False, user_name: str = "othello"
    ) -> int:
        """Send `content` to the "Verona" stream as `user_name`; return the message id.

        `do_thumbnail` is passed on, inverted, as
        `skip_capture_on_commit_callbacks`.
        """
        sender_user_profile = self.example_user(user_name)
        return self.send_stream_message(
            sender=sender_user_profile,
            stream_name="Verona",
            content=content,
            skip_capture_on_commit_callbacks=not do_thumbnail,
        )

    def test_uploads_preview_order(self) -> None:
        """Each preview follows the paragraph holding its link, in link order."""
        image_names = ["img.jpg", "img.png", "img.gif"]
        path_ids = [self.upload_and_thumbnail_image(image_name) for image_name in image_names]
        content = (
            f"Test 1\n[{image_names[0]}](/user_uploads/{path_ids[0]}) \n\n"
            f"Next image\n[{image_names[1]}](/user_uploads/{path_ids[1]}) \n\n"
            f"Another screenshot\n[{image_names[2]}](/user_uploads/{path_ids[2]})"
        )

        sender_user_profile = self.example_user("othello")
        msg = Message(
            sender=sender_user_profile,
            sending_client=get_client("test"),
            realm=sender_user_profile.realm,
        )
        converted = render_message_markdown(msg, content)
        self.assertEqual(
            converted.rendered_content,
            (
                "<p>Test 1<br>\n"
                f'<a href="/user_uploads/{path_ids[0]}">{image_names[0]}</a> </p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{path_ids[0]}" title="{image_names[0]}">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_ids[0]}/840x560.webp"></a></div>'
                "<p>Next image<br>\n"
                f'<a href="/user_uploads/{path_ids[1]}">{image_names[1]}</a> </p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{path_ids[1]}" title="{image_names[1]}">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_ids[1]}/840x560.webp"></a></div>'
                "<p>Another screenshot<br>\n"
                f'<a href="/user_uploads/{path_ids[2]}">{image_names[2]}</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{path_ids[2]}" title="{image_names[2]}">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_ids[2]}/840x560.webp"></a></div>'
            ),
        )

    def test_thumbnail_code_block(self) -> None:
        """An upload path inside a code block is left as plain text, with no preview."""
        url = "http://example.com/image.png"
        path_id = self.upload_and_thumbnail_image("img.png")
        # We have a path_id of an image in the message content, so we
        # will prefetch the thumbnail metadata -- but not insert it.

        sender_user_profile = self.example_user("othello")
        msg = Message(
            sender=sender_user_profile,
            sending_client=get_client("test"),
            realm=sender_user_profile.realm,
        )
        converted = render_message_markdown(msg, f"{url}\n```\n/user_uploads/{path_id}\n```")
        self.assertEqual(
            converted.rendered_content,
            (
                f'<div class="message_inline_image"><a href="{url}"><img src="{get_camo_url(url)}"></a></div>'
                f'<div class="codehilite"><pre><span></span><code>/user_uploads/{path_id}\n'
                "</code></pre></div>"
            ),
        )

    def test_thumbnail_after_send(self) -> None:
        """A message sent before thumbnailing shows a placeholder, then the thumbnail."""
        with self.captureOnCommitCallbacks(execute=True):
            path_id = self.upload_image("img.png")
            content = f"[image](/user_uploads/{path_id})"
            expected = (
                f'<p><a href="/user_uploads/{path_id}">image</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
                '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
            )

            message_id = self.send_message_content(content)
            self.assert_message_content_is(message_id, expected)

        # Exit the block and run thumbnailing
        expected = (
            f'<p><a href="/user_uploads/{path_id}">image</a></p>\n'
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_id}/840x560.webp"></a></div>'
        )
        self.assert_message_content_is(message_id, expected)

    def test_thumbnail_escaping(self) -> None:
        """Link text and title containing "±" come back as "&plusmn;" in the content."""
        self.login("othello")
        with self.captureOnCommitCallbacks(execute=True):
            url = upload_message_attachment(
                "I am 95% ± 5% certain!",
                "image/png",
                read_test_image_file("img.png"),
                self.example_user("othello"),
            )
            path_id = re.sub(r"/user_uploads/", "", url)
            self.assertTrue(ImageAttachment.objects.filter(path_id=path_id).exists())

        # The message is sent only after the block above has exited;
        # the expected content below already contains the thumbnail.
        message_id = self.send_message_content(f"[I am 95% ± 5% certain!](/user_uploads/{path_id})")
        expected = (
            f'<p><a href="/user_uploads/{path_id}">I am 95% &plusmn; 5% certain!</a></p>\n'
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="I am 95% &plusmn; 5% certain!">'
            f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_id}/840x560.webp"></a></div>'
        )
        self.assert_message_content_is(message_id, expected)

    def test_thumbnail_repeated(self) -> None:
        """Re-thumbnailing adds new formats but leaves rendered content unchanged."""
        # We currently have no way to generate a thumbnailing event
        # for the worker except during upload, meaning that we will
        # never repeat a ImageAttachment thumbnailing.  However, the
        # code supports it, so test it.

        # Thumbnail with one set of sizes
        with self.thumbnail_formats(
            ThumbnailFormat("webp", 100, 75, animated=True),
            ThumbnailFormat("webp", 100, 75, animated=False),
        ):
            path_id = self.upload_and_thumbnail_image("animated_unequal_img.gif")
            content = f"[animated_unequal_img.gif](/user_uploads/{path_id})"
            expected = (
                f'<p><a href="/user_uploads/{path_id}">animated_unequal_img.gif</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="animated_unequal_img.gif">'
                '<img data-animated="true" data-original-dimensions="128x56"'
                f' src="/user_uploads/thumbnail/{path_id}/100x75-anim.webp"></a></div>'
            )
            message_id = self.send_message_content(content, do_thumbnail=True)
        self.assert_message_content_is(message_id, expected)
        self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 2)

        # Re-thumbnail with a non-overlapping set of sizes
        with self.thumbnail_formats(ThumbnailFormat("jpg", 100, 75, animated=False)):
            ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id))

        # We generate a new size but leave the old ones
        self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 3)

        # And the contents are not updated to the new size
        self.assert_message_content_is(message_id, expected)

    def test_thumbnail_sequential_edits(self) -> None:
        """Thumbnailing two images one at a time replaces one placeholder per step."""
        first_path_id = self.upload_image("img.png")
        second_path_id = self.upload_image("img.jpg")

        message_id = self.send_message_content(
            f"[first image](/user_uploads/{first_path_id})\n[second image](/user_uploads/{second_path_id})",
            do_thumbnail=False,
        )
        self.assert_message_content_is(
            message_id,
            (
                f'<p><a href="/user_uploads/{first_path_id}">first image</a><br>\n'
                f'<a href="/user_uploads/{second_path_id}">second image</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{first_path_id}" title="first image">'
                '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
                f'<div class="message_inline_image"><a href="/user_uploads/{second_path_id}" title="second image">'
                '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
            ),
        )

        # Complete thumbnailing the second image first -- replacing only that spinner
        ensure_thumbnails(ImageAttachment.objects.get(path_id=second_path_id))
        self.assert_message_content_is(
            message_id,
            (
                f'<p><a href="/user_uploads/{first_path_id}">first image</a><br>\n'
                f'<a href="/user_uploads/{second_path_id}">second image</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{first_path_id}" title="first image">'
                '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
                f'<div class="message_inline_image"><a href="/user_uploads/{second_path_id}" title="second image">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{second_path_id}/840x560.webp"></a></div>'
            ),
        )

        # Finish the other thumbnail
        ensure_thumbnails(ImageAttachment.objects.get(path_id=first_path_id))
        self.assert_message_content_is(
            message_id,
            (
                f'<p><a href="/user_uploads/{first_path_id}">first image</a><br>\n'
                f'<a href="/user_uploads/{second_path_id}">second image</a></p>\n'
                f'<div class="message_inline_image"><a href="/user_uploads/{first_path_id}" title="first image">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{first_path_id}/840x560.webp"></a></div>'
                f'<div class="message_inline_image"><a href="/user_uploads/{second_path_id}" title="second image">'
                f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{second_path_id}/840x560.webp"></a></div>'
            ),
        )

    def test_thumbnail_of_deleted(self) -> None:
        """Thumbnailing after the message is deleted updates the archived message."""
        sender_user_profile = self.example_user("othello")
        path_id = self.upload_image("img.png")
        message_id = self.send_message_content(f"[image](/user_uploads/{path_id})")

        # Delete the message
        do_delete_messages(
            sender_user_profile.realm, [Message.objects.get(id=message_id)], acting_user=None
        )

        # There is still an ImageAttachment row
        self.assertFalse(Attachment.objects.filter(path_id=path_id).exists())
        self.assertTrue(ArchivedAttachment.objects.filter(path_id=path_id).exists())
        self.assertTrue(ImageAttachment.objects.filter(path_id=path_id).exists())

        # Completing rendering after it is deleted should work, and
        # update the rendered content in the archived message
        ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id))
        expected = (
            f'<p><a href="/user_uploads/{path_id}">image</a></p>\n'
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_id}/840x560.webp"></a></div>'
        )
        self.assertEqual(
            ArchivedMessage.objects.get(id=message_id).rendered_content,
            expected,
        )

        # See test_delete_unclaimed_attachments for tests of the
        # archiving process itself, and how it interacts with
        # thumbnails.

    def test_thumbnail_bad_image(self) -> None:
        """Test what happens if the file looks fine, but resizing later fails"""
        path_id = self.upload_image("img.png")
        message_id = self.send_message_content(f"[image](/user_uploads/{path_id})")
        self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 0)

        # If the image is found to be bad, we remove all trace of the preview
        with (
            patch.object(
                pyvips.Image, "thumbnail_buffer", side_effect=pyvips.Error("some bad error")
            ) as thumb_mock,
            self.assertLogs("zerver.worker.thumbnail", "ERROR") as thumbnail_logs,
        ):
            ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id))
            thumb_mock.assert_called_once()
        self.assert_length(thumbnail_logs.output, 1)
        self.assertTrue(
            thumbnail_logs.output[0].startswith("ERROR:zerver.worker.thumbnail:some bad error")
        )
        self.assertFalse(ImageAttachment.objects.filter(path_id=path_id).exists())
        self.assert_message_content_is(
            message_id, f'<p><a href="/user_uploads/{path_id}">image</a></p>'
        )

    def test_thumbnail_multiple_messages(self) -> None:
        """One thumbnailing run updates every message that references the image."""
        sender_user_profile = self.example_user("othello")
        path_id = self.upload_image("img.png")
        channel_message_id = self.send_message_content(f"A public [image](/user_uploads/{path_id})")
        private_message_id = self.send_personal_message(
            from_user=sender_user_profile,
            to_user=self.example_user("hamlet"),
            content=f"This [image](/user_uploads/{path_id}) is private",
        )
        placeholder = (
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
        )
        self.assert_message_content_is(
            channel_message_id,
            f'<p>A public <a href="/user_uploads/{path_id}">image</a></p>\n{placeholder}',
        )
        self.assert_message_content_is(
            private_message_id,
            f'<p>This <a href="/user_uploads/{path_id}">image</a> is private</p>\n{placeholder}',
        )

        with (
            patch.object(
                pyvips.Image, "thumbnail_buffer", wraps=pyvips.Image.thumbnail_buffer
            ) as thumb_mock,
            self.thumbnail_formats(
                ThumbnailFormat("webp", 100, 75, animated=False),
                ThumbnailFormat("webp", 200, 150, animated=False),
            ),
        ):
            ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id))

            # Called once per format
            self.assertEqual(thumb_mock.call_count, 2)

        rendered_thumb = (
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_id}/100x75.webp"></a></div>'
        )
        self.assert_message_content_is(
            channel_message_id,
            f'<p>A public <a href="/user_uploads/{path_id}">image</a></p>\n{rendered_thumb}',
        )
        self.assert_message_content_is(
            private_message_id,
            f'<p>This <a href="/user_uploads/{path_id}">image</a> is private</p>\n{rendered_thumb}',
        )

    def test_thumbnail_race(self) -> None:
        """Test what happens when thumbnailing happens between rendering and sending"""
        path_id = self.upload_image("img.png")
        self.assert_length(ImageAttachment.objects.get(path_id=path_id).thumbnail_metadata, 0)

        # Render, but do not send, the message referencing the image.
        # This will render as a spinner, since the thumbnail has not
        # been generated yet.
        send_request = check_message(
            self.example_user("othello"),
            Client.objects.get_or_create(name="test suite")[0],
            Addressee.for_stream_name("Verona", "test"),
            f"[image](/user_uploads/{path_id})",
        )
        expected = (
            f'<p><a href="/user_uploads/{path_id}">image</a></p>\n'
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            '<img class="image-loading-placeholder" src="/static/images/loading/loader-black.svg"></a></div>'
        )
        self.assertEqual(send_request.message.rendered_content, expected)

        # Thumbnail the image.  The message does not exist yet, so
        # nothing is re-written.
        ensure_thumbnails(ImageAttachment.objects.get(path_id=path_id))

        # Send the message; this should re-check the ImageAttachment
        # data, find the thumbnails, and update the rendered_content
        # to no longer contain a spinner.
        message_id = do_send_messages([send_request])[0].message_id

        rendered_thumb = (
            f'<div class="message_inline_image"><a href="/user_uploads/{path_id}" title="image">'
            f'<img data-original-dimensions="128x128" src="/user_uploads/thumbnail/{path_id}/840x560.webp"></a></div>'
        )
        self.assert_message_content_is(
            message_id, f'<p><a href="/user_uploads/{path_id}">image</a></p>\n{rendered_thumb}'
        )