# Mirror of https://github.com/zulip/zulip.git
# 737 lines, 30 KiB, Python
from django.contrib.auth.models import User
|
|
from django.test import TestCase
|
|
from django.test.simple import DjangoTestSuiteRunner
|
|
from django.utils.timezone import now
|
|
from django.db.models import Q
|
|
|
|
from zephyr.models import Message, UserProfile, Stream, Recipient, Subscription, \
|
|
filter_by_subscriptions, get_display_recipient, Realm, do_send_message, Client
|
|
from zephyr.views import json_get_updates, api_get_messages, gather_subscriptions
|
|
from zephyr.decorator import RespondAsynchronously
|
|
from zephyr.lib.initial_password import initial_password, initial_api_key
|
|
|
|
import simplejson
|
|
import subprocess
|
|
import optparse
|
|
from django.conf import settings
|
|
import re
|
|
|
|
# Test-only overrides, applied as a side effect of importing this module.

# Point the message log at a scratch file under /tmp.
settings.MESSAGE_LOG = "/tmp/test-message-log"

# Use Django's in-memory e-mail backend; find_key_by_email() in this file
# reads the resulting django.core.mail.outbox.
settings.EMAIL_BACKEND = 'django.core.mail.backends.locmem.EmailBackend'

# NOTE(review): presumably disables talking to a Tornado server during the
# test run -- confirm against the code that reads TORNADO_SERVER.
settings.TORNADO_SERVER = None
|
|
|
|
|
|
def find_key_by_email(address):
    """
    Return the confirmation key most recently e-mailed to `address`.

    The test outbox is scanned from newest to oldest.  For the first
    message addressed to `address` whose body contains an
    "accounts/do_confirm/<key>>" link, the 40-character lowercase hex key
    is returned.  Messages to `address` without such a link are skipped
    (previously they caused an AttributeError).  Returns None when no
    matching message is found.
    """
    # Looked up at call time: django.core.mail.outbox is populated by the
    # test environment / locmem backend rather than being a static attribute.
    from django.core.mail import outbox

    key_regex = re.compile(r"accounts/do_confirm/([a-f0-9]{40})>")
    for message in reversed(outbox):
        if address not in message.to:
            continue
        match = key_regex.search(message.body)
        if match is not None:
            return match.group(1)
    return None
|
|
|
|
class AuthedTestCase(TestCase):
    """
    TestCase base class bundling the helpers shared by the test classes in
    this file: logging in, registering, sending messages and checking JSON
    responses.
    """

    def login(self, email, password=None):
        """
        POST `email` and a password to /accounts/login/ and return the
        response.  Without an explicit password, initial_password(email)
        is used.
        """
        if password is None:
            password = initial_password(email)
        credentials = {'username': email, 'password': password}
        return self.client.post('/accounts/login/', credentials)

    def register(self, username, password):
        """
        Register `username`@humbughq.com: request a confirmation e-mail
        via /accounts/home/, then POST the registration form with the key
        taken from that e-mail.  Returns the registration response.
        """
        address = username + '@humbughq.com'
        self.client.post('/accounts/home/', {'email': address})
        form = {
            'full_name': username,
            'password': password,
            'key': find_key_by_email(address),
            'terms': True,
        }
        return self.client.post('/accounts/register/', form)

    def get_api_key(self, email):
        """
        Return initial_api_key(email).
        """
        return initial_api_key(email)

    def get_user_profile(self, email):
        """
        Given an email address, return the UserProfile object for the
        User that has that email.
        """
        # Usernames are unique, even across Realms.
        return UserProfile.objects.get(user__email=email)

    def send_message(self, sender_name, recipient_name, message_type):
        """
        Send a message with subject "test" from the user with email
        `sender_name`.  For Recipient.PERSONAL, `recipient_name` is an
        email address; otherwise it is the name of a stream in the
        sender's realm.
        """
        sender = self.get_user_profile(sender_name)
        if message_type == Recipient.PERSONAL:
            target = self.get_user_profile(recipient_name)
        else:
            target = Stream.objects.get(name=recipient_name, realm=sender.realm)
        recipient = Recipient.objects.get(type_id=target.id, type=message_type)
        pub_date = now()
        sending_client, _ = Client.objects.get_or_create(name="test suite")
        message = Message(sender=sender, recipient=recipient, subject="test",
                          pub_date=pub_date, sending_client=sending_client)
        do_send_message(message)

    def users_subscribed_to_stream(self, stream_name, realm_domain):
        """
        Return the list of User objects having a Subscription to the
        stream `stream_name` in the realm with domain `realm_domain`.
        """
        realm = Realm.objects.get(domain=realm_domain)
        stream = Stream.objects.get(name=stream_name, realm=realm)
        recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)

        users = []
        for subscription in Subscription.objects.filter(recipient=recipient):
            users.append(subscription.user_profile.user)
        return users

    def message_stream(self, user):
        """
        Return all messages, filtered by `user`'s subscriptions.
        """
        return filter_by_subscriptions(Message.objects.all(), user)

    def assert_json_success(self, result):
        """
        Successful POSTs return a 200 and JSON of the form {"result": "success",
        "msg": ""}.
        """
        self.assertEqual(result.status_code, 200)
        payload = simplejson.loads(result.content)
        self.assertEqual(payload.get("result"), "success")
        # We have a msg key for consistency with errors, but it typically has an
        # empty value.
        self.assertIn("msg", payload)

    def assert_json_error(self, result, msg):
        """
        Invalid POSTs return a 400 and JSON of the form {"result": "error",
        "msg": "reason"}.
        """
        self.assertEqual(result.status_code, 400)
        payload = simplejson.loads(result.content)
        self.assertEqual(payload.get("result"), "error")
        self.assertEqual(payload.get("msg"), msg)
|
|
|
|
class PublicURLTest(TestCase):
    """
    Account creation URLs are accessible even when not logged in. Authenticated
    URLs redirect to a page.
    """

    def fetch(self, urls, expected_status):
        """
        GET every URL in `urls` and assert that each one answers with
        `expected_status`.
        """
        for url in urls:
            actual_status = self.client.get(url).status_code
            failure = "Expected %d, received %d for %s" % (
                expected_status, actual_status, url)
            self.assertEqual(actual_status, expected_status, msg=failure)

    def test_public_urls(self):
        """
        Test which views are accessible when not logged in.
        """
        # FIXME: We should also test the Tornado URLs -- this codepath
        # can't do so because this Django test mechanism doesn't go
        # through Tornado.
        ok_urls = ["/accounts/home/", "/accounts/login/"]
        redirect_urls = ["/", "/accounts/logout/"]
        # These views answer a GET with 405.
        method_not_allowed_urls = [
            "/accounts/register/",
            "/api/v1/get_public_streams",
            "/api/v1/subscriptions/list",
            "/api/v1/subscriptions/add",
            "/api/v1/subscriptions/remove",
            "/api/v1/send_message",
            "/api/v1/fetch_api_key",
            "/json/fetch_api_key",
            "/json/send_message",
            "/json/update_pointer",
            "/json/settings/change",
            "/json/subscriptions/list",
            "/json/subscriptions/remove",
            "/json/subscriptions/exists",
            "/json/subscriptions/add",
        ]
        urls = {200: ok_urls,
                302: redirect_urls,
                405: method_not_allowed_urls}
        for status_code, url_set in urls.iteritems():
            self.fetch(url_set, status_code)
|
|
|
|
|
|
class LoginTest(AuthedTestCase):
    """
    Logging in, registration, and logging out.
    """
    fixtures = ['messages.json']

    def test_login(self):
        """
        After a login with the initial password, the session carries the
        user's id.
        """
        email = "hamlet@humbughq.com"
        self.login(email)
        user = User.objects.get(email=email)
        self.assertEqual(self.client.session['_auth_user_id'], user.id)

    def test_login_bad_password(self):
        """
        A wrong password leaves the session without an authenticated user.
        """
        self.login("hamlet@humbughq.com", "wrongpassword")
        session_user_id = self.client.session.get('_auth_user_id', None)
        self.assertIsNone(session_user_id)

    def test_register(self):
        """
        Registering a new account also logs that account in.
        """
        self.register("test", "test")
        user = User.objects.get(email='test@humbughq.com')
        self.assertEqual(self.client.session['_auth_user_id'], user.id)

    def test_logout(self):
        """
        POSTing to the logout view removes the user id from the session.
        """
        self.login("hamlet@humbughq.com")
        self.client.post('/accounts/logout/')
        session_user_id = self.client.session.get('_auth_user_id', None)
        self.assertIsNone(session_user_id)
|
|
|
|
|
|
class PersonalMessagesTest(AuthedTestCase):
    """
    Delivery of personal (one-to-one) messages.
    """
    fixtures = ['messages.json']

    def test_auto_subbed_to_personals(self):
        """
        Newly created users are auto-subbed to the ability to receive
        personals.
        """
        email = "test@humbughq.com"
        self.register("test", "test")
        user = User.objects.get(email=email)

        before = self.message_stream(user)
        self.send_message(email, email, Recipient.PERSONAL)
        after = self.message_stream(user)
        self.assertEqual(len(after) - len(before), 1)

        recipient = Recipient.objects.get(type_id=user.id, type=Recipient.PERSONAL)
        self.assertEqual(after[-1].recipient, recipient)

    def test_personal_to_self(self):
        """
        If you send a personal to yourself, only you see it.
        """
        email = "test1@humbughq.com"
        # Snapshot the users that exist before the new account is created.
        old_users = list(User.objects.all())
        self.register("test1", "test1")

        old_messages = [len(self.message_stream(user)) for user in old_users]

        self.send_message(email, email, Recipient.PERSONAL)

        new_messages = [len(self.message_stream(user)) for user in old_users]
        self.assertEqual(old_messages, new_messages)

        user = User.objects.get(email=email)
        recipient = Recipient.objects.get(type_id=user.id, type=Recipient.PERSONAL)
        self.assertEqual(self.message_stream(user)[-1].recipient, recipient)

    def test_personal(self):
        """
        If you send a personal, only you and the recipient see it.
        """
        sender_email = "hamlet@humbughq.com"
        receiver_email = "othello@humbughq.com"
        self.login(sender_email)

        # NOTE(review): these two are QuerySets (filter), not User instances
        # (get) -- confirm filter_by_subscriptions accepts them as intended.
        old_sender = User.objects.filter(email=sender_email)
        old_sender_messages = len(self.message_stream(old_sender))

        old_recipient = User.objects.filter(email=receiver_email)
        old_recipient_messages = len(self.message_stream(old_recipient))

        other_users = User.objects.filter(~Q(email=sender_email) & ~Q(email=receiver_email))
        old_other_messages = [len(self.message_stream(user)) for user in other_users]

        self.send_message(sender_email, receiver_email, Recipient.PERSONAL)

        # Users outside the conversation don't get the message.
        new_other_messages = [len(self.message_stream(user)) for user in other_users]
        self.assertEqual(old_other_messages, new_other_messages)

        # The personal message is in the streams of both the sender and receiver.
        self.assertEqual(len(self.message_stream(old_sender)),
                         old_sender_messages + 1)
        self.assertEqual(len(self.message_stream(old_recipient)),
                         old_recipient_messages + 1)

        sender = User.objects.get(email=sender_email)
        receiver = User.objects.get(email=receiver_email)
        recipient = Recipient.objects.get(type_id=receiver.id, type=Recipient.PERSONAL)
        self.assertEqual(self.message_stream(sender)[-1].recipient, recipient)
        self.assertEqual(self.message_stream(receiver)[-1].recipient, recipient)
|
|
|
|
class StreamMessagesTest(AuthedTestCase):
    """
    Delivery of messages sent to a stream.
    """
    fixtures = ['messages.json']

    def test_message_to_stream(self):
        """
        If you send a message to a stream, everyone subscribed to the stream
        receives the messages.
        """
        def message_counts(users):
            # Number of messages currently visible to each user, in order.
            return [len(self.message_stream(user)) for user in users]

        subscribers = self.users_subscribed_to_stream("Scotland", "humbughq.com")
        old_subscriber_messages = message_counts(subscribers)

        non_subscribers = [user for user in User.objects.all() if user not in subscribers]
        old_non_subscriber_messages = message_counts(non_subscribers)

        a_subscriber_email = subscribers[0].email
        self.login(a_subscriber_email)
        self.send_message(a_subscriber_email, "Scotland", Recipient.STREAM)

        new_subscriber_messages = message_counts(subscribers)
        new_non_subscriber_messages = message_counts(non_subscribers)

        # Non-subscribers see nothing new; every subscriber sees one more.
        self.assertEqual(old_non_subscriber_messages, new_non_subscriber_messages)
        self.assertEqual(new_subscriber_messages,
                         [count + 1 for count in old_subscriber_messages])
|
|
|
|
class PointerTest(AuthedTestCase):
    """
    Updating the pointer stored on a UserProfile.
    """
    fixtures = ['messages.json']

    def test_update_pointer(self):
        """
        Posting a pointer to /update (in the form {"pointer": pointer}) changes
        the pointer we store for your UserProfile.
        """
        email = "hamlet@humbughq.com"
        self.login(email)
        self.assertEqual(self.get_user_profile(email).pointer, -1)
        result = self.client.post("/json/update_pointer", {"pointer": 1})
        self.assert_json_success(result)
        self.assertEqual(self.get_user_profile(email).pointer, 1)

    def test_api_update_pointer(self):
        """
        Same as above, but for the API view
        """
        email = "hamlet@humbughq.com"
        api_key = self.get_api_key(email)
        self.assertEqual(self.get_user_profile(email).pointer, -1)
        post_data = {"email": email,
                     "api-key": api_key,
                     "client_id": "blah",
                     "pointer": 1}
        result = self.client.post("/api/v1/update_pointer", post_data)
        self.assert_json_success(result)
        self.assertEqual(self.get_user_profile(email).pointer, 1)

    def test_missing_pointer(self):
        """
        Posting json to /json/update_pointer which does not contain a pointer key/value pair
        returns a 400 and error message.
        """
        email = "hamlet@humbughq.com"
        self.login(email)
        self.assertEqual(self.get_user_profile(email).pointer, -1)
        result = self.client.post("/json/update_pointer", {"foo": 1})
        self.assert_json_error(result, "Missing 'pointer' argument")
        # The stored pointer must be left untouched.
        self.assertEqual(self.get_user_profile(email).pointer, -1)

    def test_invalid_pointer(self):
        """
        Posting json to /json/update_pointer with an invalid pointer returns a 400 and error
        message.
        """
        email = "hamlet@humbughq.com"
        self.login(email)
        self.assertEqual(self.get_user_profile(email).pointer, -1)
        result = self.client.post("/json/update_pointer", {"pointer": "foo"})
        self.assert_json_error(result, "Bad value for 'pointer': foo")
        # The stored pointer must be left untouched.
        self.assertEqual(self.get_user_profile(email).pointer, -1)

    def test_pointer_out_of_range(self):
        """
        Posting json to /json/update_pointer with an out of range (< 0) pointer returns a 400
        and error message.
        """
        email = "hamlet@humbughq.com"
        self.login(email)
        self.assertEqual(self.get_user_profile(email).pointer, -1)
        result = self.client.post("/json/update_pointer", {"pointer": -2})
        self.assert_json_error(result, "Invalid pointer value")
        # The stored pointer must be left untouched.
        self.assertEqual(self.get_user_profile(email).pointer, -1)
|
|
|
|
class MessagePOSTTest(AuthedTestCase):
    """
    Sending messages through the /json/send_message and
    /api/v1/send_message views.
    """
    fixtures = ['messages.json']

    def test_message_to_self(self):
        """
        Sending a message to a stream to which you are subscribed is
        successful.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"type": "stream",
                     "to": "Verona",
                     "client": "test suite",
                     "content": "Test message",
                     "subject": "Test subject"}
        result = self.client.post("/json/send_message", post_data)
        self.assert_json_success(result)

    def test_api_message_to_self(self):
        """
        Same as above, but for the API view
        """
        email = "hamlet@humbughq.com"
        api_key = self.get_api_key(email)
        post_data = {"type": "stream",
                     "to": "Verona",
                     "client": "test suite",
                     "content": "Test message",
                     "subject": "Test subject",
                     "email": email,
                     "api-key": api_key}
        result = self.client.post("/api/v1/send_message", post_data)
        self.assert_json_success(result)

    def test_message_to_nonexistent_stream(self):
        """
        Sending a message to a nonexistent stream fails.
        """
        self.login("hamlet@humbughq.com")
        # Precondition: no stream of that name exists.
        self.assertFalse(Stream.objects.filter(name="nonexistent_stream"))
        post_data = {"type": "stream",
                     "to": "nonexistent_stream",
                     "client": "test suite",
                     "content": "Test message",
                     "subject": "Test subject"}
        result = self.client.post("/json/send_message", post_data)
        self.assert_json_error(result, "Stream does not exist")

    def test_personal_message(self):
        """
        Sending a personal message to a valid username is successful.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"type": "private",
                     "content": "Test message",
                     "client": "test suite",
                     "to": "othello@humbughq.com"}
        result = self.client.post("/json/send_message", post_data)
        self.assert_json_success(result)

    def test_personal_message_to_nonexistent_user(self):
        """
        Sending a personal message to an invalid email returns error JSON.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"type": "private",
                     "content": "Test message",
                     "client": "test suite",
                     "to": "nonexistent"}
        result = self.client.post("/json/send_message", post_data)
        self.assert_json_error(result, "Invalid email 'nonexistent'")

    def test_invalid_type(self):
        """
        Sending a message of unknown type returns error JSON.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"type": "invalid type",
                     "content": "Test message",
                     "client": "test suite",
                     "to": "othello@humbughq.com"}
        result = self.client.post("/json/send_message", post_data)
        self.assert_json_error(result, "Invalid message type")
|
|
|
|
class SubscriptionPropertiesTest(AuthedTestCase):
    """
    Reading and writing per-subscription properties through
    /json/subscriptions/property.
    """
    fixtures = ['messages.json']

    def test_get_stream_colors(self):
        """
        A GET request to
        /json/subscriptions/property?property=stream_colors returns a
        list of (stream, color) pairs, both of which are strings.
        """
        test_email = "hamlet@humbughq.com"
        self.login(test_email)
        result = self.client.get("/json/subscriptions/property",
                                 {"property": "stream_colors"})

        self.assert_json_success(result)
        payload = simplejson.loads(result.content)
        self.assertIn("stream_colors", payload)

        # Tick off each returned pair against the user's subscriptions;
        # nothing may be left over at the end.
        remaining = gather_subscriptions(self.get_user_profile(test_email))
        for stream, color in payload["stream_colors"]:
            self.assertIsInstance(color, str)
            self.assertIsInstance(stream, str)
            pair = (stream, color)
            self.assertIn(pair, remaining)
            remaining.remove(pair)
        self.assertFalse(remaining)

    def test_set_stream_color(self):
        """
        A POST request to /json/subscriptions/property with stream_name and
        color data sets the stream color, and for that stream only.
        """
        test_email = "hamlet@humbughq.com"
        self.login(test_email)

        old_subs = gather_subscriptions(self.get_user_profile(test_email))
        stream_name, old_color = old_subs[0]
        new_color = "#ffffff" # TODO: ensure that this is different from old_color
        post_data = {"property": "stream_colors",
                     "stream_name": stream_name,
                     "color": new_color}
        result = self.client.post("/json/subscriptions/property", post_data)

        self.assert_json_success(result)

        new_subs = gather_subscriptions(self.get_user_profile(test_email))
        self.assertIn((stream_name, new_color), new_subs)

        # Apart from the recolored stream, both lists must be identical.
        old_subs.remove((stream_name, old_color))
        new_subs.remove((stream_name, new_color))
        self.assertEqual(old_subs, new_subs)

    def test_set_color_missing_stream_name(self):
        """
        Updating the stream_colors property requires a stream_name.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"property": "stream_colors",
                     "color": "#ffffff"}
        result = self.client.post("/json/subscriptions/property", post_data)

        self.assert_json_error(result, "Missing stream_name")

    def test_set_color_missing_color(self):
        """
        Updating the stream_colors property requires a color.
        """
        self.login("hamlet@humbughq.com")
        post_data = {"property": "stream_colors",
                     "stream_name": "test"}
        result = self.client.post("/json/subscriptions/property", post_data)

        self.assert_json_error(result, "Missing color")

    def test_set_invalid_property(self):
        """
        Trying to set an invalid property returns a JSON error.
        """
        self.login("hamlet@humbughq.com")
        result = self.client.post("/json/subscriptions/property",
                                  {"property": "bad"})

        self.assert_json_error(result,
                               "Unknown property or invalid verb for bad")
|
|
|
|
class GetOldMessagesTest(AuthedTestCase):
    """
    Parameter validation and narrowing for /json/get_old_messages.
    """
    fixtures = ['messages.json']

    def post_with_params(self, modified_params):
        """
        POST to /json/get_old_messages with a valid default parameter set,
        overridden by `modified_params`.  Returns the response.
        """
        post_params = dict(anchor=1, num_before=1, num_after=1,
                           narrow=simplejson.dumps({}))
        post_params.update(modified_params)
        return self.client.post("/json/get_old_messages", dict(post_params))

    def check_well_formed_messages_response(self, result):
        """
        Assert that `result` has a "messages" list and that every message
        in it carries the expected fields.
        """
        required_fields = ("content", "content_type", "display_recipient",
                           "gravatar_hash", "recipient_id", "sender_full_name",
                           "sender_short_name", "timestamp")
        self.assertIn("messages", result)
        self.assertIsInstance(result["messages"], list)
        for message in result["messages"]:
            for field in required_fields:
                self.assertIn(field, message)

    # NOTE(review): the next three methods lack the "test_" prefix, so the
    # default unittest loader will not collect them -- confirm whether that
    # is intentional.

    def successful_get_old_messages(self):
        """
        A call to /json/get_old_messages with valid parameters returns a list of
        messages.
        """
        self.login("hamlet@humbughq.com")
        # NOTE(review): this hands the response object itself (not its
        # parsed JSON content) to the checker -- verify before enabling.
        response = self.post_with_params({})
        self.check_well_formed_messages_response(response)

    def get_old_messages_with_narrow_recipient_id(self):
        """
        A request for old messages with a narrow recipient_id only returns
        messages for that id.
        """
        self.login("hamlet@humbughq.com")
        user = User.objects.get(email="hamlet@humbughq.com")
        recipient_id = self.message_stream(user)[0].recipient.id

        narrow = simplejson.dumps({"recipient_id": recipient_id})
        json_result = self.post_with_params({"narrow": narrow})
        self.assert_json_success(json_result)

        result = simplejson.loads(json_result.content)
        self.check_well_formed_messages_response(result)

        for message in result["messages"]:
            self.assertEqual(message["recipient_id"], recipient_id)

    def get_old_messages_with_narrow_stream(self):
        """
        A request for old messages with a narrow stream only returns messages
        for that stream.
        """
        self.login("hamlet@humbughq.com")
        user = User.objects.get(email="hamlet@humbughq.com")
        stream_messages = [msg for msg in self.message_stream(user)
                           if msg.recipient.type == Recipient.STREAM]
        first_recipient = stream_messages[0].recipient
        stream_name = get_display_recipient(first_recipient)
        stream_id = first_recipient.id

        narrow = simplejson.dumps({"stream": stream_name})
        json_result = self.post_with_params({"narrow": narrow})
        self.assert_json_success(json_result)

        result = simplejson.loads(json_result.content)
        self.check_well_formed_messages_response(result)

        for message in result["messages"]:
            self.assertEqual(message["type"], "stream")
            self.assertEqual(message["recipient_id"], stream_id)

    def test_missing_params(self):
        """
        anchor, num_before, num_after, and narrow are all required
        POST parameters for get_old_messages.
        """
        self.login("hamlet@humbughq.com")

        required_args = (("anchor", 1), ("num_before", 1), ("num_after", 1),
                         ("narrow", {}))

        # Supply only the first i arguments; argument i is then the first
        # one reported as missing.
        for i, (missing_name, _) in enumerate(required_args):
            post_params = dict(required_args[:i])
            result = self.client.post("/json/get_old_messages", post_params)
            self.assert_json_error(result,
                                   "Missing '%s' argument" % (missing_name,))

    def test_bad_int_params(self):
        """
        anchor, num_before, num_after, and narrow must all be non-negative
        integers or strings that can be converted to non-negative integers.
        """
        self.login("hamlet@humbughq.com")

        other_params = [("narrow", {})]
        int_params = ["anchor", "num_before", "num_after"]

        bad_values = (False, "", "-1", -1)
        for idx, param in enumerate(int_params):
            remaining_params = int_params[:idx] + int_params[idx + 1:]
            for bad_value in bad_values:
                # Rotate through every bad type for every integer
                # parameter, one at a time.
                post_params = dict(other_params)
                post_params[param] = bad_value
                for other_param in remaining_params:
                    post_params[other_param] = 0
                result = self.client.post("/json/get_old_messages", post_params)
                self.assert_json_error(result,
                                       "Bad value for '%s': %s" % (param, bad_value))

    def test_bad_narrow_type(self):
        """
        narrow must be a dictionary.
        """
        self.login("hamlet@humbughq.com")

        other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]

        bad_values = (False, 0, "", "{malformed json,")
        for bad_value in bad_values:
            post_params = dict(other_params)
            post_params["narrow"] = bad_value
            result = self.client.post("/json/get_old_messages", post_params)
            self.assert_json_error(result,
                                   "Bad value for 'narrow': %s" % (bad_value,))

    def exercise_bad_narrow_content(self, narrow_key, bad_content):
        """
        For each item of `bad_content`, request old messages narrowed by
        {narrow_key: item} and assert the "Invalid <key> <item>" error.
        """
        other_params = [("anchor", 0), ("num_before", 0), ("num_after", 0)]
        for content in bad_content:
            post_params = dict(other_params)
            post_params["narrow"] = simplejson.dumps({narrow_key: content})
            result = self.client.post("/json/get_old_messages", post_params)
            self.assert_json_error(result,
                                   "Invalid %s %s" % (narrow_key, content,))

    def test_bad_narrow_stream_content(self):
        """
        If an invalid stream name is requested in get_old_messages, an error is
        returned.
        """
        self.login("hamlet@humbughq.com")
        bad_stream_content = ("non-existent stream", 0, [])
        self.exercise_bad_narrow_content("stream", bad_stream_content)

    def test_bad_narrow_one_on_one_email_content(self):
        """
        If an invalid one_on_one_email is requested in get_old_messages, an
        error is returned.
        """
        self.login("hamlet@humbughq.com")
        bad_email_content = ("non-existent email", 0, [])
        self.exercise_bad_narrow_content("one_on_one_email", bad_email_content)
|
|
|
|
class DummyHandler(object):
    """
    Minimal stand-in for a Tornado request handler.

    `assert_callback`, if truthy, is called with the response passed to
    finish(), letting a test inspect what the view produced.
    """

    def __init__(self, assert_callback):
        self.assert_callback = assert_callback

    # Mocks RequestHandler.async_callback, which wraps a callback to
    # handle exceptions. We return the callback as-is.
    def async_callback(self, cb):
        return cb

    def write(self, response):
        """
        Not supported by this stub; always raises NotImplementedError.
        """
        # NotImplemented is a comparison sentinel, not an exception class;
        # raising it produces a confusing TypeError instead.
        raise NotImplementedError

    def finish(self, response):
        """
        Forward `response` to the assert callback, if one was supplied.
        """
        if self.assert_callback:
            self.assert_callback(response)
|
|
|
|
class DummySession(object):
    """
    Stand-in session object exposing only a fixed session_key.
    """
    # Constant key shared by every mock request.
    session_key = "0"
|
|
|
|
class POSTRequestMock(object):
    """
    Fake POST request that can be handed directly to a view function.

    Carries the POST data, the requesting user, a DummyHandler wired to
    the optional `assert_callback`, a DummySession and a minimal META.
    """
    method = "POST"

    def __init__(self, post_data, user, assert_callback=None):
        self.POST = post_data
        self.user = user
        # The handler's finish() forwards the response to assert_callback.
        self._tornado_handler = DummyHandler(assert_callback)
        self.session = DummySession()
        self.META = dict(PATH_INFO='test')
|
|
|
|
class GetUpdatesTest(AuthedTestCase):
    """
    Calls the get-updates views directly with a mock POST request.
    """
    fixtures = ['messages.json']

    def common_test_get_updates(self, view_func, extra_post_data=None):
        """
        Call `view_func` with a mock request for hamlet and assert that it
        returns RespondAsynchronously.

        `extra_post_data`, if given, is merged into the default POST data
        ({"last": "1", "first": "1"}).  Any response the view hands to the
        mock handler's finish() is checked by `callback` below.
        """
        user = User.objects.get(email="hamlet@humbughq.com")

        def callback(response):
            # Every returned message must be newer than id 1 and visible
            # to the user.
            correct_message_ids = set(
                m.id for m in
                filter_by_subscriptions(Message.objects.all(), user))
            for message in response['messages']:
                self.assertGreater(message['id'], 1)
                self.assertIn(message['id'], correct_message_ids)

        post_data = {"last": str(1), "first": str(1)}
        # None (rather than a shared {} default) means "nothing extra".
        if extra_post_data is not None:
            post_data.update(extra_post_data)
        request = POSTRequestMock(post_data, user, callback)
        self.assertEqual(view_func(request), RespondAsynchronously)

    def test_json_get_updates(self):
        """
        json_get_updates returns messages with IDs greater than the
        last_received ID.
        """
        self.login("hamlet@humbughq.com")
        self.common_test_get_updates(json_get_updates)

    def test_api_get_messages(self):
        """
        Same as above, but for the API view
        """
        email = "hamlet@humbughq.com"
        api_key = self.get_api_key(email)
        self.common_test_get_updates(api_get_messages, {'email': email, 'api-key': api_key})

    def test_missing_last_received(self):
        """
        Calling json_get_updates without any arguments should work
        """
        self.login("hamlet@humbughq.com")
        user = User.objects.get(email="hamlet@humbughq.com")

        request = POSTRequestMock({}, user)
        self.assertEqual(json_get_updates(request), RespondAsynchronously)
|
|
|
|
class Runner(DjangoTestSuiteRunner):
    """
    Test suite runner that regenerates the test fixtures before running,
    unless --skip-generate is passed.
    """
    option_list = (
        optparse.make_option(
            '--skip-generate',
            dest='generate',
            default=True,
            action='store_false',
            help='Skip generating test fixtures',
        ),
    )

    def __init__(self, generate, *args, **kwargs):
        # Fixture generation runs first; a non-zero exit status aborts the
        # run via CalledProcessError.
        if generate:
            subprocess.check_call(["zephyr/tests/generate-fixtures"])
        super(Runner, self).__init__(*args, **kwargs)
|