refactor remote statuses and detect duplicated objects on create

pull/26/head
Sven Sauleau 2023-01-06 14:40:20 +00:00
rodzic d179701e2f
commit 33b43d06f9
8 zmienionych plików z 159 dodań i 132 usunięć

Wyświetl plik

@ -28,30 +28,8 @@ function extractID(domain: string, s: string | URL): string {
return s.toString().replace(`https://${domain}/ap/users/`, '')
}
export type HandleResponse = {
createdObjects: Array<Object>
}
export type HandleMode = 'caching' | 'inbox'
export async function handle(
domain: string,
activity: Activity,
db: D1Database,
userKEK: string,
mode: HandleMode
): Promise<HandleResponse> {
const createdObjects: Array<Object> = []
// The `object` field of the activity is required to be an object, with an
// `id` and a `type` field.
const requireComplexObject = () => {
if (typeof activity.object !== 'object') {
throw new Error('`activity.object` must be of type object')
}
}
const getObjectAsId = () => {
export function makeGetObjectAsId(activity: Activity): Function {
return () => {
let url: any = null
if (activity.object.id !== undefined) {
url = activity.object.id
@ -74,8 +52,10 @@ export async function handle(
throw err
}
}
}
const getActorAsId = () => {
export function makeGetActorAsId(activity: Activity): Function {
return () => {
let url: any = null
if (activity.actor.id !== undefined) {
url = activity.actor.id
@ -98,6 +78,19 @@ export async function handle(
throw err
}
}
}
export async function handle(domain: string, activity: Activity, db: D1Database, userKEK: string) {
// The `object` field of the activity is required to be an object, with an
// `id` and a `type` field.
const requireComplexObject = () => {
if (typeof activity.object !== 'object') {
throw new Error('`activity.object` must be of type object')
}
}
const getObjectAsId = makeGetObjectAsId(activity)
const getActorAsId = makeGetActorAsId(activity)
console.log(activity)
switch (activity.type) {
@ -140,11 +133,17 @@ export async function handle(
}
const objectId = getObjectAsId()
const obj = await createObject(domain, activity.object, db, actorId, objectId)
if (obj === null) {
const res = await cacheObject(domain, activity.object, db, actorId, objectId)
if (res === null) {
break
}
createdObjects.push(obj)
if (!res.created) {
// Object already existed in our database. Probably a duplicated
// message
break
}
const obj = res.object
const actor = await actors.getAndCache(actorId, db)
@ -156,8 +155,8 @@ export async function handle(
if (inReplyToObject === null) {
const remoteObject = await objects.get(inReplyToObjectId)
inReplyToObject = await objects.cacheObject(domain, db, remoteObject, actorId, inReplyToObjectId, false)
createdObjects.push(inReplyToObject)
const res = await objects.cacheObject(domain, db, remoteObject, actorId, inReplyToObjectId, false)
inReplyToObject = res.object
}
await insertReply(db, actor, obj, inReplyToObject)
@ -168,27 +167,25 @@ export async function handle(
// actors on this instance to see the note in their timelines.
await addObjectInOutbox(db, fromActor, obj, activity.published)
if (mode === 'inbox') {
for (let i = 0, len = recipients.length; i < len; i++) {
const handle = parseHandle(extractID(domain, recipients[i]))
if (handle.domain !== null && handle.domain !== domain) {
console.warn('activity not for current instance')
continue
}
const person = await actors.getPersonById(db, actorURL(domain, handle.localPart))
if (person === null) {
console.warn(`person ${recipients[i]} not found`)
continue
}
// FIXME: check if the actor mentions the person
const notifId = await createNotification(db, 'mention', person, fromActor, obj)
await Promise.all([
await addObjectInInbox(db, person, obj),
await sendMentionNotification(db, fromActor, person, notifId),
])
for (let i = 0, len = recipients.length; i < len; i++) {
const handle = parseHandle(extractID(domain, recipients[i]))
if (handle.domain !== null && handle.domain !== domain) {
console.warn('activity not for current instance')
continue
}
const person = await actors.getPersonById(db, actorURL(domain, handle.localPart))
if (person === null) {
console.warn(`person ${recipients[i]} not found`)
continue
}
// FIXME: check if the actor mentions the person
const notifId = await createNotification(db, 'mention', person, fromActor, obj)
await Promise.all([
await addObjectInInbox(db, person, obj),
await sendMentionNotification(db, fromActor, person, notifId),
])
}
break
@ -253,11 +250,11 @@ export async function handle(
// Object doesn't exists locally, we'll need to download it.
const remoteObject = await objects.get<Note>(objectId)
obj = await createObject(domain, remoteObject, db, actorId, objectId)
if (obj === null) {
const res = await cacheObject(domain, remoteObject, db, actorId, objectId)
if (res === null) {
break
}
createdObjects.push(obj)
obj = res.object
} catch (err: any) {
console.warn(`failed to retrieve object ${objectId}: ${err.message}`)
break
@ -323,17 +320,15 @@ export async function handle(
default:
console.warn(`Unsupported activity: ${activity.type}`)
}
return { createdObjects }
}
async function createObject(
async function cacheObject(
domain: string,
obj: Object,
db: D1Database,
originalActorId: URL,
originalObjectId: URL
): Promise<Object | null> {
): Promise<{ created: boolean; object: Object } | null> {
switch (obj.type) {
case 'Note': {
return objects.cacheObject(domain, db, obj, originalActorId, originalObjectId, false)

Wyświetl plik

@ -71,6 +71,11 @@ export async function get<T>(url: URL): Promise<T> {
return res.json<T>()
}
type CacheObjectRes = {
created: boolean
object: Object
}
export async function cacheObject(
domain: string,
db: D1Database,
@ -78,10 +83,13 @@ export async function cacheObject(
originalActorId: URL,
originalObjectId: URL,
local: boolean
): Promise<Object> {
): Promise<CacheObjectRes> {
const cachedObject = await getObjectBy(db, 'original_object_id', originalObjectId.toString())
if (cachedObject !== null) {
return cachedObject
return {
created: false,
object: cachedObject,
}
}
const uuid = crypto.randomUUID()
@ -104,8 +112,7 @@ export async function cacheObject(
{
const properties = JSON.parse(row.properties)
return {
const object = {
published: new Date(row.cdate).toISOString(),
...properties,
@ -115,6 +122,8 @@ export async function cacheObject(
originalActorId: row.original_actor_id,
originalObjectId: row.original_object_id,
} as Object
return { object, created: true }
}
}

Wyświetl plik

@ -52,13 +52,17 @@ export async function toMastodonStatusFromObject(db: D1Database, obj: Note): Pro
if (Array.isArray(obj.attachment)) {
for (let i = 0, len = obj.attachment.length; i < len; i++) {
const document = await getObjectById(db, obj.attachment[i].id)
if (document === null) {
console.warn('missing attachment object: ' + obj.attachment[i].id)
continue
}
if (obj.attachment[i].id) {
const document = await getObjectById(db, obj.attachment[i].id)
if (document === null) {
console.warn('missing attachment object: ' + obj.attachment[i].id)
continue
}
mediaAttachments.push(media.fromObject(document))
mediaAttachments.push(media.fromObject(document))
} else {
console.warn('attachment has no id')
}
}
}

Wyświetl plik

@ -74,7 +74,7 @@ describe('ActivityPub', () => {
},
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
const row = await db
.prepare(`SELECT target_actor_id, state FROM actor_following WHERE actor_id=?`)
@ -96,7 +96,7 @@ describe('ActivityPub', () => {
object: 'a',
}
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK, 'inbox'), {
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK), {
message: '`activity.object` must be of type object',
})
})
@ -114,7 +114,7 @@ describe('ActivityPub', () => {
object: 'a',
}
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK, 'inbox'), {
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK), {
message: '`activity.object` must be of type object',
})
})
@ -131,7 +131,7 @@ describe('ActivityPub', () => {
object: 'a',
}
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK, 'inbox'), {
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK), {
message: '`activity.object` must be of type object',
})
})
@ -150,7 +150,7 @@ describe('ActivityPub', () => {
},
}
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK, 'inbox'), {
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK), {
message: 'object https://example.com/note2 does not exist',
})
})
@ -174,7 +174,7 @@ describe('ActivityPub', () => {
object: object,
}
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK, 'inbox'), {
await assert.rejects(activityHandler.handle(domain, activity, db, userKEK), {
message: 'actorid mismatch when updating object',
})
})
@ -204,7 +204,7 @@ describe('ActivityPub', () => {
object: newObject,
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
const updatedObject = await db.prepare('SELECT * FROM objects WHERE original_object_id=?').bind(object.id).first()
assert(updatedObject)
@ -287,7 +287,7 @@ describe('ActivityPub', () => {
cc: [],
object: objectId,
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
const object = await db.prepare('SELECT * FROM objects').bind(remoteActorId).first()
assert(object)
@ -313,20 +313,22 @@ describe('ActivityPub', () => {
let result: any
// Cache object once adds it to the database
const obj1: any = await cacheObject(domain, db, properties, actor.id, originalObjectId, false)
assert.equal(obj1.a, 1)
assert.equal(obj1.b, 2)
const res1: any = await cacheObject(domain, db, properties, actor.id, originalObjectId, false)
assert.equal(res1.object.a, 1)
assert.equal(res1.object.b, 2)
assert(res1.created)
result = await db.prepare('SELECT count(*) as count from objects').first()
assert.equal(result.count, 1)
// Cache object second time updates the first one
properties.a = 3
const obj2: any = await cacheObject(domain, db, properties, actor.id, originalObjectId, false)
const res2: any = await cacheObject(domain, db, properties, actor.id, originalObjectId, false)
// The creation date and properties don't change
assert.equal(obj1.a, obj2.a)
assert.equal(obj1.b, obj2.b)
assert.equal(obj1.published, obj2.published)
assert.equal(res1.object.a, res2.object.a)
assert.equal(res1.object.b, res2.object.b)
assert.equal(res1.object.published, res2.object.published)
assert(!res2.created)
result = await db.prepare('SELECT count(*) as count from objects').first()
assert.equal(result.count, 1)

Wyświetl plik

@ -46,7 +46,7 @@ describe('ActivityPub', () => {
object: actor.id.toString(),
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
const row = await db
.prepare(`SELECT target_actor_id, state FROM actor_following WHERE actor_id=?`)
@ -144,7 +144,7 @@ describe('ActivityPub', () => {
object: actor.id,
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
const entry = await db.prepare('SELECT * FROM actor_notifications').first()
assert.equal(entry.type, 'follow')

Wyświetl plik

@ -420,8 +420,11 @@ describe('Mastodon APIs', () => {
await configure(db, { title: 'title', description: 'a', email: 'email' })
await generateVAPIDKeys(db)
const actor = await createPerson(domain, db, userKEK, 'sven@cloudflare.com')
const localNote = await createPublicNote(domain, db, 'my localnote status', actor)
const actorA = await createPerson(domain, db, userKEK, 'a@cloudflare.com')
const note = await createPublicNote(domain, db, 'my localnote status', actorA, [], {
attributedTo: actorA.id.toString(),
})
globalThis.fetch = async (input: RequestInfo) => {
if (input.toString() === 'https://social.com/.well-known/webfinger?resource=acct%3Asomeone%40social.com') {
@ -431,17 +434,17 @@ describe('Mastodon APIs', () => {
{
rel: 'self',
type: 'application/activity+json',
href: 'https://social.com/someone',
href: 'https://social.com/users/someone',
},
],
})
)
}
if (input.toString() === 'https://social.com/someone') {
if (input.toString() === 'https://social.com/users/someone') {
return new Response(
JSON.stringify({
id: 'https://social.com/someone',
id: 'https://social.com/users/someone',
type: 'Person',
preferredUsername: 'someone',
outbox: 'https://social.com/outbox',
@ -449,15 +452,6 @@ describe('Mastodon APIs', () => {
)
}
if (input.toString() === 'https://mastodon.social/users/someone') {
return new Response(
JSON.stringify({
id: 'https://mastodon.social/users/someone',
type: 'Person',
})
)
}
if (input.toString() === 'https://social.com/outbox') {
return new Response(
JSON.stringify({
@ -473,7 +467,7 @@ describe('Mastodon APIs', () => {
{
id: 'https://mastodon.social/users/a/statuses/b/activity',
type: 'Create',
actor: 'https://mastodon.social/users/someone',
actor: 'https://social.com/users/someone',
published: '2022-12-10T23:48:38Z',
object: {
id: 'https://example.com/object1',
@ -484,9 +478,9 @@ describe('Mastodon APIs', () => {
{
id: 'https://mastodon.social/users/c/statuses/d/activity',
type: 'Announce',
actor: 'https://mastodon.social/users/someone',
actor: 'https://social.com/users/someone',
published: '2022-12-10T23:48:38Z',
object: localNote.id,
object: note.id,
},
],
})
@ -501,7 +495,7 @@ describe('Mastodon APIs', () => {
assert.equal(res.status, 200)
const data = await res.json<Array<any>>()
assert.equal(data.length, 1)
assert.equal(data.length, 2)
assert.equal(data[0].content, '<p>p</p>')
assert.equal(data[0].account.username, 'someone')

Wyświetl plik

@ -54,7 +54,7 @@ export async function handleRequest(
return new Response('', { status: 404 })
}
await activityHandler.handle(domain, activity, db, userKEK, 'inbox')
await activityHandler.handle(domain, activity, db, userKEK)
// Assuming we received new posts or a like, pregenerate the user's timelines
// and notifications.

Wyświetl plik

@ -1,12 +1,14 @@
import type { Env } from 'wildebeest/backend/src/types/env'
import type { Note } from 'wildebeest/backend/src/activitypub/objects/note'
import { loadExternalMastodonAccount } from 'wildebeest/backend/src/mastodon/account'
import type { Object } from 'wildebeest/backend/src/activitypub/objects'
import { getPersonById } from 'wildebeest/backend/src/activitypub/actors'
import * as activityHandler from 'wildebeest/backend/src/activitypub/activities/handle'
import { makeGetActorAsId, makeGetObjectAsId } from 'wildebeest/backend/src/activitypub/activities/handle'
import { parseHandle } from 'wildebeest/backend/src/utils/parse'
import type { Handle } from 'wildebeest/backend/src/utils/parse'
import type { ContextData } from 'wildebeest/backend/src/types/context'
import type { MastodonAccount, MastodonStatus } from 'wildebeest/backend/src/types'
import { toMastodonStatusFromObject } from 'wildebeest/backend/src/mastodon/status'
import * as objects from 'wildebeest/backend/src/activitypub/objects'
import { actorURL } from 'wildebeest/backend/src/activitypub/actors'
import * as webfinger from 'wildebeest/backend/src/webfinger'
@ -39,15 +41,13 @@ export async function handleRequest(request: Request, db: D1Database, id: string
}
async function getRemoteStatuses(request: Request, handle: Handle, db: D1Database, userKEK: string): Promise<Response> {
const out: Array<MastodonStatus> = []
const url = new URL(request.url)
const domain = url.hostname
const isPinned = url.searchParams.get('pinned') === 'true'
if (isPinned) {
// TODO: pinned statuses are not implemented yet. Stub the endpoint
// to avoid returning statuses that aren't pinned.
return new Response(JSON.stringify(out), { headers })
return new Response(JSON.stringify([]), { headers })
}
const acct = `${handle.localPart}@${handle.domain}`
@ -59,39 +59,62 @@ async function getRemoteStatuses(request: Request, handle: Handle, db: D1Databas
const actor = await actors.getAndCache(link, db)
const activities = await outbox.get(actor)
let results: Array<Object> = []
for (let i = 0, len = activities.items.length; i < len; i++) {
const activity = activities.items[i]
const { createdObjects } = await activityHandler.handle(domain, activity, db, userKEK, 'caching')
results = [...results, ...createdObjects]
}
let statuses: Array<MastodonStatus> = []
const account = await loadExternalMastodonAccount(acct, actor)
if (results && results.length > 0) {
for (let i = 0, len = results.length; i < len; i++) {
const result: any = results[i]
for (let i = 0, len = activities.items.length; i < len; i++) {
const activity = activities.items[i]
out.push({
id: result.mastodonId,
uri: objects.uri(domain, result.id),
created_at: result.published,
content: result.content,
emojis: [],
media_attachments: [],
tags: [],
mentions: [],
account,
const getObjectAsId = makeGetObjectAsId(activity)
const getActorAsId = makeGetActorAsId(activity)
// TODO: stub values
visibility: 'public',
spoiler_text: '',
})
if (activity.type === 'Create') {
const actorId = getActorAsId()
const originalObjectId = getObjectAsId()
const res = await objects.cacheObject(domain, db, activity.object, actorId, originalObjectId, false)
const status = await toMastodonStatusFromObject(db, res.object as Note)
if (status !== null) {
statuses.push(status)
}
}
if (activity.type === 'Announce') {
let obj: any
const actorId = getActorAsId()
const objectId = getObjectAsId()
const localObject = await objects.getObjectById(db, objectId)
if (localObject === null) {
try {
// Object doesn't exists locally, we'll need to download it.
const remoteObject = await objects.get<Note>(objectId)
const res = await objects.cacheObject(domain, db, remoteObject, actorId, objectId, false)
if (res === null) {
break
}
obj = res.object
} catch (err: any) {
console.warn(`failed to retrieve object ${objectId}: ${err.message}`)
break
}
} else {
// Object already exists locally, we can just use it.
obj = localObject
}
const status = await toMastodonStatusFromObject(db, obj)
if (status !== null) {
statuses.push(status)
}
}
// FIXME: support other Activities, like Update.
}
return new Response(JSON.stringify(out), { headers })
return new Response(JSON.stringify(statuses), { headers })
}
async function getLocalStatuses(request: Request, db: D1Database, handle: Handle): Promise<Response> {