huami-token/huami_token.py

437 wiersze
16 KiB
Python
Czysty Zwykły widok Historia

2020-05-25 11:27:57 +00:00
#!/usr/bin/env python3
2020-11-12 13:41:27 +00:00
# pylint: disable=too-many-instance-attributes
# pylint: disable=invalid-name
# Copyright (c) 2020 Kirill Snezhko
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
2020-11-12 13:41:27 +00:00
"""Main module"""
2020-05-25 11:27:57 +00:00
import json
2020-06-07 21:26:18 +00:00
import uuid
import random
2020-05-28 16:53:33 +00:00
import shutil
2020-06-07 21:26:18 +00:00
import urllib
import argparse
import getpass
2020-11-12 13:41:27 +00:00
import requests
2020-06-07 21:26:18 +00:00
from rich.console import Console
2020-11-12 13:41:27 +00:00
from rich.table import Table
2020-08-04 15:14:27 +00:00
from rich import box
2020-06-07 21:26:18 +00:00
import urls
2020-05-25 11:27:57 +00:00
2021-09-21 05:48:12 +00:00
import json
2020-05-25 11:27:57 +00:00
class HuamiAmazfit:
2020-11-12 13:41:27 +00:00
"""Base class for logging in and receiving auth keys and GPS packs"""
2020-05-25 11:27:57 +00:00
def __init__(self, method="amazfit", email=None, password=None):
if method == 'amazfit' and (not email or not password):
raise ValueError("For Amazfit method E-Mail and Password can not be null.")
self.method = method
self.email = email
self.password = password
self.access_token = None
self.country_code = None
self.app_token = None
self.login_token = None
self.user_id = None
self.r = str(uuid.uuid4())
# IMEI or something unique
self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255))
2020-11-12 13:41:27 +00:00
def get_access_token(self) -> str:
"""Get access token for log in"""
2020-06-01 16:49:48 +00:00
print(f"Getting access token with {self.method} login method...")
2020-05-25 11:27:57 +00:00
if self.method == 'xiaomi':
2020-06-07 21:26:18 +00:00
login_url = urls.URLS["login_xiaomi"]
2020-05-25 11:27:57 +00:00
2020-06-01 16:49:48 +00:00
print(f"Copy this URL to web-browser \n\n{login_url}\n\nand login to your Mi account.")
2020-05-25 11:27:57 +00:00
token_url = input("\nPaste URL after redirection here.\n")
parsed_token_url = urllib.parse.urlparse(token_url)
token_url_parameters = urllib.parse.parse_qs(parsed_token_url.query)
if 'code' not in token_url_parameters:
raise ValueError("No 'code' parameter in login url.")
self.access_token = token_url_parameters['code']
self.country_code = 'US'
elif self.method == 'amazfit':
2020-06-07 21:26:18 +00:00
auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email))
data = urls.PAYLOADS['tokens_amazfit']
data['password'] = self.password
2020-05-25 11:27:57 +00:00
response = requests.post(auth_url, data=data, allow_redirects=False)
response.raise_for_status()
# 'Location' parameter contains url with login status
redirect_url = urllib.parse.urlparse(response.headers.get('Location'))
redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query)
if 'error' in redirect_url_parameters:
2020-11-12 13:41:27 +00:00
raise ValueError(f"Wrong E-mail or Password." \
f"Error: {redirect_url_parameters['error']}")
2020-05-25 11:27:57 +00:00
if 'access' not in redirect_url_parameters:
raise ValueError("No 'access' parameter in login url.")
if 'country_code' not in redirect_url_parameters:
# Sometimes for no reason server does not return country_code
# In this case we extract country_code from region, because it looks
# like this: 'eu-central-1'
region = redirect_url_parameters['region'][0]
self.country_code = region[0:2].upper()
else:
self.country_code = redirect_url_parameters['country_code']
2020-05-25 11:27:57 +00:00
self.access_token = redirect_url_parameters['access']
print("Token: {}".format(self.access_token))
return self.access_token
2020-11-12 13:41:27 +00:00
def login(self, external_token=None) -> None:
"""Perform login and get app and login tokens"""
2020-05-25 11:27:57 +00:00
print("Logging in...")
if external_token:
self.access_token = external_token
2020-06-07 21:26:18 +00:00
login_url = urls.URLS['login_amazfit']
data = urls.PAYLOADS['login_amazfit']
data['country_code'] = self.country_code
data['device_id'] = self.device_id
data['third_name'] = 'huami' if self.method == 'amazfit' else 'mi-watch'
data['code'] = self.access_token
data['grant_type'] = 'access_token' if self.method == 'amazfit' else 'request_token'
2020-05-25 11:27:57 +00:00
response = requests.post(login_url, data=data, allow_redirects=False)
response.raise_for_status()
login_result = response.json()
if 'error_code' in login_result:
2020-06-01 16:49:48 +00:00
raise ValueError(f"Login error. Error: {login_result['error_code']}")
2020-05-25 11:27:57 +00:00
if 'token_info' not in login_result:
raise ValueError("No 'token_info' parameter in login data.")
2020-11-12 13:41:27 +00:00
# else
# Do not need else, because raise breaks control flow
token_info = login_result['token_info']
if 'app_token' not in token_info:
raise ValueError("No 'app_token' parameter in login data.")
self.app_token = token_info['app_token']
if 'login_token' not in token_info:
raise ValueError("No 'login_token' parameter in login data.")
self.login_token = token_info['login_token']
if 'user_id' not in token_info:
raise ValueError("No 'user_id' parameter in login data.")
self.user_id = token_info['user_id']
2020-05-25 11:27:57 +00:00
print("Logged in! User id: {}".format(self.user_id))
def get_wearables(self) -> dict:
2020-11-12 13:41:27 +00:00
"""Request a list of linked devices"""
print("Getting linked wearables...")
2020-05-25 11:27:57 +00:00
2020-06-07 21:26:18 +00:00
devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id))
headers = urls.PAYLOADS['devices']
headers['apptoken'] = self.app_token
params = {'enableMultiDevice': 'true'}
2020-05-25 11:27:57 +00:00
response = requests.get(devices_url, params=params, headers=headers)
2020-05-25 11:27:57 +00:00
response.raise_for_status()
device_request = response.json()
if 'items' not in device_request:
raise ValueError("No 'items' parameter in devices data.")
devices = device_request['items']
2021-02-23 14:03:23 +00:00
_wearables = []
2021-02-23 14:03:23 +00:00
for _wearable in devices:
if 'macAddress' not in _wearable:
2020-05-25 11:27:57 +00:00
raise ValueError("No 'macAddress' parameter in device data.")
2021-02-23 14:03:23 +00:00
mac_address = _wearable['macAddress']
2020-05-25 11:27:57 +00:00
2021-02-23 14:03:23 +00:00
if 'additionalInfo' not in _wearable:
2020-05-25 11:27:57 +00:00
raise ValueError("No 'additionalInfo' parameter in device data.")
2021-02-23 14:03:23 +00:00
device_info = json.loads(_wearable['additionalInfo'])
2020-05-25 11:27:57 +00:00
if 'auth_key' not in device_info:
raise ValueError("No 'auth_key' parameter in device data.")
key_str = device_info['auth_key']
auth_key = '0x' + (key_str if key_str != '' else '00')
2021-02-23 14:03:23 +00:00
_wearables.append(
{
'active_status': str(_wearable.get('activeStatus', '-1')),
'mac_address': mac_address,
'auth_key': auth_key,
'device_source': str(_wearable.get('deviceSource', 0)),
'firmware_version': _wearable.get('firmwareVersion', 'v-1'),
'hardware_version': device_info.get('hardwareVersion', 'v-1'),
'production_source': device_info.get('productVersion', '0')
}
)
2020-05-25 11:27:57 +00:00
2021-02-23 14:03:23 +00:00
return _wearables
2020-05-25 11:27:57 +00:00
2021-02-23 23:22:05 +00:00
@staticmethod
def get_firmware(_wearable: dict) -> None:
"""Check and download updates for the furmware and fonts"""
fw_url = urls.URLS["fw_updates"]
params = urls.PAYLOADS["fw_updates"]
params['deviceSource'] = _wearable['device_source']
params['firmwareVersion'] = _wearable['firmware_version']
params['hardwareVersion'] = _wearable['hardware_version']
params['productionSource'] = _wearable['production_source']
headers = {
'appplatform': 'android_phone',
'appname': 'com.huami.midong',
'lang': 'en_US'
}
response = requests.get(fw_url, params=params, headers=headers)
response.raise_for_status()
fw_response = response.json()
links = []
hashes = []
if 'firmwareUrl' in fw_response:
links.append(fw_response['firmwareUrl'])
hashes.append(fw_response['firmwareMd5'])
if 'fontUrl' in fw_response:
links.append(fw_response['fontUrl'])
hashes.append(fw_response['fontMd5'])
if not links:
print("No updates found!")
else:
2021-02-23 23:22:05 +00:00
for link, hash_sum in zip(links, hashes):
file_name = link.split('/')[-1]
print(f"Downloading {file_name} with MD5-hash {hash_sum}...")
with requests.get(link, stream=True) as r:
with open(file_name, 'wb') as f:
shutil.copyfileobj(r.raw, f)
2020-11-12 13:41:27 +00:00
def get_gps_data(self) -> None:
"""Download GPS packs: almanac and AGPS"""
agps_packs = ["AGPS_ALM", "AGPSZIP", "LLE", "AGPS"]
agps_file_names = ["cep_1week.zip", "cep_7days.zip", "lle_1week.zip", "cep_pak.bin"]
2020-06-07 21:26:18 +00:00
agps_link = urls.URLS['agps']
2020-05-28 16:53:33 +00:00
2020-06-07 21:26:18 +00:00
headers = urls.PAYLOADS['agps']
headers['apptoken'] = self.app_token
2020-05-28 16:53:33 +00:00
2021-02-23 23:22:05 +00:00
for pack_idx, agps_pack_name in enumerate(agps_packs):
print(f"Downloading {agps_pack_name}...")
2020-06-07 21:26:18 +00:00
response = requests.get(agps_link.format(pack_name=agps_pack_name), headers=headers)
2020-05-28 16:53:33 +00:00
response.raise_for_status()
agps_result = response.json()[0]
if 'fileUrl' not in agps_result:
raise ValueError("No 'fileUrl' parameter in files request.")
2020-11-12 13:41:27 +00:00
with requests.get(agps_result['fileUrl'], stream=True) as request:
2021-02-23 23:22:05 +00:00
with open(agps_file_names[pack_idx], 'wb') as gps_file:
2020-11-12 13:41:27 +00:00
shutil.copyfileobj(request.raw, gps_file)
2020-05-28 16:53:33 +00:00
2020-11-12 13:41:27 +00:00
def logout(self) -> None:
"""Log out from the current account"""
2020-06-07 21:26:18 +00:00
logout_url = urls.URLS['logout']
data = urls.PAYLOADS['logout']
data['login_token'] = self.login_token
2020-05-25 11:27:57 +00:00
response = requests.post(logout_url, data=data)
logout_result = response.json()
if logout_result['result'] == 'ok':
print("\nLogged out.")
else:
print("\nError logging out.")
if __name__ == "__main__":
2021-09-21 05:48:12 +00:00
no_logout = None
login_method = None
login_method_required = True
config = {}
try:
config_file = open("config.json",'r')
config = json.load(config_file)
config_file.close()
login_method_required=False
except Exception as e:
print ("except",e)
login_method_required=True
2020-05-28 16:53:33 +00:00
parser = argparse.ArgumentParser(description="Obtain Bluetooth Auth key from Amazfit "
"servers and download AGPS data.")
2020-05-25 11:27:57 +00:00
parser.add_argument("-m",
"--method",
choices=["amazfit", "xiaomi"],
default="amazfit",
2021-09-21 05:48:12 +00:00
required=login_method_required,
2020-05-25 11:27:57 +00:00
help="Login method ")
parser.add_argument("-e",
"--email",
required=False,
help="Account e-mail address")
2020-05-25 11:27:57 +00:00
parser.add_argument("-p",
"--password",
required=False,
help="Account Password")
parser.add_argument("-b",
"--bt_keys",
required=False,
action='store_true',
help="Get bluetooth tokens of paired devices")
parser.add_argument("-g",
"--gps",
required=False,
action='store_true',
help="Download A-GPS files")
parser.add_argument("-f",
"--firmware",
required=False,
action='store_true',
help='Request firmware updates. Works only with -b/--bt_keys argument. '
'Extremely dangerous!')
parser.add_argument("-a",
"--all",
required=False,
action='store_true',
help="Do everything: get bluetooth tokens, download A-GPS files. But "
"do NOT download firmware updates")
parser.add_argument("-n",
"--no_logout",
required=False,
action='store_true',
2020-11-12 13:41:27 +00:00
help="Do not logout, keep active session and "\
"display app token and access token")
2020-05-25 11:27:57 +00:00
args = parser.parse_args()
console = Console()
2020-08-04 15:14:27 +00:00
table = Table(show_header=True, header_style="bold", box=box.ASCII)
table.add_column("ID", width=3, justify='center')
table.add_column("ACT", width=3, justify='center')
table.add_column("MAC", style="dim", width=17, justify='center')
table.add_column("auth_key", width=45, justify='center')
if args.firmware and not args.bt_keys:
parser.error("Can not use -f/--firmware without -b/--bt_keys!")
2021-09-21 05:48:12 +00:00
if "login_method" in config.keys():
login_method = config["login_method"]
else:
login_method = args.method
if no_logout == None:
no_logout = args.no_logout
if args.password is None and login_method == "amazfit":
args.password = getpass.getpass()
2021-09-21 05:48:12 +00:00
device = HuamiAmazfit(method=login_method,
2020-05-25 11:27:57 +00:00
email=args.email,
password=args.password)
2021-09-21 05:48:12 +00:00
if "user_id" in config.keys():
device.user_id = config["user_id"]
if "login_token" in config.keys():
device.login_token = config["login_token"]
if "app_token" in config.keys():
device.app_token = config["app_token"]
if "no_logout" in config.keys():
no_logout = config["no_logout"]
if device.login_token == None and device.app_token == None:
device.get_access_token()
device.login()
wearables = []
if args.bt_keys or args.all:
wearables = device.get_wearables()
for idx, wearable in enumerate(wearables):
table.add_row(str(idx), wearable['active_status'],
wearable['mac_address'], wearable['auth_key'])
console.print(table)
if args.firmware:
print("Downloading the firmware is untested and can brick your device. "
"I am not responsible for any problems that might arise.")
answer = input("Do you want to proceed? [yes/no] ")
if answer.lower() in ['yes', 'y', 'ye']:
wearable_id = input("ID of the device to check for updates (-1 for all of them): ")
if wearable_id == "-1":
print("Be extremely careful with downloaded files!")
for idx, wearable in enumerate(wearables):
print(f"\nChecking for device {idx}...")
device.get_firmware(wearable)
elif int(wearable_id) in range(0, len(wearables)):
2021-02-27 06:56:23 +00:00
device.get_firmware(wearables[int(wearable_id)])
else:
print("Wrong input!")
if args.gps or args.all:
device.get_gps_data()
2021-09-21 05:48:12 +00:00
if no_logout:
print("\nNo logout!")
print(f"app_token={device.app_token}\nlogin_token={device.login_token}")
2021-09-21 05:48:12 +00:00
config = {}
config["login_token"] = device.login_token
config["app_token"] = device.app_token
config["login_method"] = login_method
config["no_logout"] = no_logout
config["user_id"] = device.user_id
config['country_code'] = device.country_code
config['device_id'] = device.device_id
f = open("config.json", 'w')
json.dump(config, f, indent=4, sort_keys=True)
f.close()
else:
device.logout()
2021-09-21 05:48:12 +00:00