2016-04-14 16:26:01 +02:00
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from django . conf import settings
2016-06-09 07:53:35 +02:00
from django . test import TestCase , override_settings
2016-04-14 16:26:01 +02:00
from unittest import skip
2016-07-13 01:56:59 +02:00
from zerver . lib . avatar import avatar_url
2016-06-14 04:38:30 +02:00
from zerver . lib . bugdown import url_filename
2016-11-10 19:30:09 +01:00
from zerver . lib . test_classes import ZulipTestCase
2016-04-14 16:26:01 +02:00
from zerver . lib . test_runner import slow
2016-06-09 07:53:35 +02:00
from zerver . lib . upload import sanitize_name , S3UploadBackend , \
upload_message_image , delete_message_image , LocalUploadBackend
import zerver . lib . upload
2016-03-24 20:24:01 +01:00
from zerver . models import Attachment , Recipient , get_user_profile_by_email , \
2016-07-13 01:56:59 +02:00
get_old_unclaimed_attachments , Message , UserProfile
2016-03-24 20:24:01 +01:00
from zerver . lib . actions import do_delete_old_unclaimed_attachments
2016-04-14 16:26:01 +02:00
import ujson
from six . moves import urllib
2016-11-07 05:02:13 +01:00
from PIL import Image
2016-04-14 16:26:01 +02:00
from boto . s3 . connection import S3Connection
from boto . s3 . key import Key
2016-10-11 19:38:22 +02:00
from six . moves import StringIO as _StringIO
2016-09-16 17:11:54 +02:00
import mock
2016-04-14 23:44:39 +02:00
import os
2016-11-07 05:02:13 +01:00
import io
2016-04-14 23:44:39 +02:00
import shutil
2016-03-24 20:24:01 +01:00
import re
import datetime
2016-06-25 11:05:59 +02:00
import requests
import base64
2016-03-24 20:24:01 +01:00
from datetime import timedelta
from django . utils import timezone
from moto import mock_s3
2016-04-14 16:26:01 +02:00
2016-04-17 23:51:49 +02:00
# Directory holding the avatar fixture images, located next to this test module.
TEST_AVATAR_DIR = os.path.join(os.path.dirname(__file__), 'images')
2016-12-04 18:38:56 +01:00
from typing import Any , Callable , TypeVar , Text
2016-10-11 19:38:22 +02:00
2016-04-17 23:51:49 +02:00
def destroy_uploads():
    # type: () -> None
    """Delete the directory tree at settings.LOCAL_UPLOADS_DIR when it exists."""
    if not os.path.exists(settings.LOCAL_UPLOADS_DIR):
        # Nothing was written during the test; nothing to clean up.
        return
    shutil.rmtree(settings.LOCAL_UPLOADS_DIR)
2016-10-11 19:38:22 +02:00
class StringIO(_StringIO):
    """StringIO variant that declares a ``name`` attribute.

    The tests in this module assign ``fp.name`` on instances before posting
    them as uploads; declaring the attribute here keeps the type checker
    happy about those assignments.
    """
    name = ''  # https://github.com/python/typeshed/issues/598
2016-08-23 02:08:42 +02:00
class FileUploadTest(ZulipTestCase):
    """Tests for the file upload endpoints and for attachment claim tracking."""

    def test_rest_endpoint(self):
        # type: () -> None
        """
        Tests the /api/v1/user_uploads api endpoint. Here a single file is uploaded
        and downloaded using a username and api_key
        """
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        # Upload file via API
        auth_headers = self.api_auth('hamlet@zulip.com')
        result = self.client_post('/api/v1/user_uploads', {'file': fp}, **auth_headers)
        json = ujson.loads(result.content)
        self.assertIn("uri", json)
        uri = json["uri"]
        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])

        # Download file via API
        self.client_post('/accounts/logout/')
        response = self.client_get(uri, **auth_headers)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)

        # Files uploaded through the API should be accessible via the web client
        self.login("hamlet@zulip.com")
        response = self.client_get(uri)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)

    def test_file_too_big_failure(self):
        # type: () -> None
        """
        Attempting to upload big files should fail.
        """
        self.login("hamlet@zulip.com")
        fp = StringIO("bah!")
        fp.name = "a.txt"

        # Use MAX_FILE_UPLOAD_SIZE of 0, because the next increment
        # would be 1MB.
        with self.settings(MAX_FILE_UPLOAD_SIZE=0):
            result = self.client_post("/json/upload_file", {'f1': fp})
        self.assert_json_error(result, 'File Upload is larger than allowed limit')

    def test_multiple_upload_failure(self):
        # type: () -> None
        """
        Attempting to upload two files should fail.
        """
        self.login("hamlet@zulip.com")
        fp = StringIO("bah!")
        fp.name = "a.txt"
        fp2 = StringIO("pshaw!")
        fp2.name = "b.txt"

        result = self.client_post("/json/upload_file", {'f1': fp, 'f2': fp2})
        self.assert_json_error(result, "You may only upload one file at a time")

    def test_no_file_upload_failure(self):
        # type: () -> None
        """
        Calling this endpoint with no files should fail.
        """
        self.login("hamlet@zulip.com")

        result = self.client_post("/json/upload_file")
        self.assert_json_error(result, "You must specify a file to upload")

    def test_download_non_existent_file(self):
        # type: () -> None
        """Requesting an upload path that was never created should return a 404."""
        self.login("hamlet@zulip.com")
        response = self.client_get('/user_uploads/unk/nonexistent_file')
        self.assertEqual(response.status_code, 404)
        self.assertIn('File not found', str(response.content))

    def test_serve_s3_error_handling(self):
        # type: () -> None
        """Check the 404 and 403 responses of the non-local serving path."""
        self.login("hamlet@zulip.com")
        # Context-manager factories: one disables local uploads, the other
        # stubs the realm lookup that the upload view performs.
        use_s3 = lambda: self.settings(LOCAL_UPLOADS_DIR=None)
        getting_realm_id = lambda realm_id: mock.patch(
            'zerver.views.upload.get_realm_for_filename',
            return_value=realm_id
        )

        # nonexistent_file
        with use_s3(), getting_realm_id(None):
            response = self.client_get('/user_uploads/unk/nonexistent_file')
        self.assertEqual(response.status_code, 404)
        self.assertIn('File not found', str(response.content))

        # invalid realm of 999999 (for non-zulip.com)
        user = get_user_profile_by_email('hamlet@zulip.com')
        user.realm.domain = 'example.com'
        user.realm.save()

        with use_s3(), getting_realm_id(999999):
            response = self.client_get('/user_uploads/unk/whatever')
        self.assertEqual(response.status_code, 403)

    # This test will go through the code path for uploading files onto LOCAL storage
    # when zulip is in DEVELOPMENT mode.
    def test_file_upload_authed(self):
        # type: () -> None
        """
        A call to /json/upload_file should return a uri and actually create an
        entry in the database. This entry will be marked unclaimed till a message
        refers it.
        """
        self.login("hamlet@zulip.com")
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        result = self.client_post("/json/upload_file", {'file': fp})
        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertIn("uri", json)
        uri = json["uri"]
        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])

        # In the future, local file requests will follow the same style as S3
        # requests; they will be first authenticated and redirected
        response = self.client_get(uri)
        data = b"".join(response.streaming_content)
        self.assertEqual(b"zulip!", data)

        # check if DB has attachment marked as unclaimed
        entry = Attachment.objects.get(file_name='zulip.txt')
        self.assertEqual(entry.is_claimed(), False)

        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)

    def test_delete_old_unclaimed_attachments(self):
        # type: () -> None
        """Old attachments that no message refers to should be deleted."""
        # Upload some files and make them older than a week
        self.login("hamlet@zulip.com")
        d1 = StringIO("zulip!")
        d1.name = "dummy_1.txt"
        result = self.client_post("/json/upload_file", {'file': d1})
        json = ujson.loads(result.content)
        uri = json["uri"]
        d1_path_id = re.sub('/user_uploads/', '', uri)

        d2 = StringIO("zulip!")
        d2.name = "dummy_2.txt"
        result = self.client_post("/json/upload_file", {'file': d2})
        json = ujson.loads(result.content)
        uri = json["uri"]
        d2_path_id = re.sub('/user_uploads/', '', uri)

        two_week_ago = timezone.now() - datetime.timedelta(weeks=2)
        d1_attachment = Attachment.objects.get(path_id=d1_path_id)
        d1_attachment.create_time = two_week_ago
        d1_attachment.save()
        self.assertEqual(str(d1_attachment), u'<Attachment: dummy_1.txt>')
        d2_attachment = Attachment.objects.get(path_id=d2_path_id)
        d2_attachment.create_time = two_week_ago
        d2_attachment.save()

        # Send message referring only dummy_1
        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
        body = "Some files here ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")

        # dummy_2 should not exist in database or the uploads folder
        do_delete_old_unclaimed_attachments(2)
        self.assertFalse(Attachment.objects.filter(path_id=d2_path_id).exists())
        self.assertFalse(delete_message_image(d2_path_id))

    def test_multiple_claim_attachments(self):
        # type: () -> None
        """
        This test tries to claim the same attachment twice. The messages field in
        the Attachment model should have both the messages in its entry.
        """
        self.login("hamlet@zulip.com")
        d1 = StringIO("zulip!")
        d1.name = "dummy_1.txt"
        result = self.client_post("/json/upload_file", {'file': d1})
        json = ujson.loads(result.content)
        uri = json["uri"]
        d1_path_id = re.sub('/user_uploads/', '', uri)

        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
        body = "Second message ...[zulip.txt](http://localhost:9991/user_uploads/" + d1_path_id + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")

        self.assertEqual(Attachment.objects.get(path_id=d1_path_id).messages.count(), 2)

    def test_check_attachment_reference_update(self):
        # type: () -> None
        """Editing a message should update which attachments refer to it."""
        f1 = StringIO("file1")
        f1.name = "file1.txt"
        f2 = StringIO("file2")
        f2.name = "file2.txt"
        f3 = StringIO("file3")
        f3.name = "file3.txt"

        self.login("hamlet@zulip.com")
        result = self.client_post("/json/upload_file", {'file': f1})
        json = ujson.loads(result.content)
        uri = json["uri"]
        f1_path_id = re.sub('/user_uploads/', '', uri)

        result = self.client_post("/json/upload_file", {'file': f2})
        json = ujson.loads(result.content)
        uri = json["uri"]
        f2_path_id = re.sub('/user_uploads/', '', uri)

        self.subscribe_to_stream("hamlet@zulip.com", "test")
        body = ("[f1.txt](http://localhost:9991/user_uploads/" + f1_path_id + ")" +
                "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
        msg_id = self.send_message("hamlet@zulip.com", "test", Recipient.STREAM, body, "test")

        result = self.client_post("/json/upload_file", {'file': f3})
        json = ujson.loads(result.content)
        uri = json["uri"]
        f3_path_id = re.sub('/user_uploads/', '', uri)

        # Replace the reference to f1 with f3 while keeping f2.
        new_body = ("[f3.txt](http://localhost:9991/user_uploads/" + f3_path_id + ")" +
                    "[f2.txt](http://localhost:9991/user_uploads/" + f2_path_id + ")")
        result = self.client_post("/json/update_message", {
            'message_id': msg_id,
            'content': new_body
        })
        self.assert_json_success(result)

        message = Message.objects.get(id=msg_id)
        f1_attachment = Attachment.objects.get(path_id=f1_path_id)
        f2_attachment = Attachment.objects.get(path_id=f2_path_id)
        f3_attachment = Attachment.objects.get(path_id=f3_path_id)

        self.assertNotIn(message, f1_attachment.messages.all())
        self.assertIn(message, f2_attachment.messages.all())
        self.assertIn(message, f3_attachment.messages.all())

        # Delete all the attachments from the message
        new_body = "(deleted)"
        result = self.client_post("/json/update_message", {
            'message_id': msg_id,
            'content': new_body
        })
        self.assert_json_success(result)

        message = Message.objects.get(id=msg_id)
        f1_attachment = Attachment.objects.get(path_id=f1_path_id)
        f2_attachment = Attachment.objects.get(path_id=f2_path_id)
        f3_attachment = Attachment.objects.get(path_id=f3_path_id)
        self.assertNotIn(message, f1_attachment.messages.all())
        self.assertNotIn(message, f2_attachment.messages.all())
        self.assertNotIn(message, f3_attachment.messages.all())

    def test_file_name(self):
        # type: () -> None
        """
        Unicode filenames should be processed correctly.
        """
        self.login("hamlet@zulip.com")
        for expected in ["Здравейте.txt", "test"]:
            fp = StringIO("bah!")
            fp.name = urllib.parse.quote(expected)

            result = self.client_post("/json/upload_file", {'f1': fp})
            content = ujson.loads(result.content)
            # A unittest assertion (not a bare assert) so the check still runs
            # under `python -O` and reports both operands on failure.
            self.assertIn(sanitize_name(expected), content['uri'])

    def tearDown(self):
        # type: () -> None
        """Remove files written to the local uploads directory by the test."""
        destroy_uploads()
        super(FileUploadTest, self).tearDown()
2016-08-23 02:08:42 +02:00
class AvatarTest(ZulipTestCase):
    """Tests for uploading avatars and for the /avatar/ redirect endpoint."""

    def test_multiple_upload_failure(self):
        # type: () -> None
        """
        Attempting to upload two files should fail.
        """
        self.login("hamlet@zulip.com")
        avatar_path = os.path.join(TEST_AVATAR_DIR, 'img.png')
        # Context managers ensure both handles are closed after the request.
        with open(avatar_path, 'rb') as fp1, open(avatar_path, 'rb') as fp2:
            result = self.client_post("/json/set_avatar", {'f1': fp1, 'f2': fp2})
        self.assert_json_error(result, "You must upload exactly one avatar.")

    def test_no_file_upload_failure(self):
        # type: () -> None
        """
        Calling this endpoint with no files should fail.
        """
        self.login("hamlet@zulip.com")

        result = self.client_post("/json/set_avatar")
        self.assert_json_error(result, "You must upload exactly one avatar.")

    # Pairs of (fixture file name, second value); test_valid_avatars skips the
    # served-image size check when the second value is None.
    correct_files = [
        ('img.png', 'png_resized.png'),
        ('img.jpg', None),  # jpeg resizing is platform-dependent
        ('img.gif', 'gif_resized.png'),
        ('img.tif', 'tif_resized.png')
    ]
    # Fixture files that the avatar endpoint is expected to reject.
    corrupt_files = ['text.txt', 'corrupt.png', 'corrupt.gif']

    def test_get_gravatar_avatar(self):
        # type: () -> None
        """The avatar redirect should keep the query string, with gravatar on or off."""
        self.login("hamlet@zulip.com")
        cordelia = get_user_profile_by_email('cordelia@zulip.com')

        cordelia.avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
        cordelia.save()
        with self.settings(ENABLE_GRAVATAR=True):
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
            redirect_url = response['Location']
            self.assertEqual(redirect_url, avatar_url(cordelia) + '&foo=bar')

        with self.settings(ENABLE_GRAVATAR=False):
            response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
            redirect_url = response['Location']
            self.assertTrue(redirect_url.endswith(avatar_url(cordelia) + '&foo=bar'))

    def test_get_user_avatar(self):
        # type: () -> None
        """A user-uploaded avatar source should redirect to that user's avatar URL."""
        self.login("hamlet@zulip.com")
        cordelia = get_user_profile_by_email('cordelia@zulip.com')

        cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
        cordelia.save()
        response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
        redirect_url = response['Location']
        self.assertTrue(redirect_url.endswith(avatar_url(cordelia) + '&foo=bar'))

    def test_non_valid_user_avatar(self):
        # type: () -> None
        """An unknown email address should redirect to a gravatar identicon URL."""
        # It's debatable whether we should generate avatars for non-users,
        # but this test just validates the current code's behavior.
        self.login("hamlet@zulip.com")

        response = self.client_get("/avatar/nonexistent_user@zulip.com?foo=bar")
        redirect_url = response['Location']
        actual_url = 'https://secure.gravatar.com/avatar/444258b521f152129eb0c162996e572d?d=identicon&foo=bar'
        self.assertEqual(redirect_url, actual_url)

    def test_valid_avatars(self):
        # type: () -> None
        """
        A call to /json/set_avatar with a valid file should return a url and actually create an avatar.
        """
        for fname, rfname in self.correct_files:
            # TODO: use self.subTest once we're exclusively on python 3 by uncommenting the line below.
            # with self.subTest(fname=fname):
            self.login("hamlet@zulip.com")
            # Close the fixture file as soon as the upload request returns.
            with open(os.path.join(TEST_AVATAR_DIR, fname), 'rb') as fp:
                result = self.client_post("/json/set_avatar", {'file': fp})
            self.assert_json_success(result)
            json = ujson.loads(result.content)
            self.assertIn("avatar_url", json)
            url = json["avatar_url"]
            base = '/user_avatars/'
            self.assertEqual(base, url[:len(base)])

            if rfname is not None:
                response = self.client_get(url)
                data = b"".join(response.streaming_content)
                self.assertEqual(Image.open(io.BytesIO(data)).size, (100, 100))

            # Verify that the medium-size avatar was created
            user_profile = get_user_profile_by_email('hamlet@zulip.com')
            medium_avatar_url = avatar_url(user_profile, medium=True)
            medium_avatar_disk_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars",
                                                   medium_avatar_url.split("/")[-1].split("?")[0])
            self.assertTrue(os.path.exists(medium_avatar_disk_path))

            # Confirm that ensure_medium_avatar_url works to recreate
            # medium size avatars from the original if needed
            os.remove(medium_avatar_disk_path)
            self.assertFalse(os.path.exists(medium_avatar_disk_path))
            zerver.lib.upload.upload_backend.ensure_medium_avatar_image(user_profile.email)
            self.assertTrue(os.path.exists(medium_avatar_disk_path))

    def test_invalid_avatars(self):
        # type: () -> None
        """
        A call to /json/set_avatar with an invalid file should fail.
        """
        for fname in self.corrupt_files:
            # with self.subTest(fname=fname):
            self.login("hamlet@zulip.com")
            # Close the fixture file as soon as the upload request returns.
            with open(os.path.join(TEST_AVATAR_DIR, fname), 'rb') as fp:
                result = self.client_post("/json/set_avatar", {'file': fp})

            self.assert_json_error(result, "Could not decode avatar image; did you upload an image file?")

    def tearDown(self):
        # type: () -> None
        """Remove files written to the local uploads directory by the test."""
        destroy_uploads()
        super(AvatarTest, self).tearDown()
2016-04-14 23:44:39 +02:00
2016-08-23 02:08:42 +02:00
class LocalStorageTest(ZulipTestCase):
    """Tests for storing and deleting message attachments on the local disk."""

    def test_file_upload_local(self):
        # type: () -> None
        """upload_message_image should write the file under LOCAL_UPLOADS_DIR/files."""
        sender_email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(sender_email)
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)

        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])
        path_id = re.sub('/user_uploads/', '', uri)
        file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
        self.assertTrue(os.path.isfile(file_path))

    def test_delete_message_image_local(self):
        # type: () -> None
        """delete_message_image should return a truthy value for an uploaded file."""
        self.login("hamlet@zulip.com")
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"
        result = self.client_post("/json/upload_file", {'file': fp})

        json = ujson.loads(result.content)
        uri = json["uri"]
        path_id = re.sub('/user_uploads/', '', uri)
        self.assertTrue(delete_message_image(path_id))

    def tearDown(self):
        # type: () -> None
        """Remove files written to the local uploads directory by the test."""
        destroy_uploads()
        super(LocalStorageTest, self).tearDown()
2016-03-24 20:24:01 +01:00
2016-10-11 19:38:22 +02:00
FuncT = TypeVar('FuncT', bound=Callable[..., None])

def use_s3_backend(method):
    # type: (FuncT) -> FuncT
    """Decorator that runs a test with the S3 upload backend active.

    The wrapped callable runs under ``mock_s3`` with ``LOCAL_UPLOADS_DIR``
    overridden to ``None``.  ``zerver.lib.upload.upload_backend`` is set to
    a fresh ``S3UploadBackend`` for the duration of the call and is reset to
    a new ``LocalUploadBackend`` afterwards, even if the test raises.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    @mock_s3
    @override_settings(LOCAL_UPLOADS_DIR=None)
    @functools.wraps(method)  # keep the test's own name and docstring
    def new_method(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        zerver.lib.upload.upload_backend = S3UploadBackend()
        try:
            return method(*args, **kwargs)
        finally:
            zerver.lib.upload.upload_backend = LocalUploadBackend()
    return new_method
2016-08-23 02:08:42 +02:00
class S3Test(ZulipTestCase):
    """Tests for message attachments stored through the (mocked) S3 backend."""

    @use_s3_backend
    def test_file_upload_s3(self):
        # type: () -> None
        """upload_message_image should store the file contents in the uploads bucket."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        bucket = conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        sender_email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(sender_email)
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)

        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])
        path_id = re.sub('/user_uploads/', '', uri)
        self.assertEqual(b"zulip!", bucket.get_key(path_id).get_contents_as_string())

        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
        self.assertIn('title="dummy.txt"', self.get_last_message().rendered_content)

    @use_s3_backend
    def test_message_image_delete_s3(self):
        # type: () -> None
        """delete_message_image should return a truthy value for an uploaded file."""
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        sender_email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(sender_email)
        uri = upload_message_image(u'dummy.txt', u'text/plain', b'zulip!', user_profile)

        path_id = re.sub('/user_uploads/', '', uri)
        self.assertTrue(delete_message_image(path_id))

    @use_s3_backend
    def test_file_upload_authed(self):
        # type: () -> None
        """
        A call to /json/upload_file should return a uri and actually create an object.
        """
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
        conn.create_bucket(settings.S3_AUTH_UPLOADS_BUCKET)

        self.login("hamlet@zulip.com")
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        result = self.client_post("/json/upload_file", {'file': fp})
        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertIn("uri", json)
        uri = json["uri"]
        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])

        # The view answers with a redirect; fetch the file from its target.
        response = self.client_get(uri)
        redirect_url = response['Location']

        self.assertEqual(b"zulip!", urllib.request.urlopen(redirect_url).read().strip())  # type: ignore # six.moves.urllib.request.urlopen is not defined in typeshed

        self.subscribe_to_stream("hamlet@zulip.com", "Denmark")
        body = "First message ...[zulip.txt](http://localhost:9991" + uri + ")"
        self.send_message("hamlet@zulip.com", "Denmark", Recipient.STREAM, body, "test")
        self.assertIn('title="zulip.txt"', self.get_last_message().rendered_content)
class UploadTitleTests(TestCase):
    """Tests for url_filename, which derives a link title from a URL."""

    def test_upload_titles(self):
        # type: () -> None
        """Upload URLs map to their file name; other URLs are returned unchanged."""
        self.assertEqual(url_filename("http://localhost:9991/user_uploads/1/LUeQZUG5jxkagzVzp1Ox_amr/dummy.txt"), "dummy.txt")
        self.assertEqual(url_filename("http://localhost:9991/user_uploads/1/94/SzGYe0RFT-tEcOhQ6n-ZblFZ/zulip.txt"), "zulip.txt")
        self.assertEqual(url_filename("https://zulip.com/user_uploads/4142/LUeQZUG5jxkagzVzp1Ox_amr/pasted_image.png"), "pasted_image.png")
        self.assertEqual(url_filename("https://zulipchat.com/integrations"), "https://zulipchat.com/integrations")
        self.assertEqual(url_filename("https://example.com"), "https://example.com")
2016-04-14 16:26:01 +02:00
class SanitizeNameTests(TestCase):
    """Tests for sanitize_name, which cleans up uploaded file names."""

    def test_file_name(self):
        # type: () -> None
        """Check sanitize_name against plain, hidden, multi-suffix and noisy names."""
        self.assertEqual(sanitize_name(u'test.txt'), u'test.txt')
        self.assertEqual(sanitize_name(u'.hidden'), u'.hidden')
        self.assertEqual(sanitize_name(u'.hidden.txt'), u'.hidden.txt')
        self.assertEqual(sanitize_name(u'tarball.tar.gz'), u'tarball.tar.gz')
        self.assertEqual(sanitize_name(u'.hidden_tarball.tar.gz'), u'.hidden_tarball.tar.gz')
        self.assertEqual(sanitize_name(u'Testing{}*&*#().ta&&%$##&&r.gz'), u'Testing.tar.gz')
        self.assertEqual(sanitize_name(u'*testingfile?*.txt'), u'testingfile.txt')
        self.assertEqual(sanitize_name(u'snowman☃.txt'), u'snowman.txt')
        self.assertEqual(sanitize_name(u'테스트.txt'), u'테스트.txt')
        # Backslashes are written as `\\` so the literal holds the same
        # characters without relying on invalid escape sequences.
        self.assertEqual(sanitize_name(u'~/."\\`\\?*"u0`000ssh/test.t**{}ar.gz'), u'.u0000sshtest.tar.gz')