forked from ReactiveX/RxSwift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathColdObservable.swift
46 lines (38 loc) · 1.49 KB
/
ColdObservable.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//
// ColdObservable.swift
// RxTest
//
// Created by Krunoslav Zaher on 3/14/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import RxSwift
/// A cold observable sequence backed by a list of recorded events.
///
/// Every subscriber receives its own replay of the recorded events, started when it subscribes.
///
/// The time attached to each recorded event is an offset relative to the subscription time,
/// not an absolute scheduler time.
final class ColdObservable<Element>: TestableObservable<Element> {
    override init(testScheduler: TestScheduler, recordedEvents: [Recorded<Event<Element>>]) {
        super.init(testScheduler: testScheduler, recordedEvents: recordedEvents)
    }

    /// Subscribes `observer` to receive events for this sequence.
    ///
    /// A new `Subscription` stamped with the scheduler's current clock is appended to
    /// `subscriptions`, and each recorded event is scheduled on the test scheduler at a due time
    /// relative to now.
    ///
    /// - Parameter observer: Receiver of the replayed events.
    /// - Returns: A disposable that stops further delivery to `observer` and rewrites this
    ///   subscription's entry with the scheduler clock read at disposal time.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // The new entry is appended at the end, so its index is the count before appending.
        let slot = subscriptions.count
        subscriptions.append(Subscription(testScheduler.clock))

        // Shared between the scheduled actions and the returned disposable.
        var isDisposed = false

        for recorded in recordedEvents {
            // The scheduled work is never cancelled; it checks the flag when it fires instead,
            // which is why the disposable returned by the scheduler is discarded.
            _ = testScheduler.scheduleRelativeVirtual((), dueTime: recorded.time) { _ in
                guard !isDisposed else {
                    return Disposables.create()
                }
                observer.on(recorded.value)
                return Disposables.create()
            }
        }

        return Disposables.create {
            isDisposed = true
            // Keep the original subscribe time and pair it with the current clock
            // (presumably the unsubscribe time — confirm against `Subscription`).
            let opened = self.subscriptions[slot]
            self.subscriptions[slot] = Subscription(opened.subscribe, self.testScheduler.clock)
        }
    }
}