Skip to content

Commit 669d2fc

Browse files
committed
WIP: Cleaner subscription API. Aggregate isn't publishing, premature cancel
1 parent a657740 commit 669d2fc

8 files changed

Lines changed: 335 additions & 204 deletions

Sources/CoreDataRepository/AggregateRepository.swift

Lines changed: 100 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public final class AggregateRepository {
1212
// MARK: Properties
1313
/// The context used by the repository
1414
public let context: NSManagedObjectContext
15+
var cancellables = [AnyCancellable]()
16+
var subscriptions = [SubscriptionProvider]()
1517

1618
// MARK: Init
1719
/// Initializes a repository
@@ -36,34 +38,25 @@ public final class AggregateRepository {
3638
public struct Success<Value: Numeric> {
3739
let function: Function
3840
let result: [[String: Value]]
39-
let predicate: NSPredicate
41+
let request: NSFetchRequest<NSDictionary>
42+
43+
var predicate: NSPredicate? {
44+
request.predicate
45+
}
4046
}
4147

4248
/// A return type for failure to calculate
4349
public struct Failure: Error {
4450
let function: Function
45-
let predicate: NSPredicate
51+
let request: NSFetchRequest<NSDictionary>
4652
let error: RepositoryErrors
53+
54+
var predicate: NSPredicate? {
55+
request.predicate
56+
}
4757
}
4858

49-
// MARK: Private Functions
50-
/// Calculates aggregate values
51-
/// - Parameters
52-
/// - function: Function
53-
/// - predicate: NSPredicate
54-
/// - entityDesc: NSEntityDescription
55-
/// - attributeDesc: NSAttributeDescription
56-
/// - groupBy: NSAttributeDescription? = nil
57-
/// - Returns
58-
/// - `[[String: Value]]`
59-
///
60-
private func aggregate<Value: Numeric>(
61-
function: Function,
62-
predicate: NSPredicate,
63-
entityDesc: NSEntityDescription,
64-
attributeDesc: NSAttributeDescription,
65-
groupBy: NSAttributeDescription? = nil
66-
) throws -> [[String: Value]] {
59+
private func request(function: Function, predicate: NSPredicate, entityDesc: NSEntityDescription, attributeDesc: NSAttributeDescription, groupBy: NSAttributeDescription? = nil) -> NSFetchRequest<NSDictionary> {
6760
let expDesc = NSExpressionDescription.aggregate(function: function, attributeDesc: attributeDesc)
6861
let request = NSFetchRequest<NSDictionary>(entityName: entityDesc.className)
6962
request.entity = entityDesc
@@ -79,6 +72,21 @@ public final class AggregateRepository {
7972
request.propertiesToGroupBy = [groupBy.name]
8073
}
8174
request.sortDescriptors = [NSSortDescriptor(key: attributeDesc.name, ascending: false)]
75+
return request
76+
}
77+
78+
// MARK: Private Functions
79+
/// Calculates aggregate values
80+
/// - Parameters
81+
/// - function: Function
82+
/// - predicate: NSPredicate
83+
/// - entityDesc: NSEntityDescription
84+
/// - attributeDesc: NSAttributeDescription
85+
/// - groupBy: NSAttributeDescription? = nil
86+
/// - Returns
87+
/// - `[[String: Value]]`
88+
///
89+
private func aggregate<Value: Numeric>(request: NSFetchRequest<NSDictionary>) throws -> [[String: Value]] {
8290
let result = try self.context.fetch(request)
8391
return result as? [[String: Value]] ?? []
8492
}
@@ -93,14 +101,15 @@ public final class AggregateRepository {
93101
///
94102
public func count(predicate: NSPredicate, entityDesc: NSEntityDescription) -> AnyPublisher<Success<Int>, Failure> {
95103
return Deferred { Future { [weak self] callback in
96-
guard let self = self else { return callback(.failure(Failure(function: .count, predicate: predicate, error: .unknown))) }
97-
let request = NSFetchRequest<NSFetchRequestResult>(entityName: entityDesc.name ?? "")
104+
let request = NSFetchRequest<NSDictionary>(entityName: entityDesc.name ?? "")
98105
request.predicate = predicate
106+
request.sortDescriptors = [NSSortDescriptor(key: entityDesc.attributesByName.values.first!.name, ascending: true)]
107+
guard let self = self else { return callback(.failure(Failure(function: .count, request: request, error: .unknown))) }
99108
do {
100109
let count = try self.context.count(for: request)
101-
callback(.success(Success(function: .count, result: [["countOf\(entityDesc.name ?? "")": count]], predicate: predicate)))
110+
callback(.success(Success(function: .count, result: [["countOf\(entityDesc.name ?? "")": count]], request: request)))
102111
} catch {
103-
callback(.failure(Failure(function: .count, predicate: predicate, error: .cocoa(error as NSError))))
112+
callback(.failure(Failure(function: .count, request: request, error: .cocoa(error as NSError))))
104113
}
105114

106115
}}.eraseToAnyPublisher()
@@ -116,16 +125,17 @@ public final class AggregateRepository {
116125
/// - AnyPublisher<Success<Value>, Failure<Value>>
117126
///
118127
public func sum<Value: Numeric>(predicate: NSPredicate, entityDesc: NSEntityDescription, attributeDesc: NSAttributeDescription, groupBy: NSAttributeDescription? = nil) -> AnyPublisher<Success<Value>, Failure> {
128+
let request = self.request(function: .sum, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
119129
guard entityDesc == attributeDesc.entity else {
120-
return Fail(error: Failure(function: .sum, predicate: predicate, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
130+
return Fail(error: Failure(function: .sum, request: request, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
121131
}
122132
return Deferred { Future { [weak self] callback in
123-
guard let self = self else { return callback(.failure(Failure(function: .sum, predicate: predicate, error: .unknown))) }
133+
guard let self = self else { return callback(.failure(Failure(function: .sum, request: request, error: .unknown))) }
124134
do {
125-
let result: [[String: Value]] = try self.aggregate(function: .sum, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
126-
callback(.success(Success(function: .sum, result: result, predicate: predicate)))
135+
let result: [[String: Value]] = try self.aggregate(request: request)
136+
callback(.success(Success(function: .sum, result: result, request: request)))
127137
} catch {
128-
callback(.failure(Failure(function: .sum, predicate: predicate, error: .cocoa(error as NSError))))
138+
callback(.failure(Failure(function: .sum, request: request, error: .cocoa(error as NSError))))
129139
}
130140
}}.eraseToAnyPublisher()
131141
}
@@ -140,16 +150,17 @@ public final class AggregateRepository {
140150
/// - AnyPublisher<Success<Value>, Failure<Value>>
141151
///
142152
public func average<Value: Numeric>(predicate: NSPredicate, entityDesc: NSEntityDescription, attributeDesc: NSAttributeDescription, groupBy: NSAttributeDescription? = nil) -> AnyPublisher<Success<Value>, Failure> {
153+
let request = self.request(function: .average, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
143154
guard entityDesc == attributeDesc.entity else {
144-
return Fail(error: Failure(function: .average, predicate: predicate, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
155+
return Fail(error: Failure(function: .average, request: request, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
145156
}
146157
return Deferred { Future { [weak self] callback in
147-
guard let self = self else { return callback(.failure(Failure(function: .average, predicate: predicate, error: .unknown))) }
158+
guard let self = self else { return callback(.failure(Failure(function: .average, request: request, error: .unknown))) }
148159
do {
149-
let result: [[String: Value]] = try self.aggregate(function: .average, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
150-
callback(.success(Success(function: .average, result: result, predicate: predicate)))
160+
let result: [[String: Value]] = try self.aggregate(request: request)
161+
callback(.success(Success(function: .average, result: result, request: request)))
151162
} catch {
152-
callback(.failure(Failure(function: .average, predicate: predicate, error: .cocoa(error as NSError))))
163+
callback(.failure(Failure(function: .average, request: request, error: .cocoa(error as NSError))))
153164
}
154165
}}.eraseToAnyPublisher()
155166
}
@@ -164,16 +175,17 @@ public final class AggregateRepository {
164175
/// - AnyPublisher<Success<Value>, Failure<Value>>
165176
///
166177
public func min<Value: Numeric>(predicate: NSPredicate, entityDesc: NSEntityDescription, attributeDesc: NSAttributeDescription, groupBy: NSAttributeDescription? = nil) -> AnyPublisher<Success<Value>, Failure> {
178+
let request = self.request(function: .min, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
167179
guard entityDesc == attributeDesc.entity else {
168-
return Fail(error: Failure(function: .min, predicate: predicate, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
180+
return Fail(error: Failure(function: .min, request: request, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
169181
}
170182
return Deferred { Future { [weak self] callback in
171-
guard let self = self else { return callback(.failure(Failure(function: .min, predicate: predicate, error: .unknown))) }
183+
guard let self = self else { return callback(.failure(Failure(function: .min, request: request, error: .unknown))) }
172184
do {
173-
let result: [[String: Value]] = try self.aggregate(function: .min, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
174-
callback(.success(Success(function: .min, result: result, predicate: predicate)))
185+
let result: [[String: Value]] = try self.aggregate(request: request)
186+
callback(.success(Success(function: .min, result: result, request: request)))
175187
} catch {
176-
callback(.failure(Failure(function: .min, predicate: predicate, error: .cocoa(error as NSError))))
188+
callback(.failure(Failure(function: .min, request: request, error: .cocoa(error as NSError))))
177189
}
178190
}}.eraseToAnyPublisher()
179191
}
@@ -188,19 +200,57 @@ public final class AggregateRepository {
188200
/// - AnyPublisher<Success<Value>, Failure<Value>>
189201
///
190202
public func max<Value: Numeric>(predicate: NSPredicate, entityDesc: NSEntityDescription, attributeDesc: NSAttributeDescription, groupBy: NSAttributeDescription? = nil) -> AnyPublisher<Success<Value>, Failure> {
203+
let request = self.request(function: .max, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
191204
guard entityDesc == attributeDesc.entity else {
192-
return Fail(error: Failure(function: .max, predicate: predicate, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
205+
return Fail(error: Failure(function: .max, request: request, error: .propertyDoesNotMatchEntity)).eraseToAnyPublisher()
193206
}
194207
return Deferred { Future { [weak self] callback in
195-
guard let self = self else { return callback(.failure(Failure(function: .max, predicate: predicate, error: .unknown))) }
208+
guard let self = self else { return callback(.failure(Failure(function: .max, request: request, error: .unknown))) }
196209
do {
197-
let result: [[String: Value]] = try self.aggregate(function: .max, predicate: predicate, entityDesc: entityDesc, attributeDesc: attributeDesc, groupBy: groupBy)
198-
callback(.success(Success(function: .max, result: result, predicate: predicate)))
210+
let result: [[String: Value]] = try self.aggregate(request: request)
211+
callback(.success(Success(function: .max, result: result, request: request)))
199212
} catch {
200-
callback(.failure(Failure(function: .max, predicate: predicate, error: .cocoa(error as NSError))))
213+
callback(.failure(Failure(function: .max, request: request, error: .cocoa(error as NSError))))
201214
}
202215
}}.eraseToAnyPublisher()
203216
}
217+
218+
public func subscription<Value: Numeric>(_ publisher: AnyPublisher<Success<Value>, Failure>) -> AnyPublisher<Success<Value>, Failure> {
219+
return AnyPublisher.create { subscriber in
220+
let subject = PassthroughSubject<Success<Value>, Failure>()
221+
subject.sink(receiveCompletion: subscriber.send, receiveValue: subscriber.send).store(in: &self.cancellables)
222+
let id = UUID()
223+
var subscription: SubscriptionProvider?
224+
publisher.sink(
225+
receiveCompletion: { completion in
226+
if case .failure = completion {
227+
subject.send(completion: completion)
228+
}
229+
},
230+
receiveValue: { value in
231+
let castValue = value.result.first!.values.first!
232+
subscription = RepositorySubscription(
233+
id: id,
234+
request: value.request,
235+
context: self.context,
236+
success: { Success(function: value.function, result: $0 as? [[String: Value]] ?? [], request: value.request) },
237+
failure: { Failure(function: value.function, request: value.request, error: $0) },
238+
subject: subject
239+
)
240+
subscription?.start()
241+
if let sub = subscription {
242+
self.subscriptions.append(sub)
243+
}
244+
subject.send(value)
245+
246+
}
247+
).store(in: &self.cancellables)
248+
return AnyCancellable {
249+
subscription?.cancel()
250+
self.subscriptions.removeAll(where: { $0.id == id as AnyHashable })
251+
}
252+
}
253+
}
204254
}
205255

206256
// MARK: Extensions
@@ -223,3 +273,9 @@ extension NSExpressionDescription {
223273
return expDesc
224274
}
225275
}
276+
277+
extension AnyPublisher where Failure == AggregateRepository.Failure {
278+
func subscription<Value: Numeric>(_ repository: AggregateRepository) -> Self where Self.Output == AggregateRepository.Success<Value>, Self.Failure == AggregateRepository.Failure {
279+
repository.subscription(self)
280+
}
281+
}

Sources/CoreDataRepository/FetchRepository+Subscription.swift

Lines changed: 0 additions & 83 deletions
This file was deleted.

Sources/CoreDataRepository/FetchRepository.swift

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -93,25 +93,36 @@ public final class FetchRepository {
9393
}}.eraseToAnyPublisher()
9494
}
9595

96-
public func fetchSubscription<Model: UnmanagedModel>(_ request: NSFetchRequest<Model.RepoManaged>) -> AnyPublisher<Success<Model>, Failure<Model>> {
97-
AnyPublisher.create { [weak self] subscriber -> AnyCancellable in
98-
guard let self = self else {
99-
subscriber.send(completion: .failure(Failure(error: .unknown, fetchRequest: request)))
100-
return AnyCancellable {}
101-
}
102-
let id = UUID()
103-
let subscription = FetchRepository.Subscription<Model>(
104-
id: id,
105-
request: request,
106-
context: self.context
107-
)
108-
subscription.subject.sink(receiveCompletion: subscriber.send, receiveValue: subscriber.send).store(in: &self.cancellables)
109-
self.subscriptions.append(subscription)
110-
subscription.manualFetch()
111-
return AnyCancellable {
112-
subscription.cancel()
113-
self.subscriptions.removeAll(where: { $0.id == subscription.id })
96+
public func subscription<Model: UnmanagedModel>(_ publisher: AnyPublisher<Success<Model>, Failure<Model>>) -> AnyPublisher<Success<Model>, Failure<Model>> {
97+
let subject = PassthroughSubject<Success<Model>, Failure<Model>>()
98+
let id = UUID()
99+
publisher.sink(
100+
receiveCompletion: { completion in
101+
if case .failure = completion {
102+
subject.send(completion: completion)
103+
}
104+
},
105+
receiveValue: { value in
106+
let subscription = RepositorySubscription(
107+
id: id,
108+
request: value.fetchRequest,
109+
context: self.context,
110+
success: { Success(items: $0.map { $0.asUnmanaged }, fetchRequest: value.fetchRequest) },
111+
failure: { Failure(error: $0, fetchRequest: value.fetchRequest) },
112+
subject: subject
113+
)
114+
subscription.start()
115+
self.subscriptions.append(subscription)
116+
subject.send(value)
114117
}
115-
}
118+
).store(in: &self.cancellables)
119+
return subject.eraseToAnyPublisher()
120+
}
121+
}
122+
123+
// MARK: Extensions
124+
extension AnyPublisher {
125+
func subscription<Model: UnmanagedModel>(_ repository: FetchRepository) -> Self where Self.Output == FetchRepository.Success<Model>, Self.Failure == FetchRepository.Failure<Model> {
126+
repository.subscription(self)
116127
}
117128
}

0 commit comments

Comments
 (0)