Skip to content

Commit 5c9f907

Browse files
author
Kirk Shoop
committed
Merge pull request ReactiveX#35 from kirkshoop/cppcon
allow conversions to dynamic
2 parents ea75f6f + ff573fc commit 5c9f907

File tree

4 files changed

+33
-73
lines changed

4 files changed

+33
-73
lines changed

Rx/v2/src/rxcpp/operators/rx-group_by.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ struct group_by
5959
typedef group_by_traits<T, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
6060
typedef typename traits_type::key_selector_type key_selector_type;
6161
typedef typename traits_type::marble_selector_type marble_selector_type;
62+
typedef typename traits_type::marble_type marble_type;
6263
typedef typename traits_type::predicate_type predicate_type;
6364
typedef typename traits_type::subject_type subject_type;
6465
typedef typename traits_type::key_type key_type;
@@ -83,7 +84,7 @@ struct group_by
8384
{
8485
}
8586

86-
struct group_by_observable
87+
struct group_by_observable : public rxs::source_base<marble_type>
8788
{
8889
subject_type subject;
8990
key_type key;
@@ -109,7 +110,6 @@ struct group_by
109110
{
110111
typedef group_by_observer<Subscriber> this_type;
111112
typedef typename traits_type::grouped_observable_type value_type;
112-
typedef typename traits_type::marble_type marble_type;
113113
typedef typename std::decay<Subscriber>::type dest_type;
114114
typedef observer<T, this_type> observer_type;
115115
dest_type dest;

Rx/v2/src/rxcpp/rx-connectable_observable.hpp

+8-35
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,13 @@ struct has_on_connect
2828

2929
template<class T>
3030
class dynamic_connectable_observable
31-
: public rxs::source_base<T>
31+
: public dynamic_observable<T>
3232
{
3333
struct state_type
3434
: public std::enable_shared_from_this<state_type>
3535
{
36-
typedef std::function<void(subscriber<T>)> onsubscribe_type;
3736
typedef std::function<void(composite_subscription)> onconnect_type;
3837

39-
onsubscribe_type on_subscribe;
4038
onconnect_type on_connect;
4139
};
4240
std::shared_ptr<state_type> state;
@@ -54,9 +52,6 @@ class dynamic_connectable_observable
5452
template<class SO>
5553
void construct(SO&& source, rxs::tag_source&&) {
5654
auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
57-
state->on_subscribe = [so](subscriber<T> o) mutable {
58-
so->on_subscribe(std::move(o));
59-
};
6055
state->on_connect = [so](composite_subscription cs) mutable {
6156
so->on_connect(std::move(cs));
6257
};
@@ -71,45 +66,23 @@ class dynamic_connectable_observable
7166
}
7267

7368
template<class SOF>
74-
explicit dynamic_connectable_observable(SOF&& sof)
75-
: state(std::make_shared<state_type>())
69+
explicit dynamic_connectable_observable(SOF sof)
70+
: dynamic_observable<T>(sof)
71+
, state(std::make_shared<state_type>())
7672
{
77-
construct(std::forward<SOF>(sof),
73+
construct(std::move(sof),
7874
typename std::conditional<is_dynamic_observable<SOF>::value, tag_dynamic_observable, rxs::tag_source>::type());
7975
}
8076

8177
template<class SF, class CF>
8278
dynamic_connectable_observable(SF&& sf, CF&& cf)
83-
: state(std::make_shared<state_type>())
79+
: dynamic_observable<T>(std::forward<SF>(sf))
80+
, state(std::make_shared<state_type>())
8481
{
85-
state->on_subscribe = std::forward<SF>(sf);
8682
state->on_connect = std::forward<CF>(cf);
8783
}
8884

89-
void on_subscribe(subscriber<T> o) const {
90-
state->on_subscribe(std::move(o));
91-
}
92-
93-
template<class Subscriber>
94-
typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
95-
on_subscribe(Subscriber&& o) const {
96-
auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
97-
state->on_subscribe(make_subscriber<T>(
98-
*so,
99-
make_observer_dynamic<T>(
100-
// on_next
101-
[so](T t){
102-
so->on_next(t);
103-
},
104-
// on_error
105-
[so](std::exception_ptr e){
106-
so->on_error(e);
107-
},
108-
// on_completed
109-
[so](){
110-
so->on_completed();
111-
})));
112-
}
85+
using dynamic_observable<T>::on_subscribe;
11386

11487
void on_connect(composite_subscription cs) const {
11588
state->on_connect(std::move(cs));

Rx/v2/src/rxcpp/rx-grouped_observable.hpp

+8-36
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct has_on_get_key_for
2828

2929
template<class K, class T>
3030
class dynamic_grouped_observable
31-
: public rxs::source_base<T>
31+
: public dynamic_observable<T>
3232
{
3333
public:
3434
typedef typename std::decay<K>::type key_type;
@@ -38,10 +38,8 @@ class dynamic_grouped_observable
3838
struct state_type
3939
: public std::enable_shared_from_this<state_type>
4040
{
41-
typedef std::function<void(subscriber<T>)> onsubscribe_type;
4241
typedef std::function<key_type()> ongetkey_type;
4342

44-
onsubscribe_type on_subscribe;
4543
ongetkey_type on_get_key;
4644
};
4745
std::shared_ptr<state_type> state;
@@ -62,9 +60,6 @@ class dynamic_grouped_observable
6260
template<class SO>
6361
void construct(SO&& source, const rxs::tag_source&) {
6462
auto so = std::make_shared<typename std::decay<SO>::type>(std::forward<SO>(source));
65-
state->on_subscribe = [so](subscriber<T> o) mutable {
66-
so->on_subscribe(std::move(o));
67-
};
6863
state->on_get_key = [so]() mutable {
6964
return so->on_get_key();
7065
};
@@ -77,46 +72,23 @@ class dynamic_grouped_observable
7772
}
7873

7974
template<class SOF>
80-
explicit dynamic_grouped_observable(SOF&& sof)
81-
: state(std::make_shared<state_type>())
75+
explicit dynamic_grouped_observable(SOF sof)
76+
: dynamic_observable<T>(sof)
77+
, state(std::make_shared<state_type>())
8278
{
83-
construct(std::forward<SOF>(sof),
79+
construct(std::move(sof),
8480
typename std::conditional<is_dynamic_grouped_observable<SOF>::value, tag_dynamic_grouped_observable, rxs::tag_source>::type());
8581
}
8682

8783
template<class SF, class CF>
8884
dynamic_grouped_observable(SF&& sf, CF&& cf)
89-
: state(std::make_shared<state_type>())
85+
: dynamic_observable<T>(std::forward<SF>(sf))
86+
, state(std::make_shared<state_type>())
9087
{
91-
state->on_subscribe = std::forward<SF>(sf);
9288
state->on_connect = std::forward<CF>(cf);
9389
}
9490

95-
void on_subscribe(subscriber<T> o) const {
96-
state->on_subscribe(std::move(o));
97-
}
98-
99-
template<class Subscriber>
100-
typename std::enable_if<!std::is_same<typename std::decay<Subscriber>::type, observer<T>>::value, void>::type
101-
on_subscribe(Subscriber&& o) const {
102-
auto so = std::make_shared<typename std::decay<Subscriber>::type>(std::forward<Subscriber>(o));
103-
state->on_subscribe(
104-
make_subscriber<T>(
105-
*so,
106-
// on_next
107-
[so](T t){
108-
so->on_next(t);
109-
},
110-
// on_error
111-
[so](std::exception_ptr e){
112-
so->on_error(e);
113-
},
114-
// on_completed
115-
[so](){
116-
so->on_completed();
117-
}).
118-
as_dynamic());
119-
}
91+
using dynamic_observable<T>::on_subscribe;
12092

12193
key_type on_get_key() const {
12294
return state->on_get_key();

Rx/v2/src/rxcpp/rx-subscriber.hpp

+15
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ class subscriber : public subscriber_base<T>
104104
{
105105
}
106106

107+
template<class U, class O>
108+
friend class subscriber;
109+
110+
template<class O>
111+
subscriber(
112+
const subscriber<T, O>& o,
113+
typename std::enable_if<
114+
!std::is_same<O, observer<T>>::value &&
115+
std::is_same<Observer, observer<T>>::value, void**>::type select = nullptr)
116+
: lifetime(o.lifetime)
117+
, destination(o.destination.as_dynamic())
118+
, id(o.id)
119+
{
120+
}
121+
107122
template<class U>
108123
subscriber(trace_id id, composite_subscription cs, U&& o)
109124
: lifetime(std::move(cs))

0 commit comments

Comments
 (0)