import base64 import logging import re import shlex import subprocess from email.headerregistry import Address import orjson from django.conf import settings from django.http import HttpRequest, HttpResponse from django.utils.translation import gettext as _ from zerver.decorator import authenticated_json_view from zerver.lib.ccache import make_ccache from zerver.lib.exceptions import JsonableError from zerver.lib.pysa import mark_sanitized from zerver.lib.response import json_success from zerver.lib.typed_endpoint import typed_endpoint from zerver.lib.users import get_api_key from zerver.models import UserProfile # Hack for mit.edu users whose Kerberos usernames don't match what they zephyr # as. The key is for Kerberos and the value is for zephyr. kerberos_alter_egos = { "golem": "ctl", } @authenticated_json_view @typed_endpoint def webathena_kerberos_login( request: HttpRequest, user_profile: UserProfile, *, cred: str | None = None ) -> HttpResponse: if cred is None: raise JsonableError(_("Could not find Kerberos credential")) if not user_profile.realm.webathena_enabled: raise JsonableError(_("Webathena login not enabled")) try: parsed_cred = orjson.loads(cred) user = parsed_cred["cname"]["nameString"][0] if user in kerberos_alter_egos: user = kerberos_alter_egos[user] assert user == Address(addr_spec=user_profile.email).username # Limit characters in usernames to valid MIT usernames # This is important for security since DNS is not secure. assert re.match(r"^[a-z0-9_.-]+$", user) is not None ccache = make_ccache(parsed_cred) # 'user' has been verified to contain only benign characters that won't # help with shell injection. user = mark_sanitized(user) # 'ccache' is only written to disk by the script and used as a kerberos # credential cache file. ccache = mark_sanitized(ccache) except Exception: raise JsonableError(_("Invalid Kerberos cache")) if settings.PERSONAL_ZMIRROR_SERVER is None: logging.error("PERSONAL_ZMIRROR_SERVER is not properly configured", stack_info=True) raise JsonableError(_("We were unable to set up mirroring for you")) # TODO: Send these data via (say) RabbitMQ try: api_key = get_api_key(user_profile) command = [ "/home/zulip/python-zulip-api/zulip/integrations/zephyr/process_ccache", user, api_key, base64.b64encode(ccache).decode(), ] subprocess.check_call(["ssh", settings.PERSONAL_ZMIRROR_SERVER, "--", shlex.join(command)]) except subprocess.CalledProcessError: logging.exception("Error updating the user's ccache", stack_info=True) raise JsonableError(_("We were unable to set up mirroring for you")) return json_success(request)