micropython-lib/urequests/urequests.py

97 wiersze
2.3 KiB
Python
Czysty Zwykły widok Historia

import usocket
class Response:
def __init__(self, f):
self.raw = f
self.encoding = "utf-8"
self._cached = None
def close(self):
if self.raw:
self.raw.close()
self.raw = None
self._cached = None
@property
def content(self):
if self._cached is None:
self._cached = self.raw.read()
self.raw.close()
self.raw = None
return self._cached
@property
def text(self):
return str(self.content, self.encoding)
def json(self):
import ujson
return ujson.loads(self.content)
def request(method, url, data=None, json=None, headers={}, stream=None):
try:
proto, dummy, host, path = url.split("/", 3)
except ValueError:
proto, dummy, host = url.split("/", 2)
path = ""
if proto != "http:":
raise ValueError("Unsupported protocol: " + proto)
ai = usocket.getaddrinfo(host, 80)
addr = ai[0][4]
s = usocket.socket()
s.connect(addr)
s.send(b"%s /%s HTTP/1.0\r\n" % (method, path))
if not "Host" in headers:
s.send(b"Host: %s\r\n" % host)
if json is not None:
assert data is None
import ujson
data = ujson.dumps(json)
if data:
s.send(b"Content-Length: %d\r\n" % len(data))
s.send(b"\r\n")
if data:
s.send(data)
l = s.readline()
protover, status, msg = l.split(None, 2)
status = int(status)
#print(protover, status, msg)
while True:
l = s.readline()
if not l or l == b"\r\n":
break
#print(line)
if l.startswith(b"Transfer-Encoding:"):
if b"chunked" in line:
raise ValueError("Unsupported " + l)
elif l.startswith(b"Location:"):
raise NotImplementedError("Redirects not yet supported")
resp = Response(s)
resp.status_code = status
resp.reason = msg.rstrip()
return resp
def head(url, **kw):
return request("HEAD", url, **kw)
def get(url, **kw):
return request("GET", url, **kw)
def post(url, **kw):
return request("POST", url, **kw)
def put(url, **kw):
return request("PUT", url, **kw)
def patch(url, **kw):
return request("PATCH", url, **kw)
def delete(url, **kw):
return request("DELETE", url, **kw)