Convert EmailAuthBackend and LDAPAuthBackend to accept a realm.

This commit is contained in:
Tim Abbott 2017-11-17 14:56:45 -08:00
parent 53224a16a9
commit 3bfb19b5f3
8 changed files with 61 additions and 50 deletions

View File

@ -288,8 +288,9 @@ Please contact %s to reactivate this group.""" % (
if username is not None and password: if username is not None and password:
subdomain = get_subdomain(self.request) subdomain = get_subdomain(self.request)
realm = get_realm(subdomain)
self.user_cache = authenticate(self.request, username=username, password=password, self.user_cache = authenticate(self.request, username=username, password=password,
realm_subdomain=subdomain) realm=realm)
if self.user_cache is None: if self.user_cache is None:
raise forms.ValidationError( raise forms.ValidationError(
self.error_messages['invalid_login'], self.error_messages['invalid_login'],

View File

@ -43,7 +43,7 @@ def is_subdomain_root_or_alias(request: HttpRequest) -> bool:
def user_matches_subdomain(realm_subdomain: Optional[Text], user_profile: UserProfile) -> bool: def user_matches_subdomain(realm_subdomain: Optional[Text], user_profile: UserProfile) -> bool:
if realm_subdomain is None: if realm_subdomain is None:
return True return True # nocoverage # This state may no longer be possible.
return user_profile.realm.subdomain == realm_subdomain return user_profile.realm.subdomain == realm_subdomain
def is_root_domain_available() -> bool: def is_root_domain_available() -> bool:

View File

@ -292,10 +292,10 @@ class ZulipTestCase(TestCase):
password = initial_password(email) password = initial_password(email)
if not fails: if not fails:
self.assertTrue(self.client.login(username=email, password=password, self.assertTrue(self.client.login(username=email, password=password,
realm_subdomain=realm.subdomain)) realm=realm))
else: else:
self.assertFalse(self.client.login(username=email, password=password, self.assertFalse(self.client.login(username=email, password=password,
realm_subdomain=realm.subdomain)) realm=realm))
def logout(self): def logout(self):
# type: () -> None # type: () -> None

View File

@ -168,6 +168,7 @@ class AuthBackendTest(ZulipTestCase):
return_value=True): return_value=True):
return_data = {} # type: Dict[str, bool] return_data = {} # type: Dict[str, bool]
user = EmailAuthBackend().authenticate(self.example_email('hamlet'), user = EmailAuthBackend().authenticate(self.example_email('hamlet'),
realm=get_realm("zulip"),
password=password, password=password,
return_data=return_data) return_data=return_data)
self.assertEqual(user, None) self.assertEqual(user, None)
@ -176,11 +177,20 @@ class AuthBackendTest(ZulipTestCase):
self.verify_backend(EmailAuthBackend(), self.verify_backend(EmailAuthBackend(),
good_kwargs=dict(password=password, good_kwargs=dict(password=password,
username=username, username=username,
realm_subdomain='zulip', realm=get_realm('zulip'),
return_data=dict()), return_data=dict()),
bad_kwargs=dict(password=password, bad_kwargs=dict(password=password,
username=username, username=username,
realm_subdomain='acme', realm=get_realm('zephyr'),
return_data=dict()))
self.verify_backend(EmailAuthBackend(),
good_kwargs=dict(password=password,
username=username,
realm=get_realm('zulip'),
return_data=dict()),
bad_kwargs=dict(password=password,
username=username,
realm=None,
return_data=dict())) return_data=dict()))
def test_email_auth_backend_disabled_password_auth(self): def test_email_auth_backend_disabled_password_auth(self):
@ -191,7 +201,9 @@ class AuthBackendTest(ZulipTestCase):
user_profile.save() user_profile.save()
# Verify if a realm has password auth disabled, correct password is rejected # Verify if a realm has password auth disabled, correct password is rejected
with mock.patch('zproject.backends.password_auth_enabled', return_value=False): with mock.patch('zproject.backends.password_auth_enabled', return_value=False):
self.assertIsNone(EmailAuthBackend().authenticate(self.example_email('hamlet'), password)) self.assertIsNone(EmailAuthBackend().authenticate(self.example_email('hamlet'),
password,
realm=get_realm("zulip")))
@override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipDummyBackend',)) @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipDummyBackend',))
def test_no_backend_enabled(self): def test_no_backend_enabled(self):
@ -266,7 +278,7 @@ class AuthBackendTest(ZulipTestCase):
mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements')), ( mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements')), (
mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs', mock.patch('django_auth_ldap.backend._LDAPUser._get_user_attrs',
return_value=dict(full_name=['Hamlet']))): return_value=dict(full_name=['Hamlet']))):
self.assertIsNone(backend.authenticate(email, password)) self.assertIsNone(backend.authenticate(email, password, realm=get_realm("zulip")))
with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn'), ( with mock.patch('django_auth_ldap.backend._LDAPUser._authenticate_user_dn'), (
mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements')), ( mock.patch('django_auth_ldap.backend._LDAPUser._check_requirements')), (
@ -275,10 +287,17 @@ class AuthBackendTest(ZulipTestCase):
self.verify_backend(backend, self.verify_backend(backend,
bad_kwargs=dict(username=username, bad_kwargs=dict(username=username,
password=password, password=password,
realm_subdomain='acme'), realm=get_realm('zephyr')),
good_kwargs=dict(username=username, good_kwargs=dict(username=username,
password=password, password=password,
realm_subdomain='zulip')) realm=get_realm('zulip')))
self.verify_backend(backend,
bad_kwargs=dict(username=username,
password=password,
realm=get_realm('acme')),
good_kwargs=dict(username=username,
password=password,
realm=get_realm('zulip')))
def test_devauth_backend(self): def test_devauth_backend(self):
# type: () -> None # type: () -> None
@ -1864,7 +1883,8 @@ class TestLDAP(ZulipTestCase):
LDAP_APPEND_DOMAIN='zulip.com', LDAP_APPEND_DOMAIN='zulip.com',
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing') user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing',
realm=get_realm('zulip'))
assert(user_profile is not None) assert(user_profile is not None)
self.assertEqual(user_profile.email, self.example_email("hamlet")) self.assertEqual(user_profile.email, self.example_email("hamlet"))
@ -1881,7 +1901,8 @@ class TestLDAP(ZulipTestCase):
with self.settings(LDAP_EMAIL_ATTR='email', with self.settings(LDAP_EMAIL_ATTR='email',
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate("letham", 'testing') user_profile = self.backend.authenticate("letham", 'testing',
realm=get_realm('zulip'))
assert (user_profile is not None) assert (user_profile is not None)
self.assertEqual(user_profile.email, self.example_email("hamlet")) self.assertEqual(user_profile.email, self.example_email("hamlet"))
@ -2055,11 +2076,11 @@ class TestLDAP(ZulipTestCase):
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing', user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing',
realm_subdomain='acme') realm=get_realm('zephyr'))
self.assertIs(user_profile, None) self.assertIs(user_profile, None)
@override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',)) @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',))
def test_login_failure_due_to_empty_subdomain(self): def test_login_failure_due_to_invalid_subdomain(self):
# type: () -> None # type: () -> None
self.mock_ldap.directory = { self.mock_ldap.directory = {
'uid=hamlet,ou=users,dc=zulip,dc=com': { 'uid=hamlet,ou=users,dc=zulip,dc=com': {
@ -2071,26 +2092,9 @@ class TestLDAP(ZulipTestCase):
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing', user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing',
realm_subdomain='') realm=None)
self.assertIs(user_profile, None) self.assertIs(user_profile, None)
@override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',))
def test_login_success_when_subdomain_is_none(self):
# type: () -> None
self.mock_ldap.directory = {
'uid=hamlet,ou=users,dc=zulip,dc=com': {
'userPassword': 'testing'
}
}
with self.settings(
LDAP_APPEND_DOMAIN='zulip.com',
AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing',
realm_subdomain=None)
assert(user_profile is not None)
self.assertEqual(user_profile.email, self.example_email("hamlet"))
@override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',)) @override_settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipLDAPAuthBackend',))
def test_login_success_with_valid_subdomain(self): def test_login_success_with_valid_subdomain(self):
# type: () -> None # type: () -> None
@ -2104,7 +2108,7 @@ class TestLDAP(ZulipTestCase):
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=zulip,dc=com'):
user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing', user_profile = self.backend.authenticate(self.example_email("hamlet"), 'testing',
realm_subdomain='zulip') realm=get_realm('zulip'))
assert(user_profile is not None) assert(user_profile is not None)
self.assertEqual(user_profile.email, self.example_email("hamlet")) self.assertEqual(user_profile.email, self.example_email("hamlet"))
@ -2122,7 +2126,7 @@ class TestLDAP(ZulipTestCase):
AUTH_LDAP_BIND_PASSWORD='', AUTH_LDAP_BIND_PASSWORD='',
AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=acme,dc=com'): AUTH_LDAP_USER_DN_TEMPLATE='uid=%(user)s,ou=users,dc=acme,dc=com'):
user_profile = self.backend.authenticate('nonexisting@acme.com', 'testing', user_profile = self.backend.authenticate('nonexisting@acme.com', 'testing',
realm_subdomain='zulip') realm=get_realm('zulip'))
assert(user_profile is not None) assert(user_profile is not None)
self.assertEqual(user_profile.email, 'nonexisting@acme.com') self.assertEqual(user_profile.email, 'nonexisting@acme.com')
self.assertEqual(user_profile.full_name, 'NonExisting') self.assertEqual(user_profile.full_name, 'NonExisting')

View File

@ -644,10 +644,10 @@ def api_dev_get_emails(request):
def api_fetch_api_key(request, username=REQ(), password=REQ()): def api_fetch_api_key(request, username=REQ(), password=REQ()):
# type: (HttpRequest, str, str) -> HttpResponse # type: (HttpRequest, str, str) -> HttpResponse
return_data = {} # type: Dict[str, bool] return_data = {} # type: Dict[str, bool]
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
if username == "google-oauth2-token": if username == "google-oauth2-token":
# This code path is auth for the legacy Android app # This code path is auth for the legacy Android app
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
user_profile = authenticate(google_oauth2_token=password, user_profile = authenticate(google_oauth2_token=password,
realm=realm, realm=realm,
return_data=return_data) return_data=return_data)
@ -659,7 +659,7 @@ def api_fetch_api_key(request, username=REQ(), password=REQ()):
user_profile = authenticate(username=username, user_profile = authenticate(username=username,
password=password, password=password,
realm_subdomain=get_subdomain(request), realm=realm,
return_data=return_data) return_data=return_data)
if return_data.get("inactive_user"): if return_data.get("inactive_user"):
return json_error(_("Your account has been disabled."), return json_error(_("Your account has been disabled."),
@ -756,9 +756,11 @@ def api_get_server_settings(request):
@has_request_variables @has_request_variables
def json_fetch_api_key(request, user_profile, password=REQ(default='')): def json_fetch_api_key(request, user_profile, password=REQ(default='')):
# type: (HttpRequest, UserProfile, str) -> HttpResponse # type: (HttpRequest, UserProfile, str) -> HttpResponse
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
if password_auth_enabled(user_profile.realm): if password_auth_enabled(user_profile.realm):
if not authenticate(username=user_profile.email, password=password, if not authenticate(username=user_profile.email, password=password,
realm_subdomain=get_subdomain(request)): realm=realm):
return json_error(_("Your username or password is incorrect.")) return json_error(_("Your username or password is incorrect."))
return json_success({"api_key": user_profile.api_key}) return json_success({"api_key": user_profile.api_key})

View File

@ -191,7 +191,7 @@ def accounts_register(request):
auth_result = authenticate(request, auth_result = authenticate(request,
username=email, username=email,
password=password, password=password,
realm_subdomain=realm.subdomain, realm=realm,
return_data=return_data) return_data=return_data)
if auth_result is None: if auth_result is None:
# TODO: This probably isn't going to give a # TODO: This probably isn't going to give a

View File

@ -92,7 +92,8 @@ def json_change_settings(request, user_profile,
if new_password != "" or confirm_password != "": if new_password != "" or confirm_password != "":
if new_password != confirm_password: if new_password != confirm_password:
return json_error(_("New password must match confirmation password!")) return json_error(_("New password must match confirmation password!"))
if not authenticate(username=user_profile.email, password=old_password): if not authenticate(username=user_profile.email, password=old_password,
realm=user_profile.realm):
return json_error(_("Wrong password!")) return json_error(_("Wrong password!"))
do_change_password(user_profile, new_password) do_change_password(user_profile, new_password)
# In Django 1.10, password changes invalidates sessions, see # In Django 1.10, password changes invalidates sessions, see

View File

@ -335,7 +335,7 @@ class EmailAuthBackend(ZulipAuthMixin):
""" """
def authenticate(self, username: Optional[str]=None, password: Optional[str]=None, def authenticate(self, username: Optional[str]=None, password: Optional[str]=None,
realm_subdomain: Optional[str]=None, realm: Optional[Realm]=None,
return_data: Optional[Dict[str, Any]]=None) -> Optional[UserProfile]: return_data: Optional[Dict[str, Any]]=None) -> Optional[UserProfile]:
""" Authenticate a user based on email address as the user name. """ """ Authenticate a user based on email address as the user name. """
if username is None or password is None: if username is None or password is None:
@ -343,20 +343,22 @@ class EmailAuthBackend(ZulipAuthMixin):
# specify which backend to use when not using # specify which backend to use when not using
# EmailAuthBackend, username and password should always be set. # EmailAuthBackend, username and password should always be set.
raise AssertionError("Invalid call to authenticate for EmailAuthBackend") raise AssertionError("Invalid call to authenticate for EmailAuthBackend")
if realm is None:
return None
user_profile = common_get_active_user_by_email(username, return_data=return_data) user_profile = common_get_active_user_by_email(username, return_data=return_data)
if user_profile is None: if user_profile is None:
return None return None
if not password_auth_enabled(user_profile.realm): if not password_auth_enabled(realm):
if return_data is not None: if return_data is not None:
return_data['password_auth_disabled'] = True return_data['password_auth_disabled'] = True
return None return None
if not email_auth_enabled(user_profile.realm): if not email_auth_enabled(realm):
if return_data is not None: if return_data is not None:
return_data['email_auth_disabled'] = True return_data['email_auth_disabled'] = True
return None return None
if user_profile.check_password(password): if user_profile.check_password(password):
if not user_matches_subdomain(realm_subdomain, user_profile): if not user_matches_subdomain(realm.subdomain, user_profile):
if return_data is not None: if return_data is not None:
return_data["invalid_subdomain"] = True return_data["invalid_subdomain"] = True
return None return None
@ -460,21 +462,22 @@ class ZulipLDAPAuthBackendBase(ZulipAuthMixin, LDAPBackend):
class ZulipLDAPAuthBackend(ZulipLDAPAuthBackendBase): class ZulipLDAPAuthBackend(ZulipLDAPAuthBackendBase):
REALM_IS_NONE_ERROR = 1 REALM_IS_NONE_ERROR = 1
def authenticate(self, username: str, password: str, realm_subdomain: Optional[Text]=None, def authenticate(self, username: str, password: str, realm: Optional[Realm]=None,
return_data: Optional[Dict[str, Any]]=None) -> Optional[UserProfile]: return_data: Optional[Dict[str, Any]]=None) -> Optional[UserProfile]:
if realm is None:
return None
self._realm = realm
try: try:
self._realm = get_realm(realm_subdomain)
username = self.django_to_ldap_username(username) username = self.django_to_ldap_username(username)
user_profile = ZulipLDAPAuthBackendBase.authenticate(self, user_profile = ZulipLDAPAuthBackendBase.authenticate(self,
username=username, username=username,
password=password) password=password)
if user_profile is None: if user_profile is None:
return None return None
if not user_matches_subdomain(realm_subdomain, user_profile): if not user_matches_subdomain(realm.subdomain, user_profile):
return None return None
return user_profile return user_profile
except Realm.DoesNotExist:
return None # nocoverage # TODO: this may no longer be possible
except ZulipLDAPException: except ZulipLDAPException:
return None # nocoverage # TODO: this may no longer be possible return None # nocoverage # TODO: this may no longer be possible
@ -518,7 +521,7 @@ class ZulipLDAPAuthBackend(ZulipLDAPAuthBackendBase):
# Just like ZulipLDAPAuthBackend, but doesn't let you log in. # Just like ZulipLDAPAuthBackend, but doesn't let you log in.
class ZulipLDAPUserPopulator(ZulipLDAPAuthBackendBase): class ZulipLDAPUserPopulator(ZulipLDAPAuthBackendBase):
def authenticate(self, username: str, password: str, realm_subdomain: Optional[Text]=None, def authenticate(self, username: str, password: str, realm: Optional[Realm]=None,
return_data: Optional[Dict[str, Any]]=None) -> None: return_data: Optional[Dict[str, Any]]=None) -> None:
return None return None