Skip to content

Commit a9cd8d0

Browse files
author
Thibault Wittemberg
committed
operators: implement WithLatestFrom
1 parent af684d5 commit a9cd8d0

5 files changed

Lines changed: 330 additions & 11 deletions

File tree

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ AsyncSequences
4545
* [Assign](#Assign)
4646
* [Multicast](#Multicast)
4747
* [Share](#Share)
48+
* [WithLatestFrom](#WithLatestFrom)
4849
* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence)
4950

5051
More operators and extensions are to come. Pull requests are of course welcome.
@@ -485,6 +486,33 @@ Task {
485486
// Stream 1 received: ("Third", 61)
486487
```
487488

489+
### WithLatestFrom
490+
491+
`withLatestFrom(_:)` merges two async sequences into a single one by combining each value
492+
from self with the latest value from the other sequence, if any.
493+
494+
```
495+
let seq1 = AsyncStreams.CurrentValue<Int>(1)
496+
let seq2 = AsyncStreams.CurrentValue<String>("1")
497+
498+
let combinedSeq = seq1.withLatestFrom(seq2)
499+
500+
Task {
501+
for try await element in combinedSeq {
502+
print(element)
503+
}
504+
}
505+
506+
seq1.send(2)
507+
seq2.send("2")
508+
seq1.send(3)
509+
510+
// will print:
511+
(1, "1")
512+
(2, "1")
513+
(3, "2")
514+
```
515+
488516
### EraseToAnyAsyncSequence
489517

490518
`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence.

Sources/Operators/AsyncSequence+SwitchToLatest.swift

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ public extension AsyncSequence where Element: AsyncSequence {
2222
/// // will print:
2323
/// a3, b3
2424
/// ```
25+
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default)
2526
///
2627
/// - Returns: The async sequence that republishes elements sent by the most recently received async sequence.
27-
func switchToLatest() -> AsyncSwitchToLatestSequence<Self> {
28-
AsyncSwitchToLatestSequence<Self>(self)
28+
func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence<Self> {
29+
AsyncSwitchToLatestSequence<Self>(self, upstreamPriority: upstreamPriority)
2930
}
3031
}
3132

@@ -34,13 +35,21 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
3435
public typealias AsyncIterator = Iterator
3536

3637
let upstreamAsyncSequence: UpstreamAsyncSequence
38+
let upstreamPriority: TaskPriority?
3739

38-
public init(_ upstreamAsyncSequence: UpstreamAsyncSequence) {
40+
public init(
41+
_ upstreamAsyncSequence: UpstreamAsyncSequence,
42+
upstreamPriority: TaskPriority?
43+
) {
3944
self.upstreamAsyncSequence = upstreamAsyncSequence
45+
self.upstreamPriority = upstreamPriority
4046
}
4147

4248
public func makeAsyncIterator() -> AsyncIterator {
43-
Iterator(upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator())
49+
Iterator(
50+
upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator(),
51+
upstreamPriority: self.upstreamPriority
52+
)
4453
}
4554

4655
final class UpstreamIteratorManager {
@@ -49,10 +58,15 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
4958
var hasStarted = false
5059
var currentTask: Task<Element?, Error>?
5160

61+
let upstreamPriority: TaskPriority?
5262
let serialQueue = DispatchQueue(label: UUID().uuidString)
5363

54-
init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) {
64+
init(
65+
upstreamIterator: UpstreamAsyncSequence.AsyncIterator,
66+
upstreamPriority: TaskPriority?
67+
) {
5568
self.upstreamIterator = upstreamIterator
69+
self.upstreamPriority = upstreamPriority
5670
}
5771

5872
func setCurrentTask(task: Task<Element?, Error>) {
@@ -70,7 +84,7 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
7084
}
7185
}
7286

73-
Task { [weak self] in
87+
Task(priority: self.upstreamPriority) { [weak self] in
7488
while let nextChildSequence = try await self?.upstreamIterator.next() {
7589
self?.serialQueue.async { [weak self] in
7690
self?.childIterators.removeFirst()
@@ -92,8 +106,14 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
92106
public struct Iterator: AsyncIteratorProtocol {
93107
let upstreamIteratorManager: UpstreamIteratorManager
94108

95-
init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) {
96-
self.upstreamIteratorManager = UpstreamIteratorManager(upstreamIterator: upstreamIterator)
109+
init(
110+
upstreamIterator: UpstreamAsyncSequence.AsyncIterator,
111+
upstreamPriority: TaskPriority?
112+
) {
113+
self.upstreamIteratorManager = UpstreamIteratorManager(
114+
upstreamIterator: upstreamIterator,
115+
upstreamPriority: upstreamPriority
116+
)
97117
}
98118

99119
public mutating func next() async throws -> Element? {
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
//
2+
// AsyncSequence+WithLatestFrom.swift
3+
//
4+
//
5+
// Created by Thibault Wittemberg on 07/03/2022.
6+
//
7+
8+
public extension AsyncSequence {
9+
/// Merges two AsyncSequences into a single one by combining each value
10+
/// from self with the latest value from the other sequence, if any.
11+
///
12+
/// ```
13+
/// let seq1 = AsyncStreams.CurrentValue<Int>(1)
14+
/// let seq2 = AsyncStreams.CurrentValue<String>("1")
15+
///
16+
/// let combinedSeq = seq1.withLatestFrom(seq2)
17+
///
18+
/// Task {
19+
/// for try await element in combinedSeq {
20+
/// print(element)
21+
/// }
22+
/// }
23+
///
24+
/// seq1.send(2)
25+
/// seq2.send("2")
26+
/// seq1.send(3)
27+
///
28+
/// // will print:
29+
/// (1, "1")
30+
/// (2, "1")
31+
/// (3, "2")
32+
/// ```
33+
///
34+
/// - parameter other: the other async sequence
35+
///
36+
/// - returns: An async sequence emitting the result of combining each value of the self
37+
/// with the latest value from the other sequence. If the other sequence finishes, the returned sequence
38+
/// will finish with the next value from self.
39+
///
40+
func withLatestFrom<OtherAsyncSequence: AsyncSequence>(
41+
_ other: OtherAsyncSequence,
42+
otherPriority: TaskPriority? = nil
43+
) -> AsyncWithLatestFromSequence<Self, OtherAsyncSequence> {
44+
AsyncWithLatestFromSequence(
45+
self,
46+
other: other,
47+
otherPriority: otherPriority
48+
)
49+
}
50+
}
51+
52+
public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence, OtherAsyncSequence: AsyncSequence>: AsyncSequence {
53+
public typealias Element = (UpstreamAsyncSequence.Element, OtherAsyncSequence.Element)
54+
public typealias AsyncIterator = Iterator
55+
56+
let upstreamAsyncSequence: UpstreamAsyncSequence
57+
let otherAsyncSequence: OtherAsyncSequence
58+
let otherPriority: TaskPriority?
59+
60+
public init(
61+
_ upstreamAsyncSequence: UpstreamAsyncSequence,
62+
other otherAsyncSequence: OtherAsyncSequence,
63+
otherPriority: TaskPriority? = nil
64+
) {
65+
self.upstreamAsyncSequence = upstreamAsyncSequence
66+
self.otherAsyncSequence = otherAsyncSequence
67+
self.otherPriority = otherPriority
68+
}
69+
70+
public func makeAsyncIterator() -> AsyncIterator {
71+
Iterator(
72+
self.upstreamAsyncSequence.makeAsyncIterator(),
73+
other: self.otherAsyncSequence.makeAsyncIterator(),
74+
otherPriority: self.otherPriority
75+
)
76+
}
77+
78+
final class OtherIteratorManager {
79+
var otherElement: OtherAsyncSequence.Element?
80+
var otherIterator: OtherAsyncSequence.AsyncIterator
81+
var hasStarted = false
82+
83+
let otherPriority: TaskPriority?
84+
85+
init(
86+
otherIterator: OtherAsyncSequence.AsyncIterator,
87+
otherPriority: TaskPriority?
88+
) {
89+
self.otherIterator = otherIterator
90+
self.otherPriority = otherPriority
91+
}
92+
93+
/// iterates over the other sequence and track its current value
94+
func startOtherIterator() async throws {
95+
guard !self.hasStarted else { return }
96+
self.hasStarted = true
97+
98+
self.otherElement = try await self.otherIterator.next()
99+
100+
Task(priority: self.otherPriority) { [weak self] in
101+
while let element = try await self?.otherIterator.next() {
102+
guard !Task.isCancelled else { break }
103+
104+
self?.otherElement = element
105+
}
106+
self?.otherElement = nil
107+
}
108+
}
109+
}
110+
111+
public struct Iterator: AsyncIteratorProtocol {
112+
var upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator
113+
let otherIteratorManager: OtherIteratorManager
114+
115+
init(
116+
_ upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator,
117+
other otherAsyncIterator: OtherAsyncSequence.AsyncIterator,
118+
otherPriority: TaskPriority?
119+
) {
120+
self.upstreamAsyncIterator = upstreamAsyncIterator
121+
self.otherIteratorManager = OtherIteratorManager(
122+
otherIterator: otherAsyncIterator,
123+
otherPriority: otherPriority
124+
)
125+
}
126+
127+
public mutating func next() async throws -> Element? {
128+
guard !Task.isCancelled else { return nil }
129+
130+
try await self.otherIteratorManager.startOtherIterator()
131+
132+
let upstreamElement = try await self.upstreamAsyncIterator.next()
133+
let otherElement = self.otherIteratorManager.otherElement
134+
135+
guard let nonNilUpstreamElement = upstreamElement,
136+
let nonNilOtherElement = otherElement else {
137+
return nil
138+
}
139+
140+
return (nonNilUpstreamElement, nonNilOtherElement)
141+
}
142+
}
143+
}

Tests/Operators/AsyncSequence+SwitchToLatestTests.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
6060
interval: .milliSeconds(30),
6161
onCancel: {})
6262

63-
let sut = mainAsyncSequence.switchToLatest()
63+
let sut = mainAsyncSequence.switchToLatest(upstreamPriority: .high)
6464

6565
var receivedElements = [Int]()
6666
let expectedElements = [0, 4, 8, 9, 10, 11]
@@ -81,7 +81,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
8181
LongAsyncSequence(elements: [1], onCancel: {}).eraseToAnyAsyncSequence(),
8282
AsyncSequences.Fail<Int>(error: expectedError).eraseToAnyAsyncSequence()].asyncElements
8383

84-
let sut = sourceSequence.switchToLatest()
84+
let sut = sourceSequence.switchToLatest(upstreamPriority: .high)
8585

8686
do {
8787
for try await _ in sut {}
@@ -101,7 +101,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
101101
interval: .milliSeconds(50),
102102
onCancel: {}
103103
)}
104-
let sut = mappedSequence.switchToLatest()
104+
let sut = mappedSequence.switchToLatest(upstreamPriority: .high)
105105

106106
let task = Task {
107107
var firstElement: Int?

0 commit comments

Comments
 (0)