Skip to content

Commit c09db2c

Browse files
authored
Merge pull request #12 from AsyncCommunity/feature/share
operators: add a Share operator
2 parents 1e2f3fe + 3b3810c commit c09db2c

11 files changed

Lines changed: 144 additions & 11 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
**v0.3.0 - Beryllium:**
2+
3+
- new Share operator
4+
15
**v0.2.1 - Lithium:**
26

37
- Enforce the call of onNext in a determinitisc way

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ AsyncSequences
4343
* [HandleEvents](#HandleEvents)
4444
* [Assign](#Assign)
4545
* [Multicast](#Multicast)
46+
* [Share](#Share)
4647
* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence)
4748

4849
More operators and extensions are to come. Pull requests are of course welcome.
@@ -428,6 +429,39 @@ multicastedAsyncSequence.connect()
428429
// Stream 1 received: ("Third", 61)
429430
```
430431

432+
### Share
433+
434+
`share()` shares the output of an upstream async sequence with multiple client loops.
435+
`share()` is effectively a shortcut for `multicast(_:)` using a `Passthrough` stream, with an implicit `autoconnect()`.
436+
437+
```swift
438+
let sharedAsyncSequence = AsyncSequences.From(["first", "second", "third"], interval: .seconds(1))
439+
.map { ($0, Int.random(in: 0...100)) }
440+
.handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
441+
.share()
442+
443+
Task {
444+
try await sharedAsyncSequence
445+
.collect { print ("Task 1 received: \($0)") }
446+
}
447+
448+
Task {
449+
try await sharedAsyncSequence
450+
.collect { print ("Task 2 received: \($0)") }
451+
}
452+
453+
// will print:
454+
// AsyncSequence produces: ("First", 78)
455+
// Stream 2 received: ("First", 78)
456+
// Stream 1 received: ("First", 78)
457+
// AsyncSequence produces: ("Second", 98)
458+
// Stream 2 received: ("Second", 98)
459+
// Stream 1 received: ("Second", 98)
460+
// AsyncSequence produces: ("Third", 61)
461+
// Stream 2 received: ("Third", 61)
462+
// Stream 1 received: ("Third", 61)
463+
```
464+
431465
### EraseToAnyAsyncSequence
432466

433467
`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence.

Sources/AsyncSequences/AsyncSequences+Empty.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
// Created by Thibault Wittemberg on 31/12/2021.
66
//
77

8-
import Foundation
9-
108
public extension AsyncSequences {
119
/// `Empty` is an AsyncSequence that immediately finishes without emitting values.
1210
///

Sources/AsyncSequences/AsyncSequences+Merge.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
9090
// }
9191

9292
public struct Iterator: AsyncIteratorProtocol {
93-
9493
var downstreamIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
9594
let upstreamAsyncSequenceRegulators: [ConcurrentAccessRegulator<UpstreamAsyncSequence>]
9695
// let elementCounter = ElementCounter()

Sources/AsyncStreams/AsyncStreams+Passthrough.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public final class AsyncPassthroughStream<Element>: Stream, @unchecked Sendable
4343
// because of its async nature. Doing so, it means that we could call `send` while the registration is not done and we
4444
// would loose the value.
4545
let serialQueue = DispatchQueue(label: UUID().uuidString)
46-
46+
4747
let continuations = AsyncStreams.Continuations<Element>()
4848

4949
public init() {}

Sources/AsyncStreams/AsyncStreams+Replay.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public final class AsyncReplayStream<Element>: Stream, @unchecked Sendable {
6363
// because of its async nature. Doing so, it means that we could call `send` while the registration is not done and we
6464
// would loose the value.
6565
let serialQueue = DispatchQueue(label: UUID().uuidString)
66-
66+
6767
let continuations = AsyncStreams.Continuations<Element>()
6868
let storage: Storage
6969

Sources/AsyncStreams/AsyncStreams.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public enum AsyncStreams {}
1717
extension AsyncStreams {
1818
// Continuations can be accessed in a concurrent context. It is up to the caller to ensure
1919
// the usage in a safe way (cf Passthrough, CurrrentValue and Replay)
20-
final class Continuations<Element>{
20+
final class Continuations<Element> {
2121
var continuations = [AnyHashable: AsyncThrowingStream<Element, Error>.Continuation]()
2222

2323
func send(_ element: Element) {

Sources/Internal/ConcurrentAccessRegulator.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ final class ConcurrentAccessRegulator<UpstreamAsyncSequence: AsyncSequence>: @un
5959
do {
6060
let next = try await self.upstreamAsyncIterator?.next()
6161
await self.onNext(next)
62-
62+
6363
await self.gate.unlock()
64-
64+
6565
// yield allows to promote other tasks to resume, giving a chance to request a next element
6666
await Task.yield()
6767
} catch is CancellationError {

Sources/Operators/AsyncSequence+Multicast.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public extension AsyncSequence {
1818
/// let multicastedAsyncSequence = ["First", "Second", "Third"]
1919
/// .asyncElements
2020
/// .map { ($0, Int.random(in: 0...100)) }
21-
/// .handleEvents(onElement: { print("AsyncSequence produces: ($0)") })
21+
/// .handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
2222
/// .multicast(stream)
2323
///
2424
/// Task {
@@ -44,7 +44,8 @@ public extension AsyncSequence {
4444
/// // Stream 2 received: ("Third", 61)
4545
/// // Stream 1 received: ("Third", 61)
4646
/// ```
47-
/// In this example, the output shows that the upstream async sequence produces each random value only one time, and then sends the value to both client loops.
47+
/// In this example, the output shows that the upstream async sequence produces each random value only one time,
48+
/// and then sends the value to both client loops.
4849
///
4950
/// - Parameter stream: A `Stream` to deliver elements to downstream client loops.
5051
func multicast<S: Stream>(_ stream: S) -> AsyncMulticastSequence<Self, S> where S.Element == Element {
@@ -79,7 +80,7 @@ where UpstreamAsyncSequence.Element == DownstreamStream.Element {
7980

8081
/// Automates the process of connecting the multicasted async sequence.
8182
///
82-
///```
83+
/// ```
8384
/// let stream = AsyncStreams.Passthrough<(String, Int)>()
8485
/// let multicastedAsyncSequence = ["First", "Second", "Third"]
8586
/// .asyncElements
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//
2+
// AsyncSequence+Share.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 03/03/2022.
6+
//
7+
8+
public extension AsyncSequence {
9+
/// Shares the output of an upstream async sequence with multiple client loops.
10+
///
11+
/// - Tip: ``share()`` is effectively a shortcut for ``multicast()`` using a ``Passthrough`` stream, with an implicit ``autoconnect()``.
12+
///
13+
/// The following example uses an async sequence as a counter to emit three random numbers.
14+
/// Each element is delayed by 1s to give the seconf loop a chance to catch all the values.
15+
///
16+
/// ```
17+
/// let sharedAsyncSequence = AsyncSequences.From(["first", "second", "third"], interval: .seconds(1))
18+
/// .map { ($0, Int.random(in: 0...100)) }
19+
/// .handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
20+
/// .share()
21+
///
22+
/// Task {
23+
/// try await sharedAsyncSequence
24+
/// .collect { print ("Task 1 received: \($0)") }
25+
/// }
26+
///
27+
/// Task {
28+
/// try await sharedAsyncSequence
29+
/// .collect { print ("Task 2 received: \($0)") }
30+
/// }
31+
///
32+
/// // will print:
33+
/// // AsyncSequence produces: ("First", 78)
34+
/// // Stream 2 received: ("First", 78)
35+
/// // Stream 1 received: ("First", 78)
36+
/// // AsyncSequence produces: ("Second", 98)
37+
/// // Stream 2 received: ("Second", 98)
38+
/// // Stream 1 received: ("Second", 98)
39+
/// // AsyncSequence produces: ("Third", 61)
40+
/// // Stream 2 received: ("Third", 61)
41+
/// // Stream 1 received: ("Third", 61)
42+
/// ```
43+
/// In this example, the output shows that the upstream async sequence produces each random value only one time,
44+
/// and then sends the value to both client loops.
45+
///
46+
/// Without the ``share()`` operator, loop 1 receives three random values, followed by loop 2 receiving three different random values.
47+
///
48+
/// Also note that ``AsyncShareSequence`` is a class rather than a structure like most other publishers.
49+
/// This means you can use this operator to create a publisher instance that uses reference semantics.
50+
/// - Returns: A class instance that shares elements received from its upstream async sequence to multiple client loops.
51+
func share() -> AsyncSequences.AsyncShareSequence<Self> {
52+
let stream = AsyncStreams.Passthrough<Element>()
53+
return self.multicast(stream).autoconnect()
54+
}
55+
}
56+
57+
public extension AsyncSequences {
58+
typealias AsyncShareSequence<S: AsyncSequence> = AsyncMulticastSequence<S, AsyncStreams.Passthrough<S.Element>>
59+
}

0 commit comments

Comments
 (0)