auth: Rewrite our social auth integration to use pipeline.

This new implementation model is a lot cleaner and should extend
better to the non-oauth backend supported by python-social-auth (since
we're not relying on monkey-patching `do_auth` in the OAuth backend
base class).
This commit is contained in:
Tim Abbott 2018-05-30 15:12:39 -07:00
parent 84d3a4dbf3
commit 35c4a9f1d2
3 changed files with 351 additions and 557 deletions

View File

@ -49,7 +49,7 @@ from zproject.backends import ZulipDummyBackend, EmailAuthBackend, \
GoogleMobileOauth2Backend, ZulipRemoteUserBackend, ZulipLDAPAuthBackend, \
ZulipLDAPUserPopulator, DevAuthBackend, GitHubAuthBackend, ZulipAuthMixin, \
dev_auth_enabled, password_auth_enabled, github_auth_enabled, \
require_email_format_usernames, SocialAuthMixin, AUTH_BACKEND_NAME_MAP, \
require_email_format_usernames, AUTH_BACKEND_NAME_MAP, \
ZulipLDAPConfigurationError
from zerver.views.auth import (maybe_send_to_registration,
@ -331,18 +331,51 @@ class AuthBackendTest(ZulipTestCase):
@override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.GitHubAuthBackend',))
def test_github_backend(self) -> None:
user = self.example_user('hamlet')
email = user.email
good_kwargs = dict(response=dict(email=email), return_data=dict(),
realm=get_realm('zulip'))
bad_kwargs = dict(response=dict(email=email), return_data=dict(),
realm=None)
self.verify_backend(GitHubAuthBackend(),
good_kwargs=good_kwargs,
bad_kwargs=bad_kwargs)
bad_kwargs['realm'] = get_realm("zephyr")
self.verify_backend(GitHubAuthBackend(),
good_kwargs=good_kwargs,
bad_kwargs=bad_kwargs)
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=user.email, name=user.full_name)
httpretty.enable()
httpretty.register_uri(
httpretty.POST,
"https://github.com/login/oauth/access_token",
match_querystring=False,
status=200,
body=json.dumps(token_data_dict))
httpretty.register_uri(
httpretty.GET,
"https://api.github.com/user",
status=200,
body=json.dumps(account_data_dict)
)
backend = GitHubAuthBackend()
backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
orig_authenticate = GitHubAuthBackend.authenticate
def patched_authenticate(*args: Any, **kwargs: Any) -> Any:
if 'subdomain' in kwargs:
backend.strategy.session_set("subdomain", kwargs["subdomain"])
del kwargs['subdomain']
result = orig_authenticate(backend, *args, **kwargs)
return result
backend.authenticate = patched_authenticate
good_kwargs = dict(backend=backend, strategy=backend.strategy, storage=backend.strategy.storage,
response=account_data_dict,
subdomain='zulip')
bad_kwargs = dict(subdomain='acme')
with mock.patch('zerver.views.auth.redirect_and_log_into_subdomain',
return_value=user):
self.verify_backend(backend,
good_kwargs=good_kwargs,
bad_kwargs=bad_kwargs)
bad_kwargs['subdomain'] = "zephyr"
self.verify_backend(backend,
good_kwargs=good_kwargs,
bad_kwargs=bad_kwargs)
backend.authenticate = orig_authenticate
httpretty.disable()
class ResponseMock:
def __init__(self, status_code: int, data: Any) -> None:
@ -356,14 +389,6 @@ class ResponseMock:
def text(self) -> str:
return "Response text"
class SocialAuthMixinTest(ZulipTestCase):
def test_social_auth_mixing(self) -> None:
mixin = SocialAuthMixin()
with self.assertRaises(NotImplementedError):
mixin.get_email_address()
with self.assertRaises(NotImplementedError):
mixin.get_full_name()
class GitHubAuthBackendTest(ZulipTestCase):
def setUp(self) -> None:
self.user_profile = self.example_user('hamlet')
@ -373,12 +398,13 @@ class GitHubAuthBackendTest(ZulipTestCase):
self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
self.user_profile.backend = self.backend
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.get_host = lambda: 'zulip.testserver'
request.user = self.user_profile
self.backend.strategy.request = request
# This is a workaround for the fact that Python social auth
# caches the set of authentication backends that are enabled
# the first time that `social_django.utils` is imported. See
# https://github.com/python-social-auth/social-app-django/pull/162
# for details.
from social_core.backends.utils import load_backends
load_backends(settings.AUTHENTICATION_BACKENDS, force_load=True)
def github_oauth2_test(self, token_data_dict: Dict[str, str], account_data_dict: Dict[str, str],
*, subdomain: Optional[str]=None,
@ -466,6 +492,70 @@ class GitHubAuthBackendTest(ZulipTestCase):
parsed_url.path)
self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/'))
@override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp')
def test_github_oauth2_github_team_not_member_failed(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data',
side_effect=AuthFailed('Not found')), \
mock.patch('logging.info') as mock_info:
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, "/login/")
mock_info.assert_called_once_with("GitHub user is not member of required team")
@override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp')
def test_github_oauth2_github_team_member_success(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
with mock.patch('social_core.backends.github.GithubTeamOAuth2.user_data',
return_value=account_data_dict):
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], 'Hamlet')
self.assertEqual(data['subdomain'], 'zulip')
@override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip')
def test_github_oauth2_github_organization_not_member_failed(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data',
side_effect=AuthFailed('Not found')), \
mock.patch('logging.info') as mock_info:
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, "/login/")
mock_info.assert_called_once_with("GitHub user is not member of required organization")
@override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip')
def test_github_oauth2_github_organization_member_success(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.user_data',
return_value=account_data_dict):
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], 'Hamlet')
self.assertEqual(data['subdomain'], 'zulip')
def test_github_oauth2_deactivated_user(self) -> None:
user_profile = self.example_user("hamlet")
do_deactivate_user(user_profile)
@ -474,6 +564,32 @@ class GitHubAuthBackendTest(ZulipTestCase):
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, "/login/")
# TODO: verify whether we provide a clear error message
def test_github_oauth2_invalid_realm(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=self.email, name=self.name)
with mock.patch('zerver.middleware.get_realm', return_value=get_realm("zulip")):
# This mock.patch case somewhat hackishly arranges it so
# that we switch realms halfway through the test
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='invalid', next='/user_uploads/image')
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, "/accounts/login/?subdomain=1")
def test_github_oauth2_invalid_email(self) -> None:
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email="invalid", name=self.name)
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip', next='/user_uploads/image')
self.assertEqual(result.status_code, 302)
@ -537,6 +653,30 @@ class GitHubAuthBackendTest(ZulipTestCase):
self.assertEqual(len(mail.outbox), 1)
self.assertIn('Zulip on Android', mail.outbox[0].body)
def test_github_oauth2_registration_existing_account(self) -> None:
"""If the user already exists, signup flow just logs them in"""
email = "hamlet@zulip.com"
name = 'Full Name'
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=email, name=name)
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip', is_signup='1')
data = load_subdomain_token(result)
self.assertEqual(data['email'], self.example_email("hamlet"))
self.assertEqual(data['name'], 'Full Name')
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
parsed_url.path)
self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/'))
hamlet = self.example_user("hamlet")
# Name wasn't changed at all
self.assertEqual(hamlet.full_name, "King Hamlet")
def test_github_oauth2_registration(self) -> None:
"""If the user doesn't exist yet, GitHub auth can be used to register an account"""
email = "newuser@zulip.com"
@ -615,340 +755,39 @@ class GitHubAuthBackendTest(ZulipTestCase):
self.assertEqual(result.status_code, 200)
self.assert_in_response("No account found for newuser@zulip.com.", result)
def test_github_oauth2_registration_without_is_signup_closed_realm(self) -> None:
"""If the user doesn't exist yet in closed realm, give an error"""
email = "nonexisting@phantom.com"
name = 'Full Name'
token_data_dict = {
'access_token': 'foobar',
'token_type': 'bearer'
}
account_data_dict = dict(email=email, name=name)
result = self.github_oauth2_test(token_data_dict, account_data_dict,
subdomain='zulip')
self.assertEqual(result.status_code, 302)
data = load_subdomain_token(result)
self.assertEqual(data['email'], email)
self.assertEqual(data['name'], name)
self.assertEqual(data['subdomain'], 'zulip')
self.assertEqual(result.status_code, 302)
parsed_url = urllib.parse.urlparse(result.url)
uri = "{}://{}{}".format(parsed_url.scheme, parsed_url.netloc,
parsed_url.path)
self.assertTrue(uri.startswith('http://zulip.testserver/accounts/login/subdomain/'))
result = self.client_get(result.url)
self.assertEqual(result.status_code, 200)
self.assert_in_response('action="/register/"', result)
self.assert_in_response('Your email address, {}, is not '
'in one of the domains that are allowed to register '
'for accounts in this organization.'.format(email), result)
def test_github_auth_enabled(self) -> None:
with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.GitHubAuthBackend',)):
self.assertTrue(github_auth_enabled())
def test_login_url(self) -> None:
result = self.client_get('/accounts/login/social/github')
self.assertIn(reverse('social:begin', args=['github']), result.url)
self.assertIn('is_signup=0', result.url)
def test_login_url_with_next_param(self) -> None:
result = self.client_get('/accounts/login/social/github',
{'next': "/image_path"})
self.assertIn(reverse('social:begin', args=['github']), result.url)
self.assertIn('is_signup=0', result.url)
self.assertIn('image_path', result.url)
result = self.client_get('/accounts/login/social/github',
{'next': '/#narrow/stream/7-test-here'})
self.assertIn(reverse('social:begin', args=['github']), result.url)
self.assertIn('is_signup=0', result.url)
self.assertIn('narrow', result.url)
def test_signup_url(self) -> None:
result = self.client_get('/accounts/register/social/github')
self.assertIn(reverse('social:begin', args=['github']), result.url)
self.assertIn('is_signup=1', result.url)
class GitHubAuthBackendLegacyTest(ZulipTestCase):
def setUp(self) -> None:
self.user_profile = self.example_user('hamlet')
self.email = self.user_profile.email
self.name = 'Hamlet'
self.backend = GitHubAuthBackend()
self.backend.strategy = DjangoStrategy(storage=BaseDjangoStorage())
self.user_profile.backend = self.backend
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.get_host = lambda: 'zulip.testserver'
request.user = self.user_profile
self.backend.strategy.request = request
def do_auth(self, *args: Any, **kwargs: Any) -> UserProfile:
with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.GitHubAuthBackend',)):
return authenticate(**kwargs)
def test_full_name_with_missing_key(self) -> None:
self.assertEqual(self.backend.get_full_name(), '')
self.assertEqual(self.backend.get_full_name(response={'name': None}), '')
def test_full_name_with_none(self) -> None:
self.assertEqual(self.backend.get_full_name(response={'email': None}), '')
def test_github_backend_do_auth_with_non_existing_subdomain(self) -> None:
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=self.do_auth):
self.backend.strategy.session_set('subdomain', 'test')
response = dict(email=self.email, name=self.name)
result = self.backend.do_auth(response=response)
assert(result is not None)
self.assertIn('subdomain=1', result.url)
def test_github_backend_do_auth_with_subdomains(self) -> None:
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=self.do_auth):
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
result = self.backend.do_auth(response=response)
assert(result is not None)
self.assertTrue(result.url.startswith('http://zulip.testserver/accounts/login/subdomain/'))
@override_settings(SEND_LOGIN_EMAILS=True)
def test_github_backend_do_auth_mobile_otp_flow(self) -> None:
self.backend.strategy.request.META['HTTP_USER_AGENT'] = "ZulipAndroid"
mobile_flow_otp = '1234abcd' * 8
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=self.do_auth):
self.backend.strategy.session_set('subdomain', 'zulip')
self.backend.strategy.session_set('mobile_flow_otp', mobile_flow_otp)
response = dict(email=self.email, name=self.name)
result = self.backend.do_auth(response=response)
self.assertEqual(result.status_code, 302)
redirect_url = result['Location']
parsed_url = urllib.parse.urlparse(redirect_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, 'zulip')
self.assertEqual(query_params["realm"], ['http://zulip.testserver'])
self.assertEqual(query_params["email"], [self.example_email("hamlet")])
encrypted_api_key = query_params["otp_encrypted_api_key"][0]
self.assertEqual(self.example_user('hamlet').api_key,
otp_decrypt_api_key(encrypted_api_key, mobile_flow_otp))
self.assertEqual(len(mail.outbox), 1)
self.assertIn('Zulip on Android', mail.outbox[0].body)
def test_github_backend_do_auth_for_default(self) -> None:
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=self.do_auth), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {'valid_attestation': True}}
result.assert_called_with(self.user_profile, 'fake-access-token', **kwargs)
def test_github_backend_do_auth_for_default_auth_failed(self) -> None:
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=AuthFailed('Not found')), \
mock.patch('logging.info'), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {}}
result.assert_called_with(None, 'fake-access-token', **kwargs)
def test_github_backend_do_auth_for_team(self) -> None:
with mock.patch('social_core.backends.github.GithubTeamOAuth2.do_auth',
side_effect=self.do_auth), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp'):
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {'valid_attestation': True}}
result.assert_called_with(self.user_profile, 'fake-access-token', **kwargs)
def test_github_backend_do_auth_for_team_auth_failed(self) -> None:
with mock.patch('social_core.backends.github.GithubTeamOAuth2.do_auth',
side_effect=AuthFailed('Not found')), \
mock.patch('logging.info'), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_TEAM_ID='zulip-webapp'):
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {}}
result.assert_called_with(None, 'fake-access-token', **kwargs)
def test_github_backend_do_auth_for_org(self) -> None:
with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.do_auth',
side_effect=self.do_auth), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip'):
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {'valid_attestation': True}}
result.assert_called_with(self.user_profile, 'fake-access-token', **kwargs)
def test_github_backend_do_auth_for_org_auth_failed(self) -> None:
with mock.patch('social_core.backends.github.GithubOrganizationOAuth2.do_auth',
side_effect=AuthFailed('Not found')), \
mock.patch('logging.info'), \
mock.patch('zproject.backends.SocialAuthMixin.process_do_auth') as result:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email=self.email, name=self.name)
with self.settings(SOCIAL_AUTH_GITHUB_ORG_NAME='Zulip'):
self.backend.do_auth('fake-access-token', response=response)
kwargs = {'realm': get_realm('zulip'),
'response': response,
'return_data': {}}
result.assert_called_with(None, 'fake-access-token', **kwargs)
def test_github_backend_authenticate_nonexisting_user(self) -> None:
self.backend.strategy.session_set('subdomain', 'zulip')
response = dict(email="invalid@zulip.com", name=self.name)
return_data = dict() # type: Dict[str, Any]
user = self.backend.authenticate(return_data=return_data, response=response,
realm=get_realm("zulip"))
self.assertIs(user, None)
self.assertTrue(return_data['valid_attestation'])
def test_github_backend_authenticate_invalid_email(self) -> None:
response = dict(email=None, name=self.name)
return_data = dict() # type: Dict[str, Any]
user = self.backend.authenticate(return_data=return_data, response=response,
realm=get_realm("zulip"))
self.assertIs(user, None)
self.assertTrue(return_data['invalid_email'])
result = self.backend.process_do_auth(user, return_data=return_data, response=response)
self.assertIs(result, None)
def test_github_backend_new_user_wrong_domain(self) -> None:
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.user = self.user_profile
self.backend.strategy.request = request
session_data = {'subdomain': 'zulip', 'is_signup': '1'}
self.backend.strategy.session_get = lambda k: session_data.get(k)
def do_auth(*args: Any, **kwargs: Any) -> None:
return_data = kwargs['return_data']
return_data['valid_attestation'] = True
return None
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
email = 'nonexisting@phantom.com'
response = dict(email=email, name='Ghost')
result = self.backend.do_auth(response=response)
self.assertEqual(result.status_code, 302)
self.assertTrue(result.url.startswith('http://zulip.testserver/accounts/login/subdomain/'))
result = self.client_get(result.url)
self.assert_in_response('action="/register/"', result)
self.assert_in_response('Your email address, {}, is not '
'in one of the domains that are allowed to register '
'for accounts in this organization.'.format(email), result)
def test_github_backend_existing_user(self) -> None:
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.user = self.user_profile
self.backend.strategy.request = request
session_data = {'subdomain': 'zulip', 'is_signup': '1'}
self.backend.strategy.session_get = lambda k: session_data.get(k)
def do_auth(*args: Any, **kwargs: Any) -> None:
return_data = kwargs['return_data']
return_data['valid_attestation'] = True
return None
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
email = self.example_email("hamlet")
response = dict(email=email, name='Hamlet')
result = self.backend.do_auth(response=response)
self.assertEqual(result.status_code, 302)
self.assertTrue(result.url.startswith('http://zulip.testserver/accounts/login/subdomain/'))
result = self.client_get(result.url)
self.assert_in_response('action="/register/"', result)
self.assert_in_response('hamlet@zulip.com already has an account',
result)
def test_github_backend_existing_deactivated_user(self) -> None:
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.user = self.user_profile
self.backend.strategy.request = request
session_data = {'subdomain': 'zulip', 'is_signup': '1'}
self.backend.strategy.session_get = lambda k: session_data.get(k)
def do_auth(*args: Any, **kwargs: Any) -> None:
return_data = kwargs['return_data']
return_data['valid_attestation'] = True
return None
do_deactivate_user(self.example_user("hamlet"))
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
email = self.example_email("hamlet")
response = dict(email=email, name='Hamlet')
result = self.backend.do_auth(response=response)
self.assertEqual(result.status_code, 302)
self.assertTrue(result.url.startswith('http://zulip.testserver/accounts/login/subdomain/'))
result = self.client_get(result.url)
self.assert_in_response('action="/register/"', result)
self.assert_in_response('The account for hamlet@zulip.com has been deactivated',
result)
def test_github_backend_realm_invalid_user_when_is_signup_is_false(self) -> None:
rf = RequestFactory()
request = rf.get('/complete')
request.session = {}
request.user = self.user_profile
self.backend.strategy.request = request
session_data = {'subdomain': 'zulip', 'is_signup': '0'}
self.backend.strategy.session_get = lambda k: session_data.get(k)
def do_auth(*args: Any, **kwargs: Any) -> None:
return_data = kwargs['return_data']
return_data['valid_attestation'] = True
return None
with mock.patch('social_core.backends.github.GithubOAuth2.do_auth',
side_effect=do_auth):
email = 'nonexisting@phantom.com'
response = dict(email=email, name='Ghost')
result = self.backend.do_auth(response=response)
self.assertEqual(result.status_code, 302)
self.assertTrue(result.url.startswith('http://zulip.testserver/accounts/login/subdomain/'))
result = self.client_get(result.url)
self.assertEqual(result.status_code, 200)
self.assert_in_response('Your email address, nonexisting@phantom.com, is not in one of the domains '
'that are allowed to register for accounts in this organization.',
result)
def test_github_complete(self) -> None:
from social_django import utils
utils.BACKENDS = ('zproject.backends.GitHubAuthBackend',)
with mock.patch('social_core.backends.oauth.BaseOAuth2.process_error',
side_effect=AuthFailed('Not found')):
result = self.client_get(reverse('social:complete', args=['github']))
self.assertEqual(result.status_code, 302)
self.assertIn('login', result.url)
utils.BACKENDS = settings.AUTHENTICATION_BACKENDS
def test_github_complete_when_base_exc_is_raised(self) -> None:
from social_django import utils
utils.BACKENDS = ('zproject.backends.GitHubAuthBackend',)
with mock.patch('social_core.backends.oauth.BaseOAuth2.auth_complete',
side_effect=AuthStateForbidden('State forbidden')), \
mock.patch('zproject.backends.logging.warning'):
result = self.client_get(reverse('social:complete', args=['github']))
self.assertEqual(result.status_code, 302)
self.assertIn('login', result.url)
utils.BACKENDS = settings.AUTHENTICATION_BACKENDS
class GoogleOAuthTest(ZulipTestCase):
def google_oauth2_test(self, token_response: ResponseMock, account_response: ResponseMock,
*, subdomain: Optional[str]=None,

View File

@ -6,10 +6,13 @@ from django_auth_ldap.backend import LDAPBackend, _LDAPUser
import django.contrib.auth
from django.contrib.auth.backends import RemoteUserBackend
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.http import HttpResponse
from oauth2client.crypt import AppIdentityError
from social_core.backends.github import GithubOAuth2, GithubOrganizationOAuth2, \
GithubTeamOAuth2
from social_core.backends.base import BaseAuth
from social_core.utils import handle_http_errors
from social_core.exceptions import AuthFailed, SocialAuthBaseException
from social_django.models import DjangoStorage
@ -112,158 +115,6 @@ class ZulipAuthMixin:
except UserProfile.DoesNotExist:
return None
class SocialAuthMixin(ZulipAuthMixin):
auth_backend_name = None # type: str
def get_email_address(self, *args: Any, **kwargs: Any) -> str:
raise NotImplementedError
def get_full_name(self, *args: Any, **kwargs: Any) -> str:
raise NotImplementedError
def get_authenticated_user(self, *args: Any, **kwargs: Any) -> Optional[UserProfile]:
raise NotImplementedError
@handle_http_errors
def do_auth(self, *args: Any, **kwargs: Any) -> Optional[HttpResponse]:
"""
This function is called once the authentication workflow is complete.
We override this function to:
1. Inject `return_data` and `realm_subdomain` kwargs. These will be
used by `authenticate()` functions of backends to make the
decision.
2. Call the proper authentication function to get the user in
`get_authenticated_user`.
The actual decision on authentication is done in
SocialAuthMixin._common_authenticate().
SocialAuthMixin.get_authenticated_user is expected to be overridden by
the derived class to add custom logic for authenticating the user and
returning the user.
"""
kwargs['return_data'] = {}
subdomain = self.strategy.session_get('subdomain') # type: ignore # `strategy` comes from Python Social Auth.
realm = get_realm(subdomain)
kwargs['realm'] = realm
user_profile = self.get_authenticated_user(*args, **kwargs)
return self.process_do_auth(user_profile, *args, **kwargs)
def authenticate(self,
realm: Optional[Realm]=None,
storage: Optional[DjangoStorage]=None,
strategy: Optional[DjangoStrategy]=None,
user: Optional[Dict[str, Any]]=None,
return_data: Optional[Dict[str, Any]]=None,
response: Optional[Dict[str, Any]]=None,
backend: Optional[GithubOAuth2]=None
) -> Optional[UserProfile]:
"""
Django decides which `authenticate` to call by inspecting the
arguments. So it's better to create `authenticate` function
with well defined arguments.
Keeping this function separate so that it can easily be
overridden.
"""
if user is None:
user = {}
assert return_data is not None
assert response is not None
return self._common_authenticate(self,
realm=realm,
storage=storage,
strategy=strategy,
user=user,
return_data=return_data,
response=response,
backend=backend)
def _common_authenticate(self, *args: Any, **kwargs: Any) -> Optional[UserProfile]:
return_data = kwargs.get('return_data', {})
realm = kwargs.get("realm")
if realm is None:
return None
if not auth_enabled_helper([self.auth_backend_name], realm):
return_data["auth_backend_disabled"] = True
return None
email_address = self.get_email_address(*args, **kwargs)
if not email_address:
return_data['invalid_email'] = True
return None
return_data["valid_attestation"] = True
return common_get_active_user(email_address, realm, return_data)
def process_do_auth(self, user_profile: UserProfile, *args: Any,
**kwargs: Any) -> Optional[HttpResponse]:
# These functions need to be imported here to avoid cyclic
# dependency.
from zerver.views.auth import (login_or_register_remote_user,
redirect_to_subdomain_login_url,
redirect_and_log_into_subdomain)
return_data = kwargs.get('return_data', {})
inactive_user = return_data.get('inactive_user')
inactive_realm = return_data.get('inactive_realm')
invalid_subdomain = return_data.get('invalid_subdomain')
invalid_email = return_data.get('invalid_email')
if inactive_user or inactive_realm:
# Redirect to login page. We can't send to registration
# workflow with these errors. We will redirect to login page.
return None
if invalid_email:
# In case of invalid email, we will end up on registration page.
# This seems better than redirecting to login page.
logging.warning(
"{} got invalid email argument.".format(self.auth_backend_name)
)
return None
strategy = self.strategy # type: ignore # This comes from Python Social Auth.
request = strategy.request
email_address = self.get_email_address(*args, **kwargs)
full_name = self.get_full_name(*args, **kwargs)
is_signup = strategy.session_get('is_signup') == '1'
redirect_to = strategy.session_get('next')
mobile_flow_otp = strategy.session_get('mobile_flow_otp')
subdomain = strategy.session_get('subdomain')
assert subdomain is not None
if mobile_flow_otp is not None:
return login_or_register_remote_user(request, email_address,
user_profile, full_name,
invalid_subdomain=bool(invalid_subdomain),
mobile_flow_otp=mobile_flow_otp,
is_signup=is_signup,
redirect_to=redirect_to)
realm = get_realm(subdomain)
if realm is None:
return redirect_to_subdomain_login_url()
return redirect_and_log_into_subdomain(realm, full_name, email_address,
is_signup=is_signup,
redirect_to=redirect_to)
def auth_complete(self, *args: Any, **kwargs: Any) -> Optional[HttpResponse]:
"""
Returning `None` from this function will redirect the browser
to the login page.
"""
try:
# Call the auth_complete method of social_core.backends.oauth.BaseOAuth2
return super().auth_complete(*args, **kwargs) # type: ignore # monkey-patching
except AuthFailed:
return None
except SocialAuthBaseException as e:
logging.warning(str(e))
return None
class ZulipDummyBackend(ZulipAuthMixin):
"""
Used when we want to log you in without checking any
@ -509,67 +360,165 @@ class DevAuthBackend(ZulipAuthMixin):
return None
return common_get_active_user(dev_auth_username, realm, return_data=return_data)
class GitHubAuthBackend(SocialAuthMixin, GithubOAuth2):
def social_associate_user_helper(backend: BaseAuth, return_data: Dict[str, Any],
*args: Any, **kwargs: Any) -> Optional[UserProfile]:
"""Responsible for doing the Zulip-account lookup and validation parts
of the Zulip Social auth pipeline (similar to the authenticate()
methods in most other auth backends in this file).
"""
subdomain = backend.strategy.session_get('subdomain')
realm = get_realm(subdomain)
if realm is None:
return_data["invalid_realm"] = True
return None
if not auth_enabled_helper([backend.auth_backend_name], realm):
return_data["auth_backend_disabled"] = True
return None
if 'auth_failed_reason' in kwargs.get('response', {}):
return_data["social_auth_failed_reason"] = kwargs['response']["auth_failed_reason"]
return None
elif hasattr(backend, 'get_validated_email'):
# Some social backends, like GitHubAuthBackend, don't guarantee that
# the `details` data is validated.
validated_email = backend.get_validated_email(*args, **kwargs)
else: # nocoverage
# This code path isn't used by GitHubAuthBackend
validated_email = kwargs["details"].get("email")
if not validated_email: # nocoverage
# This code path isn't used with GitHubAuthBackend, but may be relevant for other
# social auth backends.
return_data['invalid_email'] = True
return None
try:
validate_email(validated_email)
except ValidationError:
return_data['invalid_email'] = True
return None
return_data["valid_attestation"] = True
return_data['validated_email'] = validated_email
user_profile = common_get_active_user(validated_email, realm, return_data)
if 'fullname' in kwargs["details"]:
return_data["full_name"] = kwargs["details"]["fullname"]
else:
# If we add support for any of the social auth backends that
# don't provide this feature, we'll need to add code here.
raise AssertionError("Social auth backend doesn't provide fullname")
return user_profile
def social_auth_associate_user(
backend: BaseAuth,
*args: Any,
**kwargs: Any) -> Dict[str, Any]:
return_data = {} # type: Dict[str, Any]
user_profile = social_associate_user_helper(
backend, return_data, *args, **kwargs)
return {'user_profile': user_profile,
'return_data': return_data}
def social_auth_finish(backend: Any,
details: Dict[str, Any],
response: HttpResponse,
*args: Any,
**kwargs: Any) -> Optional[UserProfile]:
from zerver.views.auth import (login_or_register_remote_user,
redirect_and_log_into_subdomain)
user_profile = kwargs['user_profile']
return_data = kwargs['return_data']
auth_backend_disabled = return_data.get('auth_backend_disabled')
inactive_user = return_data.get('inactive_user')
inactive_realm = return_data.get('inactive_realm')
invalid_realm = return_data.get('invalid_realm')
invalid_subdomain = return_data.get('invalid_subdomain')
invalid_email = return_data.get('invalid_email')
auth_failed_reason = return_data.get("social_auth_failed_reason")
if invalid_realm:
from zerver.views.auth import redirect_to_subdomain_login_url
return redirect_to_subdomain_login_url()
if auth_backend_disabled or inactive_user or inactive_realm:
# Redirect to login page. We can't send to registration
# workflow with these errors. We will redirect to login page.
return None
if invalid_email:
# In case of invalid email, we will end up on registration page.
# This seems better than redirecting to login page.
logging.warning(
"{} got invalid email argument.".format(backend.auth_backend_name)
)
return None
if auth_failed_reason:
logging.info(auth_failed_reason)
return None
# Structurally, all the cases where we don't have an authenticated
# email for the user should be handled above; this assertion helps
# prevent any violations of that contract from resulting in a user
# being incorrectly authenticated.
assert return_data.get('valid_attestation') is True
strategy = backend.strategy # type: ignore # This comes from Python Social Auth.
email_address = return_data['validated_email']
full_name = return_data['full_name']
is_signup = strategy.session_get('is_signup') == '1'
redirect_to = strategy.session_get('next')
mobile_flow_otp = strategy.session_get('mobile_flow_otp')
subdomain = strategy.session_get('subdomain')
assert subdomain is not None
if mobile_flow_otp is not None:
return login_or_register_remote_user(strategy.request, email_address,
user_profile, full_name,
invalid_subdomain=bool(invalid_subdomain),
mobile_flow_otp=mobile_flow_otp,
is_signup=is_signup,
redirect_to=redirect_to)
realm = get_realm(subdomain)
return redirect_and_log_into_subdomain(realm, full_name, email_address,
is_signup=is_signup,
redirect_to=redirect_to)
class GitHubAuthBackend(GithubOAuth2):
auth_backend_name = "GitHub"
def get_email_address(self, *args: Any, **kwargs: Any) -> Optional[str]:
try:
return kwargs['response']['email']
except KeyError: # nocoverage # TODO: investigate
return None
def get_full_name(self, *args: Any, **kwargs: Any) -> str:
# In case of any error return an empty string. Name is used by
# the registration page to pre-populate the name field. However,
# if it is not supplied, our registration process will make sure
# that the user enters a valid name.
try:
name = kwargs['response']['name']
except KeyError:
name = ''
if name is None:
return ''
return name
def get_authenticated_user(self, *args: Any, **kwargs: Any) -> Optional[UserProfile]:
"""
This function is called once the OAuth2 workflow is complete. We
override this function to call the proper `do_auth` function depending
on whether we are doing individual, team or organization based GitHub
authentication. The actual decision on authentication is done in
SocialAuthMixin._common_authenticate().
"""
user_profile = None
def get_validated_email(self, *args: Any, **kwargs: Any) -> Optional[str]:
return kwargs['response']['email']
def user_data(self, access_token: str, *args: Any, **kwargs: Any) -> Dict[str, str]:
"""This patched user_data function lets us combine together the 3
social auth backends into a single Zulip backend for GitHub Oauth2"""
team_id = settings.SOCIAL_AUTH_GITHUB_TEAM_ID
org_name = settings.SOCIAL_AUTH_GITHUB_ORG_NAME
if (team_id is None and org_name is None):
try:
user_profile = GithubOAuth2.do_auth(self, *args, **kwargs)
except AuthFailed:
logging.info("User authentication failed.")
user_profile = None
elif (team_id):
if team_id is None and org_name is None:
# I believe this can't raise AuthFailed, so we don't try to catch it here.
return super().user_data(
access_token, *args, **kwargs
)
elif team_id is not None:
backend = GithubTeamOAuth2(self.strategy, self.redirect_uri)
try:
user_profile = backend.do_auth(*args, **kwargs)
return backend.user_data(access_token, *args, **kwargs)
except AuthFailed:
logging.info("User is not member of GitHub team.")
user_profile = None
elif (org_name):
return dict(auth_failed_reason="GitHub user is not member of required team")
elif org_name is not None:
backend = GithubOrganizationOAuth2(self.strategy, self.redirect_uri)
try:
user_profile = backend.do_auth(*args, **kwargs)
return backend.user_data(access_token, *args, **kwargs)
except AuthFailed:
logging.info("User is not member of GitHub organization.")
user_profile = None
return dict(auth_failed_reason="GitHub user is not member of required organization")
return user_profile
raise AssertionError("Invalid configuration")
AUTH_BACKEND_NAME_MAP = {
'Dev': DevAuthBackend,

View File

@ -1530,6 +1530,12 @@ SOCIAL_AUTH_GITHUB_ORG_SECRET = SOCIAL_AUTH_GITHUB_SECRET
SOCIAL_AUTH_GITHUB_TEAM_KEY = SOCIAL_AUTH_GITHUB_KEY
SOCIAL_AUTH_GITHUB_TEAM_SECRET = SOCIAL_AUTH_GITHUB_SECRET
SOCIAL_AUTH_PIPELINE = [
'social_core.pipeline.social_auth.social_details',
'zproject.backends.social_auth_associate_user',
'zproject.backends.social_auth_finish',
]
########################################################################
# EMAIL SETTINGS
########################################################################