diff --git a/hyundai_kia_connect_api/KiaUvoApiEU.py b/hyundai_kia_connect_api/KiaUvoApiEU.py index a0aa49ee..c38b5231 100644 --- a/hyundai_kia_connect_api/KiaUvoApiEU.py +++ b/hyundai_kia_connect_api/KiaUvoApiEU.py @@ -11,6 +11,7 @@ from time import sleep from urllib.parse import parse_qs, urlparse +import re import pytz import requests from bs4 import BeautifulSoup @@ -178,7 +179,7 @@ def __init__(self, region: int, brand: int, language: str) -> None: self.CFB: str = base64.b64decode( "RFtoRq/vDXJmRndoZaZQyYo3/qFLtVReW8P7utRPcc0ZxOzOELm9mexvviBk/qqIp4A=" ) - self.BASIC_AUTHORIZATION: str = "Basic NmQ0NzdjMzgtM2NhNC00Y2YzLTk1NTctMmExOTI5YTk0NjU0OktVeTQ5WHhQekxwTHVvSzB4aEJDNzdXNlZYaG10UVI5aVFobUlGampvWTRJcHhzVg==" # noqa + self.BASIC_AUTHORIZATION: str = "Basic MzAyMGFmYTItMzBmZi00MTJhLWFhNTEtZDI4ZmJlOTAxZTEwOkZLRGRsZWYyZmZkbGVGRXdlRUxGS0VSaUxFUjJGRUQyMXNEZHdkZ1F6NmhGRVNFMw==" # noqa self.LOGIN_FORM_HOST = "accounts-eu.genesis.com" self.PUSH_TYPE = "GCM" @@ -241,30 +242,98 @@ def _get_control_headers(self, token: Token, vehicle: Vehicle) -> dict: } def login(self, username: str, password: str) -> Token: - stamp = self._get_stamp() - device_id = self._get_device_id(stamp) - cookies = self._get_cookies() - self._set_session_language(cookies) - authorization_code = None - try: - authorization_code = self._get_authorization_code_with_redirect_url( - username, password, cookies - ) - except Exception: - _LOGGER.debug(f"{DOMAIN} - get_authorization_code_with_redirect_url failed") - authorization_code = self._get_authorization_code_with_form( - username, password, cookies - ) + session = requests.Session() + session.headers.update( + { + "User-Agent": USER_AGENT_MOZILLA, + "Accept-Language": "en-GB,en;q=0.9", + } + ) - if authorization_code is None: - raise AuthenticationError("Login Failed") + # Step 1: GET oauth2 - sets initial cookie + url = f"https://{self.BASE_DOMAIN}/api/v1/user/oauth2/authorize?response_type=code&client_id={self.CCSP_SERVICE_ID}&redirect_uri=https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/ga-api/redirect2&lang=en&scope=url.newapp" + response = session.get(url, allow_redirects=False) + if response.status_code != 302: + raise AuthenticationError("Initial OAuth2 request failed") + session.cookies.update(response.cookies) + location_url = response.headers["Location"] + + # Step 2: Follow the 302 to fetch authorize page + response = session.get(location_url) + if response.status_code != 200: + raise AuthenticationError("Failed to fetch authorize page") + + # Step 3: GET session + url = f"https://{self.BASE_DOMAIN}/api/v1/user/session" + response = session.get(url) + if response.status_code != 204: + raise AuthenticationError("Failed to get session") + + # Step 4: POST language + url = f"https://{self.BASE_DOMAIN}/api/v1/user/language" + headers = {"Content-Type": "text/plain;charset=UTF-8"} + response = session.post(url, headers=headers, json={"lang": "en"}) + if response.status_code != 204: + raise AuthenticationError("Failed to set language") + + # Step 5: GET integration info + url = f"https://{self.BASE_DOMAIN}/api/v1/user/integrationinfo" + response = session.get(url) + if response.status_code != 200: + raise AuthenticationError("Failed to get integration info") + integration_info = response.json() + + # Step 6: Retrieve login form URL + url = f"https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/protocol/openid-connect/auth?client_id=ga-gcs&scope=openid%20profile%20email%20phone&response_type=code&redirect_uri=https://{self.BASE_DOMAIN}/api/v1/user/integration/redirect/login&ui_locales=en&state={integration_info['serviceId']}:{integration_info['userId']}" + response = session.get(url) + match = re.search(r'"loginAction":\s*"(https://.*?)"', response.text) + if not match: + raise AuthenticationError("Could not find loginAction URL") + login_action_url = match.group(1) + + # Step 7: Submit username and password to login + payload = {"username": username, "password": password} + response = session.post(login_action_url, data=payload, allow_redirects=False) + if response.status_code != 302: + raise AuthenticationError("Login failed") + session.cookies.update(response.cookies) + redirect_url = response.headers["Location"] - _, access_token, authorization_code = self._get_access_token( - stamp, authorization_code - ) - _, refresh_token = self._get_refresh_token(stamp, authorization_code) + # Step 8: Follow the 302 redirects until the final step + response = session.get(redirect_url, allow_redirects=True) + if response.status_code != 200: + raise AuthenticationError("Redirect failed") + + # Step 9: Perform silent signin to obtain the code + url = f"https://{self.BASE_DOMAIN}/api/v1/user/silentsignin" + headers = {"Content-Type": "text/plain;charset=UTF-8"} + response = session.post(url, headers=headers, json={"intUserId": ""}) + if response.status_code != 200: + raise AuthenticationError("Silent signin failed") + redirect_url_with_code = response.json().get("redirectUrl") + + # Step 10: Fetch the access token using the authorization code + session_code = re.search(r"code=([^&]*)", redirect_url_with_code).group(1) + url = f"https://{self.BASE_DOMAIN}/api/v1/user/oauth2/token" + payload = { + "client_id": self.CCSP_SERVICE_ID, + "code": session_code, + "grant_type": "authorization_code", + "redirect_uri": f"https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/ga-api/redirect2", + } + headers = { + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8", + "Authorization": self.BASIC_AUTHORIZATION, + } + response = session.post(url, headers=headers, data=payload) + if response.status_code != 200: + raise AuthenticationError("Failed to get access token") + access_token = "Bearer " + response.json().get("access_token") + refresh_token = response.json().get("refresh_token") valid_until = dt.datetime.now(pytz.utc) + LOGIN_TOKEN_LIFETIME + device_id = self._get_device_id(self._get_stamp()) + return Token( username=username, password=password, @@ -313,7 +382,9 @@ def _get_time_from_string(self, value, timesection) -> dt.datetime.time: lastTwo = int(value[-2:]) if lastTwo > 60: value = int(value) + 40 - if int(value) > 1260: + if int(value) == 0: + value = dt.time(0, 0) + elif int(value) > 1260: value = dt.datetime.strptime(str(value), "%H%M").time() else: d = dt.datetime.strptime(str(value), "%I%M") @@ -330,11 +401,14 @@ def update_vehicle_with_cached_state(self, token: Token, vehicle: Vehicle) -> No else: url += "/status/latest" + headers = self._get_authenticated_headers( + token, vehicle.ccu_ccs2_protocol_support + ) + headers["ccsp-device-id"] = token.device_id + response = requests.get( url, - headers=self._get_authenticated_headers( - token, vehicle.ccu_ccs2_protocol_support - ), + headers=headers, ).json() _LOGGER.debug(f"{DOMAIN} - get_cached_vehicle_status response: {response}")