socketify.py/examples/upload_or_post.py

96 wiersze
2.6 KiB
Python
Czysty Zwykły widok Historia

from socketify import App
###
2022-11-16 19:28:46 +00:00
# We always recommend checking res.aborted in async operations
###
2022-11-16 19:28:46 +00:00
def upload(res, req):
    """Handle a POST by consuming the request body through a data callback.

    Logs the request URL, then registers a callback with ``res.on_data``.
    The callback logs the length of every chunk it is handed and sends the
    reply once it is called with ``is_end`` set.
    """
    url = req.get_url()
    print(f"Posted to {url}")

    def handle_chunk(res, chunk, is_end):
        size = len(chunk)
        print(f"Got chunk of data with length {size}, is_end: {is_end}")
        if not is_end:
            # Not the final chunk yet: hold the reply back.
            return
        res.cork_end("Thanks for the data!")

    res.on_data(handle_chunk)
2022-11-16 19:28:46 +00:00
async def upload_chunks(res, req):
    """Handle a POST by awaiting the complete request body before replying.

    Logs the request URL, awaits ``res.get_data()``, logs how many bytes the
    returned buffer holds and then sends the reply.
    """
    url = req.get_url()
    print(f"Posted to {url}")

    # Wait for the whole body. On failure (most likely an aborted request)
    # the chunks received so far are returned instead.
    body = await res.get_data()
    payload = body.getvalue()
    print(f"Got {len(payload)} bytes of data!")

    # Reply only after everything has been read.
    res.cork_end("Thanks for the data!")
2022-11-16 19:28:46 +00:00
async def upload_json(res, req):
    """Handle a POST whose body is read and parsed as JSON.

    Logs the request URL, awaits ``res.get_json()``, prints the parsed value
    and then sends the reply.
    """
    url = req.get_url()
    print(f"Posted to {url}")

    # Wait for the whole body and parse it as JSON; the result is None when
    # that fails.
    parsed = await res.get_json()
    print(parsed)

    # Reply only after the body has been handled.
    res.cork_end("Thanks for the data!")
2022-11-16 19:28:46 +00:00
async def upload_text(res, req):
    """Handle a POST whose body is read and decoded as text.

    Logs the request URL, awaits ``res.get_text()``, prints the decoded text
    and then sends the reply.
    """
    url = req.get_url()
    print(f"Posted to {url}")

    # Wait for the whole body and decode it; the result is None when that
    # fails. get_text() takes the encoding as its first parameter
    # (default utf-8).
    decoded = await res.get_text()
    print(f"Your text is {decoded}")

    # Reply only after the body has been handled.
    res.cork_end("Thanks for the data!")
2022-11-16 19:28:46 +00:00
2022-06-03 23:01:56 +00:00
async def upload_urlencoded(res, req):
    """Handle a POST whose body is an application/x-www-form-urlencoded form.

    Logs the request URL, awaits ``res.get_form_urlencoded()``, prints the
    decoded form and then sends the reply.
    """
    url = req.get_url()
    print(f"Posted to {url}")

    # Wait for the whole body and decode it as a urlencoded form; the result
    # is None when that fails. The first parameter of get_form_urlencoded()
    # is the encoding (default utf-8).
    fields = await res.get_form_urlencoded()
    print(f"Your form is {fields}")

    # Reply only after the body has been handled.
    res.cork_end("Thanks for the data!")
2022-06-03 23:01:56 +00:00
2022-11-16 19:28:46 +00:00
2022-06-03 23:01:56 +00:00
async def upload_multiple(res, req):
    """Handle a POST whose body format is selected by its Content-Type header.

    JSON bodies are read with ``res.get_json()``, urlencoded forms with
    ``res.get_form_urlencoded()`` and everything else with ``res.get_text()``.
    The value that was read is printed and the reply is sent afterwards.

    Only the media type itself is compared: header parameters such as
    ``; charset=utf-8``, surrounding whitespace and letter case are ignored,
    and a missing header falls through to the text branch.
    """
    print(f"Posted to {req.get_url()}")

    # Clients commonly send "application/json; charset=utf-8"; an exact string
    # comparison would push those requests into the text fallback. Reduce the
    # header to its bare, lower-cased media type before comparing.
    content_type = req.get_header("content-type") or ""
    media_type = content_type.split(";", 1)[0].strip().lower()

    # The Content-Type lets one endpoint accept several body formats.
    if media_type == "application/json":
        data = await res.get_json()
    elif media_type == "application/x-www-form-urlencoded":
        data = await res.get_form_urlencoded()
    else:
        data = await res.get_text()

    print(f"Your data is {data}")

    # Reply only after the body has been handled.
    res.cork_end("Thanks for the data!")
app = App()

# POST endpoints, each one demonstrating a different way to read the body.
for path, handler in (
    ("/", upload),
    ("/chunks", upload_chunks),
    ("/json", upload_json),
    ("/text", upload_text),
    ("/urlencoded", upload_urlencoded),
    ("/multiple", upload_multiple),
):
    app.post(path, handler)

# Catch-all route replying with 404 Not Found.
app.any("/*", lambda res, _: res.write_status(404).end("Not Found"))

# Listen on port 3000 and print the address from the listen callback.
app.listen(
    3000,
    lambda config: print("Listening on port http://localhost:%d now\n" % config.port),
)
app.run()