mirror of https://github.com/zulip/zulip.git
Create test_signup.py.
(imported from commit 4dffbed84fbe60d5f5deaedfb614e4eb156b34c7)
This commit is contained in:
parent
88fbd5d16a
commit
f42b526749
|
@ -0,0 +1,486 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from django.test import TestCase
|
||||||
|
|
||||||
|
from zilencer.models import Deployment
|
||||||
|
|
||||||
|
from zerver.models import (
|
||||||
|
get_user_profile_by_email,
|
||||||
|
PreregistrationUser, Realm, ScheduledJob, UserProfile,
|
||||||
|
)
|
||||||
|
|
||||||
|
from zerver.lib.actions import (
|
||||||
|
create_stream_if_needed,
|
||||||
|
do_add_subscription,
|
||||||
|
set_default_streams,
|
||||||
|
)
|
||||||
|
|
||||||
|
from zerver.lib.digest import send_digest_email
|
||||||
|
from zerver.lib.notifications import enqueue_welcome_emails, one_click_unsubscribe_link
|
||||||
|
from zerver.lib.test_helpers import AuthedTestCase, find_key_by_email, queries_captured
|
||||||
|
from zerver.lib.test_runner import slow
|
||||||
|
|
||||||
|
import re
|
||||||
|
import ujson
|
||||||
|
|
||||||
|
from urlparse import urlparse
|
||||||
|
|
||||||
|
|
||||||
|
class PublicURLTest(TestCase):
|
||||||
|
"""
|
||||||
|
Account creation URLs are accessible even when not logged in. Authenticated
|
||||||
|
URLs redirect to a page.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def fetch(self, method, urls, expected_status):
|
||||||
|
for url in urls:
|
||||||
|
if method == "get":
|
||||||
|
response = self.client.get(url)
|
||||||
|
else:
|
||||||
|
response = self.client.post(url)
|
||||||
|
self.assertEqual(response.status_code, expected_status,
|
||||||
|
msg="Expected %d, received %d for %s to %s" % (
|
||||||
|
expected_status, response.status_code, method, url))
|
||||||
|
|
||||||
|
def test_public_urls(self):
|
||||||
|
"""
|
||||||
|
Test which views are accessible when not logged in.
|
||||||
|
"""
|
||||||
|
# FIXME: We should also test the Tornado URLs -- this codepath
|
||||||
|
# can't do so because this Django test mechanism doesn't go
|
||||||
|
# through Tornado.
|
||||||
|
get_urls = {200: ["/accounts/home/", "/accounts/login/"],
|
||||||
|
302: ["/"],
|
||||||
|
401: ["/api/v1/streams/Denmark/members",
|
||||||
|
"/api/v1/users/me/subscriptions",
|
||||||
|
"/api/v1/messages",
|
||||||
|
],
|
||||||
|
}
|
||||||
|
post_urls = {200: ["/accounts/login/"],
|
||||||
|
302: ["/accounts/logout/"],
|
||||||
|
401: ["/json/get_public_streams",
|
||||||
|
"/json/get_old_messages",
|
||||||
|
"/json/update_pointer",
|
||||||
|
"/json/send_message",
|
||||||
|
"/json/invite_users",
|
||||||
|
"/json/settings/change",
|
||||||
|
"/json/subscriptions/remove",
|
||||||
|
"/json/subscriptions/exists",
|
||||||
|
"/json/subscriptions/add",
|
||||||
|
"/json/subscriptions/property",
|
||||||
|
"/json/get_subscribers",
|
||||||
|
"/json/fetch_api_key",
|
||||||
|
"/api/v1/users/me/subscriptions",
|
||||||
|
],
|
||||||
|
400: ["/api/v1/send_message",
|
||||||
|
"/api/v1/external/github",
|
||||||
|
"/api/v1/fetch_api_key",
|
||||||
|
],
|
||||||
|
}
|
||||||
|
for status_code, url_set in get_urls.iteritems():
|
||||||
|
self.fetch("get", url_set, status_code)
|
||||||
|
for status_code, url_set in post_urls.iteritems():
|
||||||
|
self.fetch("post", url_set, status_code)
|
||||||
|
|
||||||
|
class LoginTest(AuthedTestCase):
|
||||||
|
"""
|
||||||
|
Logging in, registration, and logging out.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_login(self):
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
user_profile = get_user_profile_by_email('hamlet@zulip.com')
|
||||||
|
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
||||||
|
|
||||||
|
def test_login_bad_password(self):
|
||||||
|
self.login("hamlet@zulip.com", "wrongpassword")
|
||||||
|
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
||||||
|
|
||||||
|
def test_login_nonexist_user(self):
|
||||||
|
result = self.login("xxx@zulip.com", "xxx")
|
||||||
|
self.assertIn("Please enter a correct email and password", result.content)
|
||||||
|
|
||||||
|
def test_register(self):
|
||||||
|
realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
streams = ["stream_%s" % i for i in xrange(40)]
|
||||||
|
for stream in streams:
|
||||||
|
create_stream_if_needed(realm, stream)
|
||||||
|
|
||||||
|
set_default_streams(realm, streams)
|
||||||
|
with queries_captured() as queries:
|
||||||
|
self.register("test", "test")
|
||||||
|
# Ensure the number of queries we make is not O(streams)
|
||||||
|
self.assert_length(queries, 59)
|
||||||
|
user_profile = get_user_profile_by_email('test@zulip.com')
|
||||||
|
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
||||||
|
|
||||||
|
def test_register_deactivated(self):
|
||||||
|
"""
|
||||||
|
If you try to register for a deactivated realm, you get a clear error
|
||||||
|
page.
|
||||||
|
"""
|
||||||
|
realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
realm.deactivated = True
|
||||||
|
realm.save(update_fields=["deactivated"])
|
||||||
|
|
||||||
|
result = self.register("test", "test")
|
||||||
|
self.assertIn("has been deactivated", result.content.replace("\n", " "))
|
||||||
|
|
||||||
|
with self.assertRaises(UserProfile.DoesNotExist):
|
||||||
|
get_user_profile_by_email('test@zulip.com')
|
||||||
|
|
||||||
|
def test_login_deactivated(self):
|
||||||
|
"""
|
||||||
|
If you try to log in to a deactivated realm, you get a clear error page.
|
||||||
|
"""
|
||||||
|
realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
realm.deactivated = True
|
||||||
|
realm.save(update_fields=["deactivated"])
|
||||||
|
|
||||||
|
result = self.login("hamlet@zulip.com")
|
||||||
|
self.assertIn("has been deactivated", result.content.replace("\n", " "))
|
||||||
|
|
||||||
|
def test_logout(self):
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
self.client.post('/accounts/logout/')
|
||||||
|
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
||||||
|
|
||||||
|
def test_non_ascii_login(self):
|
||||||
|
"""
|
||||||
|
You can log in even if your password contain non-ASCII characters.
|
||||||
|
"""
|
||||||
|
email = "test@zulip.com"
|
||||||
|
password = u"hümbüǵ"
|
||||||
|
|
||||||
|
# Registering succeeds.
|
||||||
|
self.register("test", password)
|
||||||
|
user_profile = get_user_profile_by_email(email)
|
||||||
|
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
||||||
|
self.client.post('/accounts/logout/')
|
||||||
|
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
||||||
|
|
||||||
|
# Logging in succeeds.
|
||||||
|
self.client.post('/accounts/logout/')
|
||||||
|
self.login(email, password)
|
||||||
|
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
||||||
|
|
||||||
|
def test_register_first_user_with_invites(self):
|
||||||
|
"""
|
||||||
|
The first user in a realm has a special step in their signup workflow
|
||||||
|
for inviting coworkers. Do as realistic an end-to-end test as we can
|
||||||
|
without Tornado running.
|
||||||
|
"""
|
||||||
|
username = "user1"
|
||||||
|
password = "test"
|
||||||
|
domain = "test.com"
|
||||||
|
email = "user1@test.com"
|
||||||
|
|
||||||
|
# Create a new realm to ensure that we're the first user in it.
|
||||||
|
realm = Realm.objects.create(domain=domain, name="Test Inc.")
|
||||||
|
deployment = Deployment.objects.all().first()
|
||||||
|
deployment.realms.add(realm)
|
||||||
|
deployment.save()
|
||||||
|
|
||||||
|
# Start the signup process by supplying an email address.
|
||||||
|
result = self.client.post('/accounts/home/', {'email': email})
|
||||||
|
|
||||||
|
# Check the redirect telling you to check your mail for a confirmation
|
||||||
|
# link.
|
||||||
|
self.assertEquals(result.status_code, 302)
|
||||||
|
self.assertTrue(result["Location"].endswith(
|
||||||
|
"/accounts/send_confirm/%s%%40%s" % (username, domain)))
|
||||||
|
result = self.client.get(result["Location"])
|
||||||
|
self.assertIn("Check your email so we can get started.", result.content)
|
||||||
|
|
||||||
|
# Visit the confirmation link.
|
||||||
|
from django.core.mail import outbox
|
||||||
|
for message in reversed(outbox):
|
||||||
|
if email in message.to:
|
||||||
|
confirmation_link_pattern = re.compile("example.com(\S+)>")
|
||||||
|
confirmation_url = confirmation_link_pattern.search(
|
||||||
|
message.body).groups()[0]
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ValueError("Couldn't find a confirmation email.")
|
||||||
|
|
||||||
|
result = self.client.get(confirmation_url)
|
||||||
|
self.assertEquals(result.status_code, 200)
|
||||||
|
|
||||||
|
# Pick a password and agree to the ToS.
|
||||||
|
result = self.submit_reg_form_for_user(username, password, domain)
|
||||||
|
self.assertEquals(result.status_code, 302)
|
||||||
|
self.assertTrue(result["Location"].endswith("/invite/"))
|
||||||
|
|
||||||
|
# Invite coworkers to join you.
|
||||||
|
result = self.client.get(result["Location"])
|
||||||
|
self.assertIn("You're the first one here!", result.content)
|
||||||
|
|
||||||
|
# Reset the outbox for our invites.
|
||||||
|
outbox.pop()
|
||||||
|
|
||||||
|
invitees = ['alice@' + domain, 'bob@' + domain]
|
||||||
|
params = {
|
||||||
|
'invitee_emails': ujson.dumps(invitees)
|
||||||
|
}
|
||||||
|
result = self.client.post('/json/bulk_invite_users', params)
|
||||||
|
self.assert_json_success(result)
|
||||||
|
|
||||||
|
# We really did email these users, and they have PreregistrationUser
|
||||||
|
# objects.
|
||||||
|
email_recipients = [message.recipients()[0] for message in outbox]
|
||||||
|
self.assertEqual(len(outbox), len(invitees))
|
||||||
|
self.assertItemsEqual(email_recipients, invitees)
|
||||||
|
|
||||||
|
user_profile = get_user_profile_by_email(email)
|
||||||
|
self.assertEqual(len(invitees), PreregistrationUser.objects.filter(
|
||||||
|
referred_by=user_profile).count())
|
||||||
|
|
||||||
|
# After this we start manipulating browser information, so stop here.
|
||||||
|
|
||||||
|
class InviteUserTest(AuthedTestCase):
|
||||||
|
|
||||||
|
def invite(self, users, streams):
|
||||||
|
"""
|
||||||
|
Invites the specified users to Zulip with the specified streams.
|
||||||
|
|
||||||
|
users should be a string containing the users to invite, comma or
|
||||||
|
newline separated.
|
||||||
|
|
||||||
|
streams should be a list of strings.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.client.post("/json/invite_users",
|
||||||
|
{"invitee_emails": users,
|
||||||
|
"stream": streams})
|
||||||
|
|
||||||
|
def check_sent_emails(self, correct_recipients):
|
||||||
|
from django.core.mail import outbox
|
||||||
|
self.assertEqual(len(outbox), len(correct_recipients))
|
||||||
|
email_recipients = [email.recipients()[0] for email in outbox]
|
||||||
|
self.assertItemsEqual(email_recipients, correct_recipients)
|
||||||
|
|
||||||
|
def test_bulk_invite_users(self):
|
||||||
|
# The bulk_invite_users code path is for the first user in a realm.
|
||||||
|
self.login('hamlet@zulip.com')
|
||||||
|
invitees = ['alice@zulip.com', 'bob@zulip.com']
|
||||||
|
params = {
|
||||||
|
'invitee_emails': ujson.dumps(invitees)
|
||||||
|
}
|
||||||
|
result = self.client.post('/json/bulk_invite_users', params)
|
||||||
|
self.assert_json_success(result)
|
||||||
|
self.check_sent_emails(invitees)
|
||||||
|
|
||||||
|
def test_successful_invite_user(self):
|
||||||
|
"""
|
||||||
|
A call to /json/invite_users with valid parameters causes an invitation
|
||||||
|
email to be sent.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
invitee = "alice-test@zulip.com"
|
||||||
|
self.assert_json_success(self.invite(invitee, ["Denmark"]))
|
||||||
|
self.assertTrue(find_key_by_email(invitee))
|
||||||
|
self.check_sent_emails([invitee])
|
||||||
|
|
||||||
|
def test_multi_user_invite(self):
|
||||||
|
"""
|
||||||
|
Invites multiple users with a variety of delimiters.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
# Intentionally use a weird string.
|
||||||
|
self.assert_json_success(self.invite(
|
||||||
|
"""bob-test@zulip.com, carol-test@zulip.com,
|
||||||
|
dave-test@zulip.com
|
||||||
|
|
||||||
|
|
||||||
|
earl-test@zulip.com""", ["Denmark"]))
|
||||||
|
for user in ("bob", "carol", "dave", "earl"):
|
||||||
|
self.assertTrue(find_key_by_email("%s-test@zulip.com" % user))
|
||||||
|
self.check_sent_emails(["bob-test@zulip.com", "carol-test@zulip.com",
|
||||||
|
"dave-test@zulip.com", "earl-test@zulip.com"])
|
||||||
|
|
||||||
|
def test_missing_or_invalid_params(self):
|
||||||
|
"""
|
||||||
|
Tests inviting with various missing or invalid parameters.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
self.assert_json_error(
|
||||||
|
self.client.post("/json/invite_users", {"invitee_emails": "foo@zulip.com"}),
|
||||||
|
"You must specify at least one stream for invitees to join.")
|
||||||
|
|
||||||
|
for address in ("noatsign.com", "outsideyourdomain@example.net"):
|
||||||
|
self.assert_json_error(
|
||||||
|
self.invite(address, ["Denmark"]),
|
||||||
|
"Some emails did not validate, so we didn't send any invitations.")
|
||||||
|
self.check_sent_emails([])
|
||||||
|
|
||||||
|
def test_invalid_stream(self):
|
||||||
|
"""
|
||||||
|
Tests inviting to a non-existent stream.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
self.assert_json_error(self.invite("iago-test@zulip.com", ["NotARealStream"]),
|
||||||
|
"Stream does not exist: NotARealStream. No invites were sent.")
|
||||||
|
self.check_sent_emails([])
|
||||||
|
|
||||||
|
def test_invite_existing_user(self):
|
||||||
|
"""
|
||||||
|
If you invite an address already using Zulip, no invitation is sent.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
self.assert_json_error(
|
||||||
|
self.client.post("/json/invite_users",
|
||||||
|
{"invitee_emails": "hamlet@zulip.com",
|
||||||
|
"stream": ["Denmark"]}),
|
||||||
|
"We weren't able to invite anyone.")
|
||||||
|
self.assertRaises(PreregistrationUser.DoesNotExist,
|
||||||
|
lambda: PreregistrationUser.objects.get(
|
||||||
|
email="hamlet@zulip.com"))
|
||||||
|
self.check_sent_emails([])
|
||||||
|
|
||||||
|
def test_invite_some_existing_some_new(self):
|
||||||
|
"""
|
||||||
|
If you invite a mix of already existing and new users, invitations are
|
||||||
|
only sent to the new users.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
existing = ["hamlet@zulip.com", "othello@zulip.com"]
|
||||||
|
new = ["foo-test@zulip.com", "bar-test@zulip.com"]
|
||||||
|
|
||||||
|
result = self.client.post("/json/invite_users",
|
||||||
|
{"invitee_emails": "\n".join(existing + new),
|
||||||
|
"stream": ["Denmark"]})
|
||||||
|
self.assert_json_error(result,
|
||||||
|
"Some of those addresses are already using Zulip, \
|
||||||
|
so we didn't send them an invitation. We did send invitations to everyone else!")
|
||||||
|
|
||||||
|
# We only created accounts for the new users.
|
||||||
|
for email in existing:
|
||||||
|
self.assertRaises(PreregistrationUser.DoesNotExist,
|
||||||
|
lambda: PreregistrationUser.objects.get(
|
||||||
|
email=email))
|
||||||
|
for email in new:
|
||||||
|
self.assertTrue(PreregistrationUser.objects.get(email=email))
|
||||||
|
|
||||||
|
# We only sent emails to the new users.
|
||||||
|
self.check_sent_emails(new)
|
||||||
|
|
||||||
|
def test_invite_outside_domain_in_closed_realm(self):
|
||||||
|
"""
|
||||||
|
In a realm with `restricted_to_domain = True`, you can't invite people
|
||||||
|
with a different domain from that of the realm or your e-mail address.
|
||||||
|
"""
|
||||||
|
zulip_realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
zulip_realm.restricted_to_domain = True
|
||||||
|
zulip_realm.save()
|
||||||
|
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
external_address = "foo@example.com"
|
||||||
|
|
||||||
|
self.assert_json_error(
|
||||||
|
self.invite(external_address, ["Denmark"]),
|
||||||
|
"Some emails did not validate, so we didn't send any invitations.")
|
||||||
|
|
||||||
|
@slow(0.20, 'inviting is slow')
|
||||||
|
def test_invite_outside_domain_in_open_realm(self):
|
||||||
|
"""
|
||||||
|
In a realm with `restricted_to_domain = False`, you can invite people
|
||||||
|
with a different domain from that of the realm or your e-mail address.
|
||||||
|
"""
|
||||||
|
zulip_realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
zulip_realm.restricted_to_domain = False
|
||||||
|
zulip_realm.save()
|
||||||
|
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
external_address = "foo@example.com"
|
||||||
|
|
||||||
|
self.assert_json_success(self.invite(external_address, ["Denmark"]))
|
||||||
|
self.check_sent_emails([external_address])
|
||||||
|
|
||||||
|
def test_invite_with_non_ascii_streams(self):
|
||||||
|
"""
|
||||||
|
Inviting someone to streams with non-ASCII characters succeeds.
|
||||||
|
"""
|
||||||
|
self.login("hamlet@zulip.com")
|
||||||
|
invitee = "alice-test@zulip.com"
|
||||||
|
|
||||||
|
stream_name = u"hümbüǵ"
|
||||||
|
realm = Realm.objects.get(domain="zulip.com")
|
||||||
|
stream, _ = create_stream_if_needed(realm, stream_name)
|
||||||
|
|
||||||
|
# Make sure we're subscribed before inviting someone.
|
||||||
|
do_add_subscription(
|
||||||
|
get_user_profile_by_email("hamlet@zulip.com"),
|
||||||
|
stream, no_log=True)
|
||||||
|
|
||||||
|
self.assert_json_success(self.invite(invitee, [stream_name]))
|
||||||
|
|
||||||
|
class EmailUnsubscribeTests(AuthedTestCase):
|
||||||
|
def test_missedmessage_unsubscribe(self):
|
||||||
|
"""
|
||||||
|
We provide one-click unsubscribe links in missed message
|
||||||
|
e-mails that you can click even when logged out to update your
|
||||||
|
email notification settings.
|
||||||
|
"""
|
||||||
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
||||||
|
user_profile.enable_offline_email_notifications = True
|
||||||
|
user_profile.save()
|
||||||
|
|
||||||
|
unsubscribe_link = one_click_unsubscribe_link(user_profile,
|
||||||
|
"missed_messages")
|
||||||
|
result = self.client.get(urlparse(unsubscribe_link).path)
|
||||||
|
|
||||||
|
self.assertEqual(result.status_code, 200)
|
||||||
|
# Circumvent user_profile caching.
|
||||||
|
user_profile = UserProfile.objects.get(email="hamlet@zulip.com")
|
||||||
|
self.assertFalse(user_profile.enable_offline_email_notifications)
|
||||||
|
|
||||||
|
def test_welcome_unsubscribe(self):
|
||||||
|
"""
|
||||||
|
We provide one-click unsubscribe links in welcome e-mails that you can
|
||||||
|
click even when logged out to stop receiving them.
|
||||||
|
"""
|
||||||
|
email = "hamlet@zulip.com"
|
||||||
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
||||||
|
|
||||||
|
# Simulate a new user signing up, which enqueues 2 welcome e-mails.
|
||||||
|
enqueue_welcome_emails(email, "King Hamlet")
|
||||||
|
self.assertEqual(2, len(ScheduledJob.objects.filter(
|
||||||
|
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
||||||
|
|
||||||
|
# Simulate unsubscribing from the welcome e-mails.
|
||||||
|
unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
|
||||||
|
result = self.client.get(urlparse(unsubscribe_link).path)
|
||||||
|
|
||||||
|
# The welcome email jobs are no longer scheduled.
|
||||||
|
self.assertEqual(result.status_code, 200)
|
||||||
|
self.assertEqual(0, len(ScheduledJob.objects.filter(
|
||||||
|
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
||||||
|
|
||||||
|
def test_digest_unsubscribe(self):
|
||||||
|
"""
|
||||||
|
We provide one-click unsubscribe links in digest e-mails that you can
|
||||||
|
click even when logged out to stop receiving them.
|
||||||
|
|
||||||
|
Unsubscribing from these emails also dequeues any digest email jobs that
|
||||||
|
have been queued.
|
||||||
|
"""
|
||||||
|
email = "hamlet@zulip.com"
|
||||||
|
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
||||||
|
self.assertTrue(user_profile.enable_digest_emails)
|
||||||
|
|
||||||
|
# Enqueue a fake digest email.
|
||||||
|
send_digest_email(user_profile, "", "")
|
||||||
|
self.assertEqual(1, len(ScheduledJob.objects.filter(
|
||||||
|
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
||||||
|
|
||||||
|
# Simulate unsubscribing from digest e-mails.
|
||||||
|
unsubscribe_link = one_click_unsubscribe_link(user_profile, "digest")
|
||||||
|
result = self.client.get(urlparse(unsubscribe_link).path)
|
||||||
|
|
||||||
|
# The setting is toggled off, and scheduled jobs have been removed.
|
||||||
|
self.assertEqual(result.status_code, 200)
|
||||||
|
# Circumvent user_profile caching.
|
||||||
|
user_profile = UserProfile.objects.get(email="hamlet@zulip.com")
|
||||||
|
self.assertFalse(user_profile.enable_digest_emails)
|
||||||
|
self.assertEqual(0, len(ScheduledJob.objects.filter(
|
||||||
|
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
||||||
|
|
477
zerver/tests.py
477
zerver/tests.py
|
@ -4,18 +4,14 @@ from __future__ import absolute_import
|
||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
|
|
||||||
from zerver.lib.test_helpers import (
|
from zerver.lib.test_helpers import (
|
||||||
find_key_by_email, queries_captured, simulated_empty_cache,
|
queries_captured, simulated_empty_cache,
|
||||||
simulated_queue_client, tornado_redirected_to_list, AuthedTestCase,
|
simulated_queue_client, tornado_redirected_to_list, AuthedTestCase,
|
||||||
most_recent_usermessage, POSTRequestMock, most_recent_message,
|
most_recent_usermessage, POSTRequestMock, most_recent_message,
|
||||||
)
|
)
|
||||||
|
|
||||||
from zerver.lib.test_runner import slow
|
|
||||||
|
|
||||||
from zilencer.models import Deployment
|
|
||||||
|
|
||||||
from zerver.models import Message, UserProfile, Recipient, \
|
from zerver.models import Message, UserProfile, Recipient, \
|
||||||
Realm, Client, UserActivity, ScheduledJob, \
|
Realm, Client, UserActivity, \
|
||||||
PreregistrationUser, UserMessage, \
|
UserMessage, \
|
||||||
get_user_profile_by_email, split_email_to_domain, get_realm, \
|
get_user_profile_by_email, split_email_to_domain, get_realm, \
|
||||||
get_stream, get_client, RealmFilter
|
get_stream, get_client, RealmFilter
|
||||||
from zerver.decorator import \
|
from zerver.decorator import \
|
||||||
|
@ -27,8 +23,8 @@ from zerver.lib.actions import \
|
||||||
do_set_muted_topics, \
|
do_set_muted_topics, \
|
||||||
do_remove_alert_words, do_remove_subscription, do_add_realm_filter, \
|
do_remove_alert_words, do_remove_subscription, do_add_realm_filter, \
|
||||||
do_remove_realm_filter, do_change_full_name, create_stream_if_needed, \
|
do_remove_realm_filter, do_change_full_name, create_stream_if_needed, \
|
||||||
do_add_subscription, do_add_realm_emoji, \
|
do_add_realm_emoji, \
|
||||||
do_remove_realm_emoji, set_default_streams, \
|
do_remove_realm_emoji, \
|
||||||
get_emails_from_user_ids, do_deactivate_user, do_reactivate_user, \
|
get_emails_from_user_ids, do_deactivate_user, do_reactivate_user, \
|
||||||
do_change_is_admin, do_rename_stream, do_change_stream_description, \
|
do_change_is_admin, do_rename_stream, do_change_stream_description, \
|
||||||
do_set_realm_name, get_realm_name, do_deactivate_realm
|
do_set_realm_name, get_realm_name, do_deactivate_realm
|
||||||
|
@ -36,8 +32,6 @@ from zerver.lib import bugdown
|
||||||
from zerver.lib.event_queue import allocate_client_descriptor
|
from zerver.lib.event_queue import allocate_client_descriptor
|
||||||
from zerver.lib.alert_words import alert_words_in_realm, user_alert_words, \
|
from zerver.lib.alert_words import alert_words_in_realm, user_alert_words, \
|
||||||
add_user_alert_words, remove_user_alert_words
|
add_user_alert_words, remove_user_alert_words
|
||||||
from zerver.lib.digest import send_digest_email
|
|
||||||
from zerver.lib.notifications import enqueue_welcome_emails, one_click_unsubscribe_link
|
|
||||||
from zerver.lib.validator import check_string, check_list, check_dict, \
|
from zerver.lib.validator import check_string, check_list, check_dict, \
|
||||||
check_bool, check_int
|
check_bool, check_int
|
||||||
from zerver.middleware import is_slow_query
|
from zerver.middleware import is_slow_query
|
||||||
|
@ -47,11 +41,9 @@ from zerver.worker import queue_processors
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
import os
|
||||||
import re
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import ujson
|
import ujson
|
||||||
from urlparse import urlparse
|
|
||||||
|
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
@ -461,217 +453,6 @@ class UserProfileTest(TestCase):
|
||||||
self.assertEqual(dct[hamlet.id], 'hamlet@zulip.com')
|
self.assertEqual(dct[hamlet.id], 'hamlet@zulip.com')
|
||||||
self.assertEqual(dct[othello.id], 'othello@zulip.com')
|
self.assertEqual(dct[othello.id], 'othello@zulip.com')
|
||||||
|
|
||||||
class PublicURLTest(TestCase):
|
|
||||||
"""
|
|
||||||
Account creation URLs are accessible even when not logged in. Authenticated
|
|
||||||
URLs redirect to a page.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def fetch(self, method, urls, expected_status):
|
|
||||||
for url in urls:
|
|
||||||
if method == "get":
|
|
||||||
response = self.client.get(url)
|
|
||||||
else:
|
|
||||||
response = self.client.post(url)
|
|
||||||
self.assertEqual(response.status_code, expected_status,
|
|
||||||
msg="Expected %d, received %d for %s to %s" % (
|
|
||||||
expected_status, response.status_code, method, url))
|
|
||||||
|
|
||||||
def test_public_urls(self):
|
|
||||||
"""
|
|
||||||
Test which views are accessible when not logged in.
|
|
||||||
"""
|
|
||||||
# FIXME: We should also test the Tornado URLs -- this codepath
|
|
||||||
# can't do so because this Django test mechanism doesn't go
|
|
||||||
# through Tornado.
|
|
||||||
get_urls = {200: ["/accounts/home/", "/accounts/login/"],
|
|
||||||
302: ["/"],
|
|
||||||
401: ["/api/v1/streams/Denmark/members",
|
|
||||||
"/api/v1/users/me/subscriptions",
|
|
||||||
"/api/v1/messages",
|
|
||||||
],
|
|
||||||
}
|
|
||||||
post_urls = {200: ["/accounts/login/"],
|
|
||||||
302: ["/accounts/logout/"],
|
|
||||||
401: ["/json/get_public_streams",
|
|
||||||
"/json/get_old_messages",
|
|
||||||
"/json/update_pointer",
|
|
||||||
"/json/send_message",
|
|
||||||
"/json/invite_users",
|
|
||||||
"/json/settings/change",
|
|
||||||
"/json/subscriptions/remove",
|
|
||||||
"/json/subscriptions/exists",
|
|
||||||
"/json/subscriptions/add",
|
|
||||||
"/json/subscriptions/property",
|
|
||||||
"/json/get_subscribers",
|
|
||||||
"/json/fetch_api_key",
|
|
||||||
"/api/v1/users/me/subscriptions",
|
|
||||||
],
|
|
||||||
400: ["/api/v1/send_message",
|
|
||||||
"/api/v1/external/github",
|
|
||||||
"/api/v1/fetch_api_key",
|
|
||||||
],
|
|
||||||
}
|
|
||||||
for status_code, url_set in get_urls.iteritems():
|
|
||||||
self.fetch("get", url_set, status_code)
|
|
||||||
for status_code, url_set in post_urls.iteritems():
|
|
||||||
self.fetch("post", url_set, status_code)
|
|
||||||
|
|
||||||
class LoginTest(AuthedTestCase):
|
|
||||||
"""
|
|
||||||
Logging in, registration, and logging out.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def test_login(self):
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
user_profile = get_user_profile_by_email('hamlet@zulip.com')
|
|
||||||
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
|
||||||
|
|
||||||
def test_login_bad_password(self):
|
|
||||||
self.login("hamlet@zulip.com", "wrongpassword")
|
|
||||||
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
|
||||||
|
|
||||||
def test_login_nonexist_user(self):
|
|
||||||
result = self.login("xxx@zulip.com", "xxx")
|
|
||||||
self.assertIn("Please enter a correct email and password", result.content)
|
|
||||||
|
|
||||||
def test_register(self):
|
|
||||||
realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
streams = ["stream_%s" % i for i in xrange(40)]
|
|
||||||
for stream in streams:
|
|
||||||
create_stream_if_needed(realm, stream)
|
|
||||||
|
|
||||||
set_default_streams(realm, streams)
|
|
||||||
with queries_captured() as queries:
|
|
||||||
self.register("test", "test")
|
|
||||||
# Ensure the number of queries we make is not O(streams)
|
|
||||||
self.assert_length(queries, 59)
|
|
||||||
user_profile = get_user_profile_by_email('test@zulip.com')
|
|
||||||
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
|
||||||
|
|
||||||
def test_register_deactivated(self):
|
|
||||||
"""
|
|
||||||
If you try to register for a deactivated realm, you get a clear error
|
|
||||||
page.
|
|
||||||
"""
|
|
||||||
realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
realm.deactivated = True
|
|
||||||
realm.save(update_fields=["deactivated"])
|
|
||||||
|
|
||||||
result = self.register("test", "test")
|
|
||||||
self.assertIn("has been deactivated", result.content.replace("\n", " "))
|
|
||||||
|
|
||||||
with self.assertRaises(UserProfile.DoesNotExist):
|
|
||||||
get_user_profile_by_email('test@zulip.com')
|
|
||||||
|
|
||||||
def test_login_deactivated(self):
|
|
||||||
"""
|
|
||||||
If you try to log in to a deactivated realm, you get a clear error page.
|
|
||||||
"""
|
|
||||||
realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
realm.deactivated = True
|
|
||||||
realm.save(update_fields=["deactivated"])
|
|
||||||
|
|
||||||
result = self.login("hamlet@zulip.com")
|
|
||||||
self.assertIn("has been deactivated", result.content.replace("\n", " "))
|
|
||||||
|
|
||||||
def test_logout(self):
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
self.client.post('/accounts/logout/')
|
|
||||||
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
|
||||||
|
|
||||||
def test_non_ascii_login(self):
|
|
||||||
"""
|
|
||||||
You can log in even if your password contain non-ASCII characters.
|
|
||||||
"""
|
|
||||||
email = "test@zulip.com"
|
|
||||||
password = u"hümbüǵ"
|
|
||||||
|
|
||||||
# Registering succeeds.
|
|
||||||
self.register("test", password)
|
|
||||||
user_profile = get_user_profile_by_email(email)
|
|
||||||
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
|
||||||
self.client.post('/accounts/logout/')
|
|
||||||
self.assertIsNone(self.client.session.get('_auth_user_id', None))
|
|
||||||
|
|
||||||
# Logging in succeeds.
|
|
||||||
self.client.post('/accounts/logout/')
|
|
||||||
self.login(email, password)
|
|
||||||
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)
|
|
||||||
|
|
||||||
def test_register_first_user_with_invites(self):
|
|
||||||
"""
|
|
||||||
The first user in a realm has a special step in their signup workflow
|
|
||||||
for inviting coworkers. Do as realistic an end-to-end test as we can
|
|
||||||
without Tornado running.
|
|
||||||
"""
|
|
||||||
username = "user1"
|
|
||||||
password = "test"
|
|
||||||
domain = "test.com"
|
|
||||||
email = "user1@test.com"
|
|
||||||
|
|
||||||
# Create a new realm to ensure that we're the first user in it.
|
|
||||||
realm = Realm.objects.create(domain=domain, name="Test Inc.")
|
|
||||||
deployment = Deployment.objects.all().first()
|
|
||||||
deployment.realms.add(realm)
|
|
||||||
deployment.save()
|
|
||||||
|
|
||||||
# Start the signup process by supplying an email address.
|
|
||||||
result = self.client.post('/accounts/home/', {'email': email})
|
|
||||||
|
|
||||||
# Check the redirect telling you to check your mail for a confirmation
|
|
||||||
# link.
|
|
||||||
self.assertEquals(result.status_code, 302)
|
|
||||||
self.assertTrue(result["Location"].endswith(
|
|
||||||
"/accounts/send_confirm/%s%%40%s" % (username, domain)))
|
|
||||||
result = self.client.get(result["Location"])
|
|
||||||
self.assertIn("Check your email so we can get started.", result.content)
|
|
||||||
|
|
||||||
# Visit the confirmation link.
|
|
||||||
from django.core.mail import outbox
|
|
||||||
for message in reversed(outbox):
|
|
||||||
if email in message.to:
|
|
||||||
confirmation_link_pattern = re.compile("example.com(\S+)>")
|
|
||||||
confirmation_url = confirmation_link_pattern.search(
|
|
||||||
message.body).groups()[0]
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
raise ValueError("Couldn't find a confirmation email.")
|
|
||||||
|
|
||||||
result = self.client.get(confirmation_url)
|
|
||||||
self.assertEquals(result.status_code, 200)
|
|
||||||
|
|
||||||
# Pick a password and agree to the ToS.
|
|
||||||
result = self.submit_reg_form_for_user(username, password, domain)
|
|
||||||
self.assertEquals(result.status_code, 302)
|
|
||||||
self.assertTrue(result["Location"].endswith("/invite/"))
|
|
||||||
|
|
||||||
# Invite coworkers to join you.
|
|
||||||
result = self.client.get(result["Location"])
|
|
||||||
self.assertIn("You're the first one here!", result.content)
|
|
||||||
|
|
||||||
# Reset the outbox for our invites.
|
|
||||||
outbox.pop()
|
|
||||||
|
|
||||||
invitees = ['alice@' + domain, 'bob@' + domain]
|
|
||||||
params = {
|
|
||||||
'invitee_emails': ujson.dumps(invitees)
|
|
||||||
}
|
|
||||||
result = self.client.post('/json/bulk_invite_users', params)
|
|
||||||
self.assert_json_success(result)
|
|
||||||
|
|
||||||
# We really did email these users, and they have PreregistrationUser
|
|
||||||
# objects.
|
|
||||||
email_recipients = [message.recipients()[0] for message in outbox]
|
|
||||||
self.assertEqual(len(outbox), len(invitees))
|
|
||||||
self.assertItemsEqual(email_recipients, invitees)
|
|
||||||
|
|
||||||
user_profile = get_user_profile_by_email(email)
|
|
||||||
self.assertEqual(len(invitees), PreregistrationUser.objects.filter(
|
|
||||||
referred_by=user_profile).count())
|
|
||||||
|
|
||||||
# After this we start manipulating browser information, so stop here.
|
|
||||||
|
|
||||||
class UserChangesTest(AuthedTestCase):
|
class UserChangesTest(AuthedTestCase):
|
||||||
def test_update_api_key(self):
|
def test_update_api_key(self):
|
||||||
email = "hamlet@zulip.com"
|
email = "hamlet@zulip.com"
|
||||||
|
@ -923,183 +704,6 @@ class PointerTest(AuthedTestCase):
|
||||||
self.assert_json_error(result, "Bad value for 'pointer': -2")
|
self.assert_json_error(result, "Bad value for 'pointer': -2")
|
||||||
self.assertEqual(get_user_profile_by_email("hamlet@zulip.com").pointer, -1)
|
self.assertEqual(get_user_profile_by_email("hamlet@zulip.com").pointer, -1)
|
||||||
|
|
||||||
class InviteUserTest(AuthedTestCase):
|
|
||||||
|
|
||||||
def invite(self, users, streams):
|
|
||||||
"""
|
|
||||||
Invites the specified users to Zulip with the specified streams.
|
|
||||||
|
|
||||||
users should be a string containing the users to invite, comma or
|
|
||||||
newline separated.
|
|
||||||
|
|
||||||
streams should be a list of strings.
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self.client.post("/json/invite_users",
|
|
||||||
{"invitee_emails": users,
|
|
||||||
"stream": streams})
|
|
||||||
|
|
||||||
def check_sent_emails(self, correct_recipients):
|
|
||||||
from django.core.mail import outbox
|
|
||||||
self.assertEqual(len(outbox), len(correct_recipients))
|
|
||||||
email_recipients = [email.recipients()[0] for email in outbox]
|
|
||||||
self.assertItemsEqual(email_recipients, correct_recipients)
|
|
||||||
|
|
||||||
def test_bulk_invite_users(self):
|
|
||||||
# The bulk_invite_users code path is for the first user in a realm.
|
|
||||||
self.login('hamlet@zulip.com')
|
|
||||||
invitees = ['alice@zulip.com', 'bob@zulip.com']
|
|
||||||
params = {
|
|
||||||
'invitee_emails': ujson.dumps(invitees)
|
|
||||||
}
|
|
||||||
result = self.client.post('/json/bulk_invite_users', params)
|
|
||||||
self.assert_json_success(result)
|
|
||||||
self.check_sent_emails(invitees)
|
|
||||||
|
|
||||||
def test_successful_invite_user(self):
|
|
||||||
"""
|
|
||||||
A call to /json/invite_users with valid parameters causes an invitation
|
|
||||||
email to be sent.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
invitee = "alice-test@zulip.com"
|
|
||||||
self.assert_json_success(self.invite(invitee, ["Denmark"]))
|
|
||||||
self.assertTrue(find_key_by_email(invitee))
|
|
||||||
self.check_sent_emails([invitee])
|
|
||||||
|
|
||||||
def test_multi_user_invite(self):
|
|
||||||
"""
|
|
||||||
Invites multiple users with a variety of delimiters.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
# Intentionally use a weird string.
|
|
||||||
self.assert_json_success(self.invite(
|
|
||||||
"""bob-test@zulip.com, carol-test@zulip.com,
|
|
||||||
dave-test@zulip.com
|
|
||||||
|
|
||||||
|
|
||||||
earl-test@zulip.com""", ["Denmark"]))
|
|
||||||
for user in ("bob", "carol", "dave", "earl"):
|
|
||||||
self.assertTrue(find_key_by_email("%s-test@zulip.com" % user))
|
|
||||||
self.check_sent_emails(["bob-test@zulip.com", "carol-test@zulip.com",
|
|
||||||
"dave-test@zulip.com", "earl-test@zulip.com"])
|
|
||||||
|
|
||||||
def test_missing_or_invalid_params(self):
|
|
||||||
"""
|
|
||||||
Tests inviting with various missing or invalid parameters.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
self.assert_json_error(
|
|
||||||
self.client.post("/json/invite_users", {"invitee_emails": "foo@zulip.com"}),
|
|
||||||
"You must specify at least one stream for invitees to join.")
|
|
||||||
|
|
||||||
for address in ("noatsign.com", "outsideyourdomain@example.net"):
|
|
||||||
self.assert_json_error(
|
|
||||||
self.invite(address, ["Denmark"]),
|
|
||||||
"Some emails did not validate, so we didn't send any invitations.")
|
|
||||||
self.check_sent_emails([])
|
|
||||||
|
|
||||||
def test_invalid_stream(self):
|
|
||||||
"""
|
|
||||||
Tests inviting to a non-existent stream.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
self.assert_json_error(self.invite("iago-test@zulip.com", ["NotARealStream"]),
|
|
||||||
"Stream does not exist: NotARealStream. No invites were sent.")
|
|
||||||
self.check_sent_emails([])
|
|
||||||
|
|
||||||
def test_invite_existing_user(self):
|
|
||||||
"""
|
|
||||||
If you invite an address already using Zulip, no invitation is sent.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
self.assert_json_error(
|
|
||||||
self.client.post("/json/invite_users",
|
|
||||||
{"invitee_emails": "hamlet@zulip.com",
|
|
||||||
"stream": ["Denmark"]}),
|
|
||||||
"We weren't able to invite anyone.")
|
|
||||||
self.assertRaises(PreregistrationUser.DoesNotExist,
|
|
||||||
lambda: PreregistrationUser.objects.get(
|
|
||||||
email="hamlet@zulip.com"))
|
|
||||||
self.check_sent_emails([])
|
|
||||||
|
|
||||||
def test_invite_some_existing_some_new(self):
|
|
||||||
"""
|
|
||||||
If you invite a mix of already existing and new users, invitations are
|
|
||||||
only sent to the new users.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
existing = ["hamlet@zulip.com", "othello@zulip.com"]
|
|
||||||
new = ["foo-test@zulip.com", "bar-test@zulip.com"]
|
|
||||||
|
|
||||||
result = self.client.post("/json/invite_users",
|
|
||||||
{"invitee_emails": "\n".join(existing + new),
|
|
||||||
"stream": ["Denmark"]})
|
|
||||||
self.assert_json_error(result,
|
|
||||||
"Some of those addresses are already using Zulip, \
|
|
||||||
so we didn't send them an invitation. We did send invitations to everyone else!")
|
|
||||||
|
|
||||||
# We only created accounts for the new users.
|
|
||||||
for email in existing:
|
|
||||||
self.assertRaises(PreregistrationUser.DoesNotExist,
|
|
||||||
lambda: PreregistrationUser.objects.get(
|
|
||||||
email=email))
|
|
||||||
for email in new:
|
|
||||||
self.assertTrue(PreregistrationUser.objects.get(email=email))
|
|
||||||
|
|
||||||
# We only sent emails to the new users.
|
|
||||||
self.check_sent_emails(new)
|
|
||||||
|
|
||||||
def test_invite_outside_domain_in_closed_realm(self):
|
|
||||||
"""
|
|
||||||
In a realm with `restricted_to_domain = True`, you can't invite people
|
|
||||||
with a different domain from that of the realm or your e-mail address.
|
|
||||||
"""
|
|
||||||
zulip_realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
zulip_realm.restricted_to_domain = True
|
|
||||||
zulip_realm.save()
|
|
||||||
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
external_address = "foo@example.com"
|
|
||||||
|
|
||||||
self.assert_json_error(
|
|
||||||
self.invite(external_address, ["Denmark"]),
|
|
||||||
"Some emails did not validate, so we didn't send any invitations.")
|
|
||||||
|
|
||||||
@slow(0.20, 'inviting is slow')
|
|
||||||
def test_invite_outside_domain_in_open_realm(self):
|
|
||||||
"""
|
|
||||||
In a realm with `restricted_to_domain = False`, you can invite people
|
|
||||||
with a different domain from that of the realm or your e-mail address.
|
|
||||||
"""
|
|
||||||
zulip_realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
zulip_realm.restricted_to_domain = False
|
|
||||||
zulip_realm.save()
|
|
||||||
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
external_address = "foo@example.com"
|
|
||||||
|
|
||||||
self.assert_json_success(self.invite(external_address, ["Denmark"]))
|
|
||||||
self.check_sent_emails([external_address])
|
|
||||||
|
|
||||||
def test_invite_with_non_ascii_streams(self):
|
|
||||||
"""
|
|
||||||
Inviting someone to streams with non-ASCII characters succeeds.
|
|
||||||
"""
|
|
||||||
self.login("hamlet@zulip.com")
|
|
||||||
invitee = "alice-test@zulip.com"
|
|
||||||
|
|
||||||
stream_name = u"hümbüǵ"
|
|
||||||
realm = Realm.objects.get(domain="zulip.com")
|
|
||||||
stream, _ = create_stream_if_needed(realm, stream_name)
|
|
||||||
|
|
||||||
# Make sure we're subscribed before inviting someone.
|
|
||||||
do_add_subscription(
|
|
||||||
get_user_profile_by_email("hamlet@zulip.com"),
|
|
||||||
stream, no_log=True)
|
|
||||||
|
|
||||||
self.assert_json_success(self.invite(invitee, [stream_name]))
|
|
||||||
|
|
||||||
class ChangeSettingsTest(AuthedTestCase):
|
class ChangeSettingsTest(AuthedTestCase):
|
||||||
|
|
||||||
def post_with_params(self, modified_params):
|
def post_with_params(self, modified_params):
|
||||||
|
@ -2319,74 +1923,3 @@ class MutedTopicsTests(AuthedTestCase):
|
||||||
user = get_user_profile_by_email(email)
|
user = get_user_profile_by_email(email)
|
||||||
self.assertEqual(ujson.loads(user.muted_topics), [["stream2", "topic2"]])
|
self.assertEqual(ujson.loads(user.muted_topics), [["stream2", "topic2"]])
|
||||||
|
|
||||||
class EmailUnsubscribeTests(AuthedTestCase):
|
|
||||||
def test_missedmessage_unsubscribe(self):
|
|
||||||
"""
|
|
||||||
We provide one-click unsubscribe links in missed message
|
|
||||||
e-mails that you can click even when logged out to update your
|
|
||||||
email notification settings.
|
|
||||||
"""
|
|
||||||
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
||||||
user_profile.enable_offline_email_notifications = True
|
|
||||||
user_profile.save()
|
|
||||||
|
|
||||||
unsubscribe_link = one_click_unsubscribe_link(user_profile,
|
|
||||||
"missed_messages")
|
|
||||||
result = self.client.get(urlparse(unsubscribe_link).path)
|
|
||||||
|
|
||||||
self.assertEqual(result.status_code, 200)
|
|
||||||
# Circumvent user_profile caching.
|
|
||||||
user_profile = UserProfile.objects.get(email="hamlet@zulip.com")
|
|
||||||
self.assertFalse(user_profile.enable_offline_email_notifications)
|
|
||||||
|
|
||||||
def test_welcome_unsubscribe(self):
|
|
||||||
"""
|
|
||||||
We provide one-click unsubscribe links in welcome e-mails that you can
|
|
||||||
click even when logged out to stop receiving them.
|
|
||||||
"""
|
|
||||||
email = "hamlet@zulip.com"
|
|
||||||
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
||||||
|
|
||||||
# Simulate a new user signing up, which enqueues 2 welcome e-mails.
|
|
||||||
enqueue_welcome_emails(email, "King Hamlet")
|
|
||||||
self.assertEqual(2, len(ScheduledJob.objects.filter(
|
|
||||||
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
|
||||||
|
|
||||||
# Simulate unsubscribing from the welcome e-mails.
|
|
||||||
unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
|
|
||||||
result = self.client.get(urlparse(unsubscribe_link).path)
|
|
||||||
|
|
||||||
# The welcome email jobs are no longer scheduled.
|
|
||||||
self.assertEqual(result.status_code, 200)
|
|
||||||
self.assertEqual(0, len(ScheduledJob.objects.filter(
|
|
||||||
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
|
||||||
|
|
||||||
def test_digest_unsubscribe(self):
|
|
||||||
"""
|
|
||||||
We provide one-click unsubscribe links in digest e-mails that you can
|
|
||||||
click even when logged out to stop receiving them.
|
|
||||||
|
|
||||||
Unsubscribing from these emails also dequeues any digest email jobs that
|
|
||||||
have been queued.
|
|
||||||
"""
|
|
||||||
email = "hamlet@zulip.com"
|
|
||||||
user_profile = get_user_profile_by_email("hamlet@zulip.com")
|
|
||||||
self.assertTrue(user_profile.enable_digest_emails)
|
|
||||||
|
|
||||||
# Enqueue a fake digest email.
|
|
||||||
send_digest_email(user_profile, "", "")
|
|
||||||
self.assertEqual(1, len(ScheduledJob.objects.filter(
|
|
||||||
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
|
||||||
|
|
||||||
# Simulate unsubscribing from digest e-mails.
|
|
||||||
unsubscribe_link = one_click_unsubscribe_link(user_profile, "digest")
|
|
||||||
result = self.client.get(urlparse(unsubscribe_link).path)
|
|
||||||
|
|
||||||
# The setting is toggled off, and scheduled jobs have been removed.
|
|
||||||
self.assertEqual(result.status_code, 200)
|
|
||||||
# Circumvent user_profile caching.
|
|
||||||
user_profile = UserProfile.objects.get(email="hamlet@zulip.com")
|
|
||||||
self.assertFalse(user_profile.enable_digest_emails)
|
|
||||||
self.assertEqual(0, len(ScheduledJob.objects.filter(
|
|
||||||
type=ScheduledJob.EMAIL, filter_string__iexact=email)))
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue