kopia lustrzana https://github.com/cloudflare/wildebeest
Move activity; move followers
rodzic
2f49056e3b
commit
5867841881
|
|
@ -17,7 +17,7 @@ import {
|
|||
import { type APObject, updateObject } from 'wildebeest/backend/src/activitypub/objects'
|
||||
import { parseHandle } from 'wildebeest/backend/src/utils/parse'
|
||||
import type { Note } from 'wildebeest/backend/src/activitypub/objects/note'
|
||||
import { addFollowing, acceptFollowing } from 'wildebeest/backend/src/mastodon/follow'
|
||||
import { addFollowing, acceptFollowing, moveFollowers } from 'wildebeest/backend/src/mastodon/follow'
|
||||
import { deliverToActor } from 'wildebeest/backend/src/activitypub/deliver'
|
||||
import { getSigningKey } from 'wildebeest/backend/src/mastodon/account'
|
||||
import { insertLike } from 'wildebeest/backend/src/mastodon/like'
|
||||
|
|
@ -26,6 +26,7 @@ import { insertReply } from 'wildebeest/backend/src/mastodon/reply'
|
|||
import type { Activity } from 'wildebeest/backend/src/activitypub/activities'
|
||||
import { originalActorIdSymbol, deleteObject } from 'wildebeest/backend/src/activitypub/objects'
|
||||
import { hasReblog } from 'wildebeest/backend/src/mastodon/reblog'
|
||||
import { getMetadata, loadItems } from 'wildebeest/backend/src/activitypub/objects/collection'
|
||||
|
||||
function extractID(domain: string, s: string | URL): string {
|
||||
return s.toString().replace(`https://${domain}/ap/users/`, '')
|
||||
|
|
@ -367,6 +368,44 @@ export async function handle(
|
|||
break
|
||||
}
|
||||
|
||||
// https://www.w3.org/TR/activitystreams-vocabulary/#dfn-move
|
||||
case 'Move': {
|
||||
const fromActorId = getActorAsId()
|
||||
const target = new URL(activity.target)
|
||||
|
||||
if (target.hostname !== domain) {
|
||||
console.warn("Moving actor isn't local")
|
||||
break
|
||||
}
|
||||
|
||||
const fromActor = await actors.getAndCache(fromActorId, db)
|
||||
|
||||
const localActor = await actors.getActorById(db, target)
|
||||
if (localActor === null) {
|
||||
console.warn(`actor ${target} not found`)
|
||||
break
|
||||
}
|
||||
|
||||
// move followers
|
||||
{
|
||||
const collection = await getMetadata(fromActor.followers)
|
||||
collection.items = await loadItems(collection)
|
||||
|
||||
// TODO: eventually move to queue and move workers
|
||||
while (collection.items.length > 0) {
|
||||
const batch = collection.items.splice(0, 20)
|
||||
await Promise.all(
|
||||
batch.map(async (items) => {
|
||||
await moveFollowers(db, localActor, items)
|
||||
console.log(`moved ${items.length} followers`)
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
default:
|
||||
console.warn(`Unsupported activity: ${activity.type}`)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ export interface Collection<T> extends APObject {
|
|||
export interface OrderedCollection<T> extends Collection<T> {}
|
||||
|
||||
export interface OrderedCollectionPage<T> extends APObject {
|
||||
next?: string
|
||||
orderedItems: Array<T>
|
||||
}
|
||||
|
||||
|
|
@ -29,13 +30,33 @@ export async function getMetadata(url: URL): Promise<OrderedCollection<any>> {
|
|||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
export async function loadItems<T>(collection: OrderedCollection<T>, max?: number): Promise<Array<T>> {
|
||||
// FIXME: implement max and multi page support
|
||||
// FIXME: implement max
|
||||
|
||||
const res = await fetch(collection.first, { headers })
|
||||
if (!res.ok) {
|
||||
throw new Error(`${collection.first} returned ${res.status}`)
|
||||
const items = []
|
||||
let pageUrl = collection.first
|
||||
|
||||
while (true) {
|
||||
const page = await loadPage<T>(pageUrl)
|
||||
if (page === null) {
|
||||
break
|
||||
}
|
||||
items.push(...page.orderedItems)
|
||||
if (page.next) {
|
||||
pageUrl = new URL(page.next)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
const data = await res.json<OrderedCollectionPage<T>>()
|
||||
return data.orderedItems
|
||||
return items
|
||||
}
|
||||
|
||||
export async function loadPage<T>(url: URL): Promise<null | OrderedCollectionPage<T>> {
|
||||
const res = await fetch(url, { headers })
|
||||
if (!res.ok) {
|
||||
console.warn(`${url} return ${res.status}`)
|
||||
return null
|
||||
}
|
||||
|
||||
return res.json<OrderedCollectionPage<T>>()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,34 @@
|
|||
import type { Actor } from 'wildebeest/backend/src/activitypub/actors'
|
||||
import * as actors from 'wildebeest/backend/src/activitypub/actors'
|
||||
import { urlToHandle } from 'wildebeest/backend/src/utils/handle'
|
||||
import { getResultsField } from './utils'
|
||||
|
||||
const STATE_PENDING = 'pending'
|
||||
const STATE_ACCEPTED = 'accepted'
|
||||
|
||||
// During a migration we move the followers from the old Actor to the new
|
||||
export async function moveFollowers(db: D1Database, actor: Actor, followers: Array<string>): Promise<void> {
|
||||
const batch = []
|
||||
const stmt = db.prepare(`
|
||||
INSERT OR IGNORE
|
||||
INTO actor_following (id, actor_id, target_actor_id, target_actor_acct, state)
|
||||
VALUES (?1, ?2, ?3, ?4, 'accepted');
|
||||
`)
|
||||
|
||||
const actorId = actor.id.toString()
|
||||
const actorAcc = urlToHandle(actor.id)
|
||||
|
||||
for (let i = 0; i < followers.length; i++) {
|
||||
const follower = new URL(followers[i])
|
||||
const followActor = await actors.getAndCache(follower, db)
|
||||
|
||||
const id = crypto.randomUUID()
|
||||
batch.push(stmt.bind(id, followActor.id.toString(), actorId, actorAcc))
|
||||
}
|
||||
|
||||
await db.batch(batch)
|
||||
}
|
||||
|
||||
// Add a pending following
|
||||
export async function addFollowing(db: D1Database, actor: Actor, target: Actor, targetAcct: string): Promise<string> {
|
||||
const id = crypto.randomUUID()
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ import * as ap_inbox from 'wildebeest/functions/ap/users/[id]/inbox'
|
|||
import * as ap_outbox_page from 'wildebeest/functions/ap/users/[id]/outbox/page'
|
||||
import { createStatus } from '../src/mastodon/status'
|
||||
import { mastodonIdSymbol } from 'wildebeest/backend/src/activitypub/objects'
|
||||
import { loadItems } from 'wildebeest/backend/src/activitypub/objects/collection'
|
||||
|
||||
const userKEK = 'test_kek5'
|
||||
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))
|
||||
|
|
@ -317,4 +318,44 @@ describe('ActivityPub', () => {
|
|||
assert.equal(msg.activity.type, 'some activity')
|
||||
})
|
||||
})
|
||||
|
||||
describe('Collection', () => {
|
||||
test('loadItems walks pages', async () => {
|
||||
const collection = {
|
||||
totalItems: 6,
|
||||
first: 'https://example.com/1',
|
||||
} as any
|
||||
|
||||
globalThis.fetch = async (input: RequestInfo) => {
|
||||
if (input.toString() === 'https://example.com/1') {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
next: 'https://example.com/2',
|
||||
orderedItems: ['a', 'b'],
|
||||
})
|
||||
)
|
||||
}
|
||||
if (input.toString() === 'https://example.com/2') {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
next: 'https://example.com/3',
|
||||
orderedItems: ['c', 'd'],
|
||||
})
|
||||
)
|
||||
}
|
||||
if (input.toString() === 'https://example.com/3') {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
orderedItems: ['e', 'f'],
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
throw new Error(`unexpected request to "${input}"`)
|
||||
}
|
||||
|
||||
const items = await loadItems(collection)
|
||||
assert.deepEqual(items, ['a', 'b', 'c', 'd', 'e', 'f'])
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -8,7 +8,10 @@ import * as mutes from 'wildebeest/functions/api/v1/mutes'
|
|||
import * as blocks from 'wildebeest/functions/api/v1/blocks'
|
||||
import { makeDB, assertCORS, assertJSON, assertCache, generateVAPIDKeys } from './utils'
|
||||
import { enrichStatus } from 'wildebeest/backend/src/mastodon/microformats'
|
||||
import { moveFollowers } from 'wildebeest/backend/src/mastodon/follow'
|
||||
import { createPerson } from 'wildebeest/backend/src/activitypub/actors'
|
||||
|
||||
const userKEK = 'test_kek23'
|
||||
const domain = 'cloudflare.com'
|
||||
|
||||
describe('Mastodon APIs', () => {
|
||||
|
|
@ -227,4 +230,43 @@ describe('Mastodon APIs', () => {
|
|||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('Follow', () => {
|
||||
test('move followers', async () => {
|
||||
const db = await makeDB()
|
||||
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
|
||||
|
||||
globalThis.fetch = async (input: RequestInfo) => {
|
||||
if (input === 'https://example.com/user/a') {
|
||||
return new Response(JSON.stringify({ id: 'https://example.com/user/a', type: 'Actor' }))
|
||||
}
|
||||
if (input === 'https://example.com/user/b') {
|
||||
return new Response(JSON.stringify({ id: 'https://example.com/user/b', type: 'Actor' }))
|
||||
}
|
||||
if (input === 'https://example.com/user/c') {
|
||||
return new Response(JSON.stringify({ id: 'https://example.com/user/c', type: 'Actor' }))
|
||||
}
|
||||
|
||||
throw new Error(`unexpected request to "${input}"`)
|
||||
}
|
||||
|
||||
const followers = ['https://example.com/user/a', 'https://example.com/user/b', 'https://example.com/user/c']
|
||||
|
||||
await moveFollowers(db, actor, followers)
|
||||
|
||||
const { results, success } = await db.prepare('SELECT * FROM actor_following').all<any>()
|
||||
assert(success)
|
||||
assert(results)
|
||||
assert.equal(results.length, 3)
|
||||
assert.equal(results[0].state, 'accepted')
|
||||
assert.equal(results[0].actor_id, 'https://example.com/user/a')
|
||||
assert.equal(results[0].target_actor_acct, 'sven@cloudflare.com')
|
||||
assert.equal(results[1].state, 'accepted')
|
||||
assert.equal(results[1].actor_id, 'https://example.com/user/b')
|
||||
assert.equal(results[1].target_actor_acct, 'sven@cloudflare.com')
|
||||
assert.equal(results[2].state, 'accepted')
|
||||
assert.equal(results[2].actor_id, 'https://example.com/user/c')
|
||||
assert.equal(results[2].target_actor_acct, 'sven@cloudflare.com')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
|||
Ładowanie…
Reference in New Issue