From eef5f88c29a1c8ffda14539b8b22a0a64cbcd688 Mon Sep 17 00:00:00 2001 From: Philipp Burckhardt Date: Wed, 5 Jul 2023 13:57:45 -0400 Subject: [PATCH] refactor: extend event emitter in agentic and task class --- legacy/packages/core/src/agentic.ts | 10 ++-- legacy/packages/core/src/events/emitters.ts | 54 --------------------- legacy/packages/core/src/events/index.ts | 1 - legacy/packages/core/src/task.ts | 54 +++++++++++++++------ 4 files changed, 41 insertions(+), 78 deletions(-) delete mode 100644 legacy/packages/core/src/events/emitters.ts diff --git a/legacy/packages/core/src/agentic.ts b/legacy/packages/core/src/agentic.ts index 110e378c..e543eb57 100644 --- a/legacy/packages/core/src/agentic.ts +++ b/legacy/packages/core/src/agentic.ts @@ -11,11 +11,10 @@ import { OpenAIChatCompletion } from './llms/openai' import { defaultLogger } from './logger' import { defaultIDGeneratorFn, isFunction, isString } from './utils' -export class Agentic { +export class Agentic extends EventEmitter { protected _ky: types.KyInstance protected _logger: types.Logger protected _taskTracker: TerminalTaskTracker - protected _eventEmitter: EventEmitter protected _openai?: types.openai.OpenAIClient protected _anthropic?: types.anthropic.Client @@ -41,6 +40,8 @@ export class Agentic { ky?: types.KyInstance taskTracker?: TerminalTaskTracker }) { + super() + // TODO: This is a bit hacky, but we're doing it to have a slightly nicer API // for the end developer when creating subclasses of `BaseTask` to use as // tools. @@ -54,7 +55,6 @@ export class Agentic { this._ky = opts.ky ?? defaultKy this._logger = opts.logger ?? defaultLogger this._taskTracker = opts.taskTracker ?? new TerminalTaskTracker() - this._eventEmitter = new EventEmitter() this._openaiModelDefaults = { provider: 'openai', @@ -109,10 +109,6 @@ export class Agentic { return this._taskTracker } - public get eventEmitter(): EventEmitter { - return this._eventEmitter - } - public get idGeneratorFn(): types.IDGeneratorFunction { return this._idGeneratorFn } diff --git a/legacy/packages/core/src/events/emitters.ts b/legacy/packages/core/src/events/emitters.ts deleted file mode 100644 index 2b54ad6d..00000000 --- a/legacy/packages/core/src/events/emitters.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { EventEmitter } from 'eventemitter3' - -import * as types from '@/types' -import type { Agentic } from '@/agentic' -import { BaseTask } from '@/task' - -import { TaskEvent, TaskStatus } from './event' - -/** - * Event emitter for task events. - */ -export class TaskEventEmitter< - TInput extends types.TaskInput = void, - TOutput extends types.TaskOutput = string -> extends EventEmitter { - protected _agentic: Agentic - protected _task: BaseTask - - constructor(task: BaseTask, agentic: Agentic) { - super() - - this._task = task - this._agentic = agentic - } - - on( - takStatus: T, - fn: (event: TaskEvent) => void, - context?: any - ): this { - return super.on(takStatus, fn, context) - } - - emit(taskStatus: string | symbol, payload: object = {}): boolean { - if (!Object.values(TaskStatus).includes(taskStatus as TaskStatus)) { - throw new Error(`Invalid task status: ${String(taskStatus)}`) - } - - const { id, nameForModel } = this._task - const event = new TaskEvent({ - payload: { - taskStatus: taskStatus as TaskStatus, - taskId: id, - taskName: nameForModel, - ...payload - } - }) - this._agentic.taskTracker.addEvent(event) - - this._agentic.eventEmitter.emit(taskStatus, event) - - return super.emit(taskStatus, event) - } -} diff --git a/legacy/packages/core/src/events/index.ts b/legacy/packages/core/src/events/index.ts index c52faae3..b15f5f24 100644 --- a/legacy/packages/core/src/events/index.ts +++ b/legacy/packages/core/src/events/index.ts @@ -1,3 +1,2 @@ -export * from './emitters' export * from './event' export * from './tracker' diff --git a/legacy/packages/core/src/task.ts b/legacy/packages/core/src/task.ts index c6188c33..8942a6a4 100644 --- a/legacy/packages/core/src/task.ts +++ b/legacy/packages/core/src/task.ts @@ -1,3 +1,4 @@ +import EventEmitter from 'eventemitter3' import pRetry, { FailedAttemptError } from 'p-retry' import QuickLRU from 'quick-lru' import { ZodType } from 'zod' @@ -6,7 +7,7 @@ import * as errors from './errors' import * as types from './types' import type { Agentic } from './agentic' import { SKIP_HOOKS } from './constants' -import { TaskEventEmitter, TaskStatus } from './events' +import { TaskEvent, TaskStatus } from './events' import { HumanFeedbackMechanismCLI, HumanFeedbackOptions, @@ -30,14 +31,13 @@ import { defaultIDGeneratorFn, isValidTaskIdentifier } from './utils' export abstract class BaseTask< TInput extends types.TaskInput = void, TOutput extends types.TaskOutput = string -> { +> extends EventEmitter { protected _agentic: Agentic protected _id: string protected _timeoutMs?: number protected _retryConfig: types.RetryConfig protected _cacheConfig: types.CacheConfig - protected _eventEmitter: TaskEventEmitter protected _preHooks: Array<{ hook: types.TaskBeforeCallHook @@ -50,6 +50,8 @@ export abstract class BaseTask< }> = [] constructor(options: types.BaseTaskOptions = {}) { + super() + this._agentic = options.agentic ?? globalThis.__agentic?.deref() this._timeoutMs = options.timeoutMs @@ -73,11 +75,6 @@ export abstract class BaseTask< this._id = options.id ?? this._agentic?.idGeneratorFn() ?? defaultIDGeneratorFn() - - this._eventEmitter = new TaskEventEmitter( - this, - this._agentic - ) } public get agentic(): Agentic { @@ -96,10 +93,6 @@ export abstract class BaseTask< return this._agentic.logger } - public get eventEmitter(): TaskEventEmitter { - return this._eventEmitter - } - public abstract get inputSchema(): ZodType public abstract get outputSchema(): ZodType @@ -285,7 +278,7 @@ export abstract class BaseTask< } } - this._eventEmitter.emit(TaskStatus.RUNNING, { + this.emit(TaskStatus.RUNNING, { taskInputs: input, ...ctx.metadata }) @@ -360,7 +353,7 @@ export abstract class BaseTask< ctx.attemptNumber = err.attemptNumber + 1 ctx.metadata.error = err - this._eventEmitter.emit(TaskStatus.RETRYING, { + this.emit(TaskStatus.RETRYING, { taskInputs: input, taskOutput: err, ...ctx.metadata @@ -389,7 +382,7 @@ export abstract class BaseTask< // task for now. return } else { - this._eventEmitter.emit(TaskStatus.FAILED, { + this.emit(TaskStatus.FAILED, { taskInputs: input, taskOutput: err, ...ctx.metadata @@ -411,7 +404,7 @@ export abstract class BaseTask< // ctx.tracker.setOutput(stringifyForDebugging(result, { maxLength: 100 })) - this._eventEmitter.emit(TaskStatus.COMPLETED, { + this.emit(TaskStatus.COMPLETED, { taskInputs: input, taskOutput: result, ...ctx.metadata @@ -433,4 +426,33 @@ export abstract class BaseTask< // input: TInput, // onProgress: types.ProgressFunction // }): Promise + + on( + takStatus: T, + fn: (event: TaskEvent) => void, + context?: any + ): this { + return super.on(takStatus, fn, context) + } + + emit(taskStatus: string | symbol, payload: object = {}): boolean { + if (!Object.values(TaskStatus).includes(taskStatus as TaskStatus)) { + throw new Error(`Invalid task status: ${String(taskStatus)}`) + } + + const { id, nameForModel } = this + const event = new TaskEvent({ + payload: { + taskStatus: taskStatus as TaskStatus, + taskId: id, + taskName: nameForModel, + ...payload + } + }) + this._agentic.taskTracker.addEvent(event) + + this._agentic.emit(taskStatus, event) + + return super.emit(taskStatus, event) + } }