2014-01-31 18:53:33 +01:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import
|
|
|
|
from django.conf import settings
|
|
|
|
from django.core.exceptions import ValidationError
|
|
|
|
from django.test import TestCase
|
2015-06-10 20:28:32 +02:00
|
|
|
from unittest import skip
|
2014-01-31 18:53:33 +01:00
|
|
|
|
|
|
|
from zerver.forms import not_mit_mailing_list
|
|
|
|
|
|
|
|
from zerver.lib.rate_limiter import (
|
|
|
|
add_ratelimit_rule,
|
|
|
|
clear_user_history,
|
|
|
|
remove_ratelimit_rule,
|
|
|
|
)
|
|
|
|
|
|
|
|
from zerver.lib.actions import compute_mit_user_fullname
|
|
|
|
from zerver.lib.test_helpers import AuthedTestCase
|
|
|
|
from zerver.models import get_user_profile_by_email
|
|
|
|
from zerver.lib.test_runner import slow
|
2016-03-13 10:29:33 +01:00
|
|
|
from zerver.lib.upload import sanitize_name
|
2014-01-31 18:53:33 +01:00
|
|
|
|
|
|
|
import time
|
|
|
|
import ujson
|
2016-01-24 03:39:44 +01:00
|
|
|
from six.moves import urllib
|
2014-01-31 18:53:33 +01:00
|
|
|
|
|
|
|
from boto.s3.connection import S3Connection
|
|
|
|
from boto.s3.key import Key
|
2016-01-24 04:22:35 +01:00
|
|
|
from six.moves import StringIO
|
2015-11-01 17:15:05 +01:00
|
|
|
from six.moves import range
|
2014-01-31 18:53:33 +01:00
|
|
|
|
|
|
|
class MITNameTest(TestCase):
    """Checks for MIT full-name resolution and the MIT mailing-list validator."""

    def test_valid_hesiod(self):
        """Addresses with a known full name resolve to that name."""
        # NOTE(review): judging by the test name these lookups presumably go
        # through Hesiod and may need network access -- confirm.
        self.assertEqual(compute_mit_user_fullname("starnine@mit.edu"), "Athena Consulting Exchange User")
        self.assertEqual(compute_mit_user_fullname("sipbexch@mit.edu"), "Exch Sipb")

    def test_invalid_hesiod(self):
        """Addresses without a resolvable name come back unchanged."""
        self.assertEqual(compute_mit_user_fullname("1234567890@mit.edu"), "1234567890@mit.edu")
        self.assertEqual(compute_mit_user_fullname("ec-discuss@mit.edu"), "ec-discuss@mit.edu")

    def test_mailinglist(self):
        """The validator raises ValidationError for these addresses."""
        self.assertRaises(ValidationError, not_mit_mailing_list, "1234567890@mit.edu")
        self.assertRaises(ValidationError, not_mit_mailing_list, "ec-discuss@mit.edu")

    def test_notmailinglist(self):
        """The validator returns a truthy value for an accepted address."""
        self.assertTrue(not_mit_mailing_list("sipbexch@mit.edu"))
|
|
|
|
|
|
|
|
class S3Test(AuthedTestCase):
    """Upload round-trip test against S3 (currently skipped: needs an S3 mock)."""

    # NOTE: both lists are class attributes, so they are shared by every
    # instance of this test case rather than being reset per test.
    # full URIs in public bucket
    test_uris = [] # type: List[str]
    # keys in authed bucket
    test_keys = [] # type: List[str]

    @slow(2.6, "has to contact external S3 service")
    @skip("Need S3 mock")
    def test_file_upload_authed(self):
        """
        A call to /json/upload_file should return a uri and actually create an object.
        """
        self.login("hamlet@zulip.com")
        fp = StringIO("zulip!")
        fp.name = "zulip.txt"

        result = self.client.post("/json/upload_file", {'file': fp})
        self.assert_json_success(result)
        payload = ujson.loads(result.content)
        self.assertIn("uri", payload)
        uri = payload["uri"]
        base = '/user_uploads/'
        self.assertEqual(base, uri[:len(base)])
        # Remember the key (the part after the prefix) so tearDown can delete it.
        self.test_keys.append(uri[len(base):])

        # Fetching the uri is expected to answer with a redirect; follow the
        # Location header manually and compare the downloaded content.
        response = self.client.get(uri)
        redirect_url = response['Location']

        self.assertEqual("zulip!", urllib.request.urlopen(redirect_url).read().strip())

    def tearDown(self):
        """Remove uploaded objects from S3 (disabled: returns immediately)."""
        # clean up
        return
        # TODO: un-deadden this code when we have proper S3 mocking.
        conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)

        # Iterate over snapshots: removing from a list while looping over that
        # same list skips every other element.
        public_bucket = conn.get_bucket(settings.S3_BUCKET)
        for uri in list(self.test_uris):
            key = Key(public_bucket)
            # Drop the leading "/" of the URL path to get the key name.
            key.name = urllib.parse.urlparse(uri).path[1:]
            key.delete()
            self.test_uris.remove(uri)

        authed_bucket = conn.get_bucket(settings.S3_AUTH_UPLOADS_BUCKET)
        for path in list(self.test_keys):
            key = Key(authed_bucket)
            key.name = path
            key.delete()
            self.test_keys.remove(path)
|
|
|
|
|
|
|
|
class FileUploadTest(AuthedTestCase):
    """Error-path checks for the /json/upload_file endpoint."""

    def test_multiple_upload_failure(self):
        """
        Posting two files in a single request must be rejected.
        """
        self.login("hamlet@zulip.com")

        first_file = StringIO("bah!")
        first_file.name = "a.txt"
        second_file = StringIO("pshaw!")
        second_file.name = "b.txt"

        attachments = {'f1': first_file, 'f2': second_file}
        response = self.client.post("/json/upload_file", attachments)
        self.assert_json_error(response, "You may only upload one file at a time")

    def test_no_file_upload_failure(self):
        """
        Posting to the endpoint without any file must be rejected.
        """
        self.login("hamlet@zulip.com")

        response = self.client.post("/json/upload_file")
        self.assert_json_error(response, "You must specify a file to upload")
|
|
|
|
|
|
|
|
class RateLimitTests(AuthedTestCase):
    """Tests for the rate-limit headers and the 429 response of the message API."""

    def setUp(self):
        """Enable rate limiting and install the (1, 5) rule used by these tests."""
        # Remember the previous value so tearDown can restore it instead of
        # unconditionally forcing the setting to False.
        self._saved_rate_limiting = getattr(settings, 'RATE_LIMITING', False)
        settings.RATE_LIMITING = True
        # NOTE(review): presumably "5 requests per 1 second" -- confirm
        # against add_ratelimit_rule's signature.
        add_ratelimit_rule(1, 5)

    def tearDown(self):
        """Undo setUp: restore the setting and remove the (1, 5) rule."""
        settings.RATE_LIMITING = self._saved_rate_limiting
        remove_ratelimit_rule(1, 5)

    def _fresh_api_key(self, email):
        """Clear the user's rate-limiter history and return their API key."""
        user = get_user_profile_by_email(email)
        clear_user_history(user)
        return self.get_api_key(email)

    def send_api_message(self, email, api_key, content):
        """POST a stream message to "Verona" via /api/v1/send_message and return the response."""
        return self.client.post("/api/v1/send_message", {"type": "stream",
                                                         "to": "Verona",
                                                         "client": "test suite",
                                                         "content": content,
                                                         "subject": "Test subject",
                                                         "email": email,
                                                         "api-key": api_key})

    def test_headers(self):
        """A response to an API request carries the three X-RateLimit-* headers."""
        email = "hamlet@zulip.com"
        api_key = self._fresh_api_key(email)

        result = self.send_api_message(email, api_key, "some stuff")
        self.assertIn('X-RateLimit-Remaining', result)
        self.assertIn('X-RateLimit-Limit', result)
        self.assertIn('X-RateLimit-Reset', result)

    def test_ratelimit_decrease(self):
        """X-RateLimit-Remaining drops by exactly one between consecutive requests."""
        email = "hamlet@zulip.com"
        api_key = self._fresh_api_key(email)
        result = self.send_api_message(email, api_key, "some stuff")
        limit = int(result['X-RateLimit-Remaining'])

        result = self.send_api_message(email, api_key, "some stuff 2")
        newlimit = int(result['X-RateLimit-Remaining'])
        self.assertEqual(limit, newlimit + 1)

    @slow(1.1, 'has to sleep to work')
    def test_hit_ratelimits(self):
        """The sixth rapid request is rejected with 429; a later request succeeds."""
        email = "cordelia@zulip.com"
        api_key = self._fresh_api_key(email)

        # Only the response to the last (sixth) request is inspected below.
        for i in range(6):
            result = self.send_api_message(email, api_key, "some stuff %s" % (i,))

        self.assertEqual(result.status_code, 429)
        json = ujson.loads(result.content)
        self.assertEqual(json.get("result"), "error")
        self.assertIn("API usage exceeded rate limit, try again in", json.get("msg"))
        self.assertIn('Retry-After', result)
        # The Retry-After header value must also appear in the error message.
        self.assertIn(result['Retry-After'], json.get("msg"))

        # We actually wait a second here, rather than force-clearing our history,
        # to make sure the rate-limiting code automatically forgives a user
        # after some time has passed.
        time.sleep(1)

        result = self.send_api_message(email, api_key, "Good message")

        self.assert_json_success(result)
|
|
|
|
|
|
|
|
class APNSTokenTests(AuthedTestCase):
    """Registering and removing a device token via the APNS endpoint."""

    def test_add_token(self):
        """Posting a token to the APNS endpoint succeeds."""
        self.login("cordelia@zulip.com")

        response = self.client.post('/json/users/me/apns_device_token',
                                    {'token': "test_token"})
        self.assert_json_success(response)

    def test_delete_token(self):
        """A token that was just registered can be deleted again."""
        self.login("cordelia@zulip.com")

        endpoint = '/json/users/me/apns_device_token'
        params = {'token': "test_token"}

        # Register first so there is something to remove.
        response = self.client.post(endpoint, params)
        self.assert_json_success(response)

        response = self.client_delete(endpoint, {'token': "test_token"})
        self.assert_json_success(response)
|
|
|
|
|
|
|
|
class GCMTokenTests(AuthedTestCase):
    """Registering and removing a token via the Android GCM endpoint."""

    def test_add_token(self):
        """Posting a token to the GCM endpoint succeeds."""
        email = "cordelia@zulip.com"
        self.login(email)

        # This previously posted to the APNS endpoint (copy-paste slip), so
        # the GCM registration path was not exercised by this test.
        result = self.client.post('/json/users/me/android_gcm_reg_id', {'token': "test_token"})
        self.assert_json_success(result)

    def test_delete_token(self):
        """A token that was just registered can be deleted again."""
        email = "cordelia@zulip.com"
        self.login(email)

        token = "test_token"
        result = self.client.post('/json/users/me/android_gcm_reg_id', {'token': token})
        self.assert_json_success(result)

        # The token is passed url-encoded as the data of the DELETE request.
        result = self.client.delete('/json/users/me/android_gcm_reg_id', urllib.parse.urlencode({'token': token}))
        self.assert_json_success(result)

    def test_change_user(self):
        """The same token can be registered by one user and then by another."""
        token = "test_token"

        self.login("cordelia@zulip.com")
        result = self.client.post('/json/users/me/android_gcm_reg_id', {'token': token})
        self.assert_json_success(result)

        self.login("hamlet@zulip.com")
        result = self.client.post('/json/users/me/android_gcm_reg_id', {'token': token})
        self.assert_json_success(result)
|
|
|
|
|
2016-03-13 10:29:33 +01:00
|
|
|
class SanitizeNameTests(TestCase):
    """Expected outputs of sanitize_name for a range of upload file names."""

    def test_file_name(self):
        """Check names that pass through unchanged and names that get characters removed."""
        # Plain names, hidden files and multi-part extensions are unchanged.
        self.assertEqual(sanitize_name(u'test.txt'), u'test.txt')
        self.assertEqual(sanitize_name(u'.hidden'), u'.hidden')
        self.assertEqual(sanitize_name(u'.hidden.txt'), u'.hidden.txt')
        self.assertEqual(sanitize_name(u'tarball.tar.gz'), u'tarball.tar.gz')
        self.assertEqual(sanitize_name(u'.hidden_tarball.tar.gz'), u'.hidden_tarball.tar.gz')
        # Punctuation and symbols are dropped.
        self.assertEqual(sanitize_name(u'Testing{}*&*#().ta&&%$##&&r.gz'), u'Testing.tar.gz')
        self.assertEqual(sanitize_name(u'*testingfile?*.txt'), u'testingfile.txt')
        # The snowman symbol is removed, while Korean letters are kept.
        self.assertEqual(sanitize_name(u'snowman☃.txt'), u'snowman.txt')
        self.assertEqual(sanitize_name(u'테스트.txt'), u'테스트.txt')
        # Backslashes are written as "\\" because "\`" and "\?" are not valid
        # escape sequences; the resulting string value is unchanged.
        self.assertEqual(sanitize_name(u'~/."\\`\\?*"u0`000ssh/test.t**{}ar.gz'), u'.u0000sshtest.tar.gz')
|