import logging from typing import Any, Dict, List, Set, Tuple, Optional, Text from apiclient.sample_tools import client as googleapiclient from django_auth_ldap.backend import LDAPBackend, _LDAPUser import django.contrib.auth from django.contrib.auth.backends import RemoteUserBackend from django.conf import settings from django.http import HttpResponse from oauth2client.crypt import AppIdentityError from social_core.backends.github import GithubOAuth2, GithubOrganizationOAuth2, \ GithubTeamOAuth2 from social_core.utils import handle_http_errors from social_core.exceptions import AuthFailed, SocialAuthBaseException from social_django.models import DjangoStorage from social_django.strategy import DjangoStrategy from zerver.lib.actions import do_create_user from zerver.lib.request import JsonableError from zerver.lib.subdomains import user_matches_subdomain, get_subdomain from zerver.lib.users import check_full_name from zerver.models import UserProfile, Realm, get_user_profile_by_id, \ get_user_profile_by_email, remote_user_to_email, email_to_username, \ get_realm def pad_method_dict(method_dict): # type: (Dict[Text, bool]) -> Dict[Text, bool] """Pads an authentication methods dict to contain all auth backends supported by the software, regardless of whether they are configured on this server""" for key in AUTH_BACKEND_NAME_MAP: if key not in method_dict: method_dict[key] = False return method_dict def auth_enabled_helper(backends_to_check, realm): # type: (List[Text], Optional[Realm]) -> bool if realm is not None: enabled_method_dict = realm.authentication_methods_dict() pad_method_dict(enabled_method_dict) else: enabled_method_dict = dict((method, True) for method in Realm.AUTHENTICATION_FLAGS) pad_method_dict(enabled_method_dict) for supported_backend in django.contrib.auth.get_backends(): for backend_name in backends_to_check: backend = AUTH_BACKEND_NAME_MAP[backend_name] if enabled_method_dict[backend_name] and isinstance(supported_backend, backend): return True return False def ldap_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return auth_enabled_helper(['LDAP'], realm) def email_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return auth_enabled_helper(['Email'], realm) def password_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return ldap_auth_enabled(realm) or email_auth_enabled(realm) def dev_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return auth_enabled_helper(['Dev'], realm) def google_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return auth_enabled_helper(['Google'], realm) def github_auth_enabled(realm=None): # type: (Optional[Realm]) -> bool return auth_enabled_helper(['GitHub'], realm) def any_oauth_backend_enabled(realm=None): # type: (Optional[Realm]) -> bool """Used by the login page process to determine whether to show the 'OR' for login with Google""" return auth_enabled_helper(['GitHub', 'Google'], realm) def require_email_format_usernames(realm=None): # type: (Optional[Realm]) -> bool if ldap_auth_enabled(realm): if settings.LDAP_EMAIL_ATTR or settings.LDAP_APPEND_DOMAIN: return False return True def common_get_active_user_by_email(email, return_data=None): # type: (Text, Optional[Dict[str, Any]]) -> Optional[UserProfile] try: user_profile = get_user_profile_by_email(email) except UserProfile.DoesNotExist: return None if not user_profile.is_active: if return_data is not None: return_data['inactive_user'] = True return None if user_profile.realm.deactivated: if return_data is not None: return_data['inactive_realm'] = True return None return user_profile class ZulipAuthMixin: def get_user(self, user_profile_id): # type: (int) -> Optional[UserProfile] """ Get a UserProfile object from the user_profile_id. """ try: return get_user_profile_by_id(user_profile_id) except UserProfile.DoesNotExist: return None class SocialAuthMixin(ZulipAuthMixin): auth_backend_name = None # type: Text def get_email_address(self, *args, **kwargs): # type: (*Any, **Any) -> Text raise NotImplementedError def get_full_name(self, *args, **kwargs): # type: (*Any, **Any) -> Text raise NotImplementedError def get_authenticated_user(self, *args: Any, **kwargs: Any) -> Optional[UserProfile]: raise NotImplementedError @handle_http_errors def do_auth(self, *args: Any, **kwargs: Any) -> Optional[HttpResponse]: """ This function is called once the authentication workflow is complete. We override this function to: 1. Inject `return_data` and `realm_subdomain` kwargs. These will be used by `authenticate()` functions of backends to make the decision. 2. Call the proper authentication function to get the user in `get_authenticated_user`. The actual decision on authentication is done in SocialAuthMixin._common_authenticate(). SocialAuthMixin.get_authenticated_user is expected to be overridden by the derived class to add custom logic for authenticating the user and returning the user. """ kwargs['return_data'] = {} kwargs['realm_subdomain'] = get_subdomain(self.strategy.request) # type: ignore # `strategy` comes from Python Social Auth. user_profile = self.get_authenticated_user(*args, **kwargs) return self.process_do_auth(user_profile, *args, **kwargs) def authenticate(self, realm_subdomain='', # type: Optional[Text] storage=None, # type: Optional[DjangoStorage] strategy=None, # type: Optional[DjangoStrategy] user=None, # type: Optional[Dict[str, Any]] return_data=None, # type: Optional[Dict[str, Any]] response=None, # type: Optional[Dict[str, Any]] backend=None # type: Optional[GithubOAuth2] ): # type: (...) -> Optional[UserProfile] """ Django decides which `authenticate` to call by inspecting the arguments. So it's better to create `authenticate` function with well defined arguments. Keeping this function separate so that it can easily be overridden. """ if user is None: user = {} assert return_data is not None assert response is not None return self._common_authenticate(self, realm_subdomain=realm_subdomain, storage=storage, strategy=strategy, user=user, return_data=return_data, response=response, backend=backend) def _common_authenticate(self, *args, **kwargs): # type: (*Any, **Any) -> Optional[UserProfile] return_data = kwargs.get('return_data', {}) email_address = self.get_email_address(*args, **kwargs) if not email_address: return_data['invalid_email'] = True return None try: user_profile = get_user_profile_by_email(email_address) except UserProfile.DoesNotExist: return_data["valid_attestation"] = True return None if not user_profile.is_active: return_data["inactive_user"] = True return None if user_profile.realm.deactivated: return_data["inactive_realm"] = True return None if not user_matches_subdomain(kwargs.get("realm_subdomain"), user_profile): return_data["invalid_subdomain"] = True return None if not auth_enabled_helper([self.auth_backend_name], user_profile.realm): return_data["auth_backend_disabled"] = True return None return user_profile def process_do_auth(self, user_profile, *args, **kwargs): # type: (UserProfile, *Any, **Any) -> Optional[HttpResponse] # These functions need to be imported here to avoid cyclic # dependency. from zerver.views.auth import (login_or_register_remote_user, redirect_to_subdomain_login_url, redirect_and_log_into_subdomain) return_data = kwargs.get('return_data', {}) inactive_user = return_data.get('inactive_user') inactive_realm = return_data.get('inactive_realm') invalid_subdomain = return_data.get('invalid_subdomain') invalid_email = return_data.get('invalid_email') if inactive_user or inactive_realm: # Redirect to login page. We can't send to registration # workflow with these errors. We will redirect to login page. return None if invalid_email: # In case of invalid email, we will end up on registration page. # This seems better than redirecting to login page. logging.warning( "{} got invalid email argument.".format(self.auth_backend_name) ) strategy = self.strategy # type: ignore # This comes from Python Social Auth. request = strategy.request email_address = self.get_email_address(*args, **kwargs) full_name = self.get_full_name(*args, **kwargs) is_signup = strategy.session_get('is_signup') == '1' mobile_flow_otp = strategy.session_get('mobile_flow_otp') subdomain = strategy.session_get('subdomain') if not subdomain: # At least in our tests, this can be None; and it's not # clear what the exact semantics of `session_get` are or # what values it might return. Historically we treated # any falsy value here as the root domain, so defensively # continue that behavior. subdomain = Realm.SUBDOMAIN_FOR_ROOT_DOMAIN if (subdomain == Realm.SUBDOMAIN_FOR_ROOT_DOMAIN or mobile_flow_otp is not None): return login_or_register_remote_user(request, email_address, user_profile, full_name, invalid_subdomain=bool(invalid_subdomain), mobile_flow_otp=mobile_flow_otp, is_signup=is_signup) realm = get_realm(subdomain) if realm is None: return redirect_to_subdomain_login_url() return redirect_and_log_into_subdomain(realm, full_name, email_address, is_signup=is_signup) def auth_complete(self, *args, **kwargs): # type: (*Any, **Any) -> Optional[HttpResponse] """ Returning `None` from this function will redirect the browser to the login page. """ try: # Call the auth_complete method of social_core.backends.oauth.BaseOAuth2 return super().auth_complete(*args, **kwargs) # type: ignore # monkey-patching except AuthFailed: return None except SocialAuthBaseException as e: logging.warning(str(e)) return None class ZulipDummyBackend(ZulipAuthMixin): """ Used when we want to log you in but we don't know which backend to use. """ def authenticate(self, username=None, realm_subdomain=None, use_dummy_backend=False, return_data=None): # type: (Optional[Text], Optional[Text], bool, Optional[Dict[str, Any]]) -> Optional[UserProfile] assert username is not None if use_dummy_backend: user_profile = common_get_active_user_by_email(username) if user_profile is None: return None if not user_matches_subdomain(realm_subdomain, user_profile): if return_data is not None: return_data["invalid_subdomain"] = True return None return user_profile return None class EmailAuthBackend(ZulipAuthMixin): """ Email Authentication Backend Allows a user to sign in using an email/password pair rather than a username/password pair. """ def authenticate(self, username=None, password=None, realm_subdomain=None, return_data=None): # type: (Optional[Text], Optional[str], Optional[Text], Optional[Dict[str, Any]]) -> Optional[UserProfile] """ Authenticate a user based on email address as the user name. """ if username is None or password is None: # Return immediately. Otherwise we will look for a SQL row with # NULL username. While that's probably harmless, it's needless # exposure. return None user_profile = common_get_active_user_by_email(username, return_data=return_data) if user_profile is None: return None if not password_auth_enabled(user_profile.realm): if return_data is not None: return_data['password_auth_disabled'] = True return None if not email_auth_enabled(user_profile.realm): if return_data is not None: return_data['email_auth_disabled'] = True return None if user_profile.check_password(password): if not user_matches_subdomain(realm_subdomain, user_profile): if return_data is not None: return_data["invalid_subdomain"] = True return None return user_profile return None class GoogleMobileOauth2Backend(ZulipAuthMixin): """ Google Apps authentication for mobile devices Allows a user to sign in using a Google-issued OAuth2 token. Ref: https://developers.google.com/+/mobile/android/sign-in#server-side_access_for_your_app https://developers.google.com/accounts/docs/CrossClientAuth#offlineAccess """ def authenticate(self, google_oauth2_token=None, realm_subdomain=None, return_data=None): # type: (Optional[str], Optional[Text], Optional[Dict[str, Any]]) -> Optional[UserProfile] if return_data is None: return_data = {} try: token_payload = googleapiclient.verify_id_token(google_oauth2_token, settings.GOOGLE_CLIENT_ID) except AppIdentityError: return None if token_payload["email_verified"] in (True, "true"): try: user_profile = get_user_profile_by_email(token_payload["email"]) except UserProfile.DoesNotExist: return_data["valid_attestation"] = True return None if not user_profile.is_active: return_data["inactive_user"] = True return None if user_profile.realm.deactivated: return_data["inactive_realm"] = True return None if not user_matches_subdomain(realm_subdomain, user_profile): return_data["invalid_subdomain"] = True return None if not google_auth_enabled(realm=user_profile.realm): return_data["google_auth_disabled"] = True return None return user_profile else: return_data["valid_attestation"] = False return None class ZulipRemoteUserBackend(RemoteUserBackend): create_unknown_user = False def authenticate(self, remote_user, realm_subdomain=None): # type: (Optional[str], Optional[Text]) -> Optional[UserProfile] if not remote_user: return None email = remote_user_to_email(remote_user) user_profile = common_get_active_user_by_email(email) if user_profile is None: return None if not user_matches_subdomain(realm_subdomain, user_profile): return None if not auth_enabled_helper(["RemoteUser"], user_profile.realm): return None return user_profile class ZulipLDAPException(_LDAPUser.AuthenticationFailed): pass class ZulipLDAPConfigurationError(Exception): pass class ZulipLDAPAuthBackendBase(ZulipAuthMixin, LDAPBackend): # Don't use Django LDAP's permissions functions def has_perm(self, user, perm, obj=None): # type: (Optional[UserProfile], Any, Any) -> bool # Using Any type is safe because we are not doing anything with # the arguments. return False def has_module_perms(self, user, app_label): # type: (Optional[UserProfile], Optional[str]) -> bool return False def get_all_permissions(self, user, obj=None): # type: (Optional[UserProfile], Any) -> Set[Any] # Using Any type is safe because we are not doing anything with # the arguments and always return empty set. return set() def get_group_permissions(self, user, obj=None): # type: (Optional[UserProfile], Any) -> Set[Any] # Using Any type is safe because we are not doing anything with # the arguments and always return empty set. return set() def django_to_ldap_username(self, username): # type: (Text) -> Text if settings.LDAP_APPEND_DOMAIN: if not username.endswith("@" + settings.LDAP_APPEND_DOMAIN): raise ZulipLDAPException("Username does not match LDAP domain.") return email_to_username(username) return username def ldap_to_django_username(self, username): # type: (str) -> str if settings.LDAP_APPEND_DOMAIN: return "@".join((username, settings.LDAP_APPEND_DOMAIN)) return username class ZulipLDAPAuthBackend(ZulipLDAPAuthBackendBase): REALM_IS_NONE_ERROR = 1 def authenticate(self, username, password, realm_subdomain=None, return_data=None): # type: (Text, str, Optional[Text], Optional[Dict[str, Any]]) -> Optional[UserProfile] try: self._realm = get_realm(realm_subdomain) username = self.django_to_ldap_username(username) user_profile = ZulipLDAPAuthBackendBase.authenticate(self, username=username, password=password) if user_profile is None: return None if not user_matches_subdomain(realm_subdomain, user_profile): return None return user_profile except Realm.DoesNotExist: return None # nocoverage # TODO: this may no longer be possible except ZulipLDAPException: return None # nocoverage # TODO: this may no longer be possible def get_or_create_user(self, username, ldap_user): # type: (str, _LDAPUser) -> Tuple[UserProfile, bool] try: if settings.LDAP_EMAIL_ATTR is not None: # Get email from ldap attributes. if settings.LDAP_EMAIL_ATTR not in ldap_user.attrs: raise ZulipLDAPException("LDAP user doesn't have the needed %s attribute" % ( settings.LDAP_EMAIL_ATTR,)) username = ldap_user.attrs[settings.LDAP_EMAIL_ATTR][0] user_profile = get_user_profile_by_email(username) if not user_profile.is_active or user_profile.realm.deactivated: raise ZulipLDAPException("Realm has been deactivated") if not ldap_auth_enabled(user_profile.realm): raise ZulipLDAPException("LDAP Authentication is not enabled") return user_profile, False except UserProfile.DoesNotExist: if self._realm is None: raise ZulipLDAPConfigurationError("Realm is None", self.REALM_IS_NONE_ERROR) # No need to check for an inactive user since they don't exist yet if self._realm.deactivated: raise ZulipLDAPException("Realm has been deactivated") full_name_attr = settings.AUTH_LDAP_USER_ATTR_MAP["full_name"] short_name = full_name = ldap_user.attrs[full_name_attr][0] try: full_name = check_full_name(full_name) except JsonableError as e: raise ZulipLDAPException(e.msg) if "short_name" in settings.AUTH_LDAP_USER_ATTR_MAP: short_name_attr = settings.AUTH_LDAP_USER_ATTR_MAP["short_name"] short_name = ldap_user.attrs[short_name_attr][0] user_profile = do_create_user(username, None, self._realm, full_name, short_name) return user_profile, True # Just like ZulipLDAPAuthBackend, but doesn't let you log in. class ZulipLDAPUserPopulator(ZulipLDAPAuthBackendBase): def authenticate(self, username, password, realm_subdomain=None): # type: (Text, str, Optional[Text]) -> None return None class DevAuthBackend(ZulipAuthMixin): # Allow logging in as any user without a password. # This is used for convenience when developing Zulip. def authenticate(self, username, realm_subdomain=None, return_data=None): # type: (Text, Optional[Text], Optional[Dict[str, Any]]) -> Optional[UserProfile] user_profile = common_get_active_user_by_email(username, return_data=return_data) if user_profile is None: return None if not dev_auth_enabled(user_profile.realm): return None return user_profile class GitHubAuthBackend(SocialAuthMixin, GithubOAuth2): auth_backend_name = "GitHub" def get_email_address(self, *args, **kwargs): # type: (*Any, **Any) -> Optional[Text] try: return kwargs['response']['email'] except KeyError: # nocoverage # TODO: investigate return None def get_full_name(self, *args, **kwargs): # type: (*Any, **Any) -> Text # In case of any error return an empty string. Name is used by # the registration page to pre-populate the name field. However, # if it is not supplied, our registration process will make sure # that the user enters a valid name. try: name = kwargs['response']['name'] except KeyError: name = '' if name is None: return '' return name def get_authenticated_user(self, *args: Any, **kwargs: Any) -> Optional[UserProfile]: """ This function is called once the OAuth2 workflow is complete. We override this function to call the proper `do_auth` function depending on whether we are doing individual, team or organization based GitHub authentication. The actual decision on authentication is done in SocialAuthMixin._common_authenticate(). """ user_profile = None team_id = settings.SOCIAL_AUTH_GITHUB_TEAM_ID org_name = settings.SOCIAL_AUTH_GITHUB_ORG_NAME if (team_id is None and org_name is None): try: user_profile = GithubOAuth2.do_auth(self, *args, **kwargs) except AuthFailed: logging.info("User authentication failed.") user_profile = None elif (team_id): backend = GithubTeamOAuth2(self.strategy, self.redirect_uri) try: user_profile = backend.do_auth(*args, **kwargs) except AuthFailed: logging.info("User is not member of GitHub team.") user_profile = None elif (org_name): backend = GithubOrganizationOAuth2(self.strategy, self.redirect_uri) try: user_profile = backend.do_auth(*args, **kwargs) except AuthFailed: logging.info("User is not member of GitHub organization.") user_profile = None return user_profile AUTH_BACKEND_NAME_MAP = { 'Dev': DevAuthBackend, 'Email': EmailAuthBackend, 'GitHub': GitHubAuthBackend, 'Google': GoogleMobileOauth2Backend, 'LDAP': ZulipLDAPAuthBackend, 'RemoteUser': ZulipRemoteUserBackend, } # type: Dict[Text, Any]