Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #514 Genesis EU authentication (needs review) #638

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 99 additions & 25 deletions hyundai_kia_connect_api/KiaUvoApiEU.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from time import sleep
from urllib.parse import parse_qs, urlparse

import re
import pytz
import requests
from bs4 import BeautifulSoup
Expand Down Expand Up @@ -178,7 +179,7 @@ def __init__(self, region: int, brand: int, language: str) -> None:
self.CFB: str = base64.b64decode(
"RFtoRq/vDXJmRndoZaZQyYo3/qFLtVReW8P7utRPcc0ZxOzOELm9mexvviBk/qqIp4A="
)
self.BASIC_AUTHORIZATION: str = "Basic NmQ0NzdjMzgtM2NhNC00Y2YzLTk1NTctMmExOTI5YTk0NjU0OktVeTQ5WHhQekxwTHVvSzB4aEJDNzdXNlZYaG10UVI5aVFobUlGampvWTRJcHhzVg==" # noqa
self.BASIC_AUTHORIZATION: str = "Basic MzAyMGFmYTItMzBmZi00MTJhLWFhNTEtZDI4ZmJlOTAxZTEwOkZLRGRsZWYyZmZkbGVGRXdlRUxGS0VSaUxFUjJGRUQyMXNEZHdkZ1F6NmhGRVNFMw==" # noqa
self.LOGIN_FORM_HOST = "accounts-eu.genesis.com"
self.PUSH_TYPE = "GCM"

Expand Down Expand Up @@ -241,30 +242,98 @@ def _get_control_headers(self, token: Token, vehicle: Vehicle) -> dict:
}

def login(self, username: str, password: str) -> Token:
stamp = self._get_stamp()
device_id = self._get_device_id(stamp)
cookies = self._get_cookies()
self._set_session_language(cookies)
authorization_code = None
try:
authorization_code = self._get_authorization_code_with_redirect_url(
username, password, cookies
)
except Exception:
_LOGGER.debug(f"{DOMAIN} - get_authorization_code_with_redirect_url failed")
authorization_code = self._get_authorization_code_with_form(
username, password, cookies
)
session = requests.Session()
session.headers.update(
{
"User-Agent": USER_AGENT_MOZILLA,
"Accept-Language": "en-GB,en;q=0.9",
}
)

if authorization_code is None:
raise AuthenticationError("Login Failed")
# Step 1: GET oauth2 - sets initial cookie
url = f"https://{self.BASE_DOMAIN}/api/v1/user/oauth2/authorize?response_type=code&client_id={self.CCSP_SERVICE_ID}&redirect_uri=https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/ga-api/redirect2&lang=en&scope=url.newapp"
response = session.get(url, allow_redirects=False)
if response.status_code != 302:
raise AuthenticationError("Initial OAuth2 request failed")
session.cookies.update(response.cookies)
location_url = response.headers["Location"]

# Step 2: Follow the 302 to fetch authorize page
response = session.get(location_url)
if response.status_code != 200:
raise AuthenticationError("Failed to fetch authorize page")

# Step 3: GET session
url = f"https://{self.BASE_DOMAIN}/api/v1/user/session"
response = session.get(url)
if response.status_code != 204:
raise AuthenticationError("Failed to get session")

# Step 4: POST language
url = f"https://{self.BASE_DOMAIN}/api/v1/user/language"
headers = {"Content-Type": "text/plain;charset=UTF-8"}
response = session.post(url, headers=headers, json={"lang": "en"})
if response.status_code != 204:
raise AuthenticationError("Failed to set language")

# Step 5: GET integration info
url = f"https://{self.BASE_DOMAIN}/api/v1/user/integrationinfo"
response = session.get(url)
if response.status_code != 200:
raise AuthenticationError("Failed to get integration info")
integration_info = response.json()

# Step 6: Retrieve login form URL
url = f"https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/protocol/openid-connect/auth?client_id=ga-gcs&scope=openid%20profile%20email%20phone&response_type=code&redirect_uri=https://{self.BASE_DOMAIN}/api/v1/user/integration/redirect/login&ui_locales=en&state={integration_info['serviceId']}:{integration_info['userId']}"
response = session.get(url)
match = re.search(r'"loginAction":\s*"(https://.*?)"', response.text)
if not match:
raise AuthenticationError("Could not find loginAction URL")
login_action_url = match.group(1)

# Step 7: Submit username and password to login
payload = {"username": username, "password": password}
response = session.post(login_action_url, data=payload, allow_redirects=False)
if response.status_code != 302:
raise AuthenticationError("Login failed")
session.cookies.update(response.cookies)
redirect_url = response.headers["Location"]

_, access_token, authorization_code = self._get_access_token(
stamp, authorization_code
)
_, refresh_token = self._get_refresh_token(stamp, authorization_code)
# Step 8: Follow the 302 redirects until the final step
response = session.get(redirect_url, allow_redirects=True)
if response.status_code != 200:
raise AuthenticationError("Redirect failed")

# Step 9: Perform silent signin to obtain the code
url = f"https://{self.BASE_DOMAIN}/api/v1/user/silentsignin"
headers = {"Content-Type": "text/plain;charset=UTF-8"}
response = session.post(url, headers=headers, json={"intUserId": ""})
if response.status_code != 200:
raise AuthenticationError("Silent signin failed")
redirect_url_with_code = response.json().get("redirectUrl")

# Step 10: Fetch the access token using the authorization code
session_code = re.search(r"code=([^&]*)", redirect_url_with_code).group(1)
url = f"https://{self.BASE_DOMAIN}/api/v1/user/oauth2/token"
payload = {
"client_id": self.CCSP_SERVICE_ID,
"code": session_code,
"grant_type": "authorization_code",
"redirect_uri": f"https://{self.LOGIN_FORM_HOST}/realms/eugenesisidm/ga-api/redirect2",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8",
"Authorization": self.BASIC_AUTHORIZATION,
}
response = session.post(url, headers=headers, data=payload)
if response.status_code != 200:
raise AuthenticationError("Failed to get access token")
access_token = "Bearer " + response.json().get("access_token")
refresh_token = response.json().get("refresh_token")
valid_until = dt.datetime.now(pytz.utc) + LOGIN_TOKEN_LIFETIME

device_id = self._get_device_id(self._get_stamp())

return Token(
username=username,
password=password,
Expand Down Expand Up @@ -313,7 +382,9 @@ def _get_time_from_string(self, value, timesection) -> dt.datetime.time:
lastTwo = int(value[-2:])
if lastTwo > 60:
value = int(value) + 40
if int(value) > 1260:
if int(value) == 0:
value = dt.time(0, 0)
elif int(value) > 1260:
value = dt.datetime.strptime(str(value), "%H%M").time()
else:
d = dt.datetime.strptime(str(value), "%I%M")
Expand All @@ -330,11 +401,14 @@ def update_vehicle_with_cached_state(self, token: Token, vehicle: Vehicle) -> No
else:
url += "/status/latest"

headers = self._get_authenticated_headers(
token, vehicle.ccu_ccs2_protocol_support
)
headers["ccsp-device-id"] = token.device_id

response = requests.get(
url,
headers=self._get_authenticated_headers(
token, vehicle.ccu_ccs2_protocol_support
),
headers=headers,
).json()

_LOGGER.debug(f"{DOMAIN} - get_cached_vehicle_status response: {response}")
Expand Down
Loading