@@ -5,7 +5,7 @@ import { ObjectCacheService } from '../../cache/object-cache.service';
55import { HALEndpointService } from '../../shared/hal-endpoint.service' ;
66import { Process } from '../../../process-page/processes/process.model' ;
77import { PROCESS } from '../../../process-page/processes/process.resource-type' ;
8- import { Observable } from 'rxjs' ;
8+ import { Observable , Subscription } from 'rxjs' ;
99import { switchMap , filter , distinctUntilChanged , find } from 'rxjs/operators' ;
1010import { PaginatedList } from '../paginated-list.model' ;
1111import { Bitstream } from '../../shared/bitstream.model' ;
@@ -22,6 +22,7 @@ import { NoContent } from '../../shared/NoContent.model';
2222import { getAllCompletedRemoteData } from '../../shared/operators' ;
2323import { ProcessStatus } from 'src/app/process-page/processes/process-status.model' ;
2424import { hasValue } from '../../../shared/empty.util' ;
25+ import { SearchData , SearchDataImpl } from '../base/search-data' ;
2526
2627/**
2728 * Create an InjectionToken for the default JS setTimeout function, purely so we can mock it during
@@ -34,11 +35,13 @@ export const TIMER_FACTORY = new InjectionToken<(callback: (...args: any[]) => v
3435
3536@Injectable ( )
3637@dataService ( PROCESS )
37- export class ProcessDataService extends IdentifiableDataService < Process > implements FindAllData < Process > , DeleteData < Process > {
38+ export class ProcessDataService extends IdentifiableDataService < Process > implements FindAllData < Process > , DeleteData < Process > , SearchData < Process > {
3839
3940 private findAllData : FindAllData < Process > ;
4041 private deleteData : DeleteData < Process > ;
42+ private searchData : SearchData < Process > ;
4143 protected activelyBeingPolled : Map < string , NodeJS . Timeout > = new Map ( ) ;
44+ protected subs : Map < string , Subscription > = new Map ( ) ;
4245
4346 constructor (
4447 protected requestService : RequestService ,
@@ -54,6 +57,7 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
5457
5558 this . findAllData = new FindAllDataImpl ( this . linkPath , requestService , rdbService , objectCache , halService , this . responseMsToLive ) ;
5659 this . deleteData = new DeleteDataImpl ( this . linkPath , requestService , rdbService , objectCache , halService , notificationsService , this . responseMsToLive , this . constructIdEndpoint ) ;
60+ this . searchData = new SearchDataImpl ( this . linkPath , requestService , rdbService , objectCache , halService , this . responseMsToLive ) ;
5761 }
5862
5963 /**
@@ -109,6 +113,71 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
109113 return this . findAllData . findAll ( options , useCachedVersionIfAvailable , reRequestOnStale , ...linksToFollow ) ;
110114 }
111115
116+ /**
117+ * @param searchMethod The search method for the Process
118+ * @param options The FindListOptions object
119+ * @param useCachedVersionIfAvailable If this is true, the request will only be sent if there's
120+ * no valid cached version. Defaults to true.
121+ * @param reRequestOnStale Whether the request should automatically be re-
122+ * requested after the response becomes stale.
123+ * @param linksToFollow List of {@link FollowLinkConfig} that indicate which
124+ * {@link HALLink}s should automatically be resolved.
125+ * @return {Observable<RemoteData<PaginatedList<Process>>> }
126+ * Return an observable that emits a paginated list of processes
127+ */
128+ searchBy ( searchMethod : string , options ?: FindListOptions , useCachedVersionIfAvailable ?: boolean , reRequestOnStale ?: boolean , ...linksToFollow : FollowLinkConfig < Process > [ ] ) : Observable < RemoteData < PaginatedList < Process > > > {
129+ return this . searchData . searchBy ( searchMethod , options , useCachedVersionIfAvailable , reRequestOnStale , ...linksToFollow ) ;
130+ }
131+
132+ /**
133+ * @param id The id for this auto-refreshing search. Used to stop
134+ * auto-refreshing afterwards, and ensure we're not
135+ * auto-refreshing the same thing multiple times.
136+ * @param searchMethod The search method for the Process
137+ * @param options The FindListOptions object
138+ * @param pollingIntervalInMs The interval by which the search will be repeated
139+ * @param linksToFollow List of {@link FollowLinkConfig} that indicate which
140+ * {@link HALLink}s should automatically be resolved.
141+ * @return {Observable<RemoteData<PaginatedList<Process>>> }
142+ * Return an observable that emits a paginated list of processes every interval
143+ */
144+ autoRefreshingSearchBy ( id : string , searchMethod : string , options ?: FindListOptions , pollingIntervalInMs : number = 5000 , ...linksToFollow : FollowLinkConfig < Process > [ ] ) : Observable < RemoteData < PaginatedList < Process > > > {
145+
146+ const result$ = this . searchBy ( searchMethod , options , true , true , ...linksToFollow ) . pipe (
147+ getAllCompletedRemoteData ( )
148+ ) ;
149+
150+ const sub = result$ . pipe (
151+ filter ( ( ) =>
152+ ! this . activelyBeingPolled . has ( id )
153+ )
154+ ) . subscribe ( ( processListRd : RemoteData < PaginatedList < Process > > ) => {
155+ this . clearCurrentTimeout ( id ) ;
156+ const nextTimeout = this . timer ( ( ) => {
157+ this . activelyBeingPolled . delete ( id ) ;
158+ this . requestService . setStaleByHrefSubstring ( processListRd . payload . _links . self . href ) ;
159+ } , pollingIntervalInMs ) ;
160+
161+ this . activelyBeingPolled . set ( id , nextTimeout ) ;
162+ } ) ;
163+
164+ this . subs . set ( id , sub ) ;
165+
166+ return result$ ;
167+ }
168+
169+ /**
170+ * Stop auto-refreshing the request with the given id
171+ * @param id the id of the request to stop automatically refreshing
172+ */
173+ stopAutoRefreshing ( id : string ) {
174+ this . clearCurrentTimeout ( id ) ;
175+ if ( hasValue ( this . subs . get ( id ) ) ) {
176+ this . subs . get ( id ) . unsubscribe ( ) ;
177+ this . subs . delete ( id ) ;
178+ }
179+ }
180+
112181 /**
113182 * Delete an existing object on the server
114183 * @param objectId The id of the object to be removed
@@ -135,14 +204,15 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
135204 }
136205
137206 /**
138- * Clear the timeout for the given process , if that timeout exists
207+ * Clear the timeout for the given id , if that timeout exists
139208 * @protected
140209 */
141- protected clearCurrentTimeout ( processId : string ) : void {
142- const timeout = this . activelyBeingPolled . get ( processId ) ;
210+ protected clearCurrentTimeout ( id : string ) : void {
211+ const timeout = this . activelyBeingPolled . get ( id ) ;
143212 if ( hasValue ( timeout ) ) {
144213 clearTimeout ( timeout ) ;
145214 }
215+ this . activelyBeingPolled . delete ( id ) ;
146216 }
147217
148218 /**
@@ -185,15 +255,15 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
185255 }
186256 } ) ;
187257
258+ this . subs . set ( processId , sub ) ;
259+
188260 // When the process completes create a one off subscription (the `find` completes the
189261 // observable) that unsubscribes the previous one, removes the processId from the list of
190262 // processes being polled and clears any running timeouts
191263 process$ . pipe (
192264 find ( ( processRD : RemoteData < Process > ) => ProcessDataService . hasCompletedOrFailed ( processRD . payload ) )
193265 ) . subscribe ( ( ) => {
194- this . clearCurrentTimeout ( processId ) ;
195- this . activelyBeingPolled . delete ( processId ) ;
196- sub . unsubscribe ( ) ;
266+ this . stopAutoRefreshing ( processId ) ;
197267 } ) ;
198268
199269 return process$ . pipe (
0 commit comments