2016-04-14 16:26:01 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from django.conf import settings
|
2017-10-28 22:52:40 +02:00
|
|
|
from django.test import TestCase
|
2016-04-14 16:26:01 +02:00
|
|
|
from unittest import skip
|
|
|
|
|
2017-10-10 03:53:25 +02:00
|
|
|
from zerver.lib.avatar import (
|
|
|
|
avatar_url,
|
|
|
|
get_avatar_field,
|
|
|
|
)
|
2017-12-21 09:37:59 +01:00
|
|
|
from zerver.lib.avatar_hash import user_avatar_path
|
2016-06-14 04:38:30 +02:00
|
|
|
from zerver.lib.bugdown import url_filename
|
2017-02-21 03:41:20 +01:00
|
|
|
from zerver.lib.realm_icon import realm_icon_url
|
2017-02-16 10:10:37 +01:00
|
|
|
from zerver.lib.test_classes import ZulipTestCase, UploadSerializeMixin
|
2017-03-08 19:47:42 +01:00
|
|
|
from zerver.lib.test_helpers import (
|
|
|
|
avatar_disk_path,
|
|
|
|
get_test_image_file,
|
|
|
|
POSTRequestMock,
|
2017-10-28 22:52:40 +02:00
|
|
|
use_s3_backend,
|
2018-06-05 21:12:28 +02:00
|
|
|
queries_captured,
|
2017-03-08 19:47:42 +01:00
|
|
|
)
|
2016-04-14 16:26:01 +02:00
|
|
|
from zerver.lib.test_runner import slow
|
2016-06-09 07:53:35 +02:00
|
|
|
from zerver.lib.upload import sanitize_name, S3UploadBackend, \
|
2018-05-15 00:10:30 +02:00
|
|
|
upload_message_file, upload_emoji_image, delete_message_image, LocalUploadBackend, \
|
2018-04-15 13:20:36 +02:00
|
|
|
ZulipUploadBackend, MEDIUM_AVATAR_SIZE, resize_avatar, \
|
2018-05-14 21:33:51 +02:00
|
|
|
resize_emoji, BadImageError, get_realm_for_filename, \
|
2018-05-29 17:29:57 +02:00
|
|
|
currently_used_upload_space, DEFAULT_AVATAR_SIZE, DEFAULT_EMOJI_SIZE, \
|
|
|
|
exif_rotate
|
2016-06-09 07:53:35 +02:00
|
|
|
import zerver.lib.upload
|
2017-10-28 16:17:29 +02:00
|
|
|
from zerver.models import Attachment, get_user, \
|
2016-06-17 19:48:17 +02:00
|
|
|
get_old_unclaimed_attachments, Message, UserProfile, Stream, Realm, \
|
2018-06-05 21:12:28 +02:00
|
|
|
RealmDomain, RealmEmoji, get_realm, get_system_bot, \
|
|
|
|
validate_attachment_request
|
2017-08-18 12:26:43 +02:00
|
|
|
from zerver.lib.actions import (
|
|
|
|
do_delete_old_unclaimed_attachments,
|
|
|
|
internal_send_private_message,
|
|
|
|
)
|
2018-06-06 14:30:26 +02:00
|
|
|
from zerver.lib.create_user import copy_user_settings
|
2018-04-15 13:20:36 +02:00
|
|
|
from zerver.lib.request import JsonableError
|
2018-02-12 18:18:03 +01:00
|
|
|
from zerver.views.upload import upload_file_backend, serve_local
|
2017-03-08 19:47:42 +01:00
|
|
|
|
2017-11-05 05:30:31 +01:00
|
|
|
import urllib
|
2016-11-07 05:02:13 +01:00
|
|
|
from PIL import Image
|
2016-04-14 16:26:01 +02:00
|
|
|
|
|
|
|
from boto.s3.connection import S3Connection
|
|
|
|
from boto.s3.key import Key
|
2017-11-06 02:56:09 +01:00
|
|
|
from io import StringIO
|
2016-09-16 17:11:54 +02:00
|
|
|
import mock
|
2016-04-14 23:44:39 +02:00
|
|
|
import os
|
2016-11-07 05:02:13 +01:00
|
|
|
import io
|
2016-04-14 23:44:39 +02:00
|
|
|
import shutil
|
2016-03-24 20:24:01 +01:00
|
|
|
import re
|
|
|
|
import datetime
|
2016-06-25 11:05:59 +02:00
|
|
|
import requests
|
|
|
|
import base64
|
2016-03-24 20:24:01 +01:00
|
|
|
from datetime import timedelta
|
2018-02-12 18:18:03 +01:00
|
|
|
from django.http import HttpRequest
|
2017-04-15 04:03:56 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2018-02-12 18:18:03 +01:00
|
|
|
from sendfile import _get_sendfile
|
2016-03-24 20:24:01 +01:00
|
|
|
|
2018-05-11 01:39:38 +02:00
|
|
|
from typing import Any, Callable
|
2016-10-11 19:38:22 +02:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def destroy_uploads() -> None:
    """Delete the local uploads directory and all its contents, if present."""
    uploads_dir = settings.LOCAL_UPLOADS_DIR
    if os.path.exists(uploads_dir):
        shutil.rmtree(uploads_dir)
|
|
|
|
|
2017-02-16 10:10:37 +01:00
|
|
|
class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
2016-06-25 11:05:59 +02:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_rest_endpoint(self) -> None:
    """
    Exercise the /api/v1/user_uploads API endpoint: upload a single file
    using username/api_key auth, then download it again the same way.
    """
    hamlet_email = self.example_email("hamlet")
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"

    # Upload the file via the API.
    result = self.api_post(hamlet_email, '/api/v1/user_uploads', {'file': upload})
    self.assertIn("uri", result.json())
    uri = result.json()['uri']
    base = '/user_uploads/'
    self.assertEqual(base, uri[:len(base)])

    # Download the file via the API (no web session).
    self.logout()
    response = self.api_get(hamlet_email, uri)
    self.assertEqual(response.status_code, 200)
    downloaded = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", downloaded)

    # Files uploaded through the API should be accessible via the web client
    self.login(hamlet_email)
    self.assert_url_serves_contents_of_file(uri, b"zulip!")
|
2016-06-25 11:05:59 +02:00
|
|
|
|
2018-04-13 19:04:39 +02:00
|
|
|
def test_mobile_api_endpoint(self) -> None:
    """
    Exercise /api/v1/user_uploads with ?api_key auth: upload a single file,
    then download it using an api_key query parameter instead of a session.
    """
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"

    # Upload file via API
    result = self.api_post(self.example_email("hamlet"), '/api/v1/user_uploads', {'file': upload})
    self.assertIn("uri", result.json())
    uri = result.json()['uri']
    base = '/user_uploads/'
    self.assertEqual(base, uri[:len(base)])

    self.logout()

    # An invalid API key in the query string must be rejected.
    user_profile = self.example_user("hamlet")
    response = self.client_get(uri + "?api_key=" + "invalid")
    self.assertEqual(response.status_code, 400)

    # The real API key grants access to the file contents.
    response = self.client_get(uri + "?api_key=" + user_profile.api_key)
    self.assertEqual(response.status_code, 200)
    downloaded = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", downloaded)
|
|
|
|
|
2018-05-14 19:07:38 +02:00
|
|
|
def test_upload_file_with_supplied_mimetype(self) -> None:
    """
    When a file is pasted from the clipboard it may arrive without a
    filename extension; the extension is then derived from the ?mimetype=
    query-string parameter.
    """
    pasted = StringIO("zulip!")
    pasted.name = "pasted_file"
    result = self.api_post(self.example_email("hamlet"),
                           "/api/v1/user_uploads?mimetype=image/png",
                           {"file": pasted})
    self.assertEqual(result.status_code, 200)
    uri = result.json()["uri"]
    # image/png maps to a .png extension appended to the supplied name.
    self.assertTrue(uri.endswith("pasted_file.png"))
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_filename_encoding(self) -> None:
    """
    In Python 2, we need to encode unicode filenames (which converts them to
    str) before they can be rendered correctly. However, in Python 3, the
    separate unicode type does not exist, and we don't need to perform this
    encoding. This test ensures that we handle filename encodings properly,
    and does so in a way that preserves 100% test coverage for Python 3.
    """

    user_profile = self.example_user('hamlet')

    # Fake uploaded file; only the size hook is consulted by the view.
    mock_file = mock.Mock()
    mock_file._get_size = mock.Mock(return_value=1024)

    # Fake request.FILES mapping containing exactly one file.
    mock_files = mock.Mock()
    mock_files.__len__ = mock.Mock(return_value=1)
    mock_files.values = mock.Mock(return_value=[mock_file])

    mock_request = mock.Mock()
    mock_request.FILES = mock_files

    # str filenames should not be encoded.
    mock_filename = mock.Mock(spec=str)
    mock_file.name = mock_filename
    # Patch out the actual storage call; we only care about filename handling.
    with mock.patch('zerver.views.upload.upload_message_image_from_request'):
        result = upload_file_backend(mock_request, user_profile)
    self.assert_json_success(result)
    mock_filename.encode.assert_not_called()

    # Non-str filenames should be encoded.
    mock_filename = mock.Mock(spec=None)  # None is not str
    mock_file.name = mock_filename
    with mock.patch('zerver.views.upload.upload_message_image_from_request'):
        result = upload_file_backend(mock_request, user_profile)
    self.assert_json_success(result)
    mock_filename.encode.assert_called_once_with('ascii')
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_file_too_big_failure(self) -> None:
    """
    Uploads larger than the configured size limit must be rejected.
    """
    self.login(self.example_email("hamlet"))
    oversized = StringIO("bah!")
    oversized.name = "a.txt"

    # Use MAX_FILE_UPLOAD_SIZE of 0, because the next increment
    # would be 1MB.
    with self.settings(MAX_FILE_UPLOAD_SIZE=0):
        result = self.client_post("/json/user_uploads", {'f1': oversized})
    self.assert_json_error(result, 'Uploaded file is larger than the allowed limit of 0 MB')
|
2016-09-16 16:41:04 +02:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_multiple_upload_failure(self) -> None:
    """
    Submitting two files in a single request must fail.
    """
    self.login(self.example_email("hamlet"))
    first = StringIO("bah!")
    first.name = "a.txt"
    second = StringIO("pshaw!")
    second.name = "b.txt"

    result = self.client_post("/json/user_uploads", {'f1': first, 'f2': second})
    self.assert_json_error(result, "You may only upload one file at a time")
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_no_file_upload_failure(self) -> None:
    """
    Submitting no files at all must fail.
    """
    self.login(self.example_email("hamlet"))

    result = self.client_post("/json/user_uploads")
    self.assert_json_error(result, "You must specify a file to upload")
|
|
|
|
|
2016-03-24 20:24:01 +01:00
|
|
|
# This test will go through the code path for uploading files onto LOCAL storage
# when zulip is in DEVELOPMENT mode.
def test_file_upload_authed(self) -> None:
    """
    A call to /json/user_uploads should return a uri and actually create an
    entry in the database. This entry will be marked unclaimed till a message
    refers it.
    """
    self.login(self.example_email("hamlet"))
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"

    result = self.client_post("/json/user_uploads", {'file': upload})
    self.assert_json_success(result)
    self.assertIn("uri", result.json())
    uri = result.json()["uri"]
    base = '/user_uploads/'
    self.assertEqual(base, uri[:len(base)])

    # In the future, local file requests will follow the same style as S3
    # requests; they will be first authenticated and redirected
    self.assert_url_serves_contents_of_file(uri, b"zulip!")

    # The DB row should exist and still be marked unclaimed.
    entry = Attachment.objects.get(file_name='zulip.txt')
    self.assertEqual(entry.is_claimed(), False)

    # Referencing the upload in a message should render a link to it.
    self.subscribe(self.example_user("hamlet"), "Denmark")
    body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
    self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")
    self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_file_download_unauthed(self) -> None:
    """Downloading an upload without any authentication must return 401."""
    self.login(self.example_email("hamlet"))
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': upload})
    uri = result.json()["uri"]

    self.logout()
    response = self.client_get(uri)
    self.assert_json_error(response, "Not logged in: API authentication or user session required",
                           status_code=401)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_removed_file_download(self) -> None:
    '''
    Trying to download deleted files should return 404 error
    '''
    self.login(self.example_email("hamlet"))
    upload = StringIO("zulip!")
    upload.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': upload})

    # Wipe the uploads directory out from under the stored URI.
    destroy_uploads()

    response = self.client_get(result.json()["uri"])
    self.assertEqual(response.status_code, 404)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_non_existing_file_download(self) -> None:
    '''
    Trying to download a file that was never uploaded will return a json_error
    '''
    self.login(self.example_email("hamlet"))
    response = self.client_get("http://localhost:9991/user_uploads/1/ff/gg/abc.py")
    self.assertEqual(response.status_code, 404)
    self.assert_in_response('File not found.', response)
|
2016-06-27 21:09:56 +02:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_delete_old_unclaimed_attachments(self) -> None:
    # Upload some files and make them older than a week
    self.login(self.example_email("hamlet"))
    d1 = StringIO("zulip!")
    d1.name = "dummy_1.txt"
    result = self.client_post("/json/user_uploads", {'file': d1})
    d1_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    d2 = StringIO("zulip!")
    d2.name = "dummy_2.txt"
    result = self.client_post("/json/user_uploads", {'file': d2})
    d2_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    # Backdate both attachments past the cleanup threshold.
    two_week_ago = timezone_now() - datetime.timedelta(weeks=2)
    d1_attachment = Attachment.objects.get(path_id = d1_path_id)
    d1_attachment.create_time = two_week_ago
    d1_attachment.save()
    self.assertEqual(str(d1_attachment), u'<Attachment: dummy_1.txt>')
    d2_attachment = Attachment.objects.get(path_id = d2_path_id)
    d2_attachment.create_time = two_week_ago
    d2_attachment.save()

    # Send message referring only dummy_1; dummy_2 stays unclaimed.
    self.subscribe(self.example_user("hamlet"), "Denmark")
    body = "Some files here ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
    self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")

    # dummy_2 should not exist in database or the uploads folder
    do_delete_old_unclaimed_attachments(2)
    self.assertTrue(not Attachment.objects.filter(path_id = d2_path_id).exists())
    self.assertTrue(not delete_message_image(d2_path_id))
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_attachment_url_without_upload(self) -> None:
    """A message linking a never-uploaded path must not create an Attachment row."""
    self.login(self.example_email("hamlet"))
    fake_path = "1/64/fake_path_id.txt"
    body = ("Test message ...[zulip.txt](http://localhost:9991/user_uploads/"
            + fake_path + ")")
    self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")
    self.assertFalse(Attachment.objects.filter(path_id=fake_path).exists())
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_multiple_claim_attachments(self) -> None:
    """
    This test tries to claim the same attachment twice. The messages field in
    the Attachment model should have both the messages in its entry.
    """
    self.login(self.example_email("hamlet"))
    upload = StringIO("zulip!")
    upload.name = "dummy_1.txt"
    result = self.client_post("/json/user_uploads", {'file': upload})
    path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    self.subscribe(self.example_user("hamlet"), "Denmark")
    link = "(http://localhost:9991/user_uploads/" + path_id + ")"
    # Reference the same attachment from two different messages.
    for prefix in ("First", "Second"):
        body = prefix + " message ...[zulip.txt]" + link
        self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")

    self.assertEqual(Attachment.objects.get(path_id=path_id).messages.count(), 2)
|
2016-03-24 20:24:01 +01:00
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_multiple_claim_attachments_different_owners(self) -> None:
    """This test tries to claim the same attachment more than once, first
    with a private stream and then with different recipients."""
    self.login(self.example_email("hamlet"))
    d1 = StringIO("zulip!")
    d1.name = "dummy_1.txt"
    result = self.client_post("/json/user_uploads", {'file': d1})
    d1_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    self.make_stream("private_stream", invite_only=True)
    self.subscribe(self.example_user("hamlet"), "private_stream")

    # First, send the message to the new private stream.
    body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
    self.send_stream_message(self.example_email("hamlet"), "private_stream", body, "test")
    self.assertFalse(Attachment.objects.get(path_id=d1_path_id).is_realm_public)
    self.assertEqual(Attachment.objects.get(path_id=d1_path_id).messages.count(), 1)

    # Then, try having a user who didn't receive the message try to publish it, and fail
    body = "Illegal message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
    self.send_stream_message(self.example_email("cordelia"), "Denmark", body, "test")
    self.assertEqual(Attachment.objects.get(path_id=d1_path_id).messages.count(), 1)
    self.assertFalse(Attachment.objects.get(path_id=d1_path_id).is_realm_public)

    # Then, have the owner PM it to another user, giving that other user access.
    body = "Second message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
    self.send_personal_message(self.example_email("hamlet"), self.example_email("othello"), body)
    self.assertEqual(Attachment.objects.get(path_id=d1_path_id).messages.count(), 2)
    self.assertFalse(Attachment.objects.get(path_id=d1_path_id).is_realm_public)

    # Then, have that new recipient user publish it.
    body = "Third message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
    self.send_stream_message(self.example_email("othello"), "Denmark", body, "test")
    self.assertEqual(Attachment.objects.get(path_id=d1_path_id).messages.count(), 3)
    self.assertTrue(Attachment.objects.get(path_id=d1_path_id).is_realm_public)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_check_attachment_reference_update(self) -> None:
    """Editing a message should re-sync which attachments reference it."""
    f1 = StringIO("file1")
    f1.name = "file1.txt"
    f2 = StringIO("file2")
    f2.name = "file2.txt"
    f3 = StringIO("file3")
    f3.name = "file3.txt"

    self.login(self.example_email("hamlet"))
    result = self.client_post("/json/user_uploads", {'file': f1})
    f1_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    result = self.client_post("/json/user_uploads", {'file': f2})
    f2_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    # Send a message referencing f1 and f2.
    self.subscribe(self.example_user("hamlet"), "test")
    body = ("[f1.txt](http://localhost:9991/user_uploads/" + f1_path_id + ")"
            "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
    msg_id = self.send_stream_message(self.example_email("hamlet"), "test", body, "test")

    result = self.client_post("/json/user_uploads", {'file': f3})
    f3_path_id = re.sub('/user_uploads/', '', result.json()['uri'])

    # Edit the message to drop f1 and add f3 (keeping f2).
    new_body = ("[f3.txt](http://localhost:9991/user_uploads/" + f3_path_id + ")"
                "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
    result = self.client_patch("/json/messages/" + str(msg_id), {
        'message_id': msg_id,
        'content': new_body
    })
    self.assert_json_success(result)

    message = Message.objects.get(id=msg_id)
    f1_attachment = Attachment.objects.get(path_id=f1_path_id)
    f2_attachment = Attachment.objects.get(path_id=f2_path_id)
    f3_attachment = Attachment.objects.get(path_id=f3_path_id)

    # Only the attachments still linked in the edited body keep the message.
    self.assertTrue(message not in f1_attachment.messages.all())
    self.assertTrue(message in f2_attachment.messages.all())
    self.assertTrue(message in f3_attachment.messages.all())

    # Delete all the attachments from the message
    new_body = "(deleted)"
    result = self.client_patch("/json/messages/" + str(msg_id), {
        'message_id': msg_id,
        'content': new_body
    })
    self.assert_json_success(result)

    message = Message.objects.get(id=msg_id)
    f1_attachment = Attachment.objects.get(path_id=f1_path_id)
    f2_attachment = Attachment.objects.get(path_id=f2_path_id)
    f3_attachment = Attachment.objects.get(path_id=f3_path_id)
    self.assertTrue(message not in f1_attachment.messages.all())
    self.assertTrue(message not in f2_attachment.messages.all())
    self.assertTrue(message not in f3_attachment.messages.all())
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_file_name(self) -> None:
    """
    Unicode filenames should be processed correctly.
    """
    self.login(self.example_email("hamlet"))
    for expected in ["Здравейте.txt", "test"]:
        fp = StringIO("bah!")
        fp.name = urllib.parse.quote(expected)

        result = self.client_post("/json/user_uploads", {'f1': fp})
        # Use a real assertion method instead of a bare `assert`: bare
        # asserts are silently stripped when Python runs with -O, and
        # assertIn also produces a useful failure message.
        self.assertIn(sanitize_name(expected), result.json()['uri'])
|
2016-09-20 11:02:15 +02:00
|
|
|
|
2018-01-26 16:13:33 +01:00
|
|
|
def test_realm_quota(self) -> None:
    """
    Realm quota for uploading should not be exceeded.
    """
    self.login(self.example_email("hamlet"))

    d1 = StringIO("zulip!")
    d1.name = "dummy_1.txt"
    result = self.client_post("/json/user_uploads", {'file': d1})
    d1_path_id = re.sub('/user_uploads/', '', result.json()['uri'])
    d1_attachment = Attachment.objects.get(path_id = d1_path_id)
    self.assert_json_success(result)

    # Give the realm a finite quota so the limit can actually be hit.
    realm = get_realm("zulip")
    realm.upload_quota_gb = 1
    realm.save(update_fields=['upload_quota_gb'])

    # The size of StringIO("zulip!") is 6 bytes. Setting the size of
    # d1_attachment to realm.upload_quota_bytes() - 11 should allow
    # us to upload only one more attachment.
    quota = realm.upload_quota_bytes()
    assert(quota is not None)
    d1_attachment.size = quota - 11
    d1_attachment.save(update_fields=['size'])

    # This upload fits within the remaining quota.
    d2 = StringIO("zulip!")
    d2.name = "dummy_2.txt"
    result = self.client_post("/json/user_uploads", {'file': d2})
    self.assert_json_success(result)

    # This one would push usage over the quota and must be rejected.
    d3 = StringIO("zulip!")
    d3.name = "dummy_3.txt"
    result = self.client_post("/json/user_uploads", {'file': d3})
    self.assert_json_error(result, "Upload would exceed your organization's upload quota.")

    # Clearing the quota lifts the limit again.
    realm.upload_quota_gb = None
    realm.save(update_fields=['upload_quota_gb'])
    result = self.client_post("/json/user_uploads", {'file': d3})
    self.assert_json_success(result)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_cross_realm_file_access(self) -> None:
    """A file PMed across realms by a cross-realm bot is readable by the
    actual recipient, but not by other users on the recipient's realm."""

    def create_user(email: str, realm_id: str) -> UserProfile:
        # Register and return the new user on the given subdomain/realm.
        self.register(email, 'test', subdomain=realm_id)
        return get_user(email, get_realm(realm_id))

    test_subdomain = "uploadtest.example.com"
    user1_email = 'user1@uploadtest.example.com'
    user2_email = 'test-og-bot@zulip.com'
    user3_email = 'other-user@uploadtest.example.com'

    r1 = Realm.objects.create(string_id=test_subdomain, invite_required=False)
    RealmDomain.objects.create(realm=r1, domain=test_subdomain)

    create_user(user1_email, test_subdomain)
    create_user(user2_email, 'zulip')
    create_user(user3_email, test_subdomain)

    # Send a message from @zulip.com -> @uploadtest.example.com
    self.login(user2_email, 'test')
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': fp})
    uri = result.json()['uri']
    fp_path_id = re.sub('/user_uploads/', '', uri)
    body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + fp_path_id + ")"
    with self.settings(CROSS_REALM_BOT_EMAILS = set((user2_email, user3_email))):
        internal_send_private_message(
            realm=r1,
            sender=get_system_bot(user2_email),
            recipient_user=get_user(user1_email, r1),
            content=body,
        )

    # The recipient can fetch the file from their own realm's subdomain.
    self.login(user1_email, 'test', realm=r1)
    response = self.client_get(uri, subdomain=test_subdomain)
    self.assertEqual(response.status_code, 200)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
    self.logout()

    # Confirm other cross-realm users can't read it.
    self.login(user3_email, 'test', realm=r1)
    response = self.client_get(uri, subdomain=test_subdomain)
    self.assertEqual(response.status_code, 403)
    self.assert_in_response("You are not authorized to view this file.", response)
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_file_download_authorization_invite_only(self) -> None:
    # On an invite-only stream without public history, only users who
    # actually received the message referencing the file may download it.
    user = self.example_user("hamlet")
    subscribed_emails = [user.email, self.example_email("cordelia")]
    unsubscribed_emails = [self.example_email("othello"), self.example_email("prospero")]
    stream_name = "test-subscribe"
    self.make_stream(stream_name, realm=user.realm, invite_only=True, history_public_to_subscribers=False)

    for email in subscribed_emails:
        self.subscribe(get_user(email, user.realm), stream_name)

    self.login(user.email)
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': fp})
    uri = result.json()['uri']
    fp_path_id = re.sub('/user_uploads/', '', uri)
    body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + fp_path_id + ")"
    self.send_stream_message(user.email, stream_name, body, "test")
    self.logout()

    # Owner user should be able to view file
    self.login(user.email)
    with queries_captured() as queries:
        response = self.client_get(uri)
        self.assertEqual(response.status_code, 200)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)
    self.logout()
    # Pin the query count so authorization checks don't silently regress.
    self.assertEqual(len(queries), 5)

    # Subscribed user who received the message should be able to view file
    self.login(subscribed_emails[1])
    with queries_captured() as queries:
        response = self.client_get(uri)
        self.assertEqual(response.status_code, 200)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)
    self.logout()
    self.assertEqual(len(queries), 6)

    def assert_cannot_access_file(user_email: str) -> None:
        # Helper: fetching the URI as this user must be rejected with 403.
        response = self.api_get(user_email, uri)
        self.assertEqual(response.status_code, 403)
        self.assert_in_response("You are not authorized to view this file.", response)

    # A user who subscribed only after the message was sent did not receive
    # it, and without shared history has no access to the file.
    late_subscribed_user = self.example_user("aaron")
    self.subscribe(late_subscribed_user, stream_name)
    assert_cannot_access_file(late_subscribed_user.email)

    # Unsubscribed user should not be able to view file
    for unsubscribed_user in unsubscribed_emails:
        assert_cannot_access_file(unsubscribed_user)
|
|
|
|
|
|
|
|
def test_file_download_authorization_invite_only_with_shared_history(self) -> None:
    """Access to an upload linked in an invite-only stream with
    history_public_to_subscribers=True: current subscribers (including
    those who joined after the message was sent) can view the file;
    non-subscribers cannot.  Also pins the number of database queries
    each access-control path takes."""
    user = self.example_user("hamlet")
    subscribed_emails = [user.email, self.example_email("polonius")]
    unsubscribed_emails = [self.example_email("othello"), self.example_email("prospero")]
    stream_name = "test-subscribe"
    self.make_stream(stream_name, realm=user.realm, invite_only=True, history_public_to_subscribers=True)

    for email in subscribed_emails:
        self.subscribe(get_user(email, user.realm), stream_name)

    # Upload a file and link it in a message to the stream.
    self.login(user.email)
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': fp})
    uri = result.json()['uri']
    fp_path_id = re.sub('/user_uploads/', '', uri)
    body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + fp_path_id + ")"
    self.send_stream_message(user.email, stream_name, body, "test")
    self.logout()

    # Add aaron as a subscriber after the message was sent
    late_subscribed_user = self.example_user("aaron")
    self.subscribe(late_subscribed_user, stream_name)
    subscribed_emails.append(late_subscribed_user.email)

    # Owner user should be able to view file
    self.login(user.email)
    with queries_captured() as queries:
        response = self.client_get(uri)
    self.assertEqual(response.status_code, 200)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
    self.logout()
    self.assertEqual(len(queries), 5)

    # Originally subscribed user should be able to view file
    self.login(subscribed_emails[1])
    with queries_captured() as queries:
        response = self.client_get(uri)
    self.assertEqual(response.status_code, 200)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
    self.logout()
    self.assertEqual(len(queries), 6)

    # Subscribed user who did not receive the message should also be able to view file
    self.login(late_subscribed_user.email)
    with queries_captured() as queries:
        response = self.client_get(uri)
    self.assertEqual(response.status_code, 200)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
    self.logout()
    # It takes a few extra queries to verify access because of shared history.
    self.assertEqual(len(queries), 9)

    def assert_cannot_access_file(user_email: str) -> None:
        # Helper: `user_email` must get a 403 for the upload URI.
        self.login(user_email)
        with queries_captured() as queries:
            response = self.client_get(uri)
        self.assertEqual(response.status_code, 403)
        # It takes a few extra queries to verify lack of access with shared history.
        self.assertEqual(len(queries), 8)
        self.assert_in_response("You are not authorized to view this file.", response)
        self.logout()

    # Unsubscribed user should not be able to view file
    for unsubscribed_user in unsubscribed_emails:
        assert_cannot_access_file(unsubscribed_user)
def test_multiple_message_attachment_file_download(self) -> None:
    """When one upload is referenced by many messages across several
    streams, the access check must not scale with the number of
    referencing messages (pinned via query counts)."""
    hamlet = self.example_user("hamlet")
    # Five private streams with shared history, all subscribed by hamlet.
    for i in range(0, 5):
        stream_name = "test-subscribe %s" % (i,)
        self.make_stream(stream_name, realm=hamlet.realm, invite_only=True, history_public_to_subscribers=True)
        self.subscribe(hamlet, stream_name)

    self.login(hamlet.email)
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': fp})
    uri = result.json()['uri']
    fp_path_id = re.sub('/user_uploads/', '', uri)
    # Reference the same upload from 20 messages spread over the 5 streams.
    for i in range(20):
        body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + fp_path_id + ")"
        self.send_stream_message(self.example_email("hamlet"), "test-subscribe %s" % (i % 5,), body, "test")
    self.logout()

    # aaron is not subscribed anywhere yet, so access is denied.
    user = self.example_user("aaron")
    self.login(user.email)
    with queries_captured() as queries:
        response = self.client_get(uri)
    self.assertEqual(response.status_code, 403)
    self.assert_in_response("You are not authorized to view this file.", response)
    self.assertEqual(len(queries), 8)

    self.subscribe(user, "test-subscribe 1")
    self.subscribe(user, "test-subscribe 2")

    with queries_captured() as queries:
        response = self.client_get(uri)
    self.assertEqual(response.status_code, 200)
    data = b"".join(response.streaming_content)
    self.assertEqual(b"zulip!", data)
    # If we were accidentally one query per message, this would be 20+
    self.assertEqual(len(queries), 9)

    # Direct call to the validation helper should also be bounded.
    with queries_captured() as queries:
        self.assertTrue(validate_attachment_request(user, fp_path_id))
    self.assertEqual(len(queries), 6)

    self.logout()
def test_file_download_authorization_public(self) -> None:
    """A file linked in a message to a public stream is accessible to
    every user in the realm, subscribed or not."""
    subscribed_users = [self.example_email("hamlet"), self.example_email("iago")]
    unsubscribed_users = [self.example_email("othello"), self.example_email("prospero")]
    realm = get_realm("zulip")
    for email in subscribed_users:
        self.subscribe(get_user(email, realm), "test-subscribe")

    # Upload a file and post it to the public stream.
    self.login(self.example_email("hamlet"))
    fp = StringIO("zulip!")
    fp.name = "zulip.txt"
    result = self.client_post("/json/user_uploads", {'file': fp})
    uri = result.json()['uri']
    fp_path_id = re.sub('/user_uploads/', '', uri)
    body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + fp_path_id + ")"
    self.send_stream_message(self.example_email("hamlet"), "test-subscribe", body, "test")
    self.logout()

    # Now all users should be able to access the files
    for user in subscribed_users + unsubscribed_users:
        self.login(user)
        response = self.client_get(uri)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)
        self.logout()
def test_serve_local(self) -> None:
    """With the nginx sendfile backend configured, serving a local upload
    should emit an X-Accel-Redirect header pointing into
    LOCAL_UPLOADS_DIR, plus a Content-disposition attachment header for
    file types that browsers might otherwise render inline."""
    def check_xsend_links(name: str, name_str_for_test: str,
                          content_disposition: str='') -> None:
        # name: uploaded filename; name_str_for_test: its URL-encoded form
        # expected in the redirect path; content_disposition: expected
        # fragment of the Content-disposition header ('' = header absent).
        with self.settings(SENDFILE_BACKEND='sendfile.backends.nginx'):
            _get_sendfile.clear()  # To clearout cached version of backend from djangosendfile
            self.login(self.example_email("hamlet"))
            fp = StringIO("zulip!")
            fp.name = name
            result = self.client_post("/json/user_uploads", {'file': fp})
            uri = result.json()['uri']
            fp_path_id = re.sub('/user_uploads/', '', uri)
            fp_path = os.path.split(fp_path_id)[0]
            response = self.client_get(uri)
            _get_sendfile.clear()
            test_upload_dir = os.path.split(settings.LOCAL_UPLOADS_DIR)[1]
            self.assertEqual(response['X-Accel-Redirect'],
                             '/serve_uploads/../../' + test_upload_dir +
                             '/files/' + fp_path + '/' + name_str_for_test)
            if content_disposition != '':
                self.assertIn('attachment;', response['Content-disposition'])
                self.assertIn(content_disposition, response['Content-disposition'])
            else:
                self.assertEqual(response.get('Content-disposition'), None)

    check_xsend_links('zulip.txt', 'zulip.txt', "filename*=UTF-8''zulip.txt")
    check_xsend_links('áéБД.txt', '%C3%A1%C3%A9%D0%91%D0%94.txt',
                      "filename*=UTF-8''%C3%A1%C3%A9%D0%91%D0%94.txt")
    check_xsend_links('zulip.html', 'zulip.html', "filename*=UTF-8''zulip.html")
    check_xsend_links('zulip.sh', 'zulip.sh', "filename*=UTF-8''zulip.sh")
    # Image/PDF types are served inline: no Content-disposition expected.
    check_xsend_links('zulip.jpeg', 'zulip.jpeg')
    check_xsend_links('áéБД.pdf', '%C3%A1%C3%A9%D0%91%D0%94.pdf')
    check_xsend_links('zulip', 'zulip', "filename*=UTF-8''zulip")
def tearDown(self) -> None:
    # Remove any files written under LOCAL_UPLOADS_DIR by this test.
    destroy_uploads()
class AvatarTest(UploadSerializeMixin, ZulipTestCase):
    """Tests for avatar URL generation and the /json/users/me/avatar
    upload/delete endpoints, for both user-uploaded and gravatar
    avatar sources."""

    def test_get_avatar_field(self) -> None:
        """get_avatar_field() should build a hashed /user_avatars/ URL for
        uploaded avatars, a gravatar URL otherwise, and None when the
        client computes gravatars itself."""
        with self.settings(AVATAR_SALT="salt"):
            url = get_avatar_field(
                user_id=17,
                realm_id=5,
                email='foo@example.com',
                avatar_source=UserProfile.AVATAR_FROM_USER,
                avatar_version=2,
                medium=True,
                client_gravatar=False,
            )

            self.assertEqual(
                url,
                '/user_avatars/5/fc2b9f1a81f4508a4df2d95451a2a77e0524ca0e-medium.png?x=x&version=2'
            )

            url = get_avatar_field(
                user_id=9999,
                realm_id=9999,
                email='foo@example.com',
                avatar_source=UserProfile.AVATAR_FROM_GRAVATAR,
                avatar_version=2,
                medium=True,
                client_gravatar=False,
            )

            self.assertEqual(
                url,
                'https://secure.gravatar.com/avatar/b48def645758b95537d4424c84d1a9ff?d=identicon&s=500&version=2'
            )

            # client_gravatar=True means the client will compute the
            # gravatar URL itself, so the server sends None.
            url = get_avatar_field(
                user_id=9999,
                realm_id=9999,
                email='foo@example.com',
                avatar_source=UserProfile.AVATAR_FROM_GRAVATAR,
                avatar_version=2,
                medium=True,
                client_gravatar=True,
            )

            self.assertEqual(url, None)

    def test_avatar_url(self) -> None:
        """Verifies URL schemes for avatars and realm icons."""
        backend = LocalUploadBackend()  # type: ZulipUploadBackend
        self.assertEqual(backend.get_avatar_url("hash", False),
                         "/user_avatars/hash.png?x=x")
        self.assertEqual(backend.get_avatar_url("hash", True),
                         "/user_avatars/hash-medium.png?x=x")
        self.assertEqual(backend.get_realm_icon_url(15, 1),
                         "/user_avatars/15/realm/icon.png?version=1")

        with self.settings(S3_AVATAR_BUCKET="bucket"):
            backend = S3UploadBackend()
            self.assertEqual(backend.get_avatar_url("hash", False),
                             "https://bucket.s3.amazonaws.com/hash?x=x")
            self.assertEqual(backend.get_avatar_url("hash", True),
                             "https://bucket.s3.amazonaws.com/hash-medium.png?x=x")
            self.assertEqual(backend.get_realm_icon_url(15, 1),
                             "https://bucket.s3.amazonaws.com/15/realm/icon.png?version=1")

    def test_multiple_upload_failure(self) -> None:
        """
        Attempting to upload two files should fail.
        """
        self.login(self.example_email("hamlet"))
        with get_test_image_file('img.png') as fp1, \
                get_test_image_file('img.png') as fp2:
            result = self.client_post("/json/users/me/avatar", {'f1': fp1, 'f2': fp2})
        self.assert_json_error(result, "You must upload exactly one avatar.")

    def test_no_file_upload_failure(self) -> None:
        """
        Calling this endpoint with no files should fail.
        """
        self.login(self.example_email("hamlet"))

        result = self.client_post("/json/users/me/avatar")
        self.assert_json_error(result, "You must upload exactly one avatar.")

    # (fixture, expected-output fixture) pairs; None marks formats whose
    # resized bytes are platform-dependent and are not compared exactly.
    correct_files = [
        ('img.png', 'png_resized.png'),
        ('img.jpg', None),  # jpeg resizing is platform-dependent
        ('img.gif', 'gif_resized.png'),
        ('img.tif', 'tif_resized.png'),
        ('cmyk.jpg', None)
    ]
    corrupt_files = ['text.txt', 'corrupt.png', 'corrupt.gif']

    def test_get_gravatar_avatar(self) -> None:
        """/avatar/<email> redirects to gravatar for AVATAR_FROM_GRAVATAR
        users, preserving extra query parameters."""
        self.login(self.example_email("hamlet"))
        cordelia = self.example_user('cordelia')

        cordelia.avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
        cordelia.save()
        with self.settings(ENABLE_GRAVATAR=True):
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
            redirect_url = response['Location']
            self.assertEqual(redirect_url, str(avatar_url(cordelia)) + '&foo=bar')

        with self.settings(ENABLE_GRAVATAR=False):
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
            redirect_url = response['Location']
            self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))

    def test_get_user_avatar(self) -> None:
        """/avatar/ accepts either an email or a user ID; an empty
        identifier is a 404."""
        self.login(self.example_email("hamlet"))
        cordelia = self.example_user('cordelia')

        cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
        cordelia.save()
        response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))

        # Fixed: format argument is now a proper one-tuple, matching the
        # equivalent call in test_get_user_avatar_medium.
        response = self.client_get("/avatar/%s?foo=bar" % (cordelia.id,))
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))

        response = self.client_get("/avatar/")
        self.assertEqual(response.status_code, 404)

    def test_get_user_avatar_medium(self) -> None:
        """/avatar/<identifier>/medium serves the medium-size variant."""
        self.login(self.example_email("hamlet"))
        cordelia = self.example_user('cordelia')

        cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
        cordelia.save()
        response = self.client_get("/avatar/cordelia@zulip.com/medium?foo=bar")
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))

        response = self.client_get("/avatar/%s/medium?foo=bar" % (cordelia.id,))
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))

    def test_non_valid_user_avatar(self) -> None:

        # It's debatable whether we should generate avatars for non-users,
        # but this test just validates the current code's behavior.
        self.login(self.example_email("hamlet"))

        response = self.client_get("/avatar/nonexistent_user@zulip.com?foo=bar")
        redirect_url = response['Location']
        actual_url = 'https://secure.gravatar.com/avatar/444258b521f152129eb0c162996e572d?d=identicon&version=1&foo=bar'
        self.assertEqual(redirect_url, actual_url)

    def test_valid_avatars(self) -> None:
        """
        A PUT request to /json/users/me/avatar with a valid file should return a url and actually create an avatar.
        """
        version = 2
        for fname, rfname in self.correct_files:
            # TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
            # with self.subTest(fname=fname):
            self.login(self.example_email("hamlet"))
            with get_test_image_file(fname) as fp:
                result = self.client_post("/json/users/me/avatar", {'file': fp})

            self.assert_json_success(result)
            self.assertIn("avatar_url", result.json())
            base = '/user_avatars/'
            url = result.json()['avatar_url']
            self.assertEqual(base, url[:len(base)])

            if rfname is not None:
                response = self.client_get(url)
                data = b"".join(response.streaming_content)
                self.assertEqual(Image.open(io.BytesIO(data)).size, (100, 100))

            # Verify that the medium-size avatar was created
            user_profile = self.example_user('hamlet')
            medium_avatar_disk_path = avatar_disk_path(user_profile, medium=True)
            self.assertTrue(os.path.exists(medium_avatar_disk_path))

            # Verify that ensure_medium_avatar_url does not overwrite this file if it exists
            with mock.patch('zerver.lib.upload.write_local_file') as mock_write_local_file:
                zerver.lib.upload.upload_backend.ensure_medium_avatar_image(user_profile)
                self.assertFalse(mock_write_local_file.called)

            # Confirm that ensure_medium_avatar_url works to recreate
            # medium size avatars from the original if needed
            os.remove(medium_avatar_disk_path)
            self.assertFalse(os.path.exists(medium_avatar_disk_path))
            zerver.lib.upload.upload_backend.ensure_medium_avatar_image(user_profile)
            self.assertTrue(os.path.exists(medium_avatar_disk_path))

            # Verify whether the avatar_version gets incremented with every new upload
            self.assertEqual(user_profile.avatar_version, version)
            version += 1

    def test_copy_avatar_image(self) -> None:
        """copy_user_settings() should duplicate the source user's avatar
        files (regular, original, and medium) for the target user."""
        self.login(self.example_email("hamlet"))
        with get_test_image_file('img.png') as image_file:
            self.client_post("/json/users/me/avatar", {'file': image_file})

        source_user_profile = self.example_user('hamlet')
        target_user_profile = self.example_user('iago')

        copy_user_settings(source_user_profile, target_user_profile)

        source_path_id = avatar_disk_path(source_user_profile)
        target_path_id = avatar_disk_path(target_user_profile)
        self.assertNotEqual(source_path_id, target_path_id)
        # Fixed: use context managers instead of bare open().read(), which
        # left the file descriptors open until garbage collection.
        with open(source_path_id, "rb") as source, open(target_path_id, "rb") as target:
            self.assertEqual(source.read(), target.read())

        source_original_path_id = avatar_disk_path(source_user_profile, original=True)
        target_original_path_id = avatar_disk_path(target_user_profile, original=True)
        with open(source_original_path_id, "rb") as source, \
                open(target_original_path_id, "rb") as target:
            self.assertEqual(source.read(), target.read())

        source_medium_path_id = avatar_disk_path(source_user_profile, medium=True)
        target_medium_path_id = avatar_disk_path(target_user_profile, medium=True)
        with open(source_medium_path_id, "rb") as source, \
                open(target_medium_path_id, "rb") as target:
            self.assertEqual(source.read(), target.read())

    def test_invalid_avatars(self) -> None:
        """
        A PUT request to /json/users/me/avatar with an invalid file should fail.
        """
        for fname in self.corrupt_files:
            # with self.subTest(fname=fname):
            self.login(self.example_email("hamlet"))
            with get_test_image_file(fname) as fp:
                result = self.client_post("/json/users/me/avatar", {'file': fp})

            self.assert_json_error(result, "Could not decode image; did you upload an image file?")
            # A failed upload must not bump the avatar version.
            user_profile = self.example_user('hamlet')
            self.assertEqual(user_profile.avatar_version, 1)

    def test_delete_avatar(self) -> None:
        """
        A DELETE request to /json/users/me/avatar should delete the user avatar and return gravatar URL
        """
        self.login(self.example_email("hamlet"))
        hamlet = self.example_user('hamlet')
        hamlet.avatar_source = UserProfile.AVATAR_FROM_USER
        hamlet.save()

        result = self.client_delete("/json/users/me/avatar")
        user_profile = self.example_user('hamlet')

        self.assert_json_success(result)
        self.assertIn("avatar_url", result.json())
        self.assertEqual(result.json()["avatar_url"], avatar_url(user_profile))

        self.assertEqual(user_profile.avatar_source, UserProfile.AVATAR_FROM_GRAVATAR)
        self.assertEqual(user_profile.avatar_version, 2)

    def test_avatar_upload_file_size_error(self) -> None:
        # Uploads over MAX_AVATAR_FILE_SIZE are rejected with a clear error.
        self.login(self.example_email("hamlet"))
        with get_test_image_file(self.correct_files[0][0]) as fp:
            with self.settings(MAX_AVATAR_FILE_SIZE=0):
                result = self.client_post("/json/users/me/avatar", {'file': fp})
        self.assert_json_error(result, "Uploaded file is larger than the allowed limit of 0 MB")

    def tearDown(self) -> None:
        # Remove any files written under LOCAL_UPLOADS_DIR by this test.
        destroy_uploads()
class EmojiTest(UploadSerializeMixin, ZulipTestCase):
    """Tests for resize_emoji() validation and resizing behavior."""

    def test_resize_emoji(self) -> None:
        # Fixed: read every fixture via a context manager; the original
        # called get_test_image_file(...).read() directly, leaking the
        # file handle (every other caller in this file uses `with`).

        # Test unequal width and height of animated GIF image
        with get_test_image_file('animated_unequal_img.gif') as f:
            animated_unequal_img_data = f.read()
        with self.assertRaises(JsonableError):
            resize_emoji(animated_unequal_img_data)

        # Test for large animated image (128x128)
        with get_test_image_file('animated_large_img.gif') as f:
            animated_large_img_data = f.read()
        with self.assertRaises(JsonableError):
            resize_emoji(animated_large_img_data)

        # Test for no resize case
        with get_test_image_file('animated_img.gif') as f:
            animated_img_data = f.read()
        self.assertEqual(animated_img_data, resize_emoji(animated_img_data))

        # Test for resize case
        with get_test_image_file('img.gif') as f:
            img_data = f.read()
        resized_img_data = resize_emoji(img_data, size=80)
        im = Image.open(io.BytesIO(resized_img_data))
        self.assertEqual((80, 80), im.size)

        # Test corrupt image exception
        with get_test_image_file('corrupt.gif') as f:
            corrupted_img_data = f.read()
        with self.assertRaises(BadImageError):
            resize_emoji(corrupted_img_data)

    def tearDown(self) -> None:
        # Remove any files written under LOCAL_UPLOADS_DIR by this test.
        destroy_uploads()
class RealmIconTest(UploadSerializeMixin, ZulipTestCase):
|
|
|
|
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_multiple_upload_failure(self) -> None:
    """
    Attempting to upload two files should fail.
    """
    # Log in as admin
    self.login(self.example_email("iago"))
    with get_test_image_file('img.png') as fp1, \
            get_test_image_file('img.png') as fp2:
        result = self.client_post("/json/realm/icon", {'f1': fp1, 'f2': fp2})
    self.assert_json_error(result, "You must upload exactly one icon.")
def test_no_file_upload_failure(self) -> None:
    """
    Calling this endpoint with no files should fail.
    """
    self.login(self.example_email("iago"))

    result = self.client_post("/json/realm/icon")
    self.assert_json_error(result, "You must upload exactly one icon.")
# (fixture, expected-output fixture) pairs; None marks formats whose
# resized bytes are platform-dependent and are not compared exactly.
correct_files = [
    ('img.png', 'png_resized.png'),
    ('img.jpg', None),  # jpeg resizing is platform-dependent
    ('img.gif', 'gif_resized.png'),
    ('img.tif', 'tif_resized.png'),
    ('cmyk.jpg', None)
]
# Fixtures that should be rejected as undecodable images.
corrupt_files = ['text.txt', 'corrupt.png', 'corrupt.gif']
def test_no_admin_user_upload(self) -> None:
    # Only organization administrators may change the realm icon.
    self.login(self.example_email("hamlet"))
    with get_test_image_file(self.correct_files[0][0]) as fp:
        result = self.client_post("/json/realm/icon", {'file': fp})
    self.assert_json_error(result, 'Must be an organization administrator')
def test_get_gravatar_icon(self) -> None:
    """With ICON_FROM_GRAVATAR, GET /json/realm/icon redirects to the
    gravatar-based icon URL, preserving extra query parameters."""
    self.login(self.example_email("hamlet"))
    realm = get_realm('zulip')
    realm.icon_source = Realm.ICON_FROM_GRAVATAR
    realm.save()
    with self.settings(ENABLE_GRAVATAR=True):
        response = self.client_get("/json/realm/icon?foo=bar")
        redirect_url = response['Location']
        self.assertEqual(redirect_url, realm_icon_url(realm) + '&foo=bar')

    with self.settings(ENABLE_GRAVATAR=False):
        response = self.client_get("/json/realm/icon?foo=bar")
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(realm_icon_url(realm) + '&foo=bar'))
def test_get_realm_icon(self) -> None:
    """With ICON_UPLOADED, GET /json/realm/icon redirects to the uploaded
    icon's URL, preserving extra query parameters."""
    self.login(self.example_email("hamlet"))

    realm = get_realm('zulip')
    realm.icon_source = Realm.ICON_UPLOADED
    realm.save()
    response = self.client_get("/json/realm/icon?foo=bar")
    redirect_url = response['Location']
    self.assertTrue(redirect_url.endswith(realm_icon_url(realm) + '&foo=bar'))
def test_valid_icons(self) -> None:
    """
    A PUT request to /json/realm/icon with a valid file should return a url
    and actually create an realm icon.
    """
    for fname, rfname in self.correct_files:
        # TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
        # with self.subTest(fname=fname):
        self.login(self.example_email("iago"))
        with get_test_image_file(fname) as fp:
            result = self.client_post("/json/realm/icon", {'file': fp})
        realm = get_realm('zulip')
        self.assert_json_success(result)
        self.assertIn("icon_url", result.json())
        # The returned URL should point at this realm's icon path.
        base = '/user_avatars/%s/realm/icon.png' % (realm.id,)
        url = result.json()['icon_url']
        self.assertEqual(base, url[:len(base)])

        if rfname is not None:
            # Icons are resized to 100x100 on upload.
            response = self.client_get(url)
            data = b"".join(response.streaming_content)
            self.assertEqual(Image.open(io.BytesIO(data)).size, (100, 100))
def test_invalid_icons(self) -> None:
    """
    A PUT request to /json/realm/icon with an invalid file should fail.
    """
    for fname in self.corrupt_files:
        # with self.subTest(fname=fname):
        self.login(self.example_email("iago"))
        with get_test_image_file(fname) as fp:
            result = self.client_post("/json/realm/icon", {'file': fp})

        self.assert_json_error(result, "Could not decode image; did you upload an image file?")
def test_delete_icon(self) -> None:
    """
    A DELETE request to /json/realm/icon should delete the realm icon and return gravatar URL
    """
    self.login(self.example_email("iago"))

    # Pretend the realm currently has an uploaded icon.
    realm = get_realm('zulip')
    realm.icon_source = Realm.ICON_UPLOADED
    realm.save()

    result = self.client_delete("/json/realm/icon")
    self.assert_json_success(result)
    response_json = result.json()
    self.assertIn("icon_url", response_json)

    # Re-fetch the realm: the icon source should have fallen back to gravatar,
    # and the returned URL should match what realm_icon_url() computes.
    realm = get_realm('zulip')
    self.assertEqual(response_json["icon_url"], realm_icon_url(realm))
    self.assertEqual(realm.icon_source, Realm.ICON_FROM_GRAVATAR)
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_realm_icon_version(self) -> None:
    """Uploading a new realm icon should increment realm.icon_version."""
    self.login(self.example_email("iago"))
    realm = get_realm('zulip')
    initial_version = realm.icon_version
    self.assertEqual(initial_version, 1)
    with get_test_image_file(self.correct_files[0][0]) as icon_fp:
        self.client_post("/json/realm/icon", {'file': icon_fp})
    realm = get_realm('zulip')
    self.assertEqual(realm.icon_version, initial_version + 1)
|
2017-11-05 10:51:25 +01:00
|
|
|
def test_realm_icon_upload_file_size_error(self) -> None:
    """An icon upload exceeding MAX_ICON_FILE_SIZE should be rejected."""
    self.login(self.example_email("iago"))
    # With the limit forced to 0 MB, any real image file is too large.
    with get_test_image_file(self.correct_files[0][0]) as icon_fp, \
            self.settings(MAX_ICON_FILE_SIZE=0):
        result = self.client_post("/json/realm/icon", {'file': icon_fp})
    self.assert_json_error(result, "Uploaded file is larger than the allowed limit of 0 MB")
|
2017-11-05 10:51:25 +01:00
|
|
|
def tearDown(self) -> None:
    # Remove any files written under LOCAL_UPLOADS_DIR during this test.
    destroy_uploads()
|
2017-02-16 10:10:37 +01:00
|
|
|
class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
    """Tests for the local-disk upload backend (LOCAL_UPLOADS_DIR)."""

    def test_file_upload_local(self) -> None:
        """upload_message_file() should write the file under
        LOCAL_UPLOADS_DIR/files and record an Attachment row with its size."""
        user_profile = self.example_user('hamlet')
        uri = upload_message_file(u'dummy.txt', len(b'zulip!'), u'text/plain', b'zulip!', user_profile)

        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])
        path_id = re.sub('/user_uploads/', '', uri)
        file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
        self.assertTrue(os.path.isfile(file_path))

        uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
        self.assertEqual(len(b'zulip!'), uploaded_file.size)

    def test_delete_message_image_local(self) -> None:
        """delete_message_image() should succeed for a file uploaded via the API."""
        self.login(self.example_email("hamlet"))
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"
        result = self.client_post("/json/user_uploads", {'file': fp})

        path_id = re.sub('/user_uploads/', '', result.json()['uri'])
        self.assertTrue(delete_message_image(path_id))

    def test_emoji_upload_local(self) -> None:
        """upload_emoji_image() should store both the '.original' copy and a
        resized copy under the avatars directory.

        Bugfix: all file handles are now opened via context managers; the old
        code leaked the fixture handle and two open() handles.
        """
        user_profile = self.example_user("hamlet")
        file_name = "emoji.png"

        with get_test_image_file("img.png") as image_file:
            upload_emoji_image(image_file, file_name, user_profile)

            emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
                realm_id=user_profile.realm_id,
                emoji_file_name=file_name,
            )
            file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", emoji_path)

            # The ".original" copy must be byte-identical to the upload.
            image_file.seek(0)
            with open(file_path + ".original", "rb") as original_file:
                self.assertEqual(image_file.read(), original_file.read())

        # The main copy is resized to the standard emoji dimensions.
        with open(file_path, "rb") as resized_file:
            resized_image = Image.open(resized_file)
            expected_size = (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE)
            self.assertEqual(expected_size, resized_image.size)

    def test_get_emoji_url_local(self) -> None:
        """get_emoji_url() should return the /user_avatars/ URL for an
        uploaded emoji (fixture handle now closed via `with`)."""
        user_profile = self.example_user("hamlet")
        file_name = "emoji.png"

        with get_test_image_file("img.png") as image_file:
            upload_emoji_image(image_file, file_name, user_profile)
        url = zerver.lib.upload.upload_backend.get_emoji_url(file_name, user_profile.realm_id)

        emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
            realm_id=user_profile.realm_id,
            emoji_file_name=file_name,
        )
        expected_url = "/user_avatars/{emoji_path}".format(emoji_path=emoji_path)
        self.assertEqual(expected_url, url)

    def tearDown(self) -> None:
        # Remove files written to LOCAL_UPLOADS_DIR during the test.
        destroy_uploads()
|
|
2018-05-15 00:10:30 +02:00
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class S3Test(ZulipTestCase):
    """Tests for the S3 upload backend, run against a mocked S3 via the
    @use_s3_backend decorator."""

    @use_s3_backend
    def test_file_upload_s3(self) -> None:
        """upload_message_file() should store the bytes in the auth-uploads
        bucket and record an Attachment row."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        user_profile = self.example_user('hamlet')
        uri = upload_message_file(u'dummy.txt', len(b'zulip!'), u'text/plain', b'zulip!', user_profile)

        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])
        path_id = re.sub('/user_uploads/', '', uri)
        content = bucket.get_key(path_id).get_contents_as_string()
        self.assertEqual(b"zulip!", content)

        uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
        self.assertEqual(len(b"zulip!"), uploaded_file.size)

        # Referencing the upload in a message should render a link whose
        # title is the original filename.
        self.subscribe(self.example_user("hamlet"), "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")
        self.assertIn('title="dummy.txt"', self.get_last_message().rendered_content)

    @use_s3_backend
    def test_file_upload_s3_with_undefined_content_type(self) -> None:
        """Uploads with no declared content type should still be stored."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        user_profile = self.example_user('hamlet')
        uri = upload_message_file(u'dummy.txt', len(b'zulip!'), None, b'zulip!', user_profile)

        path_id = re.sub('/user_uploads/', '', uri)
        self.assertEqual(b"zulip!", bucket.get_key(path_id).get_contents_as_string())
        uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
        self.assertEqual(len(b"zulip!"), uploaded_file.size)

    @use_s3_backend
    def test_message_image_delete_s3(self) -> None:
        """delete_message_image() should succeed for an existing S3 key."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        user_profile = self.example_user('hamlet')
        uri = upload_message_file(u'dummy.txt', len(b'zulip!'), u'text/plain', b'zulip!', user_profile)

        path_id = re.sub('/user_uploads/', '', uri)
        self.assertTrue(delete_message_image(path_id))

    @use_s3_backend
    def test_message_image_delete_when_file_doesnt_exist(self) -> None:
        """Deleting a missing key should return False rather than raise."""
        self.assertEqual(False, delete_message_image('non-existant-file'))

    @use_s3_backend
    def test_file_upload_authed(self) -> None:
        """
        A call to /json/user_uploads should return a uri and actually create an object.
        """
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        self.login(self.example_email("hamlet"))
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        result = self.client_post("/json/user_uploads", {'file': fp})
        self.assert_json_success(result)
        self.assertIn("uri", result.json())
        base = '/user_uploads/'
        uri = result.json()['uri']
        self.assertEqual(base, uri[:len(base)])

        # Fetching the uri should redirect to an S3 URL serving the content.
        response = self.client_get(uri)
        redirect_url = response['Location']
        self.assertEqual(b"zulip!", urllib.request.urlopen(redirect_url).read().strip())

        self.subscribe(self.example_user("hamlet"), "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_stream_message(self.example_email("hamlet"), "Denmark", body, "test")
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)

    @use_s3_backend
    def test_upload_avatar_image(self) -> None:
        """Avatar uploads should store an '.original' and a medium-size copy;
        ensure_medium_avatar_image() should regenerate the latter.

        Bugfix: the fixture is re-read via a context manager instead of the
        old open(get_test_image_file(...).name).read(), which leaked two
        file handles.
        """
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AVATAR_BUCKET)

        user_profile = self.example_user('hamlet')
        path_id = user_avatar_path(user_profile)
        original_image_path_id = path_id + ".original"
        medium_path_id = path_id + "-medium.png"

        with get_test_image_file('img.png') as image_file:
            zerver.lib.upload.upload_backend.upload_avatar_image(image_file, user_profile, user_profile)
        with get_test_image_file('img.png') as image_file:
            test_image_data = image_file.read()
        test_medium_image_data = resize_avatar(test_image_data, MEDIUM_AVATAR_SIZE)

        original_image_key = bucket.get_key(original_image_path_id)
        self.assertEqual(original_image_key.key, original_image_path_id)
        image_data = original_image_key.get_contents_as_string()
        self.assertEqual(image_data, test_image_data)

        medium_image_key = bucket.get_key(medium_path_id)
        self.assertEqual(medium_image_key.key, medium_path_id)
        medium_image_data = medium_image_key.get_contents_as_string()
        self.assertEqual(medium_image_data, test_medium_image_data)
        bucket.delete_key(medium_image_key)

        # The medium image should be regenerated on demand.
        zerver.lib.upload.upload_backend.ensure_medium_avatar_image(user_profile)
        medium_image_key = bucket.get_key(medium_path_id)
        self.assertEqual(medium_image_key.key, medium_path_id)

    @use_s3_backend
    def test_copy_avatar_image(self) -> None:
        """copy_user_settings() should copy the avatar (main, '.original',
        and '-medium.png' images) to the target user's paths."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AVATAR_BUCKET)

        self.login(self.example_email("hamlet"))
        with get_test_image_file('img.png') as image_file:
            self.client_post("/json/users/me/avatar", {'file': image_file})

        source_user_profile = self.example_user('hamlet')
        target_user_profile = self.example_user('othello')

        copy_user_settings(source_user_profile, target_user_profile)

        source_path_id = user_avatar_path(source_user_profile)
        target_path_id = user_avatar_path(target_user_profile)
        self.assertNotEqual(source_path_id, target_path_id)

        source_image_key = bucket.get_key(source_path_id)
        target_image_key = bucket.get_key(target_path_id)
        self.assertEqual(target_image_key.key, target_path_id)
        self.assertEqual(source_image_key.content_type, target_image_key.content_type)
        source_image_data = source_image_key.get_contents_as_string()
        target_image_data = target_image_key.get_contents_as_string()
        self.assertEqual(source_image_data, target_image_data)

        source_original_image_path_id = source_path_id + ".original"
        target_original_image_path_id = target_path_id + ".original"
        target_original_image_key = bucket.get_key(target_original_image_path_id)
        self.assertEqual(target_original_image_key.key, target_original_image_path_id)
        source_original_image_key = bucket.get_key(source_original_image_path_id)
        self.assertEqual(source_original_image_key.content_type, target_original_image_key.content_type)
        source_image_data = source_original_image_key.get_contents_as_string()
        target_image_data = target_original_image_key.get_contents_as_string()
        self.assertEqual(source_image_data, target_image_data)

        target_medium_path_id = target_path_id + "-medium.png"
        source_medium_path_id = source_path_id + "-medium.png"
        source_medium_image_key = bucket.get_key(source_medium_path_id)
        target_medium_image_key = bucket.get_key(target_medium_path_id)
        self.assertEqual(target_medium_image_key.key, target_medium_path_id)
        self.assertEqual(source_medium_image_key.content_type, target_medium_image_key.content_type)
        source_medium_image_data = source_medium_image_key.get_contents_as_string()
        target_medium_image_data = target_medium_image_key.get_contents_as_string()
        self.assertEqual(source_medium_image_data, target_medium_image_data)

    @use_s3_backend
    def test_get_realm_for_filename(self) -> None:
        """get_realm_for_filename() should map an upload path back to the
        uploader's realm id."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        user_profile = self.example_user('hamlet')
        uri = upload_message_file(u'dummy.txt', len(b'zulip!'), u'text/plain', b'zulip!', user_profile)
        path_id = re.sub('/user_uploads/', '', uri)
        self.assertEqual(user_profile.realm_id, get_realm_for_filename(path_id))

    @use_s3_backend
    def test_get_realm_for_filename_when_key_doesnt_exist(self) -> None:
        """A missing key should yield None rather than raise."""
        self.assertEqual(None, get_realm_for_filename('non-existent-file-path'))

    @use_s3_backend
    def test_upload_realm_icon_image(self) -> None:
        """Realm icon uploads should store the original bytes and a copy
        resized to DEFAULT_AVATAR_SIZE (fixture handle now closed via `with`)."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AVATAR_BUCKET)

        user_profile = self.example_user("hamlet")
        with get_test_image_file("img.png") as image_file:
            zerver.lib.upload.upload_backend.upload_realm_icon_image(image_file, user_profile)

            original_path_id = os.path.join(str(user_profile.realm.id), "realm", "icon.original")
            original_key = bucket.get_key(original_path_id)
            image_file.seek(0)
            self.assertEqual(image_file.read(), original_key.get_contents_as_string())

        resized_path_id = os.path.join(str(user_profile.realm.id), "realm", "icon.png")
        resized_data = bucket.get_key(resized_path_id).read()
        resized_image = Image.open(io.BytesIO(resized_data)).size
        self.assertEqual(resized_image, (DEFAULT_AVATAR_SIZE, DEFAULT_AVATAR_SIZE))

    @use_s3_backend
    def test_upload_emoji_image(self) -> None:
        """Emoji uploads should store the original bytes and a copy resized
        to DEFAULT_EMOJI_SIZE (fixture handle now closed via `with`)."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AVATAR_BUCKET)

        user_profile = self.example_user("hamlet")
        emoji_name = "emoji.png"
        with get_test_image_file("img.png") as image_file:
            zerver.lib.upload.upload_backend.upload_emoji_image(image_file, emoji_name, user_profile)

            emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
                realm_id=user_profile.realm_id,
                emoji_file_name=emoji_name,
            )
            original_key = bucket.get_key(emoji_path + ".original")
            image_file.seek(0)
            self.assertEqual(image_file.read(), original_key.get_contents_as_string())

        resized_data = bucket.get_key(emoji_path).read()
        resized_image = Image.open(io.BytesIO(resized_data))
        self.assertEqual(resized_image.size, (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE))

    @use_s3_backend
    def test_get_emoji_url(self) -> None:
        """get_emoji_url() should return the public S3 URL for an emoji path."""
        emoji_name = "emoji.png"
        realm_id = 1
        bucket = settings.S3_AVATAR_BUCKET
        path = RealmEmoji.PATH_ID_TEMPLATE.format(realm_id=realm_id, emoji_file_name=emoji_name)

        url = zerver.lib.upload.upload_backend.get_emoji_url('emoji.png', realm_id)

        expected_url = "https://{bucket}.s3.amazonaws.com/{path}".format(bucket=bucket, path=path)
        self.assertEqual(expected_url, url)
|
2018-05-14 19:53:26 +02:00
|
|
|
|
2016-06-14 04:38:30 +02:00
|
|
|
class UploadTitleTests(TestCase):
    def test_upload_titles(self) -> None:
        """url_filename() extracts the trailing filename from user-upload
        URLs and returns any other URL unchanged."""
        cases = [
            ("http://localhost:9991/user_uploads/1/LUeQZUG5jxkagzVzp1Ox_amr/dummy.txt", "dummy.txt"),
            ("http://localhost:9991/user_uploads/1/94/SzGYe0RFT-tEcOhQ6n-ZblFZ/zulip.txt", "zulip.txt"),
            ("https://zulip.com/user_uploads/4142/LUeQZUG5jxkagzVzp1Ox_amr/pasted_image.png", "pasted_image.png"),
            # Non-upload URLs pass through untouched.
            ("https://zulipchat.com/integrations", "https://zulipchat.com/integrations"),
            ("https://example.com", "https://example.com"),
        ]
        for url, expected in cases:
            self.assertEqual(url_filename(url), expected)
|
2016-04-14 16:26:01 +02:00
|
|
|
class SanitizeNameTests(TestCase):
    # NOTE: the last case below contains deliberate backslash/backtick
    # characters inside the literal; keep it byte-identical when editing.
    def test_file_name(self) -> None:
        """sanitize_name() should drop unsafe punctuation while preserving
        extensions, leading dots on hidden files, and non-ASCII word characters."""
        self.assertEqual(sanitize_name(u'test.txt'), u'test.txt')
        self.assertEqual(sanitize_name(u'.hidden'), u'.hidden')
        self.assertEqual(sanitize_name(u'.hidden.txt'), u'.hidden.txt')
        self.assertEqual(sanitize_name(u'tarball.tar.gz'), u'tarball.tar.gz')
        self.assertEqual(sanitize_name(u'.hidden_tarball.tar.gz'), u'.hidden_tarball.tar.gz')
        self.assertEqual(sanitize_name(u'Testing{}*&*#().ta&&%$##&&r.gz'), u'Testing.tar.gz')
        self.assertEqual(sanitize_name(u'*testingfile?*.txt'), u'testingfile.txt')
        self.assertEqual(sanitize_name(u'snowman☃.txt'), u'snowman.txt')
        self.assertEqual(sanitize_name(u'테스트.txt'), u'테스트.txt')
        self.assertEqual(sanitize_name(u'~/."\`\?*"u0`000ssh/test.t**{}ar.gz'), u'.u0000sshtest.tar.gz')
|
2018-05-14 21:33:51 +02:00
|
|
|
|
|
|
|
|
|
|
|
class UploadSpaceTests(UploadSerializeMixin, ZulipTestCase):
    """Tests for accounting of a realm's total attachment storage."""

    def setUp(self) -> None:
        # Bugfix: call super().setUp() so the mixin/base-class test setup
        # runs before we grab fixtures (the override previously skipped it).
        super().setUp()
        self.realm = get_realm("zulip")
        self.user_profile = self.example_user('hamlet')

    def test_currently_used_upload_space(self) -> None:
        """currently_used_upload_space() should equal the cumulative byte
        size of the realm's uploads, starting at zero."""
        self.assertEqual(0, currently_used_upload_space(self.realm))

        data = b'zulip!'
        upload_message_file(u'dummy.txt', len(data), u'text/plain', data, self.user_profile)
        self.assertEqual(len(data), currently_used_upload_space(self.realm))

        data2 = b'more-data!'
        upload_message_file(u'dummy2.txt', len(data2), u'text/plain', data2, self.user_profile)
        self.assertEqual(len(data) + len(data2), currently_used_upload_space(self.realm))
|
2018-05-29 17:29:57 +02:00
|
|
|
|
|
|
|
class ExifRotateTests(TestCase):
|
|
|
|
def test_image_do_not_rotate(self) -> None:
    """exif_rotate() must return the image unchanged when no rotation is
    needed.

    Bugfix: fixture files are read via context managers; the old
    get_test_image_file(...).read() calls leaked three file handles.
    """
    # Image does not have _getexif method.
    with get_test_image_file('img.png') as fixture:
        img_data = fixture.read()
    img = Image.open(io.BytesIO(img_data))
    result = exif_rotate(img)
    self.assertEqual(result, img)

    # Image with no exif data.
    with get_test_image_file('img_no_exif.jpg') as fixture:
        img_data = fixture.read()
    img = Image.open(io.BytesIO(img_data))
    result = exif_rotate(img)
    self.assertEqual(result, img)

    # Orientation of the image is 1.
    with get_test_image_file('img.jpg') as fixture:
        img_data = fixture.read()
    img = Image.open(io.BytesIO(img_data))
    result = exif_rotate(img)
    self.assertEqual(result, img)
|
|
|
|
|
|
|
|
def test_image_rotate(self) -> None:
    """exif_rotate() should rotate by the angle implied by the EXIF
    orientation tag: 3 -> 180, 6 -> 270, 8 -> 90 degrees.

    Bugfix: fixture files are read via context managers; the old
    get_test_image_file(...).read() calls leaked three file handles.
    """
    with mock.patch('PIL.Image.Image.rotate') as rotate:
        with get_test_image_file('img_orientation_3.jpg') as fixture:
            img_data = fixture.read()
        img = Image.open(io.BytesIO(img_data))
        exif_rotate(img)
        rotate.assert_called_with(180, expand=True)

        with get_test_image_file('img_orientation_6.jpg') as fixture:
            img_data = fixture.read()
        img = Image.open(io.BytesIO(img_data))
        exif_rotate(img)
        rotate.assert_called_with(270, expand=True)

        with get_test_image_file('img_orientation_8.jpg') as fixture:
            img_data = fixture.read()
        img = Image.open(io.BytesIO(img_data))
        exif_rotate(img)
        rotate.assert_called_with(90, expand=True)