diff --git a/zerver/tests/test_auth_backends.py b/zerver/tests/test_auth_backends.py index b6924ef6fa..82e44e582e 100644 --- a/zerver/tests/test_auth_backends.py +++ b/zerver/tests/test_auth_backends.py @@ -1472,7 +1472,7 @@ class SAMLAuthBackendTest(SocialAuthBase): mock.patch('zproject.backends.logging.info') as m: # This mock causes AuthFailed to be raised. saml_response = self.generate_saml_response(self.email, self.name) - relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"}) + relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"}) post_params = {"SAMLResponse": saml_response, "RelayState": relay_state} result = self.client_post('/complete/saml/', post_params) self.assertEqual(result.status_code, 302) @@ -1485,7 +1485,7 @@ class SAMLAuthBackendTest(SocialAuthBase): side_effect=AuthStateForbidden('State forbidden')), \ mock.patch('zproject.backends.logging.warning') as m: saml_response = self.generate_saml_response(self.email, self.name) - relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"}) + relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"}) post_params = {"SAMLResponse": saml_response, "RelayState": relay_state} result = self.client_post('/complete/saml/', post_params) self.assertEqual(result.status_code, 302) @@ -1505,7 +1505,7 @@ class SAMLAuthBackendTest(SocialAuthBase): # Check that POSTing the RelayState, but with missing SAMLResponse, # doesn't cause errors either: with mock.patch('zproject.backends.logging.info') as m: - relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"}) + relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"}) post_params = {"RelayState": relay_state} result = self.client_post('/complete/saml/', post_params) self.assertEqual(result.status_code, 302) @@ -1516,7 +1516,7 @@ class SAMLAuthBackendTest(SocialAuthBase): ) with mock.patch('zproject.backends.logging.info') as m: - relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"}) + relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"}) relay_state = relay_state[:-1] # Break the token by removing the last character post_params = {"RelayState": relay_state} result = self.client_post('/complete/saml/', post_params) @@ -1524,6 +1524,16 @@ class SAMLAuthBackendTest(SocialAuthBase): self.assertIn('login', result.url) m.assert_called_with("SAML authentication failed: bad RelayState token.") + def test_social_auth_complete_no_idp_and_subdomain(self) -> None: + with mock.patch('zproject.backends.logging.info') as m: + relay_state = SAMLAuthBackend.put_data_in_redis({}) + post_params = {"RelayState": relay_state} + result = self.client_post('/complete/saml/', post_params) + self.assertEqual(result.status_code, 302) + self.assertEqual('/login/', result.url) + self.assertIn("Missing idp or subdomain value in relayed_params", + m.call_args.args[0]) + def test_social_auth_saml_bad_idp_param_on_login_page(self) -> None: with mock.patch('zproject.backends.logging.info') as m: result = self.client_get('/login/saml/') @@ -1555,11 +1565,14 @@ class SAMLAuthBackendTest(SocialAuthBase): # We deepcopy() dictionaries around for the sake of brevity, # to avoid having to spell them out explicitly here. # The second idp's configuration is a copy of the first one, - # with name test_idp2 and altered url. + # with name test_idp2 and altered url. It is also configured to be + # limited to the zulip realm, so that we get to test both types + # of configs here. idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS) idps_dict['test_idp2'] = copy.deepcopy(idps_dict['test_idp']) idps_dict['test_idp2']['url'] = 'https://idp2.example.com/idp/profile/SAML2/Redirect/SSO' idps_dict['test_idp2']['display_name'] = 'Second Test IdP' + idps_dict['test_idp2']['limit_to_subdomains'] = ['zulip', ] # Run tests with multiple idps configured: with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict): @@ -1591,6 +1604,25 @@ class SAMLAuthBackendTest(SocialAuthBase): self.SIGNUP_URL = original_SIGNUP_URL self.AUTHORIZATION_URL = original_AUTHORIZATION_URL + def test_social_auth_saml_idp_limited_to_subdomains_success(self) -> None: + idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS) + idps_dict['test_idp']['limit_to_subdomains'] = ['zulip', ] + with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict): + self.test_social_auth_success() + + def test_social_auth_saml_idp_limited_to_subdomains_attempt_wrong_realm(self) -> None: + idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS) + idps_dict['test_idp']['limit_to_subdomains'] = ['zulip', ] + with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict): + account_data_dict = self.get_account_data_dict(email=self.email, name=self.name) + with mock.patch('zproject.backends.logging.info') as m: + result = self.social_auth_test(account_data_dict, subdomain='zephyr') + self.assertEqual(result.status_code, 302) + self.assertEqual('/login/', result.url) + m.assert_called_with( + "User authenticated with IdP test_idp but this provider is not enabled for this realm zephyr." + ) + def test_social_auth_saml_login_bad_idp_arg(self) -> None: for action in ['login', 'register']: result = self.client_get('/accounts/{}/social/saml'.format(action)) @@ -2446,6 +2478,36 @@ class ExternalMethodDictsTests(ZulipTestCase): self.assert_in_success_response([string.format("register") for string in expected_button_id_strings], result) + def test_get_external_method_dicts_multiple_saml_idps(self) -> None: + idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS) + # Create another IdP config, by copying the original one and changing some details.idps_dict['test_idp']) + idps_dict['test_idp2'] = copy.deepcopy(idps_dict['test_idp']) + idps_dict['test_idp2']['url'] = 'https://idp2.example.com/idp/profile/SAML2/Redirect/SSO' + idps_dict['test_idp2']['display_name'] = 'Second Test IdP' + idps_dict['test_idp2']['limit_to_subdomains'] = ['zephyr', ] + with self.settings( + SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict, + AUTHENTICATION_BACKENDS=('zproject.backends.EmailAuthBackend', + 'zproject.backends.GitHubAuthBackend', + 'zproject.backends.SAMLAuthBackend') + ): + # Calling get_external_method_dicts without a realm returns all methods configured on the server: + external_auth_methods = get_external_method_dicts() + self.assert_length(external_auth_methods, 3) # 2 IdP + a dict for github auth + self.assertEqual(set([external_auth_methods[0]['name'], external_auth_methods[1]['name']]), + set(['saml:test_idp', 'saml:test_idp2'])) + + external_auth_methods = get_external_method_dicts(get_realm("zulip")) + # Only test_idp enabled for the zulip realm, + github auth. + self.assert_length(external_auth_methods, 2) + self.assertEqual(external_auth_methods[0]['name'], 'saml:test_idp') + + external_auth_methods = get_external_method_dicts(get_realm("zephyr")) + # Both idps enabled for the zephyr realm, + github auth. + self.assert_length(external_auth_methods, 3) + self.assertEqual(set([external_auth_methods[0]['name'], external_auth_methods[1]['name']]), + set(['saml:test_idp', 'saml:test_idp2'])) + class FetchAuthBackends(ZulipTestCase): def assert_on_error(self, error: Optional[str]) -> None: if error: diff --git a/zproject/backends.py b/zproject/backends.py index c67bfb8865..a0378ece48 100644 --- a/zproject/backends.py +++ b/zproject/backends.py @@ -884,7 +884,7 @@ class ExternalAuthMethod(ABC): @classmethod @abstractmethod - def dict_representation(cls) -> List[ExternalAuthMethodDictT]: + def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]: """ Method returning dictionaries representing the authentication methods corresponding to the backend that subclasses this. The documentation @@ -929,7 +929,7 @@ class ZulipRemoteUserBackend(RemoteUserBackend, ExternalAuthMethod): return common_get_active_user(email, realm, return_data=return_data) @classmethod - def dict_representation(cls) -> List[ExternalAuthMethodDictT]: + def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]: return [dict( name=cls.name, display_name="SSO", @@ -1261,7 +1261,7 @@ class SocialAuthMixin(ZulipAuthMixin, ExternalAuthMethod): return None @classmethod - def dict_representation(cls) -> List[ExternalAuthMethodDictT]: + def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]: return [dict( name=cls.name, display_name=cls.auth_backend_name, @@ -1471,6 +1471,20 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth): if relayed_params is None: return None + idp_name = relayed_params.get("idp") + subdomain = relayed_params.get("subdomain") + if idp_name is None or subdomain is None: + error_msg = "Missing idp or subdomain value in relayed_params in SAML auth_complete: %s" + logging.info(error_msg % (dict(subdomain=subdomain, idp=idp_name),)) + return None + + idp_valid = self.validate_idp_for_subdomain(idp_name, subdomain) + if not idp_valid: + error_msg = "User authenticated with IdP %s but this provider is not " + \ + "enabled for this realm %s." + logging.info(error_msg % (idp_name, subdomain)) + return None + result = None try: for param, value in relayed_params.items(): @@ -1480,7 +1494,7 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth): # super().auth_complete expects to have RelayState set to the idp_name, # so we need to replace this param. post_params = self.strategy.request.POST.copy() - post_params['RelayState'] = relayed_params["idp"] + post_params['RelayState'] = idp_name self.strategy.request.POST = post_params # Call the auth_complete method of SocialAuthMixIn @@ -1500,6 +1514,16 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth): return result + @classmethod + def validate_idp_for_subdomain(cls, idp_name: str, subdomain: str) -> bool: + idp_dict = settings.SOCIAL_AUTH_SAML_ENABLED_IDPS.get(idp_name) + if idp_dict is None: + raise AssertionError("IdP: %s not found" % (idp_name,)) + if 'limit_to_subdomains' in idp_dict and subdomain not in idp_dict['limit_to_subdomains']: + return False + + return True + @classmethod def check_config(cls) -> Optional[HttpResponse]: obligatory_saml_settings_list = [ @@ -1515,9 +1539,12 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth): return None @classmethod - def dict_representation(cls) -> List[ExternalAuthMethodDictT]: + def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]: result = [] # type: List[ExternalAuthMethodDictT] for idp_name, idp_dict in settings.SOCIAL_AUTH_SAML_ENABLED_IDPS.items(): + if realm and not cls.validate_idp_for_subdomain(idp_name, realm.subdomain): + continue + saml_dict = dict( name='saml:{}'.format(idp_name), display_name=idp_dict.get('display_name', cls.auth_backend_name), @@ -1548,7 +1575,7 @@ def get_external_method_dicts(realm: Optional[Realm]=None) -> List[ExternalAuthM # EXTERNAL_AUTH_METHODS is already sorted in the correct order, # so we don't need to worry about sorting here. if auth_enabled_helper([backend.auth_backend_name], realm): - result.extend(backend.dict_representation()) + result.extend(backend.dict_representation(realm)) return result diff --git a/zproject/prod_settings_template.py b/zproject/prod_settings_template.py index 2e66b3e86c..42d180100d 100644 --- a/zproject/prod_settings_template.py +++ b/zproject/prod_settings_template.py @@ -254,6 +254,10 @@ SOCIAL_AUTH_SAML_ENABLED_IDPS = { # the left end of the login/register buttons for this IDP. # The default of None results in a text-only button. # "display_icon": "/path/to/icon.png", + + # If you want this IdP to only be enabled for authentication + # to certain subdomains, uncomment and edit the setting below. + # "limit_to_subdomains": ["subdomain1", "subdomain2"], } }