Skip to content

Commit

Permalink
feat(transducers): update/fix/extend streamShuffle()
Browse files Browse the repository at this point in the history
- add StreamShuffleOpts, add IRandom opt
- fix args for internal shuffle() calls
- optimize number of shuffle calls
- optimize final buffer drain
  • Loading branch information
postspectacular committed Dec 14, 2021
1 parent e095190 commit dc26203
Showing 1 changed file with 55 additions and 11 deletions.
66 changes: 55 additions & 11 deletions packages/transducers/src/stream-shuffle.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@
import { shuffle } from "@thi.ng/arrays/shuffle";
import { isPlainObject } from "@thi.ng/checks";
import type { IRandom } from "@thi.ng/random";
import { SYSTEM } from "@thi.ng/random/system";
import type { Reducer, Transducer } from "./api.js";
import { __iter, iterator } from "./iterator.js";
import { isReduced } from "./reduced.js";

export interface StreamShuffleOpts {
/**
* Sliding window size
*/
n: number;
/**
* Max. shuffle ops per new input (in [0..n] range)
*
* @defaultValue same as `n`
*/
max?: number;
/**
* PRNG instance to use for shuffling
*
* @defaultValue SYSTEM
*/
rnd?: IRandom;
}

/**
* Transducer. Creates internal sliding window of `n` values and
* performs `maxSwaps` random shuffle operations for each new value and
* yields values in shuffled order. By default `maxSwaps` is the same as
* the chosen chunk size.
* Transducer. Creates internal sliding window of `n` values and performs
* `maxSwaps` random shuffle operations for each new value and yields values in
* shuffled order. By default `maxSwaps` is the same as the chosen chunk size.
* If given a {@link StreamShuffleOpts} options object, further configurations
* are possible.
*
* @example
* ```ts
* [...streamShuffle(5, range(10))]
* // [ 3, 2, 5, 0, 8, 7, 1, 6, 4, 9 ]
*
* [...streamShuffle({ n: 5, rnd: new XsAdd(12345) }, range(10))]
* [ 0, 4, 3, 7, 8, 1, 5, 2, 6, 9 ]
* ```
*
* @param n - sliding window size
Expand All @@ -22,8 +48,9 @@ export function streamShuffle<T>(
n: number,
maxSwaps?: number
): Transducer<T, T>;
export function streamShuffle<T>(opts: StreamShuffleOpts): Transducer<T, T>;
export function streamShuffle<T>(
n: number,
opts: number | StreamShuffleOpts,
src: Iterable<T>
): IterableIterator<T>;
export function streamShuffle<T>(
Expand All @@ -35,23 +62,40 @@ export function streamShuffle<T>(...args: any[]): any {
return (
__iter(streamShuffle, args, iterator) ||
(([init, complete, reduce]: Reducer<any, T>) => {
const n: number = args[0];
const maxSwaps: number = args[1] || n;
let n: number;
let maxSwaps: number;
let rnd: IRandom = SYSTEM;
const opts = <StreamShuffleOpts>args[0];
if (isPlainObject(opts)) {
n = opts.n;
maxSwaps = opts.max || n;
opts.rnd && (rnd = opts.rnd);
} else {
n = args[0];
maxSwaps = args[1] || n;
}
const buf: T[] = [];
return <Reducer<any, T>>[
init,
(acc) => {
while (buf.length && !isReduced(acc)) {
shuffle(buf, maxSwaps);
acc = reduce(acc, buf.shift()!);
if (buf.length) {
shuffle(buf, Math.min(maxSwaps, buf.length), rnd);
for (
let i = 0, n = buf.length;
i < n && !isReduced(acc);
i++
) {
acc = reduce(acc, buf[i]);
}
}
buf.length = 0;
acc = complete(acc);
return acc;
},
(acc, x: T) => {
buf.push(x);
shuffle(buf, maxSwaps);
if (buf.length === n) {
shuffle(buf, Math.min(maxSwaps, n), rnd);
acc = reduce(acc, buf.shift()!);
}
return acc;
Expand Down

0 comments on commit dc26203

Please sign in to comment.