diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index db691e61c6..4ac8015225 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -5415,7 +5415,7 @@ class TestJWTLogin(ZulipTestCase): """ def test_login_success(self) -> None: - payload = {"user": "hamlet", "realm": "zulip.com"} + payload = {"email": "hamlet@zulip.com"} with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): email = self.example_email("hamlet") realm = get_realm("zulip") @@ -5429,8 +5429,8 @@ class TestJWTLogin(ZulipTestCase): self.assertEqual(result.status_code, 302) self.assert_logged_in_user_id(user_profile.id) - def test_login_failure_when_user_is_missing(self) -> None: - payload = {"realm": "zulip.com"} + def test_login_failure_when_email_is_missing(self) -> None: + payload: Dict[str, str] = {} with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): key = settings.JWT_AUTH_KEYS["zulip"]["key"] [algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"] @@ -5438,25 +5438,14 @@ class TestJWTLogin(ZulipTestCase): data = {"json_web_token": web_token} result = self.client_post("/accounts/login/jwt/", data) self.assert_json_error_contains( - result, "No user specified in JSON web token claims", 400 - ) - - def test_login_failure_when_realm_is_missing(self) -> None: - payload = {"user": "hamlet"} - with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): - key = settings.JWT_AUTH_KEYS["zulip"]["key"] - [algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"] - web_token = jwt.encode(payload, key, algorithm) - data = {"json_web_token": web_token} - result = self.client_post("/accounts/login/jwt/", data) - self.assert_json_error_contains( - result, "No organization specified in JSON web token claims", 400 + result, "No email specified in JSON web token claims", 400 ) def test_login_failure_when_key_does_not_exist(self) -> None: - data = {"json_web_token": "not relevant"} - result = self.client_post("/accounts/login/jwt/", data) - self.assert_json_error_contains(result, "Auth key for this subdomain not found.", 400) + with self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}): + data = {"json_web_token": "not relevant"} + result = self.client_post("/accounts/login/jwt/", data) + self.assert_json_error_contains(result, "Auth key for this subdomain not found", 400) def test_login_failure_when_key_is_missing(self) -> None: with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): @@ -5472,7 +5461,7 @@ class TestJWTLogin(ZulipTestCase): self.assert_json_error_contains(result, "Bad JSON web token", 400) def test_login_failure_when_user_does_not_exist(self) -> None: - payload = {"user": "nonexisting", "realm": "zulip.com"} + payload = {"email": "nonexisting@zulip.com"} with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): key = settings.JWT_AUTH_KEYS["zulip"]["key"] [algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"] @@ -5483,33 +5472,20 @@ class TestJWTLogin(ZulipTestCase): self.assert_logged_in_user_id(None) def test_login_failure_due_to_wrong_subdomain(self) -> None: - payload = {"user": "hamlet", "realm": "zulip.com"} + payload = {"email": "hamlet@zulip.com"} with self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}): - with mock.patch("zerver.views.auth.get_subdomain", return_value="acme"): + with mock.patch("zerver.views.auth.get_realm_from_request", return_value=None): key = settings.JWT_AUTH_KEYS["acme"]["key"] [algorithm] = settings.JWT_AUTH_KEYS["acme"]["algorithms"] web_token = jwt.encode(payload, key, algorithm) data = {"json_web_token": web_token} result = self.client_post("/accounts/login/jwt/", data) - self.assert_json_error_contains(result, "Wrong subdomain", 400) - self.assert_logged_in_user_id(None) - - def test_login_failure_due_to_empty_subdomain(self) -> None: - payload = {"user": "hamlet", "realm": "zulip.com"} - with self.settings(JWT_AUTH_KEYS={"": {"key": "key", "algorithms": ["HS256"]}}): - with mock.patch("zerver.views.auth.get_subdomain", return_value=""): - key = settings.JWT_AUTH_KEYS[""]["key"] - [algorithm] = settings.JWT_AUTH_KEYS[""]["algorithms"] - web_token = jwt.encode(payload, key, algorithm) - - data = {"json_web_token": web_token} - result = self.client_post("/accounts/login/jwt/", data) - self.assert_json_error_contains(result, "Wrong subdomain", 400) + self.assert_json_error_contains(result, "Invalid subdomain", 404) self.assert_logged_in_user_id(None) def test_login_success_under_subdomains(self) -> None: - payload = {"user": "hamlet", "realm": "zulip.com"} + payload = {"email": "hamlet@zulip.com"} with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}): with mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"): key = settings.JWT_AUTH_KEYS["zulip"]["key"] diff --git a/zerver/views/auth.py b/zerver/views/auth.py index 66c3c304bb..8b41046f6a 100644 --- a/zerver/views/auth.py +++ b/zerver/views/auth.py @@ -1,9 +1,8 @@ import logging import secrets import urllib -from email.headerregistry import Address from functools import wraps -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, cast +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Tuple, cast from urllib.parse import urlencode import jwt @@ -473,15 +472,18 @@ def remote_user_sso( return login_or_register_remote_user(request, result) -@csrf_exempt -@log_view_func -def remote_user_jwt(request: HttpRequest) -> HttpResponse: - subdomain = get_subdomain(request) +def get_email_and_realm_from_jwt_authentication_request( + request: HttpRequest, +) -> Tuple[str, Realm]: + realm = get_realm_from_request(request) + if realm is None: + raise InvalidSubdomainError() + try: - key = settings.JWT_AUTH_KEYS[subdomain]["key"] - algorithms = settings.JWT_AUTH_KEYS[subdomain]["algorithms"] + key = settings.JWT_AUTH_KEYS[realm.subdomain]["key"] + algorithms = settings.JWT_AUTH_KEYS[realm.subdomain]["algorithms"] except KeyError: - raise JsonableError(_("Auth key for this subdomain not found.")) + raise JsonableError(_("Auth key for this subdomain not found")) try: json_web_token = request.POST["json_web_token"] @@ -492,24 +494,22 @@ def remote_user_jwt(request: HttpRequest) -> HttpResponse: except jwt.InvalidTokenError: raise JsonableError(_("Bad JSON web token")) - remote_user = payload.get("user", None) - if remote_user is None: - raise JsonableError(_("No user specified in JSON web token claims")) - email_domain = payload.get("realm", None) - if email_domain is None: - raise JsonableError(_("No organization specified in JSON web token claims")) + remote_email = payload.get("email", None) + if remote_email is None: + raise JsonableError(_("No email specified in JSON web token claims")) - email = Address(username=remote_user, domain=email_domain).addr_spec + return remote_email, realm - try: - realm = get_realm(subdomain) - except Realm.DoesNotExist: - raise JsonableError(_("Wrong subdomain")) + +@csrf_exempt +@log_view_func +def remote_user_jwt(request: HttpRequest) -> HttpResponse: + email, realm = get_email_and_realm_from_jwt_authentication_request(request) user_profile = authenticate(username=email, realm=realm, use_dummy_backend=True) if user_profile is None: result = ExternalAuthResult( - data_dict={"email": email, "full_name": remote_user, "subdomain": realm.subdomain} + data_dict={"email": email, "full_name": "", "subdomain": realm.subdomain} ) else: assert isinstance(user_profile, UserProfile)