Skip to content

Commit

Permalink
Fix formatting per review...
Browse files Browse the repository at this point in the history
  • Loading branch information
danielt1263 authored and freak4pc committed Mar 30, 2022
1 parent f601ada commit 83dfccc
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 133 deletions.
51 changes: 26 additions & 25 deletions Sources/Operators/RetryWhen.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,84 @@ import Combine

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publisher {

/// Repeats the source publisher on error when the notifier emits a next value. If the source publisher errors and the notifier completes, it will complete the source sequence.
///
/// - Parameter notificationHandler: A handler that is passed a publisher of errors raised by the source publisher and returns a publisher that either continues, completes or errors. This behavior is then applied to the source publisher.
/// - Returns: A publisher producing the elements of the given sequence repeatedly until it terminates successfully or is notified to error or complete.
func retryWhen<Trigger>(_ notificationHandler: @escaping (AnyPublisher<Self.Failure, Never>) -> Trigger)
-> Publishers.RetryWhen<Self, Trigger, Output, Failure> where Trigger: Publisher {
.init(upstream: self, notificationHandler: notificationHandler)
func retryWhen<RetryTrigger>(_ errorTrigger: @escaping (AnyPublisher<Self.Failure, Never>) -> RetryTrigger)
-> Publishers.RetryWhen<Self, RetryTrigger, Output, Failure> where RetryTrigger: Publisher {
.init(upstream: self, errorTrigger: errorTrigger)
}
}

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public extension Publishers {
class RetryWhen<Upstream, Trigger, Output, Failure>: Publisher where Upstream: Publisher, Upstream.Output == Output, Upstream.Failure == Failure, Trigger: Publisher {

typealias Handler = (AnyPublisher<Upstream.Failure, Never>) -> Trigger

class RetryWhen<Upstream, RetryTrigger, Output, Failure>: Publisher where Upstream: Publisher, Upstream.Output == Output, Upstream.Failure == Failure, RetryTrigger: Publisher {
typealias ErrorTrigger = (AnyPublisher<Upstream.Failure, Never>) -> RetryTrigger

private let upstream: Upstream
private let handler: Handler
init(upstream: Upstream, notificationHandler: @escaping Handler) {
private let errorTrigger: ErrorTrigger

init(upstream: Upstream, errorTrigger: @escaping ErrorTrigger) {
self.upstream = upstream
self.handler = notificationHandler
self.errorTrigger = errorTrigger
}

public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber, handler: handler))
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber, errorTrigger: errorTrigger))
}
}
}

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.RetryWhen {
class Subscription<Downstream>: Combine.Subscription where Downstream: Subscriber, Downstream.Input == Upstream.Output, Downstream.Failure == Upstream.Failure {

private let upstream: Upstream
private let downstream: Downstream
private let errorSubject = PassthroughSubject<Upstream.Failure, Never>()
private var sink: Sink<Upstream, Downstream>?
private var cancellable: AnyCancellable?

init(upstream: Upstream, downstream: Downstream, handler: @escaping (AnyPublisher<Upstream.Failure, Never>) -> Trigger) {

init(
upstream: Upstream,
downstream: Downstream,
errorTrigger: @escaping (AnyPublisher<Upstream.Failure, Never>) -> RetryTrigger
) {
self.upstream = upstream
self.downstream = downstream
self.sink = Sink(
upstream: upstream,
downstream: downstream,
upstream: upstream,
downstream: downstream,
transformOutput: { $0 },
transformFailure: { [errorSubject] in
errorSubject.send($0)
return nil
}
)
self.cancellable = handler(errorSubject.eraseToAnyPublisher())
self.cancellable = errorTrigger(errorSubject.eraseToAnyPublisher())
.sink(
receiveCompletion: { [sink] completion in
switch completion {
case .finished:
sink?.buffer.complete(completion: Subscribers.Completion<Downstream.Failure>.finished)
sink?.buffer.complete(completion: .finished)
case .failure(let error):
if let error = error as? Downstream.Failure {
sink?.buffer.complete(completion: Subscribers.Completion<Downstream.Failure>.failure(error))
sink?.buffer.complete(completion: .failure(error))
}
}
},
receiveValue: { [upstream, sink] _ in
guard let sink = sink else { return }
upstream.subscribe(sink)
}
)
)
upstream.subscribe(sink!)
}

func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
}
Expand Down
214 changes: 106 additions & 108 deletions Tests/RetryWhenTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,113 +12,111 @@ import CombineExt

@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class RetryWhenTests: XCTestCase {
var subscription: AnyCancellable!

func testPassthroughNextAndComplete() {
let source = PassthroughSubject<Int, MyError>()

var expectedOutput: Int?

var completion: Subscribers.Completion<MyError>?

subscription = source
.retryWhen { error in
error.filter { _ in false }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

source.send(2)
source.send(completion: .finished)

XCTAssertEqual(
expectedOutput,
2
)
XCTAssertEqual(completion, .finished)

}

func testSuccessfulRetry() {
var times = 0

var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Deferred(createPublisher: { () -> AnyPublisher<Int, MyError> in
defer { times += 1 }
if times == 0 {
return Fail<Int, MyError>(error: MyError.someError).eraseToAnyPublisher()
}
else {
return Just(5).setFailureType(to: MyError.self).eraseToAnyPublisher()
}
})
.retryWhen { error in
error.map { _ in }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
5
)
XCTAssertEqual(completion, .finished)
XCTAssertEqual(times, 2)
}

func testRetryFailure() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.tryMap { _ in throw MyError.retryError }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .failure(MyError.retryError))
}

func testRetryComplete() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.prefix(1)
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .finished)
}

enum MyError: Swift.Error {
case someError
case retryError
}

var subscription: AnyCancellable!

func testPassthroughNextAndComplete() {
let source = PassthroughSubject<Int, MyError>()

var expectedOutput: Int?

var completion: Subscribers.Completion<MyError>?

subscription = source
.retryWhen { error in
error.filter { _ in false }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

source.send(2)
source.send(completion: .finished)

XCTAssertEqual(
expectedOutput,
2
)
XCTAssertEqual(completion, .finished)
}

func testSuccessfulRetry() {
var times = 0

var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Deferred(createPublisher: { () -> AnyPublisher<Int, MyError> in
defer { times += 1 }
if times == 0 {
return Fail<Int, MyError>(error: MyError.someError).eraseToAnyPublisher()
}
else {
return Just(5).setFailureType(to: MyError.self).eraseToAnyPublisher()
}
})
.retryWhen { error in
error.map { _ in }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
5
)
XCTAssertEqual(completion, .finished)
XCTAssertEqual(times, 2)
}

func testRetryFailure() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.tryMap { _ in throw MyError.retryError }
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .failure(MyError.retryError))
}

func testRetryComplete() {
var expectedOutput: Int?

var completion: Subscribers.Completion<RetryWhenTests.MyError>?

subscription = Fail<Int, MyError>(error: MyError.someError)
.retryWhen { error in
error.prefix(1)
}
.sink(
receiveCompletion: { completion = $0 },
receiveValue: { expectedOutput = $0 }
)

XCTAssertEqual(
expectedOutput,
nil
)
XCTAssertEqual(completion, .finished)
}

enum MyError: Swift.Error {
case someError
case retryError
}
}
#endif

0 comments on commit 83dfccc

Please sign in to comment.