Skip to content

Commit

Permalink
testing 2
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp committed Dec 23, 2024
1 parent 98c22e4 commit ece515b
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 126 deletions.
22 changes: 9 additions & 13 deletions agents/src/llm/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
#llm: LLM;
#chatCtx: ChatContext;
#fncCtx?: FunctionContext;
#outputReadable: ReadableStream<ChatChunk>;
#reader: ReadableStreamDefaultReader<ChatChunk>;

constructor(llm: LLM, chatCtx: ChatContext, fncCtx?: FunctionContext) {
this.#llm = llm;
this.#chatCtx = chatCtx;
this.#fncCtx = fncCtx;
const [r1, r2] = this.output.readable.tee();
this.#outputReadable = r1;
this.#reader = r1.getReader();
this.monitorMetrics(r2);
}

Expand Down Expand Up @@ -140,20 +140,16 @@ export abstract class LLMStream implements AsyncIterableIterator<ChatChunk> {
}

async next(): Promise<IteratorResult<ChatChunk>> {
return this.#outputReadable
.getReader()
.read()
.then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

close() {
this.output.writable.close();
this.closed = true;
}

Expand Down
8 changes: 5 additions & 3 deletions agents/src/pipeline/pipeline_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
#agentPublication?: LocalTrackPublication;
#lastFinalTranscriptTime?: number;
#lastSpeechTime?: number;
#writer: WritableStreamDefaultWriter<SpeechHandle | typeof VoicePipelineAgent.FLUSH_SENTINEL>;

constructor(
/** Voice Activity Detection instance. */
Expand Down Expand Up @@ -289,6 +290,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
this.#validateReplyIfPossible.bind(this),
this.#opts.minEndpointingDelay,
);

this.#writer = this.#speechQueue.writable.getWriter();
}

get fncCtx(): FunctionContext | undefined {
Expand Down Expand Up @@ -924,9 +927,8 @@ export class VoicePipelineAgent extends (EventEmitter as new () => TypedEmitter<
}

#addSpeechForPlayout(handle: SpeechHandle) {
const writer = this.#speechQueue.writable.getWriter();
writer.write(handle);
writer.write(VoicePipelineAgent.FLUSH_SENTINEL);
this.#writer.write(handle);
this.#writer.write(VoicePipelineAgent.FLUSH_SENTINEL);
this.#speechQueueOpen.resolve();
}

Expand Down
22 changes: 9 additions & 13 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,14 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
protected closed = false;
protected inputClosed = false;
#stt: STT;
#outputReadable: ReadableStream<SpeechEvent>;
#reader: ReadableStreamDefaultReader<SpeechEvent>;
#writer: WritableStreamDefaultWriter<AudioFrame | typeof SpeechStream.FLUSH_SENTINEL>;

constructor(stt: STT) {
this.#stt = stt;
this.#writer = this.input.writable.getWriter();
const [r1, r2] = this.output.readable.tee();
this.#outputReadable = r1;
this.#reader = r1.getReader();
this.monitorMetrics(r2);
}

Expand Down Expand Up @@ -211,22 +211,18 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
}

async next(): Promise<IteratorResult<SpeechEvent>> {
return this.#outputReadable
.getReader()
.read()
.then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

/** Close both the input and output of the STT stream */
close() {
this.input.writable.close();
this.output.writable.close();
this.closed = true;
this.inputClosed = true;
}
Expand Down
35 changes: 17 additions & 18 deletions agents/src/tokenize/token_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
#inBuf = '';
#outBuf = '';
#currentSegmentId: string;
#writer: WritableStreamDefaultWriter<TokenData>;
#reader: ReadableStreamDefaultReader<TokenData>;

constructor(func: TokenizeFunc, minTokenLength: number, minContextLength: number) {
this.#func = func;
this.#minTokenLength = minTokenLength;
this.#minContextLength = minContextLength;
this.#reader = this.queue.readable.getReader();
this.#writer = this.queue.writable.getWriter();

this.#currentSegmentId = randomUUID();
}
Expand All @@ -33,8 +37,6 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
throw new Error('Stream is closed');
}

const writer = this.queue.writable.getWriter();

this.#inBuf += text;
if (this.#inBuf.length < this.#minContextLength) return;

Expand All @@ -52,7 +54,7 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {

this.#outBuf += tokText;
if (this.#outBuf.length >= this.#minTokenLength) {
writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
this.#writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
this.#outBuf = '';
}

Expand All @@ -72,8 +74,6 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
throw new Error('Stream is closed');
}

const writer = this.queue.writable.getWriter();

if (this.#inBuf || this.#outBuf) {
const tokens = this.#func(this.#inBuf);
if (tokens) {
Expand All @@ -87,7 +87,7 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
}

if (this.#outBuf) {
writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
this.#writer.write({ token: this.#outBuf, segmentId: this.#currentSegmentId });
}

this.#currentSegmentId = randomUUID();
Expand All @@ -107,22 +107,21 @@ export class BufferedTokenStream implements AsyncIterableIterator<TokenData> {
}

async next(): Promise<IteratorResult<TokenData>> {
return this.queue.readable
.getReader()
.read()
.then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

/** Close both the input and output of the token stream */
close() {
this.queue.writable.close();
this.closed = true;
if (!this.closed) {
this.#writer.close();
this.closed = true;
}
}

[Symbol.asyncIterator](): BufferedTokenStream {
Expand Down
96 changes: 49 additions & 47 deletions agents/src/tokenize/tokenizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,30 @@ export abstract class SentenceStream {
>();
protected output = new TransformStream<TokenData, TokenData>();
#closed = false;
#inputClosed = false;
#reader = this.output.readable.getReader();
#writer = this.input.writable.getWriter();

get closed(): boolean {
return this.#closed;
}

/** Push a string of text to the tokenizer */
pushText(text: string) {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
this.input.writable.getWriter().write(text);
this.#writer.write(text);
}

/** Flush the tokenizer, causing it to process all pending text */
flush() {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
Expand All @@ -60,32 +63,31 @@ export abstract class SentenceStream {

/** Mark the input as ended and forbid additional pushes */
endInput() {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
this.input.writable.close();
this.#writer.close();
this.#inputClosed = true;
}

async next(): Promise<IteratorResult<TokenData>> {
return this.output.readable
.getReader()
.read()
.then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

/** Close both the input and output of the tokenizer stream */
close() {
this.input.writable.close();
this.output.writable.close();
if (!this.#inputClosed) {
this.endInput();
}
this.#closed = true;
}

Expand All @@ -110,6 +112,9 @@ export abstract class WordStream {
string | typeof WordStream.FLUSH_SENTINEL
>();
protected output = new TransformStream<TokenData, TokenData>();
#writer = this.input.writable.getWriter();
#reader = this.output.readable.getReader();
#inputClosed = false;
#closed = false;

get closed(): boolean {
Expand All @@ -118,54 +123,51 @@ export abstract class WordStream {

/** Push a string of text to the tokenizer */
pushText(text: string) {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
this.input.writable.getWriter().write(text);
this.#writer.write(text);
}

/** Flush the tokenizer, causing it to process all pending text */
flush() {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
this.input.writable.getWriter().write(WordStream.FLUSH_SENTINEL);
this.#writer.write(WordStream.FLUSH_SENTINEL);
}

/** Mark the input as ended and forbid additional pushes */
endInput() {
// if (this.input.closed) {
// throw new Error('Input is closed');
// }
if (this.#inputClosed) {
throw new Error('Input is closed');
}
if (this.#closed) {
throw new Error('Stream is closed');
}
this.input.writable.close();
this.#inputClosed = true;
}

async next(): Promise<IteratorResult<TokenData>> {
return this.output.readable
.getReader()
.read()
.then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
return this.#reader.read().then(({ value }) => {
if (value) {
return { value, done: false };
} else {
return { value: undefined, done: true };
}
});
}

/** Close both the input and output of the tokenizer stream */
close() {
this.input.writable.close();
this.output.writable.close();
this.endInput();
this.#writer.close();
this.#closed = true;
}

Expand Down
Loading

0 comments on commit ece515b

Please sign in to comment.