From 86c330b7520714e5e78ae7751d18fbc928096eb5 Mon Sep 17 00:00:00 2001 From: Mateusz Mandera Date: Sat, 26 Jun 2021 18:51:43 +0200 Subject: [PATCH] social_auth: Fix handling of user errors in the authentication process. The code didn't account for existence of SOCIAL_AUTH_SUBDOMAIN. So the redirects would happen to endpoints on the SOCIAL_AUTH_SUBDOMAIN, which is incorrect. The redirects should happen to the realm from which the user came. --- zerver/tests/test_auth_backends.py | 83 +++++++++++++++++++++--------- zproject/backends.py | 26 +++++++--- 2 files changed, 77 insertions(+), 32 deletions(-) diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index d489d8cfad..0bee6d0b44 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -168,7 +168,7 @@ class AuthBackendTest(ZulipTestCase): if isinstance(backend, SocialAuthMixin): # Returns a redirect to login page with an error. self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/?is_deactivated=true") + self.assertEqual(result.url, user_profile.realm.uri + "/login/?is_deactivated=true") else: # Just takes you back to the login page treating as # invalid auth; this is correct because the form will @@ -183,7 +183,12 @@ class AuthBackendTest(ZulipTestCase): # Verify auth fails with a deactivated realm do_deactivate_realm(user_profile.realm, acting_user=None) - self.assertIsNone(backend.authenticate(**good_kwargs)) + result = backend.authenticate(**good_kwargs) + if isinstance(backend, SocialAuthMixin): + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, user_profile.realm.uri + "/login/") + else: + self.assertIsNone(result) # Verify auth works again after reactivating the realm do_reactivate_realm(user_profile.realm) @@ -198,7 +203,12 @@ class AuthBackendTest(ZulipTestCase): # Verify auth fails if the auth backend is disabled on server with self.settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipDummyBackend",)): clear_supported_auth_backends_cache() - self.assertIsNone(backend.authenticate(**good_kwargs)) + result = backend.authenticate(**good_kwargs) + if isinstance(backend, SocialAuthMixin): + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, user_profile.realm.uri + "/login/") + else: + self.assertIsNone(result) clear_supported_auth_backends_cache() # Verify auth fails if the auth backend is disabled for the realm @@ -217,7 +227,12 @@ class AuthBackendTest(ZulipTestCase): # propagate the changes we just made to the actual realm # object in good_kwargs. good_kwargs["realm"] = user_profile.realm - self.assertIsNone(backend.authenticate(**good_kwargs)) + + if isinstance(backend, SocialAuthMixin): + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, user_profile.realm.uri + "/login/") + else: + self.assertIsNone(result) user_profile.realm.authentication_methods.set_bit(index, True) user_profile.realm.save() @@ -1084,7 +1099,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase): account_data_dict, expect_choose_email_screen=True, subdomain="zulip" ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/?is_deactivated=true") + self.assertEqual(result.url, user_profile.realm.uri + "/login/?is_deactivated=true") self.assertEqual( m.output, [ @@ -1108,15 +1123,15 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase): def test_social_auth_invalid_email(self) -> None: account_data_dict = self.get_account_data_dict(email="invalid", name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, level="INFO") as m: result = self.social_auth_test( account_data_dict, expect_choose_email_screen=True, - subdomain="zulip", + subdomain=subdomain, next="/user_uploads/image", ) - self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/?next=/user_uploads/image") self.assertEqual( m.output, [ @@ -1126,6 +1141,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase): ) ], ) + self.assertEqual(result.status_code, 302) + self.assertEqual(result.url, realm.uri + "/register/") def test_user_cannot_log_into_nonexisting_realm(self) -> None: account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) @@ -2204,18 +2221,20 @@ class SAMLAuthBackendTest(SocialAuthBase): if the authentication attempt failed. See SAMLAuthBackend.auth_complete for details. """ account_data_dict = self.get_account_data_dict(email="invalid", name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, "WARNING") as warn_log: result = self.social_auth_test( account_data_dict, expect_choose_email_screen=True, - subdomain="zulip", + subdomain=subdomain, next="/user_uploads/image", ) self.assertEqual( warn_log.output, [self.logger_output("SAML got invalid email argument.", "warning")] ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/register/") def test_social_auth_saml_multiple_idps_configured(self) -> None: # Setup a new SOCIAL_AUTH_SAML_ENABLED_IDPS dict with two idps. @@ -3292,12 +3311,14 @@ class GitHubAuthBackendTest(SocialAuthBase): email_data = [ dict(email=account_data_dict["email"], verified=False, primary=True), ] + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, level="WARNING") as m: result = self.social_auth_test( - account_data_dict, subdomain="zulip", email_data=email_data + account_data_dict, subdomain=subdomain, email_data=email_data ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( m.output, [ @@ -3311,13 +3332,15 @@ class GitHubAuthBackendTest(SocialAuthBase): @override_settings(SOCIAL_AUTH_GITHUB_TEAM_ID="zulip-webapp") def test_social_auth_github_team_not_member_failed(self) -> None: account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) with mock.patch( "social_core.backends.github.GithubTeamOAuth2.user_data", side_effect=AuthFailed("Not found"), ), self.assertLogs(self.logger_string, level="INFO") as mock_info: - result = self.social_auth_test(account_data_dict, subdomain="zulip") + result = self.social_auth_test(account_data_dict, subdomain=subdomain) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( mock_info.output, [ @@ -3345,13 +3368,15 @@ class GitHubAuthBackendTest(SocialAuthBase): @override_settings(SOCIAL_AUTH_GITHUB_ORG_NAME="Zulip") def test_social_auth_github_organization_not_member_failed(self) -> None: account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) with mock.patch( "social_core.backends.github.GithubOrganizationOAuth2.user_data", side_effect=AuthFailed("Not found"), ), self.assertLogs(self.logger_string, level="INFO") as mock_info: - result = self.social_auth_test(account_data_dict, subdomain="zulip") + result = self.social_auth_test(account_data_dict, subdomain=subdomain) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( mock_info.output, [ @@ -3591,15 +3616,17 @@ class GitHubAuthBackendTest(SocialAuthBase): dict(email="aaron@zulip.com", verified=True), dict(email=account_data_dict["email"], verified=True), ] + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, level="WARNING") as m: result = self.social_auth_test( account_data_dict, - subdomain="zulip", + subdomain=subdomain, expect_choose_email_screen=True, email_data=email_data, ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( m.output, [ @@ -3642,15 +3669,17 @@ class GitHubAuthBackendTest(SocialAuthBase): dict(email="hamlet@zulip.com", verified=True, primary=True), dict(email="aaron@zulip.com", verified=True), ] + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, level="WARNING") as m: result = self.social_auth_test( account_data_dict, - subdomain="zulip", + subdomain=subdomain, expect_choose_email_screen=True, email_data=email_data, ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( m.output, [ @@ -3666,7 +3695,9 @@ class GitHubAuthBackendTest(SocialAuthBase): # check if a user is denied to log in if the user manages to # send an unverified email that has an existing account in # organisation through `email` GET parameter. - account_data_dict = self.get_account_data_dict(email="hamlet@zulip.com", name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) + account_data_dict = dict(email="hamlet@zulip.com", name=self.name) email_data = [ dict(email="iago@zulip.com", verified=True), dict(email="hamlet@zulip.com", verified=False), @@ -3675,12 +3706,12 @@ class GitHubAuthBackendTest(SocialAuthBase): with self.assertLogs(self.logger_string, level="WARNING") as m: result = self.social_auth_test( account_data_dict, - subdomain="zulip", + subdomain=subdomain, expect_choose_email_screen=True, email_data=email_data, ) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( m.output, [ @@ -3735,10 +3766,12 @@ class GoogleAuthBackendTest(SocialAuthBase): def test_social_auth_email_not_verified(self) -> None: account_data_dict = dict(email=self.email, name=self.name) + subdomain = "zulip" + realm = get_realm(subdomain) with self.assertLogs(self.logger_string, level="WARNING") as m: - result = self.social_auth_test(account_data_dict, subdomain="zulip") + result = self.social_auth_test(account_data_dict, subdomain=subdomain) self.assertEqual(result.status_code, 302) - self.assertEqual(result.url, "/login/") + self.assertEqual(result.url, realm.uri + "/login/") self.assertEqual( m.output, [ diff --git a/zproject/backends.py b/zproject/backends.py index 7ea7bba310..b5251cc3a6 100644 --- a/zproject/backends.py +++ b/zproject/backends.py @@ -1296,11 +1296,23 @@ class ZulipRemoteUserBackend(RemoteUserBackend, ExternalAuthMethod): ] -def redirect_deactivated_user_to_login() -> HttpResponseRedirect: +def redirect_to_signup(realm: Realm) -> HttpResponseRedirect: + signup_url = reverse("register") + redirect_url = realm.uri + signup_url + return HttpResponseRedirect(redirect_url) + + +def redirect_to_login(realm: Realm) -> HttpResponseRedirect: + login_url = reverse("login_page", kwargs={"template_name": "zerver/login.html"}) + redirect_url = realm.uri + login_url + return HttpResponseRedirect(redirect_url) + + +def redirect_deactivated_user_to_login(realm: Realm) -> HttpResponseRedirect: # Specifying the template name makes sure that the user is not redirected to dev_login in case of # a deactivated account on a test server. login_url = reverse("login_page", kwargs={"template_name": "zerver/login.html"}) - redirect_url = login_url + "?is_deactivated=true" + redirect_url = realm.uri + login_url + "?is_deactivated=true" return HttpResponseRedirect(redirect_url) @@ -1508,18 +1520,19 @@ def social_auth_finish( # form on. return HttpResponseRedirect(reverse("find_account")) + realm = Realm.objects.get(id=return_data["realm_id"]) if inactive_user: backend.logger.info( "Failed login attempt for deactivated account: %s@%s", return_data["inactive_user_id"], return_data["realm_string_id"], ) - return redirect_deactivated_user_to_login() + return redirect_deactivated_user_to_login(realm) if auth_backend_disabled or inactive_realm or no_verified_email or email_not_associated: # Redirect to login page. We can't send to registration # workflow with these errors. We will redirect to login page. - return None + return redirect_to_login(realm) if invalid_email: # In case of invalid email, we will end up on registration page. @@ -1528,11 +1541,11 @@ def social_auth_finish( "%s got invalid email argument.", backend.auth_backend_name, ) - return None + return redirect_to_signup(realm) if auth_failed_reason: backend.logger.info(auth_failed_reason) - return None + return redirect_to_login(realm) # Structurally, all the cases where we don't have an authenticated # email for the user should be handled above; this assertion helps @@ -1545,7 +1558,6 @@ def social_auth_finish( email_address = return_data["validated_email"] full_name = return_data["full_name"] redirect_to = strategy.session_get("next") - realm = Realm.objects.get(id=return_data["realm_id"]) multiuse_object_key = strategy.session_get("multiuse_object_key", "") mobile_flow_otp = strategy.session_get("mobile_flow_otp")