Skip to content

Commit

Permalink
🤖 Merge PR DefinitelyTyped#66208 [node:stream] add iterator, all the …
Browse files Browse the repository at this point in the history
…Array-like Readable methods by @Touffy

* [node] add all the new array-like Readable methods

* [jsforce] ignore overriding built-in Readable map/filter

* [node] improve typing for `reduce`, test `reduce` and `find`

* [readable-stream] copy all the array-like Readable methods

* [stream-to-array] deprecate Readable.toArray shim and remove its test
  • Loading branch information
Touffy authored Aug 24, 2023
1 parent 170b339 commit 5af7710
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 11 deletions.
5 changes: 5 additions & 0 deletions types/jsforce/query.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ export interface QueryResult<T> {
records: T[];
}

// Unfortunately because TypeScript wants you to believe JS has classical inheritance,
// you can't say that Query "extends Readable" because `filter` and `map` disagree with Readable's own.
// That can only be fixed by a breaking change in Query's filter and map methods. Your call.
export class Query<T> extends Readable implements Promise<T> {
end(): Query<T>;

// @ts-ignore conflicts with built-in Readable.filter
filter(filter: Object): Query<T>;

include(include: string): Query<T>;
Expand Down Expand Up @@ -53,6 +57,7 @@ export class Query<T> extends Readable implements Promise<T> {

explain(callback?: (err: Error, info: ExplainInfo) => void): Promise<ExplainInfo>;

// @ts-ignore conflicts with built-in Readable.map
map(callback: (currentValue: Object) => void): Promise<any>;

scanAll(value: boolean): Query<T>;
Expand Down
138 changes: 138 additions & 0 deletions types/node/stream.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ declare module 'stream' {
import Stream = internal.Stream;
import Readable = internal.Readable;
import ReadableOptions = internal.ReadableOptions;
interface ArrayOptions {
/** the maximum concurrent invocations of `fn` to call on the stream at once. **Default: 1**. */
concurrency?: number;
/** allows destroying the stream if the signal is aborted. */
signal?: AbortSignal;
}
class ReadableBase extends Stream implements NodeJS.ReadableStream {
/**
* A utility method for creating Readable Streams out of iterators.
Expand Down Expand Up @@ -396,6 +402,138 @@ declare module 'stream' {
*/
wrap(stream: NodeJS.ReadableStream): this;
push(chunk: any, encoding?: BufferEncoding): boolean;
/**
* The iterator created by this method gives users the option to cancel the destruction
* of the stream if the `for await...of` loop is exited by `return`, `break`, or `throw`,
* or if the iterator should destroy the stream if the stream emitted an error during iteration.
* @since v16.3.0
* @param options.destroyOnReturn When set to `false`, calling `return` on the async iterator,
* or exiting a `for await...of` iteration using a `break`, `return`, or `throw` will not destroy the stream.
* **Default: `true`**.
*/
iterator(options?: {destroyOnReturn?: boolean}): AsyncIterableIterator<any>;
/**
* This method allows mapping over the stream. The *fn* function will be called for every chunk in the stream.
* If the *fn* function returns a promise - that promise will be `await`ed before being passed to the result stream.
* @since v17.4.0, v16.14.0
* @param fn a function to map over every chunk in the stream. Async or not.
* @returns a stream mapped with the function *fn*.
*/
map(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => any, options?: ArrayOptions): Readable;
/**
* This method allows filtering the stream. For each chunk in the stream the *fn* function will be called
* and if it returns a truthy value, the chunk will be passed to the result stream.
* If the *fn* function returns a promise - that promise will be `await`ed.
* @since v17.4.0, v16.14.0
* @param fn a function to filter chunks from the stream. Async or not.
* @returns a stream filtered with the predicate *fn*.
*/
filter(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => boolean | Promise<boolean>, options?: ArrayOptions): Readable;
/**
* This method allows iterating a stream. For each chunk in the stream the *fn* function will be called.
* If the *fn* function returns a promise - that promise will be `await`ed.
*
* This method is different from `for await...of` loops in that it can optionally process chunks concurrently.
* In addition, a `forEach` iteration can only be stopped by having passed a `signal` option
* and aborting the related AbortController while `for await...of` can be stopped with `break` or `return`.
* In either case the stream will be destroyed.
*
* This method is different from listening to the `'data'` event in that it uses the `readable` event
* in the underlying machinary and can limit the number of concurrent *fn* calls.
* @since v17.5.0
* @param fn a function to call on each chunk of the stream. Async or not.
* @returns a promise for when the stream has finished.
*/
forEach(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => void | Promise<void>, options?: ArrayOptions): Promise<void>;
/**
* This method allows easily obtaining the contents of a stream.
*
* As this method reads the entire stream into memory, it negates the benefits of streams. It's intended
* for interoperability and convenience, not as the primary way to consume streams.
* @since v17.5.0
* @returns a promise containing an array with the contents of the stream.
*/
toArray(options?: Pick<ArrayOptions, "signal">): Promise<any[]>;
/**
* This method is similar to `Array.prototype.some` and calls *fn* on each chunk in the stream
* until the awaited return value is `true` (or any truthy value). Once an *fn* call on a chunk
* `await`ed return value is truthy, the stream is destroyed and the promise is fulfilled with `true`.
* If none of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `false`.
* @since v17.5.0
* @param fn a function to call on each chunk of the stream. Async or not.
* @returns a promise evaluating to `true` if *fn* returned a truthy value for at least one of the chunks.
*/
some(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => boolean | Promise<boolean>, options?: ArrayOptions): Promise<boolean>;
/**
* This method is similar to `Array.prototype.find` and calls *fn* on each chunk in the stream
* to find a chunk with a truthy value for *fn*. Once an *fn* call's awaited return value is truthy,
* the stream is destroyed and the promise is fulfilled with value for which *fn* returned a truthy value.
* If all of the *fn* calls on the chunks return a falsy value, the promise is fulfilled with `undefined`.
* @since v17.5.0
* @param fn a function to call on each chunk of the stream. Async or not.
* @returns a promise evaluating to the first chunk for which *fn* evaluated with a truthy value,
* or `undefined` if no element was found.
*/
find<T>(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => data is T, options?: ArrayOptions): Promise<T | undefined>;
find(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => boolean | Promise<boolean>, options?: ArrayOptions): Promise<any>;
/**
* This method is similar to `Array.prototype.every` and calls *fn* on each chunk in the stream
* to check if all awaited return values are truthy value for *fn*. Once an *fn* call on a chunk
* `await`ed return value is falsy, the stream is destroyed and the promise is fulfilled with `false`.
* If all of the *fn* calls on the chunks return a truthy value, the promise is fulfilled with `true`.
* @since v17.5.0
* @param fn a function to call on each chunk of the stream. Async or not.
* @returns a promise evaluating to `true` if *fn* returned a truthy value for every one of the chunks.
*/
every(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => boolean | Promise<boolean>, options?: ArrayOptions): Promise<boolean>;
/**
* This method returns a new stream by applying the given callback to each chunk of the stream
* and then flattening the result.
*
* It is possible to return a stream or another iterable or async iterable from *fn* and the result streams
* will be merged (flattened) into the returned stream.
* @since v17.5.0
* @param fn a function to map over every chunk in the stream. May be async. May be a stream or generator.
* @returns a stream flat-mapped with the function *fn*.
*/
flatMap(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => any, options?: ArrayOptions): Readable;
/**
* This method returns a new stream with the first *limit* chunks dropped from the start.
* @since v17.5.0
* @param limit the number of chunks to drop from the readable.
* @returns a stream with *limit* chunks dropped from the start.
*/
drop(limit: number, options?: Pick<ArrayOptions, "signal">): Readable;
/**
* This method returns a new stream with the first *limit* chunks.
* @since v17.5.0
* @param limit the number of chunks to take from the readable.
* @returns a stream with *limit* chunks taken.
*/
take(limit: number, options?: Pick<ArrayOptions, "signal">): Readable;
/**
* This method returns a new stream with chunks of the underlying stream paired with a counter
* in the form `[index, chunk]`. The first index value is `0` and it increases by 1 for each chunk produced.
* @since v17.5.0
* @returns a stream of indexed pairs.
*/
asIndexedPairs(options?: Pick<ArrayOptions, "signal">): Readable;
/**
* This method calls *fn* on each chunk of the stream in order, passing it the result from the calculation
* on the previous element. It returns a promise for the final value of the reduction.
*
* If no *initial* value is supplied the first chunk of the stream is used as the initial value.
* If the stream is empty, the promise is rejected with a `TypeError` with the `ERR_INVALID_ARGS` code property.
*
* The reducer function iterates the stream element-by-element which means that there is no *concurrency* parameter
* or parallelism. To perform a reduce concurrently, you can extract the async function to `readable.map` method.
* @since v17.5.0
* @param fn a reducer function to call over every chunk in the stream. Async or not.
* @param initial the initial value to use in the reduction.
* @returns a promise for the final value of the reduction.
*/
reduce<T = any>(fn: (previous: any, data: any, options?: Pick<ArrayOptions, "signal">) => T, initial?: undefined, options?: Pick<ArrayOptions, "signal">): Promise<T>;
reduce<T = any>(fn: (previous: T, data: any, options?: Pick<ArrayOptions, "signal">) => T, initial: T, options?: Pick<ArrayOptions, "signal">): Promise<T>;
_destroy(error: Error | null, callback: (error?: Error | null) => void): void;
/**
* Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`event (unless `emitClose` is set to `false`). After this call, the readable
Expand Down
20 changes: 20 additions & 0 deletions types/node/test/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,26 @@ addAbortSignal(new AbortSignal(), new Readable());
Duplex.from(writable);
}

function testReadableReduce() {
const readable = Readable.from([]);
// $ExpectType Promise<number>
readable.reduce((prev, data) => prev * data);
// $ExpectType Promise<number>
readable.reduce((prev, data) => prev * data, 1);
// @ts-expect-error when specifying an initial value, its type must be consistent with the reducer's return type
readable.reduce((prev, data) => prev * data, "1");
// @ts-expect-error when specifying an initial value, its type must be consistent with the reducer's first argument
readable.reduce((prev: string, data) => +prev * data, 1);
}

function testReadableFind() {
const readable = Readable.from([]);
// $ExpectType Promise<any>
readable.find(Boolean);
// $ExpectType Promise<any[] | undefined>
readable.find(Array.isArray);
}

async function testReadableStream() {
const SECOND = 1000;

Expand Down
33 changes: 33 additions & 0 deletions types/node/v16/stream.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ declare module 'stream' {
encoding?: BufferEncoding | undefined;
read?(this: Readable, size: number): void;
}
interface ArrayOptions {
/** the maximum concurrent invocations of `fn` to call on the stream at once. **Default: 1**. */
concurrency?: number;
/** allows destroying the stream if the signal is aborted. */
signal?: AbortSignal;
}
/**
* @since v0.9.4
*/
Expand Down Expand Up @@ -397,6 +403,33 @@ declare module 'stream' {
*/
wrap(stream: NodeJS.ReadableStream): this;
push(chunk: any, encoding?: BufferEncoding): boolean;
/**
* The iterator created by this method gives users the option to cancel the destruction
* of the stream if the `for await...of` loop is exited by `return`, `break`, or `throw`,
* or if the iterator should destroy the stream if the stream emitted an error during iteration.
* @since v16.3.0
* @param options.destroyOnReturn When set to `false`, calling `return` on the async iterator,
* or exiting a `for await...of` iteration using a `break`, `return`, or `throw` will not destroy the stream.
* **Default: `true`**.
*/
iterator(options?: {destroyOnReturn?: boolean}): AsyncIterableIterator<any>;
/**
* This method allows mapping over the stream. The *fn* function will be called for every chunk in the stream.
* If the *fn* function returns a promise - that promise will be `await`ed before being passed to the result stream.
* @since v17.4.0, v16.14.0
* @param fn a function to map over every chunk in the stream. Async or not.
* @returns a stream mapped with the function *fn*.
*/
map(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => any, options?: ArrayOptions): Readable;
/**
* This method allows filtering the stream. For each chunk in the stream the *fn* function will be called
* and if it returns a truthy value, the chunk will be passed to the result stream.
* If the *fn* function returns a promise - that promise will be `await`ed.
* @since v17.4.0, v16.14.0
* @param fn a function to filter chunks from the stream. Async or not.
* @returns a stream filtered with the predicate *fn*.
*/
filter(fn: (data: any, options?: Pick<ArrayOptions, "signal">) => boolean | Promise<boolean>, options?: ArrayOptions): Readable;
_destroy(error: Error | null, callback: (error?: Error | null) => void): void;
/**
* Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`event (unless `emitClose` is set to `false`). After this call, the readable
Expand Down
Loading

0 comments on commit 5af7710

Please sign in to comment.