Skip to content

Commit

Permalink
Add Zip for multiple Publishers (CombineCommunity#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasdev authored Mar 21, 2020
1 parent ecef1a1 commit 4febfd7
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 2 deletions.
10 changes: 9 additions & 1 deletion CombineExt.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
78C193E0241D4D8D0001B7FD /* MaterializeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193DF241D4D8D0001B7FD /* MaterializeTests.swift */; };
78C193E2241D596F0001B7FD /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193E1241D596F0001B7FD /* Dematerialize.swift */; };
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78C193E3241D63620001B7FD /* DematerializeTests.swift */; };
BF50924B241FFE8E00600DF4 /* ZipManyTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF50924A241FFE8E00600DF4 /* ZipManyTests.swift */; };
BF8121BC241FF42C006A93B8 /* ZipMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = BF8121BB241FF42C006A93B8 /* ZipMany.swift */; };
OBJ_22 /* AssignToMany.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_9 /* AssignToMany.swift */; };
OBJ_23 /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_10 /* WithLatestFrom.swift */; };
OBJ_30 /* Package.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_6 /* Package.swift */; };
Expand Down Expand Up @@ -77,7 +79,9 @@
78C193DF241D4D8D0001B7FD /* MaterializeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MaterializeTests.swift; sourceTree = "<group>"; };
78C193E1241D596F0001B7FD /* Dematerialize.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Dematerialize.swift; sourceTree = "<group>"; };
78C193E3241D63620001B7FD /* DematerializeTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DematerializeTests.swift; sourceTree = "<group>"; };
"CombineExt::CombineExt::Product" /* CombineExt.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = CombineExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
BF50924A241FFE8E00600DF4 /* ZipManyTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ZipManyTests.swift; sourceTree = "<group>"; };
BF8121BB241FF42C006A93B8 /* ZipMany.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ZipMany.swift; sourceTree = "<group>"; };
"CombineExt::CombineExt::Product" /* CombineExt.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = CombineExt.framework; sourceTree = BUILT_PRODUCTS_DIR; };
"CombineExt::CombineExtTests::Product" /* CombineExtTests.xctest */ = {isa = PBXFileReference; lastKnownFileType = file; path = CombineExtTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
OBJ_10 /* WithLatestFrom.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
OBJ_12 /* WithLatestFromTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = WithLatestFromTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -151,6 +155,7 @@
78002BBA241E97350018AA28 /* CurrentValueRelayTests.swift */,
78988A24241FFE2E00F3A4AF /* PartitionTests.swift */,
78988A1D241EAFDD00F3A4AF /* PassthroughRelayTests.swift */,
BF50924A241FFE8E00600DF4 /* ZipManyTests.swift */,
);
path = Tests;
sourceTree = SOURCE_ROOT;
Expand Down Expand Up @@ -197,6 +202,7 @@
78C193DD241D46F40001B7FD /* Materialize.swift */,
78988A22241FFE2400F3A4AF /* Partition.swift */,
78C193E1241D596F0001B7FD /* Dematerialize.swift */,
BF8121BB241FF42C006A93B8 /* ZipMany.swift */,
);
path = Operators;
sourceTree = "<group>";
Expand Down Expand Up @@ -313,6 +319,7 @@
78002BB7241E915E0018AA28 /* CurrentValueRelay.swift in Sources */,
78C193DE241D46F40001B7FD /* Materialize.swift in Sources */,
78988A23241FFE2400F3A4AF /* Partition.swift in Sources */,
BF8121BC241FF42C006A93B8 /* ZipMany.swift in Sources */,
78C193D4241C2DE00001B7FD /* Create.swift in Sources */,
OBJ_22 /* AssignToMany.swift in Sources */,
78C193D7241C2E580001B7FD /* Event.swift in Sources */,
Expand Down Expand Up @@ -340,6 +347,7 @@
78C193E0241D4D8D0001B7FD /* MaterializeTests.swift in Sources */,
78002BBB241E97350018AA28 /* CurrentValueRelayTests.swift in Sources */,
78C193E4241D63620001B7FD /* DematerializeTests.swift in Sources */,
BF50924B241FFE8E00600DF4 /* ZipManyTests.swift in Sources */,
78988A1E241EAFDD00F3A4AF /* PassthroughRelayTests.swift in Sources */,
78C193D9241CEEA80001B7FD /* CreateTests.swift in Sources */,
);
Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ All operators, utilities and helpers respect Combine's publisher contract, inclu
* [values](#values)
* [failures](#failures)
* [dematerialize](#dematerialize)
* [zip(with:)](#ZipMany)

### Publishers
* [AnyPublisher.create](#anypublisher.create)
Expand Down Expand Up @@ -267,6 +268,37 @@ even: 4
odd: 5
```

------

### ZipMany

This repo includes two overloads on Combine’s `Publisher.zip` methods (which, at the time of writing only go up to arity three).

This lets you arbitrarily zip many publishers and receive either an array of inner publisher outputs back.

```swift
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()
let fourth = PassthroughSubject<Int, Never>()

subscription = first
.zip(with: second, third, fourth)
.map { $0.reduce(0, +) }
.sink(receiveValue: { print("zipped: \($0)") })

first.send(1)
second.send(2)
third.send(3)
fourth.send(4)
```

#### Output:

```none
zipped: 10
```

## Publishers

This section outlines some of the custom Combine publishers CombineExt provides
Expand Down
38 changes: 38 additions & 0 deletions Sources/Operators/ZipMany.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// ZipMany.swift
// CombineExt
//
// Created by Jasdev Singh on 16/03/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//

import Combine

public extension Publisher {
/// Zips `self` with an array of publishers with the same output and failure types.
///
/// Since there can be any number of `others`, arrays of `Output` values are emitted after zipping.
///
/// - parameter others: The other publishers to zip with.
///
/// - returns: A type-erased publisher with value events from each of the inner publishers zipped together in an array.
func zip<Other: Publisher>(with others: [Other])
-> AnyPublisher<[Output], Failure> where Other.Output == Output, Other.Failure == Failure {
let seed = map { [$0] }.eraseToAnyPublisher()

return others
.reduce(seed) { zipped, next in
zipped
.zip(next)
.map { $0.0 + [$0.1] }
.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}

/// A variadic overload on `Publisher.zip(with:)`.
func zip<Other: Publisher>(with others: Other...)
-> AnyPublisher<[Output], Failure> where Other.Output == Output, Other.Failure == Failure {
zip(with: others)
}
}
2 changes: 1 addition & 1 deletion Tests/WithLatestFromTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class WithLatestFromTests: XCTestCase {
XCTAssertTrue(completed)
}

// We have to hold a reference to the subsccription or the
// We have to hold a reference to the subscription or the
// publisher will get deallocated and canceled
var demandSubscription: Subscription!
func testWithResultSelectorLimitedDemand() {
Expand Down
149 changes: 149 additions & 0 deletions Tests/ZipManyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
//
// ZipManyTests.swift
// CombineExtTests
//
// Created by Jasdev Singh on 16/03/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//

import XCTest
import Combine
import CombineExt

final class ZipManyTests: XCTestCase {
private var subscription: AnyCancellable!

private enum ZipManyError: Error {
case anError
}

func testOneEmissionZipping() {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()

var results = [[Int]]()
var completed = false

subscription = first
.zip(with: second, third)
.sink(receiveCompletion: { _ in completed = true },
receiveValue: { results.append($0) })

first.send(1)
second.send(2)
third.send(3)

XCTAssertEqual(results, [[1, 2, 3]])
XCTAssertFalse(completed)
first.send(completion: .finished)
XCTAssertTrue(completed)
}

func testMultipleEmissionZippingEndingWithAnError() {
let first = PassthroughSubject<Int, ZipManyError>()
let second = PassthroughSubject<Int, ZipManyError>()
let third = PassthroughSubject<Int, ZipManyError>()

var results = [[Int]]()
var completed: Subscribers.Completion<ZipManyError>?

subscription = first
.zip(with: second, third)
.sink(receiveCompletion: { completed = $0 },
receiveValue: { results.append($0) })

first.send(1)
first.send(1)
first.send(1)
first.send(1)

second.send(2)
second.send(2)
second.send(2)

third.send(3)
third.send(3)

XCTAssertEqual(results, [[1, 2, 3], [1, 2, 3]])
XCTAssertNil(completed)
first.send(completion: .failure(.anError))
XCTAssertEqual(completed, .failure(.anError))
}

func testNoEmissionZipping() {
let first = PassthroughSubject<Int, ZipManyError>()
let second = PassthroughSubject<Int, ZipManyError>()
let third = PassthroughSubject<Int, ZipManyError>()

var results = [[Int]]()
var completed = false

subscription = first
.zip(with: second, third)
.sink(receiveCompletion: { _ in completed = true },
receiveValue: { results.append($0) })

first.send(1)
second.send(2)

// Gated by `third` not emitting.

XCTAssertTrue(results.isEmpty)
XCTAssertFalse(completed)
}

func testZippingEndingWithAFinishedCompletion() {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()

var results = [[Int]]()
var completed: Subscribers.Completion<Never>?

subscription = first
.zip(with: second, third)
.sink(receiveCompletion: { completed = $0 },
receiveValue: { results.append($0) })

first.send(1)

second.send(2)
second.send(2)

third.send(3)

XCTAssertEqual(results, [[1, 2, 3]])
XCTAssertNil(completed)
first.send(completion: .finished) // Triggers a completion, since, there
// aren’t any buffered events from `first` (or `third`) to possibly pair with.
XCTAssertEqual(completed, .finished)
}

func testZippingWithAnInnerCompletionButNotAnOuter() {
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<Int, Never>()
let third = PassthroughSubject<Int, Never>()

var results = [[Int]]()
var completed: Subscribers.Completion<Never>?

subscription = first
.zip(with: second, third)
.sink(receiveCompletion: { completed = $0 },
receiveValue: { results.append($0) })

first.send(1)
first.send(1)

second.send(2)
second.send(2)

third.send(3)

XCTAssertEqual(results, [[1, 2, 3]])
XCTAssertNil(completed)
first.send(completion: .finished) // Doesn’t trigger a completion, since `first` has an extra un-paired value event.
XCTAssertNil(completed)
}
}

0 comments on commit 4febfd7

Please sign in to comment.