Skip to content

Commit 78ec55f

Browse files
authored
Merge pull request #6 from AsyncCommunity/feature/adopt-actors-for-streams
project: make streams async (using actors) + improve concurrent iterators
2 parents e4aeb0f + ba369c0 commit 78ec55f

24 files changed

Lines changed: 915 additions & 477 deletions

Sources/AsyncSequences/AsyncSequences+From.swift

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,14 @@ public struct AsyncFromSequence<BaseSequence: Sequence>: AsyncSequence {
5757
var baseIterator: BaseSequence.Iterator
5858
var interval: AsyncSequences.Interval
5959

60-
public mutating func next() async -> BaseSequence.Element? {
60+
public mutating func next() async throws -> BaseSequence.Element? {
6161
guard !Task.isCancelled else { return nil }
6262

6363
if self.interval != .immediate {
64-
do {
65-
try await Task.sleep(nanoseconds: self.interval.value)
66-
} catch {}
64+
try await Task.sleep(nanoseconds: self.interval.value)
6765
}
6866

69-
let next = self.baseIterator.next()
70-
return next
67+
return self.baseIterator.next()
7168
}
7269
}
7370
}

Sources/AsyncSequences/AsyncSequences+Merge.swift

Lines changed: 79 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -66,112 +66,50 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
6666
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { $0.makeAsyncIterator() })
6767
}
6868

69-
actor UpstreamAsyncIteratorState {
70-
var busy = false
71-
var finished = false
72-
73-
func setBusy(_ value: Bool) {
74-
self.busy = value
75-
}
76-
77-
func setFinished() {
78-
self.finished = true
79-
self.busy = false
80-
}
81-
82-
func isAvailable() -> Bool {
83-
!self.busy && !self.finished
84-
}
69+
enum UpstreamElement {
70+
case element(Element)
71+
case finished
8572
}
8673

87-
final class UpstreamAsyncIterator<BaseAsyncIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol {
88-
public typealias Element = BaseAsyncIterator.Element
89-
var iterator: BaseAsyncIterator?
90-
let state = UpstreamAsyncIteratorState()
74+
actor ElementCounter {
75+
var counter = 0
9176

92-
init(iterator: BaseAsyncIterator?) {
93-
self.iterator = iterator
77+
func increaseCounter() {
78+
self.counter += 1
9479
}
9580

96-
public func next() async throws -> BaseAsyncIterator.Element? {
97-
guard !Task.isCancelled else { return nil }
98-
99-
await self.state.setBusy(true)
100-
let next = try await self.iterator?.next()
101-
if next == nil {
102-
await self.state.setFinished()
103-
}
104-
await self.state.setBusy(false)
105-
return next
81+
func decreaseCounter() {
82+
guard self.counter > 0 else { return }
83+
self.counter -= 1
10684
}
10785

108-
public func isAvailable() async -> Bool {
109-
await self.state.isAvailable()
86+
func hasElement() -> Bool {
87+
self.counter > 0
11088
}
11189
}
11290

113-
enum UpstreamElement {
114-
case element(Element)
115-
case finished
116-
}
117-
11891
public struct Iterator: AsyncIteratorProtocol {
119-
let passthrough = AsyncStreams.Passthrough<UpstreamElement>()
120-
var passthroughIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
121-
let upstreamIterators: [UpstreamAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
92+
let sink = AsyncStreams.Passthrough<UpstreamElement>()
93+
var sinkIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
94+
let upstreamIterators: [SharedAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
95+
let elementCounter = ElementCounter()
12296
var numberOfFinished = 0
12397

12498
public init(upstreamIterators: [UpstreamAsyncSequence.AsyncIterator]) {
125-
self.upstreamIterators = upstreamIterators.map { UpstreamAsyncIterator(iterator: $0) }
126-
self.passthroughIterator = self.passthrough.makeAsyncIterator()
99+
self.upstreamIterators = upstreamIterators.map { SharedAsyncIterator(iterator: $0) }
100+
self.sinkIterator = self.sink.makeAsyncIterator()
127101
}
128102

129-
// swiftlint:disable:next cyclomatic_complexity
130-
public mutating func next() async throws -> Element? {
131-
guard !Task.isCancelled else { return nil }
132-
133-
let localPassthrough = self.passthrough
134-
135-
// iterating over the upstream iterators to ask for their next element. Only
136-
// the available iterators are requested (not already being computing the next
137-
// element from the previous iteration)
138-
for upstreamIterator in self.upstreamIterators {
139-
guard !Task.isCancelled else { break }
140-
141-
let localUpstreamIterator = upstreamIterator
142-
143-
// isAvailable() means is not busy and not finished
144-
if await localUpstreamIterator.isAvailable() {
145-
Task {
146-
do {
147-
let nextElement = try await localUpstreamIterator.next()
148-
149-
// if the next element is nil, it means one if the upstream iterator
150-
// is finished ... its does not mean the zipped async sequence is finished
151-
guard let nonNilNextElement = nextElement else {
152-
localPassthrough.send(.finished)
153-
return
154-
}
155-
156-
localPassthrough.send(.element(nonNilNextElement))
157-
} catch is CancellationError {
158-
localPassthrough.send(.finished)
159-
} catch {
160-
localPassthrough.send(termination: .failure(error))
161-
}
162-
}
163-
}
164-
}
165-
103+
mutating func nextElementFromSink() async throws -> Element? {
166104
var noValue = true
167105
var value: Element?
168106

169107
// we now have to eliminate the intermediate ".finished" values until the next
170108
// true value is found.
171109
// if every upstream iterator has finished, then the zipped async sequence is also finished
172110
while noValue {
173-
guard let nextChildElement = try await self.passthroughIterator.next() else {
174-
// the passthrough is finished, so is the zipped async sequence
111+
guard let nextChildElement = try await self.sinkIterator.next() else {
112+
// the sink stream is finished, so is the zipped async sequence
175113
noValue = false
176114
value = nil
177115
break
@@ -187,13 +125,70 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
187125
break
188126
}
189127
case let .element(element):
190-
// nominal case: a net element is available
128+
// nominal case: a next element is available
191129
noValue = false
192130
value = element
131+
await self.elementCounter.decreaseCounter()
193132
}
194133
}
195134

196135
return value
197136
}
137+
138+
public mutating func next() async throws -> Element? {
139+
guard !Task.isCancelled else { return nil }
140+
141+
// before requesting elements from the upstream iterators, we should reauest the next element from the sink iterator
142+
// if it has some stacked values
143+
144+
// for now we leave it commented as I'm not sure it is not counterproductive.
145+
// This "early" drain might prevent from requesting the next available upstream iterators as soon as possible
146+
// since the sink iterator might deliver a value and the next will return right away
147+
148+
// guard await !self.elementCounter.hasElement() else {
149+
// return try await self.nextElementFromSink()
150+
// }
151+
152+
let localSink = self.sink
153+
let localElementCounter = self.elementCounter
154+
155+
// iterating over the upstream iterators to ask for their next element. Only
156+
// the available iterators are requested (not already being computing the next
157+
// element from the previous iteration and not already finished)
158+
for upstreamIterator in self.upstreamIterators {
159+
guard !Task.isCancelled else { break }
160+
161+
let localUpstreamIterator = upstreamIterator
162+
guard await !localUpstreamIterator.isFinished() else { continue }
163+
Task {
164+
do {
165+
let nextSharedElement = try await localUpstreamIterator.next()
166+
167+
// if the next element is nil, it means one of the upstream iterator
168+
// is finished ... its does not mean the zipped async sequence is finished (all upstream iterators have to be finished)
169+
guard let nextNonNilSharedElement = nextSharedElement else {
170+
await localSink.send(.finished)
171+
return
172+
}
173+
174+
guard case let .value(nextElement) = nextNonNilSharedElement else {
175+
// the upstream iterator was not available ... see you at the next iteration
176+
return
177+
}
178+
179+
// we have a next element from an upstream iterator, pushing it in the sink stream
180+
await localSink.send(.element(nextElement))
181+
await localElementCounter.increaseCounter()
182+
} catch is CancellationError {
183+
await localSink.send(.finished)
184+
} catch {
185+
await localSink.send(termination: .failure(error))
186+
}
187+
}
188+
}
189+
190+
// we wait for the sink iterator to deliver the next element
191+
return try await self.nextElementFromSink()
192+
}
198193
}
199194
}

Sources/AsyncSequences/AsyncSequences+Zip.swift

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,14 @@ public struct AsyncZip2Sequence<UpstreamAsyncSequenceA: AsyncSequence, UpstreamA
8080

8181
public func makeAsyncIterator() -> AsyncIterator {
8282
return Iterator(
83-
upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
84-
upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator())
83+
upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
84+
upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator())
8585
)
8686
}
8787

8888
public struct Iterator: AsyncIteratorProtocol {
89-
let upstreamIteratorA: SharedAsyncIterator<UpstreamAsyncSequenceA.AsyncIterator>
90-
let upstreamIteratorB: SharedAsyncIterator<UpstreamAsyncSequenceB.AsyncIterator>
89+
let upstreamIteratorA: AsyncIteratorByRef<UpstreamAsyncSequenceA.AsyncIterator>
90+
let upstreamIteratorB: AsyncIteratorByRef<UpstreamAsyncSequenceB.AsyncIterator>
9191

9292
public mutating func next() async throws -> Element? {
9393
guard !Task.isCancelled else { return nil }
@@ -136,9 +136,9 @@ public struct AsyncZip2Sequence<UpstreamAsyncSequenceA: AsyncSequence, UpstreamA
136136
}
137137
}
138138

139-
public struct AsyncZip3Sequence < UpstreamAsyncSequenceA: AsyncSequence,
140-
UpstreamAsyncSequenceB: AsyncSequence,
141-
UpstreamAsyncSequenceC: AsyncSequence>: AsyncSequence {
139+
public struct AsyncZip3Sequence <UpstreamAsyncSequenceA: AsyncSequence,
140+
UpstreamAsyncSequenceB: AsyncSequence,
141+
UpstreamAsyncSequenceC: AsyncSequence>: AsyncSequence {
142142
public typealias Element = (UpstreamAsyncSequenceA.Element, UpstreamAsyncSequenceB.Element, UpstreamAsyncSequenceC.Element)
143143
public typealias AsyncIterator = Iterator
144144

@@ -156,16 +156,16 @@ public struct AsyncZip3Sequence < UpstreamAsyncSequenceA: AsyncSequence,
156156

157157
public func makeAsyncIterator() -> AsyncIterator {
158158
return Iterator(
159-
upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
160-
upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()),
161-
upstreamIteratorC: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator())
159+
upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
160+
upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()),
161+
upstreamIteratorC: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator())
162162
)
163163
}
164164

165165
public struct Iterator: AsyncIteratorProtocol {
166-
let upstreamIteratorA: SharedAsyncIterator<UpstreamAsyncSequenceA.AsyncIterator>
167-
let upstreamIteratorB: SharedAsyncIterator<UpstreamAsyncSequenceB.AsyncIterator>
168-
let upstreamIteratorC: SharedAsyncIterator<UpstreamAsyncSequenceC.AsyncIterator>
166+
let upstreamIteratorA: AsyncIteratorByRef<UpstreamAsyncSequenceA.AsyncIterator>
167+
let upstreamIteratorB: AsyncIteratorByRef<UpstreamAsyncSequenceB.AsyncIterator>
168+
let upstreamIteratorC: AsyncIteratorByRef<UpstreamAsyncSequenceC.AsyncIterator>
169169

170170
public mutating func next() async throws -> Element? {
171171
guard !Task.isCancelled else { return nil }
@@ -242,20 +242,11 @@ public struct AsyncZipSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeque
242242
}
243243

244244
public func makeAsyncIterator() -> AsyncIterator {
245-
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { SharedAsyncIterator(iterator: $0.makeAsyncIterator()) })
246-
}
247-
248-
actor SequenceIndexGenerator {
249-
var index: Int = 0
250-
251-
func nextIndex() -> Int {
252-
self.index += 1
253-
return index
254-
}
245+
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { AsyncIteratorByRef(iterator: $0.makeAsyncIterator()) })
255246
}
256247

257248
public struct Iterator: AsyncIteratorProtocol {
258-
let upstreamIterators: [SharedAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
249+
let upstreamIterators: [AsyncIteratorByRef<UpstreamAsyncSequence.AsyncIterator>]
259250

260251
public mutating func next() async throws -> Element? {
261252
guard !Task.isCancelled else { return nil }
@@ -302,3 +293,12 @@ public struct AsyncZipSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeque
302293
}
303294
}
304295
}
296+
297+
actor SequenceIndexGenerator {
298+
var index: Int = 0
299+
300+
func nextIndex() -> Int {
301+
self.index += 1
302+
return index
303+
}
304+
}

0 commit comments

Comments
 (0)