Skip to content

Commit

Permalink
Fix double-subscribe bug in ReplaySubject. (CombineCommunity#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
jasdev authored May 1, 2020
1 parent be7e1d0 commit 5bd0d0f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 35 deletions.
11 changes: 0 additions & 11 deletions Sources/Subjects/ReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
// Keeping track of all live subscriptions, so `send` events can be forwarded to them.
private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()

// We also track subscriber identifiers, to more quickly bottom-out double subscribes instead of having to do a
// linear pass over `subscriptions`.
private var subscriberIdentifiers = Set<CombineIdentifier>()

private var completion: Subscribers.Completion<Failure>?
private var isActive: Bool { completion == nil }

Expand Down Expand Up @@ -63,21 +59,14 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
let subscriberIdentifier = subscriber.combineIdentifier

guard !subscriberIdentifiers.contains(subscriberIdentifier) else {
subscriber.receive(subscription: Subscriptions.empty)
return
}

let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] in
guard let self = self,
let subscriptionIndex = self.subscriptions
.firstIndex(where: { $0.innerSubscriberIdentifier == subscriberIdentifier }) else { return }

self.subscriberIdentifiers.remove(subscriberIdentifier)
self.subscriptions.remove(at: subscriptionIndex)
}

subscriberIdentifiers.insert(subscriberIdentifier)
subscriptions.append(subscription)

subscriber.receive(subscription: subscription)
Expand Down
52 changes: 28 additions & 24 deletions Tests/ReplaySubjectTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -286,30 +286,6 @@ final class ReplaySubjectTests: XCTestCase {
XCTAssertTrue(completed)
}

func testDoubleSubscribe() {
let subject = ReplaySubject<Int, Never>(bufferSize: 1)

subject.send(1)
subject.send(completion: .finished)

var results = [Int]()
var completions = [Subscribers.Completion<Never>]()

let sink1 = Subscribers.Sink<Int, Never>(
receiveCompletion: { completions.append($0) },
receiveValue: { results.append($0) }
)

subject
.subscribe(sink1)

subject
.subscribe(sink1)

XCTAssertEqual(results, [1])
XCTAssertEqual(completions, [.finished])
}

private var demandSubscription: Subscription!
func testRespectsDemand() {
let subject = ReplaySubject<Int, Never>(bufferSize: 4)
Expand Down Expand Up @@ -341,4 +317,32 @@ final class ReplaySubjectTests: XCTestCase {

XCTAssertTrue(completed)
}

func testDoubleSubscribe() {
let subject = ReplaySubject<Int, Never>(bufferSize: 1)

subject.send(1)
subject.send(2)
subject.send(completion: .finished)

var results = [String]()
var completions = [Subscribers.Completion<Never>]()

let subscriber = AnySubscriber<String, Never>(
receiveSubscription: { $0.request(.max(1)) },
receiveValue: { results.append($0); return .none },
receiveCompletion: { completions.append($0) }
)

subject
.map { "a\($0)" }
.subscribe(subscriber)

subject
.map { "b\($0)" }
.subscribe(subscriber)

XCTAssertEqual(["a2", "b2"], results)
XCTAssertEqual([.finished, .finished], completions)
}
}

0 comments on commit 5bd0d0f

Please sign in to comment.