Porównaj commity

...

18 Commity

Autor SHA1 Wiadomość Data
halcy b4ede028a7 Disable 3.6 test (missing crypto module) 2022-12-01 00:18:12 +02:00
halcy 160e4c8173 Add pytz to test deps to allow tests to not fail on lower python versions 2022-12-01 00:14:57 +02:00
halcy a4b2b180d3 More moving functions out 2022-12-01 00:11:17 +02:00
halcy d9cd7547fd Move some more methods, fix crypto heckup 2022-11-30 23:47:46 +02:00
halcy 48f1d31c72 carefully move some things into files, test if readthedocs still builds 2022-11-30 23:09:09 +02:00
halcy 99432f538e Merge branch 'eumiro-modern-datetime' 2022-11-30 22:43:50 +02:00
halcy 2d7f495b0f Merge branch 'modern-datetime' of https://github.com/eumiro/Mastodon.py into eumiro-modern-datetime 2022-11-30 22:43:03 +02:00
halcy 262d150c05 Start moving functions out of main module 2022-11-30 22:31:54 +02:00
Miroslav Šedivý bf428f58ef refactor: simplify __datetime_to_epoch 2022-11-30 20:44:13 +01:00
Miroslav Šedivý 5262d58a0b replace pytz with datetime.timezone and zoneinfo 2022-11-30 20:21:04 +01:00
halcy 53cb42117b Switch back to previous version parser 2022-11-30 19:33:09 +02:00
halcy 2453438353 reapply accidentally reverted changes 2022-11-30 19:28:42 +02:00
halcy fa91b618c3 revert what I did to version parsing. Why did this break? 2022-11-30 19:16:33 +02:00
halcy c1b7a7e4e5 Fix cross page links in docs 2022-11-30 19:04:26 +02:00
halcy 98d9dfa357 Merge branch 'master' of github.com:halcy/Mastodon.py 2022-11-30 18:02:26 +02:00
halcy a65cbdd513 ref test 2022-11-30 18:02:20 +02:00
Lorenz Diener 77e77fa9e8
Merge pull request #285 from eumiro/cmp_versions
refactor: improve version comparisons
2022-11-30 17:59:37 +02:00
Miroslav Šedivý 7ffcfb2a5e refactor: improve version comparisons 2022-11-29 18:43:57 +01:00
18 zmienionych plików z 2188 dodań i 2103 usunięć

Wyświetl plik

@ -1,5 +1,18 @@
version: 2.1
jobs:
run-tests-36:
docker:
- image: cimg/python:3.6
steps:
- checkout
- run:
name: "Install test deps"
command: "pip install .[test]"
- run:
name: "Run tests"
command: "python setup.py pytest --addopts '--junitxml=tests/result.xml'"
- store_test_results:
path: tests
run-tests-37:
docker:
- image: cimg/python:3.7
@ -8,9 +21,6 @@ jobs:
- run:
name: "Install test deps"
command: "pip install .[test]"
- run:
name: "Install codecov"
command: "pip install codecov"
- run:
name: "Run tests"
command: "python setup.py pytest --addopts '--junitxml=tests/result.xml'"
@ -42,10 +52,7 @@ jobs:
- checkout
- run:
name: "Install test deps"
command: "pip install .[test]"
- run:
name: "Install codecov"
command: "pip install codecov"
command: "pip install .[test]"
- run:
name: "Run tests"
command: "python setup.py pytest --addopts '--junitxml=tests/result.xml'"
@ -58,10 +65,7 @@ jobs:
- checkout
- run:
name: "Install test deps"
command: "pip install .[test]"
- run:
name: "Install codecov"
command: "pip install codecov"
command: "pip install .[test]"
- run:
name: "Run tests"
command: "python setup.py pytest --addopts '--junitxml=tests/result.xml'"
@ -70,6 +74,7 @@ jobs:
workflows:
run-tests-workflow:
jobs:
#- run-tests-36 # 3.6 commented out - SHOULD still work, if you can build cryptography, or don't use it
- run-tests-37
- run-tests-38-cov
- run-tests-39

Wyświetl plik

@ -95,7 +95,8 @@ manually (or persist objects, not just dicts).
There are convenience functions available for fetching the previous and next page of
a paginated request as well as for fetching all pages starting from a first page.
For details, see `fetch_next()`_, `fetch_previous()`_. and `fetch_remaining()`_.
For details, see :ref:`fetch_next() <fetch_next()>`, :ref:`fetch_previous() <fetch_previous()>`.
and :ref:`fetch_remaining() <fetch_remaining()>`.
IDs and unpacking
-----------------

Wyświetl plik

@ -50,7 +50,7 @@ Writing
Scheduled statuses
------------------
These functions allow you to get information about scheduled statuses and to update scheduled statuses that already exist.
To create new scheduled statuses, use `status_post()`_ with the `scheduled_at` parameter.
To create new scheduled statuses, use :ref:`status_post() <status_post()>` with the `scheduled_at` parameter.
Reading
~~~~~~~

Plik diff jest za duży Load Diff

Wyświetl plik

@ -0,0 +1,107 @@
from .defaults import _DEFAULT_SCOPES, _SCOPE_SETS
from .error import MastodonIllegalArgumentError, MastodonAPIError
from .utility import api_version
from .internals import Mastodon as Internals
class Mastodon(Internals):
@api_version("2.7.0", "2.7.0", "3.4.0")
def create_account(self, username, password, email, agreement=False, reason=None, locale="en", scopes=_DEFAULT_SCOPES, to_file=None, return_detailed_error=False):
"""
Creates a new user account with the given username, password and email. "agreement"
must be set to true (after showing the user the instance's user agreement and having
them agree to it), "locale" specifies the language for the confirmation email as an
ISO 639-1 (two letter) or, if a language does not have one, 639-3 (three letter) language
code. `reason` can be used to specify why a user would like to join if approved-registrations
mode is on.
Does not require an access token, but does require a client grant.
By default, this method is rate-limited by IP to 5 requests per 30 minutes.
Returns an access token (just like log_in), which it can also persist to to_file,
and sets it internally so that the user is now logged in. Note that this token
can only be used after the user has confirmed their email.
By default, the function will throw if the account could not be created. Alternately,
when `return_detailed_error` is passed, Mastodon.py will return the detailed error
response that the API provides (Starting from version 3.4.0 - not checked here) as an dict with
error details as the second return value and the token returned as `None` in case of error.
The dict will contain a text `error` values as well as a `details` value which is a dict with
one optional key for each potential field (`username`, `password`, `email` and `agreement`),
each if present containing a dict with an `error` category and free text `description`.
Valid error categories are:
* ERR_BLOCKED - When e-mail provider is not allowed
* ERR_UNREACHABLE - When e-mail address does not resolve to any IP via DNS (MX, A, AAAA)
* ERR_TAKEN - When username or e-mail are already taken
* ERR_RESERVED - When a username is reserved, e.g. "webmaster" or "admin"
* ERR_ACCEPTED - When agreement has not been accepted
* ERR_BLANK - When a required attribute is blank
* ERR_INVALID - When an attribute is malformed, e.g. wrong characters or invalid e-mail address
* ERR_TOO_LONG - When an attribute is over the character limit
* ERR_TOO_SHORT - When an attribute is under the character requirement
* ERR_INCLUSION - When an attribute is not one of the allowed values, e.g. unsupported locale
"""
params = self.__generate_params(locals(), ['to_file', 'scopes'])
params['client_id'] = self.client_id
params['client_secret'] = self.client_secret
if not agreement:
del params['agreement']
# Step 1: Get a user-free token via oauth
try:
oauth_params = {}
oauth_params['scope'] = " ".join(scopes)
oauth_params['client_id'] = self.client_id
oauth_params['client_secret'] = self.client_secret
oauth_params['grant_type'] = 'client_credentials'
response = self.__api_request('POST', '/oauth/token', oauth_params, do_ratelimiting=False)
temp_access_token = response['access_token']
except Exception as e:
raise MastodonIllegalArgumentError('Invalid request during oauth phase: %s' % e)
# Step 2: Use that to create a user
try:
response = self.__api_request('POST', '/api/v1/accounts', params, do_ratelimiting=False, access_token_override=temp_access_token, skip_error_check=True)
if "error" in response:
if return_detailed_error:
return None, response
raise MastodonIllegalArgumentError('Invalid request: %s' % e)
self.access_token = response['access_token']
self.__set_refresh_token(response.get('refresh_token'))
self.__set_token_expired(int(response.get('expires_in', 0)))
except Exception as e:
raise MastodonIllegalArgumentError('Invalid request')
# Step 3: Check scopes, persist, et cetera
received_scopes = response["scope"].split(" ")
for scope_set in _SCOPE_SETS.keys():
if scope_set in received_scopes:
received_scopes += _SCOPE_SETS[scope_set]
if not set(scopes) <= set(received_scopes):
raise MastodonAPIError('Granted scopes "' + " ".join(received_scopes) + '" do not contain all of the requested scopes "' + " ".join(scopes) + '".')
if to_file is not None:
with open(to_file, 'w') as token_file:
token_file.write(response['access_token'] + "\n")
token_file.write(self.api_base_url + "\n")
self.__logged_in_id = None
if return_detailed_error:
return response['access_token'], {}
else:
return response['access_token']
@api_version("3.4.0", "3.4.0", "3.4.0")
def email_resend_confirmation(self):
"""
Requests a re-send of the users confirmation mail for an unconfirmed logged in user.
Only available to the app that the user originally signed up with.
"""
self.__api_request('POST', '/api/v1/emails/confirmations')

Wyświetl plik

@ -0,0 +1,370 @@
import requests
from requests.models import urlencode
import datetime
import os
import time
import collections
from .error import MastodonIllegalArgumentError, MastodonNetworkError, MastodonVersionError, MastodonAPIError
from .defaults import _DEFAULT_SCOPES, _SCOPE_SETS, _DEFAULT_TIMEOUT
from .utility import parse_version_string
from .internals import Mastodon as Internals
class Mastodon(Internals):
###
# Registering apps
###
@staticmethod
def create_app(client_name, scopes=_DEFAULT_SCOPES, redirect_uris=None, website=None, to_file=None,
api_base_url=None, request_timeout=_DEFAULT_TIMEOUT, session=None):
"""
Create a new app with given `client_name` and `scopes` (The basic scopes are "read", "write", "follow" and "push"
- more granular scopes are available, please refer to Mastodon documentation for which) on the instance given
by `api_base_url`.
Specify `redirect_uris` if you want users to be redirected to a certain page after authenticating in an OAuth flow.
You can specify multiple URLs by passing a list. Note that if you wish to use OAuth authentication with redirects,
the redirect URI must be one of the URLs specified here.
Specify `to_file` to persist your app's info to a file so you can use it in the constructor.
Specify `website` to give a website for your app.
Specify `session` with a requests.Session for it to be used instead of the default. This can be
used to, amongst other things, adjust proxy or SSL certificate settings.
Presently, app registration is open by default, but this is not guaranteed to be the case for all
Mastodon instances in the future.
Returns `client_id` and `client_secret`, both as strings.
"""
if api_base_url is None:
raise MastodonIllegalArgumentError("API base URL is required.")
api_base_url = Mastodon.__protocolize(api_base_url)
request_data = {
'client_name': client_name,
'scopes': " ".join(scopes)
}
try:
if redirect_uris is not None:
if isinstance(redirect_uris, (list, tuple)):
redirect_uris = "\n".join(list(redirect_uris))
request_data['redirect_uris'] = redirect_uris
else:
request_data['redirect_uris'] = 'urn:ietf:wg:oauth:2.0:oob'
if website is not None:
request_data['website'] = website
if session:
ret = session.post(api_base_url + '/api/v1/apps', data=request_data, timeout=request_timeout)
response = ret.json()
else:
response = requests.post(api_base_url + '/api/v1/apps', data=request_data, timeout=request_timeout)
response = response.json()
except Exception as e:
raise MastodonNetworkError("Could not complete request: %s" % e)
if to_file is not None:
with open(to_file, 'w') as secret_file:
secret_file.write(response['client_id'] + "\n")
secret_file.write(response['client_secret'] + "\n")
secret_file.write(api_base_url + "\n")
secret_file.write(client_name + "\n")
return (response['client_id'], response['client_secret'])
###
# Authentication, including constructor
###
def __init__(self, client_id=None, client_secret=None, access_token=None, api_base_url=None, debug_requests=False,
ratelimit_method="wait", ratelimit_pacefactor=1.1, request_timeout=_DEFAULT_TIMEOUT, mastodon_version=None,
version_check_mode="created", session=None, feature_set="mainline", user_agent="mastodonpy", lang=None):
"""
Create a new API wrapper instance based on the given `client_secret` and `client_id` on the
instance given by `api_base_url`. If you give a `client_id` and it is not a file, you must
also give a secret. If you specify an `access_token` then you don't need to specify a `client_id`.
It is allowed to specify neither - in this case, you will be restricted to only using endpoints
that do not require authentication. If a file is given as `client_id`, client ID, secret and
base url are read from that file.
You can also specify an `access_token`, directly or as a file (as written by :ref:`log_in() <log_in()>`). If
a file is given, Mastodon.py also tries to load the base URL from this file, if present. A
client id and secret are not required in this case.
Mastodon.py can try to respect rate limits in several ways, controlled by `ratelimit_method`.
"throw" makes functions throw a `MastodonRatelimitError` when the rate
limit is hit. "wait" mode will, once the limit is hit, wait and retry the request as soon
as the rate limit resets, until it succeeds. "pace" works like throw, but tries to wait in
between calls so that the limit is generally not hit (how hard it tries to avoid hitting the rate
limit can be controlled by ratelimit_pacefactor). The default setting is "wait". Note that
even in "wait" and "pace" mode, requests can still fail due to network or other problems! Also
note that "pace" and "wait" are NOT thread safe.
By default, a timeout of 300 seconds is used for all requests. If you wish to change this,
pass the desired timeout (in seconds) as `request_timeout`.
For fine-tuned control over the requests object use `session` with a requests.Session.
The `mastodon_version` parameter can be used to specify the version of Mastodon that Mastodon.py will
expect to be installed on the server. The function will throw an error if an unparseable
Version is specified. If no version is specified, Mastodon.py will set `mastodon_version` to the
detected version.
The version check mode can be set to "created" (the default behaviour), "changed" or "none". If set to
"created", Mastodon.py will throw an error if the version of Mastodon it is connected to is too old
to have an endpoint. If it is set to "changed", it will throw an error if the endpoint's behaviour has
changed after the version of Mastodon that is connected has been released. If it is set to "none",
version checking is disabled.
`feature_set` can be used to enable behaviour specific to non-mainline Mastodon API implementations.
Details are documented in the functions that provide such functionality. Currently supported feature
sets are `mainline`, `fedibird` and `pleroma`.
For some Mastodon instances a `User-Agent` header is needed. This can be set by parameter `user_agent`. Starting from
Mastodon.py 1.5.2 `create_app()` stores the application name into the client secret file. If `client_id` points to this file,
the app name will be used as `User-Agent` header as default. It is possible to modify old secret files and append
a client app name to use it as a `User-Agent` name.
`lang` can be used to change the locale Mastodon will use to generate responses. Valid parameters are all ISO 639-1 (two letter)
or for a language that has none, 639-3 (three letter) language codes. This affects some error messages (those related to validation) and
trends. You can change the language using :ref:`set_language()`.
If no other `User-Agent` is specified, "mastodonpy" will be used.
"""
self.api_base_url = api_base_url
if self.api_base_url is not None:
self.api_base_url = self.__protocolize(self.api_base_url)
self.client_id = client_id
self.client_secret = client_secret
self.access_token = access_token
self.debug_requests = debug_requests
self.ratelimit_method = ratelimit_method
self._token_expired = datetime.datetime.now()
self._refresh_token = None
self.__logged_in_id = None
self.ratelimit_limit = 300
self.ratelimit_reset = time.time()
self.ratelimit_remaining = 300
self.ratelimit_lastcall = time.time()
self.ratelimit_pacefactor = ratelimit_pacefactor
self.request_timeout = request_timeout
if session:
self.session = session
else:
self.session = requests.Session()
self.feature_set = feature_set
if not self.feature_set in ["mainline", "fedibird", "pleroma"]:
raise MastodonIllegalArgumentError('Requested invalid feature set')
# General defined user-agent
self.user_agent = user_agent
# Save language
self.lang = lang
# Token loading
if self.client_id is not None:
if os.path.isfile(self.client_id):
with open(self.client_id, 'r') as secret_file:
self.client_id = secret_file.readline().rstrip()
self.client_secret = secret_file.readline().rstrip()
try_base_url = secret_file.readline().rstrip()
if try_base_url is not None and len(try_base_url) != 0:
try_base_url = Mastodon.__protocolize(try_base_url)
if not (self.api_base_url is None or try_base_url == self.api_base_url):
raise MastodonIllegalArgumentError('Mismatch in base URLs between files and/or specified')
self.api_base_url = try_base_url
# With new registrations we support the 4th line to store a client_name and use it as user-agent
client_name = secret_file.readline()
if client_name and self.user_agent is None:
self.user_agent = client_name.rstrip()
else:
if self.client_secret is None:
raise MastodonIllegalArgumentError('Specified client id directly, but did not supply secret')
if self.access_token is not None and os.path.isfile(self.access_token):
with open(self.access_token, 'r') as token_file:
self.access_token = token_file.readline().rstrip()
# For newer versions, we also store the URL
try_base_url = token_file.readline().rstrip()
if try_base_url is not None and len(try_base_url) != 0:
try_base_url = Mastodon.__protocolize(try_base_url)
if not (self.api_base_url is None or try_base_url == self.api_base_url):
raise MastodonIllegalArgumentError('Mismatch in base URLs between files and/or specified')
self.api_base_url = try_base_url
# For EVEN newer vesions, we ALSO ALSO store the client id and secret so that you don't need to reauth to revoke
if self.client_id is None:
try:
self.client_id = token_file.readline().rstrip()
self.client_secret = token_file.readline().rstrip()
except:
pass
# Verify we have a base URL, protocolize
if self.api_base_url is None:
raise MastodonIllegalArgumentError("API base URL is required.")
self.api_base_url = Mastodon.__protocolize(self.api_base_url)
if not version_check_mode in ["created", "changed", "none"]:
raise MastodonIllegalArgumentError("Invalid version check method.")
self.version_check_mode = version_check_mode
self.mastodon_major = 1
self.mastodon_minor = 0
self.mastodon_patch = 0
self.version_check_worked = None
# Versioning
if mastodon_version is None and self.version_check_mode != 'none':
self.retrieve_mastodon_version()
elif self.version_check_mode != 'none':
try:
self.mastodon_major, self.mastodon_minor, self.mastodon_patch = parse_version_string(mastodon_version)
except:
raise MastodonVersionError("Bad version specified")
# Ratelimiting parameter check
if ratelimit_method not in ["throw", "wait", "pace"]:
raise MastodonIllegalArgumentError("Invalid ratelimit method.")
def auth_request_url(self, client_id=None, redirect_uris="urn:ietf:wg:oauth:2.0:oob", scopes=_DEFAULT_SCOPES, force_login=False, state=None, lang=None):
"""
Returns the URL that a client needs to request an OAuth grant from the server.
To log in with OAuth, send your user to this URL. The user will then log in and
get a code which you can pass to :ref:`log_in() <log_in()>`.
`scopes` are as in :ref:`log_in() <log_in()>`, redirect_uris is where the user should be redirected to
after authentication. Note that `redirect_uris` must be one of the URLs given during
app registration. When using urn:ietf:wg:oauth:2.0:oob, the code is simply displayed,
otherwise it is added to the given URL as the "code" request parameter.
Pass force_login if you want the user to always log in even when already logged
into web Mastodon (i.e. when registering multiple different accounts in an app).
`state` is the oauth `state` parameter to pass to the server. It is strongly suggested
to use a random, nonguessable value (i.e. nothing meaningful and no incrementing ID)
to preserve security guarantees. It can be left out for non-web login flows.
Pass an ISO 639-1 (two letter) or, for languages that do not have one, 639-3 (three letter)
language code as `lang` to control the display language for the oauth form.
"""
if client_id is None:
client_id = self.client_id
else:
if os.path.isfile(client_id):
with open(client_id, 'r') as secret_file:
client_id = secret_file.readline().rstrip()
params = dict()
params['client_id'] = client_id
params['response_type'] = "code"
params['redirect_uri'] = redirect_uris
params['scope'] = " ".join(scopes)
params['force_login'] = force_login
params['state'] = state
params['lang'] = lang
formatted_params = urlencode(params)
return "".join([self.api_base_url, "/oauth/authorize?", formatted_params])
def log_in(self, username=None, password=None, code=None, redirect_uri="urn:ietf:wg:oauth:2.0:oob", refresh_token=None, scopes=_DEFAULT_SCOPES, to_file=None):
"""
Get the access token for a user.
The username is the email address used to log in into Mastodon.
Can persist access token to file `to_file`, to be used in the constructor.
Handles password and OAuth-based authorization.
Will throw a `MastodonIllegalArgumentError` if the OAuth flow data or the
username / password credentials given are incorrect, and
`MastodonAPIError` if all of the requested scopes were not granted.
For OAuth 2, obtain a code via having your user go to the URL returned by
:ref:`auth_request_url() <auth_request_url()>` and pass it as the code parameter. In this case,
make sure to also pass the same redirect_uri parameter as you used when
generating the auth request URL.
Returns the access token as a string.
"""
if username is not None and password is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'code', 'refresh_token'])
params['grant_type'] = 'password'
elif code is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'refresh_token'])
params['grant_type'] = 'authorization_code'
elif refresh_token is not None:
params = self.__generate_params(locals(), ['scopes', 'to_file', 'username', 'password', 'code'])
params['grant_type'] = 'refresh_token'
else:
raise MastodonIllegalArgumentError('Invalid arguments given. username and password or code are required.')
params['client_id'] = self.client_id
params['client_secret'] = self.client_secret
params['scope'] = " ".join(scopes)
try:
response = self.__api_request('POST', '/oauth/token', params, do_ratelimiting=False)
self.access_token = response['access_token']
self.__set_refresh_token(response.get('refresh_token'))
self.__set_token_expired(int(response.get('expires_in', 0)))
except Exception as e:
if username is not None or password is not None:
raise MastodonIllegalArgumentError('Invalid user name, password, or redirect_uris: %s' % e)
elif code is not None:
raise MastodonIllegalArgumentError('Invalid access token or redirect_uris: %s' % e)
else:
raise MastodonIllegalArgumentError('Invalid request: %s' % e)
received_scopes = response["scope"].split(" ")
for scope_set in _SCOPE_SETS.keys():
if scope_set in received_scopes:
received_scopes += _SCOPE_SETS[scope_set]
if not set(scopes) <= set(received_scopes):
raise MastodonAPIError('Granted scopes "' + " ".join(received_scopes) + '" do not contain all of the requested scopes "' + " ".join(scopes) + '".')
if to_file is not None:
with open(to_file, 'w') as token_file:
token_file.write(response['access_token'] + "\n")
token_file.write(self.api_base_url + "\n")
token_file.write(self.client_id + "\n")
token_file.write(self.client_secret + "\n")
self.__logged_in_id = None
# Retry version check if needed (might be required in limited federation mode)
if not self.version_check_worked:
self.retrieve_mastodon_version()
return response['access_token']
def revoke_access_token(self):
"""
Revoke the oauth token the user is currently authenticated with, effectively removing
the apps access and requiring the user to log in again.
"""
if self.access_token is None:
raise MastodonIllegalArgumentError("Not logged in, do not have a token to revoke.")
if self.client_id is None or self.client_secret is None:
raise MastodonIllegalArgumentError("Client authentication (id + secret) is required to revoke tokens.")
params = collections.OrderedDict([])
params['client_id'] = self.client_id
params['client_secret'] = self.client_secret
params['token'] = self.access_token
self.__api_request('POST', '/oauth/revoke', params)
# We are now logged out, clear token and logged in id
self.access_token = None
self.__logged_in_id = None

45
mastodon/compat.py 100644
Wyświetl plik

@ -0,0 +1,45 @@
# compat.py - backwards compatible optional imports
IMPL_HAS_CRYPTO = True
try:
import cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
except:
IMPL_HAS_CRYPTO = False
cryptography = None
default_backend = None
ec = None
serialization = None
IMPL_HAS_ECE = True
try:
import http_ece
except:
IMPL_HAS_ECE = False
http_ece = None
IMPL_HAS_BLURHASH = True
try:
import blurhash
except:
IMPL_HAS_BLURHASH = False
blurhash = None
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
try:
import magic
except ImportError:
magic = None
try:
from pathlib import PurePath
except:
class PurePath:
pass

Wyświetl plik

@ -0,0 +1,64 @@
# defaults.py - default values for various parameters
_DEFAULT_TIMEOUT = 300
_DEFAULT_STREAM_TIMEOUT = 300
_DEFAULT_STREAM_RECONNECT_WAIT_SEC = 5
_DEFAULT_SCOPES = ['read', 'write', 'follow', 'push']
_SCOPE_SETS = {
'read': [
'read:accounts',
'read:blocks',
'read:favourites',
'read:filters',
'read:follows',
'read:lists',
'read:mutes',
'read:notifications',
'read:search',
'read:statuses',
'read:bookmarks'
],
'write': [
'write:accounts',
'write:blocks',
'write:favourites',
'write:filters',
'write:follows',
'write:lists',
'write:media',
'write:mutes',
'write:notifications',
'write:reports',
'write:statuses',
'write:bookmarks'
],
'follow': [
'read:blocks',
'read:follows',
'read:mutes',
'write:blocks',
'write:follows',
'write:mutes',
],
'admin:read': [
'admin:read:accounts',
'admin:read:reports',
'admin:read:domain_allows',
'admin:read:domain_blocks',
'admin:read:ip_blocks',
'admin:read:email_domain_blocks',
'admin:read:canonical_email_blocks',
],
'admin:write': [
'admin:write:accounts',
'admin:write:reports',
'admin:write:domain_allows',
'admin:write:domain_blocks',
'admin:write:ip_blocks',
'admin:write:email_domain_blocks',
'admin:write:canonical_email_blocks',
],
}
_VALID_SCOPES = ['read', 'write', 'follow', 'push', 'admin:read', 'admin:write'] + \
_SCOPE_SETS['read'] + _SCOPE_SETS['write'] + \
_SCOPE_SETS['admin:read'] + _SCOPE_SETS['admin:write']

90
mastodon/error.py 100644
Wyświetl plik

@ -0,0 +1,90 @@
# error.py - error classes
##
# Exceptions
##
class MastodonError(Exception):
"""Base class for Mastodon.py exceptions"""
class MastodonVersionError(MastodonError):
"""Raised when a function is called that the version of Mastodon for which
Mastodon.py was instantiated does not support"""
class MastodonIllegalArgumentError(ValueError, MastodonError):
"""Raised when an incorrect parameter is passed to a function"""
pass
class MastodonIOError(IOError, MastodonError):
"""Base class for Mastodon.py I/O errors"""
class MastodonFileNotFoundError(MastodonIOError):
"""Raised when a file requested to be loaded can not be opened"""
pass
class MastodonNetworkError(MastodonIOError):
"""Raised when network communication with the server fails"""
pass
class MastodonReadTimeout(MastodonNetworkError):
"""Raised when a stream times out"""
pass
class MastodonAPIError(MastodonError):
"""Raised when the mastodon API generates a response that cannot be handled"""
pass
class MastodonServerError(MastodonAPIError):
"""Raised if the Server is malconfigured and returns a 5xx error code"""
pass
class MastodonInternalServerError(MastodonServerError):
"""Raised if the Server returns a 500 error"""
pass
class MastodonBadGatewayError(MastodonServerError):
"""Raised if the Server returns a 502 error"""
pass
class MastodonServiceUnavailableError(MastodonServerError):
"""Raised if the Server returns a 503 error"""
pass
class MastodonGatewayTimeoutError(MastodonServerError):
"""Raised if the Server returns a 504 error"""
pass
class MastodonNotFoundError(MastodonAPIError):
"""Raised when the Mastodon API returns a 404 Not Found error"""
pass
class MastodonUnauthorizedError(MastodonAPIError):
"""Raised when the Mastodon API returns a 401 Unauthorized error
This happens when an OAuth token is invalid or has been revoked,
or when trying to access an endpoint that can't be used without
authentication without providing credentials."""
pass
class MastodonRatelimitError(MastodonError):
"""Raised when rate limiting is set to manual mode and the rate limit is exceeded"""
pass
class MastodonMalformedEventError(MastodonError):
"""Raised when the server-sent event stream is malformed"""
pass

Wyświetl plik

@ -0,0 +1,96 @@
from .versions import _DICT_VERSION_INSTANCE, _DICT_VERSION_ACTIVITY
from .error import MastodonIllegalArgumentError, MastodonNotFoundError
from .utility import api_version
from .compat import urlparse
from .internals import Mastodon as Internals
class Mastodon(Internals):
###
# Reading data: Instances
###
@api_version("1.1.0", "2.3.0", _DICT_VERSION_INSTANCE)
def instance(self):
"""
Retrieve basic information about the instance, including the URI and administrative contact email.
Does not require authentication unless locked down by the administrator.
Returns an :ref:`instance dict <instance dict>`.
"""
return self.__instance()
def __instance(self):
"""
Internal, non-version-checking helper that does the same as instance()
"""
instance = self.__api_request('GET', '/api/v1/instance/')
return instance
@api_version("2.1.2", "2.1.2", _DICT_VERSION_ACTIVITY)
def instance_activity(self):
"""
Retrieve activity stats about the instance. May be disabled by the instance administrator - throws
a MastodonNotFoundError in that case.
Activity is returned for 12 weeks going back from the current week.
Returns a list of :ref:`activity dicts <activity dicts>`.
"""
return self.__api_request('GET', '/api/v1/instance/activity')
@api_version("2.1.2", "2.1.2", "2.1.2")
def instance_peers(self):
"""
Retrieve the instances that this instance knows about. May be disabled by the instance administrator - throws
a MastodonNotFoundError in that case.
Returns a list of URL strings.
"""
return self.__api_request('GET', '/api/v1/instance/peers')
@api_version("3.0.0", "3.0.0", "3.0.0")
def instance_health(self):
"""
Basic health check. Returns True if healthy, False if not.
"""
status = self.__api_request('GET', '/health', parse=False).decode("utf-8")
return status in ["OK", "success"]
@api_version("3.0.0", "3.0.0", "3.0.0")
def instance_nodeinfo(self, schema="http://nodeinfo.diaspora.software/ns/schema/2.0"):
"""
Retrieves the instance's nodeinfo information.
For information on what the nodeinfo can contain, see the nodeinfo
specification: https://github.com/jhass/nodeinfo . By default,
Mastodon.py will try to retrieve the version 2.0 schema nodeinfo.
To override the schema, specify the desired schema with the `schema`
parameter.
"""
links = self.__api_request('GET', '/.well-known/nodeinfo')["links"]
schema_url = None
for available_schema in links:
if available_schema.rel == schema:
schema_url = available_schema.href
if schema_url is None:
raise MastodonIllegalArgumentError(
"Requested nodeinfo schema is not available.")
try:
return self.__api_request('GET', schema_url, base_url_override="")
except MastodonNotFoundError:
parse = urlparse(schema_url)
return self.__api_request('GET', parse.path + parse.params + parse.query + parse.fragment)
@api_version("3.4.0", "3.4.0", _DICT_VERSION_INSTANCE)
def instance_rules(self):
"""
Retrieve instance rules.
Returns a list of `id` + `text` dicts, same as the `rules` field in the :ref:`instance dicts <instance dicts>`.
"""
return self.__api_request('GET', '/api/v1/instance/rules')

Wyświetl plik

@ -0,0 +1,669 @@
import datetime
from contextlib import closing
import mimetypes
import threading
import six
import uuid
import dateutil.parser
import time
import copy
import requests
import re
import collections
import base64
import os
from .utility import AttribAccessDict, AttribAccessList, parse_version_string
from .error import MastodonNetworkError, MastodonIllegalArgumentError, MastodonRatelimitError, MastodonNotFoundError, \
MastodonUnauthorizedError, MastodonInternalServerError, MastodonBadGatewayError, MastodonServiceUnavailableError, \
MastodonGatewayTimeoutError, MastodonServerError, MastodonAPIError, MastodonMalformedEventError
from .compat import urlparse, magic, PurePath
from .defaults import _DEFAULT_STREAM_TIMEOUT, _DEFAULT_STREAM_RECONNECT_WAIT_SEC
###
# Internal helpers, dragons probably
###
class Mastodon():
def __datetime_to_epoch(self, date_time):
"""
Converts a python datetime to unix epoch, accounting for
time zones and such.
Assumes UTC if timezone is not given.
"""
if date_time.tzinfo is None:
date_time = date_time.replace(tzinfo=datetime.timezone.utc)
return date_time.timestamp()
def __get_logged_in_id(self):
"""
Fetch the logged in user's ID, with caching. ID is reset on calls to log_in.
"""
if self.__logged_in_id is None:
self.__logged_in_id = self.account_verify_credentials().id
return self.__logged_in_id
@staticmethod
def __json_allow_dict_attrs(json_object):
"""
Makes it possible to use attribute notation to access a dicts
elements, while still allowing the dict to act as a dict.
"""
if isinstance(json_object, dict):
return AttribAccessDict(json_object)
return json_object
@staticmethod
def __json_date_parse(json_object):
"""
Parse dates in certain known json fields, if possible.
"""
known_date_fields = ["created_at", "week", "day", "expires_at", "scheduled_at",
"updated_at", "last_status_at", "starts_at", "ends_at", "published_at", "edited_at"]
mark_delete = []
for k, v in json_object.items():
if k in known_date_fields:
if v is not None:
try:
if isinstance(v, int):
json_object[k] = datetime.datetime.fromtimestamp(v, datetime.timezone.utc)
else:
json_object[k] = dateutil.parser.parse(v)
except:
# When we can't parse a date, we just leave the field out
mark_delete.append(k)
# Two step process because otherwise python gets very upset
for k in mark_delete:
del json_object[k]
return json_object
@staticmethod
def __json_truefalse_parse(json_object):
"""
Parse 'True' / 'False' strings in certain known fields
"""
for key in ('follow', 'favourite', 'reblog', 'mention'):
if (key in json_object and isinstance(json_object[key], six.text_type)):
if json_object[key].lower() == 'true':
json_object[key] = True
if json_object[key].lower() == 'false':
json_object[key] = False
return json_object
@staticmethod
def __json_strnum_to_bignum(json_object):
"""
Converts json string numerals to native python bignums.
"""
for key in ('id', 'week', 'in_reply_to_id', 'in_reply_to_account_id', 'logins', 'registrations', 'statuses', 'day', 'last_read_id'):
if (key in json_object and isinstance(json_object[key], six.text_type)):
try:
json_object[key] = int(json_object[key])
except ValueError:
pass
return json_object
@staticmethod
def __json_hooks(json_object):
"""
All the json hooks. Used in request parsing.
"""
json_object = Mastodon.__json_strnum_to_bignum(json_object)
json_object = Mastodon.__json_date_parse(json_object)
json_object = Mastodon.__json_truefalse_parse(json_object)
json_object = Mastodon.__json_allow_dict_attrs(json_object)
return json_object
@staticmethod
def __consistent_isoformat_utc(datetime_val):
"""
Function that does what isoformat does but it actually does the same
every time instead of randomly doing different things on some systems
and also it represents that time as the equivalent UTC time.
"""
isotime = datetime_val.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S%z")
if isotime[-2] != ":":
isotime = isotime[:-2] + ":" + isotime[-2:]
return isotime
def __api_request(self, method, endpoint, params={}, files={}, headers={}, access_token_override=None, base_url_override=None,
do_ratelimiting=True, use_json=False, parse=True, return_response_object=False, skip_error_check=False, lang_override=None):
"""
Internal API request helper.
"""
response = None
remaining_wait = 0
# Add language to params if not None
lang = self.lang
if lang_override is not None:
lang = lang_override
if lang is not None:
params["lang"] = lang
# "pace" mode ratelimiting: Assume constant rate of requests, sleep a little less long than it
# would take to not hit the rate limit at that request rate.
if do_ratelimiting and self.ratelimit_method == "pace":
if self.ratelimit_remaining == 0:
to_next = self.ratelimit_reset - time.time()
if to_next > 0:
# As a precaution, never sleep longer than 5 minutes
to_next = min(to_next, 5 * 60)
time.sleep(to_next)
else:
time_waited = time.time() - self.ratelimit_lastcall
time_wait = float(self.ratelimit_reset - time.time()) / float(self.ratelimit_remaining)
remaining_wait = time_wait - time_waited
if remaining_wait > 0:
to_next = remaining_wait / self.ratelimit_pacefactor
to_next = min(to_next, 5 * 60)
time.sleep(to_next)
# Generate request headers
headers = copy.deepcopy(headers)
if self.access_token is not None:
headers['Authorization'] = 'Bearer ' + self.access_token
if access_token_override is not None:
headers['Authorization'] = 'Bearer ' + access_token_override
# Add user-agent
if self.user_agent:
headers['User-Agent'] = self.user_agent
# Determine base URL
base_url = self.api_base_url
if base_url_override is not None:
base_url = base_url_override
if self.debug_requests:
print('Mastodon: Request to endpoint "' + base_url +
endpoint + '" using method "' + method + '".')
print('Parameters: ' + str(params))
print('Headers: ' + str(headers))
print('Files: ' + str(files))
# Make request
request_complete = False
while not request_complete:
request_complete = True
response_object = None
try:
kwargs = dict(headers=headers, files=files, timeout=self.request_timeout)
if use_json:
kwargs['json'] = params
elif method == 'GET':
kwargs['params'] = params
else:
kwargs['data'] = params
response_object = self.session.request(method, base_url + endpoint, **kwargs)
except Exception as e:
raise MastodonNetworkError("Could not complete request: %s" % e)
if response_object is None:
raise MastodonIllegalArgumentError("Illegal request.")
# Parse rate limiting headers
if 'X-RateLimit-Remaining' in response_object.headers and do_ratelimiting:
self.ratelimit_remaining = int(
response_object.headers['X-RateLimit-Remaining'])
self.ratelimit_limit = int(
response_object.headers['X-RateLimit-Limit'])
# For gotosocial, we need an int representation, but for non-ints this would crash
try:
ratelimit_intrep = str(
int(response_object.headers['X-RateLimit-Reset']))
except:
ratelimit_intrep = None
try:
if ratelimit_intrep is not None and ratelimit_intrep == response_object.headers['X-RateLimit-Reset']:
self.ratelimit_reset = int(
response_object.headers['X-RateLimit-Reset'])
else:
ratelimit_reset_datetime = dateutil.parser.parse(response_object.headers['X-RateLimit-Reset'])
self.ratelimit_reset = self.__datetime_to_epoch(ratelimit_reset_datetime)
# Adjust server time to local clock
if 'Date' in response_object.headers:
server_time_datetime = dateutil.parser.parse(response_object.headers['Date'])
server_time = self.__datetime_to_epoch(server_time_datetime)
server_time_diff = time.time() - server_time
self.ratelimit_reset += server_time_diff
self.ratelimit_lastcall = time.time()
except Exception as e:
raise MastodonRatelimitError("Rate limit time calculations failed: %s" % e)
# Handle response
if self.debug_requests:
print('Mastodon: Response received with code ' + str(response_object.status_code) + '.')
print('response headers: ' + str(response_object.headers))
print('Response text content: ' + str(response_object.text))
if not response_object.ok:
try:
response = response_object.json(object_hook=self.__json_hooks)
if isinstance(response, dict) and 'error' in response:
error_msg = response['error']
elif isinstance(response, str):
error_msg = response
else:
error_msg = None
except ValueError:
error_msg = None
# Handle rate limiting
if response_object.status_code == 429:
if self.ratelimit_method == 'throw' or not do_ratelimiting:
raise MastodonRatelimitError('Hit rate limit.')
elif self.ratelimit_method in ('wait', 'pace'):
to_next = self.ratelimit_reset - time.time()
if to_next > 0:
# As a precaution, never sleep longer than 5 minutes
to_next = min(to_next, 5 * 60)
time.sleep(to_next)
request_complete = False
continue
if not skip_error_check:
if response_object.status_code == 404:
ex_type = MastodonNotFoundError
if not error_msg:
error_msg = 'Endpoint not found.'
# this is for compatibility with older versions
# which raised MastodonAPIError('Endpoint not found.')
# on any 404
elif response_object.status_code == 401:
ex_type = MastodonUnauthorizedError
elif response_object.status_code == 500:
ex_type = MastodonInternalServerError
elif response_object.status_code == 502:
ex_type = MastodonBadGatewayError
elif response_object.status_code == 503:
ex_type = MastodonServiceUnavailableError
elif response_object.status_code == 504:
ex_type = MastodonGatewayTimeoutError
elif response_object.status_code >= 500 and response_object.status_code <= 511:
ex_type = MastodonServerError
else:
ex_type = MastodonAPIError
raise ex_type('Mastodon API returned error', response_object.status_code, response_object.reason, error_msg)
if return_response_object:
return response_object
if parse:
try:
response = response_object.json(object_hook=self.__json_hooks)
except:
raise MastodonAPIError(
"Could not parse response as JSON, response code was %s, "
"bad json content was '%s'" % (response_object.status_code,
response_object.content))
else:
response = response_object.content
# Parse link headers
if isinstance(response, list) and \
'Link' in response_object.headers and \
response_object.headers['Link'] != "":
response = AttribAccessList(response)
tmp_urls = requests.utils.parse_header_links(
response_object.headers['Link'].rstrip('>').replace('>,<', ',<'))
for url in tmp_urls:
if 'rel' not in url:
continue
if url['rel'] == 'next':
# Be paranoid and extract max_id specifically
next_url = url['url']
matchgroups = re.search(r"[?&]max_id=([^&]+)", next_url)
if matchgroups:
next_params = copy.deepcopy(params)
next_params['_pagination_method'] = method
next_params['_pagination_endpoint'] = endpoint
max_id = matchgroups.group(1)
if max_id.isdigit():
next_params['max_id'] = int(max_id)
else:
next_params['max_id'] = max_id
if "since_id" in next_params:
del next_params['since_id']
if "min_id" in next_params:
del next_params['min_id']
response._pagination_next = next_params
# Maybe other API users rely on the pagination info in the last item
# Will be removed in future
if isinstance(response[-1], AttribAccessDict):
response[-1]._pagination_next = next_params
if url['rel'] == 'prev':
# Be paranoid and extract since_id or min_id specifically
prev_url = url['url']
# Old and busted (pre-2.6.0): since_id pagination
matchgroups = re.search(
r"[?&]since_id=([^&]+)", prev_url)
if matchgroups:
prev_params = copy.deepcopy(params)
prev_params['_pagination_method'] = method
prev_params['_pagination_endpoint'] = endpoint
since_id = matchgroups.group(1)
if since_id.isdigit():
prev_params['since_id'] = int(since_id)
else:
prev_params['since_id'] = since_id
if "max_id" in prev_params:
del prev_params['max_id']
response._pagination_prev = prev_params
# Maybe other API users rely on the pagination info in the first item
# Will be removed in future
if isinstance(response[0], AttribAccessDict):
response[0]._pagination_prev = prev_params
# New and fantastico (post-2.6.0): min_id pagination
matchgroups = re.search(
r"[?&]min_id=([^&]+)", prev_url)
if matchgroups:
prev_params = copy.deepcopy(params)
prev_params['_pagination_method'] = method
prev_params['_pagination_endpoint'] = endpoint
min_id = matchgroups.group(1)
if min_id.isdigit():
prev_params['min_id'] = int(min_id)
else:
prev_params['min_id'] = min_id
if "max_id" in prev_params:
del prev_params['max_id']
response._pagination_prev = prev_params
# Maybe other API users rely on the pagination info in the first item
# Will be removed in future
if isinstance(response[0], AttribAccessDict):
response[0]._pagination_prev = prev_params
return response
def __get_streaming_base(self):
"""
Internal streaming API helper.
Returns the correct URL for the streaming API.
"""
instance = self.instance()
if "streaming_api" in instance["urls"] and instance["urls"]["streaming_api"] != self.api_base_url:
# This is probably a websockets URL, which is really for the browser, but requests can't handle it
# So we do this below to turn it into an HTTPS or HTTP URL
parse = urlparse(instance["urls"]["streaming_api"])
if parse.scheme == 'wss':
url = "https://" + parse.netloc
elif parse.scheme == 'ws':
url = "http://" + parse.netloc
else:
raise MastodonAPIError(
"Could not parse streaming api location returned from server: {}.".format(
instance["urls"]["streaming_api"]))
else:
url = self.api_base_url
return url
def __stream(self, endpoint, listener, params={}, run_async=False, timeout=_DEFAULT_STREAM_TIMEOUT, reconnect_async=False, reconnect_async_wait_sec=_DEFAULT_STREAM_RECONNECT_WAIT_SEC):
"""
Internal streaming API helper.
Returns a handle to the open connection that the user can close if they
wish to terminate it.
"""
# Check if we have to redirect
url = self.__get_streaming_base()
# The streaming server can't handle two slashes in a path, so remove trailing slashes
if url[-1] == '/':
url = url[:-1]
# Connect function (called and then potentially passed to async handler)
def connect_func():
headers = {"Authorization": "Bearer " +
self.access_token} if self.access_token else {}
if self.user_agent:
headers['User-Agent'] = self.user_agent
connection = self.session.get(url + endpoint, headers=headers, data=params, stream=True,
timeout=(self.request_timeout, timeout))
if connection.status_code != 200:
raise MastodonNetworkError(
"Could not connect to streaming server: %s" % connection.reason)
return connection
connection = None
# Async stream handler
class __stream_handle():
def __init__(self, connection, connect_func, reconnect_async, reconnect_async_wait_sec):
self.closed = False
self.running = True
self.connection = connection
self.connect_func = connect_func
self.reconnect_async = reconnect_async
self.reconnect_async_wait_sec = reconnect_async_wait_sec
self.reconnecting = False
def close(self):
self.closed = True
if self.connection is not None:
self.connection.close()
def is_alive(self):
return self._thread.is_alive()
def is_receiving(self):
if self.closed or not self.running or self.reconnecting or not self.is_alive():
return False
else:
return True
def _sleep_attentive(self):
if self._thread != threading.current_thread():
raise RuntimeError(
"Illegal call from outside the stream_handle thread")
time_remaining = self.reconnect_async_wait_sec
while time_remaining > 0 and not self.closed:
time.sleep(0.5)
time_remaining -= 0.5
def _threadproc(self):
self._thread = threading.current_thread()
# Run until closed or until error if not autoreconnecting
while self.running:
if self.connection is not None:
with closing(self.connection) as r:
try:
listener.handle_stream(r)
except (AttributeError, MastodonMalformedEventError, MastodonNetworkError) as e:
if not (self.closed or self.reconnect_async):
raise e
else:
if self.closed:
self.running = False
# Reconnect loop. Try immediately once, then with delays on error.
if (self.reconnect_async and not self.closed) or self.connection is None:
self.reconnecting = True
connect_success = False
while not connect_success:
if self.closed:
# Someone from outside stopped the streaming
self.running = False
break
try:
the_connection = self.connect_func()
if the_connection.status_code != 200:
exception = MastodonNetworkError(f"Could not connect to server. "
f"HTTP status: {the_connection.status_code}")
listener.on_abort(exception)
self._sleep_attentive()
if self.closed:
# Here we have maybe a rare race condition. Exactly on connect, someone
# stopped the streaming before. We close the previous established connection:
the_connection.close()
else:
self.connection = the_connection
connect_success = True
except:
self._sleep_attentive()
connect_success = False
self.reconnecting = False
else:
self.running = False
return 0
if run_async:
handle = __stream_handle(
connection, connect_func, reconnect_async, reconnect_async_wait_sec)
t = threading.Thread(args=(), target=handle._threadproc)
t.daemon = True
t.start()
return handle
else:
# Blocking, never returns (can only leave via exception)
connection = connect_func()
with closing(connection) as r:
listener.handle_stream(r)
def __generate_params(self, params, exclude=[]):
"""
Internal named-parameters-to-dict helper.
Note for developers: If called with locals() as params,
as is the usual practice in this code, the __generate_params call
(or at least the locals() call) should generally be the first thing
in your function.
"""
params = collections.OrderedDict(params)
if 'self' in params:
del params['self']
param_keys = list(params.keys())
for key in param_keys:
if isinstance(params[key], bool):
params[key] = '1' if params[key] else '0'
for key in param_keys:
if params[key] is None or key in exclude:
del params[key]
param_keys = list(params.keys())
for key in param_keys:
if isinstance(params[key], list):
params[key + "[]"] = params[key]
del params[key]
return params
def __unpack_id(self, id, dateconv=False):
"""
Internal object-to-id converter
Checks if id is a dict that contains id and
returns the id inside, otherwise just returns
the id straight.
Also unpacks datetimes to snowflake IDs if requested.
"""
if isinstance(id, dict) and "id" in id:
id = id["id"]
if dateconv and isinstance(id, datetime.datetime):
id = (int(id.timestamp()) << 16) * 1000
return id
def __decode_webpush_b64(self, data):
"""
Re-pads and decodes urlsafe base64.
"""
missing_padding = len(data) % 4
if missing_padding != 0:
data += '=' * (4 - missing_padding)
return base64.urlsafe_b64decode(data)
def __get_token_expired(self):
"""Internal helper for oauth code"""
return self._token_expired < datetime.datetime.now()
def __set_token_expired(self, value):
"""Internal helper for oauth code"""
self._token_expired = datetime.datetime.now() + datetime.timedelta(seconds=value)
return
def __get_refresh_token(self):
"""Internal helper for oauth code"""
return self._refresh_token
def __set_refresh_token(self, value):
"""Internal helper for oauth code"""
self._refresh_token = value
return
def __guess_type(self, media_file):
"""Internal helper to guess media file type"""
mime_type = None
try:
mime_type = magic.from_file(media_file, mime=True)
except AttributeError:
mime_type = mimetypes.guess_type(media_file)[0]
return mime_type
def __load_media_file(self, media_file, mime_type=None, file_name=None):
if isinstance(media_file, PurePath):
media_file = str(media_file)
if isinstance(media_file, str) and os.path.isfile(media_file):
mime_type = self.__guess_type(media_file)
media_file = open(media_file, 'rb')
elif isinstance(media_file, str) and os.path.isfile(media_file):
media_file = open(media_file, 'rb')
if mime_type is None:
raise MastodonIllegalArgumentError('Could not determine mime type or data passed directly without mime type.')
if file_name is None:
random_suffix = uuid.uuid4().hex
file_name = "mastodonpyupload_" + str(time.time()) + "_" + str(random_suffix) + mimetypes.guess_extension(mime_type)
return (file_name, media_file, mime_type)
@staticmethod
def __protocolize(base_url):
"""Internal add-protocol-to-url helper"""
if not base_url.startswith("http://") and not base_url.startswith("https://"):
base_url = "https://" + base_url
# Some API endpoints can't handle extra /'s in path requests
base_url = base_url.rstrip("/")
return base_url
@staticmethod
def __deprotocolize(base_url):
"""Internal helper to strip http and https from a URL"""
if base_url.startswith("http://"):
base_url = base_url[7:]
elif base_url.startswith("https://") or base_url.startswith("onion://"):
base_url = base_url[8:]
return base_url
def __normalize_version_string(self, version_string):
# Split off everything after the first space, to take care of Pleromalikes so that the parser doesn't get confused in case those have a + somewhere in their version
version_string = version_string.split(" ")[0]
try:
# Attempt to split at + and check if the part after parses as a version string, to account for hometown
parse_version_string(version_string.split("+")[1])
return version_string.split("+")[1]
except:
# If this fails, assume that if there is a +, what is before that is the masto version (or that there is no +)
return version_string.split("+")[0]

Wyświetl plik

@ -0,0 +1,108 @@
from .versions import _DICT_VERSION_STATUS, _DICT_VERSION_CARD, _DICT_VERSION_CONTEXT, _DICT_VERSION_ACCOUNT, _DICT_VERSION_SCHEDULED_STATUS
from .utility import api_version
from .internals import Mastodon as Internals
class Mastodon(Internals):
###
# Reading data: Statuses
###
@api_version("1.0.0", "2.0.0", _DICT_VERSION_STATUS)
def status(self, id):
"""
Fetch information about a single toot.
Does not require authentication for publicly visible statuses.
Returns a :ref:`status dict <status dict>`.
"""
id = self.__unpack_id(id)
url = '/api/v1/statuses/{0}'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "3.0.0", _DICT_VERSION_CARD)
def status_card(self, id):
"""
Fetch a card associated with a status. A card describes an object (such as an
external video or link) embedded into a status.
Does not require authentication for publicly visible statuses.
This function is deprecated as of 3.0.0 and the endpoint does not
exist anymore - you should just use the "card" field of the status dicts
instead. Mastodon.py will try to mimic the old behaviour, but this
is somewhat inefficient and not guaranteed to be the case forever.
Returns a :ref:`card dict <card dict>`.
"""
if self.verify_minimum_version("3.0.0", cached=True):
return self.status(id).card
else:
id = self.__unpack_id(id)
url = '/api/v1/statuses/{0}/card'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "1.0.0", _DICT_VERSION_CONTEXT)
def status_context(self, id):
"""
Fetch information about ancestors and descendants of a toot.
Does not require authentication for publicly visible statuses.
Returns a :ref:`context dict <context dict>`.
"""
id = self.__unpack_id(id)
url = '/api/v1/statuses/{0}/context'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "2.1.0", _DICT_VERSION_ACCOUNT)
def status_reblogged_by(self, id):
"""
Fetch a list of users that have reblogged a status.
Does not require authentication for publicly visible statuses.
Returns a list of :ref:`account dicts <account dicts>`.
"""
id = self.__unpack_id(id)
url = '/api/v1/statuses/{0}/reblogged_by'.format(str(id))
return self.__api_request('GET', url)
@api_version("1.0.0", "2.1.0", _DICT_VERSION_ACCOUNT)
def status_favourited_by(self, id):
"""
Fetch a list of users that have favourited a status.
Does not require authentication for publicly visible statuses.
Returns a list of :ref:`account dicts <account dicts>`.
"""
id = self.__unpack_id(id)
url = '/api/v1/statuses/{0}/favourited_by'.format(str(id))
return self.__api_request('GET', url)
###
# Reading data: Scheduled statuses
###
@api_version("2.7.0", "2.7.0", _DICT_VERSION_SCHEDULED_STATUS)
def scheduled_statuses(self):
"""
Fetch a list of scheduled statuses
Returns a list of :ref:`scheduled status dicts <scheduled status dicts>`.
"""
return self.__api_request('GET', '/api/v1/scheduled_statuses')
@api_version("2.7.0", "2.7.0", _DICT_VERSION_SCHEDULED_STATUS)
def scheduled_status(self, id):
"""
Fetch information about the scheduled status with the given id.
Returns a :ref:`scheduled status dict <scheduled status dict>`.
"""
id = self.__unpack_id(id)
url = '/api/v1/scheduled_statuses/{0}'.format(str(id))
return self.__api_request('GET', url)

Wyświetl plik

@ -0,0 +1,121 @@
from .versions import _DICT_VERSION_STATUS, _DICT_VERSION_CONVERSATION
from .error import MastodonIllegalArgumentError, MastodonNotFoundError
from .utility import api_version
from .internals import Mastodon as Internals
class Mastodon(Internals):
###
# Reading data: Timelines
##
@api_version("1.0.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline(self, timeline="home", max_id=None, min_id=None, since_id=None, limit=None, only_media=False, local=False, remote=False):
"""
Fetch statuses, most recent ones first. `timeline` can be 'home', 'local', 'public',
'tag/hashtag' or 'list/id'. See the following functions documentation for what those do.
The default timeline is the "home" timeline.
Specify `only_media` to only get posts with attached media. Specify `local` to only get local statuses,
and `remote` to only get remote statuses. Some options are mutually incompatible as dictated by logic.
May or may not require authentication depending on server settings and what is specifically requested.
Returns a list of :ref:`status dicts <status dicts>`.
"""
if max_id is not None:
max_id = self.__unpack_id(max_id, dateconv=True)
if min_id is not None:
min_id = self.__unpack_id(min_id, dateconv=True)
if since_id is not None:
since_id = self.__unpack_id(since_id, dateconv=True)
params_initial = locals()
if not local:
del params_initial['local']
if not remote:
del params_initial['remote']
if not only_media:
del params_initial['only_media']
if timeline == "local":
timeline = "public"
params_initial['local'] = True
params = self.__generate_params(params_initial, ['timeline'])
url = '/api/v1/timelines/{0}'.format(timeline)
return self.__api_request('GET', url, params)
@api_version("1.0.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline_home(self, max_id=None, min_id=None, since_id=None, limit=None, only_media=False, local=False, remote=False):
"""
Convenience method: Fetches the logged-in user's home timeline (i.e. followed users and self). Params as in `timeline()`.
Returns a list of :ref:`status dicts <status dicts>`.
"""
return self.timeline('home', max_id=max_id, min_id=min_id, since_id=since_id, limit=limit, only_media=only_media, local=local, remote=remote)
@api_version("1.0.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline_local(self, max_id=None, min_id=None, since_id=None, limit=None, only_media=False):
"""
Convenience method: Fetches the local / instance-wide timeline, not including replies. Params as in `timeline()`.
Returns a list of :ref:`status dicts <status dicts>`.
"""
return self.timeline('local', max_id=max_id, min_id=min_id, since_id=since_id, limit=limit, only_media=only_media)
@api_version("1.0.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline_public(self, max_id=None, min_id=None, since_id=None, limit=None, only_media=False, local=False, remote=False):
"""
Convenience method: Fetches the public / visible-network / federated timeline, not including replies. Params as in `timeline()`.
Returns a list of :ref:`status dicts <status dicts>`.
"""
return self.timeline('public', max_id=max_id, min_id=min_id, since_id=since_id, limit=limit, only_media=only_media, local=local, remote=remote)
@api_version("1.0.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline_hashtag(self, hashtag, local=False, max_id=None, min_id=None, since_id=None, limit=None, only_media=False, remote=False):
"""
Convenience method: Fetch a timeline of toots with a given hashtag. The hashtag parameter
should not contain the leading #. Params as in `timeline()`.
Returns a list of :ref:`status dicts <status dicts>`.
"""
if hashtag.startswith("#"):
raise MastodonIllegalArgumentError(
"Hashtag parameter should omit leading #")
return self.timeline('tag/{0}'.format(hashtag), max_id=max_id, min_id=min_id, since_id=since_id, limit=limit, only_media=only_media, local=local, remote=remote)
@api_version("2.1.0", "3.1.4", _DICT_VERSION_STATUS)
def timeline_list(self, id, max_id=None, min_id=None, since_id=None, limit=None, only_media=False, local=False, remote=False):
"""
Convenience method: Fetches a timeline containing all the toots by users in a given list. Params as in `timeline()`.
Returns a list of :ref:`status dicts <status dicts>`.
"""
id = self.__unpack_id(id)
return self.timeline('list/{0}'.format(id), max_id=max_id, min_id=min_id, since_id=since_id, limit=limit, only_media=only_media, local=local, remote=remote)
@api_version("2.6.0", "2.6.0", _DICT_VERSION_CONVERSATION)
def conversations(self, max_id=None, min_id=None, since_id=None, limit=None):
"""
Fetches a user's conversations.
Returns a list of :ref:`conversation dicts <conversation dicts>`.
"""
if max_id is not None:
max_id = self.__unpack_id(max_id, dateconv=True)
if min_id is not None:
min_id = self.__unpack_id(min_id, dateconv=True)
if since_id is not None:
since_id = self.__unpack_id(since_id, dateconv=True)
params = self.__generate_params(locals())
return self.__api_request('GET', "/api/v1/conversations/", params)

143
mastodon/utility.py 100644
Wyświetl plik

@ -0,0 +1,143 @@
# utility.py - utility functions, externally usable
import re
from decorator import decorate
from .error import MastodonVersionError, MastodonAPIError
import dateutil
import datetime
# Module level:
###
# Version check functions, including decorator and parser
###
def parse_version_string(version_string):
"""Parses a semver version string, stripping off "rc" stuff if present."""
string_parts = version_string.split(".")
version_parts = (
int(re.match("([0-9]*)", string_parts[0]).group(0)),
int(re.match("([0-9]*)", string_parts[1]).group(0)),
int(re.match("([0-9]*)", string_parts[2]).group(0))
)
return version_parts
def max_version(*version_strings):
"""Returns the maximum version of all provided version strings."""
return max(version_strings, key=parse_version_string)
def api_version(created_ver, last_changed_ver, return_value_ver):
"""Version check decorator. Currently only checks Bigger Than."""
def api_min_version_decorator(function):
def wrapper(function, self, *args, **kwargs):
if not self.version_check_mode == "none":
if self.version_check_mode == "created":
version = created_ver
else:
version = max_version(last_changed_ver, return_value_ver)
major, minor, patch = parse_version_string(version)
if major > self.mastodon_major:
raise MastodonVersionError("Version check failed (Need version " + version + ")")
elif major == self.mastodon_major and minor > self.mastodon_minor:
raise MastodonVersionError("Version check failed (Need version " + version + ")")
elif major == self.mastodon_major and minor == self.mastodon_minor and patch > self.mastodon_patch:
raise MastodonVersionError("Version check failed (Need version " + version + ", patch is " + str(self.mastodon_patch) + ")")
return function(self, *args, **kwargs)
function.__doc__ = function.__doc__ + "\n\n *Added: Mastodon v" + \
created_ver + ", last changed: Mastodon v" + last_changed_ver + "*"
return decorate(function, wrapper)
return api_min_version_decorator
###
# Dict helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessDict(dict):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessDict, self).__setattr__(attr, val)
###
# List helper class.
# Defined at top level so it can be pickled.
###
class AttribAccessList(list):
def __getattr__(self, attr):
if attr in self:
return self[attr]
else:
raise AttributeError("Attribute not found: " + str(attr))
def __setattr__(self, attr, val):
if attr in self:
raise AttributeError("Attribute-style access is read only")
super(AttribAccessList, self).__setattr__(attr, val)
# Class level:
class Mastodon():
def set_language(self, lang):
"""
Set the locale Mastodon will use to generate responses. Valid parameters are all ISO 639-1 (two letter) or, for languages that do
not have one, 639-3 (three letter) language codes. This affects some error messages (those related to validation) and trends.
"""
self.lang = lang
def retrieve_mastodon_version(self):
"""
Determine installed Mastodon version and set major, minor and patch (not including RC info) accordingly.
Returns the version string, possibly including rc info.
"""
try:
version_str = self.__normalize_version_string(self.__instance()["version"])
self.version_check_worked = True
except:
# instance() was added in 1.1.0, so our best guess is 1.0.0.
version_str = "1.0.0"
self.version_check_worked = False
self.mastodon_major, self.mastodon_minor, self.mastodon_patch = parse_version_string(version_str)
return version_str
def verify_minimum_version(self, version_str, cached=False):
"""
Update version info from server and verify that at least the specified version is present.
If you specify "cached", the version info update part is skipped.
Returns True if version requirement is satisfied, False if not.
"""
if not cached:
self.retrieve_mastodon_version()
major, minor, patch = parse_version_string(version_str)
if major > self.mastodon_major:
return False
elif major == self.mastodon_major and minor > self.mastodon_minor:
return False
elif major == self.mastodon_major and minor == self.mastodon_minor and patch > self.mastodon_patch:
return False
return True
def get_approx_server_time(self):
"""
Retrieve the approximate server time
We parse this from the hopefully present "Date" header, but make no effort to compensate for latency.
"""
response = self.__api_request("HEAD", "/", return_response_object=True)
if 'Date' in response.headers:
server_time_datetime = dateutil.parser.parse(response.headers['Date'])
# Make sure we're in local time
epoch_time = self.__datetime_to_epoch(server_time_datetime)
return datetime.datetime.fromtimestamp(epoch_time)
else:
raise MastodonAPIError("No server time in response.")

Wyświetl plik

@ -0,0 +1,40 @@
# versions.py - versioning of return values
from .utility import max_version
# Dict versions
_DICT_VERSION_APPLICATION = "2.7.2"
_DICT_VERSION_MENTION = "1.0.0"
_DICT_VERSION_MEDIA = "3.2.0"
_DICT_VERSION_ACCOUNT = "3.3.0"
_DICT_VERSION_POLL = "2.8.0"
_DICT_VERSION_STATUS = max_version("3.1.0", _DICT_VERSION_MEDIA, _DICT_VERSION_ACCOUNT, _DICT_VERSION_APPLICATION, _DICT_VERSION_MENTION, _DICT_VERSION_POLL)
_DICT_VERSION_INSTANCE = max_version("3.4.0", _DICT_VERSION_ACCOUNT)
_DICT_VERSION_HASHTAG = "2.3.4"
_DICT_VERSION_EMOJI = "3.0.0"
_DICT_VERSION_RELATIONSHIP = "3.3.0"
_DICT_VERSION_NOTIFICATION = max_version("3.5.0", _DICT_VERSION_ACCOUNT, _DICT_VERSION_STATUS)
_DICT_VERSION_CONTEXT = max_version("1.0.0", _DICT_VERSION_STATUS)
_DICT_VERSION_LIST = "2.1.0"
_DICT_VERSION_CARD = "3.2.0"
_DICT_VERSION_SEARCHRESULT = max_version("1.0.0", _DICT_VERSION_ACCOUNT, _DICT_VERSION_STATUS, _DICT_VERSION_HASHTAG)
_DICT_VERSION_ACTIVITY = "2.1.2"
_DICT_VERSION_REPORT = "2.9.1"
_DICT_VERSION_PUSH = "2.4.0"
_DICT_VERSION_PUSH_NOTIF = "2.4.0"
_DICT_VERSION_FILTER = "2.4.3"
_DICT_VERSION_CONVERSATION = max_version("2.6.0", _DICT_VERSION_ACCOUNT, _DICT_VERSION_STATUS)
_DICT_VERSION_SCHEDULED_STATUS = max_version("2.7.0", _DICT_VERSION_STATUS)
_DICT_VERSION_PREFERENCES = "2.8.0"
_DICT_VERSION_ADMIN_ACCOUNT = max_version("4.0.0", _DICT_VERSION_ACCOUNT)
_DICT_VERSION_FEATURED_TAG = "3.0.0"
_DICT_VERSION_MARKER = "3.0.0"
_DICT_VERSION_REACTION = "3.1.0"
_DICT_VERSION_ANNOUNCEMENT = max_version("3.1.0", _DICT_VERSION_REACTION)
_DICT_VERSION_STATUS_EDIT = "3.5.0"
_DICT_VERSION_FAMILIAR_FOLLOWERS = max_version("3.5.0", _DICT_VERSION_ACCOUNT)
_DICT_VERSION_ADMIN_DOMAIN_BLOCK = "4.0.0"
_DICT_VERSION_ADMIN_MEASURE = "3.5.0"
_DICT_VERSION_ADMIN_DIMENSION = "3.5.0"
_DICT_VERSION_ADMIN_RETENTION = "3.5.0"

Wyświetl plik

@ -7,7 +7,8 @@ test_deps = [
'vcrpy',
'pytest-vcr',
'pytest-mock',
'requests-mock'
'requests-mock',
'pytz'
]
webpush_deps = [
@ -33,7 +34,6 @@ setup(name='Mastodon.py',
'requests>=2.4.2',
'python-dateutil',
'six',
'pytz',
'python-magic',
'decorator>=4.0.0',
] + blurhash_deps,

Wyświetl plik

@ -74,13 +74,13 @@ def test_instance_rules(api):
assert isinstance(api.instance_rules(), list)
def test_version_parsing(api):
assert parse_version_string(api._Mastodon__normalize_version_string("4.0.2")) == [4, 0, 2]
assert parse_version_string(api._Mastodon__normalize_version_string("2.1.0rc3")) == [2, 1, 0]
assert parse_version_string(api._Mastodon__normalize_version_string("1.0.7+3.5.5")) == [3, 5, 5]
assert parse_version_string(api._Mastodon__normalize_version_string("1.0.7+3.5.5rc2")) == [3, 5, 5]
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter")) == [3, 5, 1]
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter-6.6.6")) == [3, 5, 1]
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1rc4+chitter-6.6.6")) == [3, 5, 1]
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter6.6.6")) == [3, 5, 1]
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.0 (compatible; Pleroma 1.2.3)")) == [3, 5, 0]
assert parse_version_string(api._Mastodon__normalize_version_string("3.2.1rc3 (compatible; Akkoma 3.2.4+shinychariot)")) == [3, 2, 1]
assert parse_version_string(api._Mastodon__normalize_version_string("4.0.2")) == (4, 0, 2)
assert parse_version_string(api._Mastodon__normalize_version_string("2.1.0rc3")) == (2, 1, 0)
assert parse_version_string(api._Mastodon__normalize_version_string("1.0.7+3.5.5")) == (3, 5, 5)
assert parse_version_string(api._Mastodon__normalize_version_string("1.0.7+3.5.5rc2")) == (3, 5, 5)
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter")) == (3, 5, 1)
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter-6.6.6")) == (3, 5, 1)
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1rc4+chitter-6.6.6")) == (3, 5, 1)
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.1+chitter6.6.6")) == (3, 5, 1)
assert parse_version_string(api._Mastodon__normalize_version_string("3.5.0 (compatible; Pleroma 1.2.3)")) == (3, 5, 0)
assert parse_version_string(api._Mastodon__normalize_version_string("3.2.1rc3 (compatible; Akkoma 3.2.4+shinychariot)")) == (3, 2, 1)

Wyświetl plik

@ -1,7 +1,13 @@
import pytest
from mastodon.Mastodon import MastodonAPIError, MastodonNotFoundError
import datetime
import pytz
try:
import zoneinfo
timezone = zoneinfo.ZoneInfo
except:
import pytz
timezone = pytz.timezone
import vcr
import time
import pickle
@ -154,7 +160,7 @@ def test_status_pin_unpin(status, api):
@pytest.mark.vcr(match_on=['path'])
def test_scheduled_status(api):
base_time = datetime.datetime(4000, 1, 1, 12, 13, 14, 0, pytz.timezone("Etc/GMT+2"))
base_time = datetime.datetime(4000, 1, 1, 12, 13, 14, 0, timezone("Etc/GMT+2"))
the_future = base_time + datetime.timedelta(minutes=20)
scheduled_toot = api.status_post("please ensure adequate headroom", scheduled_at=the_future)
assert scheduled_toot