Skip to content

Commit 705110a

Browse files
authored
[ZEPPELIN-6387] Fix WebSocket reconnection not reloading note in Angular UI
### What is this PR for? Fixes an issue where WebSocket reconnection in the new Angular UI (zeppelin-web-angular) does not reload the current note, causing "Note is null" errors when attempting to run paragraphs after reconnection. ### What type of PR is it? Bug Fix ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-6387 ### How should this be tested? 1. Open any notebook in the new Angular UI. 2. Trigger a WebSocket timeout by switching to another browser tab or window and leaving the Zeppelin tab in the background for a few minutes. - Even without interacting with the browser, you can confirm that the WebSocket has reconnected by checking the server logs. 3. Allow the system to automatically reconnect. 4. Try running any paragraph → It should fail before this PR and succeed after applying this PR. ### Questions: * Does the license files need to update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Closes #5129 from tbonelee/websocket-reconnection. Signed-off-by: ChanHo Lee <chanholee@apache.org>
1 parent 5e9847b commit 705110a

2 files changed

Lines changed: 41 additions & 10 deletions

File tree

zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export class Message {
4343
public connectedStatus = false;
4444
public connectedStatus$ = new Subject<boolean>();
4545
private ws: WebSocketSubject<WebSocketMessage<MessageDataTypeMap>> | null = null;
46+
private wsSubscription: Subscription | null = null;
4647
private open$ = new Subject<Event>();
4748
private close$ = new Subject<CloseEvent>();
4849
private sent$ = new Subject<WebSocketMessage<MessageSendDataTypeMap>>();
@@ -99,13 +100,26 @@ export class Message {
99100
if (!this.wsUrl) {
100101
throw new Error('WebSocket URL is not set. Please call setWsUrl() before connect()');
101102
}
103+
104+
// Unsubscribe from existing subscription first
105+
if (this.wsSubscription) {
106+
this.wsSubscription.unsubscribe();
107+
this.wsSubscription = null;
108+
}
109+
110+
// Then close existing WebSocket
111+
if (this.ws) {
112+
this.ws.complete();
113+
this.ws = null;
114+
}
115+
102116
this.ws = webSocket<WebSocketMessage<MessageDataTypeMap>>({
103117
url: this.wsUrl,
104118
openObserver: this.open$,
105119
closeObserver: this.close$
106120
});
107121

108-
this.ws
122+
this.wsSubscription = this.ws
109123
.pipe(
110124
// reconnect
111125
retryWhen(errors => errors.pipe(mergeMap(() => this.close$.pipe(take(1), delay(4000)))))
@@ -190,6 +204,10 @@ export class Message {
190204
}
191205

192206
destroy(): void {
207+
if (this.wsSubscription) {
208+
this.wsSubscription.unsubscribe();
209+
this.wsSubscription = null;
210+
}
193211
if (this.ws) {
194212
this.ws.complete();
195213
this.ws = null;

zeppelin-web-angular/src/app/pages/workspace/notebook/notebook.component.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -435,18 +435,31 @@ export class NotebookComponent extends MessageListenersManager implements OnInit
435435
this.noteVarShareService.clear();
436436
});
437437
this.activatedRoute.params.pipe(takeUntil(this.destroy$)).subscribe(param => {
438-
const { noteId, revisionId } = param;
439-
if (revisionId) {
440-
this.messageService.noteRevision(noteId, revisionId);
441-
} else {
442-
this.messageService.getNote(noteId);
443-
}
444-
this.revisionView = !!revisionId;
438+
this.revisionView = !!param.revisionId;
445439
this.cdr.markForCheck();
446-
this.messageService.listRevisionHistory(noteId);
447-
// TODO(hsuanxyz) scroll to current paragraph
448440
});
449441
this.revisionView = !!this.activatedRoute.snapshot.params.revisionId;
442+
443+
// Fetch note when WebSocket connects or reconnects
444+
this.messageService.connectedStatus$
445+
.pipe(startWith(this.messageService.connectedStatus), takeUntil(this.destroy$))
446+
.subscribe(connected => {
447+
console.log('connectedStatus$ changed to ', connected ? 'connected' : 'disconnected');
448+
if (connected) {
449+
const { noteId, revisionId } = this.activatedRoute.snapshot.params;
450+
if (!noteId) {
451+
throw new Error('Route parameter `noteId` is required.');
452+
}
453+
if (revisionId) {
454+
this.messageService.noteRevision(noteId, revisionId);
455+
} else {
456+
this.messageService.getNote(noteId);
457+
}
458+
this.cdr.markForCheck();
459+
this.messageService.listRevisionHistory(noteId);
460+
// TODO(hsuanxyz) scroll to current paragraph
461+
}
462+
});
450463
}
451464

452465
removeParagraphFromNgZ(): void {

0 commit comments

Comments
 (0)