Skip to content

Commit 998e06d

Browse files
author
Kirk Shoop
committed
Merge pull request ReactiveX#209 from kirkshoop/operatorfixes
make operator | work better
2 parents 18605f6 + 33ccae2 commit 998e06d

11 files changed

+229
-90
lines changed

README.md

+31-29
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,23 @@ The Reactive Extensions for Native (__RxCpp__) is a library for composing asynch
33
Windows: [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
44
Linux & OSX: [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp)
55

6+
[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
7+
8+
[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
9+
[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
10+
611
Github: [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases)
712
[![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v2.2.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
813

914
NuGet: [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)
1015
[![NuGet downloads](http://img.shields.io/nuget/dt/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)
1116

12-
[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
13-
14-
[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
15-
[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
16-
1717
#Example
1818
Add ```Rx/v2/src``` to the include paths
1919

2020
[![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)
2121

2222
```cpp
23-
2423
#include "rxcpp/rx.hpp"
2524
using namespace rxcpp;
2625
using namespace rxcpp::sources;
@@ -31,41 +30,42 @@ using namespace rxcpp::util;
3130
#include <random>
3231
using namespace std;
3332

34-
//using rxcpp::operators::sum;
35-
3633
int main()
3734
{
3835
random_device rd; // non-deterministic generator
3936
mt19937 gen(rd());
4037
uniform_int_distribution<> dist(4, 18);
4138

42-
// produce byte stream that contains lines of text
39+
// for testing purposes, produce byte stream that from lines of text
4340
auto bytes = range(1, 10) |
44-
flat_map([&](int i){
45-
return from(
46-
from((uint8_t)('A' + i)) |
47-
repeat(dist(gen)),
48-
from((uint8_t)'\r')) |
49-
concat();
41+
flat_map([&](int i){
42+
auto body = from((uint8_t)('A' + i)) |
43+
repeat(dist(gen)) |
44+
as_dynamic();
45+
auto delim = from((uint8_t)'\r');
46+
return from(body, delim) | concat();
5047
}) |
5148
window(17) |
52-
flat_map([](observable<uint8_t> w){
49+
flat_map([](observable<uint8_t> w){
5350
return w |
5451
reduce(
55-
vector<uint8_t>(),
52+
vector<uint8_t>(),
5653
[](vector<uint8_t>& v, uint8_t b){
57-
v.push_back(b);
54+
v.push_back(b);
5855
return move(v);
59-
},
60-
[](vector<uint8_t>& v){return move(v);}) |
61-
as_dynamic();
56+
}) |
57+
as_dynamic();
6258
}) |
63-
filter([](vector<uint8_t>& v){
59+
tap([](vector<uint8_t>& v){
60+
// print input packet of bytes
6461
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
65-
cout << endl;
66-
return true;
62+
cout << endl;
6763
});
6864

65+
//
66+
// recover lines of text from byte stream
67+
//
68+
6969
// create strings split on \r
7070
auto strings = bytes |
7171
concat_map([](vector<uint8_t> v){
@@ -75,6 +75,9 @@ int main()
7575
sregex_token_iterator end;
7676
vector<string> splits(cursor, end);
7777
return iterate(move(splits));
78+
}) |
79+
filter([](string& s){
80+
return !s.empty();
7881
});
7982

8083
// group strings by line
@@ -83,18 +86,17 @@ int main()
8386
group_by(
8487
[=](string& s) mutable {
8588
return s.back() == '\r' ? group++ : group;
86-
},
87-
[](string& s) { return move(s);});
89+
});
8890

8991
// reduce the strings for a line into one string
9092
auto lines = linewindows |
91-
flat_map([](grouped_observable<int, string> w){
92-
return w | sum();
93+
flat_map([](grouped_observable<int, string> w){
94+
return w | sum();
9395
});
9496

9597
// print result
9698
lines |
97-
subscribe(println(cout));
99+
subscribe<string>(println(cout));
98100

99101
return 0;
100102
}

Rx/v2/examples/linesfrombytes/main.cpp

+41-37
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "rxcpp/rx.hpp"
33
using namespace rxcpp;
44
using namespace rxcpp::sources;
5+
using namespace rxcpp::operators;
56
using namespace rxcpp::util;
67

78
#include <regex>
@@ -14,64 +15,67 @@ int main()
1415
mt19937 gen(rd());
1516
uniform_int_distribution<> dist(4, 18);
1617

17-
// produce byte stream that contains lines of text
18-
auto bytes = range(1, 10).
19-
map([&](int i){
20-
return from((uint8_t)('A' + i)).
21-
repeat(dist(gen)).
22-
concat(from((uint8_t)'\r'));
23-
}).
24-
merge().
25-
window(17).
26-
map([](observable<uint8_t> w){
27-
return w.
18+
// for testing purposes, produce byte stream that from lines of text
19+
auto bytes = range(1, 10) |
20+
flat_map([&](int i){
21+
auto body = from((uint8_t)('A' + i)) |
22+
repeat(dist(gen)) |
23+
as_dynamic();
24+
auto delim = from((uint8_t)'\r');
25+
return from(body, delim) | concat();
26+
}) |
27+
window(17) |
28+
flat_map([](observable<uint8_t> w){
29+
return w |
2830
reduce(
29-
vector<uint8_t>(),
31+
vector<uint8_t>(),
3032
[](vector<uint8_t>& v, uint8_t b){
31-
v.push_back(b);
33+
v.push_back(b);
3234
return move(v);
33-
},
34-
[](vector<uint8_t>& v){return move(v);}).
35-
as_dynamic();
36-
}).
37-
merge().
38-
filter([](vector<uint8_t>& v){
35+
}) |
36+
as_dynamic();
37+
}) |
38+
tap([](vector<uint8_t>& v){
39+
// print input packet of bytes
3940
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
40-
cout << endl;
41-
return true;
41+
cout << endl;
4242
});
4343

44+
//
45+
// recover lines of text from byte stream
46+
//
47+
4448
// create strings split on \r
45-
auto strings = bytes.
46-
map([](vector<uint8_t> v){
49+
auto strings = bytes |
50+
concat_map([](vector<uint8_t> v){
4751
string s(v.begin(), v.end());
4852
regex delim(R"/(\r)/");
49-
sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0});
50-
sregex_token_iterator end;
53+
cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
54+
cregex_token_iterator end;
5155
vector<string> splits(cursor, end);
5256
return iterate(move(splits));
53-
}).
54-
concat();
57+
}) |
58+
filter([](string& s){
59+
return !s.empty();
60+
});
5561

5662
// group strings by line
5763
int group = 0;
58-
auto linewindows = strings.
64+
auto linewindows = strings |
5965
group_by(
6066
[=](string& s) mutable {
6167
return s.back() == '\r' ? group++ : group;
62-
},
63-
[](string& s) { return move(s);});
68+
});
6469

6570
// reduce the strings for a line into one string
66-
auto lines = linewindows.
67-
map([](grouped_observable<int, string> w){
68-
return w.sum();
69-
}).
70-
merge();
71+
auto lines = linewindows |
72+
flat_map([](grouped_observable<int, string> w) {
73+
return w | sum();
74+
});
7175

7276
// print result
73-
lines.
74-
subscribe(println(cout));
77+
lines |
78+
subscribe<string>(println(cout));
7579

7680
return 0;
7781
}

Rx/v2/src/rxcpp/operators/rx-concat.hpp

+20-1
Original file line numberDiff line numberDiff line change
@@ -203,12 +203,31 @@ class concat_factory
203203

204204
}
205205

206-
template<class Coordination>
206+
inline auto concat()
207+
-> detail::concat_factory<identity_one_worker> {
208+
return detail::concat_factory<identity_one_worker>(identity_current_thread());
209+
}
210+
211+
template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type>
207212
auto concat(Coordination&& sf)
208213
-> detail::concat_factory<Coordination> {
209214
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf));
210215
}
211216

217+
template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type>
218+
auto concat(O0&& o0, ON&&... on)
219+
-> detail::concat_factory<identity_one_worker> {
220+
return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...));
221+
}
222+
223+
template<class Coordination, class O0, class... ON,
224+
class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type,
225+
class CheckO = typename std::enable_if<is_observable<O0>::value>::type>
226+
auto concat(Coordination&& sf, O0&& o0, ON&&... on)
227+
-> detail::concat_factory<Coordination> {
228+
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...));
229+
}
230+
212231
}
213232

214233
}

Rx/v2/src/rxcpp/operators/rx-concat_map.hpp

+14-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ struct concat_traits {
4545

4646
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
4747

48-
typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
48+
typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
4949
};
5050

5151
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
267267
return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
268268
}
269269

270+
template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
271+
auto concat_map(CollectionSelector&& s, Coordination&& sf)
272+
-> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
273+
return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
274+
}
275+
276+
template<class CollectionSelector>
277+
auto concat_map(CollectionSelector&& s)
278+
-> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
279+
return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
280+
}
281+
282+
270283
}
271284

272285
}

Rx/v2/src/rxcpp/operators/rx-flat_map.hpp

+15-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct flat_map_traits {
3030

3131
static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
3232

33-
typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
33+
typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
3434

3535
static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
3636

@@ -43,7 +43,7 @@ struct flat_map_traits {
4343

4444
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
4545

46-
typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
46+
typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
4747
};
4848

4949
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
234234
return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
235235
}
236236

237+
template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
238+
auto flat_map(CollectionSelector&& s, Coordination&& sf)
239+
-> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
240+
return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
237241
}
238242

243+
template<class CollectionSelector>
244+
auto flat_map(CollectionSelector&& s)
245+
-> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
246+
return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
239247
}
240248

241-
#endif
249+
}
250+
251+
}
252+
253+
#endif

Rx/v2/src/rxcpp/operators/rx-group_by.hpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class group_by_factory
190190
template<class Observable>
191191
struct group_by_factory_traits
192192
{
193-
typedef rxu::value_type_t<Observable> value_type;
193+
typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
194194
typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
195195
typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
196196
};
@@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p)
209209
return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p));
210210
}
211211

212+
template<class KeySelector, class MarbleSelector>
213+
inline auto group_by(KeySelector ks, MarbleSelector ms)
214+
-> detail::group_by_factory<KeySelector, MarbleSelector, rxu::less> {
215+
return detail::group_by_factory<KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less());
216+
}
217+
218+
template<class KeySelector>
219+
inline auto group_by(KeySelector ks)
220+
-> detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less> {
221+
return detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less());
222+
}
223+
212224

213225
}
214226

Rx/v2/src/rxcpp/operators/rx-merge.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ class merge_factory
182182

183183
}
184184

185+
inline auto merge()
186+
-> detail::merge_factory<identity_one_worker> {
187+
return detail::merge_factory<identity_one_worker>(identity_current_thread());
188+
}
189+
185190
template<class Coordination>
186191
auto merge(Coordination&& sf)
187192
-> detail::merge_factory<Coordination> {

0 commit comments

Comments
 (0)