upip: add upip package and command line to install from Python Package Index.

pull/632/head
Jonas Scharpf 2023-03-25 15:00:15 +01:00
rodzic d8e058dab3
commit 28467b5a8f
6 zmienionych plików z 357 dodań i 153 usunięć

Wyświetl plik

@ -117,8 +117,8 @@ of the fork, the branch the packages were built from, and the package name.)
## Installing packages from Python Package Index
It is possible to use the `mpremote mip install` or `mip.install()` methods to
install packages built from the official
It is possible to use the `mpremote upip install` or `upip.install()` methods
to install packages built from the official
[PyPI](https://pypi.org/), [Test PyPI](https://test.pypi.org/) or a selfhosted
Python Package Index.
@ -126,18 +126,22 @@ To install a package and its dependencies from a Python Package Index, use
commands such as:
```bash
$ mpremote connect /dev/ttyUSB0 mip install --index PACKAGE_INDEX --pypi PACKAGE_NAME
$ mpremote connect /dev/ttyUSB0 upip install PACKAGE_NAME
```
Or from a networked device:
```py
import mip
mip.install(PACKAGE_NAME, index=PACKAGE_INDEX, pypi=True)
import upip
upip.install(PACKAGE_NAME)
```
(Where `PACKAGE_NAME` and `PACKAGE_INDEX` are replaced with the package name
and the package index URL, e.g. `https://test.pypi.org/pypi` for Test PyPI)
A custom Python Package Index can be specified with the `index` keyword or
`--index` on the command line. `PACKAGE_NAME` and `PACKAGE_INDEX` are replaced
with the package name and the package index URL, e.g.
`https://test.pypi.org/pypi` for Test PyPI, default is `https://pypi.org/pypi`.
Additionally the `version` keyword or `--version` on the command line allows
the installation of a specific package version, default is latest available.
## Contributing

Wyświetl plik

@ -1,7 +1,5 @@
# MicroPython package installer
# MIT license
# Copyright (c) 2022 Jim Mussared
# Extended with PyPI support by brainelectronics 2023
# MIT license; Copyright (c) 2022 Jim Mussared
import urequests as requests
import sys
@ -44,6 +42,8 @@ def _chunk(src, dest):
# Check if the specified path exists and matches the hash.
def _check_exists(path, short_hash):
import os
try:
import binascii
import hashlib
@ -92,24 +92,16 @@ def _download_file(url, dest):
response.close()
def _get_package_json(package_json_url, version):
package_json = {}
def _install_json(package_json_url, index, target, version, mpy):
response = requests.get(_rewrite_url(package_json_url, version))
try:
if response.status_code != 200:
print("Package not found:", package_json_url)
return package_json
return False
package_json = response.json()
finally:
response.close()
return package_json
def _install_json(package_json_url, index, target, version, mpy):
package_json = _get_package_json(package_json_url, version)
for target_path, short_hash in package_json.get("hashes", ()):
fs_target_path = target + "/" + target_path
if _check_exists(fs_target_path, short_hash):
@ -130,124 +122,11 @@ def _install_json(package_json_url, index, target, version, mpy):
return True
def _install_tar(package_json_url, index, target, version):
import gc
package_json = _get_package_json(package_json_url, version)
meta = {}
if not version:
version = package_json.get("info", {}).get("version", "")
if version not in package_json.get("releases", ()):
print("Version {} not found".format(version))
return False
package_url = package_json["releases"][version][0]["url"]
# save some memory, the large dict is no longer required
del package_json
gc.collect()
fs_target_path = target + "/" + package_url.rsplit("/", 1)[1]
if not _download_file(package_url, fs_target_path):
print("Failed to download {} to {}".format(package_url, fs_target_path))
return False
try:
from uzlib import DecompIO
from utarfile import TarFile
gzdict_sz = 16 + 15
sz = gc.mem_free() + gc.mem_alloc()
if sz <= 65536:
gzdict_sz = 16 + 12
zipped_file = open(fs_target_path, "rb")
decompressed_file = DecompIO(zipped_file, gzdict_sz)
tar_file = TarFile(fileobj=decompressed_file)
meta = _install_tar_file(tar_file, target)
zipped_file.close()
del zipped_file
del decompressed_file
del tar_file
except Exception as e:
print("Failed to decompress downloaded file due to {}".format(e))
return False
# cleanup downloaded file
try:
from os import unlink
unlink(fs_target_path)
except Exception as e:
print("Error during cleanup of {}".format(fs_target_path), e)
gc.collect()
deps = meta.get("deps", "").rstrip()
if deps:
deps = deps.decode("utf-8").split("\n")
print("Install additional deps: {}".format(deps))
results = []
for ele in deps:
res = _install_package(
package=ele, index=index, target=target, version=None, mpy=False, pypi=True
)
if not res:
print("Package may be partially installed")
results.append(res)
return all(results)
return True
def _install_tar_file(f, target):
from utarfile import DIRTYPE
from shutil import copyfileobj
meta = {}
for info in f:
if "PaxHeader" in info.name:
continue
print("Processing: {}".format(info))
fname = info.name
try:
fname = fname[fname.index("/") + 1 :]
except ValueError:
fname = ""
save = True
for p in ("setup.", "PKG-INFO", "README"):
if fname.startswith(p) or ".egg-info" in fname:
if fname.endswith("/requires.txt"):
meta["deps"] = f.extractfile(info).read()
save = False
break
if save:
outfname = target + "/" + fname
_ensure_path_exists(outfname)
if info.type != DIRTYPE:
this_file = f.extractfile(info)
copyfileobj(this_file, open(outfname, "wb"))
return meta
def _install_package(package, index, target, version, mpy, pypi):
def _install_package(package, index, target, version, mpy):
if (
package.startswith("http://")
or package.startswith("https://")
or package.startswith("github:")
or pypi
):
if package.endswith(".py") or package.endswith(".mpy"):
print("Downloading {} to {}".format(package, target))
@ -255,23 +134,11 @@ def _install_package(package, index, target, version, mpy, pypi):
_rewrite_url(package, version), target + "/" + package.rsplit("/")[-1]
)
else:
if pypi:
this_version = version
if not version:
this_version = "latest"
print(
"Installing {} ({}) from {} to {}".format(package, this_version, index, target)
)
package = "{}/{}/json".format(index, package)
install("utarfile")
install("shutil")
return _install_tar(package, index, target, version)
else:
if not package.endswith(".json"):
if not package.endswith("/"):
package += "/"
package += "package.json"
print("Installing {} to {}".format(package, target))
if not package.endswith(".json"):
if not package.endswith("/"):
package += "/"
package += "package.json"
print("Installing {} to {}".format(package, target))
else:
if not version:
version = "latest"
@ -286,7 +153,7 @@ def _install_package(package, index, target, version, mpy, pypi):
return _install_json(package, index, target, version, mpy)
def install(package, index=None, target=None, version=None, mpy=True, pypi=False):
def install(package, index=None, target=None, version=None, mpy=True):
if not target:
for p in sys.path:
if p.endswith("/lib"):
@ -299,7 +166,7 @@ def install(package, index=None, target=None, version=None, mpy=True, pypi=False
if not index:
index = _PACKAGE_INDEX
if _install_package(package, index.rstrip("/"), target, version, mpy, pypi):
if _install_package(package, index.rstrip("/"), target, version, mpy):
print("Done")
else:
print("Package may be partially installed")

Wyświetl plik

@ -0,0 +1,6 @@
metadata(version="0.1.0", description="Optional support for running `micropython -m upip`")
require("argparse")
require("upip")
package("upip")

Wyświetl plik

@ -0,0 +1,41 @@
# MicroPython package installer command line
# MIT license
# Copyright (c) 2023 Jonas Scharpf (brainelectronics)
from argparse import ArgumentParser
import sys
def do_install():
parser = ArgumentParser()
parser.add_argument(
"-t",
"--target",
help="Directory to install package",
)
parser.add_argument(
"-i",
"--index",
help="Python Package Index, defaults to 'https://pypi.org/pypi'",
)
parser.add_argument(
"--version",
help="Specific package version, defaults to latest available",
)
parser.add_argument("package", nargs="+")
args = parser.parse_args(args=sys.argv[2:])
from . import install
for package in args.package:
version = None
if "==" in package:
package, version = package.split("==")
install(package=package, index=args.index, target=args.target, version=version)
if len(sys.argv) >= 2:
if sys.argv[1] == "install":
do_install()
else:
print('upip: Unknown command "{}"'.format(sys.argv[1]))

Wyświetl plik

@ -0,0 +1,6 @@
metadata(version="0.1.0", description="On-device PyPI package installer for network-capable boards")
require("shutil")
require("utarfile")
package("upip", opt=3)

Wyświetl plik

@ -0,0 +1,280 @@
# MicroPython package installer
# MIT license
# Copyright (c) 2023 Jonas Scharpf (brainelectronics)
import gc
import sys
from binascii import hexlify
from hashlib import sha256
from os import mkdir, stat, unlink
from shutil import copyfileobj
import urequests as requests
from utarfile import DIRTYPE, TarFile
from uzlib import DecompIO
_PACKAGE_INDEX = const("https://pypi.org/pypi")
_CHUNK_SIZE = 128
class _Subscriptable:
def __getitem__(self, item) -> None:
return None
_subscriptable = _Subscriptable()
Optional = _subscriptable
Callable = _subscriptable
# This implements os.makedirs(os.dirname(path))
def _ensure_path_exists(path: str) -> None:
split = path.split("/")
# Handle paths starting with "/".
if not split[0]:
split.pop(0)
split[0] = "/" + split[0]
prefix = ""
for i in range(len(split) - 1):
prefix += split[i]
try:
stat(prefix)
except:
mkdir(prefix)
prefix += "/"
# Copy from src (stream) to dest (function-taking-bytes)
def _chunk(src: Callable, dest: Callable) -> None:
buf = memoryview(bytearray(_CHUNK_SIZE))
while True:
n = src.readinto(buf)
if n == 0:
break
dest(buf if n == _CHUNK_SIZE else buf[:n])
def _download_file(url: str, dest: str) -> bool:
response = requests.get(url)
try:
if response.status_code != 200:
print("Error", response.status_code, "requesting", url)
return False
print("Copying:", dest)
_ensure_path_exists(dest)
with open(dest, "wb") as f:
_chunk(response.raw, f.write)
return True
finally:
response.close()
# Check if the hash of the specified path matches the hash.
def _verify_file(path: str, digests_256: str) -> bool:
try:
with open(path, "rb") as f:
hs256 = sha256()
_chunk(f, hs256.update)
existing_hash = str(hexlify(hs256.digest())[: len(digests_256)], "utf-8")
return existing_hash == digests_256
except Exception as e:
print("Failed to verify file {} hash {} due to {}".format(path, digests_256, e))
return False
def _get_package_json(package_json_url: str) -> dict:
package_json = {}
response = requests.get(package_json_url)
try:
if response.status_code != 200:
print("Package not found:", package_json_url)
return package_json
package_json = response.json()
finally:
response.close()
return package_json
def _get_version_info(package_json_url: str, version: str) -> dict:
version_info = {}
package_json = _get_package_json(package_json_url=package_json_url)
if not package_json:
return version_info
if not version:
# get latest version from info property
version = package_json.get("info", {}).get("version", "")
if version not in package_json.get("releases", ()):
print("Version {} not found".format(version))
return version_info
# Use last element to get ".tar.gz" in case ".wheel" is also available
version_info = package_json["releases"][version][-1]
else:
# specific version given, lookup directly in urls property
# Use last element to get ".tar.gz" in case ".wheel" is also available
version_info = package_json["urls"][-1]
return version_info
def _install_tar(
package_json_url: str, index: Optional[str], target: str, version: Optional[str]
) -> bool:
meta = {}
version_info = _get_version_info(package_json_url=package_json_url, version=version)
if not version_info:
return False
package_url = version_info.get("url", "")
package_sha256 = version_info.get("digests", {}).get("sha256", "")
# save some memory, the large dict is no longer required
del version_info
gc.collect()
fs_target_path = target + "/" + package_url.rsplit("/", 1)[1]
if not _download_file(url=package_url, dest=fs_target_path):
print("Failed to download {} to {}".format(package_url, fs_target_path))
return False
if package_sha256:
if not _verify_file(path=fs_target_path, digests_256=package_sha256):
print("Mismatch between calculated and given SHA256 of downloaded file")
return False
else:
print("SHA256 digest not found, downloaded file might be unverified")
try:
gzdict_sz = 16 + 15
sz = gc.mem_free() + gc.mem_alloc()
if sz <= 65536:
gzdict_sz = 16 + 12
zipped_file = open(fs_target_path, "rb")
decompressed_file = DecompIO(zipped_file, gzdict_sz)
tar_file = TarFile(fileobj=decompressed_file)
meta = _install_tar_file(f=tar_file, target=target)
zipped_file.close()
del zipped_file
del decompressed_file
del tar_file
except Exception as e:
print("Failed to decompress downloaded file due to {}".format(e))
return False
# cleanup downloaded file
try:
unlink(fs_target_path)
except Exception as e:
print("Error during cleanup of {}".format(fs_target_path), e)
gc.collect()
deps = meta.get("deps", "").rstrip()
if deps:
return _handle_dependencies(deps=deps, index=index, target=target)
return True
def _handle_dependencies(deps: dict, index: str, target: str) -> bool:
deps = deps.decode("utf-8").split("\n")
print("Install additional deps: {}".format(deps))
results = []
for ele in deps:
res = _install_package(package=ele, index=index, target=target, version=None)
if not res:
print("Package may be partially installed")
results.append(res)
return all(results)
def _install_tar_file(f: TarFile, target: str) -> dict:
meta = {}
for info in f:
if "PaxHeader" in info.name:
continue
print("Processing: {}".format(info))
fname = info.name
try:
fname = fname[fname.index("/") + 1 :]
except ValueError:
fname = ""
save = True
for p in ("setup.", "PKG-INFO", "README"):
if fname.startswith(p) or ".egg-info" in fname:
if fname.endswith("/requires.txt"):
meta["deps"] = f.extractfile(info).read()
save = False
break
if save:
outfname = target + "/" + fname
_ensure_path_exists(outfname)
if info.type != DIRTYPE:
this_file = f.extractfile(info)
copyfileobj(this_file, open(outfname, "wb"))
return meta
def _install_package(
package: str, index: Optional[str], target: Optional[str], version: Optional[str]
) -> bool:
# https://warehouse.pypa.io/api-reference/json.html
this_version = version
if not version:
this_version = "latest"
# /pypi/<project_name>/json
package_url = "{}/{}/json".format(index, package)
else:
# /pypi/<project_name>/<version>/json
package_url = "{}/{}/{}/json".format(index, package, version)
print("Installing {} ({}) from {} to {}".format(package, this_version, index, target))
return _install_tar(package_json_url=package_url, index=index, target=target, version=version)
def install(
package: str,
index: Optional[str] = None,
target: Optional[str] = None,
version: Optional[str] = None,
) -> None:
if not target:
for p in sys.path:
if p.endswith("/lib"):
target = p
break
else:
print("Unable to find lib dir in sys.path")
return
if not index:
index = _PACKAGE_INDEX
if _install_package(package=package, index=index.rstrip("/"), target=target, version=version):
print("Done")
else:
print("Package may be partially installed")