auth: Extract token-check logic of remote_user_jwt.

This will be useful for re-use for implementation of an endpoint for
obtaining the API by submitting a JWT in the next commits.

It's not a pure refactor, as it requires some tweaks to remote_user_jwt
behavior:
1. The expected format of the request is changed a bit. It used to
   expect "user" and "realm" keys, from which the intended email was
   just generated by joining with @. Now it just expects "email"
   straight-up. The prior design was a bt strange to begin with, so this
   might be an improvement actually.
2. In the case of the codepath of new user signup, this will no longer
   pre-populate the Full Name in the registration form with the value
   from the "user" key. This should be a very minor lost of
   functionality, because the "user" value was not going to be a proper
   Full Name anyway. This functionality can be restored in a future
   commit if desired.

This is an API change, but this endpoint is nearly unused as far as
we're aware.
This commit is contained in:
Mateusz Mandera 2023-01-27 14:47:06 +01:00 committed by Tim Abbott
parent dd85e8b4f5
commit 6c638a1057
2 changed files with 34 additions and 58 deletions

View File

@ -5415,7 +5415,7 @@ class TestJWTLogin(ZulipTestCase):
"""
def test_login_success(self) -> None:
payload = {"user": "hamlet", "realm": "zulip.com"}
payload = {"email": "hamlet@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
email = self.example_email("hamlet")
realm = get_realm("zulip")
@ -5429,8 +5429,8 @@ class TestJWTLogin(ZulipTestCase):
self.assertEqual(result.status_code, 302)
self.assert_logged_in_user_id(user_profile.id)
def test_login_failure_when_user_is_missing(self) -> None:
payload = {"realm": "zulip.com"}
def test_login_failure_when_email_is_missing(self) -> None:
payload: Dict[str, str] = {}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
key = settings.JWT_AUTH_KEYS["zulip"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"]
@ -5438,25 +5438,14 @@ class TestJWTLogin(ZulipTestCase):
data = {"json_web_token": web_token}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(
result, "No user specified in JSON web token claims", 400
)
def test_login_failure_when_realm_is_missing(self) -> None:
payload = {"user": "hamlet"}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
key = settings.JWT_AUTH_KEYS["zulip"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"]
web_token = jwt.encode(payload, key, algorithm)
data = {"json_web_token": web_token}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(
result, "No organization specified in JSON web token claims", 400
result, "No email specified in JSON web token claims", 400
)
def test_login_failure_when_key_does_not_exist(self) -> None:
data = {"json_web_token": "not relevant"}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(result, "Auth key for this subdomain not found.", 400)
with self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}):
data = {"json_web_token": "not relevant"}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(result, "Auth key for this subdomain not found", 400)
def test_login_failure_when_key_is_missing(self) -> None:
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
@ -5472,7 +5461,7 @@ class TestJWTLogin(ZulipTestCase):
self.assert_json_error_contains(result, "Bad JSON web token", 400)
def test_login_failure_when_user_does_not_exist(self) -> None:
payload = {"user": "nonexisting", "realm": "zulip.com"}
payload = {"email": "nonexisting@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
key = settings.JWT_AUTH_KEYS["zulip"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"]
@ -5483,33 +5472,20 @@ class TestJWTLogin(ZulipTestCase):
self.assert_logged_in_user_id(None)
def test_login_failure_due_to_wrong_subdomain(self) -> None:
payload = {"user": "hamlet", "realm": "zulip.com"}
payload = {"email": "hamlet@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}):
with mock.patch("zerver.views.auth.get_subdomain", return_value="acme"):
with mock.patch("zerver.views.auth.get_realm_from_request", return_value=None):
key = settings.JWT_AUTH_KEYS["acme"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["acme"]["algorithms"]
web_token = jwt.encode(payload, key, algorithm)
data = {"json_web_token": web_token}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(result, "Wrong subdomain", 400)
self.assert_logged_in_user_id(None)
def test_login_failure_due_to_empty_subdomain(self) -> None:
payload = {"user": "hamlet", "realm": "zulip.com"}
with self.settings(JWT_AUTH_KEYS={"": {"key": "key", "algorithms": ["HS256"]}}):
with mock.patch("zerver.views.auth.get_subdomain", return_value=""):
key = settings.JWT_AUTH_KEYS[""]["key"]
[algorithm] = settings.JWT_AUTH_KEYS[""]["algorithms"]
web_token = jwt.encode(payload, key, algorithm)
data = {"json_web_token": web_token}
result = self.client_post("/accounts/login/jwt/", data)
self.assert_json_error_contains(result, "Wrong subdomain", 400)
self.assert_json_error_contains(result, "Invalid subdomain", 404)
self.assert_logged_in_user_id(None)
def test_login_success_under_subdomains(self) -> None:
payload = {"user": "hamlet", "realm": "zulip.com"}
payload = {"email": "hamlet@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
with mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"):
key = settings.JWT_AUTH_KEYS["zulip"]["key"]

View File

@ -1,9 +1,8 @@
import logging
import secrets
import urllib
from email.headerregistry import Address
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, cast
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Mapping, Optional, Tuple, cast
from urllib.parse import urlencode
import jwt
@ -473,15 +472,18 @@ def remote_user_sso(
return login_or_register_remote_user(request, result)
@csrf_exempt
@log_view_func
def remote_user_jwt(request: HttpRequest) -> HttpResponse:
subdomain = get_subdomain(request)
def get_email_and_realm_from_jwt_authentication_request(
request: HttpRequest,
) -> Tuple[str, Realm]:
realm = get_realm_from_request(request)
if realm is None:
raise InvalidSubdomainError()
try:
key = settings.JWT_AUTH_KEYS[subdomain]["key"]
algorithms = settings.JWT_AUTH_KEYS[subdomain]["algorithms"]
key = settings.JWT_AUTH_KEYS[realm.subdomain]["key"]
algorithms = settings.JWT_AUTH_KEYS[realm.subdomain]["algorithms"]
except KeyError:
raise JsonableError(_("Auth key for this subdomain not found."))
raise JsonableError(_("Auth key for this subdomain not found"))
try:
json_web_token = request.POST["json_web_token"]
@ -492,24 +494,22 @@ def remote_user_jwt(request: HttpRequest) -> HttpResponse:
except jwt.InvalidTokenError:
raise JsonableError(_("Bad JSON web token"))
remote_user = payload.get("user", None)
if remote_user is None:
raise JsonableError(_("No user specified in JSON web token claims"))
email_domain = payload.get("realm", None)
if email_domain is None:
raise JsonableError(_("No organization specified in JSON web token claims"))
remote_email = payload.get("email", None)
if remote_email is None:
raise JsonableError(_("No email specified in JSON web token claims"))
email = Address(username=remote_user, domain=email_domain).addr_spec
return remote_email, realm
try:
realm = get_realm(subdomain)
except Realm.DoesNotExist:
raise JsonableError(_("Wrong subdomain"))
@csrf_exempt
@log_view_func
def remote_user_jwt(request: HttpRequest) -> HttpResponse:
email, realm = get_email_and_realm_from_jwt_authentication_request(request)
user_profile = authenticate(username=email, realm=realm, use_dummy_backend=True)
if user_profile is None:
result = ExternalAuthResult(
data_dict={"email": email, "full_name": remote_user, "subdomain": realm.subdomain}
data_dict={"email": email, "full_name": "", "subdomain": realm.subdomain}
)
else:
assert isinstance(user_profile, UserProfile)