kopia lustrzana https://github.com/cirospaciari/socketify.py
added get_form_urlencoded
rodzic
c2d1de2a02
commit
e754e29c05
|
@ -7,6 +7,8 @@ import inspect
|
|||
import signal
|
||||
from http import cookies
|
||||
from datetime import datetime
|
||||
from urllib.parse import parse_qs, quote_plus, unquote_plus
|
||||
|
||||
ffi = cffi.FFI()
|
||||
ffi.cdef("""
|
||||
|
||||
|
@ -393,7 +395,7 @@ class AppResponse:
|
|||
def set_cookie(self, name, value, options={}):
|
||||
if self._write_jar == None:
|
||||
self._write_jar = cookies.SimpleCookie()
|
||||
self._write_jar[name] = value
|
||||
self._write_jar[name] = quote_plus(value)
|
||||
if isinstance(options, dict):
|
||||
for key in options:
|
||||
if key == "expires" and isinstance(options[key], datetime):
|
||||
|
@ -446,7 +448,27 @@ class AppResponse:
|
|||
self.grab_aborted_handler()
|
||||
return self.loop.run_async(task, self)
|
||||
|
||||
|
||||
async def get_form_urlencoded(self, encoding='utf-8'):
|
||||
data = await self.get_data()
|
||||
try:
|
||||
#decode and unquote all
|
||||
result = {}
|
||||
parsed = parse_qs(b''.join(data), encoding=encoding)
|
||||
has_value = False
|
||||
for key in parsed:
|
||||
has_value = True
|
||||
try:
|
||||
value = parsed[key]
|
||||
new_key = key.decode(encoding)
|
||||
last_value = value[len(value)-1]
|
||||
|
||||
result[new_key] = unquote_plus(last_value.decode(encoding))
|
||||
except Exception as error:
|
||||
pass
|
||||
return result if has_value else None
|
||||
except Exception as error:
|
||||
return None #invalid encoding
|
||||
|
||||
async def get_text(self, encoding='utf-8'):
|
||||
data = await self.get_data()
|
||||
try:
|
||||
|
@ -565,7 +587,7 @@ class AppResponse:
|
|||
lib.uws_res_write_header(self.SSL, self.res, key_data, len(key_data), value_data, len(value_data))
|
||||
return self
|
||||
|
||||
def end_without_body(self, end_connection=False):
|
||||
def end_without_body(self, end_connection=False):
|
||||
if not self.aborted:
|
||||
if self._write_jar != None:
|
||||
self.write_header("Set-Cookie", self._write_jar.output(header=""))
|
||||
|
|
|
@ -19,7 +19,7 @@ async def upload_chunks(res, req):
|
|||
#await all the data, returns received chunks if fail (most likely fail is aborted requests)
|
||||
data = await res.get_data()
|
||||
|
||||
print(f"Got {len(data)} chunks if data!")
|
||||
print(f"Got {len(data)} chunks of data!")
|
||||
for chunk in data:
|
||||
print(f"Got chunk of data with length {len(chunk)}")
|
||||
|
||||
|
@ -47,6 +47,32 @@ async def upload_text(res, req):
|
|||
#We respond when we are done
|
||||
res.end("Thanks for the data!")
|
||||
|
||||
async def upload_urlencoded(res, req):
|
||||
print(f"Posted to {req.get_url()}")
|
||||
#await all the data and decode as application/x-www-form-urlencoded, returns None if fails
|
||||
form = await res.get_form_urlencoded() #first parameter is the encoding (default utf-8)
|
||||
|
||||
print(f"Your form is ${form}")
|
||||
|
||||
#We respond when we are done
|
||||
res.end("Thanks for the data!")
|
||||
|
||||
|
||||
async def upload_multiple(res, req):
|
||||
print(f"Posted to {req.get_url()}")
|
||||
content_type = req.get_header("content-type")
|
||||
#we can check the Content-Type to accept multiple formats
|
||||
if content_type == "application/json":
|
||||
data = await res.get_json()
|
||||
elif content_type == "application/x-www-form-urlencoded":
|
||||
data = await res.get_form_urlencoded()
|
||||
else:
|
||||
data = await res.get_text()
|
||||
|
||||
print(f"Your data is ${data}")
|
||||
|
||||
#We respond when we are done
|
||||
res.end("Thanks for the data!")
|
||||
|
||||
|
||||
app = App()
|
||||
|
@ -54,6 +80,8 @@ app.post("/", upload)
|
|||
app.post("/chunks", upload_chunks)
|
||||
app.post("/json", upload_json)
|
||||
app.post("/text", upload_text)
|
||||
app.post("/urlencoded", upload_urlencoded)
|
||||
app.post("/multiple", upload_multiple)
|
||||
|
||||
app.any("/*", lambda res,_: res.write_status(404).end("Not Found"))
|
||||
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port))
|
||||
|
|
|
@ -14,4 +14,34 @@
|
|||
|
||||
# application/x-www-form-urlencoded
|
||||
# application/x-www-form-urlencoded
|
||||
# multipart/form-data
|
||||
# multipart/form-data
|
||||
|
||||
|
||||
from socketify import App
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
|
||||
async def home(res, req):
|
||||
data = await res.get_form_urlencoded()
|
||||
print(data)
|
||||
res.end(f"DATA! {data}")
|
||||
|
||||
app = App()
|
||||
app.post("/", home)
|
||||
app.listen(3000, lambda config: print("Listening on port http://localhost:%d now\n" % config.port))
|
||||
app.run()
|
||||
|
||||
# from datetime import datetime
|
||||
# raw = "_ga=GA1.1.1871393672.1649875681; affclick=null; __udf_j=d31b9af0d332fec181c1a893320322c0cb33ce95d7bdbd21a4cc4ee66d6d8c23817686b4ba59dd0e015cb95e8196157c"
|
||||
|
||||
# jar = Cookies(None)
|
||||
# jar.set("session_id", "123132", {
|
||||
# "path": "/",
|
||||
# "domain": "*.test.com",
|
||||
# "httponly": True,
|
||||
# "expires": datetime.now()
|
||||
# })
|
||||
# print(jar.output())
|
||||
# jar = cookies.SimpleCookie(raw)
|
||||
# print(jar["_gaasasd"])
|
||||
# print(split_header_words(raw))
|
Ładowanie…
Reference in New Issue