feat: handle both thread and channel replies

old-agentic-v1^2
Philipp Burckhardt 2023-06-10 14:37:58 -04:00 zatwierdzone przez Travis Fischer
rodzic 7ddb15b306
commit df03a81ddf
1 zmienionych plików z 144 dodań i 44 usunięć

Wyświetl plik

@ -17,18 +17,6 @@ export interface SlackBotProfile {
team_id: string
}
export interface SlackBotMessage {
bot_id: string
type: string
text: string
user: string
ts: string
app_id: string
blocks: Record<string, unknown>[]
team: string
bot_profile: SlackBotProfile
}
export interface SlackReplies {
messages: SlackMessage[]
has_more: boolean
@ -37,11 +25,13 @@ export interface SlackReplies {
}
export interface SlackMessage {
bot_id?: string
client_msg_id?: string
type: string
text: string
user: string
ts: string
app_id?: string
blocks?: Record<string, unknown>[]
reply_count?: number
subscribed?: boolean
@ -50,22 +40,67 @@ export interface SlackMessage {
team?: string
thread_ts: string
parent_user_id?: string
bot_profile?: SlackBotProfile
}
export interface SlackResponseMetadata {
next_cursor: string
}
export type SlackSendMessageOptions = {
export type SlackAttachment = {
[key: string]: any
}
export type SlackBlock = {
[key: string]: any
}
export type SlackPostMessageParams = {
/**
* The ID of the channel to send the message to.
*/
channel: string
/**
* The text of the message to send.
*/
text: string
/**
* The channel ID to send the message to.
* The timestamp of a parent message to send the message as a reply to.
*/
channelId: string
thread_ts?: string
attachments?: SlackAttachment[]
blocks?: SlackBlock[]
icon_emoji?: string
icon_url?: string
link_names?: boolean
parse?: 'full' | 'none'
reply_broadcast?: boolean
unfurl_links?: boolean
unfurl_media?: boolean
username?: string
}
export type SlackConversationHistoryParams = {
channel: string
oldest?: string
cursor?: string
latest?: string
limit?: number
inclusive?: boolean
include_all_metadata?: boolean
}
export type SlackConversationRepliesParams = {
channel: string
ts: string
cursor?: string
latest?: string
oddest?: string
limit?: number
inclusive?: boolean
include_thread_metadata?: boolean
}
export type SlackSendAndWaitOptions = {
@ -75,9 +110,9 @@ export type SlackSendAndWaitOptions = {
text: string
/**
* The channel ID to send the message to.
* The ID of the channel to send the message to.
*/
channelId: string
channel?: string
/**
* The timeout in milliseconds to wait for a reply before throwing an error.
@ -93,17 +128,26 @@ export type SlackSendAndWaitOptions = {
* A function to validate the reply message. If the function returns `true`, the reply is considered valid and the function will return the message. If the function returns `false`, the reply is considered invalid and the function will continue to wait for a reply until the timeout is reached.
*/
validate?: (message: SlackMessage) => boolean
/**
* A stop signal from an [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController), which can be used to abort retrying. More specifically, when `AbortController.abort(reason)` is called, the function will throw an error with the `reason` argument as the error message.
*/
stopSignal?: AbortSignal
}
export class SlackClient {
private api: typeof ky
protected defaultChannel?: string
constructor({
apiKey = process.env.SLACK_API_KEY,
baseUrl = SLACK_API_BASE_URL
baseUrl = SLACK_API_BASE_URL,
defaultChannel = process.env.SLACK_DEFAULT_CHANNEL
}: {
apiKey?: string
baseUrl?: string
defaultChannel?: string
} = {}) {
if (!apiKey) {
throw new Error(`Error SlackClient missing required "apiKey"`)
@ -114,34 +158,66 @@ export class SlackClient {
Authorization: `Bearer ${apiKey}`
}
})
this.defaultChannel = defaultChannel
}
/**
* Sends a message to a channel.
*/
public async sendMessage({ text, channelId }: SlackSendMessageOptions) {
public async sendMessage(options: SlackPostMessageParams) {
const res = await this.api.post('chat.postMessage', {
json: {
channel: channelId,
text: text
}
json: options
})
return res.json<SlackBotMessage>()
return res.json<SlackMessage>()
}
/**
* Fetches a conversation's history of messages and events.
*/
public async fetchConversationHistory(
options: SlackConversationHistoryParams
) {
const response = await this.api.get('conversations.history', {
searchParams: options
})
return response.json<SlackReplies>()
}
/**
* Fetches replies to a message in a channel.
*/
protected async fetchReplies(channelId: string, messageTs: string) {
protected async fetchReplies(options: SlackConversationRepliesParams) {
const response = await this.api.get('conversations.replies', {
searchParams: {
channel: channelId,
ts: messageTs
}
searchParams: options
})
return response.json<SlackReplies>()
}
/**
* Returns a list of messages that were sent in a channel after a given timestamp both directly and in threads.
*/
private async fetchCandidates(channel: string, ts: string) {
let candidates: SlackMessage[] = []
const history = await this.fetchConversationHistory({ channel })
const directReplies = await this.fetchReplies({ channel, ts })
if (directReplies.ok) {
candidates = candidates.concat(directReplies.messages)
}
if (history.ok) {
candidates = candidates.concat(history.messages)
}
// Filter out older messages before the message was sent and drop bot messages:
candidates = candidates.filter(
(message) => message.ts > ts && !message.bot_id
)
// Sort by timestamp so that the most recent messages come first:
candidates.sort((a, b) => {
return parseFloat(b.ts) - parseFloat(a.ts)
})
return candidates
}
/**
* Sends a message to a channel and waits for a reply to the message, which is returned if it passes validation.
*
@ -151,32 +227,56 @@ export class SlackClient {
*/
public async sendAndWaitForReply({
text,
channelId,
channel = this.defaultChannel,
timeoutMs = DEFAULT_SLACK_TIMEOUT_MS,
intervalMs = DEFAULT_SLACK_INTERVAL_MS,
validate = () => true
validate = () => true,
stopSignal
}: SlackSendAndWaitOptions) {
const res = await this.sendMessage({ text, channelId })
if (!channel) {
throw new Error(`Error SlackClient missing required "channel"`)
}
let aborted = false
stopSignal?.addEventListener(
'abort',
() => {
aborted = true
},
{ once: true }
)
const res = await this.sendMessage({ text, channel })
if (!res.ts) {
throw new Error('Missing ts in response')
}
const start = Date.now()
while (Date.now() - start < timeoutMs) {
const response = await this.fetchReplies(channelId, res.ts)
if (response.ok && response.messages.length > 1) {
// first message is the original message
const candidate = response.messages[response.messages.length - 1]
let nUserMessages = 0
do {
if (aborted) {
const reason = stopSignal?.reason || 'Aborted waiting for reply'
if (reason instanceof Error) {
throw reason
} else {
throw new Error(reason)
}
}
const candidates = await this.fetchCandidates(channel, res.ts)
if (candidates.length > 0) {
const candidate = candidates[0]
if (validate(candidate)) {
return candidate
} else {
}
if (nUserMessages !== candidates.length) {
await this.sendMessage({
text: `Invalid response: ${candidate.text}. Please try again with a valid response format.`,
channelId
text: `Invalid response: ${candidate.text}. Please try again following the instructions.`,
channel,
thread_ts: candidate.ts
})
}
nUserMessages = candidates.length
}
await sleep(intervalMs)
}
} while (Date.now() - start < timeoutMs)
throw new Error('Reached timeout waiting for reply')
}
}