Skip to content

Commit

Permalink
Add RetryWhen operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
danielt1263 authored and freak4pc committed Mar 30, 2022
1 parent 547d11a commit 02d113a
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 0 deletions.
8 changes: 8 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
/* Begin PBXBuildFile section */
1970A8AA25246FBD00799AB6 /* FilterMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8A925246FBD00799AB6 /* FilterMany.swift */; };
1970A8B42524730500799AB6 /* FilterManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 1970A8B32524730400799AB6 /* FilterManyTests.swift */; };
7182326D26DAAE380026BAD3 /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7182326C26DAAE380026BAD3 /* RetryWhen.swift */; };
7182326F26DAAF230026BAD3 /* RetryWhenTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 7182326E26DAAF230026BAD3 /* RetryWhenTests.swift */; };
BF330EF624F1FFFE001281FC /* CombineSchedulers in Frameworks */ = {isa = PBXBuildFile; productRef = BF330EF524F1FFFE001281FC /* CombineSchedulers */; };
BF330EF924F20032001281FC /* Timer.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EF824F20032001281FC /* Timer.swift */; };
BF330EFB24F20080001281FC /* Lock.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF330EFA24F20080001281FC /* Lock.swift */; };
Expand Down Expand Up @@ -108,6 +110,8 @@
/* Begin PBXFileReference section */
1970A8A925246FBD00799AB6 /* FilterMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterMany.swift; sourceTree = "<group>"; };
1970A8B32524730400799AB6 /* FilterManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FilterManyTests.swift; sourceTree = "<group>"; };
7182326C26DAAE380026BAD3 /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = RetryWhen.swift; path = ../../../../Desktop/RetryWhen.swift; sourceTree = "<group>"; };
7182326E26DAAF230026BAD3 /* RetryWhenTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RetryWhenTests.swift; sourceTree = "<group>"; };
BF330EF824F20032001281FC /* Timer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Timer.swift; sourceTree = "<group>"; };
BF330EFA24F20080001281FC /* Lock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Lock.swift; sourceTree = "<group>"; };
BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = IgnoreFailure.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -252,6 +256,7 @@
OBJ_26 /* Partition.swift */,
OBJ_27 /* PrefixDuration.swift */,
OBJ_28 /* RemoveAllDuplicates.swift */,
7182326C26DAAE380026BAD3 /* RetryWhen.swift */,
OBJ_29 /* SetOutputType.swift */,
BF43CC1425008B4F005AFA28 /* IgnoreOutputSetOutputType.swift */,
BF3D3B5C253B83F300D830ED /* IgnoreFailure.swift */,
Expand Down Expand Up @@ -305,6 +310,7 @@
OBJ_54 /* PrefixDurationTests.swift */,
OBJ_55 /* RemoveAllDuplicatesTests.swift */,
OBJ_56 /* ReplaySubjectTests.swift */,
7182326E26DAAF230026BAD3 /* RetryWhenTests.swift */,
OBJ_57 /* SetOutputTypeTests.swift */,
BF43CC1625008C45005AFA28 /* IgnoreOutputSetOutputTypeTests.swift */,
BF3D3B66253B88E500D830ED /* IgnoreFailureTests.swift */,
Expand Down Expand Up @@ -560,6 +566,7 @@
OBJ_129 /* FlatMapLatestTests.swift in Sources */,
OBJ_130 /* MapManyTests.swift in Sources */,
OBJ_131 /* MaterializeTests.swift in Sources */,
7182326F26DAAF230026BAD3 /* RetryWhenTests.swift in Sources */,
OBJ_132 /* OptionalTests.swift in Sources */,
OBJ_133 /* PartitionTests.swift in Sources */,
OBJ_134 /* PassthroughRelayTests.swift in Sources */,
Expand Down Expand Up @@ -595,6 +602,7 @@
OBJ_87 /* CombineLatestMany.swift in Sources */,
OBJ_88 /* Create.swift in Sources */,
OBJ_89 /* Dematerialize.swift in Sources */,
7182326D26DAAE380026BAD3 /* RetryWhen.swift in Sources */,
OBJ_90 /* FlatMapLatest.swift in Sources */,
OBJ_91 /* MapMany.swift in Sources */,
1970A8AA25246FBD00799AB6 /* FilterMany.swift in Sources */,
Expand Down
95 changes: 95 additions & 0 deletions Sources/Operators/RetryWhen.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//
// RetryWhen.swift
// CombineExt
//
// Created by Daniel Tartaglia on 3/21/20.
//

import Combine

public extension Publisher {

/// Repeats the source publisher on error when the notifier emits a next value. If the source publisher errors and the notifier completes, it will complete the source sequence.
///
/// - Parameter notificationHandler: A handler that is passed a publisher of errors raised by the source publisher and returns a publisher that either continues, completes or errors. This behavior is then applied to the source publisher.
/// - Returns: A publisher producing the elements of the given sequence repeatedly until it terminates successfully or is notified to error or complete.
func retryWhen<Trigger>(_ notificationHandler: @escaping (AnyPublisher<Self.Failure, Never>) -> Trigger)
-> Publishers.RetryWhen<Self, Trigger, Output, Failure> where Trigger: Publisher {
.init(upstream: self, notificationHandler: notificationHandler)
}
}

public extension Publishers {
class RetryWhen<Upstream, Trigger, Output, Failure>: Publisher where Upstream: Publisher, Upstream.Output == Output, Upstream.Failure == Failure, Trigger: Publisher {

typealias Handler = (AnyPublisher<Upstream.Failure, Never>) -> Trigger

private let upstream: Upstream
private let handler: Handler

init(upstream: Upstream, notificationHandler: @escaping Handler) {
self.upstream = upstream
self.handler = notificationHandler
}

public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber, handler: handler))
}
}
}

extension Publishers.RetryWhen {
class Subscription<Downstream>: Combine.Subscription where Downstream: Subscriber, Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure {

private let upstream: Upstream
private let downstream: Downstream
private let errorSubject = PassthroughSubject<Upstream.Failure, Never>()
private var sink: Sink<Upstream, Downstream>?
private var cancellable: AnyCancellable?

init(upstream: Upstream, downstream: Downstream, handler: @escaping (AnyPublisher<Upstream.Failure, Never>) -> Trigger) {
self.upstream = upstream
self.downstream = downstream
self.sink = Sink(
downstream: downstream,
transformOutput: { $0 },
transformFailure: { [errorSubject] in
errorSubject.send($0)
return nil
}
)
self.cancellable = handler(errorSubject.eraseToAnyPublisher())
.sink(
receiveCompletion: { [sink] completion in
switch completion {
case .finished:
sink?.buffer.complete(completion: Subscribers.Completion<Downstream.Failure>.finished)
case .failure(let error):
if let error = error as? Downstream.Failure {
sink?.buffer.complete(completion: Subscribers.Completion<Downstream.Failure>.failure(error))
}
}
},
receiveValue: { [upstream, sink] _ in
guard let sink = sink else { return }
upstream.subscribe(sink)
}
)
upstream.subscribe(sink!)
}

func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
}
}
}

extension Publishers.RetryWhen.Subscription: CustomStringConvertible {
var description: String {
return "RetryWhen.Subscription<\(Output.self), \(Failure.self)>"
}
}
124 changes: 124 additions & 0 deletions Tests/RetryWhenTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
//
// RetryWhenTests.swift
// CombineExtTests
//
// Created by Daniel Tartaglia on 8/28/21.
//

#if !os(watchOS)
import XCTest
import Combine
import CombineExt

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class RetryWhenTests: XCTestCase {
var subscription: AnyCancellable!

func testPassthroughNextAndComplete() {
let source = PassthroughSubject<Int, MyError>()

var expectedOutput: Int?

var completion: Subscribers.Completion<MyError>?

subscription = source
.retryWhen { error in
error.filter { _ in false }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

source.send(2)
source.send(completion: .finished)

XCTAssertEqual(
expectedOutput,
2
)
XCTAssertEqual(completion, .finished)

}

func testSuccessfulRetry() {
var times = 0

var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Deferred(createPublisher: { () -> AnyPublisher<Int, MyError> in
defer { times += 1 }
if times == 0 {
return Fail<Int, MyError>(error: MyError.someError).eraseToAnyPublisher()
}
else {
return Just(5).setFailureType(to: MyError.self).eraseToAnyPublisher()
}
})
.retryWhen { error in
error.map { _ in }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
5
)
XCTAssertEqual(completion, .finished)
XCTAssertEqual(times, 2)
}

func testRetryFailure() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.tryMap { _ in throw MyError.retryError }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .failure(MyError.retryError))
}

func testRetryComplete() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.prefix(1)
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .finished)
}

enum MyError: Swift.Error {
case someError
case retryError
}

}
#endif

0 comments on commit 02d113a

Please sign in to comment.