From 76022ce34a4f4266eae61368eb7c1b6cd3ff0ac3 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Tue, 23 Jun 2026 15:40:38 +0200 Subject: [PATCH 01/12] Add fishjam hooks to composition package --- packages/composition/package.json | 18 +- packages/composition/src/eventBus.ts | 5 + packages/composition/src/hooks.ts | 46 +++ packages/composition/src/index.ts | 15 +- packages/composition/src/internal.ts | 2 + packages/composition/src/store.ts | 330 ++++++++++++++++++ packages/composition/src/types.ts | 43 +++ packages/composition/tests/hooks.test.ts | 71 ++++ packages/composition/tests/store.test.ts | 253 ++++++++++++++ packages/fishjam-proto/protos | 2 +- packages/js-server-sdk/src/index.ts | 3 + packages/js-server-sdk/src/notifications.ts | 39 ++- packages/js-server-sdk/src/types.ts | 5 + packages/js-server-sdk/src/ws_notifier.ts | 8 + .../js-server-sdk/tests/notifications.test.ts | 100 +++++- packages/js-server-sdk/tests/webhook.test.ts | 2 +- .../js-server-sdk/tests/ws_notifier.test.ts | 40 ++- yarn.lock | 41 +++ 18 files changed, 998 insertions(+), 25 deletions(-) create mode 100644 packages/composition/src/eventBus.ts create mode 100644 packages/composition/src/hooks.ts create mode 100644 packages/composition/src/internal.ts create mode 100644 packages/composition/src/store.ts create mode 100644 packages/composition/src/types.ts create mode 100644 packages/composition/tests/hooks.test.ts create mode 100644 packages/composition/tests/store.test.ts diff --git a/packages/composition/package.json b/packages/composition/package.json index affbe311..efca9767 100644 --- a/packages/composition/package.json +++ b/packages/composition/package.json @@ -1,7 +1,7 @@ { "name": "@fishjam-cloud/composition", "version": "0.29.0-rc.0", - "description": "Composition event bus contract for Fishjam templates", + "description": "React hooks and event bus for building Fishjam composition templates", "homepage": "https://github.com/fishjam-cloud/js-server-sdk", "author": "Fishjam Team", "repository": { @@ -32,6 +32,11 @@ "types": "./dist/index.d.ts", "import": "./dist/index.mjs", "require": "./dist/index.js" + }, + "./internal": { + "types": "./dist/internal.d.ts", + "import": "./dist/internal.mjs", + "require": "./dist/internal.js" } }, "scripts": { @@ -45,7 +50,8 @@ }, "tsup": { "entry": [ - "src/index.ts" + "src/index.ts", + "src/internal.ts" ], "minify": false, "format": [ @@ -54,9 +60,17 @@ ], "outDir": "dist" }, + "peerDependencies": { + "react": ">=18" + }, "devDependencies": { + "@fishjam-cloud/js-server-sdk": "workspace:*", + "@types/react": "^19.0.0", + "@types/react-dom": "^19.1.9", "eslint": "^9.33.0", "prettier": "^3.6.2", + "react": "^19.0.0", + "react-dom": "^19.0.0", "tsup": "^8.4.0", "vitest": "^3.0.0" }, diff --git a/packages/composition/src/eventBus.ts b/packages/composition/src/eventBus.ts new file mode 100644 index 00000000..ca146d9d --- /dev/null +++ b/packages/composition/src/eventBus.ts @@ -0,0 +1,5 @@ +export interface CompositionEventBus { + on(eventName: string, callback: (data: T) => void): () => void; +} + +export const eventBus = (globalThis as unknown as { eventBus: CompositionEventBus }).eventBus; diff --git a/packages/composition/src/hooks.ts b/packages/composition/src/hooks.ts new file mode 100644 index 00000000..7afac7dd --- /dev/null +++ b/packages/composition/src/hooks.ts @@ -0,0 +1,46 @@ +import { useSyncExternalStore } from 'react'; +import { compositionStore } from './store'; +import type { PeerWithStreams, VadStatus } from './types'; + +const useSnapshot = () => + useSyncExternalStore(compositionStore.subscribe, compositionStore.getSnapshot, compositionStore.getSnapshot); + +/** + * All peers in the linked room, projected into composition streams. Flat list — + * the worker is not a peer, so there is no local/remote split. + */ +export function usePeers(): { + peers: PeerWithStreams[]; +} { + const snapshot = useSnapshot(); + return { peers: snapshot.peers as PeerWithStreams[] }; +} + +/** + * The peer that owns any {@link Stream} with the given `inputId`, or `undefined`. + */ +export function usePeer( + inputId: string +): PeerWithStreams | undefined { + const snapshot = useSnapshot(); + const peer = snapshot.peers.find((p) => p.streams.some((s) => s.inputId === inputId)); + return peer as PeerWithStreams | undefined; +} + +/** + * The linked room's id, or `undefined` when no room is linked. + */ +export function useRoom(): { id: string } | undefined { + const snapshot = useSnapshot(); + if (!snapshot.roomId) return undefined; + return { id: snapshot.roomId }; +} + +/** + * Voice-activity status for the input identified by `inputId`. Defaults to + * `'silence'` until the first VAD notification for the input arrives. + */ +export function useSpeakingState(inputId: string): VadStatus { + const snapshot = useSnapshot(); + return snapshot.vad[inputId] ?? 'silence'; +} diff --git a/packages/composition/src/index.ts b/packages/composition/src/index.ts index ca146d9d..053d4241 100644 --- a/packages/composition/src/index.ts +++ b/packages/composition/src/index.ts @@ -1,5 +1,10 @@ -export interface CompositionEventBus { - on(eventName: string, callback: (data: T) => void): () => void; -} - -export const eventBus = (globalThis as unknown as { eventBus: CompositionEventBus }).eventBus; +/** + * React hooks for composition templates receiving track forwardings from rooms. + * Templates have access to a room's state (peers, tracks, voice activity). + * + * @packageDocumentation + */ +export { usePeers, usePeer, useRoom, useSpeakingState } from './hooks'; +export type { PeerWithStreams, Stream, TrackState, VideoTrackState, AudioTrackState, VadStatus } from './types'; +export { eventBus } from './eventBus'; +export type { CompositionEventBus } from './eventBus'; diff --git a/packages/composition/src/internal.ts b/packages/composition/src/internal.ts new file mode 100644 index 00000000..e2b73cb6 --- /dev/null +++ b/packages/composition/src/internal.ts @@ -0,0 +1,2 @@ +export { compositionStore } from './store'; +export type { CompositionEvent, RoomSnapshot } from './store'; diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts new file mode 100644 index 00000000..88287ec5 --- /dev/null +++ b/packages/composition/src/store.ts @@ -0,0 +1,330 @@ +import type { + Room, + VadStatus, + PeerConnected, + PeerDisconnected, + PeerMetadataUpdated, + TrackAdded, + TrackForwarding, + TrackForwardingRemoved, + TrackMetadataUpdated, + TrackRemoved, + VadNotification, +} from '@fishjam-cloud/js-server-sdk'; +import type { AudioTrackState, PeerWithStreams, Stream, VideoTrackState } from './types'; + +/** + * Discriminated notifier event accepted by {@link CompositionStore.applyNotification}. + * Each `FishjamWSNotifier` listener maps to one of these variants. + */ +export type CompositionEvent = + | { type: 'peerConnected'; data: PeerConnected } + | { type: 'peerDisconnected'; data: PeerDisconnected } + | { type: 'peerMetadataUpdated'; data: PeerMetadataUpdated } + | { type: 'trackAdded'; data: TrackAdded } + | { type: 'trackRemoved'; data: TrackRemoved } + | { type: 'trackMetadataUpdated'; data: TrackMetadataUpdated } + | { type: 'trackForwarding'; data: TrackForwarding } + | { type: 'trackForwardingRemoved'; data: TrackForwardingRemoved } + | { type: 'vadNotification'; data: VadNotification }; + +/** + * Complete snapshot of a linked room's state at a given moment. + */ +export type RoomSnapshot = { + peers: PeerWithStreams[]; + roomId?: string; + /** per-`inputId` voice activity, consumed by `useSpeakingState`. */ + vad: Record; +}; + +type Metadata = Record; + +type InternalTrack = { id: string; metadata: Metadata }; + +type InternalStream = { + inputId: string; + video?: InternalTrack; + audio?: InternalTrack; +}; + +type InternalPeer = { + id: string; + metadata: { peer: unknown; server: unknown }; + tracks: Map; + streams: Map; +}; + +const EMPTY_SNAPSHOT: RoomSnapshot = { peers: [], vad: {} }; + +const assertNever = (event: never): never => { + throw new Error(`Unhandled composition event: ${JSON.stringify(event)}`); +}; + +const normalizeMetadata = (raw: string | object | null | undefined): Metadata => { + if (raw == null) return {}; + if (typeof raw === 'string') { + try { + const parsed = JSON.parse(raw); + return parsed && typeof parsed === 'object' ? (parsed as Metadata) : {}; + } catch { + return {}; + } + } + return raw as Metadata; +}; + +const splitMetadata = (raw: string | object | null | undefined): { peer: unknown; server: unknown } => { + const obj = normalizeMetadata(raw); + return { peer: obj.peer, server: obj.server }; +}; + +const roleOf = (stream: InternalStream): 'camera' | 'screenShare' | 'custom' => { + const type = stream.video?.metadata.type ?? stream.audio?.metadata.type; + if (type === 'camera') return 'camera'; + if (type === 'screenShare') return 'screenShare'; + return 'custom'; +}; + +class CompositionStore { + private peers = new Map(); + private roomId: string | undefined; + /** trackId -> inputId, used to resolve VAD/metadata events to a stream. */ + private forwarding = new Map(); + private vad = new Map(); + + private listeners = new Set<() => void>(); + private cachedSnapshot: RoomSnapshot | null = EMPTY_SNAPSHOT; + + readonly subscribe = (cb: () => void): (() => void) => { + this.listeners.add(cb); + return () => this.listeners.delete(cb); + }; + + readonly getSnapshot = (): RoomSnapshot => { + if (this.cachedSnapshot === null) this.cachedSnapshot = this.buildSnapshot(); + return this.cachedSnapshot; + }; + + // -- backward-facing feed API ------------------------------------------------ + + reset(): void { + this.peers.clear(); + this.forwarding.clear(); + this.vad.clear(); + this.roomId = undefined; + this.commit(); + } + + seedFromRoom(room: Room): void { + this.peers.clear(); + this.forwarding.clear(); + this.vad.clear(); + this.roomId = room.id; + + for (const peer of room.peers) { + const internal: InternalPeer = { + id: peer.id, + metadata: splitMetadata(peer.metadata), + tracks: new Map(), + streams: new Map(), + }; + for (const track of peer.tracks ?? []) { + if (!track.id) continue; + internal.tracks.set(track.id, { id: track.id, metadata: normalizeMetadata(track.metadata) }); + } + this.peers.set(peer.id, internal); + } + this.commit(); + } + + applyNotification(event: CompositionEvent): void { + switch (event.type) { + case 'peerConnected': + this.onPeerConnected(event.data); + break; + case 'peerDisconnected': + this.onPeerDisconnected(event.data); + break; + case 'peerMetadataUpdated': + this.onPeerMetadataUpdated(event.data); + break; + case 'trackAdded': + this.onTrackAdded(event.data); + break; + case 'trackRemoved': + this.onTrackRemoved(event.data); + break; + case 'trackMetadataUpdated': + this.onTrackMetadataUpdated(event.data); + break; + case 'trackForwarding': + this.onTrackForwarding(event.data); + break; + case 'trackForwardingRemoved': + this.onTrackForwardingRemoved(event.data); + break; + case 'vadNotification': + this.onVadNotification(event.data); + break; + default: + assertNever(event); + } + this.commit(); + } + + // -- reducer --------------------------------------------------------------- + + private ensurePeer(peerId: string): InternalPeer { + let peer = this.peers.get(peerId); + if (!peer) { + peer = { id: peerId, metadata: { peer: undefined, server: undefined }, tracks: new Map(), streams: new Map() }; + this.peers.set(peerId, peer); + } + return peer; + } + + private onPeerConnected(data: PeerConnected): void { + this.ensurePeer(data.peerId); + } + + private onPeerDisconnected(data: PeerDisconnected): void { + const peer = this.peers.get(data.peerId); + if (peer) { + for (const inputId of peer.streams.keys()) this.vad.delete(inputId); + } + this.peers.delete(data.peerId); + for (const key of this.forwarding.keys()) { + if (key.startsWith(`${data.peerId}:`)) this.forwarding.delete(key); + } + } + + private onPeerMetadataUpdated(data: PeerMetadataUpdated): void { + this.ensurePeer(data.peerId).metadata = splitMetadata(data.metadata); + } + + private onTrackAdded(data: TrackAdded): void { + if (!data.peerId || !data.track) return; + const peer = this.ensurePeer(data.peerId); + peer.tracks.set(data.track.id, { id: data.track.id, metadata: normalizeMetadata(data.track.metadata) }); + } + + private onTrackMetadataUpdated(data: TrackMetadataUpdated): void { + if (!data.peerId || !data.track) return; + const peer = this.peers.get(data.peerId); + if (!peer) return; + const metadata = normalizeMetadata(data.track.metadata); + peer.tracks.set(data.track.id, { id: data.track.id, metadata }); + + const inputId = this.forwarding.get(`${data.peerId}:${data.track.id}`); + if (!inputId) return; + const stream = peer.streams.get(inputId); + if (!stream) return; + if (stream.video?.id === data.track.id) stream.video = { id: data.track.id, metadata }; + if (stream.audio?.id === data.track.id) stream.audio = { id: data.track.id, metadata }; + } + + private onTrackRemoved(data: TrackRemoved): void { + if (!data.peerId || !data.track) return; + const peer = this.peers.get(data.peerId); + if (!peer) return; + peer.tracks.delete(data.track.id); + + const key = `${data.peerId}:${data.track.id}`; + const inputId = this.forwarding.get(key); + this.forwarding.delete(key); + if (!inputId) return; + const stream = peer.streams.get(inputId); + if (!stream) return; + if (stream.video?.id === data.track.id) stream.video = undefined; + if (stream.audio?.id === data.track.id) stream.audio = undefined; + } + + private onTrackForwarding(data: TrackForwarding): void { + const peer = this.ensurePeer(data.peerId); + const stream: InternalStream = peer.streams.get(data.inputId) ?? { inputId: data.inputId }; + + if (data.videoTrack) { + stream.video = { id: data.videoTrack.id, metadata: normalizeMetadata(data.videoTrack.metadata) }; + this.forwarding.set(`${data.peerId}:${data.videoTrack.id}`, data.inputId); + peer.tracks.set(data.videoTrack.id, stream.video); + } + if (data.audioTrack) { + stream.audio = { id: data.audioTrack.id, metadata: normalizeMetadata(data.audioTrack.metadata) }; + this.forwarding.set(`${data.peerId}:${data.audioTrack.id}`, data.inputId); + peer.tracks.set(data.audioTrack.id, stream.audio); + } + peer.streams.set(data.inputId, stream); + } + + private onTrackForwardingRemoved(data: TrackForwardingRemoved): void { + const peer = this.peers.get(data.peerId); + this.vad.delete(data.inputId); + if (!peer) return; + peer.streams.delete(data.inputId); + for (const [key, inputId] of this.forwarding) { + if (inputId === data.inputId && key.startsWith(`${data.peerId}:`)) this.forwarding.delete(key); + } + } + + private onVadNotification(data: VadNotification): void { + const inputId = this.forwarding.get(`${data.peerId}:${data.trackId}`); + if (!inputId) return; + this.vad.set(inputId, data.status); + } + + // -- snapshot -------------------------------------------------------------- + + private commit(): void { + this.cachedSnapshot = null; + for (const cb of this.listeners) cb(); + } + + private buildSnapshot(): RoomSnapshot { + const vad: Record = {}; + for (const [inputId, status] of this.vad) vad[inputId] = status; + + const peers = Array.from(this.peers.values()).map((peer) => this.derivePeer(peer, vad)); + return { peers, roomId: this.roomId, vad }; + } + + private derivePeer(peer: InternalPeer, vad: Record): PeerWithStreams { + const streams: Stream[] = []; + const customStreams: Stream[] = []; + let cameraStream: Stream | undefined; + let screenShareStream: Stream | undefined; + + for (const internal of peer.streams.values()) { + const video: VideoTrackState | undefined = internal.video + ? { + id: internal.video.id, + paused: Boolean(internal.video.metadata.paused), + metadata: internal.video.metadata, + type: 'video', + } + : undefined; + const audio: AudioTrackState | undefined = internal.audio + ? { + id: internal.audio.id, + paused: Boolean(internal.audio.metadata.paused), + metadata: internal.audio.metadata, + type: 'audio', + ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), + } + : undefined; + + const stream: Stream = { inputId: internal.inputId, video, audio }; + streams.push(stream); + + const role = roleOf(internal); + if (role === 'camera') cameraStream = stream; + else if (role === 'screenShare') screenShareStream = stream; + else customStreams.push(stream); + } + + return { id: peer.id, metadata: peer.metadata, streams, cameraStream, screenShareStream, customStreams }; + } +} + +/** Module-level singleton shared by the hooks and the worker feed API. */ +export const compositionStore = new CompositionStore(); diff --git a/packages/composition/src/types.ts b/packages/composition/src/types.ts new file mode 100644 index 00000000..fef20d76 --- /dev/null +++ b/packages/composition/src/types.ts @@ -0,0 +1,43 @@ +import type { VadStatus } from '@fishjam-cloud/js-server-sdk'; + +export type { VadStatus }; + +/** + * State of a single fishjam track. + * The `paused` flag mirrors the track's mute state. + */ +export type TrackState = { + /** fishjam track id */ + id: string; + /** mute state, read from the built-in `paused` metadata key */ + paused: boolean; + metadata: Record; +}; + +export type VideoTrackState = TrackState & { type: 'video' }; +export type AudioTrackState = TrackState & { type: 'audio'; vadStatus?: VadStatus }; + +/** + * A forwarded input. + * `inputId` is the handle a template passes to ``. + * A stream carries at most one video and one audio track. + */ +export type Stream = { + inputId: string; + video?: VideoTrackState; + audio?: AudioTrackState; +}; + +/** + * A room peer projected for composition templates. + * Streams are split by role: camera, screen share, or custom. + * The camera stream's audio is the peer's mic. + */ +export type PeerWithStreams = { + id: string; + metadata: { peer: PeerMetadata; server: ServerMetadata }; + streams: Stream[]; + cameraStream?: Stream; + screenShareStream?: Stream; + customStreams: Stream[]; +}; diff --git a/packages/composition/tests/hooks.test.ts b/packages/composition/tests/hooks.test.ts new file mode 100644 index 00000000..8484266b --- /dev/null +++ b/packages/composition/tests/hooks.test.ts @@ -0,0 +1,71 @@ +import { describe, it, expect, beforeEach, expectTypeOf } from 'vitest'; +import { createElement, type FunctionComponent } from 'react'; +import { renderToStaticMarkup } from 'react-dom/server'; +import { compositionStore, type CompositionEvent } from '../src/store'; +import { usePeers, usePeer, useRoom, useSpeakingState } from '../src/hooks'; +import type { PeerWithStreams } from '../src/types'; + +const apply = (event: CompositionEvent) => compositionStore.applyNotification(event); +const render = (component: FunctionComponent) => renderToStaticMarkup(createElement(component)); + +const forwardCamera = () => + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId: 'p1', + compositionUrl: 'url', + inputId: 'in1', + videoTrack: { id: 'v1', type: 'video', metadata: JSON.stringify({ type: 'camera' }) }, + audioTrack: { id: 'a1', type: 'audio', metadata: JSON.stringify({ type: 'camera' }) }, + } as never, + }); + +beforeEach(() => compositionStore.reset()); + +describe('composition hooks', () => { + it('usePeers reflects the current store state', () => { + const Probe: FunctionComponent = () => + createElement( + 'div', + null, + usePeers() + .peers.map((p) => p.id) + .join(',') + ); + expect(render(Probe)).toBe('
'); + forwardCamera(); + expect(render(Probe)).toBe('
p1
'); + }); + + it('usePeer selects the peer owning the inputId', () => { + const Probe: FunctionComponent = () => createElement('div', null, usePeer('in1')?.id ?? 'none'); + expect(render(Probe)).toBe('
none
'); + forwardCamera(); + expect(render(Probe)).toBe('
p1
'); + }); + + it('useRoom reflects the linked room', () => { + const Probe: FunctionComponent = () => createElement('div', null, useRoom()?.id ?? 'none'); + expect(render(Probe)).toBe('
none
'); + compositionStore.seedFromRoom({ id: 'r1', config: {}, peers: [] } as never); + expect(render(Probe)).toBe('
r1
'); + }); + + it('useSpeakingState tracks VAD per input', () => { + const Probe: FunctionComponent = () => createElement('div', null, String(useSpeakingState('in1'))); + forwardCamera(); + expect(render(Probe)).toBe('
silence
'); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(render(Probe)).toBe('
speech
'); + }); + + it('flows the metadata generics through the return types', () => { + expectTypeOf>>().toEqualTypeOf<{ + peers: PeerWithStreams<{ name: string }, { trusted: boolean }>[]; + }>(); + expectTypeOf>>().toEqualTypeOf< + PeerWithStreams<{ name: string }, unknown> | undefined + >(); + }); +}); diff --git a/packages/composition/tests/store.test.ts b/packages/composition/tests/store.test.ts new file mode 100644 index 00000000..5f117a15 --- /dev/null +++ b/packages/composition/tests/store.test.ts @@ -0,0 +1,253 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { compositionStore, type CompositionEvent } from '../src/store'; +import type { Room } from '@fishjam-cloud/js-server-sdk'; + +const apply = (event: CompositionEvent) => compositionStore.applyNotification(event); + +const track = (id: string, type: 'video' | 'audio', metadata: Record) => + ({ id, type, metadata: JSON.stringify(metadata) }) as never; + +const forwardCamera = (peerId = 'p1', inputId = 'in1') => + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId, + compositionUrl: 'url', + inputId, + videoTrack: track('v1', 'video', { type: 'camera', paused: false }), + audioTrack: track('a1', 'audio', { type: 'camera', paused: false }), + } as never, + }); + +const forwardScreenShare = (peerId = 'p1', inputId = 'in2') => + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId, + compositionUrl: 'url', + inputId, + videoTrack: track('v2', 'video', { type: 'screenShare', paused: false }), + } as never, + }); + +const peers = () => compositionStore.getSnapshot().peers; + +beforeEach(() => compositionStore.reset()); + +describe('composition store reducer', () => { + it('peerConnected adds an empty peer', () => { + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + expect(peers()).toHaveLength(1); + expect(peers()[0]).toMatchObject({ id: 'p1', streams: [], customStreams: [] }); + expect(peers()[0].cameraStream).toBeUndefined(); + }); + + it('peerDisconnected removes the peer and its forwarding entries', () => { + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + forwardCamera(); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + apply({ type: 'peerDisconnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + expect(peers()).toHaveLength(0); + expect(compositionStore.getSnapshot().vad).toEqual({}); + // forwarding table cleared: a later vad for the same key resolves to nothing + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getSnapshot().vad).toEqual({}); + }); + + it('trackAdded before trackForwarding produces no Stream; forwarding then creates and classifies it', () => { + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + apply({ + type: 'trackAdded', + data: { roomId: 'r1', peerId: 'p1', track: track('v1', 'video', { type: 'camera' }) } as never, + }); + expect(peers()[0].streams).toHaveLength(0); + + forwardCamera(); + const peer = peers()[0]; + expect(peer.streams).toHaveLength(1); + expect(peer.cameraStream?.inputId).toBe('in1'); + expect(peer.cameraStream?.video).toMatchObject({ id: 'v1', type: 'video', paused: false }); + expect(peer.cameraStream?.audio).toMatchObject({ id: 'a1', type: 'audio' }); + }); + + it('paused is read from track metadata', () => { + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId: 'p1', + compositionUrl: 'url', + inputId: 'in1', + videoTrack: track('v1', 'video', { type: 'camera', paused: true }), + } as never, + }); + expect(peers()[0].cameraStream?.video?.paused).toBe(true); + }); + + it('trackMetadataUpdated re-classifies a stream when the role changes', () => { + forwardCamera(); + expect(peers()[0].cameraStream).toBeDefined(); + expect(peers()[0].screenShareStream).toBeUndefined(); + + apply({ + type: 'trackMetadataUpdated', + data: { roomId: 'r1', peerId: 'p1', track: track('v1', 'video', { type: 'screenShare' }) } as never, + }); + expect(peers()[0].cameraStream).toBeUndefined(); + expect(peers()[0].screenShareStream?.inputId).toBe('in1'); + }); + + it('trackRemoved clears one slot but keeps the other; trackForwardingRemoved drops the stream', () => { + forwardCamera(); + apply({ + type: 'trackRemoved', + data: { roomId: 'r1', peerId: 'p1', track: track('a1', 'audio', {}) } as never, + }); + expect(peers()[0].cameraStream?.audio).toBeUndefined(); + expect(peers()[0].cameraStream?.video).toBeDefined(); + + apply({ + type: 'trackForwardingRemoved', + data: { roomId: 'r1', peerId: 'p1', compositionUrl: 'url', inputId: 'in1' } as never, + }); + expect(peers()[0].streams).toHaveLength(0); + expect(peers()[0].cameraStream).toBeUndefined(); + }); + + it('VAD is keyed by (peerId, trackId) → inputId and surfaces on the audio track', () => { + forwardCamera(); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getSnapshot().vad).toEqual({ in1: 'speech' }); + expect(peers()[0].cameraStream?.audio?.vadStatus).toBe('speech'); + }); + + it('ignores a VAD notification for an unknown (peerId, trackId)', () => { + forwardCamera(); + apply({ + type: 'vadNotification', + data: { roomId: 'r1', peerId: 'p1', trackId: 'nope', status: 'speech' } as never, + }); + expect(compositionStore.getSnapshot().vad).toEqual({}); + }); + + it('peerMetadataUpdated splits into { peer, server }', () => { + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + apply({ + type: 'peerMetadataUpdated', + data: { + roomId: 'r1', + peerId: 'p1', + peerType: 'webrtc', + metadata: JSON.stringify({ peer: { name: 'Ada' }, server: { trusted: true } }), + } as never, + }); + expect(peers()[0].metadata).toEqual({ peer: { name: 'Ada' }, server: { trusted: true } }); + }); + + it('normalizes notifier (string) and REST (object) metadata to the same shape', () => { + const value = { peer: { name: 'Ada' }, server: { trusted: true } }; + + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + apply({ + type: 'peerMetadataUpdated', + data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc', metadata: JSON.stringify(value) } as never, + }); + const fromNotifier = peers()[0].metadata; + + compositionStore.seedFromRoom({ + id: 'r1', + config: {}, + peers: [{ id: 'p1', metadata: value, tracks: [] }], + } as unknown as Room); + const fromRest = peers()[0].metadata; + + expect(fromRest).toEqual(fromNotifier); + }); + + it('seedFromRoom records peers without creating streams for un-forwarded tracks', () => { + compositionStore.seedFromRoom({ + id: 'r1', + config: {}, + peers: [{ id: 'p1', metadata: {}, tracks: [{ id: 'v1', type: 'video', metadata: { type: 'camera' } }] }], + } as unknown as Room); + expect(peers()).toHaveLength(1); + expect(peers()[0].streams).toHaveLength(0); + }); + + it('getSnapshot returns a stable reference until a mutation occurs', () => { + const a = compositionStore.getSnapshot(); + expect(compositionStore.getSnapshot()).toBe(a); + forwardCamera(); + const b = compositionStore.getSnapshot(); + expect(b).not.toBe(a); + expect(compositionStore.getSnapshot()).toBe(b); + }); + + it('peerDisconnected does not clear forwarding for peers sharing an id prefix', () => { + forwardCamera('p1', 'in1'); + forwardCamera('p10', 'in10'); + apply({ type: 'peerDisconnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); + expect(peers().map((p) => p.id)).toEqual(['p10']); + // p10's forwarding survives p1's disconnect: a vad for p10's audio still resolves + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p10', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getSnapshot().vad).toEqual({ in10: 'speech' }); + }); + + it('trackMetadataUpdated updates a forwarded audio track in place', () => { + forwardCamera(); + expect(peers()[0].cameraStream?.audio?.paused).toBe(false); + apply({ + type: 'trackMetadataUpdated', + data: { roomId: 'r1', peerId: 'p1', track: track('a1', 'audio', { type: 'camera', paused: true }) } as never, + }); + expect(peers()[0].cameraStream?.audio?.paused).toBe(true); + expect(peers()[0].cameraStream?.video?.paused).toBe(false); + }); + + it('classifies camera, screenShare, and custom streams independently', () => { + forwardCamera('p1', 'in1'); + forwardScreenShare('p1', 'in2'); + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId: 'p1', + compositionUrl: 'url', + inputId: 'in3', + videoTrack: track('v3', 'video', { type: 'whiteboard' }), + } as never, + }); + const peer = peers()[0]; + expect(peer.streams).toHaveLength(3); + expect(peer.cameraStream?.inputId).toBe('in1'); + expect(peer.screenShareStream?.inputId).toBe('in2'); + expect(peer.customStreams.map((s) => s.inputId)).toEqual(['in3']); + }); + + it('trackForwardingRemoved drops only the removed input, leaving the peer other stream routable', () => { + forwardCamera('p1', 'in1'); + forwardScreenShare('p1', 'in2'); + apply({ + type: 'trackForwardingRemoved', + data: { roomId: 'r1', peerId: 'p1', compositionUrl: 'url', inputId: 'in1' } as never, + }); + const peer = peers()[0]; + expect(peer.cameraStream).toBeUndefined(); + expect(peer.screenShareStream?.inputId).toBe('in2'); + // in1's forwarding entries are cleared: a vad for its audio no longer resolves + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getSnapshot().vad).toEqual({}); + }); + + it('notifies subscribers on mutation and stops after unsubscribe', () => { + let calls = 0; + const unsubscribe = compositionStore.subscribe(() => calls++); + forwardCamera(); + expect(calls).toBe(1); + unsubscribe(); + apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p2', peerType: 'webrtc' } as never }); + expect(calls).toBe(1); + }); +}); diff --git a/packages/fishjam-proto/protos b/packages/fishjam-proto/protos index 24e62066..85ccd2bb 160000 --- a/packages/fishjam-proto/protos +++ b/packages/fishjam-proto/protos @@ -1 +1 @@ -Subproject commit 24e62066320eb079695c0c571b1c475e82e14ec5 +Subproject commit 85ccd2bbb82ed5a5528c2dba836b05e64f62424f diff --git a/packages/js-server-sdk/src/index.ts b/packages/js-server-sdk/src/index.ts index 25b2df72..0360b06a 100644 --- a/packages/js-server-sdk/src/index.ts +++ b/packages/js-server-sdk/src/index.ts @@ -42,6 +42,9 @@ export type { TrackAdded, TrackRemoved, TrackMetadataUpdated, + TrackForwarding, + TrackForwardingRemoved, + VadNotification, ChannelAdded, ChannelRemoved, NotificationEvents, diff --git a/packages/js-server-sdk/src/notifications.ts b/packages/js-server-sdk/src/notifications.ts index 630175e4..7d82d177 100644 --- a/packages/js-server-sdk/src/notifications.ts +++ b/packages/js-server-sdk/src/notifications.ts @@ -1,10 +1,13 @@ import { ServerMessage, ServerMessage_PeerType, + ServerMessage_TrackForwarding, + ServerMessage_VadNotification, + ServerMessage_VadNotification_Status, TrackType as ProtoTrackType, Track as ProtoTrack, } from '@fishjam-cloud/fishjam-proto'; -import { Override, PeerId, PeerType, RoomId, TrackType } from './types'; +import { Override, PeerId, PeerType, RoomId, TrackType, VadStatus } from './types'; /** * Track payload embedded in {@link TrackAdded}, {@link TrackRemoved}, {@link TrackMetadataUpdated}. @@ -30,6 +33,13 @@ const trackTypeMap: Record = { [ProtoTrackType.UNRECOGNIZED]: 'unspecified', }; +const vadStatusMap: Record = { + [ServerMessage_VadNotification_Status.STATUS_UNSPECIFIED]: 'silence', + [ServerMessage_VadNotification_Status.STATUS_SILENCE]: 'silence', + [ServerMessage_VadNotification_Status.STATUS_SPEECH]: 'speech', + [ServerMessage_VadNotification_Status.UNRECOGNIZED]: 'silence', +}; + export const expectedEventsList = [ 'roomCreated', 'roomDeleted', @@ -47,6 +57,9 @@ export const expectedEventsList = [ 'trackAdded', 'trackRemoved', 'trackMetadataUpdated', + 'trackForwarding', + 'trackForwardingRemoved', + 'vadNotification', 'channelAdded', 'channelRemoved', ] as const; @@ -64,12 +77,7 @@ export const ignoredEventsList = [ 'authRequest', 'subscribeRequest', 'subscribeResponse', - // Currently unsurfaced server notifications — no consumer demand yet. - 'trackForwarding', - 'trackForwardingRemoved', - 'vadNotification', - // Transport wrapper, not a user-facing event: unwrapped by `extractNotifications` - // into its constituent notifications rather than emitted under its own key. + // Transport wrapper, emitted only over webhooks 'notificationBatch', // Deprecated 'streamConnected', @@ -94,6 +102,9 @@ type NotificationOverrides = { peerId: PeerId; peerType: PeerType; track: Track | undefined; + audioTrack: Track | undefined; + videoTrack: Track | undefined; + status: VadStatus; }; /** @inline */ @@ -119,6 +130,9 @@ export type Notifications = { trackAdded: Notification<'trackAdded'>; trackRemoved: Notification<'trackRemoved'>; trackMetadataUpdated: Notification<'trackMetadataUpdated'>; + trackForwarding: Notification<'trackForwarding'>; + trackForwardingRemoved: Notification<'trackForwardingRemoved'>; + vadNotification: Notification<'vadNotification'>; channelAdded: Notification<'channelAdded'>; channelRemoved: Notification<'channelRemoved'>; }; @@ -139,6 +153,9 @@ export type ViewerDisconnected = Notifications['viewerDisconnected']; export type TrackAdded = Notifications['trackAdded']; export type TrackRemoved = Notifications['trackRemoved']; export type TrackMetadataUpdated = Notifications['trackMetadataUpdated']; +export type TrackForwarding = Notifications['trackForwarding']; +export type TrackForwardingRemoved = Notifications['trackForwardingRemoved']; +export type VadNotification = Notifications['vadNotification']; export type ChannelAdded = Notifications['channelAdded']; export type ChannelRemoved = Notifications['channelRemoved']; @@ -165,6 +182,14 @@ export const mapNotification = (event: ExpectedEvents, msg: unknown): unknown => const trackMsg = msg as { track: ProtoTrack | undefined }; return { ...trackMsg, track: mapTrack(trackMsg.track) }; } + if (event === 'trackForwarding') { + const fwd = msg as ServerMessage_TrackForwarding; + return { ...fwd, audioTrack: mapTrack(fwd.audioTrack), videoTrack: mapTrack(fwd.videoTrack) }; + } + if (event === 'vadNotification') { + const vad = msg as ServerMessage_VadNotification; + return { ...vad, status: vadStatusMap[vad.status] }; + } return msg; }; diff --git a/packages/js-server-sdk/src/types.ts b/packages/js-server-sdk/src/types.ts index ad87ff3d..9c902fb3 100644 --- a/packages/js-server-sdk/src/types.ts +++ b/packages/js-server-sdk/src/types.ts @@ -52,6 +52,11 @@ export type PeerType = OpenApiPeerType | 'unspecified'; */ export type TrackType = OpenApiTrackType | 'unspecified'; +/** + * Voice activity status of a track. + */ +export type VadStatus = 'speech' | 'silence'; + export type Room = { id: RoomId; peers: Peer[]; diff --git a/packages/js-server-sdk/src/ws_notifier.ts b/packages/js-server-sdk/src/ws_notifier.ts index fb85dd74..ed01a0eb 100644 --- a/packages/js-server-sdk/src/ws_notifier.ts +++ b/packages/js-server-sdk/src/ws_notifier.ts @@ -28,6 +28,14 @@ export class FishjamWSNotifier extends (EventEmitter as new () => TypedEmitter this.setupConnection(config.managementToken); } + /** + * Close the underlying WebSocket and stop emitting notifications. + */ + public disconnect(): void { + this.removeAllListeners(); + this.client.close(); + } + private dispatchNotification(message: MessageEvent) { try { const decodedMessage = ServerMessage.decode(new Uint8Array(message.data)); diff --git a/packages/js-server-sdk/tests/notifications.test.ts b/packages/js-server-sdk/tests/notifications.test.ts index 81748000..b753a6b6 100644 --- a/packages/js-server-sdk/tests/notifications.test.ts +++ b/packages/js-server-sdk/tests/notifications.test.ts @@ -1,7 +1,24 @@ import { describe, it, expect, expectTypeOf } from 'vitest'; -import type { ServerMessage } from '@fishjam-cloud/fishjam-proto'; -import { expectedEventsList, ignoredEventsList, peerEventsWithPeerType, trackEvents } from '../src/notifications'; -import type { ExpectedEvents, IgnoredEvents, Notifications, ServerNotification } from '../src/notifications'; +import { + ServerMessage, + ServerMessage_VadNotification_Status, + TrackType as ProtoTrackType, +} from '@fishjam-cloud/fishjam-proto'; +import { + expectedEventsList, + ignoredEventsList, + mapNotification, + peerEventsWithPeerType, + trackEvents, +} from '../src/notifications'; +import type { + ExpectedEvents, + IgnoredEvents, + Notifications, + TrackForwarding, + ServerNotification, + VadNotification, +} from '../src/notifications'; import type * as SDK from '../src'; // Compile-time completeness: every `ServerMessage` oneof must be classified @@ -51,6 +68,83 @@ describe('notifications module', () => { expectTypeOf().toEqualTypeOf(); expectTypeOf().toEqualTypeOf(); expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + expectTypeOf().toEqualTypeOf(); + }); + + it('trackForwarding and vadNotification are expected, not ignored', () => { + for (const event of ['trackForwarding', 'trackForwardingRemoved', 'vadNotification'] as const) { + expect(expectedEventsList).toContain(event); + expect(ignoredEventsList as readonly string[]).not.toContain(event); + } + }); + + it('maps trackForwarding tracks to friendly track types, keeping metadata as a JSON string', () => { + const encoded = ServerMessage.encode({ + trackForwarding: { + roomId: 'r1', + peerId: 'p1', + compositionUrl: 'url', + inputId: 'in1', + audioTrack: { id: 'a1', type: ProtoTrackType.TRACK_TYPE_AUDIO, metadata: '{"type":"camera"}' }, + videoTrack: { id: 'v1', type: ProtoTrackType.TRACK_TYPE_VIDEO, metadata: '{"type":"camera"}' }, + }, + }).finish(); + const { trackForwarding } = ServerMessage.decode(encoded); + const mapped = mapNotification('trackForwarding', trackForwarding) as TrackForwarding; + + expect(mapped.audioTrack).toEqual({ id: 'a1', type: 'audio', metadata: '{"type":"camera"}' }); + expect(mapped.videoTrack).toEqual({ id: 'v1', type: 'video', metadata: '{"type":"camera"}' }); + expect(mapped.inputId).toBe('in1'); + }); + + it('maps vadNotification status to the friendly union', () => { + const speech = ServerMessage.decode( + ServerMessage.encode({ + vadNotification: { + roomId: 'r1', + peerId: 'p1', + trackId: 'a1', + status: ServerMessage_VadNotification_Status.STATUS_SPEECH, + }, + }).finish() + ).vadNotification; + expect((mapNotification('vadNotification', speech) as VadNotification).status).toBe('speech'); + + const silence = ServerMessage.decode( + ServerMessage.encode({ + vadNotification: { + roomId: 'r1', + peerId: 'p1', + trackId: 'a1', + status: ServerMessage_VadNotification_Status.STATUS_SILENCE, + }, + }).finish() + ).vadNotification; + expect((mapNotification('vadNotification', silence) as VadNotification).status).toBe('silence'); + }); + + it('maps unspecified and unrecognized vad statuses to silence', () => { + const unspecified = ServerMessage.decode( + ServerMessage.encode({ + vadNotification: { + roomId: 'r1', + peerId: 'p1', + trackId: 'a1', + status: ServerMessage_VadNotification_Status.STATUS_UNSPECIFIED, + }, + }).finish() + ).vadNotification; + expect((mapNotification('vadNotification', unspecified) as VadNotification).status).toBe('silence'); + + const unrecognized = { + roomId: 'r1', + peerId: 'p1', + trackId: 'a1', + status: ServerMessage_VadNotification_Status.UNRECOGNIZED, + }; + expect((mapNotification('vadNotification', unrecognized) as VadNotification).status).toBe('silence'); }); it('peerEventsWithPeerType covers all ExpectedEvents with a peerType field', () => { diff --git a/packages/js-server-sdk/tests/webhook.test.ts b/packages/js-server-sdk/tests/webhook.test.ts index cf7f1dee..efdb1134 100644 --- a/packages/js-server-sdk/tests/webhook.test.ts +++ b/packages/js-server-sdk/tests/webhook.test.ts @@ -57,7 +57,7 @@ describe('decodeServerNotifications', () => { notifications: [ { subscribeResponse: { eventType: 0 } }, // handshake — ignored peerConnected, - { vadNotification: { roomId: 'room-1', peerId: 'peer-1', trackId: 'track-1', status: 0 } }, // ignored + { hlsPlayable: { roomId: 'room-1', componentId: 'comp-1' } }, // deprecated — ignored ], }, }); diff --git a/packages/js-server-sdk/tests/ws_notifier.test.ts b/packages/js-server-sdk/tests/ws_notifier.test.ts index f2df8e8e..c68dbcdc 100644 --- a/packages/js-server-sdk/tests/ws_notifier.test.ts +++ b/packages/js-server-sdk/tests/ws_notifier.test.ts @@ -7,20 +7,24 @@ const encode = (message: Parameters[0]): Uint8Array type MessageLike = { data: Uint8Array | ArrayBuffer }; -/** - * Minimal stand-in for the global `WebSocket`: records constructed instances and - * exposes the handlers the notifier assigns, so a test can drive inbound messages - * through `dispatchNotification` without a live server. - */ class FakeWebSocket { static instances: FakeWebSocket[] = []; + static readonly CONNECTING = 0; + static readonly OPEN = 1; + static readonly CLOSING = 2; + static readonly CLOSED = 3; + binaryType = 'blob'; + readyState: number = FakeWebSocket.OPEN; onopen: (() => void) | null = null; onclose: ((event: { code: number; reason: string }) => void) | null = null; onerror: ((event: unknown) => void) | null = null; onmessage: ((event: MessageLike) => void) | null = null; readonly sent: unknown[] = []; + close = vi.fn(() => { + this.readyState = FakeWebSocket.CLOSED; + }); constructor(public readonly url: string) { FakeWebSocket.instances.push(this); @@ -32,7 +36,7 @@ class FakeWebSocket { } const config = { fishjamId: 'test-id', managementToken: 'test-token' }; -const noop = () => {}; +const noop = () => { }; const peerConnected = { peerConnected: { roomId: 'room-1', peerId: 'peer-1', peerType: ServerMessage_PeerType.PEER_TYPE_WEBRTC }, @@ -148,3 +152,27 @@ describe('FishjamWSNotifier.dispatchNotification', () => { expect(events).toEqual(['roomCreated', 'peerConnected', 'roomDeleted']); }); }); + +describe('FishjamWSNotifier.disconnect', () => { + beforeEach(() => { + FakeWebSocket.instances = []; + vi.stubGlobal('WebSocket', FakeWebSocket); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + }); + + it('closes the socket and stops emitting notifications', () => { + const { notifier, socket } = createNotifier(); + const handler = vi.fn(); + notifier.on('peerConnected', handler); + + notifier.disconnect(); + + expect(socket.close).toHaveBeenCalledTimes(1); + socket.onmessage?.({ data: encode(peerConnected) }); + expect(handler).not.toHaveBeenCalled(); + }); +}); diff --git a/yarn.lock b/yarn.lock index f14b55c1..398a13b0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1569,10 +1569,17 @@ __metadata: version: 0.0.0-use.local resolution: "@fishjam-cloud/composition@workspace:packages/composition" dependencies: + "@fishjam-cloud/js-server-sdk": "workspace:*" + "@types/react": "npm:^19.0.0" + "@types/react-dom": "npm:^19.1.9" eslint: "npm:^9.33.0" prettier: "npm:^3.6.2" + react: "npm:^19.0.0" + react-dom: "npm:^19.0.0" tsup: "npm:^8.4.0" vitest: "npm:^3.0.0" + peerDependencies: + react: ">=18" languageName: unknown linkType: soft @@ -2602,6 +2609,15 @@ __metadata: languageName: node linkType: hard +"@types/react@npm:^19.0.0": + version: 19.2.17 + resolution: "@types/react@npm:19.2.17" + dependencies: + csstype: "npm:^3.2.2" + checksum: 10c0/bc2c4af96b3e480604424de70d5ebda90c5f4b485df471858c0bc2d7d70364b606ec3c4d8579f94f01aa0c6c0591f56bcf14cba5689f5eea4b74250ccdc3a232 + languageName: node + linkType: hard + "@types/react@npm:^19.1.13": version: 19.2.2 resolution: "@types/react@npm:19.2.2" @@ -3854,6 +3870,13 @@ __metadata: languageName: node linkType: hard +"csstype@npm:^3.2.2": + version: 3.2.3 + resolution: "csstype@npm:3.2.3" + checksum: 10c0/cd29c51e70fa822f1cecd8641a1445bed7063697469d35633b516e60fe8c1bde04b08f6c5b6022136bb669b64c63d4173af54864510fbb4ee23281801841a3ce + languageName: node + linkType: hard + "data-uri-to-buffer@npm:^6.0.2": version: 6.0.2 resolution: "data-uri-to-buffer@npm:6.0.2" @@ -6953,6 +6976,17 @@ __metadata: languageName: node linkType: hard +"react-dom@npm:^19.0.0": + version: 19.2.7 + resolution: "react-dom@npm:19.2.7" + dependencies: + scheduler: "npm:^0.27.0" + peerDependencies: + react: ^19.2.7 + checksum: 10c0/970ff600f6e80d47d39e2f226f12f226173b3cba3382efc97c5f0cd663de9af38c7a4c11c213fb936094faeac83060d660247accaa96b752180d5b951b9cfecb + languageName: node + linkType: hard + "react-dom@npm:^19.1.1": version: 19.2.0 resolution: "react-dom@npm:19.2.0" @@ -6971,6 +7005,13 @@ __metadata: languageName: node linkType: hard +"react@npm:^19.0.0": + version: 19.2.7 + resolution: "react@npm:19.2.7" + checksum: 10c0/0bd0e2f1bbd4ba97561c6597bf8a5fec05e6476fe61e165c1065598d16668efc6715205599c94d3ddd49d36cb0f21cbf1b9bcc18ee840b805ce222c3e8d558ac + languageName: node + linkType: hard + "react@npm:^19.1.1": version: 19.2.0 resolution: "react@npm:19.2.0" From 3b07f87553668caf8239ac174e50a482e77af6b6 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Wed, 24 Jun 2026 17:06:42 +0200 Subject: [PATCH 02/12] Format --- packages/composition/src/store.ts | 22 +++++++++---------- .../js-server-sdk/tests/ws_notifier.test.ts | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index 88287ec5..6eedc029 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -297,20 +297,20 @@ class CompositionStore { for (const internal of peer.streams.values()) { const video: VideoTrackState | undefined = internal.video ? { - id: internal.video.id, - paused: Boolean(internal.video.metadata.paused), - metadata: internal.video.metadata, - type: 'video', - } + id: internal.video.id, + paused: Boolean(internal.video.metadata.paused), + metadata: internal.video.metadata, + type: 'video', + } : undefined; const audio: AudioTrackState | undefined = internal.audio ? { - id: internal.audio.id, - paused: Boolean(internal.audio.metadata.paused), - metadata: internal.audio.metadata, - type: 'audio', - ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), - } + id: internal.audio.id, + paused: Boolean(internal.audio.metadata.paused), + metadata: internal.audio.metadata, + type: 'audio', + ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), + } : undefined; const stream: Stream = { inputId: internal.inputId, video, audio }; diff --git a/packages/js-server-sdk/tests/ws_notifier.test.ts b/packages/js-server-sdk/tests/ws_notifier.test.ts index c68dbcdc..10d338f6 100644 --- a/packages/js-server-sdk/tests/ws_notifier.test.ts +++ b/packages/js-server-sdk/tests/ws_notifier.test.ts @@ -36,7 +36,7 @@ class FakeWebSocket { } const config = { fishjamId: 'test-id', managementToken: 'test-token' }; -const noop = () => { }; +const noop = () => {}; const peerConnected = { peerConnected: { roomId: 'room-1', peerId: 'peer-1', peerType: ServerMessage_PeerType.PEER_TYPE_WEBRTC }, From c4f181f49cc7618e9cea11a4685f01ac7eeeb1d1 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Wed, 24 Jun 2026 17:20:44 +0200 Subject: [PATCH 03/12] Fix build ordering --- package.json | 6 +- packages/composition/src/store.ts | 118 +++++++++++++++++------------- 2 files changed, 72 insertions(+), 52 deletions(-) diff --git a/package.json b/package.json index b0acc395..de3e0ff0 100644 --- a/package.json +++ b/package.json @@ -20,9 +20,9 @@ "typescript-eslint": "^8.29.0" }, "scripts": { - "build": "yarn workspaces foreach -At run build", - "lint": "yarn workspaces foreach -At run lint", - "lint:check": "yarn workspaces foreach -At run lint:check", + "build": "yarn workspaces foreach -A --topological-dev run build", + "lint": "yarn workspaces foreach -A -p run lint", + "lint:check": "yarn workspaces foreach -A -p run lint:check", "typecheck": "yarn workspaces foreach -A -p run typecheck", "format:check": "yarn workspaces foreach -A -p run format:check", "format": "yarn workspaces foreach -A -p run format", diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index 6eedc029..744445ad 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -139,38 +139,39 @@ class CompositionStore { } applyNotification(event: CompositionEvent): void { + let changed: boolean; switch (event.type) { case 'peerConnected': - this.onPeerConnected(event.data); + changed = this.onPeerConnected(event.data); break; case 'peerDisconnected': - this.onPeerDisconnected(event.data); + changed = this.onPeerDisconnected(event.data); break; case 'peerMetadataUpdated': - this.onPeerMetadataUpdated(event.data); + changed = this.onPeerMetadataUpdated(event.data); break; case 'trackAdded': - this.onTrackAdded(event.data); + changed = this.onTrackAdded(event.data); break; case 'trackRemoved': - this.onTrackRemoved(event.data); + changed = this.onTrackRemoved(event.data); break; case 'trackMetadataUpdated': - this.onTrackMetadataUpdated(event.data); + changed = this.onTrackMetadataUpdated(event.data); break; case 'trackForwarding': - this.onTrackForwarding(event.data); + changed = this.onTrackForwarding(event.data); break; case 'trackForwardingRemoved': - this.onTrackForwardingRemoved(event.data); + changed = this.onTrackForwardingRemoved(event.data); break; case 'vadNotification': - this.onVadNotification(event.data); + changed = this.onVadNotification(event.data); break; default: - assertNever(event); + return assertNever(event); } - this.commit(); + if (changed) this.commit(); } // -- reducer --------------------------------------------------------------- @@ -184,63 +185,75 @@ class CompositionStore { return peer; } - private onPeerConnected(data: PeerConnected): void { + private onPeerConnected(data: PeerConnected): boolean { + if (this.peers.has(data.peerId)) return false; this.ensurePeer(data.peerId); + return true; } - private onPeerDisconnected(data: PeerDisconnected): void { + private onPeerDisconnected(data: PeerDisconnected): boolean { + let changed = false; const peer = this.peers.get(data.peerId); if (peer) { for (const inputId of peer.streams.keys()) this.vad.delete(inputId); + this.peers.delete(data.peerId); + changed = true; } - this.peers.delete(data.peerId); for (const key of this.forwarding.keys()) { - if (key.startsWith(`${data.peerId}:`)) this.forwarding.delete(key); + if (key.startsWith(`${data.peerId}:`)) { + this.forwarding.delete(key); + changed = true; + } } + return changed; } - private onPeerMetadataUpdated(data: PeerMetadataUpdated): void { + private onPeerMetadataUpdated(data: PeerMetadataUpdated): boolean { this.ensurePeer(data.peerId).metadata = splitMetadata(data.metadata); + return true; } - private onTrackAdded(data: TrackAdded): void { - if (!data.peerId || !data.track) return; + private onTrackAdded(data: TrackAdded): boolean { + if (!data.peerId || !data.track) return false; const peer = this.ensurePeer(data.peerId); peer.tracks.set(data.track.id, { id: data.track.id, metadata: normalizeMetadata(data.track.metadata) }); + return true; } - private onTrackMetadataUpdated(data: TrackMetadataUpdated): void { - if (!data.peerId || !data.track) return; + private onTrackMetadataUpdated(data: TrackMetadataUpdated): boolean { + if (!data.peerId || !data.track) return false; const peer = this.peers.get(data.peerId); - if (!peer) return; + if (!peer) return false; const metadata = normalizeMetadata(data.track.metadata); peer.tracks.set(data.track.id, { id: data.track.id, metadata }); const inputId = this.forwarding.get(`${data.peerId}:${data.track.id}`); - if (!inputId) return; + if (!inputId) return true; const stream = peer.streams.get(inputId); - if (!stream) return; + if (!stream) return true; if (stream.video?.id === data.track.id) stream.video = { id: data.track.id, metadata }; if (stream.audio?.id === data.track.id) stream.audio = { id: data.track.id, metadata }; + return true; } - private onTrackRemoved(data: TrackRemoved): void { - if (!data.peerId || !data.track) return; + private onTrackRemoved(data: TrackRemoved): boolean { + if (!data.peerId || !data.track) return false; const peer = this.peers.get(data.peerId); - if (!peer) return; - peer.tracks.delete(data.track.id); + if (!peer) return false; + const removed = peer.tracks.delete(data.track.id); const key = `${data.peerId}:${data.track.id}`; const inputId = this.forwarding.get(key); - this.forwarding.delete(key); - if (!inputId) return; + const forwardingRemoved = this.forwarding.delete(key); + if (!inputId) return removed || forwardingRemoved; const stream = peer.streams.get(inputId); - if (!stream) return; + if (!stream) return removed || forwardingRemoved; if (stream.video?.id === data.track.id) stream.video = undefined; if (stream.audio?.id === data.track.id) stream.audio = undefined; + return true; } - private onTrackForwarding(data: TrackForwarding): void { + private onTrackForwarding(data: TrackForwarding): boolean { const peer = this.ensurePeer(data.peerId); const stream: InternalStream = peer.streams.get(data.inputId) ?? { inputId: data.inputId }; @@ -255,22 +268,29 @@ class CompositionStore { peer.tracks.set(data.audioTrack.id, stream.audio); } peer.streams.set(data.inputId, stream); + return true; } - private onTrackForwardingRemoved(data: TrackForwardingRemoved): void { + private onTrackForwardingRemoved(data: TrackForwardingRemoved): boolean { + let changed = this.vad.delete(data.inputId); const peer = this.peers.get(data.peerId); - this.vad.delete(data.inputId); - if (!peer) return; - peer.streams.delete(data.inputId); + if (!peer) return changed; + if (peer.streams.delete(data.inputId)) changed = true; for (const [key, inputId] of this.forwarding) { - if (inputId === data.inputId && key.startsWith(`${data.peerId}:`)) this.forwarding.delete(key); + if (inputId === data.inputId && key.startsWith(`${data.peerId}:`)) { + this.forwarding.delete(key); + changed = true; + } } + return changed; } - private onVadNotification(data: VadNotification): void { + private onVadNotification(data: VadNotification): boolean { const inputId = this.forwarding.get(`${data.peerId}:${data.trackId}`); - if (!inputId) return; + if (!inputId) return false; + if (this.vad.get(inputId) === data.status) return false; this.vad.set(inputId, data.status); + return true; } // -- snapshot -------------------------------------------------------------- @@ -297,20 +317,20 @@ class CompositionStore { for (const internal of peer.streams.values()) { const video: VideoTrackState | undefined = internal.video ? { - id: internal.video.id, - paused: Boolean(internal.video.metadata.paused), - metadata: internal.video.metadata, - type: 'video', - } + id: internal.video.id, + paused: Boolean(internal.video.metadata.paused), + metadata: internal.video.metadata, + type: 'video', + } : undefined; const audio: AudioTrackState | undefined = internal.audio ? { - id: internal.audio.id, - paused: Boolean(internal.audio.metadata.paused), - metadata: internal.audio.metadata, - type: 'audio', - ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), - } + id: internal.audio.id, + paused: Boolean(internal.audio.metadata.paused), + metadata: internal.audio.metadata, + type: 'audio', + ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), + } : undefined; const stream: Stream = { inputId: internal.inputId, video, audio }; From e1e7b7a145add99ff8bd19bdff0218be99798186 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Wed, 24 Jun 2026 17:22:14 +0200 Subject: [PATCH 04/12] Format --- packages/composition/src/store.ts | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index 744445ad..44087bdc 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -317,20 +317,20 @@ class CompositionStore { for (const internal of peer.streams.values()) { const video: VideoTrackState | undefined = internal.video ? { - id: internal.video.id, - paused: Boolean(internal.video.metadata.paused), - metadata: internal.video.metadata, - type: 'video', - } + id: internal.video.id, + paused: Boolean(internal.video.metadata.paused), + metadata: internal.video.metadata, + type: 'video', + } : undefined; const audio: AudioTrackState | undefined = internal.audio ? { - id: internal.audio.id, - paused: Boolean(internal.audio.metadata.paused), - metadata: internal.audio.metadata, - type: 'audio', - ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), - } + id: internal.audio.id, + paused: Boolean(internal.audio.metadata.paused), + metadata: internal.audio.metadata, + type: 'audio', + ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), + } : undefined; const stream: Stream = { inputId: internal.inputId, video, audio }; From 402a0238d42ebd2d223dba3277488029a7a63dc5 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:09:57 +0200 Subject: [PATCH 05/12] Prevent unnecessary rerenders --- packages/composition/src/hooks.ts | 26 +-- packages/composition/src/store.ts | 262 ++++++++++++++--------- packages/composition/tests/hooks.test.ts | 10 +- packages/composition/tests/store.test.ts | 45 ++-- 4 files changed, 203 insertions(+), 140 deletions(-) diff --git a/packages/composition/src/hooks.ts b/packages/composition/src/hooks.ts index 7afac7dd..67448249 100644 --- a/packages/composition/src/hooks.ts +++ b/packages/composition/src/hooks.ts @@ -1,10 +1,7 @@ -import { useSyncExternalStore } from 'react'; +import { useCallback, useSyncExternalStore } from 'react'; import { compositionStore } from './store'; import type { PeerWithStreams, VadStatus } from './types'; -const useSnapshot = () => - useSyncExternalStore(compositionStore.subscribe, compositionStore.getSnapshot, compositionStore.getSnapshot); - /** * All peers in the linked room, projected into composition streams. Flat list — * the worker is not a peer, so there is no local/remote split. @@ -12,8 +9,8 @@ const useSnapshot = () => export function usePeers(): { peers: PeerWithStreams[]; } { - const snapshot = useSnapshot(); - return { peers: snapshot.peers as PeerWithStreams[] }; + const peers = useSyncExternalStore(compositionStore.subscribe, compositionStore.getPeers, compositionStore.getPeers); + return { peers: peers as PeerWithStreams[] }; } /** @@ -22,8 +19,8 @@ export function usePeers(): { export function usePeer( inputId: string ): PeerWithStreams | undefined { - const snapshot = useSnapshot(); - const peer = snapshot.peers.find((p) => p.streams.some((s) => s.inputId === inputId)); + const getSnapshot = useCallback(() => compositionStore.getPeer(inputId), [inputId]); + const peer = useSyncExternalStore(compositionStore.subscribe, getSnapshot, getSnapshot); return peer as PeerWithStreams | undefined; } @@ -31,9 +28,12 @@ export function usePeer( * The linked room's id, or `undefined` when no room is linked. */ export function useRoom(): { id: string } | undefined { - const snapshot = useSnapshot(); - if (!snapshot.roomId) return undefined; - return { id: snapshot.roomId }; + const roomId = useSyncExternalStore( + compositionStore.subscribe, + compositionStore.getRoomId, + compositionStore.getRoomId + ); + return roomId ? { id: roomId } : undefined; } /** @@ -41,6 +41,6 @@ export function useRoom(): { id: string } | undefined { * `'silence'` until the first VAD notification for the input arrives. */ export function useSpeakingState(inputId: string): VadStatus { - const snapshot = useSnapshot(); - return snapshot.vad[inputId] ?? 'silence'; + const getSnapshot = useCallback(() => compositionStore.getVadStatus(inputId), [inputId]); + return useSyncExternalStore(compositionStore.subscribe, getSnapshot, getSnapshot); } diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index 44087bdc..cd5512a1 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -10,8 +10,9 @@ import type { TrackMetadataUpdated, TrackRemoved, VadNotification, + TrackType, } from '@fishjam-cloud/js-server-sdk'; -import type { AudioTrackState, PeerWithStreams, Stream, VideoTrackState } from './types'; +import type { PeerWithStreams, Stream } from './types'; /** * Discriminated notifier event accepted by {@link CompositionStore.applyNotification}. @@ -40,19 +41,12 @@ export type RoomSnapshot = { type Metadata = Record; -type InternalTrack = { id: string; metadata: Metadata }; - -type InternalStream = { - inputId: string; - video?: InternalTrack; - audio?: InternalTrack; -}; +type InternalTrack = { id: string; metadata: Metadata; inputId?: string; type: TrackType }; type InternalPeer = { id: string; metadata: { peer: unknown; server: unknown }; tracks: Map; - streams: Map; }; const EMPTY_SNAPSHOT: RoomSnapshot = { peers: [], vad: {} }; @@ -79,10 +73,10 @@ const splitMetadata = (raw: string | object | null | undefined): { peer: unknown return { peer: obj.peer, server: obj.server }; }; -const roleOf = (stream: InternalStream): 'camera' | 'screenShare' | 'custom' => { +const roleOf = (stream: Stream): 'camera' | 'screenShare' | 'custom' => { const type = stream.video?.metadata.type ?? stream.audio?.metadata.type; - if (type === 'camera') return 'camera'; - if (type === 'screenShare') return 'screenShare'; + if (type === 'camera' || type === 'microphone') return 'camera'; + if (type === 'screenShareVideo' || type === 'screenShareAudio') return 'screenShare'; return 'custom'; }; @@ -90,11 +84,11 @@ class CompositionStore { private peers = new Map(); private roomId: string | undefined; /** trackId -> inputId, used to resolve VAD/metadata events to a stream. */ - private forwarding = new Map(); private vad = new Map(); private listeners = new Set<() => void>(); private cachedSnapshot: RoomSnapshot | null = EMPTY_SNAPSHOT; + private cachedPeers: PeerWithStreams[] = EMPTY_SNAPSHOT.peers; readonly subscribe = (cb: () => void): (() => void) => { this.listeners.add(cb); @@ -102,23 +96,35 @@ class CompositionStore { }; readonly getSnapshot = (): RoomSnapshot => { - if (this.cachedSnapshot === null) this.cachedSnapshot = this.buildSnapshot(); + if (this.cachedSnapshot === null) { + this.cachedSnapshot = { peers: this.cachedPeers, roomId: this.roomId, vad: this.vadRecord() }; + } return this.cachedSnapshot; }; + readonly getPeers = (): PeerWithStreams[] => this.cachedPeers; + + readonly getPeer = (inputId: string): PeerWithStreams | undefined => + this.getPeers().find((peer) => peer.streams.some((stream) => stream.inputId === inputId)); + + readonly getRoomId = (): string | undefined => this.roomId; + + readonly getVadStatus = (inputId: string): VadStatus => this.vad.get(inputId) ?? 'silence'; + // -- backward-facing feed API ------------------------------------------------ reset(): void { this.peers.clear(); - this.forwarding.clear(); this.vad.clear(); this.roomId = undefined; + this.cachedSnapshot = null; + this.cachedPeers = []; + this.commit(); } seedFromRoom(room: Room): void { this.peers.clear(); - this.forwarding.clear(); this.vad.clear(); this.roomId = room.id; @@ -127,18 +133,21 @@ class CompositionStore { id: peer.id, metadata: splitMetadata(peer.metadata), tracks: new Map(), - streams: new Map(), }; for (const track of peer.tracks ?? []) { if (!track.id) continue; - internal.tracks.set(track.id, { id: track.id, metadata: normalizeMetadata(track.metadata) }); + internal.tracks.set(track.id, { id: track.id, metadata: normalizeMetadata(track.metadata), type: track.type! }); } this.peers.set(peer.id, internal); } + + this.rebuildPeers(); this.commit(); } applyNotification(event: CompositionEvent): void { + if (event.data.roomId !== this.roomId) return; + let changed: boolean; switch (event.type) { case 'peerConnected': @@ -174,12 +183,10 @@ class CompositionStore { if (changed) this.commit(); } - // -- reducer --------------------------------------------------------------- - private ensurePeer(peerId: string): InternalPeer { let peer = this.peers.get(peerId); if (!peer) { - peer = { id: peerId, metadata: { peer: undefined, server: undefined }, tracks: new Map(), streams: new Map() }; + peer = { id: peerId, metadata: { peer: undefined, server: undefined }, tracks: new Map() }; this.peers.set(peerId, peer); } return peer; @@ -188,163 +195,204 @@ class CompositionStore { private onPeerConnected(data: PeerConnected): boolean { if (this.peers.has(data.peerId)) return false; this.ensurePeer(data.peerId); + this.replacePeer(data.peerId); return true; } private onPeerDisconnected(data: PeerDisconnected): boolean { - let changed = false; const peer = this.peers.get(data.peerId); - if (peer) { - for (const inputId of peer.streams.keys()) this.vad.delete(inputId); - this.peers.delete(data.peerId); - changed = true; - } - for (const key of this.forwarding.keys()) { - if (key.startsWith(`${data.peerId}:`)) { - this.forwarding.delete(key); - changed = true; - } - } - return changed; + if (!peer) return false; + + for (const { inputId } of peer.tracks.values()) if (inputId) this.vad.delete(inputId); + this.peers.delete(data.peerId); + + this.replacePeer(data.peerId); + return true; } private onPeerMetadataUpdated(data: PeerMetadataUpdated): boolean { this.ensurePeer(data.peerId).metadata = splitMetadata(data.metadata); + this.replacePeer(data.peerId); return true; } private onTrackAdded(data: TrackAdded): boolean { if (!data.peerId || !data.track) return false; + const peer = this.ensurePeer(data.peerId); - peer.tracks.set(data.track.id, { id: data.track.id, metadata: normalizeMetadata(data.track.metadata) }); + peer.tracks.set(data.track.id, { + id: data.track.id, + metadata: normalizeMetadata(data.track.metadata), + type: data.track.type, + }); + this.replacePeer(data.peerId); return true; } private onTrackMetadataUpdated(data: TrackMetadataUpdated): boolean { if (!data.peerId || !data.track) return false; + const peer = this.peers.get(data.peerId); if (!peer) return false; + const metadata = normalizeMetadata(data.track.metadata); - peer.tracks.set(data.track.id, { id: data.track.id, metadata }); - - const inputId = this.forwarding.get(`${data.peerId}:${data.track.id}`); - if (!inputId) return true; - const stream = peer.streams.get(inputId); - if (!stream) return true; - if (stream.video?.id === data.track.id) stream.video = { id: data.track.id, metadata }; - if (stream.audio?.id === data.track.id) stream.audio = { id: data.track.id, metadata }; + const inputId = peer.tracks.get(data.track.id)?.inputId; + peer.tracks.set(data.track.id, { id: data.track.id, metadata, type: data.track.type, inputId }); + + this.replacePeer(data.peerId); return true; } private onTrackRemoved(data: TrackRemoved): boolean { if (!data.peerId || !data.track) return false; + const peer = this.peers.get(data.peerId); if (!peer) return false; - const removed = peer.tracks.delete(data.track.id); - - const key = `${data.peerId}:${data.track.id}`; - const inputId = this.forwarding.get(key); - const forwardingRemoved = this.forwarding.delete(key); - if (!inputId) return removed || forwardingRemoved; - const stream = peer.streams.get(inputId); - if (!stream) return removed || forwardingRemoved; - if (stream.video?.id === data.track.id) stream.video = undefined; - if (stream.audio?.id === data.track.id) stream.audio = undefined; + if (!peer.tracks.delete(data.track.id)) return false; + + this.replacePeer(data.peerId); return true; } private onTrackForwarding(data: TrackForwarding): boolean { const peer = this.ensurePeer(data.peerId); - const stream: InternalStream = peer.streams.get(data.inputId) ?? { inputId: data.inputId }; if (data.videoTrack) { - stream.video = { id: data.videoTrack.id, metadata: normalizeMetadata(data.videoTrack.metadata) }; - this.forwarding.set(`${data.peerId}:${data.videoTrack.id}`, data.inputId); - peer.tracks.set(data.videoTrack.id, stream.video); + peer.tracks.set(data.videoTrack.id, { + id: data.videoTrack.id, + metadata: normalizeMetadata(data.videoTrack.metadata), + inputId: data.inputId, + type: data.videoTrack.type, + }); } if (data.audioTrack) { - stream.audio = { id: data.audioTrack.id, metadata: normalizeMetadata(data.audioTrack.metadata) }; - this.forwarding.set(`${data.peerId}:${data.audioTrack.id}`, data.inputId); - peer.tracks.set(data.audioTrack.id, stream.audio); + peer.tracks.set(data.audioTrack.id, { + id: data.audioTrack.id, + metadata: normalizeMetadata(data.audioTrack.metadata), + inputId: data.inputId, + type: data.audioTrack.type, + }); } - peer.streams.set(data.inputId, stream); + this.replacePeer(data.peerId); return true; } private onTrackForwardingRemoved(data: TrackForwardingRemoved): boolean { let changed = this.vad.delete(data.inputId); const peer = this.peers.get(data.peerId); - if (!peer) return changed; - if (peer.streams.delete(data.inputId)) changed = true; - for (const [key, inputId] of this.forwarding) { - if (inputId === data.inputId && key.startsWith(`${data.peerId}:`)) { - this.forwarding.delete(key); + if (!peer) { + if (changed) this.cachedSnapshot = null; + return changed; + } + + for (const track of peer.tracks.values()) { + if (track.inputId === data.inputId) { + track.inputId = undefined; changed = true; } } + + if (changed) this.replacePeer(data.peerId); return changed; } private onVadNotification(data: VadNotification): boolean { - const inputId = this.forwarding.get(`${data.peerId}:${data.trackId}`); - if (!inputId) return false; - if (this.vad.get(inputId) === data.status) return false; - this.vad.set(inputId, data.status); + const peer = this.peers.get(data.peerId); + if (!peer) return false; + + const track = peer.tracks.get(data.trackId); + if (!track || !track.inputId) return false; + if (this.vad.get(track.inputId) === data.status) return false; + + this.vad.set(track.inputId, data.status); + this.replacePeer(data.peerId); return true; } - // -- snapshot -------------------------------------------------------------- - private commit(): void { - this.cachedSnapshot = null; for (const cb of this.listeners) cb(); } - private buildSnapshot(): RoomSnapshot { + /** + * Update `cachedPeers` array, reusing every other peer's reference. + */ + private replacePeer(peerId: string): void { + const internal = this.peers.get(peerId); + const next = this.cachedPeers.slice(); + const idx = next.findIndex((peer) => peer.id === peerId); + if (!internal) { + if (idx >= 0) next.splice(idx, 1); + } else if (idx >= 0) { + next[idx] = this.derivePeer(internal, this.vadRecord()); + } else { + next.push(this.derivePeer(internal, this.vadRecord())); + } + this.cachedPeers = next; + this.cachedSnapshot = null; + } + + private rebuildPeers(): void { + const vad = this.vadRecord(); + this.cachedPeers = Array.from(this.peers.values(), (peer) => this.derivePeer(peer, vad)); + this.cachedSnapshot = null; + } + + private vadRecord(): Record { const vad: Record = {}; for (const [inputId, status] of this.vad) vad[inputId] = status; - - const peers = Array.from(this.peers.values()).map((peer) => this.derivePeer(peer, vad)); - return { peers, roomId: this.roomId, vad }; + return vad; } private derivePeer(peer: InternalPeer, vad: Record): PeerWithStreams { - const streams: Stream[] = []; - const customStreams: Stream[] = []; + const streams = new Map(); let cameraStream: Stream | undefined; let screenShareStream: Stream | undefined; + const customStreams: Stream[] = []; - for (const internal of peer.streams.values()) { - const video: VideoTrackState | undefined = internal.video - ? { - id: internal.video.id, - paused: Boolean(internal.video.metadata.paused), - metadata: internal.video.metadata, - type: 'video', - } - : undefined; - const audio: AudioTrackState | undefined = internal.audio - ? { - id: internal.audio.id, - paused: Boolean(internal.audio.metadata.paused), - metadata: internal.audio.metadata, - type: 'audio', - ...(vad[internal.inputId] !== undefined ? { vadStatus: vad[internal.inputId] } : {}), - } - : undefined; - - const stream: Stream = { inputId: internal.inputId, video, audio }; - streams.push(stream); - - const role = roleOf(internal); - if (role === 'camera') cameraStream = stream; - else if (role === 'screenShare') screenShareStream = stream; - else customStreams.push(stream); + for (const track of peer.tracks.values()) { + if (!track.inputId) continue; + const stream: Stream = streams.get(track.inputId) || { inputId: track.inputId }; + if (track.type === 'audio') + stream.audio = { + id: track.id, + paused: Boolean(track.metadata.paused), + metadata: track.metadata, + type: 'audio', + vadStatus: vad[track.inputId], + }; + else + stream.video = { + id: track.id, + paused: Boolean(track.metadata.paused), + metadata: track.metadata, + type: 'video', + }; + streams.set(track.inputId, stream); + } + + for (const stream of streams.values()) { + const role = roleOf(stream); + switch (role) { + case 'camera': + cameraStream = stream; + break; + case 'screenShare': + screenShareStream = stream; + break; + default: + customStreams.push(stream); + } } - return { id: peer.id, metadata: peer.metadata, streams, cameraStream, screenShareStream, customStreams }; + return { + id: peer.id, + metadata: peer.metadata, + streams: Array.from(streams.values()), + cameraStream, + screenShareStream, + customStreams, + }; } } -/** Module-level singleton shared by the hooks and the worker feed API. */ export const compositionStore = new CompositionStore(); diff --git a/packages/composition/tests/hooks.test.ts b/packages/composition/tests/hooks.test.ts index 8484266b..9f8fb827 100644 --- a/packages/composition/tests/hooks.test.ts +++ b/packages/composition/tests/hooks.test.ts @@ -4,6 +4,7 @@ import { renderToStaticMarkup } from 'react-dom/server'; import { compositionStore, type CompositionEvent } from '../src/store'; import { usePeers, usePeer, useRoom, useSpeakingState } from '../src/hooks'; import type { PeerWithStreams } from '../src/types'; +import { PeerId, RoomId } from '@fishjam-cloud/js-server-sdk'; const apply = (event: CompositionEvent) => compositionStore.applyNotification(event); const render = (component: FunctionComponent) => renderToStaticMarkup(createElement(component)); @@ -12,16 +13,16 @@ const forwardCamera = () => apply({ type: 'trackForwarding', data: { - roomId: 'r1', - peerId: 'p1', + roomId: 'r1' as RoomId, + peerId: 'p1' as PeerId, compositionUrl: 'url', inputId: 'in1', videoTrack: { id: 'v1', type: 'video', metadata: JSON.stringify({ type: 'camera' }) }, audioTrack: { id: 'a1', type: 'audio', metadata: JSON.stringify({ type: 'camera' }) }, - } as never, + }, }); -beforeEach(() => compositionStore.reset()); +beforeEach(() => compositionStore.seedFromRoom({ id: 'r1' as RoomId, config: {}, peers: [] })); describe('composition hooks', () => { it('usePeers reflects the current store state', () => { @@ -47,6 +48,7 @@ describe('composition hooks', () => { it('useRoom reflects the linked room', () => { const Probe: FunctionComponent = () => createElement('div', null, useRoom()?.id ?? 'none'); + compositionStore.reset(); expect(render(Probe)).toBe('
none
'); compositionStore.seedFromRoom({ id: 'r1', config: {}, peers: [] } as never); expect(render(Probe)).toBe('
r1
'); diff --git a/packages/composition/tests/store.test.ts b/packages/composition/tests/store.test.ts index 5f117a15..50f24eb7 100644 --- a/packages/composition/tests/store.test.ts +++ b/packages/composition/tests/store.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, beforeEach } from 'vitest'; import { compositionStore, type CompositionEvent } from '../src/store'; -import type { Room } from '@fishjam-cloud/js-server-sdk'; +import type { Room, RoomId } from '@fishjam-cloud/js-server-sdk'; const apply = (event: CompositionEvent) => compositionStore.applyNotification(event); @@ -28,13 +28,13 @@ const forwardScreenShare = (peerId = 'p1', inputId = 'in2') => peerId, compositionUrl: 'url', inputId, - videoTrack: track('v2', 'video', { type: 'screenShare', paused: false }), + videoTrack: track('v2', 'video', { type: 'screenShareVideo', paused: false }), } as never, }); const peers = () => compositionStore.getSnapshot().peers; -beforeEach(() => compositionStore.reset()); +beforeEach(() => compositionStore.seedFromRoom({ id: 'r1' as RoomId, config: {}, peers: [] })); describe('composition store reducer', () => { it('peerConnected adds an empty peer', () => { @@ -86,19 +86,6 @@ describe('composition store reducer', () => { expect(peers()[0].cameraStream?.video?.paused).toBe(true); }); - it('trackMetadataUpdated re-classifies a stream when the role changes', () => { - forwardCamera(); - expect(peers()[0].cameraStream).toBeDefined(); - expect(peers()[0].screenShareStream).toBeUndefined(); - - apply({ - type: 'trackMetadataUpdated', - data: { roomId: 'r1', peerId: 'p1', track: track('v1', 'video', { type: 'screenShare' }) } as never, - }); - expect(peers()[0].cameraStream).toBeUndefined(); - expect(peers()[0].screenShareStream?.inputId).toBe('in1'); - }); - it('trackRemoved clears one slot but keeps the other; trackForwardingRemoved drops the stream', () => { forwardCamera(); apply({ @@ -241,6 +228,32 @@ describe('composition store reducer', () => { expect(compositionStore.getSnapshot().vad).toEqual({}); }); + it('keeps an unchanged peer reference stable when another peer mutates (structural sharing)', () => { + forwardCamera('p1', 'in1'); + forwardCamera('p2', 'in2'); + const p1Before = compositionStore.getPeers().find((p) => p.id === 'p1'); + + // a mutation scoped to p2 must not re-derive p1 + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p2', trackId: 'a1', status: 'speech' } as never }); + const after = compositionStore.getPeers(); + expect(after.find((p) => p.id === 'p1')).toBe(p1Before); + expect(after.find((p) => p.id === 'p2')).not.toBe(p1Before); + }); + + it('getPeer returns a stable reference until that peer changes', () => { + forwardCamera('p1', 'in1'); + forwardCamera('p2', 'in2'); + const a = compositionStore.getPeer('in1'); + + // unrelated commit on p2 → p1 reference unchanged + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p2', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getPeer('in1')).toBe(a); + + // commit on p1 → new reference + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getPeer('in1')).not.toBe(a); + }); + it('notifies subscribers on mutation and stops after unsubscribe', () => { let calls = 0; const unsubscribe = compositionStore.subscribe(() => calls++); From 6cbf1e00ed64c5a52301fb1dda040a50746fb7fd Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:47:07 +0200 Subject: [PATCH 06/12] Make hooks surface simpler to use --- examples/room-manager/package.json | 1 - packages/composition/src/hooks.ts | 20 +++--- packages/composition/src/store.ts | 32 ++++----- packages/composition/src/types.ts | 2 +- packages/composition/tests/hooks.test.ts | 16 ++--- packages/composition/tests/store.test.ts | 88 +++++++++++++++++++++--- yarn.lock | 10 --- 7 files changed, 113 insertions(+), 56 deletions(-) diff --git a/examples/room-manager/package.json b/examples/room-manager/package.json index d53ac41e..126dbc92 100644 --- a/examples/room-manager/package.json +++ b/examples/room-manager/package.json @@ -27,7 +27,6 @@ "@fastify/swagger": "^9.4.2", "@types/node": "^22.13.16", "pino-pretty": "^13.0.0", - "tsc": "^2.0.4", "tsx": "^4.19.3", "typescript": "~5.8.2" }, diff --git a/packages/composition/src/hooks.ts b/packages/composition/src/hooks.ts index 67448249..5d8ac3bf 100644 --- a/packages/composition/src/hooks.ts +++ b/packages/composition/src/hooks.ts @@ -6,20 +6,18 @@ import type { PeerWithStreams, VadStatus } from './types'; * All peers in the linked room, projected into composition streams. Flat list — * the worker is not a peer, so there is no local/remote split. */ -export function usePeers(): { - peers: PeerWithStreams[]; -} { +export function usePeers(): PeerWithStreams[] { const peers = useSyncExternalStore(compositionStore.subscribe, compositionStore.getPeers, compositionStore.getPeers); - return { peers: peers as PeerWithStreams[] }; + return peers as PeerWithStreams[]; } /** - * The peer that owns any {@link Stream} with the given `inputId`, or `undefined`. + * The peer with the given `peerId`, or `undefined`. */ export function usePeer( - inputId: string + peerId: string ): PeerWithStreams | undefined { - const getSnapshot = useCallback(() => compositionStore.getPeer(inputId), [inputId]); + const getSnapshot = useCallback(() => compositionStore.getPeer(peerId), [peerId]); const peer = useSyncExternalStore(compositionStore.subscribe, getSnapshot, getSnapshot); return peer as PeerWithStreams | undefined; } @@ -37,10 +35,10 @@ export function useRoom(): { id: string } | undefined { } /** - * Voice-activity status for the input identified by `inputId`. Defaults to - * `'silence'` until the first VAD notification for the input arrives. + * Voice-activity status for the peer identified by `peerId`: `'speech'` when any + * of the peer's forwarded audio inputs is speaking, otherwise `'silence'`. */ -export function useSpeakingState(inputId: string): VadStatus { - const getSnapshot = useCallback(() => compositionStore.getVadStatus(inputId), [inputId]); +export function useSpeakingState(peerId: string): VadStatus { + const getSnapshot = useCallback(() => compositionStore.getVadStatus(peerId), [peerId]); return useSyncExternalStore(compositionStore.subscribe, getSnapshot, getSnapshot); } diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index cd5512a1..8936a7e2 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -35,7 +35,7 @@ export type CompositionEvent = export type RoomSnapshot = { peers: PeerWithStreams[]; roomId?: string; - /** per-`inputId` voice activity, consumed by `useSpeakingState`. */ + /** per-`inputId` voice activity, resolved per peer by `useSpeakingState`. */ vad: Record; }; @@ -83,7 +83,6 @@ const roleOf = (stream: Stream): 'camera' | 'screenShare' | 'custom' => { class CompositionStore { private peers = new Map(); private roomId: string | undefined; - /** trackId -> inputId, used to resolve VAD/metadata events to a stream. */ private vad = new Map(); private listeners = new Set<() => void>(); @@ -104,14 +103,18 @@ class CompositionStore { readonly getPeers = (): PeerWithStreams[] => this.cachedPeers; - readonly getPeer = (inputId: string): PeerWithStreams | undefined => - this.getPeers().find((peer) => peer.streams.some((stream) => stream.inputId === inputId)); + readonly getPeer = (peerId: string): PeerWithStreams | undefined => + this.getPeers().find((peer) => peer.id === peerId); readonly getRoomId = (): string | undefined => this.roomId; - readonly getVadStatus = (inputId: string): VadStatus => this.vad.get(inputId) ?? 'silence'; - - // -- backward-facing feed API ------------------------------------------------ + readonly getVadStatus = (peerId: string): VadStatus => { + const peer = this.peers.get(peerId); + if (!peer) return 'silence'; + for (const track of peer.tracks.values()) + if (track.type === 'audio' && track.inputId && this.vad.get(track.inputId) === 'speech') return 'speech'; + return 'silence'; + }; reset(): void { this.peers.clear(); @@ -305,7 +308,7 @@ class CompositionStore { if (this.vad.get(track.inputId) === data.status) return false; this.vad.set(track.inputId, data.status); - this.replacePeer(data.peerId); + this.cachedSnapshot = null; return true; } @@ -313,9 +316,6 @@ class CompositionStore { for (const cb of this.listeners) cb(); } - /** - * Update `cachedPeers` array, reusing every other peer's reference. - */ private replacePeer(peerId: string): void { const internal = this.peers.get(peerId); const next = this.cachedPeers.slice(); @@ -323,17 +323,16 @@ class CompositionStore { if (!internal) { if (idx >= 0) next.splice(idx, 1); } else if (idx >= 0) { - next[idx] = this.derivePeer(internal, this.vadRecord()); + next[idx] = this.derivePeer(internal); } else { - next.push(this.derivePeer(internal, this.vadRecord())); + next.push(this.derivePeer(internal)); } this.cachedPeers = next; this.cachedSnapshot = null; } private rebuildPeers(): void { - const vad = this.vadRecord(); - this.cachedPeers = Array.from(this.peers.values(), (peer) => this.derivePeer(peer, vad)); + this.cachedPeers = Array.from(this.peers.values(), (peer) => this.derivePeer(peer)); this.cachedSnapshot = null; } @@ -343,7 +342,7 @@ class CompositionStore { return vad; } - private derivePeer(peer: InternalPeer, vad: Record): PeerWithStreams { + private derivePeer(peer: InternalPeer): PeerWithStreams { const streams = new Map(); let cameraStream: Stream | undefined; let screenShareStream: Stream | undefined; @@ -358,7 +357,6 @@ class CompositionStore { paused: Boolean(track.metadata.paused), metadata: track.metadata, type: 'audio', - vadStatus: vad[track.inputId], }; else stream.video = { diff --git a/packages/composition/src/types.ts b/packages/composition/src/types.ts index fef20d76..f57ec3f4 100644 --- a/packages/composition/src/types.ts +++ b/packages/composition/src/types.ts @@ -15,7 +15,7 @@ export type TrackState = { }; export type VideoTrackState = TrackState & { type: 'video' }; -export type AudioTrackState = TrackState & { type: 'audio'; vadStatus?: VadStatus }; +export type AudioTrackState = TrackState & { type: 'audio' }; /** * A forwarded input. diff --git a/packages/composition/tests/hooks.test.ts b/packages/composition/tests/hooks.test.ts index 9f8fb827..c82797e6 100644 --- a/packages/composition/tests/hooks.test.ts +++ b/packages/composition/tests/hooks.test.ts @@ -31,7 +31,7 @@ describe('composition hooks', () => { 'div', null, usePeers() - .peers.map((p) => p.id) + .map((p) => p.id) .join(',') ); expect(render(Probe)).toBe('
'); @@ -39,8 +39,8 @@ describe('composition hooks', () => { expect(render(Probe)).toBe('
p1
'); }); - it('usePeer selects the peer owning the inputId', () => { - const Probe: FunctionComponent = () => createElement('div', null, usePeer('in1')?.id ?? 'none'); + it('usePeer selects the peer by id', () => { + const Probe: FunctionComponent = () => createElement('div', null, usePeer('p1')?.id ?? 'none'); expect(render(Probe)).toBe('
none
'); forwardCamera(); expect(render(Probe)).toBe('
p1
'); @@ -54,8 +54,8 @@ describe('composition hooks', () => { expect(render(Probe)).toBe('
r1
'); }); - it('useSpeakingState tracks VAD per input', () => { - const Probe: FunctionComponent = () => createElement('div', null, String(useSpeakingState('in1'))); + it('useSpeakingState tracks VAD per peer', () => { + const Probe: FunctionComponent = () => createElement('div', null, String(useSpeakingState('p1'))); forwardCamera(); expect(render(Probe)).toBe('
silence
'); apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); @@ -63,9 +63,9 @@ describe('composition hooks', () => { }); it('flows the metadata generics through the return types', () => { - expectTypeOf>>().toEqualTypeOf<{ - peers: PeerWithStreams<{ name: string }, { trusted: boolean }>[]; - }>(); + expectTypeOf>>().toEqualTypeOf< + PeerWithStreams<{ name: string }, { trusted: boolean }>[] + >(); expectTypeOf>>().toEqualTypeOf< PeerWithStreams<{ name: string }, unknown> | undefined >(); diff --git a/packages/composition/tests/store.test.ts b/packages/composition/tests/store.test.ts index 50f24eb7..10eede52 100644 --- a/packages/composition/tests/store.test.ts +++ b/packages/composition/tests/store.test.ts @@ -103,11 +103,11 @@ describe('composition store reducer', () => { expect(peers()[0].cameraStream).toBeUndefined(); }); - it('VAD is keyed by (peerId, trackId) → inputId and surfaces on the audio track', () => { + it('VAD is keyed by (peerId, trackId) → inputId and surfaces per peer via getVadStatus', () => { forwardCamera(); apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); expect(compositionStore.getSnapshot().vad).toEqual({ in1: 'speech' }); - expect(peers()[0].cameraStream?.audio?.vadStatus).toBe('speech'); + expect(compositionStore.getVadStatus('p1')).toBe('speech'); }); it('ignores a VAD notification for an unknown (peerId, trackId)', () => { @@ -213,6 +213,31 @@ describe('composition store reducer', () => { expect(peer.customStreams.map((s) => s.inputId)).toEqual(['in3']); }); + it('classifies audio-only streams by audio track metadata', () => { + const forwardAudio = (inputId: string, type: string, audioId: string) => + apply({ + type: 'trackForwarding', + data: { + roomId: 'r1', + peerId: 'p1', + compositionUrl: 'url', + inputId, + audioTrack: track(audioId, 'audio', { type }), + } as never, + }); + + forwardAudio('in1', 'microphone', 'a1'); + forwardAudio('in2', 'screenShareAudio', 'a2'); + forwardAudio('in3', 'customAudio', 'a3'); + + const peer = peers()[0]; + expect(peer.cameraStream?.inputId).toBe('in1'); + expect(peer.cameraStream?.audio?.id).toBe('a1'); + expect(peer.screenShareStream?.inputId).toBe('in2'); + expect(peer.screenShareStream?.audio?.id).toBe('a2'); + expect(peer.customStreams.map((s) => s.inputId)).toEqual(['in3']); + }); + it('trackForwardingRemoved drops only the removed input, leaving the peer other stream routable', () => { forwardCamera('p1', 'in1'); forwardScreenShare('p1', 'in2'); @@ -232,26 +257,73 @@ describe('composition store reducer', () => { forwardCamera('p1', 'in1'); forwardCamera('p2', 'in2'); const p1Before = compositionStore.getPeers().find((p) => p.id === 'p1'); + const p2Before = compositionStore.getPeers().find((p) => p.id === 'p2'); // a mutation scoped to p2 must not re-derive p1 - apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p2', trackId: 'a1', status: 'speech' } as never }); + apply({ + type: 'trackMetadataUpdated', + data: { roomId: 'r1', peerId: 'p2', track: track('a1', 'audio', { type: 'camera', paused: true }) } as never, + }); const after = compositionStore.getPeers(); expect(after.find((p) => p.id === 'p1')).toBe(p1Before); - expect(after.find((p) => p.id === 'p2')).not.toBe(p1Before); + expect(after.find((p) => p.id === 'p2')).not.toBe(p2Before); }); it('getPeer returns a stable reference until that peer changes', () => { forwardCamera('p1', 'in1'); forwardCamera('p2', 'in2'); - const a = compositionStore.getPeer('in1'); + const a = compositionStore.getPeer('p1'); // unrelated commit on p2 → p1 reference unchanged - apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p2', trackId: 'a1', status: 'speech' } as never }); - expect(compositionStore.getPeer('in1')).toBe(a); + apply({ + type: 'trackMetadataUpdated', + data: { roomId: 'r1', peerId: 'p2', track: track('a1', 'audio', { type: 'camera', paused: true }) } as never, + }); + expect(compositionStore.getPeer('p1')).toBe(a); // commit on p1 → new reference + apply({ + type: 'trackMetadataUpdated', + data: { roomId: 'r1', peerId: 'p1', track: track('a1', 'audio', { type: 'camera', paused: true }) } as never, + }); + expect(compositionStore.getPeer('p1')).not.toBe(a); + }); + + it('VAD change does not re-derive peers but still notifies subscribers', () => { + forwardCamera('p1', 'in1'); + const peersBefore = compositionStore.getPeers(); + const p1Before = peersBefore[0]; + + let calls = 0; + const unsubscribe = compositionStore.subscribe(() => calls++); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + unsubscribe(); + + // peer list and the peer object keep their references: no peer re-render + expect(compositionStore.getPeers()).toBe(peersBefore); + expect(compositionStore.getPeers()[0]).toBe(p1Before); + // but useSpeakingState's source updated and subscribers were notified + expect(compositionStore.getVadStatus('p1')).toBe('speech'); + expect(calls).toBe(1); + }); + + it('VAD change leaves getPeer references stable for the speaking peer', () => { + forwardCamera('p1', 'in1'); + const before = compositionStore.getPeer('p1'); apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); - expect(compositionStore.getPeer('in1')).not.toBe(a); + expect(compositionStore.getPeer('p1')).toBe(before); + }); + + it('a repeated VAD status does not notify subscribers', () => { + forwardCamera('p1', 'in1'); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + + let calls = 0; + const unsubscribe = compositionStore.subscribe(() => calls++); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + unsubscribe(); + + expect(calls).toBe(0); }); it('notifies subscribers on mutation and stops after unsubscribe', () => { diff --git a/yarn.lock b/yarn.lock index 398a13b0..a38f0c98 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7324,7 +7324,6 @@ __metadata: fastify-healthcheck: "npm:^5.1.0" fluent-json-schema: "npm:^5.0.0" pino-pretty: "npm:^13.0.0" - tsc: "npm:^2.0.4" tsx: "npm:^4.19.3" typescript: "npm:~5.8.2" languageName: unknown @@ -8057,15 +8056,6 @@ __metadata: languageName: node linkType: hard -"tsc@npm:^2.0.4": - version: 2.0.4 - resolution: "tsc@npm:2.0.4" - bin: - tsc: bin/tsc - checksum: 10c0/4651d344891d995e62ab7ca64ce0a8597bbdc2d392886c9956d15caab4dc9bfe86e759d3385b3f97c49fb5294194a161d6812673e3f51e46357c82482f32c3ab - languageName: node - linkType: hard - "tslib@npm:2.8.1, tslib@npm:^2.0.1": version: 2.8.1 resolution: "tslib@npm:2.8.1" From 231dda97cf45462734a5fbb764e5f5ad16b0c148 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:48:35 +0200 Subject: [PATCH 07/12] Add global declaration --- packages/composition/src/eventBus.ts | 4 +++- packages/composition/src/hooks.ts | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/composition/src/eventBus.ts b/packages/composition/src/eventBus.ts index ca146d9d..39489cbc 100644 --- a/packages/composition/src/eventBus.ts +++ b/packages/composition/src/eventBus.ts @@ -2,4 +2,6 @@ export interface CompositionEventBus { on(eventName: string, callback: (data: T) => void): () => void; } -export const eventBus = (globalThis as unknown as { eventBus: CompositionEventBus }).eventBus; +declare global { + const eventBus: CompositionEventBus; +} diff --git a/packages/composition/src/hooks.ts b/packages/composition/src/hooks.ts index 5d8ac3bf..499abc03 100644 --- a/packages/composition/src/hooks.ts +++ b/packages/composition/src/hooks.ts @@ -6,7 +6,10 @@ import type { PeerWithStreams, VadStatus } from './types'; * All peers in the linked room, projected into composition streams. Flat list — * the worker is not a peer, so there is no local/remote split. */ -export function usePeers(): PeerWithStreams[] { +export function usePeers(): PeerWithStreams< + PeerMetadata, + ServerMetadata +>[] { const peers = useSyncExternalStore(compositionStore.subscribe, compositionStore.getPeers, compositionStore.getPeers); return peers as PeerWithStreams[]; } From 858800594f32eb6f704554223ae44272ccdc159a Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:57:00 +0200 Subject: [PATCH 08/12] Make TrackRemoved update vad --- packages/composition/src/store.ts | 6 +++++- packages/composition/tests/store.test.ts | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index 8936a7e2..ac94c370 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -251,7 +251,11 @@ class CompositionStore { const peer = this.peers.get(data.peerId); if (!peer) return false; - if (!peer.tracks.delete(data.track.id)) return false; + + const track = peer.tracks.get(data.track.id); + if (!track) return false; + peer.tracks.delete(data.track.id); + if (track.type === 'audio' && track.inputId) this.vad.delete(track.inputId); this.replacePeer(data.peerId); return true; diff --git a/packages/composition/tests/store.test.ts b/packages/composition/tests/store.test.ts index 10eede52..17e296ec 100644 --- a/packages/composition/tests/store.test.ts +++ b/packages/composition/tests/store.test.ts @@ -119,6 +119,20 @@ describe('composition store reducer', () => { expect(compositionStore.getSnapshot().vad).toEqual({}); }); + it('trackRemoved clears the audio input vad so a re-forward does not resurface speech', () => { + forwardCamera(); + apply({ type: 'vadNotification', data: { roomId: 'r1', peerId: 'p1', trackId: 'a1', status: 'speech' } as never }); + expect(compositionStore.getVadStatus('p1')).toBe('speech'); + + apply({ type: 'trackRemoved', data: { roomId: 'r1', peerId: 'p1', track: track('a1', 'audio', {}) } as never }); + expect(compositionStore.getSnapshot().vad).toEqual({}); + expect(compositionStore.getVadStatus('p1')).toBe('silence'); + + // re-forwarding the same input must start silent, not inherit the stale status + forwardCamera(); + expect(compositionStore.getVadStatus('p1')).toBe('silence'); + }); + it('peerMetadataUpdated splits into { peer, server }', () => { apply({ type: 'peerConnected', data: { roomId: 'r1', peerId: 'p1', peerType: 'webrtc' } as never }); apply({ From b36d3f91e8a1c4827363296e155e4d1b162ad308 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Thu, 25 Jun 2026 18:29:33 +0200 Subject: [PATCH 09/12] Fix eventBus typo --- packages/composition/src/eventBus.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/composition/src/eventBus.ts b/packages/composition/src/eventBus.ts index 39489cbc..36af027a 100644 --- a/packages/composition/src/eventBus.ts +++ b/packages/composition/src/eventBus.ts @@ -3,5 +3,8 @@ export interface CompositionEventBus { } declare global { - const eventBus: CompositionEventBus; + // eslint-disable-next-line no-var + var eventBus: CompositionEventBus; } + +export const eventBus = globalThis.eventBus; From 0164903883f8cd16d8629b989de7e8a94918384a Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:34:54 +0200 Subject: [PATCH 10/12] Rename internal to core --- packages/composition/package.json | 13 +++++++------ packages/composition/src/{internal.ts => core.ts} | 0 2 files changed, 7 insertions(+), 6 deletions(-) rename packages/composition/src/{internal.ts => core.ts} (100%) diff --git a/packages/composition/package.json b/packages/composition/package.json index efca9767..2ae63368 100644 --- a/packages/composition/package.json +++ b/packages/composition/package.json @@ -33,10 +33,10 @@ "import": "./dist/index.mjs", "require": "./dist/index.js" }, - "./internal": { - "types": "./dist/internal.d.ts", - "import": "./dist/internal.mjs", - "require": "./dist/internal.js" + "./core": { + "types": "./dist/core.d.ts", + "import": "./dist/core.mjs", + "require": "./dist/core.js" } }, "scripts": { @@ -51,14 +51,15 @@ "tsup": { "entry": [ "src/index.ts", - "src/internal.ts" + "src/core.ts" ], "minify": false, "format": [ "cjs", "esm" ], - "outDir": "dist" + "outDir": "dist", + "dts": true }, "peerDependencies": { "react": ">=18" diff --git a/packages/composition/src/internal.ts b/packages/composition/src/core.ts similarity index 100% rename from packages/composition/src/internal.ts rename to packages/composition/src/core.ts From bdba2dc2d13d136847d7a4375870132d4ef92d59 Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:59:42 +0200 Subject: [PATCH 11/12] Export interface --- packages/composition/src/core.ts | 2 +- packages/composition/src/store.ts | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/composition/src/core.ts b/packages/composition/src/core.ts index e2b73cb6..22a22b18 100644 --- a/packages/composition/src/core.ts +++ b/packages/composition/src/core.ts @@ -1,2 +1,2 @@ export { compositionStore } from './store'; -export type { CompositionEvent, RoomSnapshot } from './store'; +export type { CompositionEvent, RoomSnapshot, CompositionStoreFeed } from './store'; diff --git a/packages/composition/src/store.ts b/packages/composition/src/store.ts index ac94c370..a8450917 100644 --- a/packages/composition/src/store.ts +++ b/packages/composition/src/store.ts @@ -49,6 +49,12 @@ type InternalPeer = { tracks: Map; }; +export interface CompositionStoreFeed { + seedFromRoom(room: Room): void; + applyNotification(event: CompositionEvent): void; + reset(): void; +} + const EMPTY_SNAPSHOT: RoomSnapshot = { peers: [], vad: {} }; const assertNever = (event: never): never => { @@ -80,7 +86,7 @@ const roleOf = (stream: Stream): 'camera' | 'screenShare' | 'custom' => { return 'custom'; }; -class CompositionStore { +class CompositionStore implements CompositionStoreFeed { private peers = new Map(); private roomId: string | undefined; private vad = new Map(); From fa70461388554ec656b5e8bd21f55809b4bb0dce Mon Sep 17 00:00:00 2001 From: Tomasz Mazur <47872060+AHGIJMKLKKZNPJKQR@users.noreply.github.com> Date: Fri, 26 Jun 2026 15:27:53 +0200 Subject: [PATCH 12/12] Prevent re-emitting js-server-sdk types --- packages/composition/package.json | 6 ++++-- yarn.lock | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/composition/package.json b/packages/composition/package.json index 2ae63368..ec829839 100644 --- a/packages/composition/package.json +++ b/packages/composition/package.json @@ -40,7 +40,7 @@ } }, "scripts": { - "build": "tsup --dts-resolve", + "build": "tsup", "format": "prettier --write .", "format:check": "prettier --check .", "typecheck": "tsc --noEmit", @@ -59,9 +59,11 @@ "esm" ], "outDir": "dist", - "dts": true + "dts": true, + "clean": true }, "peerDependencies": { + "@fishjam-cloud/js-server-sdk": "workspace:^", "react": ">=18" }, "devDependencies": { diff --git a/yarn.lock b/yarn.lock index a38f0c98..bc49337f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1579,6 +1579,7 @@ __metadata: tsup: "npm:^8.4.0" vitest: "npm:^3.0.0" peerDependencies: + "@fishjam-cloud/js-server-sdk": "workspace:^" react: ">=18" languageName: unknown linkType: soft