Skip to content

Commit

Permalink
Adds timeout parameter to blocking observable sequence.
Browse files Browse the repository at this point in the history
  • Loading branch information
kzaher committed Aug 28, 2016
1 parent d19cab3 commit 09a844e
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 33 deletions.
41 changes: 25 additions & 16 deletions RxBlocking/BlockingObservable+Operators.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ extension BlockingObservable {

var error: Swift.Error?

let lock = RunLoopLock()
let lock = RunLoopLock(timeout: timeout)

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

lock.dispatch {
d.disposable = self.source.subscribe { e in
if d.isDisposed {
Expand All @@ -47,9 +51,7 @@ extension BlockingObservable {
}
}

lock.run()

d.dispose()
try lock.run()

if let error = error {
throw error
Expand All @@ -74,7 +76,11 @@ extension BlockingObservable {

let d = SingleAssignmentDisposable()

let lock = RunLoopLock()
defer {
d.dispose()
}

let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
d.disposable = self.source.subscribe { e in
Expand All @@ -99,9 +105,7 @@ extension BlockingObservable {
}
}

lock.run()

d.dispose()
try lock.run()

if let error = error {
throw error
Expand All @@ -126,7 +130,11 @@ extension BlockingObservable {

let d = SingleAssignmentDisposable()

let lock = RunLoopLock()
defer {
d.dispose()
}

let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
d.disposable = self.source.subscribe { e in
Expand All @@ -148,9 +156,7 @@ extension BlockingObservable {
}
}

lock.run()

d.dispose()
try lock.run()

if let error = error {
throw error
Expand Down Expand Up @@ -186,8 +192,12 @@ extension BlockingObservable {
var error: Swift.Error?

let d = SingleAssignmentDisposable()

defer {
d.dispose()
}

let lock = RunLoopLock()
let lock = RunLoopLock(timeout: timeout)

lock.dispatch {
d.disposable = self.source.subscribe { e in
Expand Down Expand Up @@ -224,9 +234,8 @@ extension BlockingObservable {
}
}

lock.run()
d.dispose()

try lock.run()

if let error = error {
throw error
}
Expand Down
3 changes: 2 additions & 1 deletion RxBlocking/BlockingObservable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ If you think you need to use a `BlockingObservable` this is usually a sign that
design.
*/
public struct BlockingObservable<E> {
let timeout: RxTimeInterval?
let source: Observable<E>
}
}
5 changes: 3 additions & 2 deletions RxBlocking/ObservableConvertibleType+Blocking.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ extension ObservableConvertibleType {
/**
Converts an Observable into a `BlockingObservable` (an Observable with blocking operators).

- parameter timeout: Maximal time interval BlockingObservable can block without throwing `RxError.timeout`.
- returns: `BlockingObservable` version of `self`
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func toBlocking() -> BlockingObservable<E> {
return BlockingObservable(source: self.asObservable())
public func toBlocking(timeout: RxTimeInterval? = nil) -> BlockingObservable<E> {
return BlockingObservable(timeout: timeout, source: self.asObservable())
}
}
44 changes: 30 additions & 14 deletions RxBlocking/RunLoopLock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,19 @@ typealias AtomicInt = Int32
#endif

class RunLoopLock {
let currentRunLoop: CFRunLoop
let _currentRunLoop: CFRunLoop

var calledRun: AtomicInt = 0
var calledStop: AtomicInt = 0
var _calledRun: AtomicInt = 0
var _calledStop: AtomicInt = 0
var _timeout: RxTimeInterval?

init() {
currentRunLoop = CFRunLoopGetCurrent()
init(timeout: RxTimeInterval?) {
_timeout = timeout
_currentRunLoop = CFRunLoopGetCurrent()
}

func dispatch(_ action: @escaping () -> ()) {
CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
if CurrentThreadScheduler.isScheduleRequired {
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
action()
Expand All @@ -50,23 +52,37 @@ class RunLoopLock {
action()
}
}
CFRunLoopWakeUp(currentRunLoop)
CFRunLoopWakeUp(_currentRunLoop)
}

func stop() {
if AtomicIncrement(&calledStop) != 1 {
if AtomicIncrement(&_calledStop) != 1 {
return
}
CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopStop(self.currentRunLoop)
CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopStop(self._currentRunLoop)
}
CFRunLoopWakeUp(currentRunLoop)
CFRunLoopWakeUp(_currentRunLoop)
}

func run() {
if AtomicIncrement(&calledRun) != 1 {
func run() throws {
if AtomicIncrement(&_calledRun) != 1 {
fatalError("Run can be only called once")
}
CFRunLoopRun()
if let timeout = _timeout {
switch CFRunLoopRunInMode(CFRunLoopMode.defaultMode, timeout, false) {
case .finished:
return
case .handledSource:
return
case .stopped:
return
case .timedOut:
throw RxError.timeout
}
}
else {
CFRunLoopRun()
}
}
}
75 changes: 75 additions & 0 deletions Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, [1, 2])
}
}

func testToArray_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).toArray()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {

}
else {
XCTFail()
}
}
}
}

// first
Expand Down Expand Up @@ -126,6 +141,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}

func testFirst_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).first()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {

}
else {
XCTFail()
}
}
}
}

// last
Expand Down Expand Up @@ -183,6 +213,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}

func testLast_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).last()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {

}
else {
XCTFail()
}
}
}
}


Expand Down Expand Up @@ -360,4 +405,34 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}

func testSingle_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).single()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {

}
else {
XCTFail()
}
}
}

func testSinglePredicate_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).single { _ in true }
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {

}
else {
XCTFail()
}
}
}
}

0 comments on commit 09a844e

Please sign in to comment.