Skip to content

Commit

Permalink
sample
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 21, 2015
1 parent e35048d commit 1509f71
Show file tree
Hide file tree
Showing 43 changed files with 1,023 additions and 309 deletions.
4 changes: 2 additions & 2 deletions dist/rx.aggregates.js
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@
* @returns {Observable} An observable sequence containing a single element with the minimum element in the source sequence.
*/
observableProto.min = function (comparer) {
return this.minBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.minBy(identity, comparer).map(firstOnly);
};

/**
Expand All @@ -644,7 +644,7 @@
* @returns {Observable} An observable sequence containing a single element with the maximum element in the source sequence.
*/
observableProto.max = function (comparer) {
return this.maxBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.maxBy(identity, comparer).map(firstOnly);
};

var AverageObservable = (function (__super__) {
Expand Down
2 changes: 1 addition & 1 deletion dist/rx.aggregates.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/rx.aggregates.min.js

Large diffs are not rendered by default.

102 changes: 70 additions & 32 deletions dist/rx.all.compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -6642,7 +6642,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence containing a single element with the minimum element in the source sequence.
*/
observableProto.min = function (comparer) {
return this.minBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.minBy(identity, comparer).map(firstOnly);
};

/**
Expand All @@ -6668,7 +6668,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence containing a single element with the maximum element in the source sequence.
*/
observableProto.max = function (comparer) {
return this.maxBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.maxBy(identity, comparer).map(firstOnly);
};

var AverageObservable = (function (__super__) {
Expand Down Expand Up @@ -8932,6 +8932,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
this._o = o;
this._p = null;
this._hp = false;
__super__.call(this);
}

PairwiseObserver.prototype.next = function (x) {
Expand Down Expand Up @@ -10284,37 +10285,74 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
return new TimestampObservable(this, scheduler);
};

function sampleObservable(source, sampler) {
return new AnonymousObservable(function (o) {
var atEnd = false, value, hasValue = false;

function sampleSubscribe() {
if (hasValue) {
hasValue = false;
o.onNext(value);
}
atEnd && o.onCompleted();
}
var SampleObservable = (function(__super__) {
inherits(SampleObservable, __super__);
function SampleObservable(source, sampler) {
this.source = source;
this._sampler = sampler;
__super__.call(this);
}

var sourceSubscription = new SingleAssignmentDisposable();
sourceSubscription.setDisposable(source.subscribe(
function (newValue) {
hasValue = true;
value = newValue;
},
function (e) { o.onError(e); },
function () {
atEnd = true;
sourceSubscription.dispose();
}
));
SampleObservable.prototype.subscribeCore = function (o) {
var state = {
o: o,
atEnd: false,
value: null,
hasValue: false,
sourceSubscription: new SingleAssignmentDisposable()
};

state.sourceSubscription.setDisposable(this.source.subscribe(new SampleSourceObserver(state)));
return new BinaryDisposable(
sourceSubscription,
sampler.subscribe(sampleSubscribe, function (e) { o.onError(e); }, sampleSubscribe)
state.sourceSubscription,
this._sampler.subscribe(new SamplerObserver(state))
);
}, source);
}
};

return SampleObservable;
}(ObservableBase));

var SamplerObserver = (function(__super__) {
inherits(SamplerObserver, __super__);
function SamplerObserver(s) {
this._s = s;
__super__.call(this);
}

SamplerObserver.prototype._handleMessage = function () {
if (this._s.hasValue) {
this._s.hasValue = false;
this._s.o.onNext(this._s.value);
}
this._s.atEnd && this._s.o.onCompleted();
};

SamplerObserver.prototype.next = function () { this._handleMessage(); };
SamplerObserver.prototype.error = function (e) { this._s.onError(e); };
SamplerObserver.prototype.completed = function () { this._handleMessage(); };

return SamplerObserver;
}(AbstractObserver));

var SampleSourceObserver = (function(__super__) {
inherits(SampleSourceObserver, __super__);
function SampleSourceObserver(s) {
this._s = s;
__super__.call(this);
}

SampleSourceObserver.prototype.next = function (x) {
this._s.hasValue = true;
this._s.value = x;
};
SampleSourceObserver.prototype.error = function (e) { this._s.o.onError(e); };
SampleSourceObserver.prototype.completed = function () {
this._s.atEnd = true;
this._s.sourceSubscription.dispose();
};

return SampleSourceObserver;
}(AbstractObserver));

/**
* Samples the observable sequence at each interval.
Expand All @@ -10328,11 +10366,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the timeout scheduler is used.
* @returns {Observable} Sampled observable sequence.
*/
observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) {
observableProto.sample = function (intervalOrSampler, scheduler) {
isScheduler(scheduler) || (scheduler = defaultScheduler);
return typeof intervalOrSampler === 'number' ?
sampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
sampleObservable(this, intervalOrSampler);
new SampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
new SampleObservable(this, intervalOrSampler);
};

var TimeoutError = Rx.TimeoutError = function(message) {
Expand Down
2 changes: 1 addition & 1 deletion dist/rx.all.compat.map

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dist/rx.all.compat.min.js

Large diffs are not rendered by default.

102 changes: 70 additions & 32 deletions dist/rx.all.js
Original file line number Diff line number Diff line change
Expand Up @@ -6383,7 +6383,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence containing a single element with the minimum element in the source sequence.
*/
observableProto.min = function (comparer) {
return this.minBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.minBy(identity, comparer).map(firstOnly);
};

/**
Expand All @@ -6409,7 +6409,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence containing a single element with the maximum element in the source sequence.
*/
observableProto.max = function (comparer) {
return this.maxBy(identity, comparer).map(function (x) { return firstOnly(x); });
return this.maxBy(identity, comparer).map(firstOnly);
};

var AverageObservable = (function (__super__) {
Expand Down Expand Up @@ -8673,6 +8673,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
this._o = o;
this._p = null;
this._hp = false;
__super__.call(this);
}

PairwiseObserver.prototype.next = function (x) {
Expand Down Expand Up @@ -10025,37 +10026,74 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
return new TimestampObservable(this, scheduler);
};

function sampleObservable(source, sampler) {
return new AnonymousObservable(function (o) {
var atEnd = false, value, hasValue = false;

function sampleSubscribe() {
if (hasValue) {
hasValue = false;
o.onNext(value);
}
atEnd && o.onCompleted();
}
var SampleObservable = (function(__super__) {
inherits(SampleObservable, __super__);
function SampleObservable(source, sampler) {
this.source = source;
this._sampler = sampler;
__super__.call(this);
}

var sourceSubscription = new SingleAssignmentDisposable();
sourceSubscription.setDisposable(source.subscribe(
function (newValue) {
hasValue = true;
value = newValue;
},
function (e) { o.onError(e); },
function () {
atEnd = true;
sourceSubscription.dispose();
}
));
SampleObservable.prototype.subscribeCore = function (o) {
var state = {
o: o,
atEnd: false,
value: null,
hasValue: false,
sourceSubscription: new SingleAssignmentDisposable()
};

state.sourceSubscription.setDisposable(this.source.subscribe(new SampleSourceObserver(state)));
return new BinaryDisposable(
sourceSubscription,
sampler.subscribe(sampleSubscribe, function (e) { o.onError(e); }, sampleSubscribe)
state.sourceSubscription,
this._sampler.subscribe(new SamplerObserver(state))
);
}, source);
}
};

return SampleObservable;
}(ObservableBase));

var SamplerObserver = (function(__super__) {
inherits(SamplerObserver, __super__);
function SamplerObserver(s) {
this._s = s;
__super__.call(this);
}

SamplerObserver.prototype._handleMessage = function () {
if (this._s.hasValue) {
this._s.hasValue = false;
this._s.o.onNext(this._s.value);
}
this._s.atEnd && this._s.o.onCompleted();
};

SamplerObserver.prototype.next = function () { this._handleMessage(); };
SamplerObserver.prototype.error = function (e) { this._s.onError(e); };
SamplerObserver.prototype.completed = function () { this._handleMessage(); };

return SamplerObserver;
}(AbstractObserver));

var SampleSourceObserver = (function(__super__) {
inherits(SampleSourceObserver, __super__);
function SampleSourceObserver(s) {
this._s = s;
__super__.call(this);
}

SampleSourceObserver.prototype.next = function (x) {
this._s.hasValue = true;
this._s.value = x;
};
SampleSourceObserver.prototype.error = function (e) { this._s.o.onError(e); };
SampleSourceObserver.prototype.completed = function () {
this._s.atEnd = true;
this._s.sourceSubscription.dispose();
};

return SampleSourceObserver;
}(AbstractObserver));

/**
* Samples the observable sequence at each interval.
Expand All @@ -10069,11 +10107,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the timeout scheduler is used.
* @returns {Observable} Sampled observable sequence.
*/
observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) {
observableProto.sample = function (intervalOrSampler, scheduler) {
isScheduler(scheduler) || (scheduler = defaultScheduler);
return typeof intervalOrSampler === 'number' ?
sampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
sampleObservable(this, intervalOrSampler);
new SampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
new SampleObservable(this, intervalOrSampler);
};

var TimeoutError = Rx.TimeoutError = function(message) {
Expand Down
2 changes: 1 addition & 1 deletion dist/rx.all.map

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions dist/rx.all.min.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dist/rx.coincidence.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@
this._o = o;
this._p = null;
this._hp = false;
__super__.call(this);
}

PairwiseObserver.prototype.next = function (x) {
Expand Down
Loading

0 comments on commit 1509f71

Please sign in to comment.