MOW-75: deliver to followers via Queue

pull/108/head
Sven Sauleau 2023-01-13 10:56:24 +00:00
rodzic 2ddfa6393b
commit c0a6911570
21 zmienionych plików z 378 dodań i 192 usunięć

Wyświetl plik

@ -249,7 +249,7 @@ jobs:
yarn build
cp -rv ./frontend/dist/* .
# remove folder that aren't needed in Pages before we upload
rm -rf ./tf ./scripts ./.github ./.npm ./consumer
rm -rf ./tf ./scripts ./.github ./.npm ./consumer ./*.md
echo "******"
command: pages publish --project-name=wildebeest-${{ env.OWNER_LOWER }} .
env:

Wyświetl plik

@ -1,6 +1,7 @@
// https://www.w3.org/TR/activitypub/#delivery
import * as actors from 'wildebeest/backend/src/activitypub/actors'
import type { MessageSendRequest, Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import { MessageType } from 'wildebeest/backend/src/types/queue'
import type { Activity } from './activities'
import type { Actor } from './actors'
import { generateDigestHeader } from 'wildebeest/backend/src/utils/http-signing-cavage'
@ -34,42 +35,30 @@ export async function deliverToActor(signingKey: CryptoKey, from: Actor, to: Act
}
}
export async function deliverFollowers(db: D1Database, signingKey: CryptoKey, from: Actor, activity: Activity) {
const body = JSON.stringify(activity)
export async function deliverFollowers(
db: D1Database,
userKEK: string,
from: Actor,
activity: Activity,
queue: Queue<DeliverMessageBody>
) {
const followers = await getFollowers(db, from)
const promises = followers.map(async (id) => {
const follower = new URL(id)
const messages: Array<MessageSendRequest<DeliverMessageBody>> = await Promise.all(
followers.map(async (id) => {
const body = {
// Make sure the object is supported by `structuredClone()`, ie
// removing the URL objects as they aren't clonabled.
activity: JSON.parse(JSON.stringify(activity)),
// FIXME: When an actor follows another Actor we should download its object
// locally, so we can retrieve the Actor's inbox without a request.
const targetActor = await actors.getAndCache(follower, db)
if (targetActor === null) {
console.warn(`actor ${follower} not found`)
return
}
const req = new Request(targetActor.inbox, {
method: 'POST',
body,
headers,
actorId: from.id.toString(),
toActorId: id,
type: MessageType.Deliver,
userKEK,
}
return { body }
})
const digest = await generateDigestHeader(body)
req.headers.set('Digest', digest)
await signRequest(req, signingKey, new URL(from.id))
)
const res = await fetch(req)
if (!res.ok) {
const body = await res.text()
console.error(`delivery to ${targetActor.inbox} returned ${res.status}: ${body}`)
return
}
{
const body = await res.text()
console.log(`${targetActor.inbox} returned 200: ${body}`)
}
})
await Promise.allSettled(promises)
await queue.sendBatch(messages)
}

Wyświetl plik

@ -14,7 +14,7 @@ export async function errorHandling(context: EventContext<Env, any, any>) {
if (sentry !== null) {
sentry.captureException(err)
}
console.error(err)
console.error(err.stack)
return internalServerError()
}
}

Wyświetl plik

@ -1,8 +1,10 @@
import type { Queue, MessageBody } from 'wildebeest/backend/src/types/queue'
export interface Env {
DATABASE: D1Database
KV_CACHE: KVNamespace
userKEK: string
QUEUE: Queue
QUEUE: Queue<MessageBody>
CF_ACCOUNT_ID: string
CF_API_TOKEN: string

Wyświetl plik

@ -1,15 +1,42 @@
import type { Activity } from 'wildebeest/backend/src/activitypub/activities'
import type { JWK } from 'wildebeest/backend/src/webpush/jwk'
export type MessageType = 'activity'
export enum MessageType {
Inbox = 1,
Deliver,
}
export type MessageBody = {
export interface MessageBody {
type: MessageType
actorId: string
content: Activity
}
// ActivityPub messages received by an Actor's Inbox are sent into the queue.
export interface InboxMessageBody extends MessageBody {
activity: Activity
// Send secrets as part of the message because it's too complicated
// to bind them to the consumer worker.
userKEK: string
vapidKeys: JWK
}
// ActivityPub message delivery job are sent to the queue and the consumer does
// the actual delivery.
export interface DeliverMessageBody extends MessageBody {
activity: Activity
toActorId: string
// Send secrets as part of the message because it's too complicated
// to bind them to the consumer worker.
userKEK: string
}
export type MessageSendRequest<Body = MessageBody> = {
body: Body
}
export interface Queue<Body = MessageBody> {
send(body: Body): Promise<void>
sendBatch(messages: Iterable<MessageSendRequest<Body>>): Promise<void>
}

Wyświetl plik

@ -1,4 +1,5 @@
import { makeDB, isUrlValid } from './utils'
import { MessageType } from 'wildebeest/backend/src/types/queue'
import type { JWK } from 'wildebeest/backend/src/webpush/jwk'
import { createPerson } from 'wildebeest/backend/src/activitypub/actors'
import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note'
@ -148,9 +149,9 @@ describe('ActivityPub', () => {
assert.equal(res.status, 200)
assert(msg)
assert.equal(msg.type, 'activity')
assert.equal(msg.type, MessageType.Inbox)
assert.equal(msg.actorId, actor.id.toString())
assert.equal(msg.content.type, 'some activity')
assert.equal(msg.activity.type, 'some activity')
})
})
})

Wyświetl plik

@ -1,4 +1,5 @@
import { strict as assert } from 'node:assert/strict'
import { MessageType } from 'wildebeest/backend/src/types/queue'
import { addObjectInOutbox } from 'wildebeest/backend/src/activitypub/actors/outbox'
import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note'
import * as accounts_following from 'wildebeest/functions/api/v1/accounts/[id]/following'
@ -10,7 +11,7 @@ import * as accounts_follow from 'wildebeest/functions/api/v1/accounts/[id]/foll
import * as accounts_unfollow from 'wildebeest/functions/api/v1/accounts/[id]/unfollow'
import * as accounts_statuses from 'wildebeest/functions/api/v1/accounts/[id]/statuses'
import * as accounts_get from 'wildebeest/functions/api/v1/accounts/[id]'
import { isUrlValid, makeDB, assertCORS, assertJSON } from '../utils'
import { isUrlValid, makeDB, assertCORS, assertJSON, makeQueue } from '../utils'
import * as accounts_verify_creds from 'wildebeest/functions/api/v1/accounts/verify_credentials'
import * as accounts_update_creds from 'wildebeest/functions/api/v1/accounts/update_credentials'
import { createPerson, getPersonById } from 'wildebeest/backend/src/activitypub/actors'
@ -96,6 +97,7 @@ describe('Mastodon APIs', () => {
test('update credentials', async () => {
const db = await makeDB()
const queue = makeQueue()
const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const updates = new FormData()
@ -112,7 +114,8 @@ describe('Mastodon APIs', () => {
connectedActor,
'CF_ACCOUNT_ID',
'CF_API_TOKEN',
userKEK
userKEK,
queue
)
assert.equal(res.status, 200)
@ -126,25 +129,14 @@ describe('Mastodon APIs', () => {
assert.equal(updatedActor.summary, 'hein')
})
test('update credentials sends update', async () => {
test('update credentials sends update to follower', async () => {
const db = await makeDB()
const queue = makeQueue()
const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const actor2 = await createPerson(domain, db, userKEK, 'sven2@cloudflare.com')
await addFollowing(db, actor2, connectedActor, 'sven2@' + domain)
await acceptFollowing(db, actor2, connectedActor)
let receivedActivity: any = null
globalThis.fetch = async (input: any) => {
if (input.url.toString() === `https://${domain}/ap/users/sven2/inbox`) {
assert.equal(input.method, 'POST')
receivedActivity = await input.json()
return new Response('')
}
throw new Error('unexpected request to ' + input.url)
}
const updates = new FormData()
updates.set('display_name', 'newsven')
@ -158,14 +150,17 @@ describe('Mastodon APIs', () => {
connectedActor,
'CF_ACCOUNT_ID',
'CF_API_TOKEN',
userKEK
userKEK,
queue
)
assert.equal(res.status, 200)
assert(receivedActivity)
assert.equal(receivedActivity.type, 'Update')
assert.equal(receivedActivity.object.id.toString(), connectedActor.id.toString())
assert.equal(receivedActivity.object.name, 'newsven')
assert.equal(queue.messages.length, 1)
assert.equal(queue.messages[0].type, MessageType.Deliver)
assert.equal(queue.messages[0].activity.type, 'Update')
assert.equal(queue.messages[0].actorId, connectedActor.id.toString())
assert.equal(queue.messages[0].toActorId, actor2.id.toString())
})
test('update credentials avatar and header', async () => {
@ -187,6 +182,7 @@ describe('Mastodon APIs', () => {
}
const db = await makeDB()
const queue = makeQueue()
const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const updates = new FormData()
@ -203,7 +199,8 @@ describe('Mastodon APIs', () => {
connectedActor,
'CF_ACCOUNT_ID',
'CF_API_TOKEN',
userKEK
userKEK,
queue
)
assert.equal(res.status, 200)

Wyświetl plik

@ -12,8 +12,10 @@ import * as statuses_context from 'wildebeest/functions/api/v1/statuses/[id]/con
import { createPerson } from 'wildebeest/backend/src/activitypub/actors'
import { insertLike } from 'wildebeest/backend/src/mastodon/like'
import { insertReblog } from 'wildebeest/backend/src/mastodon/reblog'
import { isUrlValid, makeDB, assertJSON, streamToArrayBuffer } from '../utils'
import { isUrlValid, makeDB, assertJSON, streamToArrayBuffer, makeQueue } from '../utils'
import * as note from 'wildebeest/backend/src/activitypub/objects/note'
import { addFollowing, acceptFollowing } from 'wildebeest/backend/src/mastodon/follow'
import { MessageType } from 'wildebeest/backend/src/types/queue'
const userKEK = 'test_kek4'
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))
@ -23,6 +25,7 @@ describe('Mastodon APIs', () => {
describe('statuses', () => {
test('create new status missing params', async () => {
const db = await makeDB()
const queue = makeQueue()
const body = { status: 'my status' }
const req = new Request('https://example.com', {
@ -32,12 +35,13 @@ describe('Mastodon APIs', () => {
})
const connectedActor: any = {}
const res = await statuses.handleRequest(req, db, connectedActor, userKEK)
const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue)
assert.equal(res.status, 400)
})
test('create new status creates Note', async () => {
const db = await makeDB()
const queue = makeQueue()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const body = {
@ -51,7 +55,7 @@ describe('Mastodon APIs', () => {
})
const connectedActor = actor
const res = await statuses.handleRequest(req, db, connectedActor, userKEK)
const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
assertJSON(res)
@ -87,6 +91,7 @@ describe('Mastodon APIs', () => {
test("create new status adds to Actor's outbox", async () => {
const db = await makeDB()
const queue = makeQueue()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const body = {
@ -100,13 +105,50 @@ describe('Mastodon APIs', () => {
})
const connectedActor = actor
const res = await statuses.handleRequest(req, db, connectedActor, userKEK)
const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
const row = await db.prepare(`SELECT count(*) as count FROM outbox_objects`).first()
assert.equal(row.count, 1)
})
test('create new status delivers to followers via Queue', async () => {
const queue = makeQueue()
const db = await makeDB()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const followerA = await createPerson(domain, db, userKEK, 'followerA@cloudflare.com')
const followerB = await createPerson(domain, db, userKEK, 'followerB@cloudflare.com')
await addFollowing(db, followerA, actor, 'not needed')
await sleep(10)
await addFollowing(db, followerB, actor, 'not needed')
await acceptFollowing(db, followerA, actor)
await acceptFollowing(db, followerB, actor)
const body = { status: 'my status', visibility: 'public' }
const req = new Request('https://example.com', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
})
const res = await statuses.handleRequest(req, db, actor, userKEK, queue)
assert.equal(res.status, 200)
assert.equal(queue.messages.length, 2)
assert.equal(queue.messages[0].type, MessageType.Deliver)
assert.equal(queue.messages[0].userKEK, userKEK)
assert.equal(queue.messages[0].actorId, actor.id.toString())
assert.equal(queue.messages[0].toActorId, followerA.id.toString())
assert.equal(queue.messages[1].type, MessageType.Deliver)
assert.equal(queue.messages[1].userKEK, userKEK)
assert.equal(queue.messages[1].actorId, actor.id.toString())
assert.equal(queue.messages[1].toActorId, followerB.id.toString())
})
test('create new status with mention delivers ActivityPub Note', async () => {
let deliveredNote: any = null
@ -118,17 +160,17 @@ describe('Mastodon APIs', () => {
{
rel: 'self',
type: 'application/activity+json',
href: 'https://social.com/sven',
href: 'https://social.com/users/sven',
},
],
})
)
}
if (input.toString() === 'https://social.com/sven') {
if (input.toString() === 'https://social.com/users/sven') {
return new Response(
JSON.stringify({
id: 'https://social.com/sven',
id: 'https://social.com/users/sven',
inbox: 'https://social.com/sven/inbox',
})
)
@ -156,6 +198,7 @@ describe('Mastodon APIs', () => {
}
const db = await makeDB()
const queue = makeQueue()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const body = {
@ -169,7 +212,7 @@ describe('Mastodon APIs', () => {
})
const connectedActor = actor
const res = await statuses.handleRequest(req, db, connectedActor, userKEK)
const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
assert(deliveredNote)
@ -183,6 +226,7 @@ describe('Mastodon APIs', () => {
test('create new status with image', async () => {
const db = await makeDB()
const queue = makeQueue()
const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const properties = { url: 'foo' }
@ -199,7 +243,7 @@ describe('Mastodon APIs', () => {
body: JSON.stringify(body),
})
const res = await statuses.handleRequest(req, db, connectedActor, userKEK)
const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
const data = await res.json<any>()
@ -229,16 +273,7 @@ describe('Mastodon APIs', () => {
.run()
globalThis.fetch = async (input: any) => {
if (input === actor.id.toString()) {
return new Response(
JSON.stringify({
id: actor.id,
inbox: 'https://social.com/sven/inbox',
})
)
}
if (input.url === 'https://social.com/sven/inbox') {
if (input.url === actor.id.toString() + '/inbox') {
assert.equal(input.method, 'POST')
const body = await input.json()
deliveredActivity = body
@ -392,12 +427,13 @@ describe('Mastodon APIs', () => {
test('reblog records in db', async () => {
const db = await makeDB()
const queue = makeQueue()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const note = await createPublicNote(domain, db, 'my first status', actor)
const connectedActor: any = actor
const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK)
const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
const data = await res.json<any>()
@ -411,36 +447,25 @@ describe('Mastodon APIs', () => {
test('reblog status adds in actor outbox', async () => {
const db = await makeDB()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const originalObjectId = 'https://example.com/note123'
const queue = makeQueue()
await db
.prepare(
'INSERT INTO objects (id, type, properties, original_actor_id, original_object_id, mastodon_id, local) VALUES (?, ?, ?, ?, ?, ?, 0)'
)
.bind(
'https://example.com/object1',
'Note',
JSON.stringify({ content: 'my first status' }),
actor.id.toString(),
originalObjectId,
'mastodonid1'
)
.run()
const note = await createPublicNote(domain, db, 'my first status', actor)
const connectedActor: any = actor
const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK)
const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK, queue)
assert.equal(res.status, 200)
const row = await db.prepare(`SELECT * FROM outbox_objects`).first()
assert.equal(row.actor_id, actor.id.toString())
assert.equal(row.object_id, 'https://example.com/object1')
assert.equal(row.object_id, note.id.toString())
})
test('reblog remote status status sends Announce activity to author', async () => {
let deliveredActivity: any = null
const db = await makeDB()
const queue = makeQueue()
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const originalObjectId = 'https://example.com/note123'
@ -459,16 +484,7 @@ describe('Mastodon APIs', () => {
.run()
globalThis.fetch = async (input: any) => {
if (input === actor.id.toString()) {
return new Response(
JSON.stringify({
id: actor.id,
inbox: 'https://social.com/sven/inbox',
})
)
}
if (input.url === 'https://social.com/sven/inbox') {
if (input.url === 'https://cloudflare.com/ap/users/sven/inbox') {
assert.equal(input.method, 'POST')
const body = await input.json()
deliveredActivity = body
@ -480,7 +496,7 @@ describe('Mastodon APIs', () => {
const connectedActor: any = actor
const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK)
const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK, queue)
assert.equal(res.status, 200)
assert(deliveredActivity)

Wyświetl plik

@ -1,4 +1,5 @@
import { strict as assert } from 'node:assert/strict'
import type { Queue } from 'wildebeest/backend/src/types/queue'
import { createClient } from 'wildebeest/backend/src/mastodon/client'
import type { Client } from 'wildebeest/backend/src/mastodon/client'
import { promises as fs } from 'fs'
@ -64,3 +65,23 @@ export async function createTestClient(
): Promise<Client> {
return createClient(db, 'test client', redirectUri, 'https://cloudflare.com', scopes)
}
type TestQueue = Queue<any> & { messages: Array<any> }
export function makeQueue(): TestQueue {
const messages: Array<any> = []
return {
messages,
async send(msg: any) {
messages.push(msg)
},
async sendBatch(batch: Array<{ body: any }>) {
for (let i = 0, len = batch.length; i < len; i++) {
messages.push(batch[i].body)
}
},
}
}

Wyświetl plik

@ -0,0 +1,43 @@
import type { MessageBody, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import { getSigningKey } from 'wildebeest/backend/src/mastodon/account'
import * as actors from 'wildebeest/backend/src/activitypub/actors'
import type { Actor } from 'wildebeest/backend/src/activitypub/actors'
import type { Env } from './'
import { generateDigestHeader } from 'wildebeest/backend/src/utils/http-signing-cavage'
import { signRequest } from 'wildebeest/backend/src/utils/http-signing'
const headers = {
'content-type': 'application/activity+json',
}
export async function handleDeliverMessage(env: Env, actor: Actor, message: DeliverMessageBody) {
const toActorId = new URL(message.toActorId)
const targetActor = await actors.getAndCache(toActorId, env.DATABASE)
if (targetActor === null) {
console.warn(`actor ${toActorId} not found`)
return
}
const body = JSON.stringify(message.activity)
const req = new Request(targetActor.inbox, {
method: 'POST',
body,
headers,
})
const digest = await generateDigestHeader(body)
req.headers.set('Digest', digest)
const signingKey = await getSigningKey(message.userKEK, env.DATABASE, actor)
await signRequest(req, signingKey, actor.id)
const res = await fetch(req)
if (!res.ok) {
const body = await res.text()
console.error(`delivery to ${targetActor.inbox} returned ${res.status}: ${body}`)
return
}
{
const body = await res.text()
console.log(`${targetActor.inbox} returned 200: ${body}`)
}
}

Wyświetl plik

@ -0,0 +1,23 @@
import type { MessageBody, InboxMessageBody } from 'wildebeest/backend/src/types/queue'
import * as activityHandler from 'wildebeest/backend/src/activitypub/activities/handle'
import * as notification from 'wildebeest/backend/src/mastodon/notification'
import * as timeline from 'wildebeest/backend/src/mastodon/timeline'
import type { Actor } from 'wildebeest/backend/src/activitypub/actors'
import type { Env } from './'
export async function handleInboxMessage(env: Env, actor: Actor, message: InboxMessageBody) {
const domain = env.DOMAIN
const db = env.DATABASE
const adminEmail = env.ADMIN_EMAIL
const cache = env.KV_CACHE
const activity = message.activity
await activityHandler.handle(domain, activity, db, message.userKEK, adminEmail, message.vapidKeys)
// Assuming we received new posts or a like, pregenerate the user's timelines
// and notifications.
await Promise.all([
timeline.pregenerateTimelines(domain, db, cache, actor),
notification.pregenerateNotifications(db, cache, actor),
])
}

Wyświetl plik

@ -1,13 +1,14 @@
import type { MessageBody } from 'wildebeest/backend/src/types/queue'
import type { MessageBody, InboxMessageBody, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import type { JWK } from 'wildebeest/backend/src/webpush/jwk'
import type { Actor } from 'wildebeest/backend/src/activitypub/actors'
import * as actors from 'wildebeest/backend/src/activitypub/actors'
import * as timeline from 'wildebeest/backend/src/mastodon/timeline'
import * as notification from 'wildebeest/backend/src/mastodon/notification'
import * as activityHandler from 'wildebeest/backend/src/activitypub/activities/handle'
import type { Activity } from 'wildebeest/backend/src/activitypub/activities'
import { MessageType } from 'wildebeest/backend/src/types/queue'
type Env = {
import { handleInboxMessage } from './inbox'
import { handleDeliverMessage } from './deliver'
export type Env = {
DATABASE: D1Database
DOMAIN: string
ADMIN_EMAIL: string
@ -17,42 +18,24 @@ type Env = {
export default {
async queue(batch: MessageBatch<MessageBody>, env: Env, ctx: ExecutionContext) {
for (const message of batch.messages) {
try {
const actor = await actors.getPersonById(env.DATABASE, new URL(message.body.actorId))
if (actor === null) {
console.warn(`actor ${message.body.actorId} is missing`)
return
}
const actor = await actors.getPersonById(env.DATABASE, new URL(message.body.actorId))
if (actor === null) {
console.warn(`actor ${message.body.actorId} is missing`)
return
}
switch (message.body.type) {
case 'activity': {
await handleActivityMessage(env, actor, message.body)
break
}
default:
throw new Error('unsupported message type: ' + message.body.type)
switch (message.body.type) {
case MessageType.Inbox: {
await handleInboxMessage(env, actor, message.body as InboxMessageBody)
break
}
} catch (err: any) {
console.error(err.stack)
// TODO: add sentry
case MessageType.Deliver: {
await handleDeliverMessage(env, actor, message.body as DeliverMessageBody)
break
}
default:
throw new Error('unsupported message type: ' + message.body.type)
}
}
},
}
async function handleActivityMessage(env: Env, actor: Actor, message: MessageBody) {
const domain = env.DOMAIN
const db = env.DATABASE
const adminEmail = env.ADMIN_EMAIL
const cache = env.KV_CACHE
const activity = message.content
await activityHandler.handle(domain, activity, db, message.userKEK, adminEmail, message.vapidKeys)
// Assuming we received new posts or a like, pregenerate the user's timelines
// and notifications.
await Promise.all([
timeline.pregenerateTimelines(domain, db, cache, actor),
notification.pregenerateNotifications(db, cache, actor),
])
}

Wyświetl plik

@ -0,0 +1,68 @@
import { MessageType } from 'wildebeest/backend/src/types/queue'
import { strict as assert } from 'node:assert/strict'
import type { DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import { createPerson } from 'wildebeest/backend/src/activitypub/actors'
import { makeDB } from 'wildebeest/backend/test/utils'
import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note'
import { handleDeliverMessage } from '../src/deliver'
const domain = 'cloudflare.com'
const userKEK = 'test_kek25'
describe('Consumer', () => {
describe('Deliver', () => {
test('deliver to target Actor', async () => {
const db = await makeDB()
let receivedActivity: any = null
globalThis.fetch = async (input: any) => {
if (input.toString() === 'https://example.com/users/a') {
return new Response(
JSON.stringify({
id: 'https://example.com/users/a',
type: 'Person',
preferredUsername: 'someone',
inbox: 'https://example.com/inbox',
})
)
}
if (input.url.toString() === 'https://example.com/inbox') {
assert.equal(input.method, 'POST')
receivedActivity = await input.json()
return new Response('')
}
throw new Error('unexpected request to ' + input.url)
}
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const note = await createPublicNote(domain, db, 'my first status', actor)
const activity: any = {
type: 'Create',
actor: actor.id.toString(),
to: ['https://example.com/users/a'],
cc: [],
object: note.id,
}
const message: DeliverMessageBody = {
activity,
type: MessageType.Deliver,
actorId: actor.id.toString(),
toActorId: 'https://example.com/users/a',
userKEK: userKEK,
}
const env = {
DATABASE: db,
} as any
await handleDeliverMessage(env, actor, message)
assert(receivedActivity)
assert.equal(receivedActivity.type, activity.type)
})
})
})

Wyświetl plik

@ -1,6 +1,7 @@
name = "wildebeest-consumer"
compatibility_date = "2023-01-12"
main = "./src/index.ts"
usage_model = "unbound"
[[queues.consumers]]
queue = "wildebeest"

Wyświetl plik

@ -5,6 +5,10 @@ import { statuses } from 'wildebeest/frontend/src/dummyData'
import type { Account, MastodonStatus } from 'wildebeest/frontend/src/types'
const kek = 'test-kek'
const queue = {
async send() {},
async sendBatch() {},
}
/**
* Run helper commands to initialize the database with actors, statuses, etc.
*/
@ -39,7 +43,7 @@ async function createStatus(db: D1Database, actor: Person, status: string, visib
headers,
body: JSON.stringify(body),
})
const resp = await statusesAPI.handleRequest(req, db, actor, kek)
const resp = await statusesAPI.handleRequest(req, db, actor, kek, queue)
return (await resp.json()) as MastodonStatus
}
@ -61,5 +65,5 @@ async function getOrCreatePerson(
}
async function reblogStatus(db: D1Database, actor: Person, status: MastodonStatus) {
await reblogAPI.handleRequest(db, status.id, actor, kek)
await reblogAPI.handleRequest(db, status.id, actor, kek, queue)
}

Wyświetl plik

@ -4,7 +4,8 @@ import type { JWK } from 'wildebeest/backend/src/webpush/jwk'
import * as actors from 'wildebeest/backend/src/activitypub/actors'
import { actorURL } from 'wildebeest/backend/src/activitypub/actors'
import type { Env } from 'wildebeest/backend/src/types/env'
import type { MessageBody } from 'wildebeest/backend/src/types/queue'
import type { InboxMessageBody } from 'wildebeest/backend/src/types/queue'
import { MessageType } from 'wildebeest/backend/src/types/queue'
import type { Activity } from 'wildebeest/backend/src/activitypub/activities'
import { parseRequest } from 'wildebeest/backend/src/utils/httpsigjs/parser'
import { fetchKey, verifySignature } from 'wildebeest/backend/src/utils/httpsigjs/verifier'
@ -37,7 +38,7 @@ export async function handleRequest(
db: D1Database,
id: string,
activity: Activity,
queue: Queue<MessageBody>,
queue: Queue<InboxMessageBody>,
userKEK: string,
vapidKeys: JWK
): Promise<Response> {
@ -54,9 +55,9 @@ export async function handleRequest(
}
await queue.send({
type: 'activity',
type: MessageType.Inbox,
actorId: actor.id.toString(),
content: activity,
activity,
userKEK,
vapidKeys,
})

Wyświetl plik

@ -1,7 +1,7 @@
// https://docs.joinmastodon.org/methods/accounts/#update_credentials
import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import * as errors from 'wildebeest/backend/src/errors'
import { getSigningKey } from 'wildebeest/backend/src/mastodon/account'
import * as activities from 'wildebeest/backend/src/activitypub/activities/update'
import * as actors from 'wildebeest/backend/src/activitypub/actors'
import { deliverFollowers } from 'wildebeest/backend/src/activitypub/deliver'
@ -20,7 +20,15 @@ const headers = {
}
export const onRequest: PagesFunction<Env, any, ContextData> = async ({ request, data, env }) => {
return handleRequest(env.DATABASE, request, data.connectedActor, env.CF_ACCOUNT_ID, env.CF_API_TOKEN, env.userKEK)
return handleRequest(
env.DATABASE,
request,
data.connectedActor,
env.CF_ACCOUNT_ID,
env.CF_API_TOKEN,
env.userKEK,
env.QUEUE
)
}
export async function handleRequest(
@ -31,7 +39,8 @@ export async function handleRequest(
accountId: string,
apiToken: string,
userKEK: string
userKEK: string,
queue: Queue<DeliverMessageBody>
): Promise<Response> {
if (!connectedActor) {
return new Response('', { status: 401 })
@ -106,8 +115,7 @@ export async function handleRequest(
// send updates
const activity = activities.create(domain, connectedActor, actor)
const signingKey = await getSigningKey(userKEK, db, connectedActor)
await deliverFollowers(db, signingKey, connectedActor, activity)
await deliverFollowers(db, userKEK, connectedActor, activity, queue)
return new Response(JSON.stringify(res), { headers })
}

Wyświetl plik

@ -1,5 +1,5 @@
// https://docs.joinmastodon.org/methods/statuses/#create
import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import { loadLocalMastodonAccount } from 'wildebeest/backend/src/mastodon/account'
import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note'
import type { Document } from 'wildebeest/backend/src/activitypub/objects'
@ -23,7 +23,7 @@ type StatusCreate = {
}
export const onRequest: PagesFunction<Env, any, ContextData> = async ({ request, env, data }) => {
return handleRequest(request, env.DATABASE, data.connectedActor, env.userKEK)
return handleRequest(request, env.DATABASE, data.connectedActor, env.userKEK, env.QUEUE)
}
// FIXME: add tests for delivery to followers and mentions to a specific Actor.
@ -31,7 +31,8 @@ export async function handleRequest(
request: Request,
db: D1Database,
connectedActor: Person,
userKEK: string
userKEK: string,
queue: Queue<DeliverMessageBody>
): Promise<Response> {
// TODO: implement Idempotency-Key
@ -64,28 +65,29 @@ export async function handleRequest(
const note = await createPublicNote(domain, db, body.status, connectedActor, mediaAttachments)
await addObjectInOutbox(db, connectedActor, note)
// If the status is mentioning other persons, we need to delivery it to them.
const mentions = getMentions(body.status)
for (let i = 0, len = mentions.length; i < len; i++) {
if (mentions[i].domain === null) {
// Only deliver the note for remote actors
continue
}
const acct = `${mentions[i].localPart}@${mentions[i].domain}`
const targetActor = await queryAcct(mentions[i].domain!, acct)
if (targetActor === null) {
console.warn(`actor ${acct} not found`)
continue
}
note.to.push(targetActor.id.toString())
const activity = activities.create(domain, connectedActor, note)
const signingKey = await getSigningKey(userKEK, db, connectedActor)
await deliverToActor(signingKey, connectedActor, targetActor, activity)
}
const activity = activities.create(domain, connectedActor, note)
const signingKey = await getSigningKey(userKEK, db, connectedActor)
await deliverFollowers(db, signingKey, connectedActor, activity)
await deliverFollowers(db, userKEK, connectedActor, activity, queue)
{
// If the status is mentioning other persons, we need to delivery it to them.
const mentions = getMentions(body.status)
for (let i = 0, len = mentions.length; i < len; i++) {
if (mentions[i].domain === null) {
// Only deliver the note for remote actors
continue
}
const acct = `${mentions[i].localPart}@${mentions[i].domain}`
const targetActor = await queryAcct(mentions[i].domain!, acct)
if (targetActor === null) {
console.warn(`actor ${acct} not found`)
continue
}
note.to.push(targetActor.id.toString())
const activity = activities.create(domain, connectedActor, note)
const signingKey = await getSigningKey(userKEK, db, connectedActor)
await deliverToActor(signingKey, connectedActor, targetActor, activity)
}
}
const account = await loadLocalMastodonAccount(db, connectedActor)

Wyświetl plik

@ -34,7 +34,7 @@ export async function handleRequest(
if (obj.originalObjectId && obj.originalActorId) {
// Liking an external object delivers the like activity
const targetActor = await actors.get(obj.originalActorId)
const targetActor = await actors.getAndCache(new URL(obj.originalActorId), db)
if (!targetActor) {
return new Response(`target Actor ${obj.originalActorId} not found`, { status: 404 })
}

Wyświetl plik

@ -1,5 +1,5 @@
// https://docs.joinmastodon.org/methods/statuses/#boost
import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue'
import type { Env } from 'wildebeest/backend/src/types/env'
import { addObjectInOutbox } from 'wildebeest/backend/src/activitypub/actors/outbox'
import { insertReblog } from 'wildebeest/backend/src/mastodon/reblog'
@ -14,14 +14,15 @@ import type { ContextData } from 'wildebeest/backend/src/types/context'
import { toMastodonStatusFromObject } from 'wildebeest/backend/src/mastodon/status'
export const onRequest: PagesFunction<Env, any, ContextData> = async ({ env, data, params }) => {
return handleRequest(env.DATABASE, params.id as string, data.connectedActor, env.userKEK)
return handleRequest(env.DATABASE, params.id as string, data.connectedActor, env.userKEK, env.QUEUE)
}
export async function handleRequest(
db: D1Database,
id: string,
connectedActor: Person,
userKEK: string
userKEK: string,
queue: Queue<DeliverMessageBody>
): Promise<Response> {
const obj = await getObjectByMastodonId(db, id)
if (obj === null) {
@ -33,23 +34,22 @@ export async function handleRequest(
return new Response('', { status: 404 })
}
const signingKey = await getSigningKey(userKEK, db, connectedActor)
if (obj.originalObjectId && obj.originalActorId) {
// Rebloggin an external object delivers the announce activity to the
// post author.
const targetActor = await actors.get(obj.originalActorId)
const targetActor = await actors.getAndCache(new URL(obj.originalActorId), db)
if (!targetActor) {
return new Response(`target Actor ${obj.originalActorId} not found`, { status: 404 })
}
const activity = announce.create(connectedActor, new URL(obj.originalObjectId))
const signingKey = await getSigningKey(userKEK, db, connectedActor)
await Promise.all([
// Delivers the announce activity to the post author.
deliverToActor(signingKey, connectedActor, targetActor, activity),
// Share reblogged by delivering the announce activity to followers
deliverFollowers(db, signingKey, connectedActor, activity),
deliverFollowers(db, userKEK, connectedActor, activity, queue),
])
}

Wyświetl plik

@ -2,7 +2,7 @@
export default {
preset: 'ts-jest',
verbose: true,
testMatch: ["<rootDir>/backend/test/**/(*.)+(spec|test).[jt]s?(x)"],
testMatch: ["<rootDir>/(backend|consumer)/test/**/(*.)+(spec|test).[jt]s?(x)"],
testTimeout: 30000,
testEnvironment: 'miniflare',
// Configuration is automatically loaded from `.env`, `package.json` and