removed legacy stuff (hopefully this will not break anything)

pull/12/head
Langenfeld 2021-11-15 17:39:11 +01:00
rodzic 29e581f7dd
commit d1430acf8a
2 zmienionych plików z 39 dodań i 241 usunięć

Wyświetl plik

@ -318,7 +318,7 @@ class Repository(ApiObject):
def get_commits(self) -> List["Commit"]:
"""Get all the Commits of this Repository."""
try:
results = self.gitea.requests_get_commits(
results = self.gitea.requests_get_paginated(
Repository.REPO_COMMITS % (self.owner.username, self.name)
)
except ConflictException as err:

Wyświetl plik

@ -1,8 +1,9 @@
import json
import logging
from typing import List, Dict
from typing import List, Dict, Union
import requests
from frozendict import frozendict
from httpcache import CachingHTTPAdapter
from .exceptions import NotFoundException, ConflictException, AlreadyExistsException
@ -44,158 +45,72 @@ class Gitea:
return json.loads(result.text)
return {}
def requests_get_paginated(self, endpoint, params ={}, requests=None, sudo=None, page_key: str = "page"):
def requests_get(self, endpoint:str , params= frozendict(), sudo=None):
combined_params = {}
combined_params.update(params)
if sudo:
combined_params["sudo"] = sudo.username
request = self.requests.get(self.__get_url(endpoint), headers=self.headers, params=combined_params)
if request.status_code == 204:
return None
if request.status_code not in [200, 201]:
message = f"Received status code: {request.status_code} ({request.url})"
if request.status_code in [404]:
raise NotFoundException(message)
if request.status_code in [403]:
raise Exception(f"Unauthorized: {request.url} - Check your permissions and try again! ({message})")
if request.status_code in [409]:
raise ConflictException(message)
raise Exception(message)
return self.parse_result(request)
def requests_get_paginated(self, endpoint:str , params = frozendict(), sudo=None, page_key: str = "page"):
page = 1
combined_params = {}
combined_params.update(params)
aggregated_result = []
while True:
combined_params[page_key] = page
result = self.requests_get(endpoint, combined_params, requests, sudo)
result = self.requests_get(endpoint, combined_params, sudo)
aggregated_result.extend(result)
page += 1
if len(result) == 0:
return aggregated_result
def requests_get(self, endpoint, params={}, requests=None, sudo=None):
combined_params = {}
combined_params.update(params)
if sudo:
combined_params["sudo"] = sudo.username
if not requests:
request = self.requests.get(
self.__get_url(endpoint), headers=self.headers, params=combined_params
)
else:
request = requests.get(
self.__get_url(endpoint), headers=self.headers, params=combined_params
)
if request.status_code == 204:
return None
if request.status_code not in [200, 201]:
message = "Received status code: %s (%s)" % (
request.status_code,
request.url,
)
if request.status_code in [404]:
raise NotFoundException(message)
if request.status_code in [403]:
raise Exception(
"Unauthorized: %s - Check your permissions and try again! (%s)"
% (request.url, message)
)
if request.status_code in [409]:
raise ConflictException(message)
raise Exception(message)
return self.parse_result(request)
def requests_get_commits(self, endpoint, page=1, params={}, requests=None):
results = []
page_endpoint = endpoint + f"?page={page}"
if not requests:
request = self.requests.get(
self.__get_url(page_endpoint), headers=self.headers, params=params
)
else:
request = requests.get(
self.__get_url(page_endpoint), headers=self.headers, params=params
)
results += self.parse_result(request)
if request.headers.get("x-hasmore") == "true":
page += 1
results += self.requests_get_commits(endpoint, page)
elif request.status_code not in [200, 201]:
message = "Received status code: %s (%s)" % (
request.status_code,
request.url,
)
if request.status_code in [404]:
raise NotFoundException(message)
if request.status_code in [403]:
raise Exception(
"Unauthorized: %s - Check your permissions and try again! (%s)"
% (request.url, message)
)
if request.status_code in [409]:
raise ConflictException(message)
raise Exception(message)
return results
def requests_put(self, endpoint):
def requests_put(self, endpoint: str):
request = self.requests.put(self.__get_url(endpoint), headers=self.headers)
if request.status_code not in [204]:
message = "Received status code: %s (%s) %s" % (
request.status_code,
request.url,
request.text,
)
message = f"Received status code: {request.status_code} ({request.url}) {request.text}"
self.logger.error(message)
raise Exception(message)
def requests_delete(self, endpoint):
def requests_delete(self, endpoint: str):
request = self.requests.delete(self.__get_url(endpoint), headers=self.headers)
if request.status_code not in [204]:
message = "Received status code: %s (%s)" % (
request.status_code,
request.url,
)
message = f"Received status code: {request.status_code} ({request.url})"
self.logger.error(message)
raise Exception(message)
def requests_post(self, endpoint, data):
""" Post data to API-endpoint.
Args:
endpoint (str): endpoint of API to send data to
data (dict): Data to send.
Returns: (dict)
Parsed JSON-answer from the API.
Throws:
AlreadyExistsException, if 'already exists' in answer
Exception, if status code not ok
"""
request = self.requests.post(
self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
)
def requests_post(self, endpoint:str , data:dict):
request = self.requests.post(self.__get_url(endpoint), headers=self.headers, data=json.dumps(data))
if request.status_code not in [200, 201, 202]:
if (
"already exists" in request.text
or "e-mail already in use" in request.text
):
if ("already exists" in request.text or "e-mail already in use" in request.text):
self.logger.warning(request.text)
raise AlreadyExistsException()
self.logger.error(
"Received status code: %s (%s)" % (request.status_code, request.url)
)
self.logger.error("With info: %s (%s)" % (data, self.headers))
self.logger.error("Answer: %s" % request.text)
raise Exception(
"Received status code: %s (%s), %s"
% (request.status_code, request.url, request.text)
)
self.logger.error(f"Received status code: {request.status_code} ({ request.url})")
self.logger.error(f"With info: {data} ({self.headers})")
self.logger.error(f"Answer: {request.text}")
raise Exception(f"Received status code: {request.status_code} ({request.url}), {request.text}")
return self.parse_result(request)
def requests_patch(self, endpoint, data):
request = self.requests.patch(
self.__get_url(endpoint), headers=self.headers, data=json.dumps(data)
)
def requests_patch(self, endpoint:str, data:dict):
request = self.requests.patch(self.__get_url(endpoint), headers=self.headers, data=json.dumps(data))
if request.status_code not in [200, 201]:
error_message = "Received status code: %s (%s) %s" % (
request.status_code,
request.url,
data,
)
error_message = f"Received status code: {request.status_code} ({request.url}) {data}"
self.logger.error(error_message)
raise Exception(error_message)
return self.parse_result(request)
def get_users_search(self):
path = "/users/search"
return self.requests_get(path)
def get_orgs_public_members_all(self, orgname):
path = "/orgs/" + orgname + "/public_members"
return self.requests_get(path)
@ -205,123 +120,6 @@ class Gitea:
results = self.requests_get(path)
return [Organization.parse_response(self, result) for result in results]
def post_repos__forks(self, organization, repo, owner):
path = "/repos/" + owner + "/" + repo + "/forks"
return self.requests_post(path, data={"organization": organization})
def get_repos_forks(self, repo, owner):
path = "/repos/" + owner + "/" + repo + "/forks"
return self.requests_get(path)
def put_repos__subscription(self, username, reponame):
path = "/repos/" + username + "/" + reponame + "/subscription"
return self.requests.put(path)
def delete_repos_subscription(self, username, reponame):
path = "/repos/" + username + "/" + reponame + "/subscription"
return self.requests.delete(path)
def get_repos_subscription(self, username, reponame):
path = "/repos/" + username + "/" + reponame + "/subscription"
return self.requests_get(path)
def get_users_following(self, username):
path = "/users/" + username + "/following"
return self.requests_get(path)
def get_users_starred(self, username):
path = "/users/" + username + "/starred"
return self.requests_get(path)
def put_orgs_public_members(self, username, orgname):
path = "/orgs/" + orgname + "/public_members/" + username
return self.requests.put(path)
def delete_orgs_public_members(self, username, orgname):
path = "/orgs/" + orgname + "/public_members/" + username
return self.requests.delete(path)
def get_orgs_public_members(self, username, orgname):
path = "/orgs/" + orgname + "/public_members/" + username
return self.requests_get(path)
def post_org_repos(
self, name, description, private, auto_init, gitignores, license, readme, org
):
path = "/org/" + org + "/repos"
return self.requests_post(
path,
data={
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"gitignores": gitignores,
"license": license,
"readme": readme,
},
)
def delete_orgs_members(self, orgname, username):
path = "/orgs/" + orgname + "/members/" + username
return self.requests.delete(path)
def post_repos__hooks(self, type, config, events, active, reponame, username):
path = "/repos/" + username + "/" + reponame + "/hooks"
return self.requests_post(
path,
data={"type": type, "config": config, "events": events, "active": active},
)
def get_repos_hooks(self, reponame, username):
path = "/repos/" + username + "/" + reponame + "/hooks"
return self.requests_get(path)
def post_repos_migrate(
self,
clone_addr,
auth_username,
auth_password,
uid,
repo_name,
mirror,
private,
description,
):
path = "/repos/migrate"
return self.requests_post(
path,
data={
"clone_addr": clone_addr,
"auth_username": auth_username,
"auth_password": auth_password,
"uid": uid,
"repo_name": repo_name,
"mirror": mirror,
"private": private,
"description": description,
},
)
def post_user_repos(
self, name, description, private, auto_init, gitignores, license, readme
):
path = "/user/repos"
return self.requests_post(
path,
data={
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
"gitignores": gitignores,
"license": license,
"readme": readme,
},
)
# # #
def get_user(self):
result = self.requests_get(Gitea.GET_USER)
return User.parse_response(self, result)
@ -393,7 +191,7 @@ class Gitea:
def create_repo(
self,
repoOwner,
repoOwner: Union[User,Organization],
repoName: str,
description: str = "",
private: bool = False,