saml: Implement limiting of IdP to specified realms.

Through the limit_to_subdomains setting on IdP dicts it's now possible
to limit the IdP to only allow authenticating to the specified realms.

Fixes #13340.
This commit is contained in:
Mateusz Mandera 2020-04-10 16:30:02 +02:00 committed by Tim Abbott
parent 7a9d592dbe
commit 143db68422
3 changed files with 104 additions and 11 deletions

View File

@ -1472,7 +1472,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
mock.patch('zproject.backends.logging.info') as m:
# This mock causes AuthFailed to be raised.
saml_response = self.generate_saml_response(self.email, self.name)
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"})
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"})
post_params = {"SAMLResponse": saml_response, "RelayState": relay_state}
result = self.client_post('/complete/saml/', post_params)
self.assertEqual(result.status_code, 302)
@ -1485,7 +1485,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
side_effect=AuthStateForbidden('State forbidden')), \
mock.patch('zproject.backends.logging.warning') as m:
saml_response = self.generate_saml_response(self.email, self.name)
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"})
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"})
post_params = {"SAMLResponse": saml_response, "RelayState": relay_state}
result = self.client_post('/complete/saml/', post_params)
self.assertEqual(result.status_code, 302)
@ -1505,7 +1505,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
# Check that POSTing the RelayState, but with missing SAMLResponse,
# doesn't cause errors either:
with mock.patch('zproject.backends.logging.info') as m:
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"})
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"})
post_params = {"RelayState": relay_state}
result = self.client_post('/complete/saml/', post_params)
self.assertEqual(result.status_code, 302)
@ -1516,7 +1516,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
)
with mock.patch('zproject.backends.logging.info') as m:
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp"})
relay_state = SAMLAuthBackend.put_data_in_redis({"idp": "test_idp", "subdomain": "zulip"})
relay_state = relay_state[:-1] # Break the token by removing the last character
post_params = {"RelayState": relay_state}
result = self.client_post('/complete/saml/', post_params)
@ -1524,6 +1524,16 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.assertIn('login', result.url)
m.assert_called_with("SAML authentication failed: bad RelayState token.")
def test_social_auth_complete_no_idp_and_subdomain(self) -> None:
with mock.patch('zproject.backends.logging.info') as m:
relay_state = SAMLAuthBackend.put_data_in_redis({})
post_params = {"RelayState": relay_state}
result = self.client_post('/complete/saml/', post_params)
self.assertEqual(result.status_code, 302)
self.assertEqual('/login/', result.url)
self.assertIn("Missing idp or subdomain value in relayed_params",
m.call_args.args[0])
def test_social_auth_saml_bad_idp_param_on_login_page(self) -> None:
with mock.patch('zproject.backends.logging.info') as m:
result = self.client_get('/login/saml/')
@ -1555,11 +1565,14 @@ class SAMLAuthBackendTest(SocialAuthBase):
# We deepcopy() dictionaries around for the sake of brevity,
# to avoid having to spell them out explicitly here.
# The second idp's configuration is a copy of the first one,
# with name test_idp2 and altered url.
# with name test_idp2 and altered url. It is also configured to be
# limited to the zulip realm, so that we get to test both types
# of configs here.
idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS)
idps_dict['test_idp2'] = copy.deepcopy(idps_dict['test_idp'])
idps_dict['test_idp2']['url'] = 'https://idp2.example.com/idp/profile/SAML2/Redirect/SSO'
idps_dict['test_idp2']['display_name'] = 'Second Test IdP'
idps_dict['test_idp2']['limit_to_subdomains'] = ['zulip', ]
# Run tests with multiple idps configured:
with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict):
@ -1591,6 +1604,25 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.SIGNUP_URL = original_SIGNUP_URL
self.AUTHORIZATION_URL = original_AUTHORIZATION_URL
def test_social_auth_saml_idp_limited_to_subdomains_success(self) -> None:
idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS)
idps_dict['test_idp']['limit_to_subdomains'] = ['zulip', ]
with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict):
self.test_social_auth_success()
def test_social_auth_saml_idp_limited_to_subdomains_attempt_wrong_realm(self) -> None:
idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS)
idps_dict['test_idp']['limit_to_subdomains'] = ['zulip', ]
with self.settings(SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict):
account_data_dict = self.get_account_data_dict(email=self.email, name=self.name)
with mock.patch('zproject.backends.logging.info') as m:
result = self.social_auth_test(account_data_dict, subdomain='zephyr')
self.assertEqual(result.status_code, 302)
self.assertEqual('/login/', result.url)
m.assert_called_with(
"User authenticated with IdP test_idp but this provider is not enabled for this realm zephyr."
)
def test_social_auth_saml_login_bad_idp_arg(self) -> None:
for action in ['login', 'register']:
result = self.client_get('/accounts/{}/social/saml'.format(action))
@ -2446,6 +2478,36 @@ class ExternalMethodDictsTests(ZulipTestCase):
self.assert_in_success_response([string.format("register") for string in expected_button_id_strings],
result)
def test_get_external_method_dicts_multiple_saml_idps(self) -> None:
idps_dict = copy.deepcopy(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS)
# Create another IdP config, by copying the original one and changing some details.idps_dict['test_idp'])
idps_dict['test_idp2'] = copy.deepcopy(idps_dict['test_idp'])
idps_dict['test_idp2']['url'] = 'https://idp2.example.com/idp/profile/SAML2/Redirect/SSO'
idps_dict['test_idp2']['display_name'] = 'Second Test IdP'
idps_dict['test_idp2']['limit_to_subdomains'] = ['zephyr', ]
with self.settings(
SOCIAL_AUTH_SAML_ENABLED_IDPS=idps_dict,
AUTHENTICATION_BACKENDS=('zproject.backends.EmailAuthBackend',
'zproject.backends.GitHubAuthBackend',
'zproject.backends.SAMLAuthBackend')
):
# Calling get_external_method_dicts without a realm returns all methods configured on the server:
external_auth_methods = get_external_method_dicts()
self.assert_length(external_auth_methods, 3) # 2 IdP + a dict for github auth
self.assertEqual(set([external_auth_methods[0]['name'], external_auth_methods[1]['name']]),
set(['saml:test_idp', 'saml:test_idp2']))
external_auth_methods = get_external_method_dicts(get_realm("zulip"))
# Only test_idp enabled for the zulip realm, + github auth.
self.assert_length(external_auth_methods, 2)
self.assertEqual(external_auth_methods[0]['name'], 'saml:test_idp')
external_auth_methods = get_external_method_dicts(get_realm("zephyr"))
# Both idps enabled for the zephyr realm, + github auth.
self.assert_length(external_auth_methods, 3)
self.assertEqual(set([external_auth_methods[0]['name'], external_auth_methods[1]['name']]),
set(['saml:test_idp', 'saml:test_idp2']))
class FetchAuthBackends(ZulipTestCase):
def assert_on_error(self, error: Optional[str]) -> None:
if error:

View File

@ -884,7 +884,7 @@ class ExternalAuthMethod(ABC):
@classmethod
@abstractmethod
def dict_representation(cls) -> List[ExternalAuthMethodDictT]:
def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]:
"""
Method returning dictionaries representing the authentication methods
corresponding to the backend that subclasses this. The documentation
@ -929,7 +929,7 @@ class ZulipRemoteUserBackend(RemoteUserBackend, ExternalAuthMethod):
return common_get_active_user(email, realm, return_data=return_data)
@classmethod
def dict_representation(cls) -> List[ExternalAuthMethodDictT]:
def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]:
return [dict(
name=cls.name,
display_name="SSO",
@ -1261,7 +1261,7 @@ class SocialAuthMixin(ZulipAuthMixin, ExternalAuthMethod):
return None
@classmethod
def dict_representation(cls) -> List[ExternalAuthMethodDictT]:
def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]:
return [dict(
name=cls.name,
display_name=cls.auth_backend_name,
@ -1471,6 +1471,20 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
if relayed_params is None:
return None
idp_name = relayed_params.get("idp")
subdomain = relayed_params.get("subdomain")
if idp_name is None or subdomain is None:
error_msg = "Missing idp or subdomain value in relayed_params in SAML auth_complete: %s"
logging.info(error_msg % (dict(subdomain=subdomain, idp=idp_name),))
return None
idp_valid = self.validate_idp_for_subdomain(idp_name, subdomain)
if not idp_valid:
error_msg = "User authenticated with IdP %s but this provider is not " + \
"enabled for this realm %s."
logging.info(error_msg % (idp_name, subdomain))
return None
result = None
try:
for param, value in relayed_params.items():
@ -1480,7 +1494,7 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
# super().auth_complete expects to have RelayState set to the idp_name,
# so we need to replace this param.
post_params = self.strategy.request.POST.copy()
post_params['RelayState'] = relayed_params["idp"]
post_params['RelayState'] = idp_name
self.strategy.request.POST = post_params
# Call the auth_complete method of SocialAuthMixIn
@ -1500,6 +1514,16 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
return result
@classmethod
def validate_idp_for_subdomain(cls, idp_name: str, subdomain: str) -> bool:
idp_dict = settings.SOCIAL_AUTH_SAML_ENABLED_IDPS.get(idp_name)
if idp_dict is None:
raise AssertionError("IdP: %s not found" % (idp_name,))
if 'limit_to_subdomains' in idp_dict and subdomain not in idp_dict['limit_to_subdomains']:
return False
return True
@classmethod
def check_config(cls) -> Optional[HttpResponse]:
obligatory_saml_settings_list = [
@ -1515,9 +1539,12 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
return None
@classmethod
def dict_representation(cls) -> List[ExternalAuthMethodDictT]:
def dict_representation(cls, realm: Optional[Realm]=None) -> List[ExternalAuthMethodDictT]:
result = [] # type: List[ExternalAuthMethodDictT]
for idp_name, idp_dict in settings.SOCIAL_AUTH_SAML_ENABLED_IDPS.items():
if realm and not cls.validate_idp_for_subdomain(idp_name, realm.subdomain):
continue
saml_dict = dict(
name='saml:{}'.format(idp_name),
display_name=idp_dict.get('display_name', cls.auth_backend_name),
@ -1548,7 +1575,7 @@ def get_external_method_dicts(realm: Optional[Realm]=None) -> List[ExternalAuthM
# EXTERNAL_AUTH_METHODS is already sorted in the correct order,
# so we don't need to worry about sorting here.
if auth_enabled_helper([backend.auth_backend_name], realm):
result.extend(backend.dict_representation())
result.extend(backend.dict_representation(realm))
return result

View File

@ -254,6 +254,10 @@ SOCIAL_AUTH_SAML_ENABLED_IDPS = {
# the left end of the login/register buttons for this IDP.
# The default of None results in a text-only button.
# "display_icon": "/path/to/icon.png",
# If you want this IdP to only be enabled for authentication
# to certain subdomains, uncomment and edit the setting below.
# "limit_to_subdomains": ["subdomain1", "subdomain2"],
}
}