refactor: Split dev related code from `auth.py` into `dev_login.py`.

This refactor will help us make our development-related authentication
code to avoid production security impact.
This commit is contained in:
Riken Shah 2021-04-21 20:07:08 +00:00 committed by Tim Abbott
parent 9f5efd208f
commit 0e5f6c21ad
5 changed files with 156 additions and 131 deletions

View File

@ -292,6 +292,7 @@ python_rules = RuleList(
{ {
"pattern": "django.utils.translation", "pattern": "django.utils.translation",
"include_only": {"test/", "zerver/views/development/"}, "include_only": {"test/", "zerver/views/development/"},
"exclude": {"zerver/views/development/dev_login.py"},
"description": "Test strings should not be tagged for translation", "description": "Test strings should not be tagged for translation",
"good_lines": [""], "good_lines": [""],
"bad_lines": ["django.utils.translation"], "bad_lines": ["django.utils.translation"],

View File

@ -3864,12 +3864,14 @@ class DevFetchAPIKeyTest(ZulipTestCase):
self.assert_json_error_contains(result, "This organization has been deactivated", 403) self.assert_json_error_contains(result, "This organization has been deactivated", 403)
def test_dev_auth_disabled(self) -> None: def test_dev_auth_disabled(self) -> None:
with mock.patch("zerver.views.auth.dev_auth_enabled", return_value=False): with mock.patch("zerver.views.development.dev_login.dev_auth_enabled", return_value=False):
result = self.client_post("/api/v1/dev_fetch_api_key", dict(username=self.email)) result = self.client_post("/api/v1/dev_fetch_api_key", dict(username=self.email))
self.assert_json_error_contains(result, "DevAuthBackend not enabled.", 400) self.assert_json_error_contains(result, "DevAuthBackend not enabled.", 400)
def test_invalid_subdomain(self) -> None: def test_invalid_subdomain(self) -> None:
with mock.patch("zerver.views.auth.get_realm_from_request", return_value=None): with mock.patch(
"zerver.views.development.dev_login.get_realm_from_request", return_value=None
):
result = self.client_post( result = self.client_post(
"/api/v1/dev_fetch_api_key", "/api/v1/dev_fetch_api_key",
dict(username=self.email, password=initial_password(self.email)), dict(username=self.email, password=initial_password(self.email)),
@ -3885,7 +3887,7 @@ class DevGetEmailsTest(ZulipTestCase):
self.assert_in_response("direct_users", result) self.assert_in_response("direct_users", result)
def test_dev_auth_disabled(self) -> None: def test_dev_auth_disabled(self) -> None:
with mock.patch("zerver.views.auth.dev_auth_enabled", return_value=False): with mock.patch("zerver.views.development.dev_login.dev_auth_enabled", return_value=False):
result = self.client_get("/api/v1/dev_list_users") result = self.client_get("/api/v1/dev_list_users")
self.assert_json_error_contains(result, "DevAuthBackend not enabled.", 400) self.assert_json_error_contains(result, "DevAuthBackend not enabled.", 400)

View File

@ -619,25 +619,6 @@ def redirect_and_log_into_subdomain(result: ExternalAuthResult) -> HttpResponse:
return redirect(subdomain_login_uri) return redirect(subdomain_login_uri)
def get_dev_users(realm: Optional[Realm] = None, extra_users_count: int = 10) -> List[UserProfile]:
# Development environments usually have only a few users, but
# it still makes sense to limit how many extra users we render to
# support performance testing with DevAuthBackend.
if realm is not None:
users_query = UserProfile.objects.select_related().filter(
is_bot=False, is_active=True, realm=realm
)
else:
users_query = UserProfile.objects.select_related().filter(is_bot=False, is_active=True)
shakespearian_users = users_query.exclude(email__startswith="extrauser").order_by("email")
extra_users = users_query.filter(email__startswith="extrauser").order_by("email")
# Limit the number of extra users we offer by default
extra_users = extra_users[0:extra_users_count]
users = list(shakespearian_users) + list(extra_users)
return users
def redirect_to_misconfigured_ldap_notice(request: HttpResponse, error_type: int) -> HttpResponse: def redirect_to_misconfigured_ldap_notice(request: HttpResponse, error_type: int) -> HttpResponse:
if error_type == ZulipLDAPAuthBackend.REALM_IS_NONE_ERROR: if error_type == ZulipLDAPAuthBackend.REALM_IS_NONE_ERROR:
return config_error(request, "ldap") return config_error(request, "ldap")
@ -660,20 +641,6 @@ def redirect_to_deactivation_notice() -> HttpResponse:
return HttpResponseRedirect(reverse(show_deactivation_notice)) return HttpResponseRedirect(reverse(show_deactivation_notice))
def add_dev_login_context(realm: Optional[Realm], context: Dict[str, Any]) -> None:
users = get_dev_users(realm)
context["current_realm"] = realm
context["all_realms"] = Realm.objects.all()
def sort(lst: List[UserProfile]) -> List[UserProfile]:
return sorted(lst, key=lambda u: u.delivery_email)
context["direct_owners"] = sort([u for u in users if u.is_realm_owner])
context["direct_admins"] = sort([u for u in users if u.is_realm_admin and not u.is_realm_owner])
context["guest_users"] = sort([u for u in users if u.is_guest])
context["direct_users"] = sort([u for u in users if not (u.is_realm_admin or u.is_guest)])
def update_login_page_context(request: HttpRequest, context: Dict[str, Any]) -> None: def update_login_page_context(request: HttpRequest, context: Dict[str, Any]) -> None:
for key in ("email", "already_registered", "is_deactivated"): for key in ("email", "already_registered", "is_deactivated"):
try: try:
@ -758,6 +725,8 @@ def login_page(
extra_context = kwargs.pop("extra_context", {}) extra_context = kwargs.pop("extra_context", {})
extra_context["next"] = next extra_context["next"] = next
if dev_auth_enabled() and kwargs.get("template_name") == "zerver/development/dev_login.html": if dev_auth_enabled() and kwargs.get("template_name") == "zerver/development/dev_login.html":
from zerver.views.development.dev_login import add_dev_login_context
if "new_realm" in request.POST: if "new_realm" in request.POST:
try: try:
realm = get_realm(request.POST["new_realm"]) realm = get_realm(request.POST["new_realm"])
@ -827,98 +796,6 @@ def start_two_factor_auth(
return two_fa_view(request, **kwargs) return two_fa_view(request, **kwargs)
@csrf_exempt
@has_request_variables
def dev_direct_login(
request: HttpRequest,
next: str = REQ(default="/"),
) -> HttpResponse:
# This function allows logging in without a password and should only be called
# in development environments. It may be called if the DevAuthBackend is included
# in settings.AUTHENTICATION_BACKENDS
if (not dev_auth_enabled()) or settings.PRODUCTION:
# This check is probably not required, since authenticate would fail without
# an enabled DevAuthBackend.
return config_error(request, "dev")
email = request.POST["direct_email"]
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
user_profile = authenticate(dev_auth_username=email, realm=realm)
if user_profile is None:
return config_error(request, "dev")
do_login(request, user_profile)
redirect_to = get_safe_redirect_to(next, user_profile.realm.uri)
return HttpResponseRedirect(redirect_to)
def check_dev_auth_backend() -> None:
if settings.PRODUCTION:
raise JsonableError(_("Endpoint not available in production."))
if not dev_auth_enabled():
raise JsonableError(_("DevAuthBackend not enabled."))
@csrf_exempt
@require_post
@has_request_variables
def api_dev_fetch_api_key(request: HttpRequest, username: str = REQ()) -> HttpResponse:
"""This function allows logging in without a password on the Zulip
mobile apps when connecting to a Zulip development environment. It
requires DevAuthBackend to be included in settings.AUTHENTICATION_BACKENDS.
"""
check_dev_auth_backend()
# Django invokes authenticate methods by matching arguments, and this
# authentication flow will not invoke LDAP authentication because of
# this condition of Django so no need to check if LDAP backend is
# enabled.
validate_login_email(username)
realm = get_realm_from_request(request)
if realm is None:
return json_error(_("Invalid subdomain"))
return_data: Dict[str, bool] = {}
user_profile = authenticate(dev_auth_username=username, realm=realm, return_data=return_data)
if return_data.get("inactive_realm"):
return json_error(
_("This organization has been deactivated."),
data={"reason": "realm deactivated"},
status=403,
)
if return_data.get("inactive_user"):
return json_error(
_("Your account has been disabled."), data={"reason": "user disable"}, status=403
)
if user_profile is None:
return json_error(
_("This user is not registered."), data={"reason": "unregistered"}, status=403
)
do_login(request, user_profile)
api_key = get_api_key(user_profile)
return json_success({"api_key": api_key, "email": user_profile.delivery_email})
@csrf_exempt
def api_dev_list_users(request: HttpRequest) -> HttpResponse:
check_dev_auth_backend()
users = get_dev_users()
return json_success(
dict(
direct_admins=[
dict(email=u.delivery_email, realm_uri=u.realm.uri)
for u in users
if u.is_realm_admin
],
direct_users=[
dict(email=u.delivery_email, realm_uri=u.realm.uri)
for u in users
if not u.is_realm_admin
],
)
)
@csrf_exempt @csrf_exempt
@require_post @require_post
@has_request_variables @has_request_variables

View File

@ -0,0 +1,143 @@
from typing import Any, Dict, List, Optional
from django.conf import settings
from django.contrib.auth import authenticate
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.utils.translation import gettext as _
from django.views.decorators.csrf import csrf_exempt
from zerver.context_processors import get_realm_from_request
from zerver.decorator import do_login, require_post
from zerver.lib.request import REQ, JsonableError, has_request_variables
from zerver.lib.response import json_error, json_success
from zerver.lib.subdomains import get_subdomain
from zerver.lib.users import get_api_key
from zerver.lib.validator import validate_login_email
from zerver.models import Realm, UserProfile, get_realm
from zerver.views.auth import config_error, get_safe_redirect_to
from zproject.backends import dev_auth_enabled
def get_dev_users(realm: Optional[Realm] = None, extra_users_count: int = 10) -> List[UserProfile]:
# Development environments usually have only a few users, but
# it still makes sense to limit how many extra users we render to
# support performance testing with DevAuthBackend.
if realm is not None:
users_query = UserProfile.objects.select_related().filter(
is_bot=False, is_active=True, realm=realm
)
else:
users_query = UserProfile.objects.select_related().filter(is_bot=False, is_active=True)
shakespearian_users = users_query.exclude(email__startswith="extrauser").order_by("email")
extra_users = users_query.filter(email__startswith="extrauser").order_by("email")
# Limit the number of extra users we offer by default
extra_users = extra_users[0:extra_users_count]
users = list(shakespearian_users) + list(extra_users)
return users
def add_dev_login_context(realm: Optional[Realm], context: Dict[str, Any]) -> None:
users = get_dev_users(realm)
context["current_realm"] = realm
context["all_realms"] = Realm.objects.all()
def sort(lst: List[UserProfile]) -> List[UserProfile]:
return sorted(lst, key=lambda u: u.delivery_email)
context["direct_owners"] = sort([u for u in users if u.is_realm_owner])
context["direct_admins"] = sort([u for u in users if u.is_realm_admin and not u.is_realm_owner])
context["guest_users"] = sort([u for u in users if u.is_guest])
context["direct_users"] = sort([u for u in users if not (u.is_realm_admin or u.is_guest)])
@csrf_exempt
@has_request_variables
def dev_direct_login(
request: HttpRequest,
next: str = REQ(default="/"),
) -> HttpResponse:
# This function allows logging in without a password and should only be called
# in development environments. It may be called if the DevAuthBackend is included
# in settings.AUTHENTICATION_BACKENDS
if (not dev_auth_enabled()) or settings.PRODUCTION:
# This check is probably not required, since authenticate would fail without
# an enabled DevAuthBackend.
return config_error(request, "dev")
email = request.POST["direct_email"]
subdomain = get_subdomain(request)
realm = get_realm(subdomain)
user_profile = authenticate(dev_auth_username=email, realm=realm)
if user_profile is None:
return config_error(request, "dev")
do_login(request, user_profile)
redirect_to = get_safe_redirect_to(next, user_profile.realm.uri)
return HttpResponseRedirect(redirect_to)
def check_dev_auth_backend() -> None:
if settings.PRODUCTION:
raise JsonableError(_("Endpoint not available in production."))
if not dev_auth_enabled():
raise JsonableError(_("DevAuthBackend not enabled."))
@csrf_exempt
@require_post
@has_request_variables
def api_dev_fetch_api_key(request: HttpRequest, username: str = REQ()) -> HttpResponse:
"""This function allows logging in without a password on the Zulip
mobile apps when connecting to a Zulip development environment. It
requires DevAuthBackend to be included in settings.AUTHENTICATION_BACKENDS.
"""
check_dev_auth_backend()
# Django invokes authenticate methods by matching arguments, and this
# authentication flow will not invoke LDAP authentication because of
# this condition of Django so no need to check if LDAP backend is
# enabled.
validate_login_email(username)
realm = get_realm_from_request(request)
if realm is None:
return json_error(_("Invalid subdomain"))
return_data: Dict[str, bool] = {}
user_profile = authenticate(dev_auth_username=username, realm=realm, return_data=return_data)
if return_data.get("inactive_realm"):
return json_error(
_("This organization has been deactivated."),
data={"reason": "realm deactivated"},
status=403,
)
if return_data.get("inactive_user"):
return json_error(
_("Your account has been disabled."), data={"reason": "user disable"}, status=403
)
if user_profile is None:
return json_error(
_("This user is not registered."), data={"reason": "unregistered"}, status=403
)
do_login(request, user_profile)
api_key = get_api_key(user_profile)
return json_success({"api_key": api_key, "email": user_profile.delivery_email})
@csrf_exempt
def api_dev_list_users(request: HttpRequest) -> HttpResponse:
check_dev_auth_backend()
users = get_dev_users()
return json_success(
dict(
direct_admins=[
dict(email=u.delivery_email, realm_uri=u.realm.uri)
for u in users
if u.is_realm_admin
],
direct_users=[
dict(email=u.delivery_email, realm_uri=u.realm.uri)
for u in users
if not u.is_realm_admin
],
)
)

View File

@ -21,11 +21,8 @@ from zerver.views.alert_words import add_alert_words, list_alert_words, remove_a
from zerver.views.archive import archive, get_web_public_topics_backend from zerver.views.archive import archive, get_web_public_topics_backend
from zerver.views.attachments import list_by_user, remove from zerver.views.attachments import list_by_user, remove
from zerver.views.auth import ( from zerver.views.auth import (
api_dev_fetch_api_key,
api_dev_list_users,
api_fetch_api_key, api_fetch_api_key,
api_get_server_settings, api_get_server_settings,
dev_direct_login,
json_fetch_api_key, json_fetch_api_key,
log_into_subdomain, log_into_subdomain,
login_page, login_page,
@ -50,6 +47,11 @@ from zerver.views.custom_profile_fields import (
update_realm_custom_profile_field, update_realm_custom_profile_field,
update_user_custom_profile_data, update_user_custom_profile_data,
) )
from zerver.views.development.dev_login import (
api_dev_fetch_api_key,
api_dev_list_users,
dev_direct_login,
)
from zerver.views.digest import digest_page from zerver.views.digest import digest_page
from zerver.views.documentation import IntegrationView, MarkdownDirectoryView, integration_doc from zerver.views.documentation import IntegrationView, MarkdownDirectoryView, integration_doc
from zerver.views.drafts import create_drafts, delete_draft, edit_draft, fetch_drafts from zerver.views.drafts import create_drafts, delete_draft, edit_draft, fetch_drafts