Skip to content

Commit b0687ab

Browse files
Add loadSubset State Tracking and On-Demand Sync Mode (#669)
* wip * refactor so both CollectionEventsManager and CollectionSubscription subclass the same event emiiter implimetation * changeset * rename loadMore to loadSubset * feed the subscription object through to the loadSubset call, and add an unsunbscribed event to it * feed subscription through to the loadSubset callback, add unsubscribe event to the subscription, fix types * add sync mode to base colleciton * loadSubset fn return promise or true * add comment on setting is loading * address review * remove public trackLoadPromise * setWindow returns a promise when it triggers loading subset * feat: implement useLiveInfiniteQuery hook for React (#666) * feat: implement useLiveInfiniteQuery hook for React * use the new utils.setWindow to page through the results improve types add test that checks that we detect new pages on more rows syncing changeset tweaks * isFetchingNextPage set by promise from setWindow --------- Co-authored-by: Sam Willis <sam.willis@gmail.com> --------- Co-authored-by: Kyle Mathews <mathews.kyle@gmail.com>
1 parent 63aa8ef commit b0687ab

17 files changed

Lines changed: 2453 additions & 113 deletions

.changeset/cruel-buckets-shop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Added `isLoadingMore` property and `loadingMore:change` events to collections and live queries, enabling UIs to display loading indicators when more data is being fetched via `syncMore`. Each live query maintains its own isolated loading state based on its subscriptions, preventing loading status "bleed" between independent queries that share the same source collections.

.changeset/smooth-goats-ring.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
---
2+
"@tanstack/react-db": patch
3+
---
4+
5+
Add `useLiveInfiniteQuery` hook for infinite scrolling with live updates.
6+
7+
The new `useLiveInfiniteQuery` hook provides an infinite query pattern similar to TanStack Query's `useInfiniteQuery`, but with live updates from your local collection. It uses `liveQueryCollection.utils.setWindow()` internally to efficiently paginate through ordered data without recreating the query on each page fetch.
8+
9+
**Key features:**
10+
11+
- Automatic live updates as data changes in the collection
12+
- Efficient pagination using dynamic window adjustment
13+
- Peek-ahead mechanism to detect when more pages are available
14+
- Compatible with TanStack Query's infinite query API patterns
15+
16+
**Example usage:**
17+
18+
```tsx
19+
import { useLiveInfiniteQuery } from "@tanstack/react-db"
20+
21+
function PostList() {
22+
const { data, pages, fetchNextPage, hasNextPage, isLoading } =
23+
useLiveInfiniteQuery(
24+
(q) =>
25+
q
26+
.from({ posts: postsCollection })
27+
.orderBy(({ posts }) => posts.createdAt, "desc"),
28+
{
29+
pageSize: 20,
30+
getNextPageParam: (lastPage, allPages) =>
31+
lastPage.length === 20 ? allPages.length : undefined,
32+
}
33+
)
34+
35+
if (isLoading) return <div>Loading...</div>
36+
37+
return (
38+
<div>
39+
{pages.map((page, i) => (
40+
<div key={i}>
41+
{page.map((post) => (
42+
<PostCard key={post.id} post={post} />
43+
))}
44+
</div>
45+
))}
46+
{hasNextPage && (
47+
<button onClick={() => fetchNextPage()}>Load More</button>
48+
)}
49+
</div>
50+
)
51+
}
52+
```
53+
54+
**Requirements:**
55+
56+
- Query must include `.orderBy()` for the window mechanism to work
57+
- Returns flattened `data` array and `pages` array for flexible rendering
58+
- Automatically detects new pages when data is synced to the collection
Lines changed: 25 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { EventEmitter } from "../event-emitter.js"
12
import type { Collection } from "./index.js"
23
import type { CollectionStatus } from "../types.js"
34

@@ -31,9 +32,21 @@ export interface CollectionSubscribersChangeEvent {
3132
subscriberCount: number
3233
}
3334

35+
/**
36+
* Event emitted when the collection's loading more state changes
37+
*/
38+
export interface CollectionLoadingSubsetChangeEvent {
39+
type: `loadingSubset:change`
40+
collection: Collection<any, any, any, any, any>
41+
isLoadingSubset: boolean
42+
previousIsLoadingSubset: boolean
43+
loadingSubsetTransition: `start` | `end`
44+
}
45+
3446
export type AllCollectionEvents = {
3547
"status:change": CollectionStatusChangeEvent
3648
"subscribers:change": CollectionSubscribersChangeEvent
49+
"loadingSubset:change": CollectionLoadingSubsetChangeEvent
3750
} & {
3851
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
3952
}
@@ -42,94 +55,32 @@ export type CollectionEvent =
4255
| AllCollectionEvents[keyof AllCollectionEvents]
4356
| CollectionStatusChangeEvent
4457
| CollectionSubscribersChangeEvent
58+
| CollectionLoadingSubsetChangeEvent
4559

4660
export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
4761
event: AllCollectionEvents[T]
4862
) => void
4963

50-
export class CollectionEventsManager {
64+
export class CollectionEventsManager extends EventEmitter<AllCollectionEvents> {
5165
private collection!: Collection<any, any, any, any, any>
52-
private listeners = new Map<
53-
keyof AllCollectionEvents,
54-
Set<CollectionEventHandler<any>>
55-
>()
5666

57-
constructor() {}
67+
constructor() {
68+
super()
69+
}
5870

5971
setDeps(deps: { collection: Collection<any, any, any, any, any> }) {
6072
this.collection = deps.collection
6173
}
6274

63-
on<T extends keyof AllCollectionEvents>(
64-
event: T,
65-
callback: CollectionEventHandler<T>
66-
) {
67-
if (!this.listeners.has(event)) {
68-
this.listeners.set(event, new Set())
69-
}
70-
this.listeners.get(event)!.add(callback)
71-
72-
return () => {
73-
this.listeners.get(event)?.delete(callback)
74-
}
75-
}
76-
77-
once<T extends keyof AllCollectionEvents>(
78-
event: T,
79-
callback: CollectionEventHandler<T>
80-
) {
81-
const unsubscribe = this.on(event, (eventPayload) => {
82-
callback(eventPayload)
83-
unsubscribe()
84-
})
85-
return unsubscribe
86-
}
87-
88-
off<T extends keyof AllCollectionEvents>(
89-
event: T,
90-
callback: CollectionEventHandler<T>
91-
) {
92-
this.listeners.get(event)?.delete(callback)
93-
}
94-
95-
waitFor<T extends keyof AllCollectionEvents>(
96-
event: T,
97-
timeout?: number
98-
): Promise<AllCollectionEvents[T]> {
99-
return new Promise((resolve, reject) => {
100-
let timeoutId: NodeJS.Timeout | undefined
101-
const unsubscribe = this.on(event, (eventPayload) => {
102-
if (timeoutId) {
103-
clearTimeout(timeoutId)
104-
timeoutId = undefined
105-
}
106-
resolve(eventPayload)
107-
unsubscribe()
108-
})
109-
if (timeout) {
110-
timeoutId = setTimeout(() => {
111-
timeoutId = undefined
112-
unsubscribe()
113-
reject(new Error(`Timeout waiting for event ${event}`))
114-
}, timeout)
115-
}
116-
})
117-
}
118-
75+
/**
76+
* Emit an event to all listeners
77+
* Public API for emitting collection events
78+
*/
11979
emit<T extends keyof AllCollectionEvents>(
12080
event: T,
12181
eventPayload: AllCollectionEvents[T]
122-
) {
123-
this.listeners.get(event)?.forEach((listener) => {
124-
try {
125-
listener(eventPayload)
126-
} catch (error) {
127-
// Re-throw in a microtask to surface the error
128-
queueMicrotask(() => {
129-
throw error
130-
})
131-
}
132-
})
82+
): void {
83+
this.emitInner(event, eventPayload)
13384
}
13485

13586
emitStatusChange<T extends CollectionStatus>(
@@ -166,6 +117,6 @@ export class CollectionEventsManager {
166117
}
167118

168119
cleanup() {
169-
this.listeners.clear()
120+
this.clearListeners()
170121
}
171122
}

packages/db/src/collection/index.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import type {
2525
InferSchemaOutput,
2626
InsertConfig,
2727
NonSingleResult,
28-
OnLoadMoreOptions,
2928
OperationConfig,
3029
SingleResult,
3130
SubscribeChangesOptions,
@@ -218,7 +217,7 @@ export class CollectionImpl<
218217
private _events: CollectionEventsManager
219218
private _changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
220219
public _lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
221-
private _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
220+
public _sync: CollectionSyncManager<TOutput, TKey, TSchema, TInput>
222221
private _indexes: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
223222
private _mutations: CollectionMutationsManager<
224223
TOutput,
@@ -303,6 +302,7 @@ export class CollectionImpl<
303302
collection: this, // Required for passing to config.sync callback
304303
state: this._state,
305304
lifecycle: this._lifecycle,
305+
events: this._events,
306306
})
307307

308308
// Only start sync immediately if explicitly enabled
@@ -356,23 +356,19 @@ export class CollectionImpl<
356356
}
357357

358358
/**
359-
* Start sync immediately - internal method for compiled queries
360-
* This bypasses lazy loading for special cases like live query results
359+
* Check if the collection is currently loading more data
360+
* @returns true if the collection has pending load more operations, false otherwise
361361
*/
362-
public startSyncImmediate(): void {
363-
this._sync.startSync()
362+
public get isLoadingSubset(): boolean {
363+
return this._sync.isLoadingSubset
364364
}
365365

366366
/**
367-
* Requests the sync layer to load more data.
368-
* @param options Options to control what data is being loaded
369-
* @returns If data loading is asynchronous, this method returns a promise that resolves when the data is loaded.
370-
* If data loading is synchronous, the data is loaded when the method returns.
367+
* Start sync immediately - internal method for compiled queries
368+
* This bypasses lazy loading for special cases like live query results
371369
*/
372-
public syncMore(options: OnLoadMoreOptions): void | Promise<void> {
373-
if (this._sync.syncOnLoadMoreFn) {
374-
return this._sync.syncOnLoadMoreFn(options)
375-
}
370+
public startSyncImmediate(): void {
371+
this._sync.startSync()
376372
}
377373

378374
/**

0 commit comments

Comments
 (0)