diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 7b29b5d..259e9e4 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -249,7 +249,7 @@ jobs: yarn build cp -rv ./frontend/dist/* . # remove folder that aren't needed in Pages before we upload - rm -rf ./tf ./scripts ./.github ./.npm ./consumer + rm -rf ./tf ./scripts ./.github ./.npm ./consumer ./*.md echo "******" command: pages publish --project-name=wildebeest-${{ env.OWNER_LOWER }} . env: diff --git a/backend/src/activitypub/deliver.ts b/backend/src/activitypub/deliver.ts index 9806f14..7a16b33 100644 --- a/backend/src/activitypub/deliver.ts +++ b/backend/src/activitypub/deliver.ts @@ -1,6 +1,7 @@ // https://www.w3.org/TR/activitypub/#delivery -import * as actors from 'wildebeest/backend/src/activitypub/actors' +import type { MessageSendRequest, Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' +import { MessageType } from 'wildebeest/backend/src/types/queue' import type { Activity } from './activities' import type { Actor } from './actors' import { generateDigestHeader } from 'wildebeest/backend/src/utils/http-signing-cavage' @@ -34,42 +35,30 @@ export async function deliverToActor(signingKey: CryptoKey, from: Actor, to: Act } } -export async function deliverFollowers(db: D1Database, signingKey: CryptoKey, from: Actor, activity: Activity) { - const body = JSON.stringify(activity) +export async function deliverFollowers( + db: D1Database, + userKEK: string, + from: Actor, + activity: Activity, + queue: Queue +) { const followers = await getFollowers(db, from) - const promises = followers.map(async (id) => { - const follower = new URL(id) + const messages: Array> = await Promise.all( + followers.map(async (id) => { + const body = { + // Make sure the object is supported by `structuredClone()`, ie + // removing the URL objects as they aren't clonabled. + activity: JSON.parse(JSON.stringify(activity)), - // FIXME: When an actor follows another Actor we should download its object - // locally, so we can retrieve the Actor's inbox without a request. - - const targetActor = await actors.getAndCache(follower, db) - if (targetActor === null) { - console.warn(`actor ${follower} not found`) - return - } - - const req = new Request(targetActor.inbox, { - method: 'POST', - body, - headers, + actorId: from.id.toString(), + toActorId: id, + type: MessageType.Deliver, + userKEK, + } + return { body } }) - const digest = await generateDigestHeader(body) - req.headers.set('Digest', digest) - await signRequest(req, signingKey, new URL(from.id)) + ) - const res = await fetch(req) - if (!res.ok) { - const body = await res.text() - console.error(`delivery to ${targetActor.inbox} returned ${res.status}: ${body}`) - return - } - { - const body = await res.text() - console.log(`${targetActor.inbox} returned 200: ${body}`) - } - }) - - await Promise.allSettled(promises) + await queue.sendBatch(messages) } diff --git a/backend/src/middleware/error.ts b/backend/src/middleware/error.ts index e48dfd6..439c75c 100644 --- a/backend/src/middleware/error.ts +++ b/backend/src/middleware/error.ts @@ -14,7 +14,7 @@ export async function errorHandling(context: EventContext) { if (sentry !== null) { sentry.captureException(err) } - console.error(err) + console.error(err.stack) return internalServerError() } } diff --git a/backend/src/types/env.ts b/backend/src/types/env.ts index 9a77112..a156cc0 100644 --- a/backend/src/types/env.ts +++ b/backend/src/types/env.ts @@ -1,8 +1,10 @@ +import type { Queue, MessageBody } from 'wildebeest/backend/src/types/queue' + export interface Env { DATABASE: D1Database KV_CACHE: KVNamespace userKEK: string - QUEUE: Queue + QUEUE: Queue CF_ACCOUNT_ID: string CF_API_TOKEN: string diff --git a/backend/src/types/queue.ts b/backend/src/types/queue.ts index e4c5072..e55d76f 100644 --- a/backend/src/types/queue.ts +++ b/backend/src/types/queue.ts @@ -1,15 +1,42 @@ import type { Activity } from 'wildebeest/backend/src/activitypub/activities' import type { JWK } from 'wildebeest/backend/src/webpush/jwk' -export type MessageType = 'activity' +export enum MessageType { + Inbox = 1, + Deliver, +} -export type MessageBody = { +export interface MessageBody { type: MessageType actorId: string - content: Activity +} + +// ActivityPub messages received by an Actor's Inbox are sent into the queue. +export interface InboxMessageBody extends MessageBody { + activity: Activity // Send secrets as part of the message because it's too complicated // to bind them to the consumer worker. userKEK: string vapidKeys: JWK } + +// ActivityPub message delivery job are sent to the queue and the consumer does +// the actual delivery. +export interface DeliverMessageBody extends MessageBody { + activity: Activity + toActorId: string + + // Send secrets as part of the message because it's too complicated + // to bind them to the consumer worker. + userKEK: string +} + +export type MessageSendRequest = { + body: Body +} + +export interface Queue { + send(body: Body): Promise + sendBatch(messages: Iterable>): Promise +} diff --git a/backend/test/activitypub.spec.ts b/backend/test/activitypub.spec.ts index 2142149..47fb303 100644 --- a/backend/test/activitypub.spec.ts +++ b/backend/test/activitypub.spec.ts @@ -1,4 +1,5 @@ import { makeDB, isUrlValid } from './utils' +import { MessageType } from 'wildebeest/backend/src/types/queue' import type { JWK } from 'wildebeest/backend/src/webpush/jwk' import { createPerson } from 'wildebeest/backend/src/activitypub/actors' import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note' @@ -148,9 +149,9 @@ describe('ActivityPub', () => { assert.equal(res.status, 200) assert(msg) - assert.equal(msg.type, 'activity') + assert.equal(msg.type, MessageType.Inbox) assert.equal(msg.actorId, actor.id.toString()) - assert.equal(msg.content.type, 'some activity') + assert.equal(msg.activity.type, 'some activity') }) }) }) diff --git a/backend/test/mastodon/accounts.spec.ts b/backend/test/mastodon/accounts.spec.ts index aaba5d9..259b43e 100644 --- a/backend/test/mastodon/accounts.spec.ts +++ b/backend/test/mastodon/accounts.spec.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'node:assert/strict' +import { MessageType } from 'wildebeest/backend/src/types/queue' import { addObjectInOutbox } from 'wildebeest/backend/src/activitypub/actors/outbox' import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note' import * as accounts_following from 'wildebeest/functions/api/v1/accounts/[id]/following' @@ -10,7 +11,7 @@ import * as accounts_follow from 'wildebeest/functions/api/v1/accounts/[id]/foll import * as accounts_unfollow from 'wildebeest/functions/api/v1/accounts/[id]/unfollow' import * as accounts_statuses from 'wildebeest/functions/api/v1/accounts/[id]/statuses' import * as accounts_get from 'wildebeest/functions/api/v1/accounts/[id]' -import { isUrlValid, makeDB, assertCORS, assertJSON } from '../utils' +import { isUrlValid, makeDB, assertCORS, assertJSON, makeQueue } from '../utils' import * as accounts_verify_creds from 'wildebeest/functions/api/v1/accounts/verify_credentials' import * as accounts_update_creds from 'wildebeest/functions/api/v1/accounts/update_credentials' import { createPerson, getPersonById } from 'wildebeest/backend/src/activitypub/actors' @@ -96,6 +97,7 @@ describe('Mastodon APIs', () => { test('update credentials', async () => { const db = await makeDB() + const queue = makeQueue() const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const updates = new FormData() @@ -112,7 +114,8 @@ describe('Mastodon APIs', () => { connectedActor, 'CF_ACCOUNT_ID', 'CF_API_TOKEN', - userKEK + userKEK, + queue ) assert.equal(res.status, 200) @@ -126,25 +129,14 @@ describe('Mastodon APIs', () => { assert.equal(updatedActor.summary, 'hein') }) - test('update credentials sends update', async () => { + test('update credentials sends update to follower', async () => { const db = await makeDB() + const queue = makeQueue() const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const actor2 = await createPerson(domain, db, userKEK, 'sven2@cloudflare.com') await addFollowing(db, actor2, connectedActor, 'sven2@' + domain) await acceptFollowing(db, actor2, connectedActor) - let receivedActivity: any = null - - globalThis.fetch = async (input: any) => { - if (input.url.toString() === `https://${domain}/ap/users/sven2/inbox`) { - assert.equal(input.method, 'POST') - receivedActivity = await input.json() - return new Response('') - } - - throw new Error('unexpected request to ' + input.url) - } - const updates = new FormData() updates.set('display_name', 'newsven') @@ -158,14 +150,17 @@ describe('Mastodon APIs', () => { connectedActor, 'CF_ACCOUNT_ID', 'CF_API_TOKEN', - userKEK + userKEK, + queue ) assert.equal(res.status, 200) - assert(receivedActivity) - assert.equal(receivedActivity.type, 'Update') - assert.equal(receivedActivity.object.id.toString(), connectedActor.id.toString()) - assert.equal(receivedActivity.object.name, 'newsven') + assert.equal(queue.messages.length, 1) + + assert.equal(queue.messages[0].type, MessageType.Deliver) + assert.equal(queue.messages[0].activity.type, 'Update') + assert.equal(queue.messages[0].actorId, connectedActor.id.toString()) + assert.equal(queue.messages[0].toActorId, actor2.id.toString()) }) test('update credentials avatar and header', async () => { @@ -187,6 +182,7 @@ describe('Mastodon APIs', () => { } const db = await makeDB() + const queue = makeQueue() const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const updates = new FormData() @@ -203,7 +199,8 @@ describe('Mastodon APIs', () => { connectedActor, 'CF_ACCOUNT_ID', 'CF_API_TOKEN', - userKEK + userKEK, + queue ) assert.equal(res.status, 200) diff --git a/backend/test/mastodon/statuses.spec.ts b/backend/test/mastodon/statuses.spec.ts index e5437d1..7d60461 100644 --- a/backend/test/mastodon/statuses.spec.ts +++ b/backend/test/mastodon/statuses.spec.ts @@ -12,8 +12,10 @@ import * as statuses_context from 'wildebeest/functions/api/v1/statuses/[id]/con import { createPerson } from 'wildebeest/backend/src/activitypub/actors' import { insertLike } from 'wildebeest/backend/src/mastodon/like' import { insertReblog } from 'wildebeest/backend/src/mastodon/reblog' -import { isUrlValid, makeDB, assertJSON, streamToArrayBuffer } from '../utils' +import { isUrlValid, makeDB, assertJSON, streamToArrayBuffer, makeQueue } from '../utils' import * as note from 'wildebeest/backend/src/activitypub/objects/note' +import { addFollowing, acceptFollowing } from 'wildebeest/backend/src/mastodon/follow' +import { MessageType } from 'wildebeest/backend/src/types/queue' const userKEK = 'test_kek4' const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)) @@ -23,6 +25,7 @@ describe('Mastodon APIs', () => { describe('statuses', () => { test('create new status missing params', async () => { const db = await makeDB() + const queue = makeQueue() const body = { status: 'my status' } const req = new Request('https://example.com', { @@ -32,12 +35,13 @@ describe('Mastodon APIs', () => { }) const connectedActor: any = {} - const res = await statuses.handleRequest(req, db, connectedActor, userKEK) + const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue) assert.equal(res.status, 400) }) test('create new status creates Note', async () => { const db = await makeDB() + const queue = makeQueue() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const body = { @@ -51,7 +55,7 @@ describe('Mastodon APIs', () => { }) const connectedActor = actor - const res = await statuses.handleRequest(req, db, connectedActor, userKEK) + const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue) assert.equal(res.status, 200) assertJSON(res) @@ -87,6 +91,7 @@ describe('Mastodon APIs', () => { test("create new status adds to Actor's outbox", async () => { const db = await makeDB() + const queue = makeQueue() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const body = { @@ -100,13 +105,50 @@ describe('Mastodon APIs', () => { }) const connectedActor = actor - const res = await statuses.handleRequest(req, db, connectedActor, userKEK) + const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue) assert.equal(res.status, 200) const row = await db.prepare(`SELECT count(*) as count FROM outbox_objects`).first() assert.equal(row.count, 1) }) + test('create new status delivers to followers via Queue', async () => { + const queue = makeQueue() + const db = await makeDB() + + const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') + const followerA = await createPerson(domain, db, userKEK, 'followerA@cloudflare.com') + const followerB = await createPerson(domain, db, userKEK, 'followerB@cloudflare.com') + + await addFollowing(db, followerA, actor, 'not needed') + await sleep(10) + await addFollowing(db, followerB, actor, 'not needed') + await acceptFollowing(db, followerA, actor) + await acceptFollowing(db, followerB, actor) + + const body = { status: 'my status', visibility: 'public' } + const req = new Request('https://example.com', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify(body), + }) + + const res = await statuses.handleRequest(req, db, actor, userKEK, queue) + assert.equal(res.status, 200) + + assert.equal(queue.messages.length, 2) + + assert.equal(queue.messages[0].type, MessageType.Deliver) + assert.equal(queue.messages[0].userKEK, userKEK) + assert.equal(queue.messages[0].actorId, actor.id.toString()) + assert.equal(queue.messages[0].toActorId, followerA.id.toString()) + + assert.equal(queue.messages[1].type, MessageType.Deliver) + assert.equal(queue.messages[1].userKEK, userKEK) + assert.equal(queue.messages[1].actorId, actor.id.toString()) + assert.equal(queue.messages[1].toActorId, followerB.id.toString()) + }) + test('create new status with mention delivers ActivityPub Note', async () => { let deliveredNote: any = null @@ -118,17 +160,17 @@ describe('Mastodon APIs', () => { { rel: 'self', type: 'application/activity+json', - href: 'https://social.com/sven', + href: 'https://social.com/users/sven', }, ], }) ) } - if (input.toString() === 'https://social.com/sven') { + if (input.toString() === 'https://social.com/users/sven') { return new Response( JSON.stringify({ - id: 'https://social.com/sven', + id: 'https://social.com/users/sven', inbox: 'https://social.com/sven/inbox', }) ) @@ -156,6 +198,7 @@ describe('Mastodon APIs', () => { } const db = await makeDB() + const queue = makeQueue() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const body = { @@ -169,7 +212,7 @@ describe('Mastodon APIs', () => { }) const connectedActor = actor - const res = await statuses.handleRequest(req, db, connectedActor, userKEK) + const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue) assert.equal(res.status, 200) assert(deliveredNote) @@ -183,6 +226,7 @@ describe('Mastodon APIs', () => { test('create new status with image', async () => { const db = await makeDB() + const queue = makeQueue() const connectedActor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const properties = { url: 'foo' } @@ -199,7 +243,7 @@ describe('Mastodon APIs', () => { body: JSON.stringify(body), }) - const res = await statuses.handleRequest(req, db, connectedActor, userKEK) + const res = await statuses.handleRequest(req, db, connectedActor, userKEK, queue) assert.equal(res.status, 200) const data = await res.json() @@ -229,16 +273,7 @@ describe('Mastodon APIs', () => { .run() globalThis.fetch = async (input: any) => { - if (input === actor.id.toString()) { - return new Response( - JSON.stringify({ - id: actor.id, - inbox: 'https://social.com/sven/inbox', - }) - ) - } - - if (input.url === 'https://social.com/sven/inbox') { + if (input.url === actor.id.toString() + '/inbox') { assert.equal(input.method, 'POST') const body = await input.json() deliveredActivity = body @@ -392,12 +427,13 @@ describe('Mastodon APIs', () => { test('reblog records in db', async () => { const db = await makeDB() + const queue = makeQueue() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const note = await createPublicNote(domain, db, 'my first status', actor) const connectedActor: any = actor - const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK) + const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK, queue) assert.equal(res.status, 200) const data = await res.json() @@ -411,36 +447,25 @@ describe('Mastodon APIs', () => { test('reblog status adds in actor outbox', async () => { const db = await makeDB() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') - const originalObjectId = 'https://example.com/note123' + const queue = makeQueue() - await db - .prepare( - 'INSERT INTO objects (id, type, properties, original_actor_id, original_object_id, mastodon_id, local) VALUES (?, ?, ?, ?, ?, ?, 0)' - ) - .bind( - 'https://example.com/object1', - 'Note', - JSON.stringify({ content: 'my first status' }), - actor.id.toString(), - originalObjectId, - 'mastodonid1' - ) - .run() + const note = await createPublicNote(domain, db, 'my first status', actor) const connectedActor: any = actor - const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK) + const res = await statuses_reblog.handleRequest(db, note.mastodonId!, connectedActor, userKEK, queue) assert.equal(res.status, 200) const row = await db.prepare(`SELECT * FROM outbox_objects`).first() assert.equal(row.actor_id, actor.id.toString()) - assert.equal(row.object_id, 'https://example.com/object1') + assert.equal(row.object_id, note.id.toString()) }) test('reblog remote status status sends Announce activity to author', async () => { let deliveredActivity: any = null const db = await makeDB() + const queue = makeQueue() const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') const originalObjectId = 'https://example.com/note123' @@ -459,16 +484,7 @@ describe('Mastodon APIs', () => { .run() globalThis.fetch = async (input: any) => { - if (input === actor.id.toString()) { - return new Response( - JSON.stringify({ - id: actor.id, - inbox: 'https://social.com/sven/inbox', - }) - ) - } - - if (input.url === 'https://social.com/sven/inbox') { + if (input.url === 'https://cloudflare.com/ap/users/sven/inbox') { assert.equal(input.method, 'POST') const body = await input.json() deliveredActivity = body @@ -480,7 +496,7 @@ describe('Mastodon APIs', () => { const connectedActor: any = actor - const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK) + const res = await statuses_reblog.handleRequest(db, 'mastodonid1', connectedActor, userKEK, queue) assert.equal(res.status, 200) assert(deliveredActivity) diff --git a/backend/test/utils.ts b/backend/test/utils.ts index 94cd3ce..ffec3a7 100644 --- a/backend/test/utils.ts +++ b/backend/test/utils.ts @@ -1,4 +1,5 @@ import { strict as assert } from 'node:assert/strict' +import type { Queue } from 'wildebeest/backend/src/types/queue' import { createClient } from 'wildebeest/backend/src/mastodon/client' import type { Client } from 'wildebeest/backend/src/mastodon/client' import { promises as fs } from 'fs' @@ -64,3 +65,23 @@ export async function createTestClient( ): Promise { return createClient(db, 'test client', redirectUri, 'https://cloudflare.com', scopes) } + +type TestQueue = Queue & { messages: Array } + +export function makeQueue(): TestQueue { + const messages: Array = [] + + return { + messages, + + async send(msg: any) { + messages.push(msg) + }, + + async sendBatch(batch: Array<{ body: any }>) { + for (let i = 0, len = batch.length; i < len; i++) { + messages.push(batch[i].body) + } + }, + } +} diff --git a/consumer/src/deliver.ts b/consumer/src/deliver.ts new file mode 100644 index 0000000..d3a08a5 --- /dev/null +++ b/consumer/src/deliver.ts @@ -0,0 +1,43 @@ +import type { MessageBody, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' +import { getSigningKey } from 'wildebeest/backend/src/mastodon/account' +import * as actors from 'wildebeest/backend/src/activitypub/actors' +import type { Actor } from 'wildebeest/backend/src/activitypub/actors' +import type { Env } from './' +import { generateDigestHeader } from 'wildebeest/backend/src/utils/http-signing-cavage' +import { signRequest } from 'wildebeest/backend/src/utils/http-signing' + +const headers = { + 'content-type': 'application/activity+json', +} + +export async function handleDeliverMessage(env: Env, actor: Actor, message: DeliverMessageBody) { + const toActorId = new URL(message.toActorId) + const targetActor = await actors.getAndCache(toActorId, env.DATABASE) + if (targetActor === null) { + console.warn(`actor ${toActorId} not found`) + return + } + + const body = JSON.stringify(message.activity) + + const req = new Request(targetActor.inbox, { + method: 'POST', + body, + headers, + }) + const digest = await generateDigestHeader(body) + req.headers.set('Digest', digest) + const signingKey = await getSigningKey(message.userKEK, env.DATABASE, actor) + await signRequest(req, signingKey, actor.id) + + const res = await fetch(req) + if (!res.ok) { + const body = await res.text() + console.error(`delivery to ${targetActor.inbox} returned ${res.status}: ${body}`) + return + } + { + const body = await res.text() + console.log(`${targetActor.inbox} returned 200: ${body}`) + } +} diff --git a/consumer/src/inbox.ts b/consumer/src/inbox.ts new file mode 100644 index 0000000..20ed511 --- /dev/null +++ b/consumer/src/inbox.ts @@ -0,0 +1,23 @@ +import type { MessageBody, InboxMessageBody } from 'wildebeest/backend/src/types/queue' +import * as activityHandler from 'wildebeest/backend/src/activitypub/activities/handle' +import * as notification from 'wildebeest/backend/src/mastodon/notification' +import * as timeline from 'wildebeest/backend/src/mastodon/timeline' +import type { Actor } from 'wildebeest/backend/src/activitypub/actors' +import type { Env } from './' + +export async function handleInboxMessage(env: Env, actor: Actor, message: InboxMessageBody) { + const domain = env.DOMAIN + const db = env.DATABASE + const adminEmail = env.ADMIN_EMAIL + const cache = env.KV_CACHE + const activity = message.activity + + await activityHandler.handle(domain, activity, db, message.userKEK, adminEmail, message.vapidKeys) + + // Assuming we received new posts or a like, pregenerate the user's timelines + // and notifications. + await Promise.all([ + timeline.pregenerateTimelines(domain, db, cache, actor), + notification.pregenerateNotifications(db, cache, actor), + ]) +} diff --git a/consumer/src/index.ts b/consumer/src/index.ts index 4c1bd91..b4fb5cb 100644 --- a/consumer/src/index.ts +++ b/consumer/src/index.ts @@ -1,13 +1,14 @@ -import type { MessageBody } from 'wildebeest/backend/src/types/queue' +import type { MessageBody, InboxMessageBody, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' import type { JWK } from 'wildebeest/backend/src/webpush/jwk' import type { Actor } from 'wildebeest/backend/src/activitypub/actors' import * as actors from 'wildebeest/backend/src/activitypub/actors' -import * as timeline from 'wildebeest/backend/src/mastodon/timeline' -import * as notification from 'wildebeest/backend/src/mastodon/notification' -import * as activityHandler from 'wildebeest/backend/src/activitypub/activities/handle' import type { Activity } from 'wildebeest/backend/src/activitypub/activities' +import { MessageType } from 'wildebeest/backend/src/types/queue' -type Env = { +import { handleInboxMessage } from './inbox' +import { handleDeliverMessage } from './deliver' + +export type Env = { DATABASE: D1Database DOMAIN: string ADMIN_EMAIL: string @@ -17,42 +18,24 @@ type Env = { export default { async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) { for (const message of batch.messages) { - try { - const actor = await actors.getPersonById(env.DATABASE, new URL(message.body.actorId)) - if (actor === null) { - console.warn(`actor ${message.body.actorId} is missing`) - return - } + const actor = await actors.getPersonById(env.DATABASE, new URL(message.body.actorId)) + if (actor === null) { + console.warn(`actor ${message.body.actorId} is missing`) + return + } - switch (message.body.type) { - case 'activity': { - await handleActivityMessage(env, actor, message.body) - break - } - default: - throw new Error('unsupported message type: ' + message.body.type) + switch (message.body.type) { + case MessageType.Inbox: { + await handleInboxMessage(env, actor, message.body as InboxMessageBody) + break } - } catch (err: any) { - console.error(err.stack) - // TODO: add sentry + case MessageType.Deliver: { + await handleDeliverMessage(env, actor, message.body as DeliverMessageBody) + break + } + default: + throw new Error('unsupported message type: ' + message.body.type) } } }, } - -async function handleActivityMessage(env: Env, actor: Actor, message: MessageBody) { - const domain = env.DOMAIN - const db = env.DATABASE - const adminEmail = env.ADMIN_EMAIL - const cache = env.KV_CACHE - const activity = message.content - - await activityHandler.handle(domain, activity, db, message.userKEK, adminEmail, message.vapidKeys) - - // Assuming we received new posts or a like, pregenerate the user's timelines - // and notifications. - await Promise.all([ - timeline.pregenerateTimelines(domain, db, cache, actor), - notification.pregenerateNotifications(db, cache, actor), - ]) -} diff --git a/consumer/test/consumer.spec.ts b/consumer/test/consumer.spec.ts new file mode 100644 index 0000000..0f1106e --- /dev/null +++ b/consumer/test/consumer.spec.ts @@ -0,0 +1,68 @@ +import { MessageType } from 'wildebeest/backend/src/types/queue' +import { strict as assert } from 'node:assert/strict' +import type { DeliverMessageBody } from 'wildebeest/backend/src/types/queue' +import { createPerson } from 'wildebeest/backend/src/activitypub/actors' +import { makeDB } from 'wildebeest/backend/test/utils' +import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note' +import { handleDeliverMessage } from '../src/deliver' + +const domain = 'cloudflare.com' +const userKEK = 'test_kek25' + +describe('Consumer', () => { + describe('Deliver', () => { + test('deliver to target Actor', async () => { + const db = await makeDB() + + let receivedActivity: any = null + + globalThis.fetch = async (input: any) => { + if (input.toString() === 'https://example.com/users/a') { + return new Response( + JSON.stringify({ + id: 'https://example.com/users/a', + type: 'Person', + preferredUsername: 'someone', + inbox: 'https://example.com/inbox', + }) + ) + } + + if (input.url.toString() === 'https://example.com/inbox') { + assert.equal(input.method, 'POST') + receivedActivity = await input.json() + return new Response('') + } + + throw new Error('unexpected request to ' + input.url) + } + + const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com') + const note = await createPublicNote(domain, db, 'my first status', actor) + + const activity: any = { + type: 'Create', + actor: actor.id.toString(), + to: ['https://example.com/users/a'], + cc: [], + object: note.id, + } + + const message: DeliverMessageBody = { + activity, + type: MessageType.Deliver, + actorId: actor.id.toString(), + toActorId: 'https://example.com/users/a', + userKEK: userKEK, + } + + const env = { + DATABASE: db, + } as any + await handleDeliverMessage(env, actor, message) + + assert(receivedActivity) + assert.equal(receivedActivity.type, activity.type) + }) + }) +}) diff --git a/consumer/wrangler.toml b/consumer/wrangler.toml index 4d5ffaf..24108d8 100644 --- a/consumer/wrangler.toml +++ b/consumer/wrangler.toml @@ -1,6 +1,7 @@ name = "wildebeest-consumer" compatibility_date = "2023-01-12" main = "./src/index.ts" +usage_model = "unbound" [[queues.consumers]] queue = "wildebeest" diff --git a/frontend/mock-db/init.ts b/frontend/mock-db/init.ts index 5d5c161..31f952f 100644 --- a/frontend/mock-db/init.ts +++ b/frontend/mock-db/init.ts @@ -5,6 +5,10 @@ import { statuses } from 'wildebeest/frontend/src/dummyData' import type { Account, MastodonStatus } from 'wildebeest/frontend/src/types' const kek = 'test-kek' +const queue = { + async send() {}, + async sendBatch() {}, +} /** * Run helper commands to initialize the database with actors, statuses, etc. */ @@ -39,7 +43,7 @@ async function createStatus(db: D1Database, actor: Person, status: string, visib headers, body: JSON.stringify(body), }) - const resp = await statusesAPI.handleRequest(req, db, actor, kek) + const resp = await statusesAPI.handleRequest(req, db, actor, kek, queue) return (await resp.json()) as MastodonStatus } @@ -61,5 +65,5 @@ async function getOrCreatePerson( } async function reblogStatus(db: D1Database, actor: Person, status: MastodonStatus) { - await reblogAPI.handleRequest(db, status.id, actor, kek) + await reblogAPI.handleRequest(db, status.id, actor, kek, queue) } diff --git a/functions/ap/users/[id]/inbox.ts b/functions/ap/users/[id]/inbox.ts index ec90ba6..b84b288 100644 --- a/functions/ap/users/[id]/inbox.ts +++ b/functions/ap/users/[id]/inbox.ts @@ -4,7 +4,8 @@ import type { JWK } from 'wildebeest/backend/src/webpush/jwk' import * as actors from 'wildebeest/backend/src/activitypub/actors' import { actorURL } from 'wildebeest/backend/src/activitypub/actors' import type { Env } from 'wildebeest/backend/src/types/env' -import type { MessageBody } from 'wildebeest/backend/src/types/queue' +import type { InboxMessageBody } from 'wildebeest/backend/src/types/queue' +import { MessageType } from 'wildebeest/backend/src/types/queue' import type { Activity } from 'wildebeest/backend/src/activitypub/activities' import { parseRequest } from 'wildebeest/backend/src/utils/httpsigjs/parser' import { fetchKey, verifySignature } from 'wildebeest/backend/src/utils/httpsigjs/verifier' @@ -37,7 +38,7 @@ export async function handleRequest( db: D1Database, id: string, activity: Activity, - queue: Queue, + queue: Queue, userKEK: string, vapidKeys: JWK ): Promise { @@ -54,9 +55,9 @@ export async function handleRequest( } await queue.send({ - type: 'activity', + type: MessageType.Inbox, actorId: actor.id.toString(), - content: activity, + activity, userKEK, vapidKeys, }) diff --git a/functions/api/v1/accounts/update_credentials.ts b/functions/api/v1/accounts/update_credentials.ts index 02dcf1e..5d84980 100644 --- a/functions/api/v1/accounts/update_credentials.ts +++ b/functions/api/v1/accounts/update_credentials.ts @@ -1,7 +1,7 @@ // https://docs.joinmastodon.org/methods/accounts/#update_credentials +import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' import * as errors from 'wildebeest/backend/src/errors' -import { getSigningKey } from 'wildebeest/backend/src/mastodon/account' import * as activities from 'wildebeest/backend/src/activitypub/activities/update' import * as actors from 'wildebeest/backend/src/activitypub/actors' import { deliverFollowers } from 'wildebeest/backend/src/activitypub/deliver' @@ -20,7 +20,15 @@ const headers = { } export const onRequest: PagesFunction = async ({ request, data, env }) => { - return handleRequest(env.DATABASE, request, data.connectedActor, env.CF_ACCOUNT_ID, env.CF_API_TOKEN, env.userKEK) + return handleRequest( + env.DATABASE, + request, + data.connectedActor, + env.CF_ACCOUNT_ID, + env.CF_API_TOKEN, + env.userKEK, + env.QUEUE + ) } export async function handleRequest( @@ -31,7 +39,8 @@ export async function handleRequest( accountId: string, apiToken: string, - userKEK: string + userKEK: string, + queue: Queue ): Promise { if (!connectedActor) { return new Response('', { status: 401 }) @@ -106,8 +115,7 @@ export async function handleRequest( // send updates const activity = activities.create(domain, connectedActor, actor) - const signingKey = await getSigningKey(userKEK, db, connectedActor) - await deliverFollowers(db, signingKey, connectedActor, activity) + await deliverFollowers(db, userKEK, connectedActor, activity, queue) return new Response(JSON.stringify(res), { headers }) } diff --git a/functions/api/v1/statuses.ts b/functions/api/v1/statuses.ts index c13b576..92c6983 100644 --- a/functions/api/v1/statuses.ts +++ b/functions/api/v1/statuses.ts @@ -1,5 +1,5 @@ // https://docs.joinmastodon.org/methods/statuses/#create - +import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' import { loadLocalMastodonAccount } from 'wildebeest/backend/src/mastodon/account' import { createPublicNote } from 'wildebeest/backend/src/activitypub/objects/note' import type { Document } from 'wildebeest/backend/src/activitypub/objects' @@ -23,7 +23,7 @@ type StatusCreate = { } export const onRequest: PagesFunction = async ({ request, env, data }) => { - return handleRequest(request, env.DATABASE, data.connectedActor, env.userKEK) + return handleRequest(request, env.DATABASE, data.connectedActor, env.userKEK, env.QUEUE) } // FIXME: add tests for delivery to followers and mentions to a specific Actor. @@ -31,7 +31,8 @@ export async function handleRequest( request: Request, db: D1Database, connectedActor: Person, - userKEK: string + userKEK: string, + queue: Queue ): Promise { // TODO: implement Idempotency-Key @@ -64,28 +65,29 @@ export async function handleRequest( const note = await createPublicNote(domain, db, body.status, connectedActor, mediaAttachments) await addObjectInOutbox(db, connectedActor, note) - // If the status is mentioning other persons, we need to delivery it to them. - const mentions = getMentions(body.status) - for (let i = 0, len = mentions.length; i < len; i++) { - if (mentions[i].domain === null) { - // Only deliver the note for remote actors - continue - } - const acct = `${mentions[i].localPart}@${mentions[i].domain}` - const targetActor = await queryAcct(mentions[i].domain!, acct) - if (targetActor === null) { - console.warn(`actor ${acct} not found`) - continue - } - note.to.push(targetActor.id.toString()) - const activity = activities.create(domain, connectedActor, note) - const signingKey = await getSigningKey(userKEK, db, connectedActor) - await deliverToActor(signingKey, connectedActor, targetActor, activity) - } - const activity = activities.create(domain, connectedActor, note) - const signingKey = await getSigningKey(userKEK, db, connectedActor) - await deliverFollowers(db, signingKey, connectedActor, activity) + await deliverFollowers(db, userKEK, connectedActor, activity, queue) + + { + // If the status is mentioning other persons, we need to delivery it to them. + const mentions = getMentions(body.status) + for (let i = 0, len = mentions.length; i < len; i++) { + if (mentions[i].domain === null) { + // Only deliver the note for remote actors + continue + } + const acct = `${mentions[i].localPart}@${mentions[i].domain}` + const targetActor = await queryAcct(mentions[i].domain!, acct) + if (targetActor === null) { + console.warn(`actor ${acct} not found`) + continue + } + note.to.push(targetActor.id.toString()) + const activity = activities.create(domain, connectedActor, note) + const signingKey = await getSigningKey(userKEK, db, connectedActor) + await deliverToActor(signingKey, connectedActor, targetActor, activity) + } + } const account = await loadLocalMastodonAccount(db, connectedActor) diff --git a/functions/api/v1/statuses/[id]/favourite.ts b/functions/api/v1/statuses/[id]/favourite.ts index 36f7ea8..e95752e 100644 --- a/functions/api/v1/statuses/[id]/favourite.ts +++ b/functions/api/v1/statuses/[id]/favourite.ts @@ -34,7 +34,7 @@ export async function handleRequest( if (obj.originalObjectId && obj.originalActorId) { // Liking an external object delivers the like activity - const targetActor = await actors.get(obj.originalActorId) + const targetActor = await actors.getAndCache(new URL(obj.originalActorId), db) if (!targetActor) { return new Response(`target Actor ${obj.originalActorId} not found`, { status: 404 }) } diff --git a/functions/api/v1/statuses/[id]/reblog.ts b/functions/api/v1/statuses/[id]/reblog.ts index fe89945..f88ced3 100644 --- a/functions/api/v1/statuses/[id]/reblog.ts +++ b/functions/api/v1/statuses/[id]/reblog.ts @@ -1,5 +1,5 @@ // https://docs.joinmastodon.org/methods/statuses/#boost - +import type { Queue, DeliverMessageBody } from 'wildebeest/backend/src/types/queue' import type { Env } from 'wildebeest/backend/src/types/env' import { addObjectInOutbox } from 'wildebeest/backend/src/activitypub/actors/outbox' import { insertReblog } from 'wildebeest/backend/src/mastodon/reblog' @@ -14,14 +14,15 @@ import type { ContextData } from 'wildebeest/backend/src/types/context' import { toMastodonStatusFromObject } from 'wildebeest/backend/src/mastodon/status' export const onRequest: PagesFunction = async ({ env, data, params }) => { - return handleRequest(env.DATABASE, params.id as string, data.connectedActor, env.userKEK) + return handleRequest(env.DATABASE, params.id as string, data.connectedActor, env.userKEK, env.QUEUE) } export async function handleRequest( db: D1Database, id: string, connectedActor: Person, - userKEK: string + userKEK: string, + queue: Queue ): Promise { const obj = await getObjectByMastodonId(db, id) if (obj === null) { @@ -33,23 +34,22 @@ export async function handleRequest( return new Response('', { status: 404 }) } - const signingKey = await getSigningKey(userKEK, db, connectedActor) - if (obj.originalObjectId && obj.originalActorId) { // Rebloggin an external object delivers the announce activity to the // post author. - const targetActor = await actors.get(obj.originalActorId) + const targetActor = await actors.getAndCache(new URL(obj.originalActorId), db) if (!targetActor) { return new Response(`target Actor ${obj.originalActorId} not found`, { status: 404 }) } const activity = announce.create(connectedActor, new URL(obj.originalObjectId)) + const signingKey = await getSigningKey(userKEK, db, connectedActor) await Promise.all([ // Delivers the announce activity to the post author. deliverToActor(signingKey, connectedActor, targetActor, activity), // Share reblogged by delivering the announce activity to followers - deliverFollowers(db, signingKey, connectedActor, activity), + deliverFollowers(db, userKEK, connectedActor, activity, queue), ]) } diff --git a/jest.config.js b/jest.config.js index 254b055..3e23f9f 100644 --- a/jest.config.js +++ b/jest.config.js @@ -2,7 +2,7 @@ export default { preset: 'ts-jest', verbose: true, - testMatch: ["/backend/test/**/(*.)+(spec|test).[jt]s?(x)"], + testMatch: ["/(backend|consumer)/test/**/(*.)+(spec|test).[jt]s?(x)"], testTimeout: 30000, testEnvironment: 'miniflare', // Configuration is automatically loaded from `.env`, `package.json` and