kopia lustrzana https://github.com/halcy/Mastodon.py
Add OAuth authorization server info retrieval and usage if available
rodzic
1af3e8d34d
commit
cb48c95930
|
@ -10,6 +10,9 @@ v2.1.0 (IN PROGRESS)
|
|||
* Improved docs for stream_healthy (thanks @codl)
|
||||
* Add offset parameter to trending_tags and trending_links (Thanks @ghost)
|
||||
* Added support for retrieving API version and a warning for if it is not present despite the mastodon version suggesting it should be.
|
||||
* Added support for retrieving OAuth authorization server info
|
||||
* Added check for supported password grant type based on authorization server info (Thanks @thisismissem for the suggestion)
|
||||
* Added support for alternate OAuth URLs based on the authorization server info
|
||||
|
||||
v2.0.1
|
||||
------
|
||||
|
|
|
@ -7,14 +7,14 @@ import os
|
|||
import time
|
||||
import collections
|
||||
|
||||
from mastodon.errors import MastodonIllegalArgumentError, MastodonNetworkError, MastodonVersionError, MastodonAPIError
|
||||
from mastodon.errors import MastodonIllegalArgumentError, MastodonNetworkError, MastodonVersionError, MastodonAPIError, MastodonNotFoundError
|
||||
from mastodon.defaults import _DEFAULT_SCOPES, _SCOPE_SETS, _DEFAULT_TIMEOUT, _DEFAULT_USER_AGENT
|
||||
from mastodon.utility import parse_version_string, api_version
|
||||
|
||||
from mastodon.internals import Mastodon as Internals
|
||||
from mastodon.utility import Mastodon as Utility
|
||||
from typing import List, Optional, Union, Tuple
|
||||
from mastodon.return_types import Application
|
||||
from mastodon.return_types import Application, AttribAccessDict
|
||||
from mastodon.compat import PurePath
|
||||
|
||||
class Mastodon(Internals):
|
||||
|
@ -277,11 +277,10 @@ class Mastodon(Internals):
|
|||
self.__version_check_worked = None
|
||||
self.__version_check_tried = False
|
||||
self.__streaming_base = None
|
||||
self.__oauth_grant_info = None
|
||||
|
||||
def auth_request_url(self, client_id: Optional[Union[str, PurePath]] = None, redirect_uris: str = "urn:ietf:wg:oauth:2.0:oob",
|
||||
scopes: List[str] =_DEFAULT_SCOPES, force_login: bool = False, state: Optional[str] = None,
|
||||
lang: Optional[str] = None) -> str:
|
||||
lang: Optional[str] = None, skip_server_info = False, allow_http: bool = False) -> str:
|
||||
"""
|
||||
Returns the URL that a client needs to request an OAuth grant from the server.
|
||||
|
||||
|
@ -303,6 +302,9 @@ class Mastodon(Internals):
|
|||
|
||||
Pass an ISO 639-1 (two letter) or, for languages that do not have one, 639-3 (three letter)
|
||||
language code as `lang` to control the display language for the oauth form.
|
||||
|
||||
Pass `skip_server_info` to skip retrieving the OAuth authorization server info, in case you want to
|
||||
avoid the extra network request and are confident that the oauth server is at the default location.
|
||||
"""
|
||||
assert self.api_base_url is not None
|
||||
if client_id is None:
|
||||
|
@ -321,17 +323,43 @@ class Mastodon(Internals):
|
|||
params['state'] = state
|
||||
params['lang'] = lang
|
||||
formatted_params = urlencode(params)
|
||||
return "".join([self.api_base_url, "/oauth/authorize?", formatted_params])
|
||||
|
||||
# If we don't know better, assume the OAuth endpoint is at /oauth/authorize
|
||||
oauth_url = "".join([self.api_base_url, "/oauth/authorize?", formatted_params])
|
||||
|
||||
# Let's see if we *do* know better
|
||||
if not skip_server_info:
|
||||
oauth_info = self.oauth_authorization_server_info()
|
||||
if "authorization_endpoint" in oauth_info:
|
||||
Mastodon.__oauth_url_check(oauth_info["authorization_endpoint"], allow_http=allow_http)
|
||||
oauth_url = oauth_info["authorization_endpoint"] + "?" + formatted_params
|
||||
return oauth_url
|
||||
|
||||
def oauth_authorization_server_info(self) -> AttribAccessDict: # TODO real type for this
|
||||
"""
|
||||
Returns the OAuth authorization server information, including the supported grant types.
|
||||
This is useful to determine which authentication methods are available on the server, supported scopes,
|
||||
URLs to make various OAuth requests, to, etc. Mastodon only supports this after version 4.3.0, and alternative
|
||||
implementations may or may not support it, so if aiming for maximum compatibility, you should likely assume
|
||||
it is not present.
|
||||
|
||||
Returns an empty dictionary if unsupported by the server.
|
||||
"""
|
||||
assert self.api_base_url is not None
|
||||
try:
|
||||
response = self.__api_request('GET', '/.well-known/oauth-authorization-server', do_ratelimiting=False)
|
||||
except MastodonNotFoundError:
|
||||
response = AttribAccessDict()
|
||||
return response
|
||||
|
||||
def log_in(self, username: Optional[str] = None, password: Optional[str] = None, code: Optional[str] = None,
|
||||
redirect_uri: str = "urn:ietf:wg:oauth:2.0:oob", refresh_token: Optional[str] = None, scopes: List[str] = _DEFAULT_SCOPES,
|
||||
to_file: Optional[Union[str, PurePath]] = None) -> str:
|
||||
to_file: Optional[Union[str, PurePath]] = None, allow_http: bool = False) -> str:
|
||||
"""
|
||||
Get the access token for a user, either via OAuth code flow, or (deprecated) password flow.
|
||||
|
||||
Will throw a `MastodonIllegalArgumentError` if the OAuth flow data or the
|
||||
username / password credentials given are incorrect, and
|
||||
`MastodonAPIError` if all of the requested scopes were not granted.
|
||||
Will throw a `MastodonIllegalArgumentError` if the OAuth flow data is incorrect, and `MastodonAPIError` if all
|
||||
of the requested scopes were not granted.
|
||||
|
||||
For OAuth2, obtain a code via having your user go to the URL returned by
|
||||
:ref:`auth_request_url() <auth_request_url()>` and pass it as the code parameter. In this case,
|
||||
|
@ -339,26 +367,29 @@ class Mastodon(Internals):
|
|||
generating the auth request URL. If passing `code`you should not pass `username` or `password`.
|
||||
|
||||
When using the password flow, the username is the email address used to log in into Mastodon.
|
||||
Note that Mastodon has removed this flow starting with 4.4.0, so it is unfortunately not
|
||||
**Note that Mastodon has removed this flow starting with 4.4.0, so it is unfortunately not
|
||||
possible to log in in this way anymore. Please use either the code flow, or generate
|
||||
a token from the web UI.
|
||||
a token from the web UI.**
|
||||
|
||||
Can persist access token to file `to_file`, to be used in the constructor.
|
||||
Can persist access token to file `to_file`, to be used in the constructor. Pass `allow_http` to allow
|
||||
HTTP URLs for the OAuth server, which is recommended only for testing.
|
||||
|
||||
Returns the access token as a string.
|
||||
"""
|
||||
# This isn't called often, so no real need to cache
|
||||
oauth_info = self.oauth_authorization_server_info()
|
||||
|
||||
|
||||
# Is the version > 4.4.0? Throw on trying to log in with password with a more informative message than the API error
|
||||
# This is left in here even though we check for available grant types above because that way
|
||||
# we can give a more informative error message to the user ("not supported after version 4.4.0") instead of the
|
||||
# generic one.
|
||||
if self.mastodon_major >= 4 and self.mastodon_minor >= 4 or self.mastodon_major > 4:
|
||||
if password is not None:
|
||||
raise MastodonIllegalArgumentError('Password flow is no longer supported in Mastodon 4.4.0 and later.')
|
||||
|
||||
#
|
||||
|
||||
# Trying to use password flow?
|
||||
if password is not None:
|
||||
# but it is not supported?
|
||||
if "grant_types_supported" in oauth_info:
|
||||
if "password" not in oauth_info["grant_types_supported"]:
|
||||
if self.verify_minimum_version("4.4.0"):
|
||||
# Give more useful error message if we know the version
|
||||
raise MastodonIllegalArgumentError('Password flow is no longer supported in Mastodon 4.4.0 and later. Please use the code flow instead.')
|
||||
else:
|
||||
raise MastodonIllegalArgumentError('Password flow is not supported by this instance. Please use the code flow instead.')
|
||||
|
||||
if username is not None and password is not None:
|
||||
params = self.__generate_params(locals(), ['scopes', 'to_file', 'code', 'refresh_token'])
|
||||
params['grant_type'] = 'password'
|
||||
|
@ -376,7 +407,15 @@ class Mastodon(Internals):
|
|||
params['scope'] = " ".join(scopes)
|
||||
|
||||
try:
|
||||
response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting = False, override_type = dict)
|
||||
# If we don't know any better, assume the OAuth endpoint is at /oauth/token
|
||||
oauth_url = "".join([self.api_base_url, "/oauth/token"])
|
||||
|
||||
# Let's see if we *do* know better
|
||||
if "token_endpoint" in oauth_info:
|
||||
oauth_url = oauth_info["token_endpoint"]
|
||||
Mastodon.__oauth_url_check(oauth_url, allow_http=allow_http)
|
||||
|
||||
response = self.__api_request('POST', oauth_url, params, do_ratelimiting = False, override_type = dict, base_url_override="")
|
||||
self.access_token = response['access_token']
|
||||
self.__set_refresh_token(response.get('refresh_token'))
|
||||
self.__set_token_expired(int(response.get('expires_in', 0)))
|
||||
|
@ -423,7 +462,7 @@ class Mastodon(Internals):
|
|||
raise MastodonIllegalArgumentError("Client authentication (id + secret) is required to persist tokens.")
|
||||
return self.access_token + "\n" + self.api_base_url + "\n" + self.client_id + "\n" + self.client_secret + "\n"
|
||||
|
||||
def revoke_access_token(self):
|
||||
def revoke_access_token(self, allow_http: bool = False):
|
||||
"""
|
||||
Revoke the oauth token the user is currently authenticated with, effectively removing
|
||||
the apps access and requiring the user to log in again.
|
||||
|
@ -436,7 +475,17 @@ class Mastodon(Internals):
|
|||
params['client_id'] = self.client_id
|
||||
params['client_secret'] = self.client_secret
|
||||
params['token'] = self.access_token
|
||||
self.__api_request('POST', '/oauth/revoke', params)
|
||||
|
||||
# If we don't know any better, assume the OAuth endpoint is at /oauth/revoke
|
||||
oauth_url = "".join([self.api_base_url, "/oauth/revoke"])
|
||||
|
||||
# Let's see if we *do* know better
|
||||
oauth_info = self.oauth_authorization_server_info()
|
||||
if "revocation_endpoint" in oauth_info:
|
||||
oauth_url = Mastodon.__protocolize(oauth_info["revocation_endpoint"])
|
||||
Mastodon.__oauth_url_check(oauth_url, allow_http=allow_http)
|
||||
|
||||
self.__api_request('POST', oauth_url, params, do_ratelimiting=False, override_type=dict, base_url_override="")
|
||||
|
||||
# We are now logged out, clear token and logged in id
|
||||
self.access_token = None
|
||||
|
|
|
@ -644,6 +644,31 @@ class Mastodon():
|
|||
base_url = base_url.rstrip("/")
|
||||
return base_url
|
||||
|
||||
@staticmethod
|
||||
def __oauth_url_check(oauth_url, allow_http=False):
|
||||
"""Internal helper to check and normalize OAuth URLs"""
|
||||
if "?" in oauth_url:
|
||||
# Throw an error, we do not support OAuth URLs with query parameters, even if this is in theory a
|
||||
# valid thing to have for most endpoints.
|
||||
raise MastodonIllegalArgumentError("OAuth URLs with query parameters are not supported by Mastodon.py.")
|
||||
|
||||
if "#" in oauth_url:
|
||||
# A fragment is just straight up not allowed by the spec.
|
||||
raise MastodonIllegalArgumentError("OAuth URLs with fragments are not permitted.")
|
||||
|
||||
if "@" in oauth_url:
|
||||
# Username/password is RIGHT OUT.
|
||||
raise MastodonIllegalArgumentError("OAuth URLs with username/password are not permitted.")
|
||||
|
||||
# OAuth URLs *must* include the scheme, and the scheme *must* be https.
|
||||
# We allow http if a flag is set because testing requires it.
|
||||
if not oauth_url.startswith("https://"):
|
||||
if allow_http:
|
||||
if not oauth_url.startswith("http://"):
|
||||
raise MastodonIllegalArgumentError("OAuth URLs must use with http or https.")
|
||||
else:
|
||||
raise MastodonIllegalArgumentError("OAuth URLs must use with https.")
|
||||
|
||||
@staticmethod
|
||||
def __deprotocolize(base_url):
|
||||
"""Internal helper to strip http and https from a URL"""
|
||||
|
|
|
@ -48,7 +48,7 @@ class Mastodon(Internals):
|
|||
if "mastodon" in self.__instance()["api_versions"]:
|
||||
self.mastodon_api_version = int(self.__instance()["api_versions"]["mastodon"])
|
||||
found_api_version = True
|
||||
if not found_api_version and self.mastodon_major >= 4 and self.mastodon_minor >= 3:
|
||||
if not found_api_version and self.verify_minimum_version("4.3.0", cached=True):
|
||||
warnings.warn("Mastodon version is detected as >= 4.3.0, but no API version found. Please report this.")
|
||||
|
||||
self.__version_check_tried = True
|
||||
|
|
Ładowanie…
Reference in New Issue