Improved ddb exception handling

pull/1122/head
Luca Di Leo 2022-01-13 04:08:12 -08:00
rodzic b6b29b913b
commit efe860e72c
1 zmienionych plików z 101 dodań i 77 usunięć

Wyświetl plik

@ -1,7 +1,6 @@
import requests import requests
from os import path from os import path
from app.plugins import logger from app.plugins import logger
import validators
from urllib.parse import urlparse from urllib.parse import urlparse
VALID_IMAGE_EXTENSIONS = ['.tiff', '.tif', '.png', '.jpeg', '.jpg'] VALID_IMAGE_EXTENSIONS = ['.tiff', '.tif', '.png', '.jpeg', '.jpg']
@ -10,7 +9,7 @@ class DroneDB:
def __init__(self, registry_url, username, password): def __init__(self, registry_url, username, password):
if not validators.url(registry_url): if not self.validate_url(registry_url):
raise ValueError("Invalid registry URL.") raise ValueError("Invalid registry URL.")
self.username = username self.username = username
@ -27,109 +26,134 @@ class DroneDB:
self.__get_files_list_url = self.__registry_url + "/orgs/{0}/ds/{1}/list" self.__get_files_list_url = self.__registry_url + "/orgs/{0}/ds/{1}/list"
self.__download_file_url = self.__registry_url + "/orgs/{0}/ds/{1}/download?path={2}&inline=1" self.__download_file_url = self.__registry_url + "/orgs/{0}/ds/{1}/download?path={2}&inline=1"
# Validate url
def validate_url(self, url):
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except:
return False
def login(self): def login(self):
if (self.public): if (self.public):
logger.info("No need to login to DroneDB.") logger.info("No need to login to DroneDB.")
return True return True
# Authenticate try:
payload = {'username': self.username, 'password': self.password}
response = requests.post(self.__authenticate_url, data=payload)
if response.status_code != 200: # Authenticate
return False payload = {'username': self.username, 'password': self.password}
response = requests.post(self.__authenticate_url, data=payload)
# Get the token if response.status_code != 200:
self.token = response.json()['token'] return False
logger.info("Logged in to DroneDB as user " + self.username + ".") # Get the token
self.token = response.json()['token']
return True logger.info("Logged in to DroneDB as user " + self.username + ".")
def get_organizations(self): return True
except(Exception) as e:
logger.error(e)
return False
def wrapped_call(self, type, url, data=None, params=None, attempts=3):
headers = {} headers = {}
if not self.public and self.token is None and not self.login():
raise ValueError("Could not authenticate to DroneDB.")
if self.token is not None: cnt = attempts
headers = {'Authorization': 'Bearer ' + self.token }
# Get the organizations
response = requests.get(self.__get_organizations_url, headers=headers)
if response.status_code != 200:
raise Exception("Failed to get organizations.")
return [{'slug': o['slug'], 'name': o['name']} for o in response.json()] while True:
if not self.public and self.token is None and not self.login():
raise ValueError("Could not authenticate to DroneDB.")
if self.token is not None:
headers = {'Authorization': 'Bearer ' + self.token }
response = requests.request(type, url, data=data, params=params, headers=headers)
if response.status_code == 200:
return response
if response.status_code == 401:
if (self.public):
raise Exception("Failed to call '" + url + "'.")
if not self.login():
raise Exception("Failed to re-authenticate to DroneDB, cannot call '" + url + "'.")
else:
cnt -= 1
if cnt == 0:
raise Exception("Failed all attempts to re-authenticate to DroneDB, cannot call '" + url + "'.")
else:
raise Exception("Failed to call '" + url + "'.")
def get_organizations(self):
try:
response = self.wrapped_call('GET', self.__get_organizations_url)
return [{'slug': o['slug'], 'name': o['name']} for o in response.json()]
except Exception as e:
raise Exception("Failed to get organizations.") from e
def get_datasets(self, orgSlug): def get_datasets(self, orgSlug):
headers = {} try:
if not self.public and self.token is None and not self.login(): response = self.wrapped_call('GET', self.__get_datasets_url.format(orgSlug))
raise ValueError("Could not authenticate to DroneDB.")
return [
{'slug': o['slug'],
'name': o['name'],
'public': o['properties'].get('public'),
'size': o['size'],
'entries': o['properties'].get('entries')
} for o in response.json()]
if self.token is not None: except Exception as e:
headers = {'Authorization': 'Bearer ' + self.token } raise Exception("Failed to get datasets.") from e
# Get the datasets
response = requests.get(self.__get_datasets_url.format(orgSlug), headers=headers)
if response.status_code != 200:
raise Exception("Failed to get datasets.")
return [
{'slug': o['slug'],
'name': o['name'],
'public': o['properties'].get('public'),
'size': o['size'],
'entries': o['properties'].get('entries')
} for o in response.json()]
def get_folders(self, orgSlug, dsSlug): def get_folders(self, orgSlug, dsSlug):
headers = {} try:
# Type 1 is folder
payload = {'query': '*', 'recursive': True, 'type': 1}
if not self.public and self.token is None and not self.login(): response = self.wrapped_call('POST', self.__get_folders_url.format(orgSlug, dsSlug), data=payload)
raise ValueError("Could not authenticate to DroneDB.")
if self.token is not None:
headers = {'Authorization': 'Bearer ' + self.token }
# Type 1 is folder return [o['path'] for o in response.json()]
payload = {'query': '*', 'recursive': True, 'type': 1}
except Exception as e:
# Get the folders raise Exception("Failed to get folders.") from e
response = requests.post(self.__get_folders_url.format(orgSlug, dsSlug), data=payload, headers=headers)
if response.status_code != 200:
raise Exception("Failed to get folders.")
return [o['path'] for o in response.json()]
def get_files_list(self, orgSlug, dsSlug, folder=None): def get_files_list(self, orgSlug, dsSlug, folder=None):
headers = {} try:
# Type 1 is folder
params = {'path': '' if folder is None else folder}
if not self.public and self.token is None and not self.login(): # Get the folders
raise ValueError("Could not authenticate to DroneDB.") response = self.wrapped_call('GET', self.__get_files_list_url.format(orgSlug, dsSlug), params=params)
if self.token is not None:
headers = {'Authorization': 'Bearer ' + self.token }
# Type 1 is folder
params = {'path': '' if folder is None else folder}
# Get the folders return [{'path': o['path'],
response = requests.get(self.__get_files_list_url.format(orgSlug, dsSlug), params=params, headers=headers) 'type': o['type'],
if response.status_code != 200: 'size': o['size'],
raise Exception("Failed to get files list.") 'url': self.__download_file_url.format(orgSlug, dsSlug, o['path'])
} for o in response.json()]
return [{'path': o['path'],
'type': o['type'], except Exception as e:
'size': o['size'], raise Exception("Failed to get files list.") from e
'url': self.__download_file_url.format(orgSlug, dsSlug, o['path'])
} for o in response.json()]
def verify_url(url, username=None, password=None): def verify_url(url, username=None, password=None):
try: try:
@ -184,7 +208,7 @@ def parse_url(url):
'dsSlug': segments[2 + offset], 'dsSlug': segments[2 + offset],
'folder': '/'.join(segments[3 + offset:]) 'folder': '/'.join(segments[3 + offset:])
} }
# def verify_folder_url(self, folder_url): # def verify_folder_url(self, folder_url):
# try: # try:
# # Parse the url and get all necessary information # # Parse the url and get all necessary information