Skip to content

Commit a657740

Browse files
committed
Fetch Subscription
1 parent 3e9edbe commit a657740

4 files changed

Lines changed: 298 additions & 2 deletions

File tree

Sources/CoreDataRepository/FetchRepository.swift

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,26 @@ public final class FetchRepository {
9292
}
9393
}}.eraseToAnyPublisher()
9494
}
95+
96+
public func fetchSubscription<Model: UnmanagedModel>(_ request: NSFetchRequest<Model.RepoManaged>) -> AnyPublisher<Success<Model>, Failure<Model>> {
97+
AnyPublisher.create { [weak self] subscriber -> AnyCancellable in
98+
guard let self = self else {
99+
subscriber.send(completion: .failure(Failure(error: .unknown, fetchRequest: request)))
100+
return AnyCancellable {}
101+
}
102+
let id = UUID()
103+
let subscription = FetchRepository.Subscription<Model>(
104+
id: id,
105+
request: request,
106+
context: self.context
107+
)
108+
subscription.subject.sink(receiveCompletion: subscriber.send, receiveValue: subscriber.send).store(in: &self.cancellables)
109+
self.subscriptions.append(subscription)
110+
subscription.manualFetch()
111+
return AnyCancellable {
112+
subscription.cancel()
113+
self.subscriptions.removeAll(where: { $0.id == subscription.id })
114+
}
115+
}
116+
}
95117
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
// https://github.com/CombineCommunity/CombineExt/blob/master/Sources/Operators/Create.swift
2+
// https://github.com/pointfreeco/swift-composable-architecture/blob/main/Sources/ComposableArchitecture/Internal/Locking.swift
3+
// https://github.com/pointfreeco/swift-composable-architecture/blob/main/Sources/ComposableArchitecture/Internal/Create.swift
4+
/*
5+
MIT License
6+
7+
Copyright (c) 2020 Point-Free, Inc.
8+
9+
Permission is hereby granted, free of charge, to any person obtaining a copy
10+
of this software and associated documentation files (the "Software"), to deal
11+
in the Software without restriction, including without limitation the rights
12+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13+
copies of the Software, and to permit persons to whom the Software is
14+
furnished to do so, subject to the following conditions:
15+
16+
The above copyright notice and this permission notice shall be included in all
17+
copies or substantial portions of the Software.
18+
19+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
25+
SOFTWARE.
26+
*/
27+
28+
/*
29+
Copyright (c) 2020 Combine Community, and/or Shai Mishali
30+
31+
Permission is hereby granted, free of charge, to any person obtaining a copy
32+
of this software and associated documentation files (the "Software"), to deal
33+
in the Software without restriction, including without limitation the rights
34+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
35+
copies of the Software, and to permit persons to whom the Software is
36+
furnished to do so, subject to the following conditions:
37+
38+
The above copyright notice and this permission notice shall be included in
39+
all copies or substantial portions of the Software.
40+
41+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
42+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
43+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
44+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
45+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
46+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
47+
THE SOFTWARE.
48+
*/
49+
import Foundation
50+
import Combine
51+
import Darwin
52+
53+
extension UnsafeMutablePointer where Pointee == os_unfair_lock_s {
54+
@inlinable @discardableResult
55+
func sync<R>(_ work: () -> R) -> R {
56+
os_unfair_lock_lock(self)
57+
defer { os_unfair_lock_unlock(self) }
58+
return work()
59+
}
60+
}
61+
62+
extension NSRecursiveLock {
63+
@inlinable @discardableResult
64+
func sync<R>(work: () -> R) -> R {
65+
self.lock()
66+
defer { self.unlock() }
67+
return work()
68+
}
69+
}
70+
71+
private class DemandBuffer<S: Subscriber> {
72+
private var buffer = [S.Input]()
73+
private let subscriber: S
74+
private var completion: Subscribers.Completion<S.Failure>?
75+
private var demandState = Demand()
76+
private let lock: os_unfair_lock_t
77+
78+
init(subscriber: S) {
79+
self.subscriber = subscriber
80+
self.lock = os_unfair_lock_t.allocate(capacity: 1)
81+
self.lock.initialize(to: os_unfair_lock())
82+
}
83+
84+
deinit {
85+
self.lock.deinitialize(count: 1)
86+
self.lock.deallocate()
87+
}
88+
89+
func buffer(value: S.Input) -> Subscribers.Demand {
90+
precondition(
91+
self.completion == nil, "How could a completed publisher sent values?! Beats me 🤷‍♂️")
92+
93+
switch demandState.requested {
94+
case .unlimited:
95+
return subscriber.receive(value)
96+
default:
97+
buffer.append(value)
98+
return flush()
99+
}
100+
}
101+
102+
func complete(completion: Subscribers.Completion<S.Failure>) {
103+
precondition(
104+
self.completion == nil, "Completion have already occurred, which is quite awkward 🥺")
105+
106+
self.completion = completion
107+
_ = flush()
108+
}
109+
110+
func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
111+
flush(adding: demand)
112+
}
113+
114+
private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
115+
self.lock.sync {
116+
117+
if let newDemand = newDemand {
118+
demandState.requested += newDemand
119+
}
120+
121+
// If buffer isn't ready for flushing, return immediately
122+
guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }
123+
124+
while !buffer.isEmpty && demandState.processed < demandState.requested {
125+
demandState.requested += subscriber.receive(buffer.remove(at: 0))
126+
demandState.processed += 1
127+
}
128+
129+
if let completion = completion {
130+
// Completion event was already sent
131+
buffer = []
132+
demandState = .init()
133+
self.completion = nil
134+
subscriber.receive(completion: completion)
135+
return .none
136+
}
137+
138+
let sentDemand = demandState.requested - demandState.sent
139+
demandState.sent += sentDemand
140+
return sentDemand
141+
}
142+
}
143+
144+
struct Demand {
145+
var processed: Subscribers.Demand = .none
146+
var requested: Subscribers.Demand = .none
147+
var sent: Subscribers.Demand = .none
148+
}
149+
}
150+
151+
extension AnyPublisher {
152+
private init(_ callback: @escaping (AnyPublisher<Output, Failure>.Subscriber) -> Cancellable) {
153+
self = Publishers.Create(callback: callback).eraseToAnyPublisher()
154+
}
155+
156+
static func create(
157+
_ factory: @escaping (AnyPublisher<Output, Failure>.Subscriber) -> Cancellable
158+
) -> AnyPublisher<Output, Failure> {
159+
AnyPublisher(factory)
160+
}
161+
}
162+
163+
extension Publishers {
164+
fileprivate class Create<Output, Failure: Swift.Error>: Publisher {
165+
private let callback: (AnyPublisher<Output, Failure>.Subscriber) -> Cancellable
166+
167+
init(callback: @escaping (AnyPublisher<Output, Failure>.Subscriber) -> Cancellable) {
168+
self.callback = callback
169+
}
170+
171+
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
172+
subscriber.receive(subscription: Subscription(callback: callback, downstream: subscriber))
173+
}
174+
}
175+
}
176+
177+
extension Publishers.Create {
178+
fileprivate class Subscription<Downstream: Subscriber>: Combine.Subscription
179+
where Output == Downstream.Input, Failure == Downstream.Failure {
180+
private let buffer: DemandBuffer<Downstream>
181+
private var cancellable: Cancellable?
182+
183+
init(
184+
callback: @escaping (AnyPublisher<Output, Failure>.Subscriber) -> Cancellable,
185+
downstream: Downstream
186+
) {
187+
self.buffer = DemandBuffer(subscriber: downstream)
188+
189+
let cancellable = callback(
190+
.init(
191+
send: { [weak self] in _ = self?.buffer.buffer(value: $0) },
192+
complete: { [weak self] in self?.buffer.complete(completion: $0) }
193+
)
194+
)
195+
196+
self.cancellable = cancellable
197+
}
198+
199+
func request(_ demand: Subscribers.Demand) {
200+
_ = self.buffer.demand(demand)
201+
}
202+
203+
func cancel() {
204+
self.cancellable?.cancel()
205+
}
206+
}
207+
}
208+
209+
extension Publishers.Create.Subscription: CustomStringConvertible {
210+
var description: String {
211+
return "Create.Subscription<\(Output.self), \(Failure.self)>"
212+
}
213+
}
214+
215+
extension AnyPublisher {
216+
public struct Subscriber {
217+
private let _send: (Output) -> Void
218+
private let _complete: (Subscribers.Completion<Failure>) -> Void
219+
220+
init(
221+
send: @escaping (Output) -> Void,
222+
complete: @escaping (Subscribers.Completion<Failure>) -> Void
223+
) {
224+
self._send = send
225+
self._complete = complete
226+
}
227+
228+
public func send(_ value: Output) {
229+
self._send(value)
230+
}
231+
232+
public func send(completion: Subscribers.Completion<Failure>) {
233+
self._complete(completion)
234+
}
235+
}
236+
}

Tests/CoreDataRepositoryTests/FetchRepositorySubscriptionTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import XCTest
1111
@testable import CoreDataRepository
1212

1313
extension FetchRepositoryTests {
14-
func testFetchSubscriptionSuccess() {
14+
func testSubscriptionSuccess() {
1515
let firstExp = expectation(description: "Fetch movies from CoreData")
1616
let secondExp = expectation(description: "Fetch movies again after CoreData context is updated")
1717
let finalExp = expectation(description: "Finish fetching movies after canceled.")

Tests/CoreDataRepositoryTests/FetchRepositoryTests.swift

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ final class FetchRepositoryTests: CoreDataXCTestCase {
1414

1515
static var allTests = [
1616
("testFetchSuccess", testFetchSuccess),
17-
("testFetchSubscriptionSuccess", testFetchSubscriptionSuccess)
17+
("testFetchSubscriptionSuccess", testFetchSubscriptionSuccess),
18+
("testSubscriptionSuccess", testSubscriptionSuccess)
1819
]
1920

2021
typealias Success = FetchRepository.Success<Movie>
@@ -68,4 +69,41 @@ final class FetchRepositoryTests: CoreDataXCTestCase {
6869
})
6970
wait(for: [exp], timeout: 5)
7071
}
72+
73+
func testFetchSubscriptionSuccess() {
74+
let firstExp = expectation(description: "Fetch movies from CoreData")
75+
let secondExp = expectation(description: "Fetch movies again after CoreData context is updated")
76+
var resultCount = 0
77+
let result: AnyPublisher<Success, Failure> = repository.fetchSubscription(fetchRequest)
78+
let cancellable = result.subscribe(on: backgroundQueue)
79+
.receive(on: mainQueue)
80+
.sink(receiveCompletion: { completion in
81+
switch completion {
82+
case .finished:
83+
XCTFail("Not expecting completion since subscription finishes after subscriber cancel")
84+
default:
85+
XCTFail("Not expecting failure")
86+
}
87+
}, receiveValue: { value in
88+
resultCount += 1
89+
switch resultCount {
90+
case 1:
91+
assert(value.items.count == 5, "Result items count should match expectation")
92+
assert(value.items == self.expectedMovies, "Result items should match expectations")
93+
firstExp.fulfill()
94+
case 2:
95+
assert(value.items.count == 4, "Result items count should match expectation")
96+
assert(value.items == Array(self.expectedMovies[0...3]), "Result items should match expectations")
97+
secondExp.fulfill()
98+
default:
99+
break
100+
}
101+
102+
})
103+
wait(for: [firstExp], timeout: 5)
104+
let crudRepository = CRUDRepository(context: self.backgroundContext)
105+
let _: AnyPublisher<CRUDRepository.Success<Movie>, CRUDRepository.Failure<Movie>> = crudRepository.delete(self.expectedMovies.last!.objectID!)
106+
wait(for: [secondExp], timeout: 5)
107+
cancellable.cancel()
108+
}
71109
}

0 commit comments

Comments
 (0)