Update get_query, mitigate segfault

ok now it's working. it will fallback to the old function. so it works on all scenarions
pull/206/head
Arthô 2024-11-08 17:15:11 -03:00 zatwierdzone przez GitHub
rodzic b224a1634a
commit 83c181e8b8
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: B5690EEEBB952194
1 zmienionych plików z 43 dodań i 8 usunięć

Wyświetl plik

@ -2492,15 +2492,50 @@ class AppRequest:
if self._query is None:
# Cache the query parameters
buffer = ffi.new("char**")
query_string_length = lib.uws_req_get_query_string(self.req, buffer)
if query_string_length > 0:
buffer_address = ffi.addressof(buffer, 0)[0]
query_string = ffi.unpack(buffer_address, query_string_length).decode("utf-8")
from urllib.parse import parse_qs
self._query = parse_qs(query_string)
else:
try:
# Attempt to use uws_req_get_query_string
query_string_length = lib.uws_req_get_query_string(self.req, buffer)
if query_string_length > 0:
buffer_address = ffi.addressof(buffer, 0)[0]
query_string = ffi.unpack(buffer_address, query_string_length).decode("utf-8")
from urllib.parse import parse_qs
self._query = parse_qs(query_string)
else:
self._query = {}
except AttributeError:
# uws_req_get_query_string is not available, fall back to the original method
self._query = {}
return self._query.get(key, None)
return self._get_query_fallback(key)
except Exception as e:
# Handle other exceptions gracefully
logging.error(f"Exception in get_query: {e}")
self._query = {}
return None
# Retrieve the parameter from the cached query dict
values = self._query.get(key, None)
# If values is a list (from parse_qs), return the first value
if isinstance(values, list) and len(values) > 0:
return values[0]
return None
def _get_query_fallback(self, key):
# Original get_query implementation
buffer = ffi.new("char**")
if isinstance(key, str):
key_data = key.encode("utf-8")
elif isinstance(key, bytes):
key_data = key
else:
key_data = self.app._json_serializer.dumps(key).encode("utf-8")
length = lib.uws_req_get_query(self.req, key_data, len(key_data), buffer)
buffer_address = ffi.addressof(buffer, 0)[0]
if buffer_address == ffi.NULL:
return None
try:
return ffi.unpack(buffer_address, length).decode("utf-8")
except Exception: # invalid utf-8
return None
def get_parameters(self):
if self._params: