From fe0fbda58993bb5f24c4443665a347f6e308e892 Mon Sep 17 00:00:00 2001 From: Tomas Pazderka Date: Mon, 16 Nov 2020 15:43:01 +0100 Subject: [PATCH] Disallow none alg in flows other than Authorization --- src/oic/oic/__init__.py | 16 ++++- src/oic/oic/message.py | 12 ++-- tests/test_oic_consumer.py | 137 +++++++++++++++++++++++++++++++++---- 3 files changed, 145 insertions(+), 20 deletions(-) diff --git a/src/oic/oic/__init__.py b/src/oic/oic/__init__.py index 0840843a7..bd427f3e5 100644 --- a/src/oic/oic/__init__.py +++ b/src/oic/oic/__init__.py @@ -43,6 +43,7 @@ from oic.oauth2.message import ErrorResponse from oic.oauth2.message import Message from oic.oauth2.message import MessageFactory +from oic.oauth2.message import WrongSigningAlgorithm from oic.oauth2.util import get_or_post from oic.oic.message import SCOPE2CLAIMS from oic.oic.message import AccessTokenResponse @@ -1399,7 +1400,13 @@ def sign_enc_algs(self, typ): return resp def _verify_id_token( - self, id_token, nonce="", acr_values=None, auth_time=0, max_age=0 + self, + id_token, + nonce="", + acr_values=None, + auth_time=0, + max_age=0, + response_type="", ): """ Verify IdToken. @@ -1432,6 +1439,11 @@ def _verify_id_token( if _now > id_token["exp"]: raise OtherError("Passed best before date") + if response_type != ["code"] and id_token.jws_header["alg"] == "none": + raise WrongSigningAlgorithm( + "none is not allowed outside Authorization Flow." + ) + if ( self.id_token_max_age and _now > int(id_token["iat"]) + self.id_token_max_age @@ -1458,7 +1470,7 @@ def verify_id_token(self, id_token, authn_req): except KeyError: pass - for param in ["acr_values", "max_age"]: + for param in ["acr_values", "max_age", "response_type"]: try: kwa[param] = authn_req[param] except KeyError: diff --git a/src/oic/oic/message.py b/src/oic/oic/message.py index 09e68419e..d869b9c61 100644 --- a/src/oic/oic/message.py +++ b/src/oic/oic/message.py @@ -313,17 +313,19 @@ def verify_id_token(instance, check_hash=False, **kwargs): if check_hash: _alg = idt.jws_header["alg"] - # What if _alg == 'none' - - hfunc = "HS" + _alg[-3:] + if _alg != "none": + hfunc = "HS" + _alg[-3:] + else: + # This is allowed only for `code` and it needs to be checked by a Client + hfunc = None - if "access_token" in instance: + if "access_token" in instance and hfunc is not None: if "at_hash" not in idt: raise MissingRequiredAttribute("Missing at_hash property", idt) if idt["at_hash"] != jws.left_hash(instance["access_token"], hfunc): raise AtHashError("Failed to verify access_token hash", idt) - if "code" in instance: + if "code" in instance and hfunc is not None: if "c_hash" not in idt: raise MissingRequiredAttribute("Missing c_hash property", idt) if idt["c_hash"] != jws.left_hash(instance["code"], hfunc): diff --git a/tests/test_oic_consumer.py b/tests/test_oic_consumer.py index 1f1317050..36a13e2c2 100644 --- a/tests/test_oic_consumer.py +++ b/tests/test_oic_consumer.py @@ -510,25 +510,73 @@ def test_complete_auth_token_idtoken(self): with freeze_time("2019-08-09 11:00:00"): part = self.consumer.parse_authz(query=parsed.query) + assert isinstance(part, tuple) auth = part[0] atr = part[1] - assert part[2] is None + idt = part[2] assert auth is None assert isinstance(atr, AccessTokenResponse) assert _eq( atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"] ) + assert isinstance(idt, IdToken) - with freeze_time("2019-08-09 11:00:00"): - self.consumer.verify_id_token( - atr["id_token"], self.consumer.authz_req[atr["state"]] + def test_complete_auth_token_idtoken_no_alg_config(self): + _state = "state0" + self.consumer.consumer_config["response_type"] = ["id_token", "token"] + self.consumer.provider_info = ProviderConfigurationResponse( + issuer="https://example.com" + ) # abs min + self.consumer.authz_req = {} # Store AuthzReq with state as key + + args = { + "client_id": self.consumer.client_id, + "response_type": self.consumer.consumer_config["response_type"], + "scope": ["openid"], + "nonce": "nonce", + } + token = IdToken( + iss="https://example.com", + aud="client_1", + sub="some_sub", + exp=1565348600, + iat=1565348300, + nonce="nonce", + ) + location = ( + "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" + "scope=openid&id_token={}".format( + token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256") ) + ) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://example.com/authorization", + status=302, + headers={"location": location}, + ) + result = self.consumer.do_authorization_request( + state=_state, request_args=args + ) + query = parse_qs(urlparse(result.request.url).query) + assert query["client_id"] == ["client_1"] + assert query["scope"] == ["openid"] + assert query["response_type"] == ["id_token token"] + assert query["state"] == ["state0"] + assert query["nonce"] == ["nonce"] + assert query["redirect_uri"] == ["https://example.com/cb"] + + parsed = urlparse(result.headers["location"]) + + with freeze_time("2019-08-09 11:00:00"): + part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"}) + assert isinstance(part, tuple) auth = part[0] atr = part[1] idt = part[2] - assert isinstance(part, tuple) assert auth is None assert isinstance(atr, AccessTokenResponse) assert _eq( @@ -536,13 +584,17 @@ def test_complete_auth_token_idtoken(self): ) assert isinstance(idt, IdToken) - def test_complete_auth_token_idtoken_no_alg_config(self): + def test_complete_auth_token_idtoken_none_cipher_code(self): _state = "state0" - self.consumer.consumer_config["response_type"] = ["id_token", "token"] + self.consumer.consumer_config["response_type"] = ["code"] + self.consumer.registration_response = RegistrationResponse( + id_token_signed_response_alg="none" + ) self.consumer.provider_info = ProviderConfigurationResponse( issuer="https://example.com" ) # abs min self.consumer.authz_req = {} # Store AuthzReq with state as key + self.consumer.sdb[_state] = {"redirect_uris": []} args = { "client_id": self.consumer.client_id, @@ -557,11 +609,13 @@ def test_complete_auth_token_idtoken_no_alg_config(self): exp=1565348600, iat=1565348300, nonce="nonce", + at_hash="aaa", ) + # Downgrade the algorithm to `none` location = ( "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" "scope=openid&id_token={}".format( - token.to_jwt(key=[SYMKey(key="hemlig")], algorithm="HS256") + token.to_jwt(key=KC_RSA.keys(), algorithm="none") ) ) with responses.RequestsMock() as rsps: @@ -577,7 +631,7 @@ def test_complete_auth_token_idtoken_no_alg_config(self): query = parse_qs(urlparse(result.request.url).query) assert query["client_id"] == ["client_1"] assert query["scope"] == ["openid"] - assert query["response_type"] == ["id_token token"] + assert query["response_type"] == ["code"] assert query["state"] == ["state0"] assert query["nonce"] == ["nonce"] assert query["redirect_uri"] == ["https://example.com/cb"] @@ -585,19 +639,76 @@ def test_complete_auth_token_idtoken_no_alg_config(self): parsed = urlparse(result.headers["location"]) with freeze_time("2019-08-09 11:00:00"): - part = self.consumer.parse_authz(query=parsed.query, algs={"sign": "HS256"}) + part = self.consumer.parse_authz(query=parsed.query) + assert isinstance(part, tuple) auth = part[0] atr = part[1] idt = part[2] - assert isinstance(part, tuple) - assert auth is None + assert isinstance(auth, AuthorizationResponse) assert isinstance(atr, AccessTokenResponse) assert _eq( atr.keys(), ["access_token", "id_token", "token_type", "state", "scope"] ) assert isinstance(idt, IdToken) + def test_complete_auth_token_idtoken_none_cipher_token(self): + _state = "state0" + self.consumer.consumer_config["response_type"] = ["token"] + self.consumer.registration_response = RegistrationResponse( + id_token_signed_response_alg="none" + ) + self.consumer.provider_info = ProviderConfigurationResponse( + issuer="https://example.com" + ) # abs min + self.consumer.authz_req = {} # Store AuthzReq with state as key + self.consumer.sdb[_state] = {"redirect_uris": []} + + args = { + "client_id": self.consumer.client_id, + "response_type": self.consumer.consumer_config["response_type"], + "scope": ["openid"], + "nonce": "nonce", + } + token = IdToken( + iss="https://example.com", + aud="client_1", + sub="some_sub", + exp=1565348600, + iat=1565348300, + nonce="nonce", + ) + # Downgrade the algorithm to `none` + location = ( + "https://example.com/cb?state=state0&access_token=token&token_type=bearer&" + "scope=openid&id_token={}".format( + token.to_jwt(key=KC_RSA.keys(), algorithm="none") + ) + ) + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + "https://example.com/authorization", + status=302, + headers={"location": location}, + ) + result = self.consumer.do_authorization_request( + state=_state, request_args=args + ) + query = parse_qs(urlparse(result.request.url).query) + assert query["client_id"] == ["client_1"] + assert query["scope"] == ["openid"] + assert query["response_type"] == ["token"] + assert query["state"] == ["state0"] + assert query["nonce"] == ["nonce"] + assert query["redirect_uri"] == ["https://example.com/cb"] + + parsed = urlparse(result.headers["location"]) + + with freeze_time("2019-08-09 11:00:00"): + with pytest.raises(WrongSigningAlgorithm): + self.consumer.parse_authz(query=parsed.query) + def test_complete_auth_token_idtoken_cipher_downgrade(self): _state = "state0" self.consumer.consumer_config["response_type"] = ["id_token", "token"] @@ -649,7 +760,7 @@ def test_complete_auth_token_idtoken_cipher_downgrade(self): with freeze_time("2019-08-09 11:00:00"): with pytest.raises(WrongSigningAlgorithm): - part = self.consumer.parse_authz(query=parsed.query) + self.consumer.parse_authz(query=parsed.query) def test_userinfo(self): _state = "state0"