Skip to content

Commit 0b3f8f5

Browse files
author
Thibault Wittemberg
committed
operators: add a Share operator
1 parent 1e2f3fe commit 0b3f8f5

6 files changed

Lines changed: 136 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
**v0.3.0 - Beryllium:**
2+
3+
- new Share operator
4+
15
**v0.2.1 - Lithium:**
26

37
- Enforce the call of onNext in a determinitisc way

README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ AsyncSequences
4343
* [HandleEvents](#HandleEvents)
4444
* [Assign](#Assign)
4545
* [Multicast](#Multicast)
46+
* [Share](#Share)
4647
* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence)
4748

4849
More operators and extensions are to come. Pull requests are of course welcome.
@@ -428,6 +429,39 @@ multicastedAsyncSequence.connect()
428429
// Stream 1 received: ("Third", 61)
429430
```
430431

432+
### Share
433+
434+
`share()` shares the output of an upstream async sequence with multiple client loops.
435+
`share()` is effectively a shortcut for `multicast(_:)` using a `Passthrough` stream, with an implicit `autoconnect()`.
436+
437+
```swift
438+
let sharedAsyncSequence = AsyncSequences.From(["first", "second", "third"], interval: .seconds(1))
439+
.map { ($0, Int.random(in: 0...100)) }
440+
.handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
441+
.share()
442+
443+
Task {
444+
try await sharedAsyncSequence
445+
.collect { print ("Task 1 received: \($0)") }
446+
}
447+
448+
Task {
449+
try await sharedAsyncSequence
450+
.collect { print ("Task 2 received: \($0)") }
451+
}
452+
453+
// will print:
454+
// AsyncSequence produces: ("First", 78)
455+
// Stream 2 received: ("First", 78)
456+
// Stream 1 received: ("First", 78)
457+
// AsyncSequence produces: ("Second", 98)
458+
// Stream 2 received: ("Second", 98)
459+
// Stream 1 received: ("Second", 98)
460+
// AsyncSequence produces: ("Third", 61)
461+
// Stream 2 received: ("Third", 61)
462+
// Stream 1 received: ("Third", 61)
463+
```
464+
431465
### EraseToAnyAsyncSequence
432466

433467
`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence.

Sources/AsyncSequences/AsyncSequences+Empty.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
// Created by Thibault Wittemberg on 31/12/2021.
66
//
77

8-
import Foundation
9-
108
public extension AsyncSequences {
119
/// `Empty` is an AsyncSequence that immediately finishes without emitting values.
1210
///

Sources/Operators/AsyncSequence+Multicast.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public extension AsyncSequence {
1818
/// let multicastedAsyncSequence = ["First", "Second", "Third"]
1919
/// .asyncElements
2020
/// .map { ($0, Int.random(in: 0...100)) }
21-
/// .handleEvents(onElement: { print("AsyncSequence produces: ($0)") })
21+
/// .handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
2222
/// .multicast(stream)
2323
///
2424
/// Task {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
//
2+
// AsyncSequence+Share.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 03/03/2022.
6+
//
7+
8+
public extension AsyncSequence {
9+
/// Shares the output of an upstream async sequence with multiple client loops.
10+
///
11+
/// - Tip: ``share()`` is effectively a shortcut for ``multicast()`` using a ``Passthrough`` stream, with an implicit ``autoconnect()``.
12+
///
13+
/// The following example uses an async sequence as a counter to emit three random numbers. Each element is delayed by 1s to give the seconf loop a chance to
14+
/// catch all the values.
15+
///
16+
/// ```
17+
/// let sharedAsyncSequence = AsyncSequences.From(["first", "second", "third"], interval: .seconds(1))
18+
/// .map { ($0, Int.random(in: 0...100)) }
19+
/// .handleEvents(onElement: { print("AsyncSequence produces: \($0)") })
20+
/// .share()
21+
///
22+
/// Task {
23+
/// try await sharedAsyncSequence
24+
/// .collect { print ("Task 1 received: \($0)") }
25+
/// }
26+
///
27+
/// Task {
28+
/// try await sharedAsyncSequence
29+
/// .collect { print ("Task 2 received: \($0)") }
30+
/// }
31+
///
32+
/// // will print:
33+
/// // AsyncSequence produces: ("First", 78)
34+
/// // Stream 2 received: ("First", 78)
35+
/// // Stream 1 received: ("First", 78)
36+
/// // AsyncSequence produces: ("Second", 98)
37+
/// // Stream 2 received: ("Second", 98)
38+
/// // Stream 1 received: ("Second", 98)
39+
/// // AsyncSequence produces: ("Third", 61)
40+
/// // Stream 2 received: ("Third", 61)
41+
/// // Stream 1 received: ("Third", 61)
42+
/// ```
43+
/// In this example, the output shows that the upstream async sequence produces each random value only one time, and then sends the value to both client loops.
44+
///
45+
/// Without the ``share()`` operator, loop 1 receives three random values, followed by loop 2 receiving three different random values.
46+
///
47+
/// Also note that ``AsyncShareSequence`` is a class rather than a structure like most other publishers.
48+
/// This means you can use this operator to create a publisher instance that uses reference semantics.
49+
/// - Returns: A class instance that shares elements received from its upstream async sequence to multiple client loops.
50+
func share() -> AsyncSequences.AsyncShareSequence<Self> {
51+
let stream = AsyncStreams.Passthrough<Element>()
52+
return self.multicast(stream).autoconnect()
53+
}
54+
}
55+
56+
extension AsyncSequences {
57+
public typealias AsyncShareSequence<UpstreamAsyncSequence: AsyncSequence> =
58+
AsyncMulticastSequence<UpstreamAsyncSequence,AsyncStreams.Passthrough<UpstreamAsyncSequence.Element>>
59+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//
2+
// AsyncSequence+ShareTests.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 03/03/2022.
6+
//
7+
8+
import AsyncExtensions
9+
import XCTest
10+
11+
final class AsyncSequence_ShareTests: XCTestCase {
12+
func test_share_multicasts_values_to_clientLoops() {
13+
let tasksHaveFinishedExpectation = expectation(description: "the tasks have finished")
14+
tasksHaveFinishedExpectation.expectedFulfillmentCount = 2
15+
16+
let sut = AsyncSequences
17+
.From(["first", "second", "third"], interval: .milliSeconds(500))
18+
.share()
19+
20+
Task {
21+
var received = [String]()
22+
try await sut
23+
.collect { received.append($0) }
24+
XCTAssertEqual(received, ["first", "second", "third"])
25+
tasksHaveFinishedExpectation.fulfill()
26+
}
27+
28+
Task {
29+
var received = [String]()
30+
try await sut
31+
.collect { received.append($0) }
32+
XCTAssertEqual(received, ["first", "second", "third"])
33+
tasksHaveFinishedExpectation.fulfill()
34+
}
35+
36+
waitForExpectations(timeout: 5)
37+
}
38+
}

0 commit comments

Comments
 (0)