py-gitea/gitea/gitea.py

1061 wiersze
35 KiB
Python

import json
import requests
import logging
from datetime import datetime
# NOTE: this rebinds the name `logging` from the stdlib module to the "gitea"
# Logger instance. Every later `logging.debug(...)` / `logging.error(...)` call
# in this file therefore goes through that logger, and the module itself is no
# longer reachable under this name.
logging = logging.getLogger("gitea")
# Presumably the release version of this client package -- TODO confirm.
version = "0.4.6"
class AlreadyExistsException(Exception):
    """Signals that the entity a create request asked for already exists."""
class NotFoundException(Exception):
    """Signals that a requested entity could not be found."""
class ObjectIsInvalid(Exception):
    """Signals access to an API object that has been flagged as deleted."""
class GiteaApiObject:
    """Base class for objects that mirror an entity of the Gitea REST API.

    Every key of the JSON payload becomes a property on the class and is
    stored on the instance as ``_<key>``. Keys listed in ``patchable_fields``
    additionally get a setter that records the field as dirty.
    """

    # Endpoint templates, filled with ``str.format(**args)``; these values are
    # placeholders that subclasses override.
    GET_API_OBJECT = "FORMAT/STINING/{argument}"
    PATCH_API_OBJECT = "FORMAT/STINING/{argument}"

    # Maps a payload key to ``parser(gitea, raw_value)``; used in _initialize.
    fields_to_parsers = {}
    # Payload keys that receive a property setter (with dirty tracking).
    patchable_fields = set()

    def __init__(self, gitea, id: int):
        """Store the owning ``gitea`` session and start with no dirty fields.

        Note that ``id`` is kept under a name-mangled attribute; the public
        ``id`` property only exists once ``_initialize`` has processed a
        payload that contains an ``"id"`` key.
        """
        self.__id = id
        self.gitea = gitea
        self.deleted = False  # set if .delete was called, so that an exception is risen
        self.dirty_fields = set()

    def __eq__(self, other):
        """Two objects are equal if they share the same type and id."""
        return other.id == self.id if isinstance(other, type(self)) else False

    def __str__(self):
        return "GiteaAPIObject (%s) id: %s"%(type(self),self.id)

    def __hash__(self):
        return self.id

    def commit(self):
        """Send changed fields to the server; subclasses must implement this.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # `NotImplemented` is a comparison sentinel and is not callable;
        # the exception type meant here is NotImplementedError.
        raise NotImplementedError()

    def get_dirty_fields(self):
        """Return ``{field name: current value}`` for every changed field."""
        return {name: getattr(self,name) for name in self.dirty_fields}

    @classmethod
    def request(cls, gitea, id):
        """Fetch the object with the given ``id`` from the server.

        Subclasses override this with a readable positional signature and
        pack their arguments into the dict that ``_request`` expects.
        """
        return cls._request(gitea, {"id": id})

    @classmethod
    def _request(cls, gitea, args):
        """Fetch and parse one object; ``args`` fills ``GET_API_OBJECT``."""
        result = cls._get_gitea_api_object(gitea, args)
        api_object = cls.parse_request(gitea, result)
        # hack: not all necessary request args in api result (e.g. repo name in issue)
        for key, value in args.items():
            if not hasattr(api_object, key):
                setattr(api_object, key, value)
        return api_object

    @classmethod
    def parse_request(cls, gitea, result):
        """Build an instance of ``cls`` from an already fetched payload dict.

        Raises:
            KeyError: if ``result`` has no ``"id"`` entry.
        """
        id = int(result["id"])
        # Log the concrete class name; type(cls) would always print the metaclass.
        logging.debug("Found api object of type %s (id: %s)" % (cls.__name__, id))
        api_object = cls(gitea, id=id)
        cls._initialize(gitea, api_object, result)
        return api_object

    @classmethod
    def _get_gitea_api_object(cls, gitea, args):
        """Retrieving an object always as GET_API_OBJECT """
        return gitea.requests_get(cls.GET_API_OBJECT.format(**args))

    @classmethod
    def _initialize(cls, gitea, api_object, result):
        """Copy ``result`` onto ``api_object`` and install matching properties.

        Values with a registered parser are converted first (``None`` is
        passed through unparsed). The property is set on the class, the value
        on the instance under ``_<name>``.
        """
        for name, value in result.items():
            if name in cls.fields_to_parsers and value is not None:
                value = cls.fields_to_parsers[name](gitea, value)
            # The outer lambda is called immediately so that each property
            # closes over its own field name rather than the loop variable.
            getter = (lambda field: lambda self: self.__get_var(field))(name)
            if name in cls.patchable_fields:
                setter = (lambda field: lambda self, v: self.__set_var(field, v))(name)
                prop = property(getter, setter)
            else:
                prop = property(getter)
            setattr(cls, name, prop)
            setattr(api_object, "_"+name, value)

    def __set_var(self,name,i):
        """Property setter backend: remember the field as dirty, then store it."""
        self.dirty_fields.add(name)
        setattr(self,"_"+name,i)

    def __get_var(self,name):
        """Property getter backend; refuses access once ``deleted`` is set.

        Raises:
            ObjectIsInvalid: if the object has been flagged as deleted.
        """
        if self.deleted:
            raise ObjectIsInvalid()
        return getattr(self,"_"+name)
class Organization(GiteaApiObject):
    """A Gitea organization, addressed by its name."""

    GET_API_OBJECT = """/orgs/{name}"""  # <org>
    PATCH_API_OBJECT = """/orgs/{name}"""  # <org>
    ORG_REPOS_REQUEST = """/orgs/%s/repos"""  # <org>
    ORG_TEAMS_REQUEST = """/orgs/%s/teams"""  # <org>
    ORG_TEAMS_CREATE = """/orgs/%s/teams"""  # <org>
    ORG_GET_MEMBERS = """/orgs/%s/members"""  # <org>
    ORG_DELETE = """/orgs/%s"""  # <org>

    patchable_fields = {"description", "full_name", "location", "visibility", "website"}

    def __init__(self, gitea, id: int):
        super(Organization, self).__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, name):
        """Fetch the Organization called ``name`` from the server."""
        return cls._request(gitea, {"name": name})

    def commit(self):
        """Send all changed patchable fields to the server.

        NOTE(review): this reads ``self.name``, while the other methods of
        this class use ``self.username``; ``name`` is at least set when the
        object was obtained through ``request`` -- confirm for other paths.
        """
        values = self.get_dirty_fields()
        args = {"name": self.name}
        self.gitea.requests_patch(Organization.PATCH_API_OBJECT.format(**args), data=values)
        # Must stay a set: the property setters call ``dirty_fields.add``.
        self.dirty_fields = set()

    # oldstuff
    def get_repositories(self):
        """ Get the Repositories of this Organization.
        Returns: [Repository]
            A list of Repositories this Organization is hosting.
        """
        results = self.gitea.requests_get(
            Organization.ORG_REPOS_REQUEST % self.username
        )
        return [
            Repository.parse_request(self.gitea, result)
            for result in results
        ]

    def get_teams(self):
        """ Get the Teams in this Organization
        Returns: [Team]
            A list of Teams in this Organization.
        """
        results = self.gitea.requests_get(
            Organization.ORG_TEAMS_REQUEST % self.username
        )
        return [Team.parse_request(self.gitea, result) for result in results]

    def get_team_by_name(self, name):
        """ Get the Team of this Organization that is called ``name``.
        Returns: Team
        Throws:
            NotFoundException, if no Team has that name.
        """
        teams = self.get_teams()
        for team in teams:
            if team.name == name:
                return team
        raise NotFoundException()

    def get_members(self):
        """ Get all members of this Organization
        Returns: [User]
            A list of Users who are members of this Organization.
        """
        results = self.gitea.requests_get(Organization.ORG_GET_MEMBERS % self.username)
        return [User.parse_request(self.gitea, result) for result in results]

    def remove_member(self, username):
        """ Remove a member from this Organization.
        Args:
            username (str/User): the member, as username or as User object.
        """
        if isinstance(username, User):
            username = username.username
        path = "/orgs/" + self.username + "/members/" + username
        self.gitea.requests_delete(path)

    def delete(self):
        """ Delete this Organization. Invalidates this Objects data.
        Deletes every Repository of the Organization first, then the
        Organization itself.
        """
        # TODO: Delete Repos, Teams, Users (except authenticated user)
        for repo in self.get_repositories():
            repo.delete()
        self.gitea.requests_delete(Organization.ORG_DELETE % self.username)
class User(GiteaApiObject):
    """A Gitea user account."""

    GET_API_OBJECT = """/users/{name}"""  # <org>
    USER_MAIL = """/user/emails?sudo=%s"""  # <name>
    USER_REPOS_REQUEST = """/users/%s/repos"""  # <org>
    USER_PATCH = """/admin/users/%s"""  # <username>
    ADMIN_DELETE_USER = """/admin/users/%s"""  # <username>
    USER_HEATMAP = """/users/%s/heatmap"""  # <username>

    patchable_fields = {"active", "admin", "allow_create_organization", "allow_git_hook", "allow_import_local",
                        "email", "full_name", "location", "login_name", "max_repo_creation", "must_change_password",
                        "password", "prohibit_login", "source_id", "website"}

    def __init__(self, gitea, id: int):
        super(User, self).__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, name):
        """Fetch the User called ``name`` and correct its email afterwards."""
        api_object = cls._request(gitea, {"name": name})
        api_object.update_mail()
        return api_object

    def commit(self):
        """Send all changed patchable fields to the admin user endpoint.

        The original implementation patched the Organization endpoint
        (``/orgs/{name}``); users are edited through ``USER_PATCH``, the same
        endpoint ``set_value`` uses.
        """
        values = self.get_dirty_fields()
        # the request requires email to be set...
        values.setdefault("email", self.email)
        self.gitea.requests_patch(User.USER_PATCH % self.username, data=values)
        # Must stay a set: the property setters call ``dirty_fields.add``.
        self.dirty_fields = set()

    def get_repositories(self):
        """ Get all Repositories owned by this User.
        Returns: [Repository]
            A list of Repositories this user owns.
        """
        results = self.gitea.requests_get(User.USER_REPOS_REQUEST % self.username)
        return [Repository.parse_request(self.gitea, result) for result in results]

    def update_mail(self):
        """ Update the mail of this user instance to one that is \
        actually correct.
        The first address flagged ``primary`` in the user's email list wins;
        if none is flagged, the stored email is left unchanged.
        """
        prev = self._email
        result = self.gitea.requests_get(User.USER_MAIL % self.login)
        for mail in result:
            if mail["primary"]:
                self._email = mail["email"]
                break
        logging.info(
            "User %s updated Mail: <%s> to <%s>" % (self.login, prev, self.email)
        )

    def set_value(self, email: str, values: dict):
        """ Set certain values of this user.
        Args:
            email (str): The actual email of the User.
            values (dict): The (updated) values. Not modified by this call.
        Warning:
            The email you get from the API might not be sufficient.
            It needs to be the actual mail in the database.
        """
        # Work on a copy so the caller's dict is not changed behind its back.
        payload = dict(values)
        # the request requires email to be set...
        payload["email"] = email
        result = self.gitea.requests_patch(User.USER_PATCH % self.username, data=payload)
        # Refresh this instance from the server's answer (a no-op when the
        # answer is empty). The previously called `__initialize_user` does
        # not exist anywhere in this class hierarchy.
        User._initialize(self.gitea, self, result)

    def delete(self):
        """ Deletes this User through the admin endpoint.
        Warning:
            Invalidates this Objects Data.
        """
        # TODO: Delete all Repositories of this user.
        # Might not be deleteable otherwise.
        self.gitea.requests_delete(User.ADMIN_DELETE_USER % self.username)

    def get_heatmap(self):
        """ Get the contribution heatmap of this User.
        Returns: [(datetime, contributions)]
            One tuple per heatmap entry; the timestamp is converted with
            ``datetime.fromtimestamp`` (local time).
        """
        results = self.gitea.requests_get(User.USER_HEATMAP % self.username)
        results = [
            (datetime.fromtimestamp(result["timestamp"]), result["contributions"])
            for result in results
        ]
        return results
class Repository(GiteaApiObject):
    """A Gitea repository, addressed by owner name and repository name."""

    GET_API_OBJECT = """/repos/{owner}/{name}"""  # <owner>, <reponame>
    REPO_SEARCH = """/repos/search/%s"""  # <reponame>
    REPO_BRANCHES = """/repos/%s/%s/branches"""  # <owner>, <reponame>
    REPO_ISSUES = """/repos/%s/%s/issues"""  # <owner, reponame>
    REPO_DELETE = """/repos/%s/%s"""  # <owner>, <reponame>
    REPO_USER_TIME = """/repos/%s/%s/times/%s"""  # <owner>, <reponame>, <username>

    fields_to_parsers = {
        # There is no explicit marker telling a user from an organization as
        # owner; an empty email is taken to mean "organization".
        "owner": lambda gitea, r: (
            Organization if r["email"] == "" else User
        ).parse_request(gitea, r),
        "updated_at": lambda gitea, t: Util.convert_time(t),
    }

    patchable_fields = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    def __init__(self, gitea, id: int):
        super().__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, owner, name):
        """Fetch the Repository ``owner``/``name`` from the server."""
        return cls._request(gitea, {"owner": owner, "name": name})

    def get_branches(self):
        """List the Branches of this Repository.
        Returns: [Branch]
            One Branch per entry of the server's answer.
        """
        endpoint = Repository.REPO_BRANCHES % (self.owner.username, self.name)
        return [
            Branch.parse_request(self.gitea, raw)
            for raw in self.gitea.requests_get(endpoint)
        ]

    def get_issues(self):
        """List every Issue of this Repository, open ones first.
        Returns: [Issue]
            The open Issues followed by the closed Issues.
        """
        open_issues = self.get_issues_state("open")
        closed_issues = self.get_issues_state("closed")
        return open_issues + closed_issues

    def get_issues_state(self, state):
        """List the Issues of this Repository that are in the given state.
        Args:
            state (str): either "open" or "closed".
        Returns: [Issue]
            All matching Issues, collected page by page.
        """
        assert state in ["open", "closed"]
        collected = []
        page = 1
        while True:
            # q=&type=all&sort=&state=open&milestone=0&assignee=0
            batch = self.gitea.requests_get(
                Repository.REPO_ISSUES % (self.owner.username, self.name),
                params={"page": page, "state": state},
            )
            # An empty page marks the end of the listing.
            if len(batch) <= 0:
                return collected
            page += 1
            for raw in batch:
                issue = Issue.parse_request(self.gitea, raw)
                # The issue payload does not say which repository it belongs
                # to, so that information is attached by hand.
                issue.repo = self.name
                issue.owner = self.owner.username
                collected.append(issue)

    def get_user_time(self, username, ignore_above=8):
        """Sum up the time a user tracked on this Repository.
        Args:
            username (str/User): the user, as username or as User object.
            ignore_above (int): entries of this many hours or more are
                treated as invalid and skipped.
        Returns: float
            The accumulated time in hours, truncated to whole minutes.
        """
        if isinstance(username, User):
            username = username.username
        entries = self.gitea.requests_get(
            Repository.REPO_USER_TIME % (self.owner.username, self.name, username)
        )
        seconds = sum(
            entry["time"]
            for entry in entries
            if entry["time"] < ignore_above * 3600
        )
        minutes = seconds // 60
        return minutes / 60

    def delete(self):
        """Delete this Repository on the server.
        Warning:
            Invalidates this objects Data.
        """
        endpoint = Repository.REPO_DELETE % (self.owner.username, self.name)
        self.gitea.requests_delete(endpoint)
class Milestone(GiteaApiObject):
    """A milestone of a repository, addressed by owner, repo and number."""

    GET_API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}"""  # <owner, repo, id>

    # Both timestamps are converted to datetime objects when present.
    fields_to_parsers = {
        "closed_at": lambda gitea, t: Util.convert_time(t),
        "due_on": lambda gitea, t: Util.convert_time(t),
    }

    # NOTE(review): this set is identical to Repository.patchable_fields and
    # looks copied from there -- confirm which fields a milestone really has.
    patchable_fields = {
        "allow_merge_commits",
        "allow_rebase",
        "allow_rebase_explicit",
        "allow_squash_merge",
        "archived",
        "default_branch",
        "description",
        "has_issues",
        "has_pull_requests",
        "has_wiki",
        "ignore_whitespace_conflicts",
        "name",
        "private",
        "website",
    }

    def __init__(self, gitea, id: int):
        super().__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, owner, repo, number):
        """Fetch milestone ``number`` of repository ``owner``/``repo``."""
        lookup = {"owner": owner, "repo": repo, "number": number}
        return cls._request(gitea, lookup)

    def full_print(self):
        """Return the instance attribute dict rendered as a string."""
        return str(self.__dict__)
class Issue(GiteaApiObject):
    """An issue of a repository, addressed by owner, repo and issue number."""

    GET_API_OBJECT = """/repos/{owner}/{repo}/issues/{number}"""  # <owner, repo, index>
    GET_TIME = """/repos/%s/%s/issues/%s/times"""  # <owner, repo, index>

    # The two values the parsed ``state`` field can take.
    closed = "closed"
    open = "open"

    fields_to_parsers = {
        "milestone": lambda gitea, m: Milestone.parse_request(gitea, m),
        "user": lambda gitea, u: User.parse_request(gitea, u),
        "assignee": lambda gitea, u: User.parse_request(gitea, u),
        "assignees": lambda gitea, us: [User.parse_request(gitea, u) for u in us],
        # Anything that is not "closed" is treated as open.
        "state": lambda gitea, s: Issue.closed if s == "closed" else Issue.open
    }

    patchable_fields = {"assignee", "assignees", "body", "due_date", "milestone", "state", "title"}

    def __init__(self, gitea, id: int):
        super(Issue, self).__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, owner, repo, number):
        """Fetch issue ``number`` of repository ``owner``/``repo``."""
        api_object = cls._request(gitea, {"owner":owner, "repo":repo, "number":number})
        return api_object

    def get_estimate_sum(self):
        """Returns the summed estimate-labeled values

        Only labels whose name starts with ``"estimate: "`` count; the rest
        of the name is parsed as a float.
        """
        prefix = "estimate: "
        return sum(
            float(label["name"][len(prefix):])
            for label in self.labels
            if label["name"].startswith(prefix)
        )

    def get_time(self, user_id=None):
        """ Returns the summed time on this issue, in hours.

        Args:
            user_id (int): Optional; only entries of this user are counted.
                With the default ``None`` the entries of all users are
                summed (previously the default filtered out every entry, so
                the result was always 0).
        Returns: float
            Sum of the entries, each truncated to whole minutes first.
        """
        entries = self.gitea.requests_get(
            Issue.GET_TIME % (self.owner, self.repo, self.number)
        )
        return sum(
            (t["time"] // 60) / 60
            for t in entries
            if user_id is None or t["user_id"] == user_id
        )
class Branch(GiteaApiObject):
    """A branch of a repository, addressed by owner, repo and ref."""

    # The base class fills this template with ``str.format(**args)`` using the
    # keys passed by ``request``. The former ``%s`` placeholders were never
    # substituted, so the literal template was sent as URL.
    GET_API_OBJECT = """/repos/{owner}/{repo}/branches/{ref}"""  # <owner>, <repo>, <ref>

    def __init__(self, gitea, id: int):
        super(Branch, self).__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, owner, repo, ref):
        """Fetch branch ``ref`` of repository ``owner``/``repo``.

        NOTE(review): the inherited ``parse_request`` requires an ``"id"``
        key in the payload -- confirm that branch payloads carry one.
        """
        return cls._request(gitea, {"owner":owner, "repo":repo, "ref":ref})
class Team(GiteaApiObject):
    """A team inside an organization, addressed by its numeric id."""

    GET_API_OBJECT = """/teams/{id}"""  # <id>
    ADD_USER = """/teams/%s/members/%s"""  # <id, username to add>
    ADD_REPO = """/teams/%s/repos/%s/%s"""  # <id, org, repo>
    TEAM_DELETE = """/teams/%s"""  # <id>
    GET_MEMBERS = """/teams/%s/members"""  # <id>
    GET_REPOS = """/teams/%s/repos"""  # <id>

    fields_to_parsers = {
        "organization": lambda gitea, o: Organization.parse_request(gitea, o),
    }

    patchable_fields = {"description", "name", "permission", "units"}

    def __init__(self, gitea, id: int):
        super().__init__(gitea, id=id)

    @classmethod
    def request(cls, gitea, id):
        """Fetch the Team with the given ``id`` from the server."""
        lookup = {"id": id}
        return cls._request(gitea, lookup)

    def add(self, toAdd):
        """Add a User or a Repository to this Team.
        Args:
            toAdd (Repository/User): the object to add to this Team
        Throws:
            Exception, if ``toAdd`` is neither a Repository nor a User.
        """
        if isinstance(toAdd, Repository):
            self.add_repo(toAdd)
        elif isinstance(toAdd, User):
            self.add_user(toAdd)
        else:
            logging.error("Could not add %s" % str(toAdd))
            raise Exception("Could not add" + str(type(toAdd)))

    def add_user(self, user: User):
        """Make ``user`` a member of this Team.
        Args:
            user (User): User to be added; identified by its login.
        """
        endpoint = Team.ADD_USER % (self.id, user.login)
        self.gitea.requests_put(endpoint)

    def add_repo(self, repo: Repository):
        """Attach ``repo`` to this Team.
        Args:
            repo (Repository): Repository to be added; it is looked up under
                this Team's organization.
        """
        endpoint = Team.ADD_REPO % (self.id, self.organization.username, repo.name)
        self.gitea.requests_put(endpoint)

    def get_members(self):
        """List the members of this Team.
        Returns: [User]
            One User per entry of the server's answer.
        """
        members = []
        for raw in self.gitea.requests_get(Team.GET_MEMBERS % self.id):
            members.append(User.parse_request(self.gitea, raw))
        return members

    def get_repos(self):
        """List the repositories of this Team.
        Returns: [Repository]
            One Repository per entry of the server's answer.
        """
        repos = []
        for raw in self.gitea.requests_get(Team.GET_REPOS % self.id):
            repos.append(Repository.parse_request(self.gitea, raw))
        return repos

    def delete(self):
        """Delete this Team on the server."""
        endpoint = Team.TEAM_DELETE % self.id
        self.gitea.requests_delete(endpoint)
class Util:
    """Stateless helper functions."""

    @staticmethod
    def convert_time(time: str) -> datetime:
        """Parse a Gitea timestamp into a timezone-aware datetime.

        Gitea returns time in the format "%Y-%m-%dT%H:%M:%S%z" but with ":"
        in the time zone notation (e.g. "+01:00"). The colon is removed
        before parsing so the offset minutes are kept ("+05:30" used to be
        turned into "+0500"). A trailing "Z" is read as UTC.

        Args:
            time (str): timestamp such as "2019-03-04T05:06:07+01:00".
        Returns: datetime
            The parsed, timezone-aware point in time.
        Throws:
            ValueError, if the string does not match the expected format.
        """
        if time.endswith(("Z", "z")):
            time = time[:-1] + "+0000"
        elif len(time) >= 6 and time[-3] == ":" and time[-6] in "+-":
            # "+HH:MM" -> "+HHMM", which "%z" accepts on every Python 3.
            time = time[:-3] + time[-2:]
        return datetime.strptime(time, "%Y-%m-%dT%H:%M:%S%z")
class Gitea:
    """ Has Gitea-authenticated session. Can Create Users/Organizations/Teams/...
    Attr:
        headers (dict): HTTP headers sent with every request (token + JSON).
        url (str): Base URL of the Gitea server, without the "/api/v1" part.
        requests: The ``requests`` module used to perform the HTTP calls.
    """

    ADMIN_CREATE_USER = """/admin/users"""
    ADMIN_REPO_CREATE = """/admin/users/%s/repos"""  # <ownername>
    GITEA_VERSION = """/version"""
    GET_USER = """/user"""
    CREATE_ORG = """/admin/users/%s/orgs"""  # <username>
    CREATE_TEAM = """/orgs/%s/teams"""  # <orgname>

    # Keys whose values must never end up in a log line (compared lowercase).
    _SENSITIVE_KEYS = frozenset({"authorization", "password", "auth_password"})

    def __init__(self, url, token):
        """ Initializing Gitea-instance.
        Args:
            url (str): URL of Gitea-server.
            token (str): Token of acting User.
        """
        self.headers = {
            "Authorization": "token " + token,
            "Content-type": "application/json",
        }
        self.url = url
        self.requests = requests

    @staticmethod
    def _redact(mapping):
        """Return a copy of ``mapping`` with credential values masked, for logging."""
        if not isinstance(mapping, dict):
            return mapping
        return {
            key: "***" if str(key).lower() in Gitea._SENSITIVE_KEYS else value
            for key, value in mapping.items()
        }

    def get_url(self, endpoint):
        """ Returns the full API-URL.
        Args:
            endpoint (str): Path to be added on API-Path.
        Returns: str
            Combined total API endpoint.
        """
        url = self.url + "/api/v1" + endpoint
        logging.debug("Url: %s" % url)
        return url

    @staticmethod
    def parse_result(result):
        """ Parses the result-JSON to a dict.
        Args:
            result: Result-Object from requests (library).
        Returns: dict
            Parsed from JSON; an empty dict if the body has 3 characters
            or fewer.
        """
        if result.text and len(result.text) > 3:
            return json.loads(result.text)
        return {}

    def requests_get(self, endpoint, params=None):
        """ Get parsed result from API-endpoint.
        Args:
            endpoint (str): Endpoint to request from.
            params (dict): Optional query parameters.
        Returns: (dict)
            Parsed JSON-answer from the API.
        Throws:
            NotFoundException, if the answer status code is 404.
            Exception, if answer status code is not ok.
        """
        # `None` instead of a shared mutable `{}` default.
        request = self.requests.get(
            self.get_url(endpoint),
            headers=self.headers,
            params={} if params is None else params,
        )
        if request.status_code not in [200, 201]:
            if request.status_code in [404]:
                raise NotFoundException()
            logging.error(
                "Received status code: %s (%s)" % (request.status_code, request.url)
            )
            if request.status_code in [403]:
                raise Exception(
                    "Unauthorized: %s - Check your permissions and try again!"
                    % request.url
                )
            raise Exception(
                "Received status code: %s (%s)" % (request.status_code, request.url)
            )
        return self.parse_result(request)

    def requests_put(self, endpoint):
        """ Send a PUT without body to the API-endpoint.
        Args:
            endpoint (str): Endpoint to request from.
        Throws:
            Exception, if answer status code is not 204.
        """
        request = self.requests.put(self.get_url(endpoint), headers=self.headers)
        if request.status_code not in [204]:
            logging.error(
                "Received status code: %s (%s) %s"
                % (request.status_code, request.url, request.text)
            )
            raise Exception(
                "Received status code: %s (%s) %s"
                % (request.status_code, request.url, request.text)
            )

    def requests_delete(self, endpoint):
        """ Send a DELETE to the API-endpoint.
        Args:
            endpoint (str): Endpoint to request from.
        Throws:
            Exception, if answer status code is not 204.
        """
        request = self.requests.delete(self.get_url(endpoint), headers=self.headers)
        if request.status_code not in [204]:
            logging.error(
                "Received status code: %s (%s)" % (request.status_code, request.url)
            )
            raise Exception(
                "Received status code: %s (%s) %s"
                % (request.status_code, request.url, vars(request))
            )

    def requests_post(self, endpoint, data):
        """ Post data to API-endpoint.
        Args:
            endpoint (str): endpoint of API to send data to
            data (dict): Data to send.
        Returns: (dict)
            Parsed JSON-answer from the API.
        Throws:
            AlreadyExistsException, if 'already exists' in answer
            Exception, if status code not ok
        """
        request = self.requests.post(
            self.get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            if (
                "already exists" in request.text
                or "e-mail already in use" in request.text
            ):
                logging.warning(request.text)
                raise AlreadyExistsException()
            logging.error(
                "Received status code: %s (%s)" % (request.status_code, request.url)
            )
            # Mask the API token and any password before writing to the log.
            logging.error(
                "With info: %s (%s)"
                % (self._redact(data), self._redact(self.headers))
            )
            logging.error("Answer: %s" % request.text)
            raise Exception(
                "Received status code: %s (%s), %s"
                % (request.status_code, request.url, request.text)
            )
        return self.parse_result(request)

    def requests_patch(self, endpoint, data):
        """ Patch data to API-endpoint.
        Args:
            endpoint (str): endpoint of API to send data to
            data (dict): Data to patch.
        Returns: (dict)
            Parsed JSON-answer from the API. Usually it is empty.
        Throws:
            Exception, if status code not ok.
        """
        request = self.requests.patch(
            self.get_url(endpoint), headers=self.headers, data=json.dumps(data)
        )
        if request.status_code not in [200, 201]:
            logging.error(
                "Received status code: %s (%s) %s"
                % (request.status_code, request.url, data)
            )
            raise Exception(
                "Received status code: %s (%s) %s"
                % (request.status_code, request.url, request.text)
            )
        return self.parse_result(request)

    # Thin endpoint wrappers. The PUT/DELETE ones used to call the raw
    # `requests` module with a bare path (no server URL, no auth headers),
    # which cannot work; they now go through requests_put/requests_delete and
    # therefore return None and raise on a non-204 answer.

    def get_users_search(self):
        path = "/users/search"
        return self.requests_get(path)

    def delete_repos(self, username, reponame):
        path = "/repos/" + username + "/" + reponame
        return self.requests_delete(path)

    def get_orgs_public_members_all(self, orgname):
        path = "/orgs/" + orgname + "/public_members"
        return self.requests_get(path)

    def post_repos__forks(self, organization, repo, owner):
        path = "/repos/" + owner + "/" + repo + "/forks"
        return self.requests_post(path, data={"organization": organization})

    def get_repos_forks(self, repo, owner):
        path = "/repos/" + owner + "/" + repo + "/forks"
        return self.requests_get(path)

    def put_repos__subscription(self, username, reponame):
        path = "/repos/" + username + "/" + reponame + "/subscription"
        return self.requests_put(path)

    def delete_repos_subscription(self, username, reponame):
        path = "/repos/" + username + "/" + reponame + "/subscription"
        return self.requests_delete(path)

    def get_repos_subscription(self, username, reponame):
        path = "/repos/" + username + "/" + reponame + "/subscription"
        return self.requests_get(path)

    def get_users_following(self, username):
        path = "/users/" + username + "/following"
        return self.requests_get(path)

    def get_users_starred(self, username):
        path = "/users/" + username + "/starred"
        return self.requests_get(path)

    def put_orgs_public_members(self, username, orgname):
        path = "/orgs/" + orgname + "/public_members/" + username
        return self.requests_put(path)

    def delete_orgs_public_members(self, username, orgname):
        path = "/orgs/" + orgname + "/public_members/" + username
        return self.requests_delete(path)

    def get_orgs_public_members(self, username, orgname):
        path = "/orgs/" + orgname + "/public_members/" + username
        return self.requests_get(path)

    def post_org_repos(
        self, name, description, private, auto_init, gitignores, license, readme, org
    ):
        path = "/org/" + org + "/repos"
        return self.requests_post(
            path,
            data={
                "name": name,
                "description": description,
                "private": private,
                "auto_init": auto_init,
                "gitignores": gitignores,
                "license": license,
                "readme": readme,
            },
        )

    def delete_orgs_members(self, orgname, username):
        path = "/orgs/" + orgname + "/members/" + username
        return self.requests_delete(path)

    def post_repos__hooks(self, type, config, events, active, reponame, username):
        path = "/repos/" + username + "/" + reponame + "/hooks"
        return self.requests_post(
            path,
            data={"type": type, "config": config, "events": events, "active": active},
        )

    def get_repos_hooks(self, reponame, username):
        path = "/repos/" + username + "/" + reponame + "/hooks"
        return self.requests_get(path)

    def post_repos_migrate(
        self,
        clone_addr,
        auth_username,
        auth_password,
        uid,
        repo_name,
        mirror,
        private,
        description,
    ):
        path = "/repos/migrate"
        return self.requests_post(
            path,
            data={
                "clone_addr": clone_addr,
                "auth_username": auth_username,
                "auth_password": auth_password,
                "uid": uid,
                "repo_name": repo_name,
                "mirror": mirror,
                "private": private,
                "description": description,
            },
        )

    def post_user_repos(
        self, name, description, private, auto_init, gitignores, license, readme
    ):
        path = "/user/repos"
        return self.requests_post(
            path,
            data={
                "name": name,
                "description": description,
                "private": private,
                "auto_init": auto_init,
                "gitignores": gitignores,
                "license": license,
                "readme": readme,
            },
        )

    # # #
    def get_user(self):
        """ Get the User the token of this session belongs to.
        Returns: User
        """
        result = self.requests_get(Gitea.GET_USER)
        return User.parse_request(self, result)

    def get_version(self) -> str:
        """ Get the version string reported by the server."""
        result = self.requests_get(Gitea.GITEA_VERSION)
        return result["version"]

    def create_user(
        self,
        userName: str,
        email: str,
        password: str,
        change_pw=True,
        sendNotify=True,
        sourceId=0,
    ):
        """ Create User.
        Args:
            userName (str): Name of the User that should be created.
            email (str): Email of the User that should be created.
            password (str): Password of the User that should be created.
            change_pw (bool): Optional, True, if the User should change his Password
            sendNotify (bool): Optional, True, if the User should be notified by Mail.
            sourceId (int): Optional, 0, source by which the User can authentificate himself against.
        Returns: User
            The newly created User.
        Throws:
            AlreadyExistsException, if the User exists already
            Exception, if something else went wrong.
        """
        result = self.requests_post(
            Gitea.ADMIN_CREATE_USER,
            data={
                "source_id": sourceId,
                "login_name": userName,
                "username": userName,
                "email": email,
                "password": password,
                "send_notify": sendNotify,
                "must_change_password": change_pw,
            },
        )
        if "id" in result:
            logging.info(
                "Successfully created User %s <%s> (id %s)"
                % (result["login"], result["email"], result["id"])
            )
        else:
            logging.error(result["message"])
            raise Exception("User not created... (gitea: %s)" % result["message"])
        user = User.parse_request(self, result)
        user.update_mail()
        return user

    def create_repo(
        self,
        repoOwner,
        repoName: str,
        description: str = "",
        private: bool = False,
        autoInit=True,
        gitignores=None,
        license=None,
        readme="Default",
    ):
        """ Create a Repository.
        Args:
            repoOwner (User/Organization): The owner of this Repository.
            repoName (str): The name of this Repository.
            description (str): Optional, "", short description of this Repository.
            private (bool): Optional, False, if this Repository should be private.
            autoInit (bool): Optional, True, if this Repository should auto-initialize.
            gitignores ([str]): Optional, None, list of gitignores to add.
            license (str): Optional, None, what sort of License to add.
            readme (str): Optional, 'Default', which Readme to initialize with.
        Returns: Repository
            The newly created Repository
        Throws:
            AlreadyExistsException, if Repository exists already.
            Exception, if something else went wrong.
        """
        # although this only says user in the api, this also works for
        # organizations
        assert isinstance(repoOwner, User) or isinstance(repoOwner, Organization)
        result = self.requests_post(
            Gitea.ADMIN_REPO_CREATE % repoOwner.username,
            data={
                "name": repoName,
                "description": description,
                "private": private,
                "auto_init": autoInit,
                "gitignores": gitignores,
                "license": license,
                "readme": readme,
            },
        )
        if "id" in result:
            logging.info("Successfully created Repository %s " % result["name"])
        else:
            logging.error(result["message"])
            raise Exception("Repository not created... (gitea: %s)" % result["message"])
        return Repository.parse_request(self, result)

    def create_org(
        self,
        owner: "User",
        orgName: str,
        description: str,
        location="",
        website="",
        full_name="",
    ):
        """ Create an Organization.
        Args:
            owner (User): The User the Organization is created for.
            orgName (str): The (user)name of the Organization.
            description (str): Short description of the Organization.
            location (str): Optional, "".
            website (str): Optional, "".
            full_name (str): Optional, "".
        Returns: Organization
            The newly created Organization.
        Throws:
            AlreadyExistsException, if the Organization exists already.
            Exception, if something else went wrong.
        """
        assert isinstance(owner, User)
        result = self.requests_post(
            Gitea.CREATE_ORG % owner.username,
            data={
                "username": orgName,
                "description": description,
                "location": location,
                "website": website,
                "full_name": full_name,
            },
        )
        if "id" in result:
            logging.info("Successfully created Organization %s" % result["username"])
        else:
            logging.error("Organization not created... (gitea: %s)" % result["message"])
            logging.error(result["message"])
            raise Exception(
                "Organization not created... (gitea: %s)" % result["message"]
            )
        return Organization.parse_request(self, result)

    def create_team(
        self,
        org: "Organization",
        name: str,
        description: str = "",
        permission: str = "read",
        units=(
            "repo.code",
            "repo.issues",
            "repo.ext_issues",
            "repo.wiki",
            "repo.pulls",
            "repo.releases",
            "repo.ext_wiki",
        ),
    ):
        """ Creates a Team.
        Args:
            org (Organization): Organization the Team will be part of.
            name (str): The Name of the Team to be created.
            description (str): Optional, "", short description of the new Team.
            permission (str): Optional, 'read', the permission sent for the Team.
            units ([str]): Optional, the repository units sent for the Team;
                any iterable of strings is accepted.
        Returns: Team
            The newly created Team, with ``organization`` set to ``org``.
        Throws:
            AlreadyExistsException, if the Team exists already.
            Exception, if something else went wrong.
        """
        result = self.requests_post(
            Gitea.CREATE_TEAM % org.username,
            data={
                "name": name,
                "description": description,
                "permission": permission,
                # The default is an immutable tuple; send a plain list.
                "units": list(units),
            },
        )
        if "id" in result:
            logging.info("Successfully created Team %s" % result["name"])
        else:
            logging.error("Team not created... (gitea: %s)" % result["message"])
            logging.error(result["message"])
            raise Exception("Team not created... (gitea: %s)" % result["message"])
        api_object = Team.parse_request(self, result)
        api_object.organization = org  # fixes strange behaviour of gitea not returning a valid organization here.
        return api_object