2016-04-14 16:26:01 +02:00
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from django . conf import settings
2016-06-09 07:53:35 +02:00
from django . test import TestCase , override_settings
2016-04-14 16:26:01 +02:00
from unittest import skip
2016-07-13 01:56:59 +02:00
from zerver . lib . avatar import avatar_url
2016-06-14 04:38:30 +02:00
from zerver . lib . bugdown import url_filename
2017-02-21 03:41:20 +01:00
from zerver . lib . realm_icon import realm_icon_url
2017-02-16 10:10:37 +01:00
from zerver . lib . test_classes import ZulipTestCase , UploadSerializeMixin
2017-03-08 19:47:42 +01:00
from zerver . lib . test_helpers import (
avatar_disk_path ,
get_test_image_file ,
POSTRequestMock ,
)
2016-04-14 16:26:01 +02:00
from zerver . lib . test_runner import slow
2016-06-09 07:53:35 +02:00
from zerver . lib . upload import sanitize_name , S3UploadBackend , \
2017-03-21 23:53:54 +01:00
upload_message_image , delete_message_image , LocalUploadBackend , \
ZulipUploadBackend
2016-06-09 07:53:35 +02:00
import zerver . lib . upload
2016-03-24 20:24:01 +01:00
from zerver . models import Attachment , Recipient , get_user_profile_by_email , \
2016-06-17 19:48:17 +02:00
get_old_unclaimed_attachments , Message , UserProfile , Stream , Realm , \
RealmDomain , get_realm
2016-03-24 20:24:01 +01:00
from zerver . lib . actions import do_delete_old_unclaimed_attachments
2016-06-17 19:48:17 +02:00
from zilencer . models import Deployment
2016-04-14 16:26:01 +02:00
2017-03-08 19:47:42 +01:00
from zerver . views . upload import upload_file_backend
2016-04-14 16:26:01 +02:00
import ujson
from six . moves import urllib
2016-11-07 05:02:13 +01:00
from PIL import Image
2016-04-14 16:26:01 +02:00
from boto . s3 . connection import S3Connection
from boto . s3 . key import Key
2016-10-11 19:38:22 +02:00
from six . moves import StringIO as _StringIO
2016-09-16 17:11:54 +02:00
import mock
2016-04-14 23:44:39 +02:00
import os
2016-11-07 05:02:13 +01:00
import io
2016-04-14 23:44:39 +02:00
import shutil
2016-03-24 20:24:01 +01:00
import re
import datetime
2016-06-25 11:05:59 +02:00
import requests
import base64
2016-03-24 20:24:01 +01:00
from datetime import timedelta
2017-04-15 04:03:56 +02:00
from django . utils . timezone import now as timezone_now
2016-03-24 20:24:01 +01:00
from moto import mock_s3
2016-04-14 16:26:01 +02:00
2016-12-04 18:38:56 +01:00
from typing import Any , Callable , TypeVar , Text
2016-10-11 19:38:22 +02:00
2016-04-17 23:51:49 +02:00
def destroy_uploads ( ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:51:49 +02:00
if os . path . exists ( settings . LOCAL_UPLOADS_DIR ) :
shutil . rmtree ( settings . LOCAL_UPLOADS_DIR )
2016-10-11 19:38:22 +02:00
class StringIO ( _StringIO ) :
name = ' ' # https://github.com/python/typeshed/issues/598
2017-02-16 10:10:37 +01:00
class FileUploadTest ( UploadSerializeMixin , ZulipTestCase ) :
2016-06-25 11:05:59 +02:00
def test_rest_endpoint ( self ) :
# type: () -> None
"""
Tests the / api / v1 / user_uploads api endpoint . Here a single file is uploaded
and downloaded using a username and api_key
"""
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
# Upload file via API
auth_headers = self . api_auth ( ' hamlet@zulip.com ' )
2016-07-28 00:30:22 +02:00
result = self . client_post ( ' /api/v1/user_uploads ' , { ' file ' : fp } , * * auth_headers )
2016-06-25 11:05:59 +02:00
json = ujson . loads ( result . content )
self . assertIn ( " uri " , json )
uri = json [ " uri " ]
base = ' /user_uploads/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , uri [ : len ( base ) ] )
2016-06-25 11:05:59 +02:00
2016-06-27 16:41:58 +02:00
# Download file via API
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-07-28 00:38:45 +02:00
response = self . client_get ( uri , * * auth_headers )
2016-07-13 22:09:27 +02:00
data = b " " . join ( response . streaming_content )
2016-12-16 02:01:34 +01:00
self . assertEqual ( b " zulip! " , data )
2016-06-27 16:41:58 +02:00
2016-06-25 11:05:59 +02:00
# Files uploaded through the API should be accesible via the web client
self . login ( " hamlet@zulip.com " )
2016-12-19 16:17:19 +01:00
self . assert_url_serves_contents_of_file ( uri , b " zulip! " )
2016-06-25 11:05:59 +02:00
2017-03-08 19:47:42 +01:00
def test_filename_encoding ( self ) :
# type: () -> None
"""
In Python 2 , we need to encode unicode filenames ( which converts them to
str ) before they can be rendered correctly . However , in Python 3 , the
separate unicode type does not exist , and we don ' t need to perform this
encoding . This test ensures that we handle filename encodings properly ,
and does so in a way that preserves 100 % test coverage for Python 3.
"""
2017-05-07 17:21:26 +02:00
user_profile = self . example_user ( ' hamlet ' )
2017-03-08 19:47:42 +01:00
mock_file = mock . Mock ( )
mock_file . _get_size = mock . Mock ( return_value = 1024 )
mock_files = mock . Mock ( )
mock_files . __len__ = mock . Mock ( return_value = 1 )
mock_files . values = mock . Mock ( return_value = [ mock_file ] )
mock_request = mock . Mock ( )
mock_request . FILES = mock_files
# str filenames should not be encoded.
mock_filename = mock . Mock ( spec = str )
mock_file . name = mock_filename
with mock . patch ( ' zerver.views.upload.upload_message_image_from_request ' ) :
result = upload_file_backend ( mock_request , user_profile )
self . assert_json_success ( result )
mock_filename . encode . assert_not_called ( )
# Non-str filenames should be encoded.
mock_filename = mock . Mock ( spec = None ) # None is not str
mock_file . name = mock_filename
with mock . patch ( ' zerver.views.upload.upload_message_image_from_request ' ) :
result = upload_file_backend ( mock_request , user_profile )
self . assert_json_success ( result )
mock_filename . encode . assert_called_once_with ( ' ascii ' )
2016-09-16 16:41:04 +02:00
def test_file_too_big_failure ( self ) :
# type: () -> None
"""
Attempting to upload big files should fail .
"""
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " bah! " )
fp . name = " a.txt "
# Use MAX_FILE_UPLOAD_SIZE of 0, because the next increment
# would be 1MB.
with self . settings ( MAX_FILE_UPLOAD_SIZE = 0 ) :
result = self . client_post ( " /json/upload_file " , { ' f1 ' : fp } )
2017-01-29 00:08:08 +01:00
self . assert_json_error ( result , ' Uploaded file is larger than the allowed limit of 0 MB ' )
2016-09-16 16:41:04 +02:00
2016-04-14 16:26:01 +02:00
def test_multiple_upload_failure ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-14 16:26:01 +02:00
"""
Attempting to upload two files should fail .
"""
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " bah! " )
fp . name = " a.txt "
fp2 = StringIO ( " pshaw! " )
fp2 . name = " b.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' f1 ' : fp , ' f2 ' : fp2 } )
2016-04-14 16:26:01 +02:00
self . assert_json_error ( result , " You may only upload one file at a time " )
def test_no_file_upload_failure ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-14 16:26:01 +02:00
"""
Calling this endpoint with no files should fail .
"""
self . login ( " hamlet@zulip.com " )
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " )
2016-04-14 16:26:01 +02:00
self . assert_json_error ( result , " You must specify a file to upload " )
2016-09-16 17:11:54 +02:00
def test_download_non_existent_file ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
response = self . client_get ( ' /user_uploads/unk/nonexistent_file ' )
2016-12-16 02:01:34 +01:00
self . assertEqual ( response . status_code , 404 )
2016-09-16 17:11:54 +02:00
self . assertIn ( ' File not found ' , str ( response . content ) )
2016-03-24 20:24:01 +01:00
# This test will go through the code path for uploading files onto LOCAL storage
# when zulip is in DEVELOPMENT mode.
2016-04-14 23:44:39 +02:00
def test_file_upload_authed ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-14 23:44:39 +02:00
"""
2016-03-24 20:24:01 +01:00
A call to / json / upload_file should return a uri and actually create an
entry in the database . This entry will be marked unclaimed till a message
refers it .
2016-04-14 23:44:39 +02:00
"""
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
2016-04-14 23:44:39 +02:00
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " uri " , json )
uri = json [ " uri " ]
base = ' /user_uploads/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , uri [ : len ( base ) ] )
2016-04-14 23:44:39 +02:00
2016-03-24 20:24:01 +01:00
# In the future, local file requests will follow the same style as S3
# requests; they will be first authenthicated and redirected
2016-12-19 16:17:19 +01:00
self . assert_url_serves_contents_of_file ( uri , b " zulip! " )
2016-04-14 23:44:39 +02:00
2016-03-24 20:24:01 +01:00
# check if DB has attachment marked as unclaimed
entry = Attachment . objects . get ( file_name = ' zulip.txt ' )
2016-12-16 02:01:34 +01:00
self . assertEqual ( entry . is_claimed ( ) , False )
2016-03-24 20:24:01 +01:00
2016-06-14 04:38:30 +02:00
self . subscribe_to_stream ( " hamlet@zulip.com " , " Denmark " )
body = " First message ...[zulip.txt](http://localhost:9991 " + uri + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertIn ( ' title= " zulip.txt " ' , self . get_last_message ( ) . rendered_content )
2016-06-17 19:48:17 +02:00
def test_file_download_unauthed ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
response = self . client_get ( uri )
self . assert_json_error ( response , " Not logged in: API authentication or user session required " ,
status_code = 401 )
def test_removed_file_download ( self ) :
# type: () -> None
'''
Trying to download deleted files should return 404 error
'''
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
destroy_uploads ( )
response = self . client_get ( uri )
self . assertEqual ( response . status_code , 404 )
def test_non_existing_file_download ( self ) :
2016-06-17 19:48:17 +02:00
# type: () -> None
2016-06-17 19:48:17 +02:00
'''
Trying to download a file that was never uploaded will return a json_error
'''
self . login ( " hamlet@zulip.com " )
response = self . client_get ( " http://localhost:9991/user_uploads/1/ff/gg/abc.py " )
self . assertEqual ( response . status_code , 404 )
self . assert_in_response ( ' File not found. ' , response )
2016-06-27 21:09:56 +02:00
2016-06-17 19:48:17 +02:00
def test_delete_old_unclaimed_attachments ( self ) :
# type: () -> None
2016-03-24 20:24:01 +01:00
# Upload some files and make them older than a weeek
self . login ( " hamlet@zulip.com " )
d1 = StringIO ( " zulip! " )
d1 . name = " dummy_1.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : d1 } )
2016-03-24 20:24:01 +01:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
d1_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
d2 = StringIO ( " zulip! " )
d2 . name = " dummy_2.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : d2 } )
2016-03-24 20:24:01 +01:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
d2_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
2017-04-15 04:03:56 +02:00
two_week_ago = timezone_now ( ) - datetime . timedelta ( weeks = 2 )
2016-03-24 20:24:01 +01:00
d1_attachment = Attachment . objects . get ( path_id = d1_path_id )
d1_attachment . create_time = two_week_ago
d1_attachment . save ( )
2016-10-04 00:41:07 +02:00
self . assertEqual ( str ( d1_attachment ) , u ' <Attachment: dummy_1.txt> ' )
2016-03-24 20:24:01 +01:00
d2_attachment = Attachment . objects . get ( path_id = d2_path_id )
d2_attachment . create_time = two_week_ago
d2_attachment . save ( )
# Send message refering only dummy_1
self . subscribe_to_stream ( " hamlet@zulip.com " , " Denmark " )
body = " Some files here ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
# dummy_2 should not exist in database or the uploads folder
do_delete_old_unclaimed_attachments ( 2 )
self . assertTrue ( not Attachment . objects . filter ( path_id = d2_path_id ) . exists ( ) )
self . assertTrue ( not delete_message_image ( d2_path_id ) )
2017-04-14 01:15:46 +02:00
def test_attachment_url_without_upload ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
body = " Test message ...[zulip.txt](http://localhost:9991/user_uploads/1/64/fake_path_id.txt) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertFalse ( Attachment . objects . filter ( path_id = " 1/64/fake_path_id.txt " ) . exists ( ) )
2016-03-24 20:24:01 +01:00
def test_multiple_claim_attachments ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-03-24 20:24:01 +01:00
"""
This test tries to claim the same attachment twice . The messages field in
the Attachment model should have both the messages in its entry .
"""
self . login ( " hamlet@zulip.com " )
d1 = StringIO ( " zulip! " )
d1 . name = " dummy_1.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : d1 } )
2016-03-24 20:24:01 +01:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
d1_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
self . subscribe_to_stream ( " hamlet@zulip.com " , " Denmark " )
body = " First message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
body = " Second message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
2016-12-16 02:01:34 +01:00
self . assertEqual ( Attachment . objects . get ( path_id = d1_path_id ) . messages . count ( ) , 2 )
2016-03-24 20:24:01 +01:00
2017-04-14 00:59:59 +02:00
def test_multiple_claim_attachments_different_owners ( self ) :
# type: () -> None
""" This test tries to claim the same attachment more than once, first
with a private stream and then with differnet recipients . """
self . login ( " hamlet@zulip.com " )
d1 = StringIO ( " zulip! " )
d1 . name = " dummy_1.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : d1 } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
d1_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
self . make_stream ( " private_stream " , invite_only = True )
self . subscribe_to_stream ( " hamlet@zulip.com " , " private_stream " )
# First, send the mesasge to the new private stream.
body = " First message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " private_stream " , Recipient . STREAM , body , " test " )
self . assertFalse ( Attachment . objects . get ( path_id = d1_path_id ) . is_realm_public )
self . assertEqual ( Attachment . objects . get ( path_id = d1_path_id ) . messages . count ( ) , 1 )
# Then, try having a user who didn't receive the message try to publish it, and fail
body = " Illegal message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " cordelia@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertEqual ( Attachment . objects . get ( path_id = d1_path_id ) . messages . count ( ) , 1 )
self . assertFalse ( Attachment . objects . get ( path_id = d1_path_id ) . is_realm_public )
# Then, have the owner PM it to another user, giving that other user access.
body = " Second message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " othello@zulip.com " , Recipient . PERSONAL , body , " test " )
self . assertEqual ( Attachment . objects . get ( path_id = d1_path_id ) . messages . count ( ) , 2 )
self . assertFalse ( Attachment . objects . get ( path_id = d1_path_id ) . is_realm_public )
# Then, have that new recipient user publish it.
body = " Third message ...[zulip.txt](http://localhost:9991/user_uploads/ " + d1_path_id + " ) "
self . send_message ( " othello@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertEqual ( Attachment . objects . get ( path_id = d1_path_id ) . messages . count ( ) , 3 )
self . assertTrue ( Attachment . objects . get ( path_id = d1_path_id ) . is_realm_public )
2016-07-07 09:47:15 +02:00
def test_check_attachment_reference_update ( self ) :
2016-10-11 19:38:22 +02:00
# type: () -> None
2016-07-07 09:47:15 +02:00
f1 = StringIO ( " file1 " )
f1 . name = " file1.txt "
f2 = StringIO ( " file2 " )
f2 . name = " file2.txt "
f3 = StringIO ( " file3 " )
f3 . name = " file3.txt "
self . login ( " hamlet@zulip.com " )
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : f1 } )
2016-07-07 09:47:15 +02:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
f1_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : f2 } )
2016-07-07 09:47:15 +02:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
f2_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
self . subscribe_to_stream ( " hamlet@zulip.com " , " test " )
body = ( " [f1.txt](http://localhost:9991/user_uploads/ " + f1_path_id + " ) "
2016-12-03 00:04:17 +01:00
" [f2.txt](http://localhost:9991/user_uploads/ " + f2_path_id + " ) " )
2016-07-07 09:47:15 +02:00
msg_id = self . send_message ( " hamlet@zulip.com " , " test " , Recipient . STREAM , body , " test " )
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : f3 } )
2016-07-07 09:47:15 +02:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
f3_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
new_body = ( " [f3.txt](http://localhost:9991/user_uploads/ " + f3_path_id + " ) "
2016-12-03 00:04:17 +01:00
" [f2.txt](http://localhost:9991/user_uploads/ " + f2_path_id + " ) " )
2016-12-22 10:17:49 +01:00
result = self . client_patch ( " /json/messages/ " + str ( msg_id ) , {
2016-07-07 09:47:15 +02:00
' message_id ' : msg_id ,
' content ' : new_body
} )
self . assert_json_success ( result )
message = Message . objects . get ( id = msg_id )
f1_attachment = Attachment . objects . get ( path_id = f1_path_id )
f2_attachment = Attachment . objects . get ( path_id = f2_path_id )
2016-07-24 22:03:22 +02:00
f3_attachment = Attachment . objects . get ( path_id = f3_path_id )
2016-07-07 09:47:15 +02:00
self . assertTrue ( message not in f1_attachment . messages . all ( ) )
self . assertTrue ( message in f2_attachment . messages . all ( ) )
self . assertTrue ( message in f3_attachment . messages . all ( ) )
2016-07-23 07:06:13 +02:00
# Delete all the attachments from the message
new_body = " (deleted) "
2016-12-22 10:17:49 +01:00
result = self . client_patch ( " /json/messages/ " + str ( msg_id ) , {
2016-07-23 07:06:13 +02:00
' message_id ' : msg_id ,
' content ' : new_body
} )
self . assert_json_success ( result )
message = Message . objects . get ( id = msg_id )
f1_attachment = Attachment . objects . get ( path_id = f1_path_id )
f2_attachment = Attachment . objects . get ( path_id = f2_path_id )
f3_attachment = Attachment . objects . get ( path_id = f3_path_id )
self . assertTrue ( message not in f1_attachment . messages . all ( ) )
self . assertTrue ( message not in f2_attachment . messages . all ( ) )
self . assertTrue ( message not in f3_attachment . messages . all ( ) )
2016-09-20 11:02:15 +02:00
def test_file_name ( self ) :
# type: () -> None
"""
Unicode filenames should be processed correctly .
"""
self . login ( " hamlet@zulip.com " )
for expected in [ " Здравейте.txt " , " test " ] :
fp = StringIO ( " bah! " )
fp . name = urllib . parse . quote ( expected )
result = self . client_post ( " /json/upload_file " , { ' f1 ' : fp } )
content = ujson . loads ( result . content )
assert sanitize_name ( expected ) in content [ ' uri ' ]
2017-03-02 11:17:10 +01:00
def test_upload_size_quote ( self ) :
# type: () -> None
"""
User quote for uploading should not be exceeded
"""
self . login ( " hamlet@zulip.com " )
d1 = StringIO ( " zulip! " )
d1 . name = " dummy_1.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : d1 } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
d1_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
d1_attachment = Attachment . objects . get ( path_id = d1_path_id )
self . assert_json_success ( result )
"""
Below we set size quota to the limit without 1 upload ( 1 GB - 11 bytes ) .
"""
d1_attachment . size = UserProfile . DEFAULT_UPLOADS_QUOTA - 11
d1_attachment . save ( )
d2 = StringIO ( " zulip! " )
d2 . name = " dummy_2.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : d2 } )
self . assert_json_success ( result )
d3 = StringIO ( " zulip! " )
d3 . name = " dummy_3.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : d3 } )
self . assert_json_error ( result , " Upload would exceed your maximum quota. " )
2016-06-17 19:48:17 +02:00
def test_cross_realm_file_access ( self ) :
# type: () -> None
def create_user ( email ) :
# type: (Text) -> None
self . register ( email , ' test ' )
return get_user_profile_by_email ( email )
user1_email = ' user1@uploadtest.example.com '
user2_email = ' test-og-bot@zulip.com '
user3_email = ' other-user@uploadtest.example.com '
dep = Deployment ( )
dep . base_api_url = " https://zulip.com/api/ "
dep . base_site_url = " https://zulip.com/ "
# We need to save the object before we can access
# the many-to-many relationship 'realms'
dep . save ( )
dep . realms = [ get_realm ( " zulip " ) ]
dep . save ( )
r1 = Realm . objects . create ( string_id = ' uploadtest.example.com ' , invite_required = False )
RealmDomain . objects . create ( realm = r1 , domain = ' uploadtest.example.com ' )
deployment = Deployment . objects . filter ( ) [ 0 ]
deployment . realms . add ( r1 )
create_user ( user1_email )
create_user ( user2_email )
create_user ( user3_email )
# Send a message from @zulip.com -> @uploadtest.example.com
self . login ( user2_email , ' test ' )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
fp_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
body = " First message ...[zulip.txt](http://localhost:9991/user_uploads/ " + fp_path_id + " ) "
2017-04-19 04:17:39 +02:00
with self . settings ( CROSS_REALM_BOT_EMAILS = set ( ( user2_email , user3_email ) ) ) :
self . send_message ( user2_email , user1_email , Recipient . PERSONAL , body )
2016-06-17 19:48:17 +02:00
self . login ( user1_email , ' test ' )
response = self . client_get ( uri )
self . assertEqual ( response . status_code , 200 )
data = b " " . join ( response . streaming_content )
self . assertEqual ( b " zulip! " , data )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
# Confirm other cross-realm users can't read it.
self . login ( user3_email , ' test ' )
response = self . client_get ( uri )
self . assertEqual ( response . status_code , 403 )
self . assert_in_response ( " You are not authorized to view this file. " , response )
def test_file_download_authorization_invite_only ( self ) :
# type: () -> None
subscribed_users = [ " hamlet@zulip.com " , " iago@zulip.com " ]
unsubscribed_users = [ " othello@zulip.com " , " prospero@zulip.com " ]
for user in subscribed_users :
self . subscribe_to_stream ( user , " test-subscribe " )
# Make the stream private
stream = Stream . objects . get ( name = ' test-subscribe ' )
stream . invite_only = True
stream . save ( )
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
fp_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
body = " First message ...[zulip.txt](http://localhost:9991/user_uploads/ " + fp_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " test-subscribe " , Recipient . STREAM , body , " test " )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
# Subscribed user should be able to view file
for user in subscribed_users :
self . login ( user )
response = self . client_get ( uri )
self . assertEqual ( response . status_code , 200 )
data = b " " . join ( response . streaming_content )
self . assertEqual ( b " zulip! " , data )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
# Unsubscribed user should not be able to view file
for user in unsubscribed_users :
self . login ( user )
response = self . client_get ( uri )
self . assertEqual ( response . status_code , 403 )
self . assert_in_response ( " You are not authorized to view this file. " , response )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
def test_file_download_authorization_public ( self ) :
# type: () -> None
subscribed_users = [ " hamlet@zulip.com " , " iago@zulip.com " ]
unsubscribed_users = [ " othello@zulip.com " , " prospero@zulip.com " ]
for user in subscribed_users :
self . subscribe_to_stream ( user , " test-subscribe " )
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
json = ujson . loads ( result . content )
uri = json [ " uri " ]
fp_path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
body = " First message ...[zulip.txt](http://localhost:9991/user_uploads/ " + fp_path_id + " ) "
self . send_message ( " hamlet@zulip.com " , " test-subscribe " , Recipient . STREAM , body , " test " )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
# Now all users should be able to access the files
for user in subscribed_users + unsubscribed_users :
self . login ( user )
response = self . client_get ( uri )
data = b " " . join ( response . streaming_content )
self . assertEqual ( b " zulip! " , data )
2017-04-18 03:23:32 +02:00
self . logout ( )
2016-06-17 19:48:17 +02:00
2016-04-14 23:44:39 +02:00
def tearDown ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:51:49 +02:00
destroy_uploads ( )
2017-02-16 10:10:37 +01:00
class AvatarTest ( UploadSerializeMixin , ZulipTestCase ) :
2016-04-17 23:51:49 +02:00
2017-03-21 23:53:54 +01:00
def test_avatar_url ( self ) :
# type: () -> None
""" Verifies URL schemes for avatars and realm icons. """
backend = LocalUploadBackend ( ) # type: ZulipUploadBackend
self . assertEqual ( backend . get_avatar_url ( " hash " , False ) ,
" /user_avatars/hash.png?x=x " )
self . assertEqual ( backend . get_avatar_url ( " hash " , True ) ,
" /user_avatars/hash-medium.png?x=x " )
2017-03-22 00:09:10 +01:00
self . assertEqual ( backend . get_realm_icon_url ( 15 , 1 ) ,
" /user_avatars/15/realm/icon.png?version=1 " )
2017-03-21 23:53:54 +01:00
with self . settings ( S3_AVATAR_BUCKET = " bucket " ) :
backend = S3UploadBackend ( )
self . assertEqual ( backend . get_avatar_url ( " hash " , False ) ,
" https://bucket.s3.amazonaws.com/hash?x=x " )
self . assertEqual ( backend . get_avatar_url ( " hash " , True ) ,
" https://bucket.s3.amazonaws.com/hash-medium.png?x=x " )
2017-03-22 00:09:10 +01:00
self . assertEqual ( backend . get_realm_icon_url ( 15 , 1 ) ,
" https://bucket.s3.amazonaws.com/15/realm/icon.png?version=1 " )
2017-03-21 23:53:54 +01:00
2016-04-17 23:51:49 +02:00
def test_multiple_upload_failure ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:51:49 +02:00
"""
Attempting to upload two files should fail .
"""
self . login ( " hamlet@zulip.com " )
2016-12-19 08:48:03 +01:00
with get_test_image_file ( ' img.png ' ) as fp1 , \
get_test_image_file ( ' img.png ' ) as fp2 :
2016-12-21 21:29:46 +01:00
result = self . client_put_multipart ( " /json/users/me/avatar " , { ' f1 ' : fp1 , ' f2 ' : fp2 } )
2016-04-17 23:51:49 +02:00
self . assert_json_error ( result , " You must upload exactly one avatar. " )
def test_no_file_upload_failure ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:51:49 +02:00
"""
Calling this endpoint with no files should fail .
"""
self . login ( " hamlet@zulip.com " )
2016-12-21 21:29:46 +01:00
result = self . client_put_multipart ( " /json/users/me/avatar " )
2016-04-17 23:51:49 +02:00
self . assert_json_error ( result , " You must upload exactly one avatar. " )
2016-04-17 23:57:03 +02:00
correct_files = [
( ' img.png ' , ' png_resized.png ' ) ,
2016-07-26 08:14:49 +02:00
( ' img.jpg ' , None ) , # jpeg resizing is platform-dependent
2016-04-17 23:57:03 +02:00
( ' img.gif ' , ' gif_resized.png ' ) ,
( ' img.tif ' , ' tif_resized.png ' )
]
corrupt_files = [ ' text.txt ' , ' corrupt.png ' , ' corrupt.gif ' ]
2016-07-13 01:56:59 +02:00
def test_get_gravatar_avatar ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
2017-05-07 17:21:26 +02:00
cordelia = self . example_user ( ' cordelia ' )
2016-07-13 01:56:59 +02:00
cordelia . avatar_source = UserProfile . AVATAR_FROM_GRAVATAR
cordelia . save ( )
with self . settings ( ENABLE_GRAVATAR = True ) :
2016-07-28 00:38:45 +02:00
response = self . client_get ( " /avatar/cordelia@zulip.com?foo=bar " )
2016-07-13 01:56:59 +02:00
redirect_url = response [ ' Location ' ]
self . assertEqual ( redirect_url , avatar_url ( cordelia ) + ' &foo=bar ' )
with self . settings ( ENABLE_GRAVATAR = False ) :
2016-07-28 00:38:45 +02:00
response = self . client_get ( " /avatar/cordelia@zulip.com?foo=bar " )
2016-07-13 01:56:59 +02:00
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( avatar_url ( cordelia ) + ' &foo=bar ' ) )
def test_get_user_avatar ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
2017-05-07 17:21:26 +02:00
cordelia = self . example_user ( ' cordelia ' )
2016-07-13 01:56:59 +02:00
cordelia . avatar_source = UserProfile . AVATAR_FROM_USER
cordelia . save ( )
2016-07-28 00:38:45 +02:00
response = self . client_get ( " /avatar/cordelia@zulip.com?foo=bar " )
2016-07-13 01:56:59 +02:00
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( avatar_url ( cordelia ) + ' &foo=bar ' ) )
2016-10-24 16:42:43 +02:00
response = self . client_get ( " /avatar/ %s ?foo=bar " % ( cordelia . id ) )
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( avatar_url ( cordelia ) + ' &foo=bar ' ) )
2017-02-23 20:13:56 +01:00
def test_get_user_avatar_medium ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
2017-05-07 17:21:26 +02:00
cordelia = self . example_user ( ' cordelia ' )
2017-02-23 20:13:56 +01:00
cordelia . avatar_source = UserProfile . AVATAR_FROM_USER
cordelia . save ( )
response = self . client_get ( " /avatar/cordelia@zulip.com/medium?foo=bar " )
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( avatar_url ( cordelia , True ) + ' &foo=bar ' ) )
response = self . client_get ( " /avatar/ %s /medium?foo=bar " % ( cordelia . id , ) )
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( avatar_url ( cordelia , True ) + ' &foo=bar ' ) )
2016-07-13 01:56:59 +02:00
def test_non_valid_user_avatar ( self ) :
# type: () -> None
# It's debatable whether we should generate avatars for non-users,
# but this test just validates the current code's behavior.
self . login ( " hamlet@zulip.com " )
2016-07-28 00:38:45 +02:00
response = self . client_get ( " /avatar/nonexistent_user@zulip.com?foo=bar " )
2016-07-13 01:56:59 +02:00
redirect_url = response [ ' Location ' ]
2017-02-16 22:35:57 +01:00
actual_url = ' https://secure.gravatar.com/avatar/444258b521f152129eb0c162996e572d?d=identicon&version=1&foo=bar '
2016-07-13 01:56:59 +02:00
self . assertEqual ( redirect_url , actual_url )
2016-04-17 23:57:03 +02:00
def test_valid_avatars ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:57:03 +02:00
"""
2016-12-21 18:34:03 +01:00
A PUT request to / json / users / me / avatar with a valid file should return a url and actually create an avatar .
2016-04-17 23:57:03 +02:00
"""
2017-01-28 19:05:20 +01:00
version = 2
2016-04-17 23:57:03 +02:00
for fname , rfname in self . correct_files :
# TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
# with self.subTest(fname=fname):
self . login ( " hamlet@zulip.com " )
2016-12-19 08:48:03 +01:00
with get_test_image_file ( fname ) as fp :
2016-12-21 21:29:46 +01:00
result = self . client_put_multipart ( " /json/users/me/avatar " , { ' file ' : fp } )
2016-04-17 23:57:03 +02:00
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " avatar_url " , json )
url = json [ " avatar_url " ]
base = ' /user_avatars/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , url [ : len ( base ) ] )
2016-04-17 23:57:03 +02:00
2016-07-26 08:14:49 +02:00
if rfname is not None :
2016-07-28 00:38:45 +02:00
response = self . client_get ( url )
2016-07-26 08:14:49 +02:00
data = b " " . join ( response . streaming_content )
2016-12-16 02:01:34 +01:00
self . assertEqual ( Image . open ( io . BytesIO ( data ) ) . size , ( 100 , 100 ) )
2016-04-17 23:57:03 +02:00
2016-09-20 21:48:48 +02:00
# Verify that the medium-size avatar was created
2017-05-07 17:21:26 +02:00
user_profile = self . example_user ( ' hamlet ' )
2016-12-19 08:48:03 +01:00
medium_avatar_disk_path = avatar_disk_path ( user_profile , medium = True )
2016-09-20 21:48:48 +02:00
self . assertTrue ( os . path . exists ( medium_avatar_disk_path ) )
# Confirm that ensure_medium_avatar_url works to recreate
# medium size avatars from the original if needed
os . remove ( medium_avatar_disk_path )
self . assertFalse ( os . path . exists ( medium_avatar_disk_path ) )
zerver . lib . upload . upload_backend . ensure_medium_avatar_image ( user_profile . email )
self . assertTrue ( os . path . exists ( medium_avatar_disk_path ) )
2017-01-28 19:05:20 +01:00
# Verify whether the avatar_version gets incremented with every new upload
self . assertEqual ( user_profile . avatar_version , version )
version + = 1
2016-04-17 23:57:03 +02:00
def test_invalid_avatars ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:57:03 +02:00
"""
2016-12-21 18:34:03 +01:00
A PUT request to / json / users / me / avatar with an invalid file should fail .
2016-04-17 23:57:03 +02:00
"""
for fname in self . corrupt_files :
# with self.subTest(fname=fname):
self . login ( " hamlet@zulip.com " )
2016-12-19 08:48:03 +01:00
with get_test_image_file ( fname ) as fp :
2016-12-21 21:29:46 +01:00
result = self . client_put_multipart ( " /json/users/me/avatar " , { ' file ' : fp } )
2016-04-17 23:57:03 +02:00
2017-02-26 20:17:34 +01:00
self . assert_json_error ( result , " Could not decode image; did you upload an image file? " )
2017-05-07 17:21:26 +02:00
user_profile = self . example_user ( ' hamlet ' )
2017-01-28 19:05:20 +01:00
self . assertEqual ( user_profile . avatar_version , 1 )
2016-04-17 23:57:03 +02:00
2016-12-21 18:34:03 +01:00
def test_delete_avatar ( self ) :
# type: () -> None
"""
A DELETE request to / json / users / me / avatar should delete the user avatar and return gravatar URL
"""
self . login ( " hamlet@zulip.com " )
2017-05-07 17:21:26 +02:00
hamlet = self . example_user ( ' hamlet ' )
2016-12-21 18:34:03 +01:00
hamlet . avatar_source = UserProfile . AVATAR_FROM_USER
hamlet . save ( )
result = self . client_delete ( " /json/users/me/avatar " )
2017-05-07 17:21:26 +02:00
user_profile = self . example_user ( ' hamlet ' )
2016-12-21 18:34:03 +01:00
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " avatar_url " , json )
self . assertEqual ( json [ " avatar_url " ] , avatar_url ( user_profile ) )
self . assertEqual ( user_profile . avatar_source , UserProfile . AVATAR_FROM_GRAVATAR )
2017-01-28 19:05:20 +01:00
self . assertEqual ( user_profile . avatar_version , 2 )
2016-12-21 18:34:03 +01:00
2017-03-06 06:22:28 +01:00
def test_avatar_upload_file_size_error ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
with get_test_image_file ( self . correct_files [ 0 ] [ 0 ] ) as fp :
with self . settings ( MAX_AVATAR_FILE_SIZE = 0 ) :
result = self . client_put_multipart ( " /json/users/me/avatar " , { ' file ' : fp } )
self . assert_json_error ( result , " Uploaded file is larger than the allowed limit of 0 MB " )
2016-04-17 23:51:49 +02:00
def tearDown ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-17 23:51:49 +02:00
destroy_uploads ( )
2016-04-14 23:44:39 +02:00
2017-02-21 03:41:20 +01:00
class RealmIconTest ( UploadSerializeMixin , ZulipTestCase ) :
def test_multiple_upload_failure ( self ) :
# type: () -> None
"""
Attempting to upload two files should fail .
"""
# Log in as admin
self . login ( " iago@zulip.com " )
with get_test_image_file ( ' img.png ' ) as fp1 , \
get_test_image_file ( ' img.png ' ) as fp2 :
result = self . client_put_multipart ( " /json/realm/icon " , { ' f1 ' : fp1 , ' f2 ' : fp2 } )
self . assert_json_error ( result , " You must upload exactly one icon. " )
def test_no_file_upload_failure ( self ) :
# type: () -> None
"""
Calling this endpoint with no files should fail .
"""
self . login ( " iago@zulip.com " )
result = self . client_put_multipart ( " /json/realm/icon " )
self . assert_json_error ( result , " You must upload exactly one icon. " )
correct_files = [
( ' img.png ' , ' png_resized.png ' ) ,
( ' img.jpg ' , None ) , # jpeg resizing is platform-dependent
( ' img.gif ' , ' gif_resized.png ' ) ,
( ' img.tif ' , ' tif_resized.png ' )
]
corrupt_files = [ ' text.txt ' , ' corrupt.png ' , ' corrupt.gif ' ]
def test_no_admin_user_upload ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
with get_test_image_file ( self . correct_files [ 0 ] [ 0 ] ) as fp :
result = self . client_put_multipart ( " /json/realm/icon " , { ' file ' : fp } )
self . assert_json_error ( result , ' Must be a realm administrator ' )
def test_get_gravatar_icon ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
realm = get_realm ( ' zulip ' )
realm . icon_source = Realm . ICON_FROM_GRAVATAR
realm . save ( )
with self . settings ( ENABLE_GRAVATAR = True ) :
response = self . client_get ( " /json/realm/icon?foo=bar " )
redirect_url = response [ ' Location ' ]
self . assertEqual ( redirect_url , realm_icon_url ( realm ) + ' &foo=bar ' )
with self . settings ( ENABLE_GRAVATAR = False ) :
response = self . client_get ( " /json/realm/icon?foo=bar " )
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( realm_icon_url ( realm ) + ' &foo=bar ' ) )
def test_get_realm_icon ( self ) :
# type: () -> None
self . login ( " hamlet@zulip.com " )
realm = get_realm ( ' zulip ' )
realm . icon_source = Realm . ICON_UPLOADED
realm . save ( )
response = self . client_get ( " /json/realm/icon?foo=bar " )
redirect_url = response [ ' Location ' ]
self . assertTrue ( redirect_url . endswith ( realm_icon_url ( realm ) + ' &foo=bar ' ) )
def test_valid_icons ( self ) :
# type: () -> None
"""
A PUT request to / json / realm / icon with a valid file should return a url
and actually create an realm icon .
"""
for fname , rfname in self . correct_files :
# TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
# with self.subTest(fname=fname):
self . login ( " iago@zulip.com " )
with get_test_image_file ( fname ) as fp :
result = self . client_put_multipart ( " /json/realm/icon " , { ' file ' : fp } )
realm = get_realm ( ' zulip ' )
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " icon_url " , json )
url = json [ " icon_url " ]
base = ' /user_avatars/ %s /realm/icon.png ' % ( realm . id , )
self . assertEqual ( base , url [ : len ( base ) ] )
if rfname is not None :
response = self . client_get ( url )
data = b " " . join ( response . streaming_content )
self . assertEqual ( Image . open ( io . BytesIO ( data ) ) . size , ( 100 , 100 ) )
def test_invalid_icons ( self ) :
# type: () -> None
"""
A PUT request to / json / realm / icon with an invalid file should fail .
"""
for fname in self . corrupt_files :
# with self.subTest(fname=fname):
self . login ( " iago@zulip.com " )
with get_test_image_file ( fname ) as fp :
result = self . client_put_multipart ( " /json/realm/icon " , { ' file ' : fp } )
self . assert_json_error ( result , " Could not decode image; did you upload an image file? " )
def test_delete_icon ( self ) :
# type: () -> None
"""
A DELETE request to / json / realm / icon should delete the realm icon and return gravatar URL
"""
self . login ( " iago@zulip.com " )
realm = get_realm ( ' zulip ' )
realm . icon_source = Realm . ICON_UPLOADED
realm . save ( )
result = self . client_delete ( " /json/realm/icon " )
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " icon_url " , json )
realm = get_realm ( ' zulip ' )
self . assertEqual ( json [ " icon_url " ] , realm_icon_url ( realm ) )
self . assertEqual ( realm . icon_source , Realm . ICON_FROM_GRAVATAR )
def test_realm_icon_version ( self ) :
# type: () -> None
self . login ( " iago@zulip.com " )
realm = get_realm ( ' zulip ' )
icon_version = realm . icon_version
self . assertEqual ( icon_version , 1 )
with get_test_image_file ( self . correct_files [ 0 ] [ 0 ] ) as fp :
self . client_put_multipart ( " /json/realm/icon " , { ' file ' : fp } )
realm = get_realm ( ' zulip ' )
self . assertEqual ( realm . icon_version , icon_version + 1 )
2017-03-06 06:22:28 +01:00
def test_realm_icon_upload_file_size_error ( self ) :
# type: () -> None
self . login ( " iago@zulip.com " )
with get_test_image_file ( self . correct_files [ 0 ] [ 0 ] ) as fp :
with self . settings ( MAX_ICON_FILE_SIZE = 0 ) :
result = self . client_put_multipart ( " /json/realm/icon " , { ' file ' : fp } )
self . assert_json_error ( result , " Uploaded file is larger than the allowed limit of 0 MB " )
2017-02-21 03:41:20 +01:00
def tearDown ( self ) :
# type: () -> None
destroy_uploads ( )
2017-02-16 10:10:37 +01:00
class LocalStorageTest ( UploadSerializeMixin , ZulipTestCase ) :
2016-03-24 20:24:01 +01:00
2016-04-20 21:50:56 +02:00
def test_file_upload_local ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2017-05-07 21:25:59 +02:00
user_profile = self . example_user ( ' hamlet ' )
2017-02-26 11:03:45 +01:00
uri = upload_message_image ( u ' dummy.txt ' , len ( b ' zulip! ' ) , u ' text/plain ' , b ' zulip! ' , user_profile )
2016-04-20 21:50:56 +02:00
base = ' /user_uploads/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , uri [ : len ( base ) ] )
2016-04-20 21:50:56 +02:00
path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
file_path = os . path . join ( settings . LOCAL_UPLOADS_DIR , ' files ' , path_id )
self . assertTrue ( os . path . isfile ( file_path ) )
2017-02-26 11:03:45 +01:00
uploaded_file = Attachment . objects . get ( owner = user_profile , path_id = path_id )
self . assertEqual ( len ( b ' zulip! ' ) , uploaded_file . size )
2016-04-20 21:50:56 +02:00
def test_delete_message_image_local ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-20 21:50:56 +02:00
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
2016-04-20 21:50:56 +02:00
json = ujson . loads ( result . content )
uri = json [ " uri " ]
path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
2016-06-09 07:53:35 +02:00
self . assertTrue ( delete_message_image ( path_id ) )
2016-04-20 21:50:56 +02:00
def tearDown ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-20 21:50:56 +02:00
destroy_uploads ( )
2016-03-24 20:24:01 +01:00
2016-10-11 19:38:22 +02:00
FuncT = TypeVar ( ' FuncT ' , bound = Callable [ . . . , None ] )
2016-06-09 07:53:35 +02:00
def use_s3_backend ( method ) :
2016-10-11 19:38:22 +02:00
# type: (FuncT) -> FuncT
2016-06-09 07:53:35 +02:00
@mock_s3
@override_settings ( LOCAL_UPLOADS_DIR = None )
def new_method ( * args , * * kwargs ) :
2016-10-11 19:38:22 +02:00
# type: (*Any, **Any) -> Any
2016-06-09 07:53:35 +02:00
zerver . lib . upload . upload_backend = S3UploadBackend ( )
try :
return method ( * args , * * kwargs )
finally :
zerver . lib . upload . upload_backend = LocalUploadBackend ( )
return new_method
2016-08-23 02:08:42 +02:00
class S3Test ( ZulipTestCase ) :
2016-04-14 16:26:01 +02:00
2016-06-09 07:53:35 +02:00
@use_s3_backend
2016-04-20 21:51:21 +02:00
def test_file_upload_s3 ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-20 21:51:21 +02:00
conn = S3Connection ( settings . S3_KEY , settings . S3_SECRET_KEY )
bucket = conn . create_bucket ( settings . S3_AUTH_UPLOADS_BUCKET )
2017-05-07 21:25:59 +02:00
user_profile = self . example_user ( ' hamlet ' )
2017-02-26 11:03:45 +01:00
uri = upload_message_image ( u ' dummy.txt ' , len ( b ' zulip! ' ) , u ' text/plain ' , b ' zulip! ' , user_profile )
2016-04-20 21:51:21 +02:00
base = ' /user_uploads/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , uri [ : len ( base ) ] )
2016-04-20 21:51:21 +02:00
path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
2017-02-26 11:03:45 +01:00
content = bucket . get_key ( path_id ) . get_contents_as_string ( )
self . assertEqual ( b " zulip! " , content )
uploaded_file = Attachment . objects . get ( owner = user_profile , path_id = path_id )
self . assertEqual ( len ( b " zulip! " ) , uploaded_file . size )
2016-04-20 21:51:21 +02:00
2016-06-14 04:38:30 +02:00
self . subscribe_to_stream ( " hamlet@zulip.com " , " Denmark " )
body = " First message ...[zulip.txt](http://localhost:9991 " + uri + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertIn ( ' title= " dummy.txt " ' , self . get_last_message ( ) . rendered_content )
2016-06-09 07:53:35 +02:00
@use_s3_backend
2016-04-20 21:51:21 +02:00
def test_message_image_delete_s3 ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-20 21:51:21 +02:00
conn = S3Connection ( settings . S3_KEY , settings . S3_SECRET_KEY )
conn . create_bucket ( settings . S3_AUTH_UPLOADS_BUCKET )
2017-05-07 21:25:59 +02:00
user_profile = self . example_user ( ' hamlet ' )
2017-02-26 11:03:45 +01:00
uri = upload_message_image ( u ' dummy.txt ' , len ( b ' zulip! ' ) , u ' text/plain ' , b ' zulip! ' , user_profile )
2016-04-20 21:51:21 +02:00
path_id = re . sub ( ' /user_uploads/ ' , ' ' , uri )
2016-06-09 07:53:35 +02:00
self . assertTrue ( delete_message_image ( path_id ) )
2016-04-20 21:51:21 +02:00
2016-06-09 07:53:35 +02:00
@use_s3_backend
2016-04-14 16:26:01 +02:00
def test_file_upload_authed ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-04-14 16:26:01 +02:00
"""
A call to / json / upload_file should return a uri and actually create an object .
"""
2016-06-09 07:53:35 +02:00
conn = S3Connection ( settings . S3_KEY , settings . S3_SECRET_KEY )
conn . create_bucket ( settings . S3_AUTH_UPLOADS_BUCKET )
2016-04-14 16:26:01 +02:00
self . login ( " hamlet@zulip.com " )
fp = StringIO ( " zulip! " )
fp . name = " zulip.txt "
2016-07-28 00:30:22 +02:00
result = self . client_post ( " /json/upload_file " , { ' file ' : fp } )
2016-04-14 16:26:01 +02:00
self . assert_json_success ( result )
json = ujson . loads ( result . content )
self . assertIn ( " uri " , json )
uri = json [ " uri " ]
base = ' /user_uploads/ '
2016-12-16 02:01:34 +01:00
self . assertEqual ( base , uri [ : len ( base ) ] )
2016-04-14 16:26:01 +02:00
2016-07-28 00:38:45 +02:00
response = self . client_get ( uri )
2016-04-14 16:26:01 +02:00
redirect_url = response [ ' Location ' ]
2016-12-16 02:01:34 +01:00
self . assertEqual ( b " zulip! " , urllib . request . urlopen ( redirect_url ) . read ( ) . strip ( ) ) # type: ignore # six.moves.urllib.request.urlopen is not defined in typeshed
2016-04-14 16:26:01 +02:00
2016-06-14 04:38:30 +02:00
self . subscribe_to_stream ( " hamlet@zulip.com " , " Denmark " )
body = " First message ...[zulip.txt](http://localhost:9991 " + uri + " ) "
self . send_message ( " hamlet@zulip.com " , " Denmark " , Recipient . STREAM , body , " test " )
self . assertIn ( ' title= " zulip.txt " ' , self . get_last_message ( ) . rendered_content )
class UploadTitleTests ( TestCase ) :
def test_upload_titles ( self ) :
# type: () -> None
self . assertEqual ( url_filename ( " http://localhost:9991/user_uploads/1/LUeQZUG5jxkagzVzp1Ox_amr/dummy.txt " ) , " dummy.txt " )
self . assertEqual ( url_filename ( " http://localhost:9991/user_uploads/1/94/SzGYe0RFT-tEcOhQ6n-ZblFZ/zulip.txt " ) , " zulip.txt " )
self . assertEqual ( url_filename ( " https://zulip.com/user_uploads/4142/LUeQZUG5jxkagzVzp1Ox_amr/pasted_image.png " ) , " pasted_image.png " )
2016-08-23 02:25:40 +02:00
self . assertEqual ( url_filename ( " https://zulipchat.com/integrations " ) , " https://zulipchat.com/integrations " )
2016-06-14 04:38:30 +02:00
self . assertEqual ( url_filename ( " https://example.com " ) , " https://example.com " )
2016-04-14 16:26:01 +02:00
class SanitizeNameTests ( TestCase ) :
def test_file_name ( self ) :
2016-06-04 19:54:34 +02:00
# type: () -> None
2016-12-16 02:01:34 +01:00
self . assertEqual ( sanitize_name ( u ' test.txt ' ) , u ' test.txt ' )
self . assertEqual ( sanitize_name ( u ' .hidden ' ) , u ' .hidden ' )
self . assertEqual ( sanitize_name ( u ' .hidden.txt ' ) , u ' .hidden.txt ' )
self . assertEqual ( sanitize_name ( u ' tarball.tar.gz ' ) , u ' tarball.tar.gz ' )
self . assertEqual ( sanitize_name ( u ' .hidden_tarball.tar.gz ' ) , u ' .hidden_tarball.tar.gz ' )
self . assertEqual ( sanitize_name ( u ' Testing {} *&*#().ta&& % $##&&r.gz ' ) , u ' Testing.tar.gz ' )
self . assertEqual ( sanitize_name ( u ' *testingfile?*.txt ' ) , u ' testingfile.txt ' )
self . assertEqual ( sanitize_name ( u ' snowman☃.txt ' ) , u ' snowman.txt ' )
self . assertEqual ( sanitize_name ( u ' 테스트.txt ' ) , u ' 테스트.txt ' )
self . assertEqual ( sanitize_name ( u ' ~/. " \ ` \ ?* " u0`000ssh/test.t** {} ar.gz ' ) , u ' .u0000sshtest.tar.gz ' )