# MicroPython package installer # MIT license; Copyright (c) 2022 Jim Mussared import urequests as requests import sys _PACKAGE_INDEX = const("https://micropython.org/pi/v2") _CHUNK_SIZE = 128 # This implements os.makedirs(os.dirname(path)) def _ensure_path_exists(path): import os split = path.split("/") # Handle paths starting with "/". if not split[0]: split.pop(0) split[0] = "/" + split[0] prefix = "" for i in range(len(split) - 1): prefix += split[i] try: os.stat(prefix) except: os.mkdir(prefix) prefix += "/" # Copy from src (stream) to dest (function-taking-bytes) def _chunk(src, dest): buf = memoryview(bytearray(_CHUNK_SIZE)) while True: n = src.readinto(buf) if n == 0: break dest(buf if n == _CHUNK_SIZE else buf[:n]) # Check if the specified path exists and matches the hash. def _check_exists(path, short_hash): import os try: import binascii import hashlib with open(path, "rb") as f: hs256 = hashlib.sha256() _chunk(f, hs256.update) existing_hash = str(binascii.hexlify(hs256.digest())[: len(short_hash)], "utf-8") return existing_hash == short_hash except: return False def _rewrite_url(url, branch=None): if not branch: branch = "HEAD" if url.startswith("github:"): url = url[7:].split("/") url = ( "https://raw.githubusercontent.com/" + url[0] + "/" + url[1] + "/" + branch + "/" + "/".join(url[2:]) ) return url def _download_file(url, dest): response = requests.get(url) try: if response.status_code != 200: print("Error", response.status_code, "requesting", url) return False print("Copying:", dest) _ensure_path_exists(dest) with open(dest, "wb") as f: _chunk(response.raw, f.write) return True finally: response.close() def _install_json(package_json_url, index, target, version, mpy): response = requests.get(_rewrite_url(package_json_url, version)) try: if response.status_code != 200: print("Package not found:", package_json_url) return False package_json = response.json() finally: response.close() for target_path, short_hash in package_json.get("hashes", ()): fs_target_path = target + "/" + target_path if _check_exists(fs_target_path, short_hash): print("Exists:", fs_target_path) else: file_url = "{}/file/{}/{}".format(index, short_hash[:2], short_hash) if not _download_file(file_url, fs_target_path): print("File not found: {} {}".format(target_path, short_hash)) return False for target_path, url in package_json.get("urls", ()): fs_target_path = target + "/" + target_path if not _download_file(_rewrite_url(url, version), fs_target_path): print("File not found: {} {}".format(target_path, url)) return False for dep, dep_version in package_json.get("deps", ()): if not _install_package(dep, index, target, dep_version, mpy): return False return True def _install_package(package, index, target, version, mpy): if ( package.startswith("http://") or package.startswith("https://") or package.startswith("github:") ): if package.endswith(".py") or package.endswith(".mpy"): print("Downloading {} to {}".format(package, target)) return _download_file( _rewrite_url(package, version), target + "/" + package.rsplit("/")[-1] ) else: if not package.endswith(".json"): if not package.endswith("/"): package += "/" package += "package.json" print("Installing {} to {}".format(package, target)) else: if not version: version = "latest" print("Installing {} ({}) from {} to {}".format(package, version, index, target)) mpy_version = ( sys.implementation._mpy & 0xFF if mpy and hasattr(sys.implementation, "_mpy") else "py" ) package = "{}/package/{}/{}/{}.json".format(index, mpy_version, package, version) return _install_json(package, index, target, version, mpy) def install(package, index=_PACKAGE_INDEX, target=None, version=None, mpy=True): if not target: for p in sys.path: if p.endswith("/lib"): target = p break else: print("Unable to find lib dir in sys.path") return if _install_package(package, index.rstrip("/"), target, version, mpy): print("Done") else: print("Package may be partially installed")