From 9f61e8f8c765a21aba95ffb88d1b35dd19910906 Mon Sep 17 00:00:00 2001 From: Ben Jansen Date: Mon, 9 Oct 2023 14:59:44 -0700 Subject: [PATCH] Add Duo Universal Prompt push support Duo's Universal Prompt Okta integration involves a new type of MFA factor, claims_provider. This change treats claims_provider as Duo Universal Prompt and implements the interactions required to trigger a push to the user's preferred device, then complete the transaction and obtain a valid Okta session. Supported authentication includes Duo Push, Phone Call, and Passcode. --- README.md | 6 +- gimme_aws_creds/duo_universal.py | 197 +++++++++ gimme_aws_creds/main.py | 3 + gimme_aws_creds/okta_classic.py | 73 ++-- requirements.txt | 2 + tests/__init__.py | 8 + tests/fixtures/duo_universal_login_form.html | 391 +++++++++++++++++ ...l_login_form_without_preferred_device.html | 405 ++++++++++++++++++ tests/fixtures/duo_universal_plugin_form.html | 62 +++ tests/test_aws_resolver.py | 10 +- tests/test_duo_universal_client.py | 325 ++++++++++++++ 11 files changed, 1449 insertions(+), 33 deletions(-) create mode 100644 gimme_aws_creds/duo_universal.py create mode 100644 tests/fixtures/duo_universal_login_form.html create mode 100644 tests/fixtures/duo_universal_login_form_without_preferred_device.html create mode 100644 tests/fixtures/duo_universal_plugin_form.html create mode 100644 tests/test_duo_universal_client.py diff --git a/README.md b/README.md index b29482fb..fc00f1ed 100644 --- a/README.md +++ b/README.md @@ -225,7 +225,11 @@ A configuration wizard will prompt you to enter the necessary configuration para - email - OTP via email - web - DUO uses localhost webbrowser to support push|call|passcode - passcode - DUO uses `OKTA_MFA_CODE` or `--mfa-code` if set, or prompts user for passcode(OTP). - + - claims_provider - DUO Universal Prompt +- duo_universal_factor - (optional) Configure which type of factor to use with Duo Universal Prompt. Must be one of (case-sensitive): + - `Duo Push` (default) + - `Passcode` + - `Phone Call` - resolve_aws_alias - y or n. If yes, gimme-aws-creds will try to resolve AWS account ids with respective alias names (default: n). This option can also be set interactively in the command line using `-r` or `--resolve` parameter - include_path - (optional) Includes full role path to the role name in AWS credential profile name. (default: n). If `y`: `-/some/path/administrator`. If `n`: `-administrator` - remember_device - y or n. If yes, the MFA device will be remembered by Okta service for a limited time. This option can also be set interactively in the command line using `-m` or `--remember-device` diff --git a/gimme_aws_creds/duo_universal.py b/gimme_aws_creds/duo_universal.py new file mode 100644 index 00000000..e3a3c6b0 --- /dev/null +++ b/gimme_aws_creds/duo_universal.py @@ -0,0 +1,197 @@ +import time + +import html5lib +from furl import furl + +from . import version + + +class DuoMfaDenied(BaseException): + """ Duo MFA was denied """ + + def __init__(self, response): + super(DuoMfaDenied, self).__init__(f'Duo MFA denied: {response}') + + +class OktaDuoUniversal: + """ Handles interaction with the Duo Universal Prompt """ + + def __init__(self, ui, session, state_token, okta_factor, remember_device, duo_factor='Duo Push', duo_passcode=None): + self.ui = ui + self.state_token = state_token + self.okta_factor = okta_factor + self.remember_device = remember_device + self.session = session + if duo_factor not in ['Duo Push', 'Passcode', 'Phone Call']: + raise Exception('Preferred Duo Universal factor must be one of: Duo Push, Passcode, Phone Call') + self.duo_factor = duo_factor + self.duo_passcode = duo_passcode + + def do_auth(self): + """ Follow Duo Universal Prompt flow through to an active Okta user session """ + + duo_prompt_url, okta_profile_login = self._initiate_okta_factor_verification() + duo_origin, duo_plugin_form_response = self._handle_duo_plugin_form(duo_prompt_url) + + # Submit second Duo form (login-form), which triggers a Duo Push, phone call, or accepts the Passcode + login_form_action, duo_login_form_data = self._get_duo_universal_login_form_data(duo_plugin_form_response) + login_form_action_url = furl(duo_origin) / login_form_action + duo_factor, duo_sid, duo_txid, duo_xsrf = self._submit_duo_login_form(duo_login_form_data, + login_form_action_url) + + self.ui.info(f"Duo Universal: Using {self.duo_factor}...") + + self._wait_for_duo_universal_transaction(duo_origin, duo_txid, duo_sid) + + # Once Duo has been approved, load the OIDC exit URL to be redirected to Okta and gain a user session + oidc_exit_url = furl(duo_origin) / 'frame/v4/oidc/exit' + exit_headers = self._get_form_headers() + exit_response = self.session.post( + oidc_exit_url.url, + data={ + 'txid': duo_txid, + 'sid': duo_sid, + 'factor': duo_factor, + '_xsrf': duo_xsrf, + 'device_key': '', + 'dampen_choice': 'false', + }, + headers=exit_headers, + ) + exit_response.raise_for_status() + + # The claims_provider factor immediately yields an active user session, no subsequent request for SID required. + return { + 'apiResponse': { + 'status': 'SUCCESS', + 'userSession': { + "username": okta_profile_login, + "session": self.session.cookies['sid'], + "device_token": self.session.cookies['DT'] + } + }, + } + + def _submit_duo_login_form(self, duo_login_form_data, login_form_action_url): + # Submit Duo's form id=login-form, which triggers a Duo Push, phone call, or accepts a Passcode. + duo_login_form_response = self.session.post( + login_form_action_url.url, + data=duo_login_form_data, + headers=self._get_form_headers(), + ) + duo_login_form_response.raise_for_status() + duo_sid = duo_login_form_data['sid'] + duo_factor = duo_login_form_data['factor'] + duo_xsrf = duo_login_form_data['_xsrf'] + duo_login_response_data = duo_login_form_response.json() + if duo_login_response_data['stat'] != 'OK': + raise Exception(f"Triggering Duo MFA failed: {duo_login_form_response.content}") + duo_txid = duo_login_response_data['response']['txid'] + return duo_factor, duo_sid, duo_txid, duo_xsrf + + def _handle_duo_plugin_form(self, duo_prompt_url): + # Request Duo prompt + verify_get_response = self.session.get( + duo_prompt_url, + ) + verify_get_response.raise_for_status() + duo_origin = furl(verify_get_response.url).origin + # Submit first Duo form (plugin_form) + form_data = self._get_duo_universal_plugin_form_data(verify_get_response) + duo_plugin_form_response = self.session.post( + verify_get_response.url, + data=form_data, + headers=self._get_form_headers(), + ) + duo_plugin_form_response.raise_for_status() + return duo_origin, duo_plugin_form_response + + def _initiate_okta_factor_verification(self): + # POST to the Okta factor verify URL gives us the URL to request to load Duo + verify_post_response = self.session.post( + self.okta_factor['_links']['verify']['href'], + params={'rememberDevice': self.remember_device}, + json={'stateToken': self.state_token}, + ) + verify_post_response.raise_for_status() + verify_response_data = verify_post_response.json() + duo_prompt_url = verify_response_data['_links']['next']['href'] + okta_profile_login = verify_response_data['_embedded']['user']['profile']['login'] + return duo_prompt_url, okta_profile_login + + def _wait_for_duo_universal_transaction(self, duo_host, txid, sid): + status_url = furl(duo_host) / 'frame/v4/status' + status_data = { + 'txid': txid, + 'sid': sid + } + headers = self._get_form_headers() + + tries = 0 + while tries < 16: + tries += 1 + time.sleep(0.5) + + status_response = self.session.post( + status_url.url, + data=status_data, + headers=headers, + ) + status_response.raise_for_status() + + json_response = status_response.json() + if json_response['stat'] != 'OK': + raise Exception(f"Error checking Duo MFA status: {status_response.text}") + + if json_response['response']['status_code'] == 'allow': + return txid + if json_response['response']['status_code'] == 'deny': + raise DuoMfaDenied(json_response) + + raise Exception('Timed out waiting for Duo MFA') + + @staticmethod + def _get_form_headers(): + form_headers = { + 'User-Agent': "gimme-aws-creds {}".format(version), + 'Accept': 'application/json', + 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8' + } + return form_headers + + def _get_duo_universal_login_form_data(self, plugin_form_response): + """ Get form data to post when submitting the Duo login-form """ + + doc = html5lib.parse(plugin_form_response.content, namespaceHTMLElements=False) + form_action = doc.find('.//form[@id="login-form"]').get('action') + form_data = {} + for field in doc.iterfind('.//form[@id="login-form"]/input'): + form_data[field.get('name')] = field.get('value') + + preferred_device = self._find_device_to_use(doc) + + form_data['factor'] = self.duo_factor + form_data['device'] = preferred_device + form_data['postAuthDestination'] = 'OIDC_EXIT' + if self.duo_passcode: + form_data['passcode'] = self.duo_passcode + + return form_action, form_data + + @staticmethod + def _find_device_to_use(doc): + device = doc.find('.//input[@name="preferred_device"]').get('value') + if device is None or device == '': + device = doc.find('.//select[@name="device"]/option').get('value') + return device + + @staticmethod + def _get_duo_universal_plugin_form_data(response): + """ Get form data to post when submitting the Duo plugin_form """ + + doc = html5lib.parse(response.content, namespaceHTMLElements=False) + form_data = {} + for field in doc.iterfind('.//form[@id="plugin_form"]/input'): + form_data[field.get('name')] = field.get('value') + + return form_data diff --git a/gimme_aws_creds/main.py b/gimme_aws_creds/main.py index f897f001..44599389 100644 --- a/gimme_aws_creds/main.py +++ b/gimme_aws_creds/main.py @@ -579,6 +579,9 @@ def okta(self): if self.conf_dict.get('preferred_mfa_type'): okta.set_preferred_mfa_type(self.conf_dict['preferred_mfa_type']) + if self.conf_dict.get('duo_universal_factor'): + okta.set_duo_universal_factor(self.conf_dict.get('duo_universal_factor')) + if self.config.mfa_code is not None: okta.set_mfa_code(self.config.mfa_code) elif self.conf_dict.get('okta_mfa_code'): diff --git a/gimme_aws_creds/okta_classic.py b/gimme_aws_creds/okta_classic.py index fcd090a5..68a131b6 100644 --- a/gimme_aws_creds/okta_classic.py +++ b/gimme_aws_creds/okta_classic.py @@ -32,6 +32,7 @@ from gimme_aws_creds.u2f import FactorU2F from gimme_aws_creds.webauthn import WebAuthnClient, FakeAssertion from . import errors, ui, version, duo +from .duo_universal import OktaDuoUniversal from .errors import GimmeAWSCredsMFAEnrollStatus from .registered_authenticators import RegisteredAuthenticators @@ -62,6 +63,7 @@ def __init__(self, gac_ui, okta_org_url, verify_ssl_certs=True, device_token=Non self._username = None self._password = None self._preferred_mfa_type = None + self._duo_universal_factor = 'Duo Push' self._mfa_code = None self._remember_device = None @@ -104,6 +106,9 @@ def set_preferred_mfa_type(self, preferred_mfa_type): def set_mfa_code(self, mfa_code): self._mfa_code = mfa_code + def set_duo_universal_factor(self, duo_universal_factor): + self._duo_universal_factor = duo_universal_factor + def set_remember_device(self, remember_device): self._remember_device = bool(remember_device) @@ -158,30 +163,33 @@ def auth_session(self, **kwargs): """ Authenticate the user and return the Okta Session ID and username""" login_response = self.auth() - session_url = self._okta_org_url + '/login/sessionCookieRedirect' - - if 'redirect_uri' not in kwargs: - redirect_uri = 'http://localhost:8080/login' + if 'userSession' in login_response: + return login_response['userSession'] else: - redirect_uri = kwargs['redirect_uri'] + session_url = self._okta_org_url + '/login/sessionCookieRedirect' - params = { - 'token': login_response['sessionToken'], - 'redirectUrl': redirect_uri - } + if 'redirect_uri' not in kwargs: + redirect_uri = 'http://localhost:8080/login' + else: + redirect_uri = kwargs['redirect_uri'] - response = self._http_client.get( - session_url, - params=params, - headers=self._get_headers(), - verify=self._verify_ssl_certs, - allow_redirects=False - ) - return { - "username": login_response['_embedded']['user']['profile']['login'], - "session": response.cookies['sid'], - "device_token": self._http_client.cookies['DT'] - } + params = { + 'token': login_response['sessionToken'], + 'redirectUrl': redirect_uri + } + + response = self._http_client.get( + session_url, + params=params, + headers=self._get_headers(), + verify=self._verify_ssl_certs, + allow_redirects=False + ) + return { + "username": login_response['_embedded']['user']['profile']['login'], + "session": response.cookies['sid'], + "device_token": self._http_client.cookies['DT'] + } def auth_oauth(self, client_id, **kwargs): """ Login to Okta and retrieve access token, ID token or both """ @@ -451,6 +459,19 @@ def _login_send_push(self, state_token, factor): if 'sessionToken' in response_data: return {'stateToken': None, 'sessionToken': response_data['sessionToken'], 'apiResponse': response_data} + def _login_duo_universal(self, state_token, factor): + duo_passcode = None + if self._duo_universal_factor == 'Passcode': + duo_passcode = self.ui.input(message='Duo Passcode: ') + duo_client = OktaDuoUniversal(self.ui, + self._http_client, + state_token, + factor, + self._remember_device, + self._duo_universal_factor, + duo_passcode) + return duo_client.do_auth() + def _login_input_webauthn_challenge(self, state_token, factor): """ Retrieve nonce """ response = self._http_client.post( @@ -593,6 +614,8 @@ def _login_multi_factor(self, state_token, login_data): return self._login_input_webauthn_challenge(state_token, factor) elif factor['factorType'] == 'token:hardware': return self._login_input_mfa_challenge(state_token, factor['_links']['verify']['href']) + elif factor['factorType'] == 'claims_provider': + return self._login_duo_universal(state_token, factor) def _login_input_mfa_challenge(self, state_token, next_url): """ Submit verification code for SMS or TOTP authentication methods""" @@ -717,7 +740,7 @@ def _check_webauthn_result(self, state_token, login_data): else: return {'stateToken': None, 'sessionToken': None, 'apiResponse': response_data} - def get_saml_response(self, url, auth_session = None): + def get_saml_response(self, url, auth_session=None): """ return the base64 SAML value object from the SAML Response""" response = self._http_client.get(url, verify=self._verify_ssl_certs) response.raise_for_status() @@ -888,6 +911,8 @@ def _build_factor_name(self, factor): return factor['factorType'] + ": " + factor_name elif factor['factorType'] == 'token:hardware': return factor['factorType'] + ": " + factor['provider'] + elif factor['factorType'] == 'claims_provider': + return factor['factorType'] + ": " + factor['vendorName'] else: return "Unknown MFA type: " + factor['factorType'] @@ -1063,7 +1088,7 @@ def _introspect_factors(self, state_token): @staticmethod def _extract_state_token_from_http_response(http_res): # extract the stateToken from a javascript variable - state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text) + state_token_re = re.search(r"var stateToken = '(.*)';", http_res.text) if state_token_re is not None: return decode(state_token_re.group(1), "unicode-escape") @@ -1074,4 +1099,4 @@ def _extract_state_token_from_http_response(http_res): state_token_re = re.search(r"stateToken=(.*?[ \"])", http_res.text) if state_token_re is not None: pre_state_token = decode(state_token_re.group(1), "unicode-escape") - return pre_state_token.rstrip('\"') \ No newline at end of file + return pre_state_token.rstrip('\"') diff --git a/requirements.txt b/requirements.txt index 7ebe2947..613e1e78 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,5 @@ okta>=0.0.4,<1.0.0 ctap-keyring-device==1.0.6 pyjwt>=2.4.0,<3.0.0 urllib3>=1.26.0,<2.0.0 +html5lib>=1.1,<2.0.0 +furl>=2.1.3,<3.0.0 diff --git a/tests/__init__.py b/tests/__init__.py index e69de29b..6b8a7e8f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1,8 @@ +import os + + +def read_fixture(file_name): + """Read a fixture file""" + fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures', file_name) + with open(fixture_path, 'r', encoding='utf-8') as file: + return file.read() diff --git a/tests/fixtures/duo_universal_login_form.html b/tests/fixtures/duo_universal_login_form.html new file mode 100644 index 00000000..48271c8e --- /dev/null +++ b/tests/fixtures/duo_universal_login_form.html @@ -0,0 +1,391 @@ + + + + + + + + + + + + +Two-Factor Authentication + + + + + + + + + + + + + + +
+
+ +
+
+ + + + + +
+ + + +
+ + +
+
+ +
+ +
+ + + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/fixtures/duo_universal_login_form_without_preferred_device.html b/tests/fixtures/duo_universal_login_form_without_preferred_device.html new file mode 100644 index 00000000..fc0ae786 --- /dev/null +++ b/tests/fixtures/duo_universal_login_form_without_preferred_device.html @@ -0,0 +1,405 @@ + + + + + + + + + + + + + + Two-Factor Authentication + + + + + + + + + + + + + + +
+
+ +
+
+ + + + +
+ + + +
+ + +
+
+ +
+ +
+ + + +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/fixtures/duo_universal_plugin_form.html b/tests/fixtures/duo_universal_plugin_form.html new file mode 100644 index 00000000..bbe19546 --- /dev/null +++ b/tests/fixtures/duo_universal_plugin_form.html @@ -0,0 +1,62 @@ + + + +Duo Security - Two-Factor Authentication + + + + + + + + + +
+ +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + +
+ +
+ + + + + + + + + + + diff --git a/tests/test_aws_resolver.py b/tests/test_aws_resolver.py index 6cc405f9..365ad8bc 100644 --- a/tests/test_aws_resolver.py +++ b/tests/test_aws_resolver.py @@ -1,6 +1,5 @@ """Unit tests for gimme_aws_creds""" import sys -import os import unittest from contextlib import contextmanager from io import StringIO @@ -10,13 +9,9 @@ import gimme_aws_creds.common as common_def from gimme_aws_creds.aws import AwsResolver +from tests import read_fixture + -def read_fixture(file_name): - """Read a fixture file""" - fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures', file_name) - with open(fixture_path, 'r', encoding='utf-8') as file: - return file.read() - class TestAwsResolver(unittest.TestCase): """Class to test Okta Client Class. Mock is used to mock external calls""" @@ -109,4 +104,3 @@ def test_display_role(self): self.assertEqual(result[4], self.display_role[4]) self.assertEqual(result[5], self.display_role[5]) self.assertEqual(result[6], self.display_role[6]) - diff --git a/tests/test_duo_universal_client.py b/tests/test_duo_universal_client.py new file mode 100644 index 00000000..ac08c064 --- /dev/null +++ b/tests/test_duo_universal_client.py @@ -0,0 +1,325 @@ +import json +import unittest +from unittest.mock import Mock + +import requests +import responses +from tests import read_fixture + +from gimme_aws_creds.duo_universal import OktaDuoUniversal +from tests.user_interface_mock import MockUserInterface + + +class TestDuoUniversalClient(unittest.TestCase): + def setUp(self): + self.OKTA_STATE_TOKEN = 'statetokenstatetokenstatetokenstatetokenstateto' + self.OKTA_LOGIN = 'okta.user@example.com' + self.OKTA_FIRST_NAME = 'Okta' + self.OKTA_LAST_NAME = 'User' + self.OKTA_FACTOR_ID = 'oktafactorid' + self.OKTA_FACTOR = { + 'factorType': 'claims_provider', + 'provider': 'CUSTOM', + 'vendorName': 'Duo Universal Prompt', + '_links': { + 'verify': { + 'href': 'https://oktatenant.oktapreview.com/sso/idps/foo/verify', + 'hints': { + 'allow': ['POST'] + } + } + } + } + self.OKTA_DT_VALUE = 'oktadtvalue' + self.OKTA_SID_VALUE = 'oktasidvalue' + + self.REQ_0_OKTA_AUTHN_FACTORS_VERIFY_RESPONSE = { + 'stateToken': self.OKTA_STATE_TOKEN, + 'expiresAt': '2100-10-17T19:35:07.000Z', 'status': 'MFA_CHALLENGE', + 'factorResult': 'CHALLENGE', + '_embedded': { + 'user': { + 'id': 'oktauseridoktauserid', + 'passwordChanged': '2100-05-17T18:56:58.000Z', + 'profile': { + 'login': self.OKTA_LOGIN, + 'firstName': self.OKTA_FIRST_NAME, + 'lastName': self.OKTA_LAST_NAME, + 'locale': 'en_US', 'timeZone': 'America/Los_Angeles' + } + }, + 'factor': { + 'id': self.OKTA_FACTOR_ID, + 'factorType': 'claims_provider', + 'provider': 'CUSTOM', + 'vendorName': 'Duo Universal Prompt'}, + 'policy': { + 'allowRememberDevice': True, + 'rememberDeviceLifetimeInMinutes': 720, + 'rememberDeviceByDefault': False, + 'factorsPolicyInfo': {} + } + }, + '_links': { + 'next': { + 'name': 'redirect', + 'href': f'https://oktatenant.oktapreview.com/sso/idps/oktaclaimsidpid?stateToken={self.OKTA_STATE_TOKEN}', + 'hints': { + 'allow': ['GET'] + } + }, + 'cancel': { + 'href': 'https://oktatenant.oktapreview.com/api/v1/authn/cancel', + 'hints': { + 'allow': ['POST'] + } + }, + 'prev': { + 'href': 'https://oktatenant.oktapreview.com/api/v1/authn/previous', + 'hints': { + 'allow': ['POST'] + } + } + } + } + self.DUO_ORIGIN = 'https://duo-tenant.duosecurity.com' + self.DUO_TX = 'tttttttttttttttttttttttttttttttttttt.tttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttt.ttttt_tt-tttttttttttttttttttttttt_tttttttttttttttttttttttttt_ttttttttttttttttttt-tt-tt' + self.DUO_SID = 'frameless-aaaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaaaa' + self.DUO_AUTHORIZE_URL = f'{self.DUO_ORIGIN}/oauth/v1/authorize?client_id=oktaclientidoktaclientid&response_type=code&scope=openid&request=rrrrrrrrrrrrrrrrrrrr.rrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrr._rrrrrrrrrrr-rrrrrrrrrrrrrrrrrrrrrrrrrrrrrr' + self.DUO_FRAMELESS_AUTH_PATH = f'/frame/frameless/v4/auth?sid={self.DUO_SID}&tx={self.DUO_TX}' + self.DUO_PROMPT_PATH = '/frame/prompt?sid=frameless-aaaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaaaa' + self.DUO_XSRF = 'xsrfxsrfxsrfxsrfxsrfxsrfxsrfxsrfxsrfxs' + self.DUO_PLUGIN_FORM_CONTENT = read_fixture('duo_universal_plugin_form.html') + self.DUO_LOGIN_FORM_CONTENT = read_fixture('duo_universal_login_form.html') + self.DUO_TXID = 'txidtxid-txid-txid-txid-txidtxid' + self.DUO_LOGIN_FORM_SUBMISSION_RESPONSE = { + 'stat': 'OK', + 'response': { + 'txid': self.DUO_TXID + } + } + self.DUO_STATUS_PUSH_IN_PROGRESS_RESPONSE = { + 'stat': 'OK', + 'response': { + 'status_enum': 13, + 'status_code': 'pushed' + } + } + self.DUO_STATUS_PUSH_COMPLETE_RESPONSE = { + 'stat': 'OK', + 'response': { + 'status_enum': 5, + 'status_code': 'allow', + 'result': 'SUCCESS', + 'reason': 'User approved', + 'post_auth_action': 'oidc_exit' + } + } + + @responses.activate(registry=responses.registries.OrderedRegistry) + def test_universal_push(self): + self.configure_duo_responses(duo_factor='Duo Push') + + session = requests.Session() + duo = OktaDuoUniversal(ui=MockUserInterface(), + session=session, + state_token=self.OKTA_STATE_TOKEN, + okta_factor=self.OKTA_FACTOR, + remember_device=True, + duo_factor='Duo Push') + result = duo.do_auth() + assert result == { + 'apiResponse': { + 'status': 'SUCCESS', + 'userSession': { + 'username': self.OKTA_LOGIN, + 'session': self.OKTA_SID_VALUE, + 'device_token': self.OKTA_DT_VALUE + } + }, + } + + @responses.activate(registry=responses.registries.OrderedRegistry) + def test_universal_phone_call(self): + self.configure_duo_responses(duo_factor='Phone Call') + session = requests.Session() + duo = OktaDuoUniversal(ui=MockUserInterface(), + session=session, + state_token=self.OKTA_STATE_TOKEN, + okta_factor=self.OKTA_FACTOR, + remember_device=True, + duo_factor='Phone Call') + result = duo.do_auth() + assert result == { + 'apiResponse': { + 'status': 'SUCCESS', + 'userSession': { + 'username': self.OKTA_LOGIN, + 'session': self.OKTA_SID_VALUE, + 'device_token': self.OKTA_DT_VALUE + } + }, + } + + @responses.activate(registry=responses.registries.OrderedRegistry) + def test_universal_passcode(self): + self.configure_duo_responses(duo_factor='Passcode', passcode='12345') + session = requests.Session() + duo = OktaDuoUniversal(ui=MockUserInterface(), + session=session, + state_token=self.OKTA_STATE_TOKEN, + okta_factor=self.OKTA_FACTOR, + remember_device=True, + duo_factor='Passcode', + duo_passcode='12345') + result = duo.do_auth() + assert result == { + 'apiResponse': { + 'status': 'SUCCESS', + 'userSession': { + 'username': self.OKTA_LOGIN, + 'session': self.OKTA_SID_VALUE, + 'device_token': self.OKTA_DT_VALUE + } + }, + } + + def test_no_preferred_device(self): + session = requests.Session() + login_form_response = Mock() + login_form_response.content = read_fixture('duo_universal_login_form_without_preferred_device.html') + duo = OktaDuoUniversal(ui=MockUserInterface(), + session=session, + state_token=self.OKTA_STATE_TOKEN, + okta_factor=self.OKTA_FACTOR, + remember_device=True, + duo_factor='Passcode', + duo_passcode='12345') + form_action, form_data = duo._get_duo_universal_login_form_data(login_form_response) + assert form_data['device'] == 'phone1' + + def configure_duo_responses(self, duo_factor, passcode=None): + # Initial request to Okta to verify IDP factor + responses.add(responses.POST, + self.OKTA_FACTOR['_links']['verify']['href'] + '?rememberDevice=True', + body=json.dumps(self.REQ_0_OKTA_AUTHN_FACTORS_VERIFY_RESPONSE), + match=[ + responses.matchers.json_params_matcher( + { + 'stateToken': self.OKTA_STATE_TOKEN + } + )] + ) + # Request to Okta IDP next step + responses.add(method=responses.GET, + url=self.REQ_0_OKTA_AUTHN_FACTORS_VERIFY_RESPONSE['_links']['next']['href'], + status=302, + adding_headers={ + 'Location': self.DUO_AUTHORIZE_URL, + 'Set-Cookie': f'DT={self.OKTA_DT_VALUE};Version=1;Path=/;Max-Age=63072000;Secure;Expires=Thu, 16 Oct 2100 19:30:07 GMT;HttpOnly' + }) + # Duo redirects a couple of times before presenting a form + responses.add(method=responses.GET, + url=self.DUO_AUTHORIZE_URL, + status=303, + adding_headers={ + 'Location': self.DUO_FRAMELESS_AUTH_PATH + }) + responses.add(method=responses.GET, + url=self.DUO_ORIGIN + self.DUO_FRAMELESS_AUTH_PATH, + body=self.DUO_PLUGIN_FORM_CONTENT + ) + # Duo plugin-form submission + responses.add(method=responses.POST, + url=self.DUO_ORIGIN + self.DUO_FRAMELESS_AUTH_PATH, + status=302, + adding_headers={ + 'Location': self.DUO_PROMPT_PATH + }, + match=[ + responses.matchers.urlencoded_params_matcher( + { + 'tx': self.DUO_TX, + '_xsrf': self.DUO_XSRF, + 'parent': 'None', + } + )] + ) + # Duo presents a second form, login-form + responses.add(method=responses.GET, + url=self.DUO_ORIGIN + self.DUO_PROMPT_PATH, + body=self.DUO_LOGIN_FORM_CONTENT + ) + # login-form submit, which triggers Push or Phone call if that's what the user wanted + duo_login_form_values = { + # Important to Duo + 'sid': self.DUO_SID, + 'postAuthDestination': 'OIDC_EXIT', + 'factor': duo_factor, + 'device': 'phone1', + '_xsrf': 'xsrfxsrfxsrfxsrfxsrfxsrfxsrfxsrfxsrfxs', + # Unimportant for triggering universal MFA, but part of the form inputs + 'should_update_dm': 'False', + 'should_retry_u2f_timeouts': 'True', + 'preferred_factor': 'Duo Push', + 'preferred_device': 'phone1', + 'out_of_date': 'False', + 'itype': 'okta', + 'has_phone_that_requires_compliance_text': 'False', + 'days_to_block': 'None', + 'days_out_of_date': '0', + 'url': '/frame/prompt', + 'ukey': 'duoukeyvalue' + } + if passcode: + duo_login_form_values['passcode'] = passcode + responses.add(method=responses.POST, + url=self.DUO_ORIGIN + '/frame/prompt', + body=json.dumps(self.DUO_LOGIN_FORM_SUBMISSION_RESPONSE), + match=[responses.matchers.urlencoded_params_matcher( + duo_login_form_values)] + ) + # 'No response yet' status + responses.add(method=responses.POST, + url=self.DUO_ORIGIN + '/frame/v4/status', + body=json.dumps(self.DUO_STATUS_PUSH_IN_PROGRESS_RESPONSE), + match=[responses.matchers.urlencoded_params_matcher( + { + 'txid': self.DUO_TXID, + 'sid': self.DUO_SID, + })] + ) + # 'User approved' status + responses.add(method=responses.POST, + url=self.DUO_ORIGIN + '/frame/v4/status', + body=json.dumps(self.DUO_STATUS_PUSH_COMPLETE_RESPONSE), + match=[responses.matchers.urlencoded_params_matcher( + { + 'txid': self.DUO_TXID, + 'sid': self.DUO_SID, + })] + ) + # Client posts to a Duo OIDC exit URL, which redirects to Okta + okta_oidc_callback = 'https://oktatenant.oktapreview.com/oauth2/v1/authorize/callback?state=oidcexitstate&code=oidccode' + responses.add(method=responses.POST, + url=self.DUO_ORIGIN + '/frame/v4/oidc/exit', + status=303, + adding_headers={ + 'Location': okta_oidc_callback + }, + match=[responses.matchers.urlencoded_params_matcher( + { + 'txid': self.DUO_TXID, + 'sid': self.DUO_SID, + 'dampen_choice': 'false', + '_xsrf': self.DUO_XSRF, + 'factor': duo_factor, + })] + ) + responses.add(method=responses.GET, + url=okta_oidc_callback, + body='', + adding_headers={ + 'Set-Cookie': f'sid={self.OKTA_SID_VALUE};Version=1;Path=/;Secure' + } + )