kopia lustrzana https://github.com/halcy/Mastodon.py
commit
a73fce4b81
|
@ -42,7 +42,7 @@ class Mastodon(Internals):
|
|||
if role_ids is not None:
|
||||
if not isinstance(role_ids, list):
|
||||
role_ids = [role_ids]
|
||||
role_ids = list(map(self.__unpack_id, role_ids))
|
||||
role_ids = [self.__unpack_id(x) for x in role_ids]
|
||||
|
||||
if invited_by is not None:
|
||||
invited_by = self.__unpack_id(invited_by)
|
||||
|
|
|
@ -90,7 +90,7 @@ class Mastodon(Internals):
|
|||
|
||||
if not isinstance(account_ids, list):
|
||||
account_ids = [account_ids]
|
||||
account_ids = list(map(lambda x: self.__unpack_id(x), account_ids))
|
||||
account_ids = [self.__unpack_id(x) for x in account_ids]
|
||||
|
||||
params = self.__generate_params(locals(), ['id'])
|
||||
self.__api_request('POST', f'/api/v1/lists/{id}/accounts', params)
|
||||
|
@ -104,7 +104,7 @@ class Mastodon(Internals):
|
|||
|
||||
if not isinstance(account_ids, list):
|
||||
account_ids = [account_ids]
|
||||
account_ids = list(map(lambda x: self.__unpack_id(x), account_ids))
|
||||
account_ids = [self.__unpack_id(x) for x in account_ids]
|
||||
|
||||
params = self.__generate_params(locals(), ['id'])
|
||||
self.__api_request('DELETE', f'/api/v1/lists/{id}/accounts', params)
|
|
@ -52,7 +52,7 @@ class Mastodon(Internals):
|
|||
if status_ids is not None:
|
||||
if not isinstance(status_ids, list):
|
||||
status_ids = [status_ids]
|
||||
status_ids = list(map(lambda x: self.__unpack_id(x), status_ids))
|
||||
status_ids = [self.__unpack_id(x) for x in status_ids]
|
||||
|
||||
params_initial = locals()
|
||||
if not forward:
|
||||
|
|
|
@ -341,8 +341,7 @@ class Mastodon(Internals):
|
|||
mentioned_accounts[account.id] = account.acct
|
||||
|
||||
# Join into one piece of text. The space is added inside because of self-replies.
|
||||
status = "".join(map(lambda x: "@" + x + " ",
|
||||
mentioned_accounts.values())) + status
|
||||
status = " ".join(f"@{x}" for x in mentioned_accounts.values()) + " " + status
|
||||
|
||||
# Retain visibility / cw
|
||||
if visibility is None and 'visibility' in to_status:
|
||||
|
|
|
@ -206,14 +206,14 @@ def test_account_pin_unpin(api, api2):
|
|||
try:
|
||||
assert relationship
|
||||
assert relationship['endorsed']
|
||||
assert user["id"] in map(lambda x: x["id"], endorsed)
|
||||
assert any(x["id"] == user["id"] for x in endorsed)
|
||||
finally:
|
||||
relationship = api.account_unpin(user)
|
||||
endorsed2 = api.endorsements()
|
||||
api.account_unfollow(user)
|
||||
assert relationship
|
||||
assert not relationship['endorsed']
|
||||
assert not user["id"] in map(lambda x: x["id"], endorsed2)
|
||||
assert not any(x["id"] == user["id"] for x in endorsed2)
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_preferences(api):
|
||||
|
|
|
@ -173,7 +173,7 @@ def test_admin_domain_blocks(api2):
|
|||
assert block3.public_comment == "sicko behaviour"
|
||||
assert block3.private_comment == "jk ilu <3"
|
||||
api2.admin_delete_domain_block(block2)
|
||||
assert not block3.id in map(lambda x: x.id, api2.admin_domain_blocks())
|
||||
assert not any(x.id == block3.id for x in api2.admin_domain_blocks())
|
||||
|
||||
@pytest.mark.vcr(match_on=['path'])
|
||||
def test_admin_stats(api2):
|
||||
|
|
|
@ -58,10 +58,11 @@ def test_filter_serverside(api, api2):
|
|||
time.sleep(2)
|
||||
tl = api.timeline_home()
|
||||
try:
|
||||
assert not status_1['id'] in map(lambda st: st['id'], tl)
|
||||
assert not status_2['id'] in map(lambda st: st['id'], tl)
|
||||
assert status_3['id'] in map(lambda st: st['id'], tl)
|
||||
assert status_4['id'] in map(lambda st: st['id'], tl)
|
||||
st_ids = {st["id"] for st in tl}
|
||||
assert status_1["id"] not in st_ids
|
||||
assert status_2["id"] not in st_ids
|
||||
assert status_3["id"] in st_ids
|
||||
assert status_4["id"] in st_ids
|
||||
finally:
|
||||
api.filter_delete(keyword_filter_1)
|
||||
api.filter_delete(keyword_filter_2)
|
||||
|
@ -94,16 +95,18 @@ def test_filter_clientside(api, api2):
|
|||
|
||||
tl = api.timeline_home()
|
||||
try:
|
||||
assert status_1['id'] in map(lambda st: st['id'], tl)
|
||||
assert status_2['id'] in map(lambda st: st['id'], tl)
|
||||
assert status_3['id'] in map(lambda st: st['id'], tl)
|
||||
assert status_4['id'] in map(lambda st: st['id'], tl)
|
||||
st_ids = {st["id"] for st in tl}
|
||||
assert status_1['id'] in st_ids
|
||||
assert status_2['id'] in st_ids
|
||||
assert status_3['id'] in st_ids
|
||||
assert status_4['id'] in st_ids
|
||||
|
||||
filtered = api.filters_apply(tl, [keyword_filter_1, keyword_filter_2, keyword_filter_3], 'home')
|
||||
assert not status_1['id'] in map(lambda st: st['id'], filtered)
|
||||
assert not status_2['id'] in map(lambda st: st['id'], filtered)
|
||||
assert status_3['id'] in map(lambda st: st['id'], filtered)
|
||||
assert status_4['id'] in map(lambda st: st['id'], filtered)
|
||||
st_ids = {st["id"] for st in filtered}
|
||||
assert status_1['id'] not in st_ids
|
||||
assert status_2['id'] not in st_ids
|
||||
assert status_3['id'] in st_ids
|
||||
assert status_4['id'] in st_ids
|
||||
finally:
|
||||
api.filter_delete(keyword_filter_1)
|
||||
api.filter_delete(keyword_filter_2)
|
||||
|
|
|
@ -24,14 +24,14 @@ def test_list_add_remove_account(api, api2, mastodon_list):
|
|||
|
||||
api.account_follow(user)
|
||||
api.list_accounts_add(mastodon_list, user)
|
||||
assert user.id in map(lambda x: x.id, api.list_accounts(mastodon_list))
|
||||
assert any(x.id == user.id for x in api.list_accounts(mastodon_list))
|
||||
|
||||
api.account_unfollow(user)
|
||||
assert len(api.list_accounts(mastodon_list)) == 0
|
||||
|
||||
api.account_follow(user)
|
||||
api.list_accounts_add(mastodon_list, user)
|
||||
assert user.id in map(lambda x: x.id, api.list_accounts(mastodon_list))
|
||||
assert any(x.id == user.id for x in api.list_accounts(mastodon_list))
|
||||
|
||||
api.list_accounts_delete(mastodon_list, user)
|
||||
assert len(api.list_accounts(mastodon_list)) == 0
|
||||
|
@ -56,8 +56,7 @@ def test_list_timeline(api, api2, mastodon_list):
|
|||
|
||||
status = api2.status_post("I have never stolen a ham in my life.", visibility="public")
|
||||
time.sleep(2)
|
||||
list_tl = list(map(lambda x: x.id, api.timeline_list(mastodon_list)))
|
||||
assert status.id in list_tl
|
||||
assert any(x.id == status.id for x in api.timeline_list(mastodon_list))
|
||||
|
||||
api2.status_delete(status)
|
||||
api.account_unfollow(user)
|
||||
|
|
|
@ -172,14 +172,14 @@ def test_scheduled_status(api):
|
|||
assert scheduled_toot_2.scheduled_at < scheduled_toot.scheduled_at
|
||||
|
||||
scheduled_toot_list = api.scheduled_statuses()
|
||||
assert scheduled_toot_2.id in map(lambda x: x.id, scheduled_toot_list)
|
||||
assert any(x.id == scheduled_toot_2.id for x in scheduled_toot_list)
|
||||
|
||||
scheduled_toot_3 = api.scheduled_status(scheduled_toot.id)
|
||||
assert scheduled_toot_2.id == scheduled_toot_3.id
|
||||
|
||||
api.scheduled_status_delete(scheduled_toot_2)
|
||||
scheduled_toot_list_2 = api.scheduled_statuses()
|
||||
assert not scheduled_toot_2.id in map(lambda x: x.id, scheduled_toot_list_2)
|
||||
assert not any(x.id == scheduled_toot_2.id for x in scheduled_toot_list_2)
|
||||
|
||||
if os.path.exists("tests/cassettes/test_scheduled_status_datetimeobjects.pkl"):
|
||||
the_very_immediate_future = datetime.datetime.fromtimestamp(pickle.load(open("tests/cassettes/test_scheduled_status_datetimeobjects.pkl", 'rb')))
|
||||
|
@ -190,8 +190,8 @@ def test_scheduled_status(api):
|
|||
time.sleep(15)
|
||||
statuses = api.timeline_home()
|
||||
scheduled_toot_list_3 = api.scheduled_statuses()
|
||||
assert scheduled_toot_4.id in map(lambda x: x.id, statuses)
|
||||
assert not scheduled_toot_4.id in map(lambda x: x.id, scheduled_toot_list_3)
|
||||
assert any(x.id == scheduled_toot_4.id for x in statuses)
|
||||
assert not any(x.id == scheduled_toot_4.id for x in scheduled_toot_list_3)
|
||||
|
||||
# The following two tests need to be manually (!) ran 10 minutes apart when recording.
|
||||
# Sorry, I can't think of a better way to test scheduled statuses actually work as intended.
|
||||
|
@ -205,7 +205,7 @@ def test_scheduled_status_long_part1(api):
|
|||
pickle.dump(the_medium_term_future.timestamp(), open("tests/cassettes_special/test_scheduled_status_long_datetimeobjects.pkl", 'wb'))
|
||||
scheduled_toot = api.status_post(f"please ensure maximum headroom at {the_medium_term_future}", scheduled_at=the_medium_term_future)
|
||||
scheduled_toot_list = api.scheduled_statuses()
|
||||
assert scheduled_toot.id in map(lambda x: x.id, scheduled_toot_list)
|
||||
assert any(x.id == scheduled_toot.id for x in scheduled_toot_list)
|
||||
pickle.dump(scheduled_toot.params.text, open("tests/cassettes_special/test_scheduled_status_long_text.pkl", 'wb'))
|
||||
|
||||
@pytest.mark.vcr(match_on=['path'])
|
||||
|
|
|
@ -323,17 +323,17 @@ def test_stream_user_direct(api, api2, api3):
|
|||
conversations = []
|
||||
edits = []
|
||||
listener = CallbackStreamListener(
|
||||
update_handler = lambda x: updates.append(x),
|
||||
local_update_handler = lambda x: local_updates.append(x),
|
||||
notification_handler = lambda x: notifications.append(x),
|
||||
delete_handler = lambda x: deletes.append(x),
|
||||
conversation_handler = lambda x: conversations.append(x),
|
||||
status_update_handler = lambda x: edits.append(x),
|
||||
filters_changed_handler = lambda x: 0,
|
||||
announcement_handler = lambda x: 0,
|
||||
announcement_reaction_handler = lambda x: 0,
|
||||
announcement_delete_handler = lambda x: 0,
|
||||
encryted_message_handler = lambda x: 0,
|
||||
update_handler=updates.append,
|
||||
local_update_handler=local_updates.append,
|
||||
notification_handler=notifications.append,
|
||||
delete_handler=deletes.append,
|
||||
conversation_handler=conversations.append,
|
||||
status_update_handler=edits.append,
|
||||
filters_changed_handler=lambda x: 0,
|
||||
announcement_handler=lambda x: 0,
|
||||
announcement_reaction_handler=lambda x: 0,
|
||||
announcement_delete_handler=lambda x: 0,
|
||||
encryted_message_handler=lambda x: 0,
|
||||
)
|
||||
|
||||
posted = []
|
||||
|
@ -384,7 +384,7 @@ def test_stream_user_local(api, api2):
|
|||
|
||||
updates = []
|
||||
listener = CallbackStreamListener(
|
||||
local_update_handler = lambda x: updates.append(x),
|
||||
local_update_handler=updates.append,
|
||||
)
|
||||
|
||||
posted = []
|
||||
|
@ -412,7 +412,7 @@ def test_stream_direct(api, api2):
|
|||
|
||||
conversations = []
|
||||
listener = CallbackStreamListener(
|
||||
conversation_handler = lambda x: conversations.append(x),
|
||||
conversation_handler=conversations.append,
|
||||
)
|
||||
|
||||
def do_activities():
|
||||
|
|
|
@ -9,14 +9,14 @@ import os
|
|||
def test_public_tl_anonymous(api_anonymous, status3):
|
||||
time.sleep(3)
|
||||
tl = api_anonymous.timeline_public()
|
||||
assert status3['id'] in list(map(lambda st: st['id'], tl))
|
||||
assert any(st["id"] == status3["id"] for st in tl)
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_public_tl(api, status):
|
||||
public = api.timeline_public()
|
||||
local = api.timeline_local()
|
||||
assert status['id'] in map(lambda st: st['id'], public)
|
||||
assert status['id'] in map(lambda st: st['id'], local)
|
||||
assert any(st["id"] == status["id"] for st in public)
|
||||
assert any(st["id"] == status["id"] for st in local)
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_unauthed_home_tl_throws(api_anonymous, status):
|
||||
|
@ -27,14 +27,14 @@ def test_unauthed_home_tl_throws(api_anonymous, status):
|
|||
def test_home_tl(api, status):
|
||||
time.sleep(3)
|
||||
tl = api.timeline_home()
|
||||
assert status['id'] in map(lambda st: st['id'], tl)
|
||||
assert any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_hashtag_tl(api):
|
||||
status = api.status_post('#hoot (hashtag toot)')
|
||||
tl = api.timeline_hashtag('hoot')
|
||||
try:
|
||||
assert status['id'] in map(lambda st: st['id'], tl)
|
||||
assert any(st["id"] == status["id"] for st in tl)
|
||||
finally:
|
||||
api.status_delete(status['id'])
|
||||
|
||||
|
@ -58,8 +58,8 @@ def test_conversations(api, api2):
|
|||
conversations2 = api2.conversations()
|
||||
api.status_delete(status)
|
||||
assert conversations
|
||||
assert status.id in map(lambda x: x.last_status.id, conversations)
|
||||
assert account.id in map(lambda x: x.accounts[0].id, conversations)
|
||||
assert any(x.last_status.id == status.id for x in conversations)
|
||||
assert any(x.accounts[0].id == account.id for x in conversations)
|
||||
assert conversations[0].unread is True
|
||||
assert conversations2[0].unread is False
|
||||
|
||||
|
@ -67,16 +67,16 @@ def test_conversations(api, api2):
|
|||
def test_min_max_id(api, status):
|
||||
time.sleep(3)
|
||||
tl = api.timeline_home(min_id = status.id - 1000, max_id = status.id + 1000)
|
||||
assert status['id'] in map(lambda st: st['id'], tl)
|
||||
assert any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
tl = api.timeline_home(min_id = status.id - 2000, max_id = status.id - 1000)
|
||||
assert not status['id'] in map(lambda st: st['id'], tl)
|
||||
assert not any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
tl = api.timeline_home(min_id = status.id + 1000, max_id = status.id + 2000)
|
||||
assert not status['id'] in map(lambda st: st['id'], tl)
|
||||
assert not any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
tl = api.timeline_home(since_id = status.id - 1000)
|
||||
assert status['id'] in map(lambda st: st['id'], tl)
|
||||
assert any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
@pytest.mark.vcr()
|
||||
def test_min_max_id_datetimes(api, status):
|
||||
|
@ -99,7 +99,7 @@ def test_min_max_id_datetimes(api, status):
|
|||
|
||||
time.sleep(3)
|
||||
tl = api.timeline_home(min_id = the_past, max_id = the_future)
|
||||
assert status['id'] in map(lambda st: st['id'], tl)
|
||||
assert any(st["id"] == status["id"] for st in tl)
|
||||
|
||||
tl = api.timeline_home(min_id = the_future, max_id = the_far_future)
|
||||
assert not status['id'] in map(lambda st: st['id'], tl)
|
||||
assert not any(st["id"] == status["id"] for st in tl)
|
||||
|
|
Ładowanie…
Reference in New Issue