forked from swiftlang/swift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsyncThrowingFilterSequence.swift
149 lines (134 loc) · 5.02 KB
/
AsyncThrowingFilterSequence.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2021 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import Swift
@available(SwiftStdlib 5.1, *)
extension AsyncSequence {
/// Creates an asynchronous sequence that contains, in order, the elements of
/// the base sequence that satisfy the given error-throwing predicate.
///
/// In this example, an asynchronous sequence called `Counter` produces `Int`
/// values from `1` to `10`. The `filter(_:)` method returns `true` for even
/// values and `false` for odd values, thereby filtering out the odd values,
/// but also throws an error for values divisible by 5:
///
/// do {
/// let stream = Counter(howHigh: 10)
/// .filter {
/// if $0 % 5 == 0 {
/// throw MyError()
/// }
/// return $0 % 2 == 0
/// }
/// for try await number in stream {
/// print(number, terminator: " ")
/// }
/// } catch {
/// print("Error: \(error)")
/// }
/// // Prints "2 4 Error: MyError() "
///
/// - Parameter isIncluded: An error-throwing closure that takes an element
/// of the asynchronous sequence as its argument and returns a Boolean value
/// that indicates whether to include the element in the filtered sequence.
/// - Returns: An asynchronous sequence that contains, in order, the elements
/// of the base sequence that satisfy the given predicate. If the predicate
/// throws an error, the sequence contains only values produced prior to
/// the error.
@preconcurrency
@inlinable
public __consuming func filter(
_ isIncluded: @Sendable @escaping (Element) async throws -> Bool
) -> AsyncThrowingFilterSequence<Self> {
return AsyncThrowingFilterSequence(self, isIncluded: isIncluded)
}
}
/// An asynchronous sequence that contains, in order, the elements of
/// the base sequence that satisfy the given error-throwing predicate.
@available(SwiftStdlib 5.1, *)
public struct AsyncThrowingFilterSequence<Base: AsyncSequence> {
@usableFromInline
let base: Base
@usableFromInline
let isIncluded: (Element) async throws -> Bool
@usableFromInline
init(
_ base: Base,
isIncluded: @escaping (Base.Element) async throws -> Bool
) {
self.base = base
self.isIncluded = isIncluded
}
}
@available(SwiftStdlib 5.1, *)
extension AsyncThrowingFilterSequence: AsyncSequence {
/// The type of element produced by this asynchronous sequence.
///
/// The filter sequence produces whatever type of element its base
/// sequence produces.
public typealias Element = Base.Element
/// The type of iterator that produces elements of the sequence.
public typealias AsyncIterator = Iterator
/// The iterator that produces elements of the filter sequence.
public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
var baseIterator: Base.AsyncIterator
@usableFromInline
let isIncluded: (Base.Element) async throws -> Bool
@usableFromInline
var finished = false
@usableFromInline
init(
_ baseIterator: Base.AsyncIterator,
isIncluded: @escaping (Base.Element) async throws -> Bool
) {
self.baseIterator = baseIterator
self.isIncluded = isIncluded
}
/// Produces the next element in the filter sequence.
///
/// This iterator calls `next()` on its base iterator; if this call returns
/// `nil`, `next()` returns nil. Otherwise, `next()` evaluates the
/// result with the `predicate` closure. If the closure returns `true`,
/// `next()` returns the received element; otherwise it awaits the next
/// element from the base iterator. If calling the closure throws an error,
/// the sequence ends and `next()` rethrows the error.
@inlinable
public mutating func next() async throws -> Base.Element? {
while !finished {
guard let element = try await baseIterator.next() else {
return nil
}
do {
if try await isIncluded(element) {
return element
}
} catch {
finished = true
throw error
}
}
return nil
}
}
@inlinable
public __consuming func makeAsyncIterator() -> Iterator {
return Iterator(base.makeAsyncIterator(), isIncluded: isIncluded)
}
}
@available(SwiftStdlib 5.1, *)
extension AsyncThrowingFilterSequence: @unchecked Sendable
where Base: Sendable,
Base.Element: Sendable { }
@available(SwiftStdlib 5.1, *)
extension AsyncThrowingFilterSequence.Iterator: @unchecked Sendable
where Base.AsyncIterator: Sendable,
Base.Element: Sendable { }