Skip to content

Commit

Permalink
toSet
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Nov 30, 2015
1 parent c2be28b commit 06eb81c
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
36 changes: 36 additions & 0 deletions src/modular/observable/toset.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict';

var ObservableBase = require('./observablebase');
var AbstractObserver = require('../observer/abstractobserver');
var inherits = require('util').inherits;

function ToSetObserver(o) {
this._o = o;
this._s = new global.Set();
AbstractObserver.call(this);
}

inherits(ToSetObserver, AbstractObserver);

ToSetObserver.prototype.next = function (x) { this._s.add(x); };
ToSetObserver.prototype.error = function (e) { this._o.onError(e); };
ToSetObserver.prototype.completed = function () {
this._o.onNext(this._s);
this._o.onCompleted();
};

function ToSetObservable(source) {
this.source = source;
ObservableBase.call(this);
}

inherits(ToSetObservable, ObservableBase);

ToSetObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new ToSetObserver(o));
};

module.exports = function toSet (source) {
if (typeof global.Set === 'undefined') { throw new TypeError(); }
return new ToSetObservable(source);
};
104 changes: 104 additions & 0 deletions src/modular/test/toset.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict';

var test = require('tape');
var Observable = require('../observable');
var TestScheduler = require('../testing/testscheduler');
var reactiveAssert = require('../testing/reactiveassert');
var ReactiveTest = require('../testing/reactivetest');
var onNext = ReactiveTest.onNext,
onError = ReactiveTest.onError,
onCompleted = ReactiveTest.onCompleted,
subscribe = ReactiveTest.subscribe;

Observable.addToPrototype({
toSet: require('../observable/toset')
});

function extractValues(x) {
var arr = [];
x.forEach(function (item) {
arr.push(item);
});
return arr;
}

test('Observable#toSet completed', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(110, 1),
onNext(220, 2),
onNext(330, 3),
onNext(440, 4),
onNext(550, 5),
onCompleted(660)
);

var results = scheduler.startScheduler(function () {
return xs.toSet().map(extractValues);
});

reactiveAssert(t, results.messages, [
onNext(660, [2,3,4,5]),
onCompleted(660)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 660)
]);

t.end();
});

test('Observable#toSet error', function (t) {
var error = new Error();

var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(110, 1),
onNext(220, 2),
onNext(330, 3),
onNext(440, 4),
onNext(550, 5),
onError(660, error)
);

var results = scheduler.startScheduler(function () {
return xs.toSet().map(extractValues);
});

reactiveAssert(t, results.messages, [
onError(660, error)
]);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 660)
]);

t.end();
});

test('Observable#toSet disposed', function (t) {
var scheduler = new TestScheduler();

var xs = scheduler.createHotObservable(
onNext(110, 1),
onNext(220, 2),
onNext(330, 3),
onNext(440, 4),
onNext(550, 5)
);

var results = scheduler.startScheduler(function () {
return xs.toSet().map(extractValues);
});

reactiveAssert(t, results.messages, []);

reactiveAssert(t, xs.subscriptions, [
subscribe(200, 1000)
]);

t.end();
});

0 comments on commit 06eb81c

Please sign in to comment.