toot/tests/test_integration.py

435 wiersze
13 KiB
Python

"""
This module contains integration tests meant to run against a test Mastodon instance.
You can set up a test instance locally by following this guide:
https://docs.joinmastodon.org/dev/setup/
To enable integration tests, export the following environment variables to match
your test server and database:
```
export TOOT_TEST_HOSTNAME="localhost:3000"
export TOOT_TEST_DATABASE_DSN="dbname=mastodon_development"
```
"""
import os
import psycopg2
import pytest
import re
import time
import uuid
from datetime import datetime, timedelta, timezone
from os import path
from toot import CLIENT_NAME, CLIENT_WEBSITE, api, App, User
from toot.console import run_command
from toot.exceptions import ConsoleError, NotFoundError
from toot.utils import get_text
from unittest import mock
# Host name of a test instance to run integration tests against
# DO NOT USE PUBLIC INSTANCES!!!
HOSTNAME = os.getenv("TOOT_TEST_HOSTNAME")
# Mastodon database name, used to confirm user registration without having to click the link
DATABASE_DSN = os.getenv("TOOT_TEST_DATABASE_DSN")
if not HOSTNAME or not DATABASE_DSN:
pytest.skip("Skipping integration tests", allow_module_level=True)
# ------------------------------------------------------------------------------
# Fixtures
# ------------------------------------------------------------------------------
def create_app():
response = api.create_app(HOSTNAME, scheme="http")
return App(HOSTNAME, f"http://{HOSTNAME}", response["client_id"], response["client_secret"])
def register_account(app: App):
username = str(uuid.uuid4())[-10:]
email = f"{username}@example.com"
response = api.register_account(app, username, email, "password", "en")
confirm_user(email)
return User(app.instance, username, response["access_token"])
def confirm_user(email):
conn = psycopg2.connect(DATABASE_DSN)
cursor = conn.cursor()
cursor.execute("UPDATE users SET confirmed_at = now() WHERE email = %s;", (email,))
conn.commit()
@pytest.fixture(scope="session")
def app():
return create_app()
@pytest.fixture(scope="session")
def user(app):
return register_account(app)
@pytest.fixture(scope="session")
def friend(app):
return register_account(app)
@pytest.fixture
def run(app, user, capsys):
def _run(command, *params, as_user=None):
run_command(app, as_user or user, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
@pytest.fixture
def run_anon(capsys):
def _run(command, *params):
run_command(None, None, command, params or [])
out, err = capsys.readouterr()
assert err == ""
return strip_ansi(out)
return _run
# ------------------------------------------------------------------------------
# Tests
# ------------------------------------------------------------------------------
def test_instance(app, run):
out = run("instance", "--disable-https")
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
def test_instance_anon(app, run_anon):
out = run_anon("instance", "--disable-https", HOSTNAME)
assert "Mastodon" in out
assert app.instance in out
assert "running Mastodon" in out
# Need to specify the instance name when running anon
with pytest.raises(ConsoleError) as exc:
run_anon("instance")
assert str(exc.value) == "Please specify instance name."
def test_post(app, user, run):
text = "i wish i was a #lumberjack"
out = run("post", text)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert text == get_text(status["content"])
assert status["visibility"] == "public"
assert status["sensitive"] is False
assert status["spoiler_text"] == ""
# Pleroma doesn't return the application
if status["application"]:
assert status["application"]["name"] == CLIENT_NAME
assert status["application"]["website"] == CLIENT_WEBSITE
def test_post_visibility(app, user, run):
for visibility in ["public", "unlisted", "private", "direct"]:
out = run("post", "foo", "--visibility", visibility)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["visibility"] == visibility
def test_post_scheduled_at(app, user, run):
text = str(uuid.uuid4())
scheduled_at = datetime.now(timezone.utc).replace(microsecond=0) + timedelta(minutes=10)
out = run("post", text, "--scheduled-at", scheduled_at.isoformat())
assert "Toot scheduled for" in out
statuses = api.scheduled_statuses(app, user)
[status] = [s for s in statuses if s["params"]["text"] == text]
assert datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%f%z") == scheduled_at
def test_post_scheduled_in(app, user, run):
text = str(uuid.uuid4())
variants = [
("1 day", timedelta(days=1)),
("1 day 6 hours", timedelta(days=1, hours=6)),
("1 day 6 hours 13 minutes", timedelta(days=1, hours=6, minutes=13)),
("1 day 6 hours 13 minutes 51 second", timedelta(days=1, hours=6, minutes=13, seconds=51)),
("2d", timedelta(days=2)),
("2d6h", timedelta(days=2, hours=6)),
("2d6h13m", timedelta(days=2, hours=6, minutes=13)),
("2d6h13m51s", timedelta(days=2, hours=6, minutes=13, seconds=51)),
]
datetimes = []
for scheduled_in, delta in variants:
out = run("post", text, "--scheduled-in", scheduled_in)
dttm = datetime.utcnow() + delta
assert out.startswith(f"Toot scheduled for: {str(dttm)[:16]}")
datetimes.append(dttm)
scheduled = api.scheduled_statuses(app, user)
scheduled = [s for s in scheduled if s["params"]["text"] == text]
scheduled = sorted(scheduled, key=lambda s: s["scheduled_at"])
assert len(scheduled) == 8
for expected, status in zip(datetimes, scheduled):
actual = datetime.strptime(status["scheduled_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
delta = expected - actual
assert delta.total_seconds() < 5
def test_media_attachments(app, user, run):
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
path1 = path.join(assets_dir, "test1.png")
path2 = path.join(assets_dir, "test2.png")
path3 = path.join(assets_dir, "test3.png")
path4 = path.join(assets_dir, "test4.png")
out = run(
"post",
"--media", path1,
"--media", path2,
"--media", path3,
"--media", path4,
"--description", "Test 1",
"--description", "Test 2",
"--description", "Test 3",
"--description", "Test 4",
"some text"
)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
[a1, a2, a3, a4] = status["media_attachments"]
# Pleroma doesn't send metadata
if "meta" in a1:
assert a1["meta"]["original"]["size"] == "50x50"
assert a2["meta"]["original"]["size"] == "50x60"
assert a3["meta"]["original"]["size"] == "50x70"
assert a4["meta"]["original"]["size"] == "50x80"
assert a1["description"] == "Test 1"
assert a2["description"] == "Test 2"
assert a3["description"] == "Test 3"
assert a4["description"] == "Test 4"
@mock.patch("toot.utils.multiline_input")
@mock.patch("sys.stdin.read")
def test_media_attachment_without_text(mock_read, mock_ml, app, user, run):
# No status from stdin or readline
mock_read.return_value = ""
mock_ml.return_value = ""
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
media_path = path.join(assets_dir, "test1.png")
out = run("post", "--media", media_path)
status_id = _posted_status_id(out)
status = api.fetch_status(app, user, status_id)
assert status["content"] == ""
[attachment] = status["media_attachments"]
assert not attachment["description"]
# Pleroma doesn't send metadata
if "meta" in attachment:
assert attachment["meta"]["original"]["size"] == "50x50"
def test_delete_status(app, user, run):
status = api.post_status(app, user, "foo")
out = run("delete", status["id"])
assert out == "✓ Status deleted"
with pytest.raises(NotFoundError):
api.fetch_status(app, user, status["id"])
def test_reply_thread(app, user, friend, run):
status = api.post_status(app, friend, "This is the status")
out = run("post", "--reply-to", status["id"], "This is the reply")
status_id = _posted_status_id(out)
reply = api.fetch_status(app, user, status_id)
assert reply["in_reply_to_id"] == status["id"]
out = run("thread", status["id"])
[s1, s2] = [s.strip() for s in re.split(r"─+", out) if s.strip()]
assert "This is the status" in s1
assert "This is the reply" in s2
assert friend.username in s1
assert user.username in s2
assert status["id"] in s1
assert reply["id"] in s2
def test_favourite(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["favourited"]
out = run("favourite", status["id"])
assert out == "✓ Status favourited"
status = api.fetch_status(app, user, status["id"])
assert status["favourited"]
out = run("unfavourite", status["id"])
assert out == "✓ Status unfavourited"
# A short delay is required before the server returns new data
time.sleep(0.1)
status = api.fetch_status(app, user, status["id"])
assert not status["favourited"]
def test_reblog(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["reblogged"]
out = run("reblog", status["id"])
assert out == "✓ Status reblogged"
status = api.fetch_status(app, user, status["id"])
assert status["reblogged"]
out = run("reblogged_by", status["id"])
assert out == f"@{user.username}"
out = run("unreblog", status["id"])
assert out == "✓ Status unreblogged"
status = api.fetch_status(app, user, status["id"])
assert not status["reblogged"]
def test_pin(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["pinned"]
out = run("pin", status["id"])
assert out == "✓ Status pinned"
status = api.fetch_status(app, user, status["id"])
assert status["pinned"]
out = run("unpin", status["id"])
assert out == "✓ Status unpinned"
status = api.fetch_status(app, user, status["id"])
assert not status["pinned"]
def test_bookmark(app, user, run):
status = api.post_status(app, user, "foo")
assert not status["bookmarked"]
out = run("bookmark", status["id"])
assert out == "✓ Status bookmarked"
status = api.fetch_status(app, user, status["id"])
assert status["bookmarked"]
out = run("unbookmark", status["id"])
assert out == "✓ Status unbookmarked"
status = api.fetch_status(app, user, status["id"])
assert not status["bookmarked"]
def test_whoami(user, run):
out = run("whoami")
# TODO: test other fields once updating account is supported
assert f"@{user.username}" in out
assert f"http://{HOSTNAME}/@{user.username}" in out
def test_whois(friend, run):
out = run("whois", friend.username)
assert f"@{friend.username}" in out
assert f"http://{HOSTNAME}/@{friend.username}" in out
def test_search_account(friend, run):
out = run("search", friend.username)
assert out == f"Accounts:\n* @{friend.username}"
def test_search_hashtag(app, user, run):
api.post_status(app, user, "#hashtag_x")
api.post_status(app, user, "#hashtag_y")
api.post_status(app, user, "#hashtag_z")
out = run("search", "#hashtag")
assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z"
def test_follow(friend, run):
out = run("follow", friend.username)
assert out == f"✓ You are now following {friend.username}"
out = run("unfollow", friend.username)
assert out == f"✓ You are no longer following {friend.username}"
def test_follow_case_insensitive(friend, run):
username = friend.username.upper()
out = run("follow", username)
assert out == f"✓ You are now following {username}"
out = run("unfollow", username)
assert out == f"✓ You are no longer following {username}"
# TODO: improve testing stderr, catching exceptions is not optimal
def test_follow_not_found(run):
with pytest.raises(ConsoleError) as ex_info:
run("follow", "banana")
assert str(ex_info.value) == "Account not found"
# ------------------------------------------------------------------------------
# Utils
# ------------------------------------------------------------------------------
strip_ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
def strip_ansi(string):
return strip_ansi_pattern.sub("", string).strip()
def _posted_status_id(out):
pattern = re.compile(r"Toot posted: http://([^/]+)/([^/]+)/(.+)")
match = re.search(pattern, out)
assert match
host, _, status_id = match.groups()
assert host == HOSTNAME
return status_id