@@ -5,8 +5,8 @@ import { ObjectCacheService } from '../../cache/object-cache.service';
55import { HALEndpointService } from '../../shared/hal-endpoint.service' ;
66import { Process } from '../../../process-page/processes/process.model' ;
77import { PROCESS } from '../../../process-page/processes/process.resource-type' ;
8- import { Observable , timer as rxjsTimer , concatMap } from 'rxjs' ;
9- import { switchMap , filter , distinctUntilChanged , find , tap } from 'rxjs/operators' ;
8+ import { Observable , Subscription } from 'rxjs' ;
9+ import { switchMap , filter , distinctUntilChanged , find } from 'rxjs/operators' ;
1010import { PaginatedList } from '../paginated-list.model' ;
1111import { Bitstream } from '../../shared/bitstream.model' ;
1212import { RemoteData } from '../remote-data' ;
@@ -19,7 +19,7 @@ import { dataService } from '../base/data-service.decorator';
1919import { DeleteData , DeleteDataImpl } from '../base/delete-data' ;
2020import { NotificationsService } from '../../../shared/notifications/notifications.service' ;
2121import { NoContent } from '../../shared/NoContent.model' ;
22- import { getAllCompletedRemoteData , getFirstCompletedRemoteData } from '../../shared/operators' ;
22+ import { getAllCompletedRemoteData } from '../../shared/operators' ;
2323import { ProcessStatus } from 'src/app/process-page/processes/process-status.model' ;
2424import { hasValue } from '../../../shared/empty.util' ;
2525import { SearchData , SearchDataImpl } from '../base/search-data' ;
@@ -41,6 +41,7 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
4141 private deleteData : DeleteData < Process > ;
4242 private searchData : SearchData < Process > ;
4343 protected activelyBeingPolled : Map < string , NodeJS . Timeout > = new Map ( ) ;
44+ protected subs : Map < string , Subscription > = new Map ( ) ;
4445
4546 constructor (
4647 protected requestService : RequestService ,
@@ -129,6 +130,9 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
129130 }
130131
131132 /**
133+ * @param id The id for this auto-refreshing search. Used to stop
134+ * auto-refreshing afterwards, and ensure we're not
135+ * auto-refreshing the same thing multiple times.
132136 * @param searchMethod The search method for the Process
133137 * @param options The FindListOptions object
134138 * @param pollingIntervalInMs The interval by which the search will be repeated
@@ -137,22 +141,41 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
137141 * @return {Observable<RemoteData<PaginatedList<Process>>> }
138142 * Return an observable that emits a paginated list of processes every interval
139143 */
140- autoRefreshingSearchBy ( searchMethod : string , options ?: FindListOptions , pollingIntervalInMs : number = 5000 , ...linksToFollow : FollowLinkConfig < Process > [ ] ) : Observable < RemoteData < PaginatedList < Process > > > {
141- // Create observable that emits every pollingInterval
142- return rxjsTimer ( 0 , pollingIntervalInMs ) . pipe (
143- concatMap ( ( ) => {
144- // Every time the timer emits, request the current state of the processes
145- return this . searchBy ( searchMethod , options , false , false , ...linksToFollow ) . pipe (
146- getFirstCompletedRemoteData ( ) ,
147- tap ( ( processListRD : RemoteData < PaginatedList < Process > > ) => {
148- // Once the response has been received, invalidate the response right before the next request
149- setTimeout ( ( ) => {
150- this . invalidateByHref ( processListRD . payload . _links . self . href ) ;
151- } , Math . max ( pollingIntervalInMs - 100 , 0 ) ) ;
152- } ) ,
153- ) ;
154- } )
144+ autoRefreshingSearchBy ( id : string , searchMethod : string , options ?: FindListOptions , pollingIntervalInMs : number = 5000 , ...linksToFollow : FollowLinkConfig < Process > [ ] ) : Observable < RemoteData < PaginatedList < Process > > > {
145+
146+ const result$ = this . searchBy ( searchMethod , options , true , true , ...linksToFollow ) . pipe (
147+ getAllCompletedRemoteData ( )
155148 ) ;
149+
150+ const sub = result$ . pipe (
151+ filter ( ( ) =>
152+ ! this . activelyBeingPolled . has ( id )
153+ )
154+ ) . subscribe ( ( processListRd : RemoteData < PaginatedList < Process > > ) => {
155+ this . clearCurrentTimeout ( id ) ;
156+ const nextTimeout = this . timer ( ( ) => {
157+ this . activelyBeingPolled . delete ( id ) ;
158+ this . requestService . setStaleByHrefSubstring ( processListRd . payload . _links . self . href ) ;
159+ } , pollingIntervalInMs ) ;
160+
161+ this . activelyBeingPolled . set ( id , nextTimeout ) ;
162+ } ) ;
163+
164+ this . subs . set ( id , sub ) ;
165+
166+ return result$ ;
167+ }
168+
169+ /**
170+ * Stop auto-refreshing the request with the given id
171+ * @param id the id of the request to stop automatically refreshing
172+ */
173+ stopAutoRefreshing ( id : string ) {
174+ this . clearCurrentTimeout ( id ) ;
175+ if ( hasValue ( this . subs . get ( id ) ) ) {
176+ this . subs . get ( id ) . unsubscribe ( ) ;
177+ this . subs . delete ( id ) ;
178+ }
156179 }
157180
158181 /**
@@ -181,14 +204,15 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
181204 }
182205
183206 /**
184- * Clear the timeout for the given process , if that timeout exists
207+ * Clear the timeout for the given id , if that timeout exists
185208 * @protected
186209 */
187- protected clearCurrentTimeout ( processId : string ) : void {
188- const timeout = this . activelyBeingPolled . get ( processId ) ;
210+ protected clearCurrentTimeout ( id : string ) : void {
211+ const timeout = this . activelyBeingPolled . get ( id ) ;
189212 if ( hasValue ( timeout ) ) {
190213 clearTimeout ( timeout ) ;
191214 }
215+ this . activelyBeingPolled . delete ( id ) ;
192216 }
193217
194218 /**
@@ -229,15 +253,15 @@ export class ProcessDataService extends IdentifiableDataService<Process> impleme
229253 this . activelyBeingPolled . set ( processId , nextTimeout ) ;
230254 } ) ;
231255
256+ this . subs . set ( processId , sub ) ;
257+
232258 // When the process completes create a one off subscription (the `find` completes the
233259 // observable) that unsubscribes the previous one, removes the processId from the list of
234260 // processes being polled and clears any running timeouts
235261 process$ . pipe (
236262 find ( ( processRD : RemoteData < Process > ) => ProcessDataService . hasCompletedOrFailed ( processRD . payload ) )
237263 ) . subscribe ( ( ) => {
238- this . clearCurrentTimeout ( processId ) ;
239- this . activelyBeingPolled . delete ( processId ) ;
240- sub . unsubscribe ( ) ;
264+ this . stopAutoRefreshing ( processId ) ;
241265 } ) ;
242266
243267 return process$ . pipe (
0 commit comments