little-boxes/little_boxes/urlutils.py

67 wiersze
1.7 KiB
Python
Czysty Zwykły widok Historia

2018-06-15 22:27:49 +00:00
import ipaddress
2018-06-11 20:50:02 +00:00
import logging
import socket
2018-06-22 21:46:38 +00:00
from typing import Dict
2018-06-11 20:50:02 +00:00
from urllib.parse import urlparse
2018-07-08 21:02:15 +00:00
from .errors import Error
2018-06-24 17:48:07 +00:00
from .errors import ServerError
2018-06-11 20:50:02 +00:00
# Logger named after this module (``__name__``); used by the URL checks below.
logger = logging.getLogger(__name__)
2018-06-22 21:46:38 +00:00
# Memo of validation outcomes, keyed by the hostname of the checked URL.
# is_url_valid only short-circuits on entries that are True; hosts recorded
# as False are looked up and checked again on every call.
_CACHE: Dict[str, bool] = {}
2018-06-24 17:48:07 +00:00
class InvalidURLError(ServerError):
    """Raised by check_url when a URL does not pass is_url_valid."""
2018-07-08 21:02:15 +00:00
class URLLookupFailedError(Error):
    """Raised by is_url_valid when the address lookup for a URL's host fails."""
2018-06-24 09:33:14 +00:00
def is_url_valid(url: str, debug: bool = False) -> bool:
    """Return whether `url` is an HTTP(S) URL that is safe to request.

    A URL is rejected (``False``) when it cannot be parsed, does not use the
    ``http``/``https`` scheme, has no host or a malformed port, targets
    ``localhost``, or when any address its host resolves to is private.

    Hosts that were accepted once are remembered in ``_CACHE`` and accepted
    again without a new lookup; hosts recorded as ``False`` are re-checked on
    every call.

    Args:
        url: The URL to validate.
        debug: When true, every parseable ``http``/``https`` URL is accepted
            without any host check, so local instances can be reached.

    Returns:
        ``True`` if the URL may be requested, ``False`` otherwise.

    Raises:
        URLLookupFailedError: If the host name cannot be resolved.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects e.g. an unbalanced "[" in the host part.
        return False

    if parsed.scheme not in ("http", "https"):
        return False

    # XXX in debug mode, we want to allow requests to localhost to test the federation with local instances
    if debug:  # pragma: no cover
        return True

    host = parsed.hostname
    # A missing host must not reach getaddrinfo: passing None there resolves
    # to the loopback interface.
    if not host or host == "localhost":
        return False

    try:
        # Reading .port is what validates it; a non-numeric or out-of-range
        # port raises ValueError here.
        port = parsed.port
    except ValueError:
        return False

    if _CACHE.get(host, False):
        return True

    try:
        # The host may already be an IP literal, in which case no lookup is needed.
        addresses = [ipaddress.ip_address(host)]
    except ValueError:
        try:
            infos = socket.getaddrinfo(host, port or 80)
        except UnicodeError:
            # The host name cannot be IDNA-encoded (empty or over-long label).
            return False
        except socket.gaierror as err:
            logger.exception(f"failed to lookup url {url}")
            _CACHE[host] = False
            raise URLLookupFailedError(f"failed to lookup url {url}") from err
        # getaddrinfo yields one entry per socket type, so de-duplicate, and
        # drop any IPv6 zone id ("%eth0"), which ipaddress rejects before
        # Python 3.9.
        resolved = dict.fromkeys(info[4][0].split("%", 1)[0] for info in infos)
        logger.debug(f"dns lookup: {host} -> {', '.join(resolved)}")
        addresses = [ipaddress.ip_address(item) for item in resolved]

    logger.debug(", ".join(str(address) for address in addresses))

    # Check every resolved address, not just the first one: a client may
    # connect to any of them, so a single private record is enough to reject.
    if any(address.is_private for address in addresses):
        logger.info(f"rejecting private URL {url}")
        _CACHE[host] = False
        return False

    _CACHE[host] = True
    return True
2018-06-24 09:33:14 +00:00
def check_url(url: str, debug: bool = False) -> None:
    """Validate `url` with is_url_valid and raise if it is rejected.

    Args:
        url: The URL to check.
        debug: Passed through unchanged to is_url_valid.

    Raises:
        InvalidURLError: If is_url_valid returns a falsy result for `url`.
    """
    logger.debug(f"check_url {url} debug={debug}")
    url_ok = is_url_valid(url, debug=debug)
    if url_ok:
        return None
    raise InvalidURLError(f'"{url}" is invalid')