Added unit tests for several more methods:

-json/subscriptions/exists
-json/subscriptions/list
-json/subscriptions/add
-json/subscriptions/remove

(imported from commit 95eb5421ac586bea59f749411b54661689946915)
This commit is contained in:
Jacob Hurwitz 2013-01-29 13:47:28 -05:00
parent 4d361f8290
commit 7c59d41006
1 changed files with 277 additions and 0 deletions

View File

@ -18,6 +18,7 @@ import optparse
from django.conf import settings
import re
import sys
import random
try:
settings.TEST_SUITE
@ -571,6 +572,282 @@ class SubscriptionPropertiesTest(AuthedTestCase):
self.assert_json_error(result,
"Unknown property or invalid verb for bad")
class SubscriptionAPITest(AuthedTestCase):
fixtures = ['messages.json']
def setUp(self):
"""
All tests will be logged in as hamlet. Also save various useful values
as attributes that tests can access.
"""
self.test_email = "hamlet@humbughq.com"
self.login(self.test_email)
self.user_profile = self.get_user_profile(self.test_email)
self.realm = self.user_profile.realm
self.streams = self.get_streams(self.test_email)
def get_streams(self, email):
"""
Helper function to get the stream names for a user
"""
user_profile = self.get_user_profile(email)
subs = Subscription.objects.filter(
user_profile = user_profile,
active = True,
recipient__type = Recipient.STREAM)
return [get_display_recipient(sub.recipient) for sub in subs]
def make_random_stream_names(self, existing_stream_names, names_to_avoid):
"""
Helper function to make up random stream names. It takes
existing_stream_names and randomly appends a digit to the end of each,
but avoids names that appear in the list names_to_avoid.
"""
random_streams = []
for stream in existing_stream_names:
random_stream = stream + str(random.randint(0, 9))
if not random_stream in names_to_avoid:
random_streams.append(random_stream)
return random_streams
def test_successful_subscriptions_list(self):
"""
Calling /json/subscriptions/list should successfully return your subscriptions.
"""
result = self.client.post("/json/subscriptions/list", {})
self.assert_json_success(result)
json = simplejson.loads(result.content)
self.assertIn("subscriptions", json)
for stream, color in json["subscriptions"]:
self.assertIsInstance(stream, str)
self.assertIsInstance(color, str)
# check that the stream name corresponds to an actual stream
try:
Stream.objects.get(name__iexact=stream, realm=self.realm)
except Stream.DoesNotExist:
self.fail("stream does not exist")
list_streams = [stream for stream, color in json["subscriptions"]]
# also check that this matches the list of your subscriptions
self.assertItemsEqual(list_streams, self.streams)
def helper_check_subs_before_and_after(self, url, subscriptions, other_params, json_dict, email, new_subs):
"""
Helper function that posts to url with the subscriptions value set to
subscriptions and optional other_params specified as a dict. It checks
that the returned JSON dict contains all keys in json_dict with the
corresponding correct values. Finally, it checks that the subscriptions
after this call for the given email are equal to new_subs.
"""
data = {"subscriptions": simplejson.dumps(subscriptions)}
data.update(other_params)
result = self.client.post(url, data)
self.assert_json_success(result)
json = simplejson.loads(result.content)
for key, val in json_dict.iteritems():
self.assertIn(key, json)
if isinstance(val, list):
self.assertItemsEqual(val, json[key]) # we don't care about the order of the items
else:
self.assertEqual(val, json[key])
new_streams = self.get_streams(email)
self.assertItemsEqual(new_streams, new_subs)
def test_successful_subscriptions_add(self):
"""
Calling /json/subscriptions/add should successfully add streams, and
should determine which are new subscriptions vs which were already
subscribed. We randomly generate stream names to add, because it
doesn't matter whether the stream already exists.
"""
self.assertNotEqual(len(self.streams), 0) # necessary for full test coverage
add_streams = self.make_random_stream_names(self.streams, self.streams)
self.assertNotEqual(len(add_streams), 0) # necessary for full test coverage
self.helper_check_subs_before_and_after("/json/subscriptions/add", self.streams + add_streams, {},
{"subscribed": add_streams,
"already_subscribed": self.streams},
self.test_email, self.streams + add_streams)
def test_subscriptions_add_too_long(self):
"""
Calling /json/subscriptions/add on a stream whose name is >30
characters should return a JSON error.
"""
# character limit is 30 characters
long_stream_name = "a" * 31
result = self.client.post("/json/subscriptions/add",
{"subscriptions": simplejson.dumps([long_stream_name])})
self.assert_json_error(result,
"Stream name (%s) too long." % (long_stream_name,))
def test_subscriptions_add_invalid_stream(self):
"""
Calling /json/subscriptions/add on a stream whose name is invalid (as
defined by valid_stream_name in zephyr/views.py) should return a JSON
error.
"""
# currently, the only invalid name is the empty string
invalid_stream_name = ""
result = self.client.post("/json/subscriptions/add",
{"subscriptions": simplejson.dumps([invalid_stream_name])})
self.assert_json_error(result,
"Invalid stream name (%s)." % (invalid_stream_name,))
def test_subscriptions_add_for_principal(self):
"""
Calling /json/subscriptions/add on behalf of another principal (for
whom you have permission to add subscriptions) should successfully add
those subscriptions and send a message to the subscribee notifying
them.
"""
other_email = "iago@humbughq.com"
other_profile = UserProfile.objects.get(user__email=other_email)
self.assertIsInstance(other_profile, UserProfile)
current_streams = self.get_streams(other_email)
self.assertNotEqual(len(current_streams), 0) # necessary for full test coverage
add_streams = self.make_random_stream_names(current_streams, current_streams)
self.assertNotEqual(len(add_streams), 0) # necessary for full test coverage
streams_to_sub = add_streams[:1] # just add one, to make the message easier to check
streams_to_sub.extend(current_streams)
self.helper_check_subs_before_and_after("/json/subscriptions/add", streams_to_sub,
{"principal": other_email},
{"subscribed": add_streams[:1],
"already_subscribed": current_streams},
other_email, streams_to_sub)
# verify that the user was sent a message informing them about the subscription
msg = Message.objects.latest('id')
self.assertEqual(msg.recipient.type, msg.recipient.PERSONAL)
self.assertEqual(msg.sender_id, UserProfile.objects.get(user__email="humbug+notifications@humbughq.com").id)
expected_msg = ("Hi there! We thought you'd like to know that %s just "
"subscribed you to stream '%s'"
% (self.user_profile.full_name, add_streams[0]))
self.assertEqual(msg.content, expected_msg)
recipients = get_display_recipient(msg.recipient)
self.assertEqual(len(recipients), 1)
self.assertEqual(recipients[0]['email'], other_email)
def test_subscription_add_invalid_principal(self):
"""
Calling /json/subscriptions/add on behalf of a principal that does not
exist should return a JSON error.
"""
invalid_principal = "rosencrantz-and-guildenstern@humbughq.com"
# verify that invalid_principal actually doesn't exist
with self.assertRaises(UserProfile.DoesNotExist):
UserProfile.objects.get(user__email=invalid_principal)
result = self.client.post("/json/subscriptions/add",
{"subscriptions": simplejson.dumps(self.streams),
"principal": invalid_principal})
self.assert_json_error(result, "User not authorized to execute queries on behalf of '%s'"
% (invalid_principal,))
def test_subscription_add_principal_other_realm(self):
"""
Calling /json/subscriptions/add on behalf of a principal in another
realm should return a JSON error.
"""
principal = "starnine@mit.edu"
profile = UserProfile.objects.get(user__email=principal)
# verify that principal exists (thus, the reason for the error is the cross-realming)
self.assertIsInstance(profile, UserProfile)
result = self.client.post("/json/subscriptions/add",
{"subscriptions": simplejson.dumps(self.streams),
"principal": principal})
self.assert_json_error(result, "User not authorized to execute queries on behalf of '%s'"
% (principal,))
def test_successful_subscriptions_remove(self):
"""
Calling /json/subscriptions/remove should successfully remove streams,
and should determine which were removed vs which weren't subscribed to.
We cannot randomly generate stream names because the remove code
verifies whether streams exist.
"""
if len(self.streams) < 2:
self.fail() # necesssary for full test coverage
streams_to_remove = self.streams[1:]
not_subbed = []
for stream in Stream.objects.all():
if not stream.name in self.streams:
not_subbed.append(stream.name)
random.shuffle(not_subbed)
self.assertNotEqual(len(not_subbed), 0) # necessary for full test coverage
try_to_remove = not_subbed[:3] # attempt to remove up to 3 streams not already subbed to
streams_to_remove.extend(try_to_remove)
self.helper_check_subs_before_and_after("/json/subscriptions/remove", streams_to_remove, {},
{"removed": self.streams[1:],
"not_subscribed": try_to_remove},
self.test_email, [self.streams[0]])
def test_subscriptions_remove_fake_stream(self):
"""
Calling /json/subscriptions/remove on a stream that doesn't exist
should return a JSON error.
"""
all_stream_names = [stream.name for stream in Stream.objects.filter(realm=self.realm)]
random_streams = self.make_random_stream_names(self.streams, all_stream_names)
self.assertNotEqual(len(random_streams), 0) # necessary for full test coverage
streams_to_remove = random_streams[:1] # pick only one fake stream, to make checking the error message easy
result = self.client.post("/json/subscriptions/remove",
{"subscriptions": simplejson.dumps(streams_to_remove)})
self.assert_json_error(result, "Stream %s does not exist" % (random_streams[0],))
def helper_subscriptions_exists(self, stream, exists, subscribed):
"""
A helper function that calls /json/subscriptions/exists on a stream and
verifies that the returned JSON dictionary has the exists and
subscribed values passed in as parameters. (If subscribed should not be
present, pass in None.)
"""
result = self.client.post("/json/subscriptions/exists",
{"stream": stream})
self.assert_json_success(result)
json = simplejson.loads(result.content)
self.assertIn("exists", json)
self.assertEqual(json["exists"], exists)
if not subscribed is None:
self.assertIn("subscribed", json)
self.assertEqual(json["subscribed"], subscribed)
def test_successful_subscriptions_exists_subbed(self):
"""
Calling /json/subscriptions/exist on a stream to which you are subbed
should return that it exists and that you are subbed.
"""
self.assertNotEqual(len(self.streams), 0) # necessary for full test coverage
self.helper_subscriptions_exists(self.streams[0], True, True)
def test_successful_subscriptions_exists_not_subbed(self):
"""
Calling /json/subscriptions/exist on a stream to which you are not
subbed should return that it exists and that you are not subbed.
"""
all_stream_names = [stream.name for stream in Stream.objects.filter(realm=self.realm)]
streams_not_subbed = list(set(all_stream_names) - set(self.streams))
self.assertNotEqual(len(streams_not_subbed), 0) # necessary for full test coverage
self.helper_subscriptions_exists(streams_not_subbed[0], True, False)
def test_subscriptions_does_not_exist(self):
"""
Calling /json/subscriptions/exist on a stream that doesn't exist should
return that it doesn't exist.
"""
all_stream_names = [stream.name for stream in Stream.objects.filter(realm=self.realm)]
random_streams = self.make_random_stream_names(self.streams, all_stream_names)
self.assertNotEqual(len(random_streams), 0) # necessary for full test coverage
self.helper_subscriptions_exists(random_streams[0], False, None)
def test_subscriptions_exist_invalid_name(self):
"""
Calling /json/subscriptions/exist on a stream whose name is invalid (as
defined by valid_stream_name in zephyr/views.py) should return a JSON
error.
"""
# currently, the only invalid stream name is the empty string
invalid_stream_name = ""
result = self.client.post("/json/subscriptions/exists",
{"stream": invalid_stream_name})
self.assert_json_error(result, "Invalid characters in stream name")
class GetOldMessagesTest(AuthedTestCase):
fixtures = ['messages.json']