2016-04-21 18:34:54 +02:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from django.conf import settings
|
2016-09-13 21:30:18 +02:00
|
|
|
from django.http import HttpResponse
|
2016-04-21 18:34:54 +02:00
|
|
|
from django.test import TestCase
|
|
|
|
from django_auth_ldap.backend import _LDAPUser
|
2016-07-25 11:24:36 +02:00
|
|
|
from django.test.client import RequestFactory
|
2016-06-04 20:28:02 +02:00
|
|
|
from typing import Any, Callable, Dict, List
|
2016-04-21 18:34:54 +02:00
|
|
|
|
|
|
|
import mock
|
2016-09-13 21:30:18 +02:00
|
|
|
import re
|
2016-04-21 18:34:54 +02:00
|
|
|
|
|
|
|
from zerver.lib.actions import do_deactivate_realm, do_deactivate_user, \
|
|
|
|
do_reactivate_realm, do_reactivate_user
|
2016-04-21 21:07:43 +02:00
|
|
|
from zerver.lib.initial_password import initial_password
|
2016-09-13 21:30:18 +02:00
|
|
|
from zerver.lib.session_user import get_session_dict_user
|
2016-04-21 18:34:54 +02:00
|
|
|
from zerver.lib.test_helpers import (
|
2016-08-23 02:08:42 +02:00
|
|
|
ZulipTestCase
|
2016-04-21 18:34:54 +02:00
|
|
|
)
|
|
|
|
from zerver.models import \
|
2016-07-30 00:48:40 +02:00
|
|
|
get_realm, get_user_profile_by_email, email_to_username, UserProfile
|
2016-04-21 18:34:54 +02:00
|
|
|
|
|
|
|
from zproject.backends import ZulipDummyBackend, EmailAuthBackend, \
|
|
|
|
GoogleMobileOauth2Backend, ZulipRemoteUserBackend, ZulipLDAPAuthBackend, \
|
2016-07-29 21:47:14 +02:00
|
|
|
ZulipLDAPUserPopulator, DevAuthBackend, GitHubAuthBackend
|
2016-07-25 11:24:36 +02:00
|
|
|
|
|
|
|
from social.strategies.django_strategy import DjangoStrategy
|
|
|
|
from social.storage.django_orm import BaseDjangoStorage
|
2016-08-02 11:07:45 +02:00
|
|
|
from social.backends.github import GithubOrganizationOAuth2, GithubTeamOAuth2, \
|
|
|
|
GithubOAuth2
|
2016-04-21 18:34:54 +02:00
|
|
|
|
2016-06-04 20:28:02 +02:00
|
|
|
from six import text_type
|
2016-09-13 21:30:18 +02:00
|
|
|
from six.moves import urllib
|
2016-04-21 18:34:54 +02:00
|
|
|
import ujson
|
|
|
|
|
|
|
|
class AuthBackendTest(TestCase):
    """Call each authentication backend's ``authenticate`` method directly.

    Every test funnels through ``verify_backend``, which checks the same
    accept/reject sequence (active user, deactivated user, deactivated
    realm) for the backend under test.
    """

    def verify_backend(self, backend, good_args=None,
                       good_kwargs=None, bad_kwargs=None,
                       email_to_username=None):
        # type: (Any, List[Any], Dict[str, Any], Dict[str, Any], Callable[[text_type], text_type]) -> None
        """Run the shared accept/reject checks against ``backend``.

        backend: object exposing ``authenticate(username, *args, **kwargs)``.
        good_args / good_kwargs: extra arguments expected to authenticate
            hamlet@zulip.com successfully; default to empty.
        bad_kwargs: if not None, keyword arguments with which
            ``authenticate`` is expected to return None.
        email_to_username: optional callable mapping the email address to
            the username handed to the backend.
        """
        if good_args is None:
            good_args = []
        if good_kwargs is None:
            good_kwargs = {}
        email = u"hamlet@zulip.com"
        user_profile = get_user_profile_by_email(email)

        username = email
        if email_to_username is not None:
            username = email_to_username(email)

        def attempt():
            # type: () -> Any
            # Single place for the "known good credentials" call so each
            # phase below exercises exactly the same invocation.
            return backend.authenticate(username, *good_args, **good_kwargs)

        # If bad_kwargs was specified, verify auth fails in that case
        if bad_kwargs is not None:
            self.assertIsNone(backend.authenticate(username, **bad_kwargs))

        # Verify auth works
        self.assertEqual(user_profile, attempt())

        # Verify auth fails with a deactivated user
        do_deactivate_user(user_profile)
        self.assertIsNone(attempt())

        # Reactivate the user and verify auth works again
        do_reactivate_user(user_profile)
        self.assertEqual(user_profile, attempt())

        # Verify auth fails with a deactivated realm
        do_deactivate_realm(user_profile.realm)
        self.assertIsNone(attempt())

        # Verify auth works again after reactivating the realm
        do_reactivate_realm(user_profile.realm)
        self.assertEqual(user_profile, attempt())

    def test_dummy_backend(self):
        # type: () -> None
        """ZulipDummyBackend: accept with use_dummy_backend=True, reject with False."""
        self.verify_backend(ZulipDummyBackend(),
                            good_kwargs=dict(use_dummy_backend=True),
                            bad_kwargs=dict(use_dummy_backend=False))

    def test_email_auth_backend(self):
        # type: () -> None
        """EmailAuthBackend: accept the stored password, reject an empty one."""
        email = "hamlet@zulip.com"
        user_profile = get_user_profile_by_email(email)
        password = "testpassword"
        user_profile.set_password(password)
        user_profile.save()
        self.verify_backend(EmailAuthBackend(),
                            bad_kwargs=dict(password=''),
                            good_kwargs=dict(password=password))

    def test_email_auth_backend_disabled_password_auth(self):
        # type: () -> None
        """EmailAuthBackend rejects a correct password when password auth is off."""
        email = u"hamlet@zulip.com"
        user_profile = get_user_profile_by_email(email)
        password = "testpassword"
        user_profile.set_password(password)
        user_profile.save()
        # Verify if a realm has password auth disabled, correct password is rejected
        with mock.patch('zproject.backends.password_auth_enabled', return_value=False):
            self.assertIsNone(EmailAuthBackend().authenticate(email, password))

    def test_google_backend(self):
        # type: () -> None
        """GoogleMobileOauth2Backend with the id-token verification patched out."""
        email = "hamlet@zulip.com"
        backend = GoogleMobileOauth2Backend()
        payload = dict(email_verified=True,
                       email=email)
        with mock.patch('apiclient.sample_tools.client.verify_id_token', return_value=payload):
            self.verify_backend(backend)

        # Verify valid_attestation parameter is set correctly
        unverified_payload = dict(email_verified=False)
        with mock.patch('apiclient.sample_tools.client.verify_id_token', return_value=unverified_payload):
            ret = dict()  # type: Dict[str, str]
            result = backend.authenticate(return_data=ret)
            self.assertIsNone(result)
            self.assertFalse(ret["valid_attestation"])

        # A verified token for an unknown email: no user, but the
        # attestation itself is reported as valid.
        nonexistent_user_payload = dict(email_verified=True, email="invalid@zulip.com")
        with mock.patch('apiclient.sample_tools.client.verify_id_token',
                        return_value=nonexistent_user_payload):
            ret = dict()
            result = backend.authenticate(return_data=ret)
            self.assertIsNone(result)
            self.assertTrue(ret["valid_attestation"])

    def test_ldap_backend(self):
        # type: () -> None
        """ZulipLDAPAuthBackend with django_auth_ldap's ``_LDAPUser`` internals mocked."""
        email = "hamlet@zulip.com"
        password = "test_password"
        backend = ZulipLDAPAuthBackend()

        # Test LDAP auth fails when LDAP server rejects password
        with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn',
                        side_effect=_LDAPUser.AuthenticationFailed("Failed")), \
                mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements'), \
                mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs',
                           return_value=dict(full_name=['Hamlet'])):
            self.assertIsNone(backend.authenticate(email, password))

        # For this backend, we mock the internals of django_auth_ldap
        with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn'), \
                mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements'), \
                mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs',
                           return_value=dict(full_name=['Hamlet'])):
            self.verify_backend(backend, good_kwargs=dict(password=password))

    def test_devauth_backend(self):
        # type: () -> None
        """DevAuthBackend authenticates with the username alone."""
        self.verify_backend(DevAuthBackend())

    def test_remote_user_backend(self):
        # type: () -> None
        """ZulipRemoteUserBackend authenticates with the full email as username."""
        self.verify_backend(ZulipRemoteUserBackend())

    def test_remote_user_backend_sso_append_domain(self):
        # type: () -> None
        """ZulipRemoteUserBackend with SSO_APPEND_DOMAIN set.

        The username passed to the backend is ``email_to_username(email)``
        rather than the email itself.
        """
        with self.settings(SSO_APPEND_DOMAIN='zulip.com'):
            self.verify_backend(ZulipRemoteUserBackend(),
                                email_to_username=email_to_username)

    def test_github_backend(self):
        # type: () -> None
        """GitHubAuthBackend: accept with a ``response`` carrying the email, reject with no kwargs."""
        email = 'hamlet@zulip.com'
        good_kwargs = dict(response=dict(email=email), return_data=dict())
        # Deliberately empty (not None) so verify_backend still runs the
        # "authenticate with bad kwargs returns None" check.
        bad_kwargs = dict()  # type: Dict[str, str]
        self.verify_backend(GitHubAuthBackend(),
                            good_kwargs=good_kwargs,
                            bad_kwargs=bad_kwargs)
|
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class GitHubAuthBackendTest(ZulipTestCase):
    """Tests of ``GitHubAuthBackend.do_auth`` with the python-social-auth layer mocked."""

    def setUp(self):
        # type: () -> None
        """Build a backend wired to a DjangoStrategy and load the hamlet user."""
        self.email = 'hamlet@zulip.com'
        self.name = 'Hamlet'
        self.backend = GitHubAuthBackend()
        self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
        self.user_profile = get_user_profile_by_email(self.email)
        self.user_profile.backend = self.backend

    def test_github_backend_do_auth(self):
        # type: () -> None
        """A successful upstream do_auth hands off to login_or_register_remote_user."""
        def do_auth(*args, **kwargs):
            # type: (*Any, **Any) -> UserProfile
            return self.user_profile

        with mock.patch('zerver.views.login_or_register_remote_user') as result, \
                mock.patch('social.backends.github.GithubOAuth2.do_auth',
                           side_effect=do_auth):
            response = dict(email=self.email, name=self.name)
            self.backend.do_auth(response=response)
            result.assert_called_with(None, self.email, self.user_profile,
                                      self.name)

    def test_github_backend_do_auth_for_default(self):
        # type: () -> None
        """With no team/org setting overridden, the plain GithubOAuth2 backend is used."""
        def authenticate(*args, **kwargs):
            # type: (*Any, **Any) -> None
            # assertIsInstance (unlike a bare ``assert``) is not stripped
            # when Python runs with -O.
            self.assertIsInstance(kwargs['backend'], GithubOAuth2)

        with mock.patch('social.backends.github.GithubOAuth2.user_data',
                        return_value=dict()), \
                mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
                mock.patch('social.strategies.django_strategy.'
                           'DjangoStrategy.authenticate', side_effect=authenticate):
            response = dict(email=self.email, name=self.name)
            self.backend.do_auth('fake-access-token', response=response)

    def test_github_backend_do_auth_for_team(self):
        # type: () -> None
        """With SOCIAL_AUTH_GITHUB_TEAM_ID set, GithubTeamOAuth2 is used."""
        def authenticate(*args, **kwargs):
            # type: (*Any, **Any) -> None
            self.assertIsInstance(kwargs['backend'], GithubTeamOAuth2)

        with mock.patch('social.backends.github.GithubTeamOAuth2.user_data',
                        return_value=dict()), \
                mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
                mock.patch('social.strategies.django_strategy.'
                           'DjangoStrategy.authenticate', side_effect=authenticate):
            response = dict(email=self.email, name=self.name)
            with self.settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp'):
                self.backend.do_auth('fake-access-token', response=response)

    def test_github_backend_do_auth_for_org(self):
        # type: () -> None
        """With SOCIAL_AUTH_GITHUB_ORG_NAME set, GithubOrganizationOAuth2 is used."""
        def authenticate(*args, **kwargs):
            # type: (*Any, **Any) -> None
            self.assertIsInstance(kwargs['backend'], GithubOrganizationOAuth2)

        with mock.patch('social.backends.github.GithubOrganizationOAuth2.user_data',
                        return_value=dict()), \
                mock.patch('zproject.backends.SocialAuthMixin.process_do_auth'), \
                mock.patch('social.strategies.django_strategy.'
                           'DjangoStrategy.authenticate', side_effect=authenticate):
            response = dict(email=self.email, name=self.name)
            with self.settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip'):
                self.backend.do_auth('fake-access-token', response=response)

    def test_github_backend_inactive_user(self):
        # type: () -> None
        """If upstream flags ``inactive_user``, do_auth returns None and skips login."""
        def do_auth_inactive(*args, **kwargs):
            # type: (*Any, **Any) -> UserProfile
            # Mutates the caller-supplied return_data dict in place.
            return_data = kwargs['return_data']
            return_data['inactive_user'] = True
            return self.user_profile

        with mock.patch('zerver.views.login_or_register_remote_user') as result, \
                mock.patch('social.backends.github.GithubOAuth2.do_auth',
                           side_effect=do_auth_inactive):
            response = dict(email=self.email, name=self.name)
            user = self.backend.do_auth(response=response)
            result.assert_not_called()
            self.assertIs(user, None)

    def test_github_backend_new_user(self):
        # type: () -> None
        """An unknown email with a valid attestation renders the registration page."""
        rf = RequestFactory()
        request = rf.get('/complete')
        request.session = {}
        request.user = self.user_profile
        self.backend.strategy.request = request

        def do_auth(*args, **kwargs):
            # type: (*Any, **Any) -> None
            # No matching user: report the attestation as valid but
            # return no UserProfile.
            return_data = kwargs['return_data']
            return_data['valid_attestation'] = True
            return None

        with mock.patch('social.backends.github.GithubOAuth2.do_auth',
                        side_effect=do_auth):
            response = dict(email='nonexisting@phantom.com', name='Ghost')
            result = self.backend.do_auth(response=response)
            self.assert_in_response('action="/register/"', result)
            self.assert_in_response('Your e-mail does not match any '
                                    'existing open organization.', result)
|
|
|
|
|
2016-09-13 21:30:18 +02:00
|
|
|
class ResponseMock(object):
    """Minimal stand-in for an HTTP response object.

    Exposes only ``status_code``, ``data``, ``json()`` and ``text``; used as
    the ``return_value`` of patched ``requests.post`` / ``requests.get``.
    """

    def __init__(self, status_code, data):
        # type: (int, Any) -> None
        """Store the status code and the payload that ``json()`` will return."""
        self.status_code = status_code
        self.data = data

    def json(self):
        # type: () -> Any
        """Return the payload given at construction time, unchanged."""
        return self.data

    @property
    def text(self):
        # type: () -> str
        """Fixed body text, independent of ``data``."""
        return "Response text"
|
|
|
|
|
|
|
|
class GoogleLoginTest(ZulipTestCase):
    """Tests of the Google OAuth2 login views with ``requests`` patched out."""

    def google_oauth2_test(self, token_response, account_response):
        # type: (ResponseMock, ResponseMock) -> HttpResponse
        """Drive the two-step Google login flow and return the final response.

        token_response: returned by the patched ``requests.post``.
        account_response: returned by the patched ``requests.get``; some
            callers pass None when the flow is expected to stop before
            that call is reached.
        """
        result = self.client_get("/accounts/login/google/")
        self.assertEqual(result.status_code, 302)
        # Now extract the CSRF token from the redirect URL
        # NOTE(review): parse_qs is applied to the whole URL, not just its
        # query string, so this presumably relies on ``state`` not being
        # the first query parameter -- confirm against the view.
        csrf_state = urllib.parse.parse_qs(result.url)['state']

        with mock.patch("requests.post", return_value=token_response), \
                mock.patch("requests.get", return_value=account_response):
            result = self.client_get("/accounts/login/google/done/",
                                     dict(state=csrf_state))
        return result

    def test_google_oauth2_success(self):
        # type: () -> None
        """A known email logs the matching user into the session."""
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_data = dict(name=dict(formatted="Full Name"),
                            emails=[dict(type="account",
                                         value="hamlet@zulip.com")])
        account_response = ResponseMock(200, account_data)
        self.google_oauth2_test(token_response, account_response)

        user_profile = get_user_profile_by_email('hamlet@zulip.com')
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)

    def test_google_oauth2_registration(self):
        # type: () -> None
        """If the user doesn't exist yet, Google auth can be used to register an account"""
        email = "newuser@zulip.com"
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_data = dict(name=dict(formatted="Full Name"),
                            emails=[dict(type="account",
                                         value=email)])
        account_response = ResponseMock(200, account_data)
        result = self.google_oauth2_test(token_response, account_response)
        self.assertEqual(result.status_code, 302)

        result = self.client_get(result.url)
        page = result.content.decode("utf-8")
        key_match = re.search(r'value="(?P<key>[0-9a-f]+)" name="key"', page)
        name_match = re.search(r'value="(?P<name>[^"]+)" name="full_name"', page)
        # Fail with a clear assertion, rather than an AttributeError on
        # None, if the page lacks the hidden form fields.
        self.assertIsNotNone(key_match)
        self.assertIsNotNone(name_match)

        # This goes through a brief stop on a page that auto-submits via JS
        result = self.client_post('/accounts/register/',
                                  {'full_name': name_match.group("name"),
                                   'key': key_match.group("key"),
                                   'from_confirmation': "1"})
        self.assertEqual(result.status_code, 200)
        result = self.client_post('/accounts/register/',
                                  {'full_name': "New User",
                                   'password': 'test_password',
                                   'key': key_match.group("key"),
                                   'terms': True})
        self.assertEqual(result.status_code, 302)
        self.assertEqual(result.url, "http://testserver/")

    def test_google_oauth2_400_token_response(self):
        # type: () -> None
        """A 400 from the token endpoint yields a 400 and a logged warning."""
        token_response = ResponseMock(400, {})
        with mock.patch("logging.warning") as m:
            result = self.google_oauth2_test(token_response, None)
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         "User error converting Google oauth2 login to token: 'Response text'")

    def test_google_oauth2_500_token_response(self):
        # type: () -> None
        """A 500 from the token endpoint yields a 400 and a logged error."""
        token_response = ResponseMock(500, {})
        with mock.patch("logging.error") as m:
            result = self.google_oauth2_test(token_response, None)
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         "Could not convert google oauth2 code to access_token: 'Response text'")

    def test_google_oauth2_400_account_response(self):
        # type: () -> None
        """A 400 from the account-info call yields a 400 and a logged warning."""
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_response = ResponseMock(400, {})
        with mock.patch("logging.warning") as m:
            result = self.google_oauth2_test(token_response, account_response)
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         "Google login failed making info API call: 'Response text'")

    def test_google_oauth2_500_account_response(self):
        # type: () -> None
        """A 500 from the account-info call yields a 400 and a logged error."""
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_response = ResponseMock(500, {})
        with mock.patch("logging.error") as m:
            result = self.google_oauth2_test(token_response, account_response)
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         "Google login failed making API call: 'Response text'")

    def test_google_oauth2_no_fullname(self):
        # type: () -> None
        """Login still succeeds when the name has no ``formatted`` field."""
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_data = dict(name=dict(givenName="Test", familyName="User"),
                            emails=[dict(type="account",
                                         value="hamlet@zulip.com")])
        account_response = ResponseMock(200, account_data)
        self.google_oauth2_test(token_response, account_response)

        user_profile = get_user_profile_by_email('hamlet@zulip.com')
        self.assertEqual(get_session_dict_user(self.client.session), user_profile.id)

    def test_google_oauth2_account_response_no_email(self):
        # type: () -> None
        """An account payload with no emails yields a 400 and a logged error."""
        token_response = ResponseMock(200, {'access_token': "unique_token"})
        account_data = dict(name=dict(formatted="Full Name"),
                            emails=[])
        account_response = ResponseMock(200, account_data)
        with mock.patch("logging.error") as m:
            result = self.google_oauth2_test(token_response, account_response)
        self.assertEqual(result.status_code, 400)
        # NOTE(review): the expected message appears to embed a dict repr,
        # so it presumably depends on dict key ordering -- confirm.
        self.assertEqual(m.call_args_list[0][0][0],
                         "Google oauth2 account email not found: {'emails': [], 'name': {'formatted': 'Full Name'}}")

    def test_google_oauth2_error_access_denied(self):
        # type: () -> None
        """``error=access_denied`` redirects back to the site root."""
        result = self.client_get("/accounts/login/google/done/?error=access_denied")
        self.assertEqual(result.status_code, 302)
        self.assertEqual(result.url, "http://testserver/")

    def test_google_oauth2_error_other(self):
        # type: () -> None
        """Any other ``error`` value yields a 400 and a logged warning."""
        with mock.patch("logging.warning") as m:
            result = self.client_get("/accounts/login/google/done/?error=some_other_error")
        self.assertEqual(result.status_code, 400)
        # NOTE(review): the ``u'...'`` in the expected text looks like a
        # Python 2 unicode repr -- confirm behaviour on Python 3.
        self.assertEqual(m.call_args_list[0][0][0],
                         "Error from google oauth2 login: u'some_other_error'")

    def test_google_oauth2_missing_csrf(self):
        # type: () -> None
        """No ``state`` parameter at all yields a 400 and a logged warning."""
        with mock.patch("logging.warning") as m:
            result = self.client_get("/accounts/login/google/done/")
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         'Missing Google oauth2 CSRF state')

    def test_google_oauth2_csrf_malformed(self):
        # type: () -> None
        """A ``state`` without a ``:`` separator is reported as missing CSRF state."""
        with mock.patch("logging.warning") as m:
            result = self.client_get("/accounts/login/google/done/?state=badstate")
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         'Missing Google oauth2 CSRF state')

    def test_google_oauth2_csrf_badstate(self):
        # type: () -> None
        """A two-part ``state`` that does not validate is reported as a CSRF error."""
        with mock.patch("logging.warning") as m:
            result = self.client_get("/accounts/login/google/done/?state=badstate:otherbadstate")
        self.assertEqual(result.status_code, 400)
        self.assertEqual(m.call_args_list[0][0][0],
                         'Google oauth2 CSRF error')
|
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class FetchAPIKeyTest(ZulipTestCase):
    """Tests of the ``/api/v1/fetch_api_key`` endpoint."""

    def setUp(self):
        # type: () -> None
        """Remember the hamlet user's email and profile for each test."""
        self.email = "hamlet@zulip.com"
        self.user_profile = get_user_profile_by_email(self.email)

    def _request_api_key(self, password):
        # type: (Any) -> Any
        """POST ``self.email`` and ``password`` to the endpoint; return the response."""
        credentials = dict(username=self.email, password=password)
        return self.client_post("/api/v1/fetch_api_key", credentials)

    def test_success(self):
        # type: () -> None
        """The initial password yields a JSON success response."""
        response = self._request_api_key(initial_password(self.email))
        self.assert_json_success(response)

    def test_wrong_password(self):
        # type: () -> None
        """A wrong password is rejected with a 403."""
        response = self._request_api_key("wrong")
        self.assert_json_error(response, "Your username or password is incorrect.", 403)

    def test_password_auth_disabled(self):
        # type: () -> None
        """With password auth patched off, the correct password gets a 403."""
        with mock.patch('zproject.backends.password_auth_enabled', return_value=False):
            response = self._request_api_key(initial_password(self.email))
            self.assert_json_error_contains(response, "Password auth is disabled", 403)

    def test_inactive_user(self):
        # type: () -> None
        """A deactivated user gets a 403."""
        do_deactivate_user(self.user_profile)
        response = self._request_api_key(initial_password(self.email))
        self.assert_json_error_contains(response, "Your account has been disabled", 403)

    def test_deactivated_realm(self):
        # type: () -> None
        """A user in a deactivated realm gets a 403."""
        do_deactivate_realm(self.user_profile.realm)
        response = self._request_api_key(initial_password(self.email))
        self.assert_json_error_contains(response, "Your realm has been deactivated", 403)
|
2016-06-01 02:28:27 +02:00
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class DevFetchAPIKeyTest(ZulipTestCase):
    """Tests of the ``/api/v1/dev_fetch_api_key`` endpoint."""

    def setUp(self):
        # type: () -> None
        """Remember the hamlet user's email and profile for each test."""
        self.email = "hamlet@zulip.com"
        self.user_profile = get_user_profile_by_email(self.email)

    def _request_dev_api_key(self):
        # type: () -> Any
        """POST ``self.email`` as ``username`` to the endpoint; return the response."""
        return self.client_post("/api/v1/dev_fetch_api_key",
                                dict(username=self.email))

    def test_success(self):
        # type: () -> None
        """The response carries the user's email and API key."""
        response = self._request_dev_api_key()
        self.assert_json_success(response)
        payload = ujson.loads(response.content)
        self.assertEqual(payload["email"], self.email)
        self.assertEqual(payload['api_key'], self.user_profile.api_key)

    def test_inactive_user(self):
        # type: () -> None
        """A deactivated user gets a 403."""
        do_deactivate_user(self.user_profile)
        response = self._request_dev_api_key()
        self.assert_json_error_contains(response, "Your account has been disabled", 403)

    def test_deactivated_realm(self):
        # type: () -> None
        """A user in a deactivated realm gets a 403."""
        do_deactivate_realm(self.user_profile.realm)
        response = self._request_dev_api_key()
        self.assert_json_error_contains(response, "Your realm has been deactivated", 403)

    def test_dev_auth_disabled(self):
        # type: () -> None
        """With dev auth patched off, the endpoint answers 400."""
        with mock.patch('zerver.views.dev_auth_enabled', return_value=False):
            response = self._request_dev_api_key()
            self.assert_json_error_contains(response, "Dev environment not enabled.", 400)
|
2016-06-01 02:28:43 +02:00
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class DevGetEmailsTest(ZulipTestCase):
    """Tests of the ``/api/v1/dev_get_emails`` endpoint."""

    def test_success(self):
        # type: () -> None
        """The success response mentions both ``direct_admins`` and ``direct_users``."""
        response = self.client_get("/api/v1/dev_get_emails")
        self.assert_json_success(response)
        for expected_key in ("direct_admins", "direct_users"):
            self.assert_in_response(expected_key, response)

    def test_dev_auth_disabled(self):
        # type: () -> None
        """With dev auth patched off, the endpoint answers 400."""
        with mock.patch('zerver.views.dev_auth_enabled', return_value=False):
            response = self.client_get("/api/v1/dev_get_emails")
            self.assert_json_error_contains(response, "Dev environment not enabled.", 400)
|
2016-06-21 03:32:23 +02:00
|
|
|
|
2016-08-23 02:08:42 +02:00
|
|
|
class FetchAuthBackends(ZulipTestCase):
    """Tests of the ``/api/v1/get_auth_backends`` endpoint."""

    def test_fetch_auth_backend_format(self):
        # type: () -> None
        """The response has exactly the expected keys, with a bool per backend."""
        result = self.client_get("/api/v1/get_auth_backends")
        self.assert_json_success(result)
        data = ujson.loads(result.content)
        self.assertEqual(set(data),
                         {'msg', 'password', 'google', 'dev', 'result'})
        # Everything except the generic 'msg' / 'result' fields is a
        # per-backend flag.
        for backend in set(data) - {'msg', 'result'}:
            self.assertIsInstance(data[backend], bool)

    def test_fetch_auth_backend(self):
        # type: () -> None
        """With only Google and dev backends installed, only those report True."""
        backends = [GoogleMobileOauth2Backend(), DevAuthBackend()]
        with mock.patch('django.contrib.auth.get_backends', return_value=backends):
            result = self.client_get("/api/v1/get_auth_backends")
            self.assert_json_success(result)
            data = ujson.loads(result.content)
            self.assertEqual(data, {
                'msg': '',
                'password': False,
                'google': True,
                'dev': True,
                'result': 'success',
            })
|