ssl: Restructure micropython SSL interface to a new tls module.

MicroPython now supplies SSL/TLS functionality in a new built-in `tls`
module.  The `ssl` module is now implemented purely in Python, in this
repository.  Other libraries are updated to work with this scheme.

Signed-off-by: Felix Dörre <felix@dogcraft.de>
pull/793/head
Felix Dörre 2024-02-01 12:36:19 +00:00 zatwierdzone przez Damien George
rodzic 803452a1ac
commit 35d41dbb0e
9 zmienionych plików z 81 dodań i 44 usunięć

Wyświetl plik

@ -1,10 +1,11 @@
metadata( metadata(
version="0.1.0", version="0.2.0",
description="Common networking packages for all network-capable deployments of MicroPython.", description="Common networking packages for all network-capable deployments of MicroPython.",
) )
require("mip") require("mip")
require("ntptime") require("ntptime")
require("ssl")
require("requests") require("requests")
require("webrepl") require("webrepl")

Wyświetl plik

@ -1,4 +1,4 @@
metadata(description="Lightweight MQTT client for MicroPython.", version="1.3.4") metadata(description="Lightweight MQTT client for MicroPython.", version="1.4.0")
# Originally written by Paul Sokolovsky. # Originally written by Paul Sokolovsky.

Wyświetl plik

@ -16,8 +16,7 @@ class MQTTClient:
user=None, user=None,
password=None, password=None,
keepalive=0, keepalive=0,
ssl=False, ssl=None,
ssl_params={},
): ):
if port == 0: if port == 0:
port = 8883 if ssl else 1883 port = 8883 if ssl else 1883
@ -26,7 +25,6 @@ class MQTTClient:
self.server = server self.server = server
self.port = port self.port = port
self.ssl = ssl self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0 self.pid = 0
self.cb = None self.cb = None
self.user = user self.user = user
@ -67,9 +65,7 @@ class MQTTClient:
addr = socket.getaddrinfo(self.server, self.port)[0][-1] addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr) self.sock.connect(addr)
if self.ssl: if self.ssl:
import ussl self.sock = self.ssl.wrap_socket(self.sock, server_hostname=self.server)
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0") premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0") msg = bytearray(b"\x04MQTT\x04\x02\0\0")

Wyświetl plik

@ -1,4 +1,4 @@
metadata(version="0.6.0") metadata(version="0.7.0")
# Originally written by Paul Sokolovsky. # Originally written by Paul Sokolovsky.

Wyświetl plik

@ -12,7 +12,7 @@ def urlopen(url, data=None, method="GET"):
if proto == "http:": if proto == "http:":
port = 80 port = 80
elif proto == "https:": elif proto == "https:":
import ussl import tls
port = 443 port = 443
else: else:
@ -29,7 +29,9 @@ def urlopen(url, data=None, method="GET"):
try: try:
s.connect(ai[-1]) s.connect(ai[-1])
if proto == "https:": if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host) context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(method) s.write(method)
s.write(b" /") s.write(b" /")

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.8.1", pypi="requests") metadata(version="0.9.0", pypi="requests")
package("requests") package("requests")

Wyświetl plik

@ -63,7 +63,7 @@ def request(
if proto == "http:": if proto == "http:":
port = 80 port = 80
elif proto == "https:": elif proto == "https:":
import ussl import tls
port = 443 port = 443
else: else:
@ -90,7 +90,9 @@ def request(
try: try:
s.connect(ai[-1]) s.connect(ai[-1])
if proto == "https:": if proto == "https:":
s = ussl.wrap_socket(s, server_hostname=host) context = tls.SSLContext(tls.PROTOCOL_TLS_CLIENT)
context.verify_mode = tls.CERT_NONE
s = context.wrap_socket(s, server_hostname=host)
s.write(b"%s /%s HTTP/1.0\r\n" % (method, path)) s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))
if "Host" not in headers: if "Host" not in headers:
s.write(b"Host: %s\r\n" % host) s.write(b"Host: %s\r\n" % host)

Wyświetl plik

@ -1,3 +1,3 @@
metadata(version="0.1.0") metadata(version="0.2.0")
module("ssl.py") module("ssl.py", opt=3)

Wyświetl plik

@ -1,36 +1,72 @@
from ussl import * import tls
import ussl as _ussl from tls import (
CERT_NONE,
CERT_OPTIONAL,
CERT_REQUIRED,
MBEDTLS_VERSION,
PROTOCOL_TLS_CLIENT,
PROTOCOL_TLS_SERVER,
)
# Constants
for sym in "CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED": class SSLContext:
if sym not in globals(): def __init__(self, *args):
globals()[sym] = object() self._context = tls.SSLContext(*args)
self._context.verify_mode = CERT_NONE
@property
def verify_mode(self):
return self._context.verify_mode
@verify_mode.setter
def verify_mode(self, val):
self._context.verify_mode = val
def load_cert_chain(self, certfile, keyfile):
if isinstance(certfile, str):
with open(certfile, "rb") as f:
certfile = f.read()
if isinstance(keyfile, str):
with open(keyfile, "rb") as f:
keyfile = f.read()
self._context.load_cert_chain(certfile, keyfile)
def load_verify_locations(self, cafile=None, cadata=None):
if cafile:
with open(cafile, "rb") as f:
cadata = f.read()
self._context.load_verify_locations(cadata)
def wrap_socket(
self, sock, server_side=False, do_handshake_on_connect=True, server_hostname=None
):
return self._context.wrap_socket(
sock,
server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
server_hostname=server_hostname,
)
def wrap_socket( def wrap_socket(
sock, sock,
keyfile=None,
certfile=None,
server_side=False, server_side=False,
key=None,
cert=None,
cert_reqs=CERT_NONE, cert_reqs=CERT_NONE,
*, cadata=None,
ca_certs=None, server_hostname=None,
server_hostname=None do_handshake=True,
): ):
# TODO: More arguments accepted by CPython could also be handled here. con = SSLContext(PROTOCOL_TLS_SERVER if server_side else PROTOCOL_TLS_CLIENT)
# That would allow us to accept ca_certs as a positional argument, which if cert or key:
# we should. con.load_cert_chain(cert, key)
kw = {} if cadata:
if keyfile is not None: con.load_verify_locations(cadata=cadata)
kw["keyfile"] = keyfile con.verify_mode = cert_reqs
if certfile is not None: return con.wrap_socket(
kw["certfile"] = certfile sock,
if server_side is not False: server_side=server_side,
kw["server_side"] = server_side do_handshake_on_connect=do_handshake,
if cert_reqs is not CERT_NONE: server_hostname=server_hostname,
kw["cert_reqs"] = cert_reqs )
if ca_certs is not None:
kw["ca_certs"] = ca_certs
if server_hostname is not None:
kw["server_hostname"] = server_hostname
return _ussl.wrap_socket(sock, **kw)