Skip to content

Commit

Permalink
Add dematerialize()
Browse files Browse the repository at this point in the history
  • Loading branch information
freak4pc committed Mar 14, 2020
1 parent fde0a58 commit 4c9b5e8
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 10 deletions.
8 changes: 8 additions & 0 deletions CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
78C193DC241D0A9F0001B7FD /* Sink.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193DB241D0A9F0001B7FD /* Sink.swift */; };
78C193DE241D46F40001B7FD /* Materialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193DD241D46F40001B7FD /* Materialize.swift */; };
78C193E0241D4D8D0001B7FD /* MaterializeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193DF241D4D8D0001B7FD /* MaterializeTests.swift */; };
78C193E2241D596F0001B7FD /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193E1241D596F0001B7FD /* Dematerialize.swift */; };
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193E3241D63620001B7FD /* DematerializeTests.swift */; };
OBJ_22 /* AssignToMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_9 /* AssignToMany.swift */; };
OBJ_23 /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_10 /* WithLatestFrom.swift */; };
OBJ_30 /* Package.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_6 /* Package.swift */; };
Expand Down Expand Up @@ -58,6 +60,8 @@
78C193DB241D0A9F0001B7FD /* Sink.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Sink.swift; sourceTree = "<group>"; };
78C193DD241D46F40001B7FD /* Materialize.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Materialize.swift; sourceTree = "<group>"; };
78C193DF241D4D8D0001B7FD /* MaterializeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MaterializeTests.swift; sourceTree = "<group>"; };
78C193E1241D596F0001B7FD /* Dematerialize.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Dematerialize.swift; sourceTree = "<group>"; };
78C193E3241D63620001B7FD /* DematerializeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DematerializeTests.swift; sourceTree = "<group>"; };
"CombineExt::CombineExt::Product" /* CombineExt.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = CombineExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
"CombineExt::CombineExtTests::Product" /* CombineExtTests.xctest */ = {isa = PBXFileReference; lastKnownFileType = file; path = CombineExtTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
OBJ_10 /* WithLatestFrom.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -110,6 +114,7 @@
78C193D0241C1B450001B7FD /* FlatMapLatestTests.swift */,
78C193D8241CEEA80001B7FD /* CreateTests.swift */,
78C193DF241D4D8D0001B7FD /* MaterializeTests.swift */,
78C193E3241D63620001B7FD /* DematerializeTests.swift */,
);
path = Tests;
sourceTree = SOURCE_ROOT;
Expand Down Expand Up @@ -152,6 +157,7 @@
78C193CE241C16C40001B7FD /* FlatMapLatest.swift */,
78C193D3241C2DE00001B7FD /* Create.swift */,
78C193DD241D46F40001B7FD /* Materialize.swift */,
78C193E1241D596F0001B7FD /* Dematerialize.swift */,
);
path = Operators;
sourceTree = "<group>";
Expand Down Expand Up @@ -242,6 +248,7 @@
78C193CF241C16C40001B7FD /* FlatMapLatest.swift in Sources */,
78C193DC241D0A9F0001B7FD /* Sink.swift in Sources */,
78AA9299241B8C45009BD68B /* DemandBuffer.swift in Sources */,
78C193E2241D596F0001B7FD /* Dematerialize.swift in Sources */,
78C193DE241D46F40001B7FD /* Materialize.swift in Sources */,
78C193D4241C2DE00001B7FD /* Create.swift in Sources */,
OBJ_22 /* AssignToMany.swift in Sources */,
Expand All @@ -266,6 +273,7 @@
78AA9297241B8532009BD68B /* AssignToManyTests.swift in Sources */,
OBJ_41 /* WithLatestFromTests.swift in Sources */,
78C193E0241D4D8D0001B7FD /* MaterializeTests.swift in Sources */,
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */,
78C193D9241CEEA80001B7FD /* CreateTests.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
Expand Down
36 changes: 30 additions & 6 deletions Sources/Common/Sink.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {

private(set) var buffer: DemandBuffer<Downstream>
private var upstreamSubscription: Subscription?
private let transformOutput: TransformOutput
private let transformFailure: TransformFailure
private let transformOutput: TransformOutput?
private let transformFailure: TransformFailure?

/// Initialize a new sink subscribing to the upstream publisher and
/// fulfilling the demand of the downstream subscriber using a backpresurre
Expand All @@ -27,10 +27,12 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
/// - parameter downstream: The downstream subscriber
/// - parameter transformOutput: Transform the upstream publisher's output type to the downstream's input type
/// - parameter transformFailure: Transform the upstream failure type to the downstream's failure type
///
/// - note: You **must** provide the two transformation functions above if you using the default `Sink` implementation. Otherwise, you must subclass `Sink` with your own publisher's sink and manage the buffer accordingly.
init(upstream: Upstream,
downstream: Downstream,
transformOutput: @escaping TransformOutput,
transformFailure: @escaping TransformFailure) {
transformOutput: TransformOutput? = nil,
transformFailure: TransformFailure? = nil) {
self.buffer = DemandBuffer(subscriber: downstream)
self.transformOutput = transformOutput
self.transformFailure = transformFailure
Expand All @@ -47,7 +49,18 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
}

func receive(_ input: Upstream.Output) -> Subscribers.Demand {
guard let input = transformOutput(input) else { return .none }
guard let transform = transformOutput else {
fatalError("""
❌ Missing output transformation
=========================
You must either:
- Provide a transformation function from the upstream's output to the downstream's input; or
- Subclass `Sink` with your own publisher's Sink and manage the buffer yourself
""")
}

guard let input = transform(input) else { return .none }
return buffer.buffer(value: input)
}

Expand All @@ -56,7 +69,18 @@ class Sink<Upstream: Publisher, Downstream: Subscriber>: Subscriber {
case .finished:
buffer.complete(completion: .finished)
case .failure(let error):
guard let error = transformFailure(error) else { return }
guard let transform = transformFailure else {
fatalError("""
❌ Missing failure transformation
=========================
You must either:
- Provide a transformation function from the upstream's failure to the downstream's failuer; or
- Subclass `Sink` with your own publisher's Sink and manage the buffer yourself
""")
}

guard let error = transform(error) else { return }
buffer.complete(completion: .failure(error))
}

Expand Down
80 changes: 80 additions & 0 deletions Sources/Operators/Dematerialize.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Dematerialize.swift
// CombineExt
//
// Created by Shai Mishali on 14/03/2020.
//

import Combine

public extension Publisher where Output: EventConvertible, Failure == Never {
/// Converts any previously-materialized publisher into its original form
///
/// - returns: A publisher dematerializing the materialized events
func dematerialize() -> Publishers.Dematerialize<Self> {
Publishers.Dematerialize(upstream: self)
}
}

// MARK: - Publisher
public extension Publishers {
/// A publisher which takes a materialized upstream publisher and converts
/// the wrapped events back into their original form
class Dematerialize<Upstream: Publisher>: Publisher where Upstream.Output: EventConvertible {
public typealias Output = Upstream.Output.Output
public typealias Failure = Upstream.Output.Failure

private let upstream: Upstream

public init(upstream: Upstream) {
self.upstream = upstream
}

public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
}
}
}

// MARK: - Subscrription
private extension Publishers.Dematerialize {
class Subscription<Downstream: Subscriber>: Combine.Subscription
where Downstream.Input == Upstream.Output.Output, Downstream.Failure == Upstream.Output.Failure {
private var sink: Sink<Downstream>?

init(upstream: Upstream,
downstream: Downstream) {
self.sink = Sink(upstream: upstream,
downstream: downstream)
}

func request(_ demand: Subscribers.Demand) {
sink?.demand(demand)
}

func cancel() {
sink = nil
}
}
}

// MARK: - Sink
private extension Publishers.Dematerialize {
class Sink<Downstream: Subscriber>: CombineExt.Sink<Upstream, Downstream>
where Downstream.Input == Upstream.Output.Output, Downstream.Failure == Upstream.Output.Failure {
override func receive(_ input: Upstream.Output) -> Subscribers.Demand {
/// We have to override the default mechanism here to convert a
/// materialized failure into an actual failure
switch input.event {
case .value(let value):
return buffer.buffer(value: value)
case .failure(let failure):
buffer.complete(completion: .failure(failure))
return .none
case .finished:
buffer.complete(completion: .finished)
return .none
}
}
}
}
3 changes: 1 addition & 2 deletions Sources/Operators/Materialize.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ private extension Publishers.Materialize {
downstream: Downstream) {
self.sink = Sink(upstream: upstream,
downstream: downstream,
transformOutput: { Event.value($0) },
transformFailure: { _ in nil })
transformOutput: { .value($0) })
}

func request(_ demand: Subscribers.Demand) {
Expand Down
4 changes: 2 additions & 2 deletions Tests/CreateTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private extension CreateTests {
}
}

func makeSubscriber(demand: Subscribers.Demand, expectation: XCTestExpectation) -> AnySubscriber<String, MyError> {
func makeSubscriber(demand: Subscribers.Demand, expectation: XCTestExpectation?) -> AnySubscriber<String, MyError> {
return AnySubscriber(
receiveSubscription: { subscription in
subscription.request(demand)
Expand All @@ -124,7 +124,7 @@ private extension CreateTests {
},
receiveCompletion: { finished in
self.completion = finished
expectation.fulfill()
expectation?.fulfill()
})
}
}
134 changes: 134 additions & 0 deletions Tests/DematerializeTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//
// DematerializeTests.swift
// CombineExtTests
//
// Created by Shai Mishali on 14/03/2020.
//

import XCTest
import Combine
import CombineExt

class DematerializeTests: XCTestCase {
var subscription: AnyCancellable?
var values = [String]()
var completion: Subscribers.Completion<MyError>?
var subject = PassthroughSubject<Event<String, MyError>, Never>()

override func setUp() {
values = []
completion = nil
subject = PassthroughSubject<Event<String, MyError>, Never>()
}

override func tearDown() {
subscription?.cancel()
}

enum MyError: Swift.Error {
case someError
}

func testEmpty() {
subscription = subject
.dematerialize()
.sink(receiveCompletion: { self.completion = $0 },
receiveValue: { self.values.append($0) })

subject.send(.finished)

XCTAssertTrue(values.isEmpty)
XCTAssertEqual(completion, .finished)
}

func testFail() {
subscription = subject
.dematerialize()
.sink(receiveCompletion: { self.completion = $0 },
receiveValue: { self.values.append($0) })

subject.send(.failure(.someError))

XCTAssertTrue(values.isEmpty)
XCTAssertEqual(completion, .failure(.someError))
}

func testFinished() {
subscription = subject
.dematerialize()
.sink(receiveCompletion: { self.completion = $0 },
receiveValue: { self.values.append($0) })

subject.send(.value("Hello"))
subject.send(.value("There"))
subject.send(.value("World!"))
subject.send(.finished)

XCTAssertEqual(values, ["Hello", "There", "World!"])
XCTAssertEqual(completion, .finished)
}

func testFinishedLimitedDemand() {
let subscriber = makeSubscriber(demand: .max(2), expectation: nil)

subject
.dematerialize()
.subscribe(subscriber)

subject.send(.value("Hello"))
subject.send(.value("There"))
subject.send(.value("World!"))
subject.send(.finished)

XCTAssertEqual(values, ["Hello", "There"])
XCTAssertEqual(completion, nil)
}

func testError() {
subscription = subject
.dematerialize()
.sink(receiveCompletion: { self.completion = $0 },
receiveValue: { self.values.append($0) })

subject.send(.value("Hello"))
subject.send(.value("There"))
subject.send(.value("World!"))
subject.send(.failure(.someError))

XCTAssertEqual(values, ["Hello", "There", "World!"])
XCTAssertEqual(completion, .failure(.someError))
}

func testErrorLimitedDemand() {
let subscriber = makeSubscriber(demand: .max(2), expectation: nil)

subject
.dematerialize()
.subscribe(subscriber)

subject.send(.value("Hello"))
subject.send(.value("There"))
subject.send(.value("World!"))
subject.send(.failure(.someError))

XCTAssertEqual(values, ["Hello", "There"])
XCTAssertEqual(completion, nil)
}
}

private extension DematerializeTests {
func makeSubscriber(demand: Subscribers.Demand, expectation: XCTestExpectation?) -> AnySubscriber<String, MyError> {
return AnySubscriber(
receiveSubscription: { subscription in
subscription.request(demand)
},
receiveValue: { value in
self.values.append(value)
return .none
},
receiveCompletion: { finished in
self.completion = finished
expectation?.fulfill()
})
}
}

0 comments on commit 4c9b5e8

Please sign in to comment.