Skip to content

Commit 008cfc5

Browse files
author
Thibault Wittemberg
committed
operators: implement multicast
1 parent 78ec55f commit 008cfc5

21 files changed

Lines changed: 1026 additions & 652 deletions

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
**v0.1.1 **
1+
**v0.2.0 **
22

3-
- AsyncStreams.CurrentValue `element` made public
3+
- AsyncStreams.CurrentValue `element` made public and available with get/set
4+
- new Multicast operator
45

56
**v0.1.0 - Hydrogen:**
67

README.md

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ AsyncSequences
4242
* [FlatMapLatest](#FlatMapLatest)
4343
* [HandleEvents](#HandleEvents)
4444
* [Assign](#Assign)
45+
* [Multicast] (#Multicast)
4546
* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence)
4647

4748
More operators and extensions are to come. Pull requests are of course welcome.
@@ -237,13 +238,13 @@ The current value is replayed for any new async loop.
237238
let currentValue = AsyncStreams.CurrentValue<Int>(1)
238239

239240
Task {
240-
for try await element in passthrough {
241+
for try await element in currentValue {
241242
print(element) // will print 1 2
242243
}
243244
}
244245

245246
Task {
246-
for try await element in passthrough {
247+
for try await element in currentValue {
247248
print(element) // will print 1 2
248249
}
249250
}
@@ -391,6 +392,42 @@ let fromSequence = AsyncSequences.From(["1", "2", "3"])
391392
try await fromSequence.assign(to: \.property, on: root) // will set the property value to "1", "2", "3"
392393
```
393394

395+
### Multicast
396+
397+
`multicast(_:)` is useful when you have multiple client loops, but you want the upstream async sequence to only produce a single `AsyncIterator`.
398+
399+
```swift
400+
let stream = AsyncStreams.Passthrough<(String, Int)>()
401+
let multicastedAsyncSequence = ["First", "Second", "Third"]
402+
.asyncElements
403+
.map { ($0, Int.random(in: 0...100)) }
404+
.handleEvents(onElement: { print("AsyncSequence produces: ($0)") })
405+
.multicast(stream)
406+
407+
Task {
408+
try await multicastedAsyncSequence
409+
.collect { print ("Task 1 received: \($0)") }
410+
}
411+
412+
Task {
413+
try await multicastedAsyncSequence
414+
.collect { print ("Task 2 received: \($0)") }
415+
}
416+
417+
multicastedAsyncSequence.connect()
418+
419+
// will print:
420+
// AsyncSequence produces: ("First", 78)
421+
// Stream 2 received: ("First", 78)
422+
// Stream 1 received: ("First", 78)
423+
// AsyncSequence produces: ("Second", 98)
424+
// Stream 2 received: ("Second", 98)
425+
// Stream 1 received: ("Second", 98)
426+
// AsyncSequence produces: ("Third", 61)
427+
// Stream 2 received: ("Third", 61)
428+
// Stream 1 received: ("Third", 61)
429+
```
430+
394431
### EraseToAnyAsyncSequence
395432

396433
`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence.

Sources/AsyncSequences/AsyncSequences+From.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public struct AsyncFromSequence<BaseSequence: Sequence>: AsyncSequence {
5050
}
5151

5252
public func makeAsyncIterator() -> AsyncIterator {
53-
Iterator(baseIterator: self.baseSequence.makeIterator(), interval: self.interval)
53+
return Iterator(baseIterator: self.baseSequence.makeIterator(), interval: self.interval)
5454
}
5555

5656
public struct Iterator: AsyncIteratorProtocol {

Sources/AsyncSequences/AsyncSequences+Merge.swift

Lines changed: 64 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//
55
// Created by Thibault Wittemberg on 31/12/2021.
66
//
7+
import Foundation
78

89
public extension AsyncSequences {
910
/// `Merge` is an AsyncSequence that merges several async sequences respecting
@@ -63,63 +64,89 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
6364
}
6465

6566
public func makeAsyncIterator() -> Iterator {
66-
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { $0.makeAsyncIterator() })
67+
return Iterator(upstreamAsyncSequences: self.upstreamAsyncSequences)
6768
}
6869

6970
enum UpstreamElement {
7071
case element(Element)
7172
case finished
7273
}
7374

74-
actor ElementCounter {
75-
var counter = 0
76-
77-
func increaseCounter() {
78-
self.counter += 1
79-
}
80-
81-
func decreaseCounter() {
82-
guard self.counter > 0 else { return }
83-
self.counter -= 1
84-
}
85-
86-
func hasElement() -> Bool {
87-
self.counter > 0
88-
}
89-
}
75+
// actor ElementCounter {
76+
// var counter = 0
77+
//
78+
// func increaseCounter() {
79+
// self.counter += 1
80+
// }
81+
//
82+
// func decreaseCounter() {
83+
// guard self.counter > 0 else { return }
84+
// self.counter -= 1
85+
// }
86+
//
87+
// func hasElement() -> Bool {
88+
// self.counter > 0
89+
// }
90+
// }
9091

9192
public struct Iterator: AsyncIteratorProtocol {
92-
let sink = AsyncStreams.Passthrough<UpstreamElement>()
93-
var sinkIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
94-
let upstreamIterators: [SharedAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
95-
let elementCounter = ElementCounter()
93+
94+
var downstreamIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
95+
let upstreamAsyncSequenceRegulators: [ConcurrentAccessRegulator<UpstreamAsyncSequence>]
96+
// let elementCounter = ElementCounter()
9697
var numberOfFinished = 0
9798

98-
public init(upstreamIterators: [UpstreamAsyncSequence.AsyncIterator]) {
99-
self.upstreamIterators = upstreamIterators.map { SharedAsyncIterator(iterator: $0) }
100-
self.sinkIterator = self.sink.makeAsyncIterator()
99+
public init(upstreamAsyncSequences: [UpstreamAsyncSequence]) {
100+
let downstreamStream = AsyncStreams.Passthrough<UpstreamElement>()
101+
self.downstreamIterator = downstreamStream.makeAsyncIterator()
102+
self.upstreamAsyncSequenceRegulators = upstreamAsyncSequences.map { upstreamAsyncSequence in
103+
var isAlreadyFinished = false
104+
return ConcurrentAccessRegulator(
105+
upstreamAsyncSequence,
106+
onNext: { element in
107+
if let nonNilElement = element {
108+
// await localElementCounter.increaseCounter()
109+
downstreamStream.send(.element(nonNilElement))
110+
return
111+
}
112+
113+
guard !isAlreadyFinished else { return }
114+
115+
isAlreadyFinished = true
116+
downstreamStream.send(.finished)
117+
},
118+
onError: { error in
119+
downstreamStream.send(termination: .failure(error))
120+
},
121+
onCancel: {
122+
guard !isAlreadyFinished else { return }
123+
isAlreadyFinished = true
124+
downstreamStream.send(.finished)
125+
})
126+
}
101127
}
102128

103-
mutating func nextElementFromSink() async throws -> Element? {
129+
mutating func nextElementFromDownstreamStream() async throws -> Element? {
104130
var noValue = true
105131
var value: Element?
106132

107133
// we now have to eliminate the intermediate ".finished" values until the next
108134
// true value is found.
109135
// if every upstream iterator has finished, then the zipped async sequence is also finished
110136
while noValue {
111-
guard let nextChildElement = try await self.sinkIterator.next() else {
112-
// the sink stream is finished, so is the zipped async sequence
137+
guard let nextChildElement = try await self.downstreamIterator.next() else {
138+
// the downstream stream is finished, so is the zipped async sequence
113139
noValue = false
114140
value = nil
115141
break
116142
}
117143

118144
switch nextChildElement {
119145
case .finished:
146+
120147
self.numberOfFinished += 1
121148

122-
if self.numberOfFinished == self.upstreamIterators.count {
149+
if self.numberOfFinished == self.upstreamAsyncSequenceRegulators.count {
123150
noValue = false
124151
value = nil
125152
break
@@ -128,7 +155,7 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
128155
// nominal case: a next element is available
129156
noValue = false
130157
value = element
131-
await self.elementCounter.decreaseCounter()
158+
// await self.elementCounter.decreaseCounter()
132159
}
133160
}
134161

@@ -138,57 +165,27 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
138165
public mutating func next() async throws -> Element? {
139166
guard !Task.isCancelled else { return nil }
140167

141-
// before requesting elements from the upstream iterators, we should reauest the next element from the sink iterator
168+
// before requesting elements from the upstream iterators, we should request the next element from the sink iterator
142169
// if it has some stacked values
143-
144170
// for now we leave it commented as I'm not sure it is not counterproductive.
145171
// This "early" drain might prevent from requesting the next available upstream iterators as soon as possible
146172
// since the sink iterator might deliver a value and the next will return right away
147173

148-
// guard await !self.elementCounter.hasElement() else {
149-
// return try await self.nextElementFromSink()
150-
// }
151-
152-
let localSink = self.sink
153-
let localElementCounter = self.elementCounter
174+
// guard await !self.elementCounter.hasElement() else {
175+
// return try await self.nextElementFromSink()
176+
// }
154177

155178
// iterating over the upstream iterators to ask for their next element. Only
156-
// the available iterators are requested (not already being computing the next
179+
// the available iterators will respond (not already being computing the next
157180
// element from the previous iteration and not already finished)
158-
for upstreamIterator in self.upstreamIterators {
159-
guard !Task.isCancelled else { break }
160-
161-
let localUpstreamIterator = upstreamIterator
162-
guard await !localUpstreamIterator.isFinished() else { continue }
181+
for upstreamAsyncSequenceRegulator in self.upstreamAsyncSequenceRegulators {
163182
Task {
164-
do {
165-
let nextSharedElement = try await localUpstreamIterator.next()
166-
167-
// if the next element is nil, it means one of the upstream iterator
168-
// is finished ... its does not mean the zipped async sequence is finished (all upstream iterators have to be finished)
169-
guard let nextNonNilSharedElement = nextSharedElement else {
170-
await localSink.send(.finished)
171-
return
172-
}
173-
174-
guard case let .value(nextElement) = nextNonNilSharedElement else {
175-
// the upstream iterator was not available ... see you at the next iteration
176-
return
177-
}
178-
179-
// we have a next element from an upstream iterator, pushing it in the sink stream
180-
await localSink.send(.element(nextElement))
181-
await localElementCounter.increaseCounter()
182-
} catch is CancellationError {
183-
await localSink.send(.finished)
184-
} catch {
185-
await localSink.send(termination: .failure(error))
186-
}
183+
await upstreamAsyncSequenceRegulator.requestNextIfAvailable()
187184
}
188185
}
189186

190187
// we wait for the sink iterator to deliver the next element
191-
return try await self.nextElementFromSink()
188+
return try await self.nextElementFromDownstreamStream()
192189
}
193190
}
194191
}

Sources/AsyncStreams/AsyncStreams+CurrentValue.swift

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ public extension AsyncStreams {
3636
typealias CurrentValue<Element> = AsyncCurrentValueStream<Element>
3737
}
3838

39-
public final class AsyncCurrentValueStream<Element>: Stream, Sendable {
40-
actor Storage {
39+
public final class AsyncCurrentValueStream<Element>: Stream, @unchecked Sendable {
40+
final class Storage {
4141
var element: Element
4242

4343
init(_ element: Element) {
@@ -54,12 +54,27 @@ public final class AsyncCurrentValueStream<Element>: Stream, Sendable {
5454
}
5555

5656
public typealias AsyncIterator = AsyncStreams.Iterator<Element>
57+
58+
// we must make sure the inner continuations and storage can be used in a concurrent context since there can be multiple
59+
// operations happening at the same time (concurrent registrations and sendings).
60+
// we could use an Actor to enforce that BUT there is a drawback. If we use an Actor to handle Continuations,
61+
// when registering a new continuation, the register function would have to be called within a Task
62+
// because of its async nature. Doing so, it means that we could call `send` while the registration is not done and we
63+
// would loose the value.
64+
let serialQueue = DispatchQueue(label: UUID().uuidString)
65+
5766
let continuations = AsyncStreams.Continuations<Element>()
5867
let storage: Storage
5968

6069
public var element: Element {
61-
get async {
62-
await self.storage.retrieve()
70+
get {
71+
self.serialQueue.sync {
72+
self.storage.retrieve()
73+
}
74+
}
75+
76+
set {
77+
self.send(newValue)
6378
}
6479
}
6580

@@ -69,25 +84,27 @@ public final class AsyncCurrentValueStream<Element>: Stream, Sendable {
6984

7085
/// Sends a value to all underlying async sequences
7186
/// - Parameter element: the value to send
72-
public func send(_ element: Element) async {
73-
await self.storage.update(element)
74-
await self.continuations.send(element)
87+
public func send(_ element: Element) {
88+
self.serialQueue.async { [weak self] in
89+
self?.storage.update(element)
90+
self?.continuations.send(element)
91+
}
7592
}
7693

7794
/// Finishes the async sequences with either a normal ending or an error.
7895
/// - Parameter termination: The termination to finish the async sequence.
79-
public func send(termination: Termination) async {
80-
await self.continuations.send(termination)
96+
public func send(termination: Termination) {
97+
self.serialQueue.async { [weak self] in
98+
self?.continuations.send(termination)
99+
}
81100
}
82101

83102
func makeStream(forClientId clientId: UUID) -> AsyncThrowingStream<Element, Error> {
84-
return AsyncThrowingStream<Element, Error>(Element.self, bufferingPolicy: .unbounded) { [continuations, storage] continuation in
85-
Task {
86-
// registration is async because the continuations are managed by an actor (to avoid race conditions on its internal storage).
87-
// registering a continuation is possible only when the actor is available.
88-
let element = await storage.retrieve()
103+
return AsyncThrowingStream<Element, Error>(Element.self, bufferingPolicy: .unbounded) { [weak self] continuation in
104+
self?.serialQueue.async { [weak self] in
105+
self?.continuations.register(continuation: continuation, forId: clientId)
106+
guard let element = self?.storage.retrieve() else { return }
89107
continuation.yield(element)
90-
await continuations.register(continuation: continuation, forId: clientId)
91108
}
92109
}
93110
}
@@ -96,9 +113,12 @@ public final class AsyncCurrentValueStream<Element>: Stream, Sendable {
96113
let clientId = UUID()
97114
let stream = self.makeStream(forClientId: clientId)
98115
return AsyncStreams.Iterator<Element>(
99-
clientId: clientId,
100116
baseIterator: stream.makeAsyncIterator(),
101-
continuations: self.continuations
117+
onCancelOrFinish: { [weak self] in
118+
self?.serialQueue.async { [weak self] in
119+
self?.continuations.unregister(id: clientId)
120+
}
121+
}
102122
)
103123
}
104124
}

0 commit comments

Comments
 (0)