forked from reactiveui/ReactiveUI
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: ObservableAsyncMRUCacheTest.cs
177 lines (143 loc) · 6.5 KB
/
ObservableAsyncMRUCacheTest.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using ReactiveUI.Testing;
using Xunit;
using Microsoft.Reactive.Testing;
namespace ReactiveUI.Tests
{
// Tests for ObservableAsyncMRUCache: a cache whose selector produces values
// asynchronously (here, Observable.Return(x*5) delayed by 1 second on a
// TestScheduler so the tests can control virtual time).
//
// NOTE(review): the ObservableAsyncMRUCache ctor is called as
// (selector, 5, 2[, null, sched]) throughout — presumably (selector, maxSize,
// maxConcurrent[, onRelease, scheduler]); confirm against the cache's declaration.
public class ObservableAsyncMRUCacheTest
{
// Exercises the blocking Get() path: five requests for the same key should
// all resolve to 1*5 = 5 once the single 1-second selector delay elapses,
// summing to 25. Skipped: mixing a real worker thread + Thread.Sleep with
// TestScheduler virtual time makes the timing assertions unreliable.
[Fact(Skip="This test is badly written")]
public void GetTest()
{
(new TestScheduler()).With(sched => {
var input = new[] {1, 1, 1, 1, 1};
var delay = TimeSpan.FromSeconds(1.0);
var fixture = new ObservableAsyncMRUCache<int, int>(x => Observable.Return(x*5).Delay(delay, sched), 5, 2);
int result = 0;
var t = new Thread(() => {
// We use this side thread because there's no way to tell
// the cache to Run the Test Scheduler. So the side thread
// will do the waiting while the main thread advances the
// Scheduler
foreach (int x in input.Select(x => fixture.Get(x))) {
result += x;
}
});
t.Start();
sched.Start();
sched.AdvanceToMs(500);
// NB: The Thread.Sleep is to let our other thread catch up
Thread.Sleep(100);
// Before the 1-second selector delay fires, no Get() call has completed.
Assert.Equal(0, result);
sched.AdvanceToMs(1200);
Thread.Sleep(100);
// Past t=1000ms all five (identical-key) requests resolve: 5 * (1*5) = 25.
Assert.Equal(25, result);
sched.Start();
t.Join();
Assert.Equal(25, result);
});
}
// Same scenario as GetTest but via the non-blocking AsyncGet() path, driven
// entirely on the TestScheduler — no side thread needed. Five requests for
// key 1 should produce 25 total once virtual time passes the 1s delay.
[Fact]
public void AsyncGetTest()
{
var input = new[] { 1, 1, 1, 1, 1 };
var sched = new TestScheduler();
var delay = TimeSpan.FromSeconds(1.0);
var fixture = new ObservableAsyncMRUCache<int, int>(x => Observable.Return(x*5).Delay(delay, sched), 5, 2, null, sched);
int result = 0;
input.ToObservable(sched).SelectMany<int, int>(x => (IObservable<int>)fixture.AsyncGet(x)).Subscribe(x => result += x);
// At t=500ms the 1-second Delay hasn't fired yet: nothing has completed.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));
Assert.Equal(0, result);
// At t=1200ms the single selector invocation (key 1) has completed and its
// value has been replayed to all five subscribers: 5 * (1*5) = 25.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1200)));
Assert.Equal(25, result);
// Draining the scheduler completely must not produce any further values.
sched.Start();
Assert.Equal(25, result);
}
// Disabled: exercises CachedValues() eviction with maxSize=2. Would not
// compile as-is if re-enabled (CreateCollection / Assert.IsTrue are not
// available here) — review before re-enabling.
#if FALSE
[Fact]
public void CachedValuesTest()
{
var input = new[] { 1, 2, 1, 3, 1 };
var sched = new TestScheduler();
var delay = TimeSpan.FromSeconds(1.0);
var fixture = new ObservableAsyncMRUCache<int, int>(x => Observable.Return(x*5).Delay(delay, sched), 2, 2);
var results = input.ToObservable().SelectMany(fixture.AsyncGet).CreateCollection();
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));
Assert.Equal(0, fixture.CachedValues().Count());
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1500)));
var output = fixture.CachedValues().ToArray();
Assert.IsTrue(output.Length == 2);
Assert.Equal(input.Length, results.Count);
}
#endif
// Verifies the concurrency limit (the '2' ctor argument): with five keys
// {1,2,3,4,1}, the assertions show that by t=1500ms only keys 1 and 2 have
// run (plus the cached duplicate of 1), and 3 and 4 complete in a second
// 1-second window by t=2500ms — i.e. at most two selector invocations run
// in flight at once, with the rest queued.
[Fact]
public void CacheShouldQueueOnceWeHitOurConcurrentLimit()
{
var input = new[] { 1, 2, 3, 4, 1 };
var sched = new TestScheduler();
var delay = TimeSpan.FromSeconds(1.0);
var fixture = new ObservableAsyncMRUCache<int, int>(x => Observable.Return(x*5).Delay(delay, sched), 5, 2, null, sched);
int result = 0;
input.ToObservable(sched).SelectMany<int, int>(x => (IObservable<int>)fixture.AsyncGet(x)).Subscribe(x => result += x);
// Nothing completes before the first 1-second delay elapses.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));
Assert.Equal(0, result);
// First wave: keys 1 and 2 complete; the duplicate key 1 is served from
// the cache/in-flight observable rather than a third concurrent call.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1500)));
Assert.Equal(1*5 + 2*5 + 1*5, result);
// Second wave: queued keys 3 and 4 complete one delay-period later.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(2500)));
Assert.Equal(1*5 + 2*5 + 3*5 + 4*5 + 1*5, result);
// No spurious extra values appear after everything has run.
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(5000)));
Assert.Equal(1*5 + 2*5 + 3*5 + 4*5 + 1*5, result);
}
/* NB: The ideas in this test are fundamentally flawed - while none
* of the Subjects in ObservableAsyncMRUCache are getting incorrectly
* completed, the input Observable cannot continue after an error
* is thrown. We need to make a design change to ObservableAsyncMRUCache
* so that users can decide what to return on error */
#if FALSE
[Fact]
public void CacheShouldEatExceptionsAndMarshalThemToObservable()
{
/* This is a bit tricky:
*
* 5,2 complete at t=1000 simultaneously
* 10,0 get queued up, 0 fails immediately (delay() doesn't delay the OnError),
* so 10 completes at t=2000
* The 7 completes at t=3000
*/
var input = new[] { 5, 2, 10, 0/*boom!*/, 7 };
var sched = new TestScheduler();
Observable.Throw<int>(new Exception("Foo")).Subscribe(x => {
Console.WriteLine(x);
}, ex => {
Console.WriteLine(ex);
}, () => {
Console.WriteLine("Completed");
});
var delay = TimeSpan.FromSeconds(1.0);
var fixture = new ObservableAsyncMRUCache<int, int>(x =>
(x == 0 ? Observable.Throw<int>(new Exception("Boom!")) : Observable.Return(10 * x)).Delay(delay, sched), 5, 2, null, sched);
Exception exception = null;
int completed = 0;
input.ToObservable()
.SelectMany(x => fixture.AsyncGet(x))
.Subscribe(x => {
this.Log().InfoFormat("Result = {0}", x);
completed++;
}, ex => exception = exception ?? ex);
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(500)));
Assert.Null(exception);
Assert.Equal(0, completed);
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(1500)));
Assert.NotNull(exception);
Assert.Equal(2, completed);
sched.AdvanceTo(sched.FromTimeSpan(TimeSpan.FromMilliseconds(7500)));
Assert.NotNull(exception);
Assert.Equal(4, completed);
this.Log().Info(exception);
}
#endif
}
}