saml: Refactor some SAMLRequest/SAMLResponse processing logic.

Using these tuples is clearly uglier than using classes for storing
these encoded stream. This can be built on further to implement the
various fiddly logic around handling these objects inside appropriate
class method.
This commit is contained in:
Mateusz Mandera 2021-10-30 17:39:18 +02:00 committed by Tim Abbott
parent 64a04d90b2
commit 3702b31dd2
1 changed files with 35 additions and 16 deletions

View File

@ -2164,6 +2164,25 @@ class ZulipSAMLIdentityProvider(SAMLIdentityProvider):
return result return result
class SAMLDocument:
def __init__(self, encoded_saml_message: str) -> None:
self.encoded_saml_message = encoded_saml_message
def document_type(self) -> str:
"""
Returns whether the instance is a SAMLRequest or SAMLResponse.
"""
return type(self).__name__
class SAMLRequest(SAMLDocument):
pass
class SAMLResponse(SAMLDocument):
pass
@external_auth_method @external_auth_method
class SAMLAuthBackend(SocialAuthMixin, SAMLAuth): class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
auth_backend_name = "SAML" auth_backend_name = "SAML"
@ -2255,7 +2274,7 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
return data return data
def get_issuing_idp(self, saml_response_or_request: Tuple[str, str]) -> Optional[str]: def get_issuing_idp(self, saml_document: SAMLDocument) -> Optional[str]:
""" """
Given a SAMLResponse or SAMLRequest, returns which of the configured IdPs Given a SAMLResponse or SAMLRequest, returns which of the configured IdPs
is declared as the issuer. is declared as the issuer.
@ -2269,17 +2288,17 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
try: try:
config = self.generate_saml_config() config = self.generate_saml_config()
saml_settings = OneLogin_Saml2_Settings(config, sp_validation_only=True) saml_settings = OneLogin_Saml2_Settings(config, sp_validation_only=True)
if saml_response_or_request[1] == "SAMLResponse": if isinstance(saml_document, SAMLResponse):
resp = OneLogin_Saml2_Response( resp = OneLogin_Saml2_Response(
settings=saml_settings, response=saml_response_or_request[0] settings=saml_settings, response=saml_document.encoded_saml_message
) )
issuers = resp.get_issuers() issuers = resp.get_issuers()
else: else:
assert saml_response_or_request[1] == "SAMLRequest" assert isinstance(saml_document, SAMLRequest)
# The only valid SAMLRequest we can receive is a LogoutRequest. # The only valid SAMLRequest we can receive is a LogoutRequest.
logout_request_xml = OneLogin_Saml2_Logout_Request( logout_request_xml = OneLogin_Saml2_Logout_Request(
saml_settings, saml_response_or_request[0] saml_settings, saml_document.encoded_saml_message
).get_xml() ).get_xml()
issuers = [OneLogin_Saml2_Logout_Request.get_issuer(logout_request_xml)] issuers = [OneLogin_Saml2_Logout_Request.get_issuer(logout_request_xml)]
except self.SAMLRESPONSE_PARSING_EXCEPTIONS: except self.SAMLRESPONSE_PARSING_EXCEPTIONS:
@ -2455,15 +2474,15 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
Additionally, this handles incoming LogoutRequests for IdP-initated logout. Additionally, this handles incoming LogoutRequests for IdP-initated logout.
""" """
SAMLRequest = self.strategy.request_data().get("SAMLRequest") encoded_saml_request = self.strategy.request_data().get("SAMLRequest")
SAMLResponse = self.strategy.request_data().get("SAMLResponse") encoded_saml_response = self.strategy.request_data().get("SAMLResponse")
if SAMLResponse is None and SAMLRequest is None: if encoded_saml_response is None and encoded_saml_request is None:
self.logger.info("/complete/saml/: No SAMLResponse or SAMLRequest in request.") self.logger.info("/complete/saml/: No SAMLResponse or SAMLRequest in request.")
return None return None
elif SAMLRequest is not None: elif encoded_saml_request is not None:
saml_response_or_request = (SAMLRequest, "SAMLRequest") saml_document: SAMLDocument = SAMLRequest(encoded_saml_request)
elif SAMLResponse is not None: elif encoded_saml_response is not None:
saml_response_or_request = (SAMLResponse, "SAMLResponse") saml_document = SAMLResponse(encoded_saml_response)
relayed_params = self.get_relayed_params() relayed_params = self.get_relayed_params()
@ -2472,13 +2491,13 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
error_msg = ( error_msg = (
"/complete/saml/: Can't figure out subdomain for this %s. " + "relayed_params: %s" "/complete/saml/: Can't figure out subdomain for this %s. " + "relayed_params: %s"
) )
self.logger.info(error_msg, saml_response_or_request[1], relayed_params) self.logger.info(error_msg, saml_document.document_type(), relayed_params)
return None return None
idp_name = self.get_issuing_idp(saml_response_or_request) idp_name = self.get_issuing_idp(saml_document)
if idp_name is None: if idp_name is None:
self.logger.info( self.logger.info(
"/complete/saml/: No valid IdP as issuer of the %s.", saml_response_or_request[1] "/complete/saml/: No valid IdP as issuer of the %s.", saml_document.document_type()
) )
return None return None
@ -2491,7 +2510,7 @@ class SAMLAuthBackend(SocialAuthMixin, SAMLAuth):
self.logger.info(error_msg, idp_name, subdomain) self.logger.info(error_msg, idp_name, subdomain)
return None return None
if saml_response_or_request[1] == "SAMLRequest": if isinstance(saml_document, SAMLRequest):
return self.process_logout(subdomain, idp_name) return self.process_logout(subdomain, idp_name)
result = None result = None