ci: fix pylint errors

enable-ci
argrento 2023-06-03 20:48:43 +02:00
rodzic e1d4832a11
commit 56e2978245
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: AF1F4775B5A95299
7 zmienionych plików z 82 dodań i 51 usunięć

1
.gitignore vendored
Wyświetl plik

@ -7,3 +7,4 @@ __pycache__
*.fw
*.bin
venv
.mypy_cache

Wyświetl plik

@ -23,9 +23,9 @@ import getpass
import json
import random
import shutil
import urllib
import urllib.parse
import uuid
from typing import Iterator, Tuple, Optional, Dict, Union, Any, List
from typing import Tuple, Dict, Union, Any, List
import zipfile
import zlib
@ -35,7 +35,9 @@ import errors
import urls
def encode_uint32(value: int) -> bytes:
return bytes([value & 0xff]) + bytes([(value >> 8) & 0xff]) + bytes([(value >> 16) & 0xff]) + bytes([(value >> 24) & 0xff]);
"""Convert 4-bytes value into a list with 4 bytes"""
return bytes([value & 0xff]) + bytes([(value >> 8) & 0xff]) + \
bytes([(value >> 16) & 0xff]) + bytes([(value >> 24) & 0xff])
class HuamiAmazfit:
"""Base class for logging in and receiving auth keys and GPS packs"""
@ -56,9 +58,11 @@ class HuamiAmazfit:
self.r = str(uuid.uuid4())
# IMEI or something unique
self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255))
self.device_id = (
f"02:00:00:{random.randint(0, 255):02x}:{random.randint(0, 255):02x}:"
f"{random.randint(0, 255):02x}"
)
def get_access_token(self) -> str:
"""Get access token for log in"""
@ -76,7 +80,7 @@ class HuamiAmazfit:
if 'code' not in token_url_parameters:
raise ValueError("No 'code' parameter in login url.")
self.access_token = str(token_url_parameters['code'])
self.access_token = token_url_parameters['code'][0]
self.country_code = 'US'
elif self.method == 'amazfit':
@ -86,7 +90,7 @@ class HuamiAmazfit:
data = urls.PAYLOADS['tokens_amazfit']
data['password'] = self.password
response = requests.post(auth_url, data=data, allow_redirects=False)
response = requests.post(auth_url, data=data, allow_redirects=False, timeout=10)
response.raise_for_status()
# 'Location' parameter contains url with login status
@ -127,7 +131,7 @@ class HuamiAmazfit:
data['code'] = self.access_token
data['grant_type'] = 'access_token' if self.method == 'amazfit' else 'request_token'
response = requests.post(login_url, data=data, allow_redirects=False)
response = requests.post(login_url, data=data, allow_redirects=False, timeout=10)
response.raise_for_status()
login_result = response.json()
@ -162,7 +166,7 @@ class HuamiAmazfit:
headers['apptoken'] = self.app_token
params = {'enableMultiDevice': 'true'}
response = requests.get(devices_url, params=params, headers=headers)
response = requests.get(devices_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
device_request = response.json()
if 'items' not in device_request:
@ -211,20 +215,20 @@ class HuamiAmazfit:
'appname': 'com.huami.midong',
'lang': 'en_US'
}
response = requests.get(fw_url, params=params, headers=headers)
response = requests.get(fw_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
fw_response = response.json()
links = []
hashes = []
fw_links = []
fw_hashes = []
if 'firmwareUrl' in fw_response:
links.append(fw_response['firmwareUrl'])
hashes.append(fw_response['firmwareMd5'])
fw_links.append(fw_response['firmwareUrl'])
fw_hashes.append(fw_response['firmwareMd5'])
if 'fontUrl' in fw_response:
links.append(fw_response['fontUrl'])
hashes.append(fw_response['fontMd5'])
fw_links.append(fw_response['fontUrl'])
fw_hashes.append(fw_response['fontMd5'])
return (links, hashes)
return (fw_links, fw_hashes)
def get_gps_data(self) -> None:
"""Download GPS packs: almanac and AGPS"""
@ -237,40 +241,47 @@ class HuamiAmazfit:
for pack_idx, agps_pack_name in enumerate(agps_packs):
print(f"Downloading {agps_pack_name}...")
response = requests.get(agps_link.format(pack_name=agps_pack_name), headers=headers)
response = requests.get(agps_link.format(pack_name=agps_pack_name),
headers=headers, timeout=10)
response.raise_for_status()
agps_result = response.json()[0]
if 'fileUrl' not in agps_result:
raise ValueError("No 'fileUrl' parameter in files request.")
with requests.get(agps_result['fileUrl'], stream=True) as request:
with requests.get(agps_result['fileUrl'], stream=True, timeout=10) as request:
with open(agps_file_names[pack_idx], 'wb') as gps_file:
shutil.copyfileobj(request.raw, gps_file)
def build_gps_uihh(self) -> None:
""" Prepare uihh gps file """
print("Building gps_uihh.bin")
d = {'gps_alm.bin':0x05, 'gln_alm.bin':0x0f, 'lle_bds.lle':0x86, 'lle_gps.lle':0x87, 'lle_glo.lle':0x88, 'lle_gal.lle':0x89, 'lle_qzss.lle':0x8a}
cep_archive = zipfile.ZipFile('cep_7days.zip', 'r')
lle_archive = zipfile.ZipFile('lle_1week.zip', 'r')
f = open('gps_uihh.bin', 'wb')
content = bytes()
filecontent = bytes()
fileheader = bytes()
d = {'gps_alm.bin':0x05, 'gln_alm.bin':0x0f, 'lle_bds.lle':0x86, 'lle_gps.lle':0x87,
'lle_glo.lle':0x88, 'lle_gal.lle':0x89, 'lle_qzss.lle':0x8a}
with zipfile.ZipFile('cep_7days.zip', 'r') as cep_archive, \
zipfile.ZipFile('lle_1week.zip', 'r') as lle_archive, \
open('gps_uihh.bin', 'wb') as uihh_file:
content = bytes()
filecontent = bytes()
fileheader = bytes()
for key, value in d.items():
if value >= 0x86:
filecontent = lle_archive.read(key)
else:
filecontent = cep_archive.read(key)
for key, value in d.items():
if value >= 0x86:
filecontent = lle_archive.read(key)
else:
filecontent = cep_archive.read(key)
fileheader = bytes([1]) + bytes([value]) + encode_uint32(len(filecontent)) + encode_uint32(zlib.crc32(filecontent) & 0xffffffff)
content += fileheader + filecontent
fileheader = bytes([1]) + bytes([value]) + encode_uint32(len(filecontent)) + \
encode_uint32(zlib.crc32(filecontent) & 0xffffffff)
content += fileheader + filecontent
header = b'UIHH' + bytes([0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]) + encode_uint32(zlib.crc32(content) & 0xffffffff) + \
bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + encode_uint32(len(content)) + bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
header = b'UIHH' + \
bytes([0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]) + \
encode_uint32(zlib.crc32(content) & 0xffffffff) + \
bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + \
encode_uint32(len(content)) + \
bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
content = header + content
f.write(content)
f.close()
content = header + content
uihh_file.write(content)
def logout(self) -> str:
"""Log out from the current account"""
@ -279,9 +290,9 @@ class HuamiAmazfit:
data = urls.PAYLOADS['logout']
data['login_token'] = self.login_token
response = requests.post(logout_url, data=data)
logout_result = str(response.json()['result'])
return logout_result
response = requests.post(logout_url, data=data, timeout=10)
result = str(response.json()['result'])
return result
if __name__ == "__main__":
@ -382,7 +393,7 @@ if __name__ == "__main__":
print("Be extremely careful with downloaded files!")
for idx, wearable in enumerate(wearables):
if idx == wearable_id or wearable_id == -1:
if wearable_id in (idx, -1):
print(f"\n\u2553\u2500\u2500\u2500Device {idx}")
links, hashes = device.get_firmware(wearables[int(idx)])
if links:
@ -390,11 +401,11 @@ if __name__ == "__main__":
file_name = link.split('/')[-1]
print(f"\u2551 File: {file_name}")
print(f"\u2551 Hash: {hash_sum}")
with requests.get(link, stream=True) as r:
with requests.get(link, stream=True, timeout=10) as r:
with open(file_name, 'wb') as f:
shutil.copyfileobj(r.raw, f)
else:
print(f"\u2551 No updates found")
print("\u2551 No updates found")
print(footer)
if args.no_logout:

Wyświetl plik

@ -1,3 +1,4 @@
requests
pytest-pylint==0.18.0
pytest-flake8==1.0.6
types-requests

Wyświetl plik

Wyświetl plik

@ -1,5 +0,0 @@
from unittest import TestCase
class TryTesting(TestCase):
def test_always_passes(self) -> None:
self.assertTrue(True)

Wyświetl plik

@ -0,0 +1,23 @@
import os
import unittest
from huami_token import HuamiAmazfit
class TestAmazfit(unittest.TestCase):
def test_login(self) -> None:
email: str = os.environ.get('AMAZFIT_EMAIL', '')
password: str = os.environ.get('AMAZFIT_PASSWORD', '')
device = HuamiAmazfit(method="amazfit",
email=email,
password=password)
access_token = device.get_access_token()
user_id = device.login(external_token=access_token)
logout_result = device.logout()
print(user_id)
self.assertEqual(user_id,
"1132356262",
"Unexpected user id")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -20,7 +20,7 @@
"""Module for storin urls and payloads fro different requests"""
from typing import Dict, Any
from typing import Dict
URLS = {
'login_xiaomi': 'https://account.xiaomi.com/oauth2/authorize?skip_confirm=false&'