Skip to content

Commit

Permalink
Fixes period.within to not include end time
Browse files Browse the repository at this point in the history
This is because that can cause repeats
Fixes repeated items in the timeseries alignment when points already fall on boundaries (regression from non-typescript version)
Also removes a bunch of unused imports
  • Loading branch information
Peter Murphy committed Sep 13, 2018
1 parent 718b698 commit 9722832
Show file tree
Hide file tree
Showing 31 changed files with 84 additions and 200 deletions.
7 changes: 5 additions & 2 deletions packages/pond/src/align.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ import * as Immutable from "immutable";
import * as _ from "lodash";

import { Event } from "./event";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Processor } from "./processor";
import { time, Time } from "./time";
import { timerange } from "./timerange";
import util from "./util";

import { AlignmentMethod, AlignmentOptions } from "./types";

Expand Down Expand Up @@ -92,6 +90,7 @@ export class Align<T extends Key> extends Processor<T, T> {
}

const boundaries: Immutable.List<Time> = this.getBoundaries(event);

boundaries.forEach(boundaryTime => {
let outputEvent;
if (this._limit && boundaries.size > this._limit) {
Expand Down Expand Up @@ -129,6 +128,10 @@ export class Align<T extends Key> extends Processor<T, T> {
* they are in the same window, return an empty list.
*/
private getBoundaries(event: Event<T>): Immutable.List<Time> {
if (+this._previous.timestamp() === +event.timestamp()) {
return Immutable.List<Time>([]);
}

const range = timerange(this._previous.timestamp(), event.timestamp());
return this._period.within(range);
}
Expand Down
16 changes: 0 additions & 16 deletions packages/pond/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,8 @@
* LICENSE file in the root directory of this source tree.
*/

import * as Immutable from "immutable";
import * as _ from "lodash";

import { Event } from "./event";
import { grouped, GroupedCollection, GroupingFunction } from "./groupedcollection";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Time } from "./time";
import { timerange, TimeRange } from "./timerange";
import { windowed, WindowedCollection } from "./windowedcollection";

import { Align } from "./align";
import { Collapse } from "./collapse";
import { Rate } from "./rate";

import { AlignmentMethod, AlignmentOptions, CollapseOptions, RateOptions } from "./types";

/**
* Abstract base class used by classes which maybe passed within the streaming code.
*/
Expand Down
1 change: 0 additions & 1 deletion packages/pond/src/collapse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import * as _ from "lodash";
import { Event } from "./event";
import { Key } from "./key";
import { Processor } from "./processor";
import util from "./util";

import { CollapseOptions } from "./types";

Expand Down
19 changes: 1 addition & 18 deletions packages/pond/src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,14 @@
import * as Immutable from "immutable";
import * as _ from "lodash";

import { Align } from "./align";
import { Base } from "./base";
import { Collapse } from "./collapse";
import { Event } from "./event";
import { Fill } from "./fill";
import { grouped, GroupedCollection, GroupingFunction } from "./groupedcollection";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Rate } from "./rate";
import { Select } from "./select";
import { Time } from "./time";
import { timerange, TimeRange } from "./timerange";

import {
AlignmentMethod,
AlignmentOptions,
CollapseOptions,
FillOptions,
RateOptions,
SelectOptions,
WindowingOptions
} from "./types";

import util from "./util";
import { CollapseOptions, SelectOptions } from "./types";

import { DedupFunction, ReducerFunction, ValueMap } from "./types";

Expand Down
12 changes: 1 addition & 11 deletions packages/pond/src/groupedcollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,17 @@
import * as Immutable from "immutable";
import * as _ from "lodash";

import { Align } from "./align";
import { Base } from "./base";
import { Event } from "./event";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Rate } from "./rate";
import { SortedCollection } from "./sortedcollection";
import { Time } from "./time";
import { timerange, TimeRange } from "./timerange";
import { TimeRange } from "./timerange";
import { WindowedCollection } from "./windowedcollection";

import {
Aggregation,
AggregationMapFunction,
AggregationTuple,
AlignmentOptions,
CollapseOptions,
DedupFunction,
RateOptions,
ReducerFunction,
WindowingOptions
} from "./types";

Expand Down
4 changes: 1 addition & 3 deletions packages/pond/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ import * as Immutable from "immutable";
import * as _ from "lodash";

import { Base } from "./base";
import { Collection } from "./collection";
import { Event, event } from "./event";
import { Event } from "./event";
import { Index, index } from "./index";
import { Key } from "./key";
import { TimeRange } from "./timerange";
Expand All @@ -15,7 +14,6 @@ import { Rate } from "./rate";
import { Reducer } from "./reduce";
import { Select } from "./select";

import { GroupedCollection } from "./groupedcollection";
import { WindowedCollection } from "./windowedcollection";

import {
Expand Down
7 changes: 3 additions & 4 deletions packages/pond/src/period.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@

import * as Immutable from "immutable";
import * as _ from "lodash";
import * as moment from "moment";

import { Duration, duration } from "./duration";
import { Index } from "./index";
import { Time, time } from "./time";
import { TimeRange } from "./timerange";

Expand Down Expand Up @@ -110,7 +108,8 @@ export class Period {

/**
* Returns an `Immutable.List` of `Time`s within the given `TimeRange`
* that align with this `Period`.
* that align with this `Period`. Not this will potentially include
* the start time of the timerange but never the end time of the timerange.
*
* Example:
* ```
Expand All @@ -132,7 +131,7 @@ export class Period {
const t2 = time(timerange.end());

let scan = this.isAligned(t1) ? t1 : this.next(t1);
while (+scan <= +t2) {
while (+scan < +t2) {
result = result.push(scan);
scan = this.next(scan);
}
Expand Down
3 changes: 0 additions & 3 deletions packages/pond/src/rate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import * as Immutable from "immutable";
import * as _ from "lodash";

import { Event } from "./event";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Processor } from "./processor";
import { time, Time } from "./time";
import { TimeRange, timerange } from "./timerange";
import util from "./util";

Expand Down
7 changes: 1 addition & 6 deletions packages/pond/src/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,10 @@ import * as Immutable from "immutable";
import * as _ from "lodash";

import { Event } from "./event";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Processor } from "./processor";
import { time, Time } from "./time";
import { TimeRange, timerange } from "./timerange";
import util from "./util";

import { ListReducer, ReduceOptions, ReducerFunction } from "./types";
import { ListReducer, ReduceOptions } from "./types";

/**
* A `Processor` to take a rolling set of incoming `Event`s, a
Expand Down
1 change: 0 additions & 1 deletion packages/pond/src/select.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import * as _ from "lodash";
import { Event } from "./event";
import { Key } from "./key";
import { Processor } from "./processor";
import { TimeRange } from "./timerange";

import { SelectOptions } from "./types";

Expand Down
14 changes: 3 additions & 11 deletions packages/pond/src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,10 @@ import * as Immutable from "immutable";
import * as _ from "lodash";

import { Base } from "./base";
import { Collection } from "./collection";
import { Event, event } from "./event";
import { Index, index } from "./index";
import { Index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Processor } from "./processor";
import { Time, time } from "./time";
import { TimeRange } from "./timerange";

import { Trigger } from "./types";

import { GroupedCollection } from "./groupedcollection";
import { WindowedCollection } from "./windowedcollection";
import { Time } from "./time";

import {
AggregationNode,
Expand Down Expand Up @@ -241,6 +232,7 @@ export class EventStream<IN extends Key, S extends Key> extends StreamInterface<
coalesce(options: CoalesceOptions) {
const { fields } = options;
function keyIn(...keys) {
// @ts-ignore
const keySet = Immutable.Set(...keys);
return (v, k) => {
return keySet.has(k);
Expand Down
1 change: 0 additions & 1 deletion packages/pond/src/timerange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import * as moment from "moment";
import Moment = moment.Moment;

import { Key } from "./key";
import { Period } from "./period";
import { Time } from "./time";

/**
Expand Down
28 changes: 17 additions & 11 deletions packages/pond/src/timeseries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1102,11 +1102,15 @@ export class TimeSeries<T extends Key> {
*
* Example:
* ```
* const aligned = ts.align({
* fieldSpec: "value",
* period: "1m",
* method: "linear"
* });
* const alignOptions: AlignmentOptions = {
* fieldSpec: ["value"],
* period: period(duration("30s")),
* method: AlignmentMethod.Linear,
* limit: 3
* };
*
* const aligned = series.align(alignOptions);
*
* ```
*/
align(options: AlignmentOptions) {
Expand Down Expand Up @@ -1364,10 +1368,10 @@ export class TimeSeries<T extends Key> {
* });
* ```
*/
static timeSeriesListReduce(options: TimeSeriesOptions) {
static timeSeriesListReduce<T extends Key>(options: TimeSeriesOptions): TimeSeries<T> {
const { seriesList, fieldSpec, reducer, ...data } = options;
const combiner = Event.combiner(fieldSpec, reducer);
return TimeSeries.timeSeriesListEventReduce({
return TimeSeries.timeSeriesListEventReduce<T>({
seriesList,
fieldSpec,
reducer: combiner,
Expand Down Expand Up @@ -1397,10 +1401,10 @@ export class TimeSeries<T extends Key> {
* });
* ```
*/
static timeSeriesListMerge(options: TimeSeriesOptions) {
static timeSeriesListMerge<T extends Key>(options: TimeSeriesOptions): TimeSeries<T> {
const { seriesList, fieldSpec, reducer, deep = false, ...data } = options;
const merger = Event.merger(deep);
return TimeSeries.timeSeriesListEventReduce({
return TimeSeries.timeSeriesListEventReduce<T>({
seriesList,
fieldSpec,
reducer: merger,
Expand All @@ -1411,7 +1415,9 @@ export class TimeSeries<T extends Key> {
/**
* @private
*/
static timeSeriesListEventReduce(options: TimeSeriesListReducerOptions) {
static timeSeriesListEventReduce<T extends Key>(
options: TimeSeriesListReducerOptions
): TimeSeries<T> {
const { seriesList, fieldSpec, reducer, ...data } = options;
if (!seriesList || !_.isArray(seriesList)) {
throw new Error("A list of TimeSeries must be supplied to reduce");
Expand All @@ -1437,7 +1443,7 @@ export class TimeSeries<T extends Key> {
// on the start times of the series, along with it the series
// have missing data, so I think we don't have a choice here.
const collection = new SortedCollection(events);
const timeseries = new TimeSeries({ ...data, collection });
const timeseries = new TimeSeries<T>({ ...data, collection });

return timeseries;
}
Expand Down
5 changes: 2 additions & 3 deletions packages/pond/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { Event } from "./event";
import { Key } from "./key";
import { Period } from "./period";
import { TimeSeries } from "./timeseries";
import { Window, WindowBase } from "./window";
import { WindowBase } from "./window";

//
// General types
Expand Down Expand Up @@ -215,8 +215,7 @@ export interface SelectOptions {
*/
export interface RenameColumnOptions {
renameMap: {
key: string;
value: string;
[key: string]: string;
};
}

Expand Down
3 changes: 1 addition & 2 deletions packages/pond/src/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
*/

import * as Immutable from "immutable";
import { OrderedSet } from "immutable";
import * as _ from "lodash";
import * as moment from "moment-timezone";

import { Duration, duration } from "./duration";
import { Duration } from "./duration";
import { Index, index } from "./index";
import { Period } from "./period";
import { Time, time } from "./time";
Expand Down
12 changes: 1 addition & 11 deletions packages/pond/src/windowedcollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,19 @@
import * as Immutable from "immutable";
import * as _ from "lodash";

import { Align } from "./align";
import { Base } from "./base";
import { Event } from "./event";
import { GroupedCollection, GroupingFunction } from "./groupedcollection";
import { Index, index } from "./index";
import { Key } from "./key";
import { Period } from "./period";
import { Processor } from "./processor";
import { Rate } from "./rate";
import { SortedCollection } from "./sortedcollection";
import { Time, time } from "./time";
import { timerange, TimeRange } from "./timerange";

import { time } from "./time";
import util from "./util";

import {
AggregationSpec,
AggregationTuple,
AlignmentOptions,
DedupFunction,
KeyedCollection,
RateOptions,
ReducerFunction,
Trigger,
WindowingOptions
} from "./types";
Expand Down
Loading

0 comments on commit 9722832

Please sign in to comment.