refactor: extend event emitter in agentic and task class

old-agentic-v1^2
Philipp Burckhardt 2023-07-05 13:57:45 -04:00
rodzic 999cb4a0fc
commit eef5f88c29
4 zmienionych plików z 41 dodań i 78 usunięć

Wyświetl plik

@ -11,11 +11,10 @@ import { OpenAIChatCompletion } from './llms/openai'
import { defaultLogger } from './logger'
import { defaultIDGeneratorFn, isFunction, isString } from './utils'
export class Agentic {
export class Agentic extends EventEmitter {
protected _ky: types.KyInstance
protected _logger: types.Logger
protected _taskTracker: TerminalTaskTracker
protected _eventEmitter: EventEmitter
protected _openai?: types.openai.OpenAIClient
protected _anthropic?: types.anthropic.Client
@ -41,6 +40,8 @@ export class Agentic {
ky?: types.KyInstance
taskTracker?: TerminalTaskTracker
}) {
super()
// TODO: This is a bit hacky, but we're doing it to have a slightly nicer API
// for the end developer when creating subclasses of `BaseTask` to use as
// tools.
@ -54,7 +55,6 @@ export class Agentic {
this._ky = opts.ky ?? defaultKy
this._logger = opts.logger ?? defaultLogger
this._taskTracker = opts.taskTracker ?? new TerminalTaskTracker()
this._eventEmitter = new EventEmitter()
this._openaiModelDefaults = {
provider: 'openai',
@ -109,10 +109,6 @@ export class Agentic {
return this._taskTracker
}
public get eventEmitter(): EventEmitter {
return this._eventEmitter
}
public get idGeneratorFn(): types.IDGeneratorFunction {
return this._idGeneratorFn
}

Wyświetl plik

@ -1,54 +0,0 @@
import { EventEmitter } from 'eventemitter3'
import * as types from '@/types'
import type { Agentic } from '@/agentic'
import { BaseTask } from '@/task'
import { TaskEvent, TaskStatus } from './event'
/**
* Event emitter for task events.
*/
export class TaskEventEmitter<
TInput extends types.TaskInput = void,
TOutput extends types.TaskOutput = string
> extends EventEmitter {
protected _agentic: Agentic
protected _task: BaseTask<TInput, TOutput>
constructor(task: BaseTask<TInput, TOutput>, agentic: Agentic) {
super()
this._task = task
this._agentic = agentic
}
on<T extends string | symbol>(
takStatus: T,
fn: (event: TaskEvent<TInput, TOutput>) => void,
context?: any
): this {
return super.on(takStatus, fn, context)
}
emit(taskStatus: string | symbol, payload: object = {}): boolean {
if (!Object.values(TaskStatus).includes(taskStatus as TaskStatus)) {
throw new Error(`Invalid task status: ${String(taskStatus)}`)
}
const { id, nameForModel } = this._task
const event = new TaskEvent<TInput, TOutput>({
payload: {
taskStatus: taskStatus as TaskStatus,
taskId: id,
taskName: nameForModel,
...payload
}
})
this._agentic.taskTracker.addEvent(event)
this._agentic.eventEmitter.emit(taskStatus, event)
return super.emit(taskStatus, event)
}
}

Wyświetl plik

@ -1,3 +1,2 @@
export * from './emitters'
export * from './event'
export * from './tracker'

Wyświetl plik

@ -1,3 +1,4 @@
import EventEmitter from 'eventemitter3'
import pRetry, { FailedAttemptError } from 'p-retry'
import QuickLRU from 'quick-lru'
import { ZodType } from 'zod'
@ -6,7 +7,7 @@ import * as errors from './errors'
import * as types from './types'
import type { Agentic } from './agentic'
import { SKIP_HOOKS } from './constants'
import { TaskEventEmitter, TaskStatus } from './events'
import { TaskEvent, TaskStatus } from './events'
import {
HumanFeedbackMechanismCLI,
HumanFeedbackOptions,
@ -30,14 +31,13 @@ import { defaultIDGeneratorFn, isValidTaskIdentifier } from './utils'
export abstract class BaseTask<
TInput extends types.TaskInput = void,
TOutput extends types.TaskOutput = string
> {
> extends EventEmitter {
protected _agentic: Agentic
protected _id: string
protected _timeoutMs?: number
protected _retryConfig: types.RetryConfig
protected _cacheConfig: types.CacheConfig<TInput, TOutput>
protected _eventEmitter: TaskEventEmitter<TInput, TOutput>
protected _preHooks: Array<{
hook: types.TaskBeforeCallHook<TInput>
@ -50,6 +50,8 @@ export abstract class BaseTask<
}> = []
constructor(options: types.BaseTaskOptions = {}) {
super()
this._agentic = options.agentic ?? globalThis.__agentic?.deref()
this._timeoutMs = options.timeoutMs
@ -73,11 +75,6 @@ export abstract class BaseTask<
this._id =
options.id ?? this._agentic?.idGeneratorFn() ?? defaultIDGeneratorFn()
this._eventEmitter = new TaskEventEmitter<TInput, TOutput>(
this,
this._agentic
)
}
public get agentic(): Agentic {
@ -96,10 +93,6 @@ export abstract class BaseTask<
return this._agentic.logger
}
public get eventEmitter(): TaskEventEmitter<TInput, TOutput> {
return this._eventEmitter
}
public abstract get inputSchema(): ZodType<TInput>
public abstract get outputSchema(): ZodType<TOutput>
@ -285,7 +278,7 @@ export abstract class BaseTask<
}
}
this._eventEmitter.emit(TaskStatus.RUNNING, {
this.emit(TaskStatus.RUNNING, {
taskInputs: input,
...ctx.metadata
})
@ -360,7 +353,7 @@ export abstract class BaseTask<
ctx.attemptNumber = err.attemptNumber + 1
ctx.metadata.error = err
this._eventEmitter.emit(TaskStatus.RETRYING, {
this.emit(TaskStatus.RETRYING, {
taskInputs: input,
taskOutput: err,
...ctx.metadata
@ -389,7 +382,7 @@ export abstract class BaseTask<
// task for now.
return
} else {
this._eventEmitter.emit(TaskStatus.FAILED, {
this.emit(TaskStatus.FAILED, {
taskInputs: input,
taskOutput: err,
...ctx.metadata
@ -411,7 +404,7 @@ export abstract class BaseTask<
// ctx.tracker.setOutput(stringifyForDebugging(result, { maxLength: 100 }))
this._eventEmitter.emit(TaskStatus.COMPLETED, {
this.emit(TaskStatus.COMPLETED, {
taskInputs: input,
taskOutput: result,
...ctx.metadata
@ -433,4 +426,33 @@ export abstract class BaseTask<
// input: TInput,
// onProgress: types.ProgressFunction
// }): Promise<TOutput>
on<T extends string | symbol>(
takStatus: T,
fn: (event: TaskEvent<TInput, TOutput>) => void,
context?: any
): this {
return super.on(takStatus, fn, context)
}
emit(taskStatus: string | symbol, payload: object = {}): boolean {
if (!Object.values(TaskStatus).includes(taskStatus as TaskStatus)) {
throw new Error(`Invalid task status: ${String(taskStatus)}`)
}
const { id, nameForModel } = this
const event = new TaskEvent<TInput, TOutput>({
payload: {
taskStatus: taskStatus as TaskStatus,
taskId: id,
taskName: nameForModel,
...payload
}
})
this._agentic.taskTracker.addEvent(event)
this._agentic.emit(taskStatus, event)
return super.emit(taskStatus, event)
}
}