diff --git a/gitea/__init__.py b/gitea/__init__.py index 366c91c..97bbedd 100644 --- a/gitea/__init__.py +++ b/gitea/__init__.py @@ -1,5 +1,9 @@ from .gitea import ( Gitea, + NotFoundException, + AlreadyExistsException, +) +from .apiobject import ( User, Organization, Team, @@ -9,6 +13,9 @@ from .gitea import ( AlreadyExistsException, Issue, Milestone, + Commit, + Comment, + Content ) __all__ = [ @@ -21,5 +28,8 @@ __all__ = [ 'NotFoundException', 'AlreadyExistsException', 'Issue', - 'Milestone' + 'Milestone', + 'Commit', + 'Comment', + 'Content' ] diff --git a/gitea/apiobject.py b/gitea/apiobject.py new file mode 100644 index 0000000..38fc24f --- /dev/null +++ b/gitea/apiobject.py @@ -0,0 +1,716 @@ +import logging +from datetime import datetime +from typing import List, Tuple, Dict, Sequence, Optional, Union, Set + +from .baseapiobject import ReadonlyApiObject, ApiObject +from .exceptions import * + +class Organization(ApiObject): + """see https://try.gitea.io/api/swagger#/organization/orgGetAll""" + + API_OBJECT = """/orgs/{name}""" # + ORG_REPOS_REQUEST = """/orgs/%s/repos""" # + ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # + ORG_TEAMS_CREATE = """/orgs/%s/teams""" # + ORG_GET_MEMBERS = """/orgs/%s/members""" # + ORG_IS_MEMBER = """/orgs/%s/members/%s""" # , + ORG_HEATMAP = """/users/%s/heatmap""" # + + def __init__(self, gitea): + super(Organization, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Organization): return False + return self.gitea == other.gitea and self.name == other.name + + def __hash__(self): + return hash(self.gitea) ^ hash(self.name) + + @classmethod + def request(cls, gitea, name): + return cls._request(gitea, {"name": name}) + + @classmethod + def parse_response(cls, gitea, result): + api_object = super().parse_response(gitea, result) + # add "name" field to make this behave similar to users + Organization._add_read_property("name", result["username"], api_object) + return api_object + + patchable_fields = {"description", "full_name", "location", "visibility", "website"} + + def commit(self): + values = self.get_dirty_fields() + args = {"name": self.name} + self.gitea.requests_patch( + Organization.API_OBJECT.format(**args), data=values + ) + self.dirty_fields = {} + + def get_repositories(self) -> List["Repository"]: + results = self.gitea.requests_get_paginated( + Organization.ORG_REPOS_REQUEST % self.username + ) + return [Repository.parse_response(self.gitea, result) for result in results] + + def get_repository(self, name) -> "Repository": + repos = self.get_repositories() + for repo in repos: + if repo.name == name: + return repo + raise NotFoundException("Repository %s not existent in organization." % name) + + def get_teams(self) -> List["Team"]: + results = self.gitea.requests_get( + Organization.ORG_TEAMS_REQUEST % self.username + ) + teams = [Team.parse_response(self.gitea, result) for result in results] + # organisation seems to be missing using this request, so we add org manually + for t in teams: setattr(t, "_organization", self) + return teams + + def get_team(self, name) -> "Team": + teams = self.get_teams() + for team in teams: + if team.name == name: + return team + raise NotFoundException("Team not existent in organization.") + + def get_members(self) -> List["User"]: + results = self.gitea.requests_get(Organization.ORG_GET_MEMBERS % self.username) + return [User.parse_response(self.gitea, result) for result in results] + + def is_member(self, username) -> bool: + if isinstance(username, User): + username = username.username + try: + # returns 204 if its ok, 404 if its not + self.gitea.requests_get( + Organization.ORG_IS_MEMBER % (self.username, username) + ) + return True + except: + return False + + def remove_member(self, user: "User"): + path = f"/orgs/{self.username}/members/{user.username}" + self.gitea.requests_delete(path) + + def delete(self): + """ Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" + for repo in self.get_repositories(): + repo.delete() + self.gitea.requests_delete(Organization.API_OBJECT.format(name=self.username)) + self.deleted = True + + def get_heatmap(self) -> List[Tuple[datetime, int]]: + results = self.gitea.requests_get(User.USER_HEATMAP % self.username) + results = [ + (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) + for result in results + ] + return results + + +class User(ApiObject): + API_OBJECT = """/users/{name}""" # + USER_MAIL = """/user/emails?sudo=%s""" # + USER_PATCH = """/admin/users/%s""" # + ADMIN_DELETE_USER = """/admin/users/%s""" # + ADMIN_EDIT_USER = """/admin/users/{username}""" # + USER_HEATMAP = """/users/%s/heatmap""" # + + def __init__(self, gitea): + super(User, self).__init__(gitea) + self._emails = [] + + def __eq__(self, other): + if not isinstance(other, User): return False + return self.gitea == other.gitea and self.id == other.id + + def __hash__(self): + return hash(self.gitea) ^ hash(self.id) + + @property + def emails(self): + self.__request_emails() + return self._emails + + @classmethod + def request(cls, gitea, name) -> "User": + api_object = cls._request(gitea, {"name": name}) + return api_object + + patchable_fields = { + "active", + "admin", + "allow_create_organization", + "allow_git_hook", + "allow_import_local", + "email", + "full_name", + "location", + "login_name", + "max_repo_creation", + "must_change_password", + "password", + "prohibit_login", + "website", + } + + def commit(self, login_name: str, source_id: int = 0): + """ + Unfortunately it is necessary to require the login name + as well as the login source (that is not supplied when getting a user) for + changing a user. + Usually source_id is 0 and the login_name is equal to the username. + """ + values = self.get_dirty_fields() + values.update( + # api-doc says that the "source_id" is necessary; works without though + {"login_name": login_name, "source_id": source_id} + ) + args = {"username": self.username} + self.gitea.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) + self.dirty_fields = {} + + def get_repositories(self) -> List["Repository"]: + """ Get all Repositories owned by this User.""" + url = f"/users/{self.username}/repos" + results = self.gitea.requests_get(url) + return [Repository.parse_response(self.gitea, result) for result in results] + + def get_orgs(self) -> List[Organization]: + """ Get all Organizations this user is a member of.""" + url = f"/users/{self.username}/orgs" + results = self.gitea.requests_get(url) + return [Organization.parse_response(self.gitea, result) for result in results] + + def get_teams(self) -> List['Team']: + url = f"/user/teams" + results = self.gitea.requests_get(url, sudo=self) + return [Team.parse_response(self.gitea, result) for result in results] + + def get_accessible_repos(self) -> List['Repository']: + """ Get all Repositories accessible by the logged in User.""" + results = self.gitea.requests_get("/user/repos", sudo=self) + return [Repository.parse_response(self, result) for result in results] + + def __request_emails(self): + result = self.gitea.requests_get(User.USER_MAIL % self.login) + # report if the adress changed by this + for mail in result: + self._emails.append(mail["email"]) + if mail["primary"]: + self._email = mail["email"] + + def delete(self): + """ Deletes this User. Also deletes all Repositories he owns.""" + self.gitea.requests_delete(User.ADMIN_DELETE_USER % self.username) + self.deleted = True + + def get_heatmap(self) -> List[Tuple[datetime, int]]: + results = self.gitea.requests_get(User.USER_HEATMAP % self.username) + results = [ + (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) + for result in results + ] + return results + + +class Branch(ReadonlyApiObject): + + def __init__(self, gitea): + super(Branch, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Branch): + return False + return self.commit == other.commit and self.name == other.name + + def __hash__(self): + return hash(self.commit["id"]) ^ hash(self.name) + + fields_to_parsers = { + # This is not a commit object + #"commit": lambda gitea, c: Commit.parse_response(gitea, c) + } + + @classmethod + def request(cls, gitea, owner, repo, ref): + return cls._request(gitea, {"owner": owner, "repo": repo, "ref": ref}) + + +class Repository(ApiObject): + API_OBJECT = """/repos/{owner}/{name}""" # , + REPO_IS_COLLABORATOR = """/repos/%s/%s/collaborators/%s""" # , , + REPO_SEARCH = """/repos/search/%s""" # + REPO_BRANCHES = """/repos/%s/%s/branches""" # , + REPO_ISSUES = """/repos/%s/%s/issues""" # + REPO_DELETE = """/repos/%s/%s""" # , + REPO_TIMES = """/repos/%s/%s/times""" # , + REPO_USER_TIME = """/repos/%s/%s/times/%s""" # , , + REPO_COMMITS = "/repos/%s/%s/commits" # , + REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" + REPO_CONTENTS = "/repos/{owner}/{repo}/contents" + REPO_CONTENT = """/repos/{owner}/{repo}/contents/{filepath}""" + REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" + + def __init__(self, gitea): + super(Repository, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Repository): return False + return self.owner == other.owner and self.name == other.name + + def __hash__(self): + return hash(self.owner) ^ hash(self.name) + + fields_to_parsers = { + # dont know how to tell apart user and org as owner except form email being empty. + "owner": lambda gitea, r: Organization.parse_response(gitea, r) + if r["email"] == "" + else User.parse_response(gitea, r), + "updated_at": lambda gitea, t: Util.convert_time(t), + } + + @classmethod + def request(cls, gitea, owner, name): + return cls._request(gitea, {"owner": owner, "name": name}) + + patchable_fields = { + "allow_merge_commits", + "allow_rebase", + "allow_rebase_explicit", + "allow_squash_merge", + "archived", + "default_branch", + "description", + "has_issues", + "has_pull_requests", + "has_wiki", + "ignore_whitespace_conflicts", + "name", + "private", + "website", + } + + def get_branches(self) -> List['Branch']: + """Get all the Branches of this Repository.""" + results = self.gitea.requests_get( + Repository.REPO_BRANCHES % (self.owner.username, self.name) + ) + return [Branch.parse_response(self.gitea, result) for result in results] + + def add_branch(self, create_from: Branch, newname: str) -> "Commit": + """Add a branch to the repository""" + # Note: will only work with gitea 1.13 or higher! + data = {"new_branch_name": newname, "old_branch_name": create_from.name} + result = self.gitea.requests_post( + Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data + ) + return Branch.parse_response(self.gitea, result) + + def get_issues(self) -> List["Issue"]: + """Get all Issues of this Repository (open and closed)""" + return self.get_issues_state(Issue.OPENED) + self.get_issues_state(Issue.CLOSED) + + def get_commits(self) -> List["Commit"]: + """Get all the Commits of this Repository.""" + try: + results = self.gitea.requests_get_commits( + Repository.REPO_COMMITS % (self.owner.username, self.name) + ) + except ConflictException as err: + logging.warning(err) + logging.warning( + "Repository %s/%s is Empty" % (self.owner.username, self.name) + ) + results = [] + return [Commit.parse_response(self.gitea, result) for result in results] + + def get_issues_state(self, state) -> List["Issue"]: + """Get issues of state Issue.open or Issue.closed of a repository.""" + assert state in [Issue.OPENED, Issue.CLOSED] + issues = [] + # "page": -1 is returning _all_ issues instead of pages. Hopefully this is intended behaviour. + data = {"page": -1, "state": state} + results = self.gitea.requests_get( + Repository.REPO_ISSUES % (self.owner.username, self.name), params=data + ) + for result in results: + issue = Issue.parse_response(self.gitea, result) + # adding data not contained in the issue answer + setattr(issue, "repo", self.name) + setattr(issue, "owner", self.owner) + issues.append(issue) + return issues + + def get_times(self): + results = self.gitea.requests_get( + Repository.REPO_TIMES % (self.owner.username, self.name) + ) + return results + + def get_user_time(self, username) -> float: + if isinstance(username, User): + username = username.username + results = self.gitea.requests_get( + Repository.REPO_USER_TIME % (self.owner.username, self.name, username) + ) + time = sum(r["time"] for r in results) + return time + + def get_full_name(self) -> str: + return self.owner.username + "/" + self.name + + def create_issue(self, title, assignees=[], description="") -> ApiObject: + data = { + "assignees": assignees, + "body": description, + "closed": False, + "title": title, + } + result = self.gitea.requests_post( + Repository.REPO_ISSUES % (self.owner.username, self.name), data=data + ) + return Issue.parse_response(self.gitea, result) + + def create_milestone(self, title: str, description: str, due_date: str = None, state:str = "open") -> "Milestone": + url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) + data= {"title": title, "description": description, "state": state} + if due_date: data["due_date"] = due_date + result = self.gitea.requests_post(url, data=data) + return Milestone.parse_response(self.gitea, result) + + def create_gitea_hook(self, hook_url: str, events: List[str]): + url = f"/repos/{self.owner.username}/{self.name}/hooks" + data = { + "type": "gitea", + "config": {"content_type": "json", "url": hook_url}, + "events": events, + "active": True, + } + return self.gitea.requests_post(url, data=data) + + def list_hooks(self): + url = f"/repos/{self.owner.username}/{self.name}/hooks" + return self.gitea.requests_get(url) + + def delete_hook(self, id: str): + url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" + self.gitea.requests_delete(url) + + def is_collaborator(self, username) -> bool: + if isinstance(username, User): + username = username.username + try: + # returns 204 if its ok, 404 if its not + self.gitea.requests_get( + Repository.REPO_IS_COLLABORATOR + % (self.owner.username, self.name, username) + ) + return True + except: + return False + + def get_users_with_access(self) -> Sequence[User]: + url = f"/repos/{self.owner.username}/{self.name}/collaborators" + response = self.gitea.requests_get(url) + collabs = [User.parse_response(self.gitea, user) for user in response] + if isinstance(self.owner, User): + return collabs + [self.owner] + else: + # owner must be org + teams = self.owner.get_teams() + for team in teams: + team_repos = team.get_repos() + if self.name in [n.name for n in team_repos]: + collabs += team.get_members() + return collabs + + def remove_collaborator(self, user_name: str): + url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" + self.gitea.requests_delete(url) + + def transfer_ownership(self, new_owner: Union["User", "Organization"], new_teams: Set["Team"] = frozenset()): + url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) + data = {"new_owner": new_owner.username} + if isinstance(new_owner, Organization): + new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] + data["team_ids"] = new_team_ids + self.gitea.requests_post(url, data=data) + # TODO: make sure this instance is either updated or discarded + + def get_git_content(self: str = None, commit : "Commit" = None) -> List["Content"]: + """https://git.sopranium.de/api/swagger#/repository/repoGetContentsList""" + url = Repository.REPO_CONTENTS.format(owner=self.owner.username, repo=self.name) + data = {"ref": "HEAD" if commit is None else commit.sha} + result = [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)] + return result + + def get_file_content(self, content: "Content", commit : "Commit" = None) -> Union[str, List["Content"]]: + """https://git.sopranium.de/api/swagger#/repository/repoGetContents""" + url = Repository.REPO_CONTENT.format(owner=self.owner.username, + repo=self.name, filepath=content.path) + data = {"ref": "HEAD" if commit is None else commit.sha} + if content.type == Content.FILE: + return self.gitea.requests_get(url, data)["content"] + else: + return [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)] + + def delete(self): + self.gitea.requests_delete( + Repository.REPO_DELETE % (self.owner.username, self.name) + ) + self.deleted = True + +class Milestone(ApiObject): + API_OBJECT = """/repos/{owner}/{repo}/milestones/{number}""" # + + def __init__(self, gitea): + super(Milestone, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Milestone): return False + return self.gitea == other.gitea and self.id == other.id + + def __hash__(self): + return hash(self.gitea) ^ hash(self.id) + + fields_to_parsers = { + "closed_at": lambda gitea, t: Util.convert_time(t), + "due_on": lambda gitea, t: Util.convert_time(t), + } + + patchable_fields = { + "allow_merge_commits", + "allow_rebase", + "allow_rebase_explicit", + "allow_squash_merge", + "archived", + "default_branch", + "description", + "has_issues", + "has_pull_requests", + "has_wiki", + "ignore_whitespace_conflicts", + "name", + "private", + "website", + } + + @classmethod + def request(cls, gitea, owner, repo, number): + return cls._request(gitea, {"owner": owner, "repo": repo, "number": number}) + + +class Comment(ApiObject): + + def __init__(self, gitea): + super(Comment, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Comment): return False + return self.repo == other.repo and self.id == other.id + + def __hash__(self): + return hash(self.repo) ^ hash(self.id) + + fields_to_parsers = { + "user": lambda gitea, r: User.parse_response(gitea, r), + "created_at": lambda gitea, t: Util.convert_time(t), + "updated_at": lambda gitea, t: Util.convert_time(t), + } + + +class Commit(ReadonlyApiObject): + def __init__(self, gitea): + super(Commit, self).__init__(gitea) + + fields_to_parsers = { + # NOTE: api may return None for commiters that are no gitea users + "author": lambda gitea, u: User.parse_response(gitea, u) if u else None + } + + def __eq__(self, other): + if not isinstance(other, Commit): return False + return self.sha == other.sha + + def __hash__(self): + return hash(self.sha) + + @classmethod + def parse_response(cls, gitea, result): + commit_cache = result["commit"] + api_object = cls(gitea) + cls._initialize(gitea, api_object, result) + # inner_commit for legacy reasons + Commit._add_read_property("inner_commit", commit_cache, api_object) + return api_object + + +class Issue(ApiObject): + API_OBJECT = """/repos/{owner}/{repo}/issues/{number}""" # + GET_TIME = """/repos/%s/%s/issues/%s/times""" # + GET_COMMENTS = """/repos/%s/%s/issues/comments""" + CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" + + OPENED = "open" + CLOSED = "closed" + + def __init__(self, gitea): + super(Issue, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Issue): return False + return self.repo == other.repo and self.id == other.id + + def __hash__(self): + return hash(self.repo) ^ hash(self.id) + + fields_to_parsers = { + "milestone": lambda gitea, m: Milestone.parse_response(gitea, m), + "user": lambda gitea, u: User.parse_response(gitea, u), + "assignee": lambda gitea, u: User.parse_response(gitea, u), + "assignees": lambda gitea, us: [User.parse_response(gitea, u) for u in us], + "state": lambda gitea, s: Issue.CLOSED if s == "closed" else Issue.OPENED, + } + + patchable_fields = { + "assignee", + "assignees", + "body", + "due_date", + "milestone", + "state", + "title", + } + + @classmethod + def request(cls, gitea, owner, repo, number): + api_object = cls._request(gitea, {"owner": owner, "repo": repo, "number": number}) + return api_object + + @classmethod + def create_issue(cls, gitea, repo: Repository, title: str, body: str = ""): + args = {"owner": repo.owner.username, "repo": repo.name} + data = {"title": title, "body": body} + result = gitea.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) + return Issue.parse_response(gitea, result) + + def get_time_sum(self, user: User) -> int: + results = self.gitea.requests_get( + Issue.GET_TIME % (self.owner.username, self.repo, self.number) + ) + return sum( + result["time"] + for result in results + if result and result["user_id"] == user.id + ) + + def get_times(self) -> Optional[Dict]: + return self.gitea.requests_get( + Issue.GET_TIME % (self.owner.username, self.repo, self.number) + ) + + def delete_time(self, time_id: str): + path = f"/repos/{self.owner.username}/{self.repo}/issues/{self.number}/times/{time_id}" + self.gitea.requests_delete(path) + + def add_time(self, time: int, created: str = None, user_name: str = None): + path = f"/repos/{self.owner.username}/{self.repo}/issues/{self.number}/times" + self.gitea.requests_post( + path, data={"created": created, "time": int(time), "user_name": user_name} + ) + + def get_comments(self) -> List[ApiObject]: + results = self.gitea.requests_get( + Issue.GET_COMMENTS % (self.owner.username, self.repo) + ) + allProjectComments = [ + Comment.parse_response(self.gitea, result) for result in results + ] + # Comparing the issue id with the URL seems to be the only (!) way to get to the comments of one issue + return [ + comment + for comment in allProjectComments + if comment.issue_url.endswith("/" + str(self.number)) + ] + + +class Team(ApiObject): + API_OBJECT = """/teams/{id}""" # + ADD_USER = """/teams/%s/members/%s""" # + ADD_REPO = """/teams/%s/repos/%s/%s""" # + TEAM_DELETE = """/teams/%s""" # + GET_MEMBERS = """/teams/%s/members""" # + GET_REPOS = """/teams/%s/repos""" # + + def __init__(self, gitea): + super(Team, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Team): return False + return self.organization == other.organization and self.id == other.id + + def __hash__(self): + return hash(self.organization) ^ hash(self.id) + + fields_to_parsers = { + "organization": lambda gitea, o: Organization.parse_response(gitea, o) + } + + @classmethod + def request(cls, gitea, organization, team): + return cls._request(gitea, {"id": id}) + + patchable_fields = {"description", "name", "permission", "units"} + + def add_user(self, user: User): + self.gitea.requests_put(Team.ADD_USER % (self.id, user.login)) + + def add_repo(self, org: Organization, repo: Repository): + self.gitea.requests_put(Team.ADD_REPO % (self.id, org, repo.name)) + + def get_members(self): + """ Get all users assigned to the team. """ + results = self.gitea.requests_get(Team.GET_MEMBERS % self.id) + return [User.parse_response(self.gitea, result) for result in results] + + def get_repos(self): + """ Get all repos of this Team.""" + results = self.gitea.requests_get(Team.GET_REPOS % self.id) + return [Repository.parse_response(self.gitea, result) for result in results] + + def delete(self): + self.gitea.requests_delete(Team.TEAM_DELETE % self.id) + self.deleted = True + + def remove_team_member(self, user_name: str): + url = f"/teams/{self.id}/members/{user_name}" + self.gitea.requests_delete(url) + +class Content(ReadonlyApiObject): + FILE = "file" + + def __init__(self, gitea): + super(Content, self).__init__(gitea) + + def __eq__(self, other): + if not isinstance(other, Team): return False + return self.repo == self.repo and self.sha == other.sha and self.name == other.name + + def __hash__(self): + return hash(self.repo) ^ hash(self.sha) ^ hash(self.name) + + +class Util: + @staticmethod + def convert_time(time: str) -> datetime: + """ Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" + try: + return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") + except ValueError: + return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") + diff --git a/gitea/baseapiobject.py b/gitea/baseapiobject.py index 89fca9a..a9499a8 100644 --- a/gitea/baseapiobject.py +++ b/gitea/baseapiobject.py @@ -1,7 +1,5 @@ from .exceptions import ObjectIsInvalid, MissiongEqualyImplementation,RawRequestEndpointMissing - - class ReadonlyApiObject: def __init__(self, gitea): @@ -23,7 +21,7 @@ class ReadonlyApiObject: @classmethod def request(cls, gitea, id): - if hasattr("GET_API_OBJECT", cls): + if hasattr("API_OBJECT", cls): return cls._request(gitea) else: raise RawRequestEndpointMissing() @@ -41,7 +39,7 @@ class ReadonlyApiObject: @classmethod def _get_gitea_api_object(cls, gitea, args): """Retrieving an object always as GET_API_OBJECT """ - return gitea.requests_get(cls.GET_API_OBJECT.format(**args)) + return gitea.requests_get(cls.API_OBJECT.format(**args)) @classmethod def parse_response(cls, gitea, result) -> "ReadonlyApiObject": diff --git a/gitea/gitea.py b/gitea/gitea.py index 037162c..f7f1906 100644 --- a/gitea/gitea.py +++ b/gitea/gitea.py @@ -1,746 +1,15 @@ import json import logging -from datetime import datetime -from typing import List, Tuple, Dict, Sequence, Optional, Union, Set +from typing import List, Dict import requests from httpcache import CachingHTTPAdapter -from .baseapiobject import ReadonlyApiObject, ApiObject -from .exceptions import * - - -class Organization(ApiObject): - """see https://try.gitea.io/api/swagger#/organization/orgGetAll""" - - GET_API_OBJECT = """/orgs/{name}""" # - PATCH_API_OBJECT = """/orgs/{name}""" # - ORG_REPOS_REQUEST = """/orgs/%s/repos""" # - ORG_TEAMS_REQUEST = """/orgs/%s/teams""" # - ORG_TEAMS_CREATE = """/orgs/%s/teams""" # - ORG_GET_MEMBERS = """/orgs/%s/members""" # - ORG_IS_MEMBER = """/orgs/%s/members/%s""" # , - ORG_DELETE = """/orgs/%s""" # - ORG_HEATMAP = """/users/%s/heatmap""" # - - def __init__(self, gitea): - super(Organization, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Organization): return False - return self.gitea == other.gitea and self.name == other.name - - def __hash__(self): - return hash(self.gitea) ^ hash(self.name) - - @classmethod - def request(cls, gitea, name): - return cls._request(gitea, {"name": name}) - - @classmethod - def parse_response(cls, gitea, result): - api_object = super().parse_response(gitea, result) - # add "name" field to make this behave similar to users - Organization._add_read_property("name", result["username"], api_object) - return api_object - - patchable_fields = {"description", "full_name", "location", "visibility", "website"} - - def commit(self): - values = self.get_dirty_fields() - args = {"name": self.name} - self.gitea.requests_patch( - Organization.PATCH_API_OBJECT.format(**args), data=values - ) - self.dirty_fields = {} - - def get_repositories(self) -> List["Repository"]: - results = self.gitea.requests_get_paginated( - Organization.ORG_REPOS_REQUEST % self.username - ) - return [Repository.parse_response(self.gitea, result) for result in results] - - def get_repository(self, name) -> "Repository": - repos = self.get_repositories() - for repo in repos: - if repo.name == name: - return repo - raise NotFoundException("Repository %s not existent in organization." % name) - - def get_teams(self) -> List["Team"]: - results = self.gitea.requests_get( - Organization.ORG_TEAMS_REQUEST % self.username - ) - teams = [Team.parse_response(self.gitea, result) for result in results] - # organisation seems to be missing using this request, so we add org manually - for t in teams: setattr(t, "_organization", self) - return teams - - def get_team(self, name) -> "Team": - teams = self.get_teams() - for team in teams: - if team.name == name: - return team - raise NotFoundException("Team not existent in organization.") - - def get_members(self) -> List["User"]: - results = self.gitea.requests_get(Organization.ORG_GET_MEMBERS % self.username) - return [User.parse_response(self.gitea, result) for result in results] - - def is_member(self, username) -> bool: - if isinstance(username, User): - username = username.username - try: - # returns 204 if its ok, 404 if its not - self.gitea.requests_get( - Organization.ORG_IS_MEMBER % (self.username, username) - ) - return True - except: - return False - - def remove_member(self, user: "User"): - path = f"/orgs/{self.username}/members/{user.username}" - self.gitea.requests_delete(path) - - def delete(self): - """ Delete this Organization. Invalidates this Objects data. Also deletes all Repositories owned by the User""" - for repo in self.get_repositories(): - repo.delete() - self.gitea.requests_delete(Organization.ORG_DELETE % self.username) - self.deleted = True - - def get_heatmap(self) -> List[Tuple[datetime, int]]: - results = self.gitea.requests_get(User.USER_HEATMAP % self.username) - results = [ - (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) - for result in results - ] - return results - - -class User(ApiObject): - GET_API_OBJECT = """/users/{name}""" # - USER_MAIL = """/user/emails?sudo=%s""" # - USER_PATCH = """/admin/users/%s""" # - ADMIN_DELETE_USER = """/admin/users/%s""" # - ADMIN_EDIT_USER = """/admin/users/{username}""" # - USER_HEATMAP = """/users/%s/heatmap""" # - - def __init__(self, gitea): - super(User, self).__init__(gitea) - self._emails = [] - - def __eq__(self, other): - if not isinstance(other, User): return False - return self.gitea == other.gitea and self.id == other.id - - def __hash__(self): - return hash(self.gitea) ^ hash(self.id) - - @property - def emails(self): - self.__request_emails() - return self._emails - - @classmethod - def request(cls, gitea, name) -> "User": - api_object = cls._request(gitea, {"name": name}) - return api_object - - patchable_fields = { - "active", - "admin", - "allow_create_organization", - "allow_git_hook", - "allow_import_local", - "email", - "full_name", - "location", - "login_name", - "max_repo_creation", - "must_change_password", - "password", - "prohibit_login", - "website", - } - - def commit(self, login_name: str, source_id: int = 0): - """ - Unfortunately it is necessary to require the login name - as well as the login source (that is not supplied when getting a user) for - changing a user. - Usually source_id is 0 and the login_name is equal to the username. - """ - values = self.get_dirty_fields() - values.update( - # api-doc says that the "source_id" is necessary; works without though - {"login_name": login_name, "source_id": source_id} - ) - args = {"username": self.username} - self.gitea.requests_patch(User.ADMIN_EDIT_USER.format(**args), data=values) - self.dirty_fields = {} - - def get_repositories(self) -> List["Repository"]: - """ Get all Repositories owned by this User.""" - url = f"/users/{self.username}/repos" - results = self.gitea.requests_get(url) - return [Repository.parse_response(self.gitea, result) for result in results] - - def get_orgs(self) -> List[Organization]: - """ Get all Organizations this user is a member of.""" - url = f"/users/{self.username}/orgs" - results = self.gitea.requests_get(url) - return [Organization.parse_response(self.gitea, result) for result in results] - - def get_teams(self) -> List['Team']: - url = f"/user/teams" - results = self.gitea.requests_get(url, sudo=self) - return [Team.parse_response(self.gitea, result) for result in results] - - def get_accessible_repos(self) -> List['Repository']: - """ Get all Repositories accessible by the logged in User.""" - results = self.gitea.requests_get("/user/repos", sudo=self) - return [Repository.parse_response(self, result) for result in results] - - def __request_emails(self): - result = self.gitea.requests_get(User.USER_MAIL % self.login) - # report if the adress changed by this - for mail in result: - self._emails.append(mail["email"]) - if mail["primary"]: - self._email = mail["email"] - - def delete(self): - """ Deletes this User. Also deletes all Repositories he owns.""" - self.gitea.requests_delete(User.ADMIN_DELETE_USER % self.username) - self.deleted = True - - def get_heatmap(self) -> List[Tuple[datetime, int]]: - results = self.gitea.requests_get(User.USER_HEATMAP % self.username) - results = [ - (datetime.fromtimestamp(result["timestamp"]), result["contributions"]) - for result in results - ] - return results - - -class Branch(ReadonlyApiObject): - GET_API_OBJECT = """/repos/%s/%s/branches/%s""" # , , - - def __init__(self, gitea): - super(Branch, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Branch): - return False - return self.commit == other.commit and self.name == other.name - - def __hash__(self): - return hash(self.commit["id"]) ^ hash(self.name) - - fields_to_parsers = { - # This is not a commit object - #"commit": lambda gitea, c: Commit.parse_response(gitea, c) - } - - @classmethod - def request(cls, gitea, owner, repo, ref): - return cls._request(gitea, {"owner": owner, "repo": repo, "ref": ref}) - - -class Repository(ApiObject): - REPO_IS_COLLABORATOR = ( - """/repos/%s/%s/collaborators/%s""" # , , - ) - GET_API_OBJECT = """/repos/{owner}/{name}""" # , - REPO_SEARCH = """/repos/search/%s""" # - REPO_BRANCHES = """/repos/%s/%s/branches""" # , - REPO_ISSUES = """/repos/%s/%s/issues""" # - REPO_DELETE = """/repos/%s/%s""" # , - REPO_TIMES = """/repos/%s/%s/times""" # , - REPO_USER_TIME = """/repos/%s/%s/times/%s""" # , , - REPO_COMMITS = "/repos/%s/%s/commits" # , - REPO_TRANSFER = "/repos/{owner}/{repo}/transfer" - REPO_CONTENTS = "/repos/{owner}/{repo}/contents" - REPO_CONTENT = """/repos/{owner}/{repo}/contents/{filepath}""" - REPO_MILESTONES = """/repos/{owner}/{repo}/milestones""" - - def __init__(self, gitea): - super(Repository, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Repository): return False - return self.owner == other.owner and self.name == other.name - - def __hash__(self): - return hash(self.owner) ^ hash(self.name) - - fields_to_parsers = { - # dont know how to tell apart user and org as owner except form email being empty. - "owner": lambda gitea, r: Organization.parse_response(gitea, r) - if r["email"] == "" - else User.parse_response(gitea, r), - "updated_at": lambda gitea, t: Util.convert_time(t), - } - - @classmethod - def request(cls, gitea, owner, name): - return cls._request(gitea, {"owner": owner, "name": name}) - - patchable_fields = { - "allow_merge_commits", - "allow_rebase", - "allow_rebase_explicit", - "allow_squash_merge", - "archived", - "default_branch", - "description", - "has_issues", - "has_pull_requests", - "has_wiki", - "ignore_whitespace_conflicts", - "name", - "private", - "website", - } - - def get_branches(self) -> List['Branch']: - """Get all the Branches of this Repository.""" - results = self.gitea.requests_get( - Repository.REPO_BRANCHES % (self.owner.username, self.name) - ) - return [Branch.parse_response(self.gitea, result) for result in results] - - def add_branch(self, create_from: Branch, newname: str) -> "Commit": - """Add a branch to the repository""" - # Note: will only work with gitea 1.13 or higher! - data = {"new_branch_name": newname, "old_branch_name": create_from.name} - result = self.gitea.requests_post( - Repository.REPO_BRANCHES % (self.owner.username, self.name), data=data - ) - return Branch.parse_response(self.gitea, result) - - def get_issues(self) -> List["Issue"]: - """Get all Issues of this Repository (open and closed)""" - return self.get_issues_state(Issue.OPENED) + self.get_issues_state(Issue.CLOSED) - - def get_commits(self) -> List["Commit"]: - """Get all the Commits of this Repository.""" - try: - results = self.gitea.requests_get_commits( - Repository.REPO_COMMITS % (self.owner.username, self.name) - ) - except ConflictException as err: - logging.warning(err) - logging.warning( - "Repository %s/%s is Empty" % (self.owner.username, self.name) - ) - results = [] - return [Commit.parse_response(self.gitea, result) for result in results] - - def get_issues_state(self, state) -> List["Issue"]: - """Get issues of state Issue.open or Issue.closed of a repository.""" - assert state in [Issue.OPENED, Issue.CLOSED] - issues = [] - # "page": -1 is returning _all_ issues instead of pages. Hopefully this is intended behaviour. - data = {"page": -1, "state": state} - results = self.gitea.requests_get( - Repository.REPO_ISSUES % (self.owner.username, self.name), params=data - ) - for result in results: - issue = Issue.parse_response(self.gitea, result) - # adding data not contained in the issue answer - setattr(issue, "repo", self.name) - setattr(issue, "owner", self.owner) - issues.append(issue) - return issues - - def get_times(self): - results = self.gitea.requests_get( - Repository.REPO_TIMES % (self.owner.username, self.name) - ) - return results - - def get_user_time(self, username) -> float: - if isinstance(username, User): - username = username.username - results = self.gitea.requests_get( - Repository.REPO_USER_TIME % (self.owner.username, self.name, username) - ) - time = sum(r["time"] for r in results) - return time - - def get_full_name(self) -> str: - return self.owner.username + "/" + self.name - - def create_issue(self, title, assignees=[], description="") -> ApiObject: - data = { - "assignees": assignees, - "body": description, - "closed": False, - "title": title, - } - result = self.gitea.requests_post( - Repository.REPO_ISSUES % (self.owner.username, self.name), data=data - ) - return Issue.parse_response(self.gitea, result) - - def create_milestone(self, title: str, description: str, due_date: str = None, state:str = "open") -> "Milestone": - url = Repository.REPO_MILESTONES.format(owner=self.owner.username, repo=self.name) - data= {"title": title, "description": description, "state": state} - if due_date: data["due_date"] = due_date - result = self.gitea.requests_post(url, data=data) - return Milestone.parse_response(self.gitea, result) - - def create_gitea_hook(self, hook_url: str, events: List[str]): - url = f"/repos/{self.owner.username}/{self.name}/hooks" - data = { - "type": "gitea", - "config": {"content_type": "json", "url": hook_url}, - "events": events, - "active": True, - } - return self.gitea.requests_post(url, data=data) - - def list_hooks(self): - url = f"/repos/{self.owner.username}/{self.name}/hooks" - return self.gitea.requests_get(url) - - def delete_hook(self, id: str): - url = f"/repos/{self.owner.username}/{self.name}/hooks/{id}" - self.gitea.requests_delete(url) - - def is_collaborator(self, username) -> bool: - if isinstance(username, User): - username = username.username - try: - # returns 204 if its ok, 404 if its not - self.gitea.requests_get( - Repository.REPO_IS_COLLABORATOR - % (self.owner.username, self.name, username) - ) - return True - except: - return False - - def get_users_with_access(self) -> Sequence[User]: - url = f"/repos/{self.owner.username}/{self.name}/collaborators" - response = self.gitea.requests_get(url) - collabs = [User.parse_response(self.gitea, user) for user in response] - if isinstance(self.owner, User): - return collabs + [self.owner] - else: - # owner must be org - teams = self.owner.get_teams() - for team in teams: - team_repos = team.get_repos() - if self.name in [n.name for n in team_repos]: - collabs += team.get_members() - return collabs - - def remove_collaborator(self, user_name: str): - url = f"/repos/{self.owner.username}/{self.name}/collaborators/{user_name}" - self.gitea.requests_delete(url) - - def transfer_ownership(self, new_owner: Union["User", "Organization"], new_teams: Set["Team"] = frozenset()): - url = Repository.REPO_TRANSFER.format(owner=self.owner.username, repo=self.name) - data = {"new_owner": new_owner.username} - if isinstance(new_owner, Organization): - new_team_ids = [team.id for team in new_teams if team in new_owner.get_teams()] - data["team_ids"] = new_team_ids - self.gitea.requests_post(url, data=data) - # TODO: make sure this instance is either updated or discarded - - def get_git_content(self: str = None, commit : "Commit" = None) -> List["Content"]: - """https://git.sopranium.de/api/swagger#/repository/repoGetContentsList""" - url = Repository.REPO_CONTENTS.format(owner=self.owner.username, repo=self.name) - data = {"ref": "HEAD" if commit is None else commit.sha} - result = [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)] - return result - - def get_file_content(self, content: "Content", commit : "Commit" = None) -> Union[str, List["Content"]]: - """https://git.sopranium.de/api/swagger#/repository/repoGetContents""" - url = Repository.REPO_CONTENT.format(owner=self.owner.username, - repo=self.name, filepath=content.path) - data = {"ref": "HEAD" if commit is None else commit.sha} - if content.type == Content.FILE: - return self.gitea.requests_get(url, data)["content"] - else: - return [Content.parse_response(self.gitea, f) for f in self.gitea.requests_get(url, data)] - - def delete(self): - self.gitea.requests_delete( - Repository.REPO_DELETE % (self.owner.username, self.name) - ) - self.deleted = True - -class Milestone(ApiObject): - GET_API_OBJECT = ( - """/repos/{owner}/{repo}/milestones/{number}""" # - ) - - def __init__(self, gitea): - super(Milestone, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Milestone): return False - return self.gitea == other.gitea and self.id == other.id - - def __hash__(self): - return hash(self.gitea) ^ hash(self.id) - - fields_to_parsers = { - "closed_at": lambda gitea, t: Util.convert_time(t), - "due_on": lambda gitea, t: Util.convert_time(t), - } - - patchable_fields = { - "allow_merge_commits", - "allow_rebase", - "allow_rebase_explicit", - "allow_squash_merge", - "archived", - "default_branch", - "description", - "has_issues", - "has_pull_requests", - "has_wiki", - "ignore_whitespace_conflicts", - "name", - "private", - "website", - } - - @classmethod - def request(cls, gitea, owner, repo, number): - return cls._request(gitea, {"owner": owner, "repo": repo, "number": number}) - - -class Comment(ApiObject): - PATCH_API_OBJECT = "/repos/{owner}/{repo}/issues/comments/{id}" - - def __init__(self, gitea): - super(Comment, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Comment): return False - return self.repo == other.repo and self.id == other.id - - def __hash__(self): - return hash(self.repo) ^ hash(self.id) - - fields_to_parsers = { - "user": lambda gitea, r: User.parse_response(gitea, r), - "created_at": lambda gitea, t: Util.convert_time(t), - "updated_at": lambda gitea, t: Util.convert_time(t), - } - - patchable_fields = {"body"} - - -class Commit(ReadonlyApiObject): - def __init__(self, gitea): - super(Commit, self).__init__(gitea) - - fields_to_parsers = { - # NOTE: api may return None for commiters that are no gitea users - "author": lambda gitea, u: User.parse_response(gitea, u) if u else None - } - - def __eq__(self, other): - if not isinstance(other, Commit): return False - return self.sha == other.sha - - def __hash__(self): - return hash(self.sha) - - @classmethod - def request(cls, gitea, owner, repo): - api_object = cls._request(gitea, {"owner": owner, "repo": repo}) - return api_object - - @classmethod - def parse_response(cls, gitea, result): - commit_cache = result["commit"] - api_object = cls(gitea) - cls._initialize(gitea, api_object, result) - Commit._add_read_property("inner_commit", commit_cache, api_object) - return api_object - - -class Issue(ApiObject): - GET_API_OBJECT = """/repos/{owner}/{repo}/issues/{number}""" # - GET_TIME = """/repos/%s/%s/issues/%s/times""" # - GET_COMMENTS = """/repos/%s/%s/issues/comments""" - CREATE_ISSUE = """/repos/{owner}/{repo}/issues""" - - OPENED = "open" - CLOSED = "closed" - - def __init__(self, gitea): - super(Issue, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Issue): return False - return self.repo == other.repo and self.id == other.id - - def __hash__(self): - return hash(self.repo) ^ hash(self.id) - - fields_to_parsers = { - "milestone": lambda gitea, m: Milestone.parse_response(gitea, m), - "user": lambda gitea, u: User.parse_response(gitea, u), - "assignee": lambda gitea, u: User.parse_response(gitea, u), - "assignees": lambda gitea, us: [User.parse_response(gitea, u) for u in us], - "state": lambda gitea, s: Issue.CLOSED if s == "closed" else Issue.OPENED, - } - - patchable_fields = { - "assignee", - "assignees", - "body", - "due_date", - "milestone", - "state", - "title", - } - - @classmethod - def request(cls, gitea, owner, repo, number): - api_object = cls._request( - gitea, {"owner": owner, "repo": repo, "number": number} - ) - return api_object - - @classmethod - def create_issue(cls, gitea, repo: Repository, title: str, body: str = ""): - args = {"owner": repo.owner.username, "repo": repo.name} - data = {"title": title, "body": body} - result = gitea.requests_post(Issue.CREATE_ISSUE.format(**args), data=data) - return Issue.parse_response(gitea, result) - - def get_time_sum(self, user: User) -> int: - results = self.gitea.requests_get( - Issue.GET_TIME % (self.owner.username, self.repo, self.number) - ) - return sum( - result["time"] - for result in results - if result and result["user_id"] == user.id - ) - - def get_times(self) -> Optional[Dict]: - return self.gitea.requests_get( - Issue.GET_TIME % (self.owner.username, self.repo, self.number) - ) - - def delete_time(self, time_id: str): - path = f"/repos/{self.owner.username}/{self.repo}/issues/{self.number}/times/{time_id}" - self.gitea.requests_delete(path) - - def add_time(self, time: int, created: str = None, user_name: str = None): - path = f"/repos/{self.owner.username}/{self.repo}/issues/{self.number}/times" - self.gitea.requests_post( - path, data={"created": created, "time": int(time), "user_name": user_name} - ) - - def get_comments(self) -> List[ApiObject]: - results = self.gitea.requests_get( - Issue.GET_COMMENTS % (self.owner.username, self.repo) - ) - allProjectComments = [ - Comment.parse_response(self.gitea, result) for result in results - ] - # Comparing the issue id with the URL seems to be the only (!) way to get to the comments of one issue - return [ - comment - for comment in allProjectComments - if comment.issue_url.endswith("/" + str(self.number)) - ] - - -class Team(ApiObject): - GET_API_OBJECT = """/teams/{id}""" # - ADD_USER = """/teams/%s/members/%s""" # - ADD_REPO = """/teams/%s/repos/%s/%s""" # - TEAM_DELETE = """/teams/%s""" # - GET_MEMBERS = """/teams/%s/members""" # - GET_REPOS = """/teams/%s/repos""" # - - def __init__(self, gitea): - super(Team, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Team): return False - return self.organization == other.organization and self.id == other.id - - def __hash__(self): - return hash(self.organization) ^ hash(self.id) - - fields_to_parsers = { - "organization": lambda gitea, o: Organization.parse_response(gitea, o) - } - - @classmethod - def request(cls, gitea, organization, team): - return cls._request(gitea, {"id": id}) - - patchable_fields = {"description", "name", "permission", "units"} - - def add_user(self, user: User): - self.gitea.requests_put(Team.ADD_USER % (self.id, user.login)) - - def add_repo(self, org: Organization, repo: Repository): - self.gitea.requests_put(Team.ADD_REPO % (self.id, org, repo.name)) - - def get_members(self): - """ Get all users assigned to the team. """ - results = self.gitea.requests_get(Team.GET_MEMBERS % self.id) - return [User.parse_response(self.gitea, result) for result in results] - - def get_repos(self): - """ Get all repos of this Team.""" - results = self.gitea.requests_get(Team.GET_REPOS % self.id) - return [Repository.parse_response(self.gitea, result) for result in results] - - def delete(self): - self.gitea.requests_delete(Team.TEAM_DELETE % self.id) - self.deleted = True - - def remove_team_member(self, user_name: str): - url = f"/teams/{self.id}/members/{user_name}" - self.gitea.requests_delete(url) - -class Content(ReadonlyApiObject): - GET_API_OBJECT = """/repos/{owner}/{repo}/contents/{filepath}""" - - FILE = "file" - - def __init__(self, gitea): - super(Content, self).__init__(gitea) - - def __eq__(self, other): - if not isinstance(other, Team): return False - return self.repo == self.repo and self.sha == other.sha and self.name == other.name - - def __hash__(self): - return hash(self.repo) ^ hash(self.sha) ^ hash(self.name) - - -class Util: - @staticmethod - def convert_time(time: str) -> datetime: - """ Parsing of strange Gitea time format ("%Y-%m-%dT%H:%M:%S:%z" but with ":" in time zone notation)""" - try: - return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S%z") - except ValueError: - return datetime.strptime(time[:-3] + "00", "%Y-%m-%dT%H:%M:%S") - +from .exceptions import NotFoundException, ConflictException, AlreadyExistsException +from .apiobject import User, Organization, Repository, Team class Gitea: """ Object to establish a session with Gitea. """ - ADMIN_CREATE_USER = """/admin/users""" GET_USERS_ADMIN = """/admin/users""" ADMIN_REPO_CREATE = """/admin/users/%s/repos""" # diff --git a/setup.py b/setup.py index 2d2aedb..7978a90 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open('README.md') as readme_file: setup_args = dict( name='py-gitea', - version='0.1.8', + version='0.1.9', description='A python wrapper for the Gitea API', long_description_content_type="text/markdown", long_description=README, diff --git a/tests/test_api.py b/tests/test_api.py index 4030386..184d6c6 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -282,7 +282,10 @@ def test_delete_org(instance): def test_delete_user(instance): - user = User.request(instance, test_user) + user_name = test_user + "delte_test" + email = user_name + "@example.org" + user = instance.create_user(user_name, email, "abcdefg1.23AB", send_notify=False) + assert user.username == user_name user.delete() with pytest.raises(NotFoundException) as e: - User.request(instance, test_user) + User.request(instance, user_name)