Skip to content

Commit 859c90e

Browse files
committed
feat: optimize external link streaming
1 parent 857be0f commit 859c90e

9 files changed

Lines changed: 153 additions & 43 deletions

File tree

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,9 @@ function fetchStream(
183183
```
184184
- Merges `EXTERNAL_LINKS` into a single binary stream.
185185
- Preserves the original format (`JSON_ARRAY`, `CSV`, `ARROW_STREAM`).
186+
- Throws if the result is `INLINE`.
186187
- Ends as an empty stream when no external links exist.
188+
- `forceMerge: true` forces merge even when there is only a single external link.
187189

188190
### mergeExternalLinks(statementResult, auth, options)
189191
```ts
@@ -196,7 +198,8 @@ function mergeExternalLinks(
196198
- Creates a merged stream from `EXTERNAL_LINKS`, uploads it via
197199
`options.mergeStreamToExternalLink`, then returns a `StatementResult`
198200
with a single external link.
199-
- Returns the original result unchanged when input is `INLINE`.
201+
- Returns the original result unchanged when input is `INLINE` or already a
202+
single external link (unless `forceMerge: true`).
200203

201204
### Options (Summary)
202205
```ts
@@ -228,10 +231,12 @@ type FetchAllOptions = {
228231
229232
type FetchStreamOptions = {
230233
signal?: AbortSignal
234+
forceMerge?: boolean
231235
}
232236
233237
type MergeExternalLinksOptions = {
234238
signal?: AbortSignal
239+
forceMerge?: boolean
235240
mergeStreamToExternalLink: (stream: Readable) => Promise<{
236241
externalLink: string
237242
byte_count: number
@@ -243,6 +248,7 @@ type MergeExternalLinksOptions = {
243248
## Notes
244249
- Databricks requires `INLINE` results to use `JSON_ARRAY` format. `INLINE + CSV` is rejected by the API.
245250
- `EXTERNAL_LINKS` are merged using `@bitofsky/merge-streams`.
251+
- Requires Node.js >= 20 for global `fetch` and Web streams.
246252

247253
## Development
248254
```bash

src/api/fetchRow.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import { parser } from 'stream-json'
2-
import { streamArray } from 'stream-json/streamers/StreamArray'
31
import type { Readable } from 'node:stream'
4-
import { getChunk } from '../databricks-api.js'
5-
import { DatabricksSqlError, AbortError } from '../errors.js'
62
import type {
73
AuthInfo,
84
FetchRowsOptions,
95
RowArray,
106
RowObject,
117
StatementResult,
128
} from '../types.js'
9+
import { parser } from 'stream-json'
10+
import { streamArray } from 'stream-json/streamers/StreamArray'
11+
import { getChunk } from '../databricks-api.js'
12+
import { DatabricksSqlError, AbortError } from '../errors.js'
1313
import { validateSucceededResult } from '../util.js'
1414
import { createRowMapper } from '../createRowMapper.js'
1515
import { fetchStream } from './fetchStream.js'

src/api/fetchStream.ts

Lines changed: 66 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
import { PassThrough, type Readable } from 'node:stream'
2-
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'
31
import type {
42
AuthInfo,
3+
ExternalLinkInfo,
54
StatementResult,
65
FetchStreamOptions,
76
StatementManifest,
87
} from '../types.js'
8+
import { PassThrough, Readable } from 'node:stream'
9+
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'
910
import { getChunk } from '../databricks-api.js'
10-
import { AbortError } from '../errors.js'
11-
import { validateSucceededResult } from '../util.js'
11+
import { AbortError, DatabricksSqlError } from '../errors.js'
12+
import { pipeUrlToOutput, validateSucceededResult } from '../util.js'
1213

1314
/**
1415
* Create a readable stream from statement result.
@@ -20,31 +21,32 @@ export function fetchStream(
2021
auth: AuthInfo,
2122
options: FetchStreamOptions = {}
2223
): Readable {
23-
const { signal } = options
24+
const { signal, forceMerge } = options
2425
const manifest = validateSucceededResult(statementResult)
2526
const format = manifest.format as MergeFormat
2627

28+
if (statementResult.result?.data_array) {
29+
throw new DatabricksSqlError(
30+
'fetchStream only supports EXTERNAL_LINKS results',
31+
'UNSUPPORTED_FORMAT',
32+
statementResult.statement_id
33+
)
34+
}
35+
2736
// Create PassThrough as output (readable by consumer)
2837
const output = new PassThrough()
2938

3039
// Handle AbortSignal
3140
if (signal) {
32-
const onAbort = () => {
33-
output.destroy(new AbortError('Stream aborted'))
34-
}
41+
const onAbort = () => output.destroy(new AbortError('Stream aborted'))
3542
signal.addEventListener('abort', onAbort, { once: true })
36-
output.once('close', () => {
37-
signal.removeEventListener('abort', onAbort)
38-
})
43+
output.once('close', () => signal.removeEventListener('abort', onAbort))
3944
}
4045

4146
// Start async merge process
4247
// Errors are forwarded to the stream consumer via destroy.
43-
mergeChunksToStream(statementResult, auth, manifest, format, output, signal).catch(
44-
(err) => {
45-
output.destroy(err as Error)
46-
}
47-
)
48+
mergeChunksToStream(statementResult, auth, manifest, format, output, signal, forceMerge)
49+
.catch((err) => output.destroy(err as Error))
4850

4951
return output
5052
}
@@ -58,29 +60,59 @@ async function mergeChunksToStream(
5860
manifest: StatementManifest,
5961
format: MergeFormat,
6062
output: PassThrough,
61-
signal?: AbortSignal
63+
signal?: AbortSignal,
64+
forceMerge?: boolean
6265
): Promise<void> {
63-
const result = statementResult.result
64-
65-
// Collect all external link URLs
66-
let urls = result?.external_links?.map((link) => link.external_link) ?? []
67-
68-
// If no URLs in initial result, fetch from chunks
69-
if (urls.length === 0 && manifest.total_chunk_count > 0) {
70-
for (let i = 0; i < manifest.total_chunk_count; i++) {
71-
if (signal?.aborted) throw new AbortError('Aborted while collecting URLs')
72-
73-
// Chunk metadata contains external link URLs when results are chunked.
74-
const chunkData = await getChunk(auth, statementResult.statement_id, i, signal)
75-
const chunkUrls = chunkData.external_links?.map((link) => link.external_link) ?? []
76-
urls.push(...chunkUrls)
77-
}
78-
}
66+
const urls = await collectExternalUrls(statementResult, auth, manifest, signal)
7967

8068
// No external links - close the stream
8169
if (urls.length === 0)
8270
return void output.end()
8371

72+
// Single URL - pipe directly to output unless forcing merge
73+
if (urls.length === 1 && !forceMerge)
74+
// Avoid merge-streams overhead for a single URL unless forced.
75+
return pipeUrlToOutput(urls[0]!, output, signal)
76+
8477
// Merge all URLs using merge-streams
85-
await mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output })
78+
return mergeStreamsFromUrls(format, signal ? { urls, output, signal } : { urls, output })
79+
}
80+
81+
async function collectExternalUrls(
82+
statementResult: StatementResult,
83+
auth: AuthInfo,
84+
manifest: StatementManifest,
85+
signal?: AbortSignal
86+
): Promise<string[]> {
87+
const urls = extractExternalLinks(statementResult.result?.external_links)
88+
if (urls.length > 0)
89+
return urls
90+
91+
if (!manifest.total_chunk_count)
92+
return []
93+
94+
const chunkUrls: string[] = []
95+
for (let i = 0; i < manifest.total_chunk_count; i++) {
96+
if (signal?.aborted)
97+
throw new AbortError('Aborted while collecting URLs')
98+
99+
// Chunk metadata contains external link URLs when results are chunked.
100+
const chunkData = await getChunk(auth, statementResult.statement_id, i, signal)
101+
chunkUrls.push(...extractExternalLinks(chunkData.external_links))
102+
}
103+
104+
return chunkUrls
105+
}
106+
107+
function extractExternalLinks(externalLinks?: ExternalLinkInfo[]): string[] {
108+
if (!externalLinks)
109+
return []
110+
111+
return externalLinks
112+
.map((link) => link.external_link)
113+
.filter(isNonEmptyString)
114+
}
115+
116+
function isNonEmptyString(value: unknown): value is string {
117+
return typeof value === 'string' && value.length > 0
86118
}

src/api/mergeExternalLinks.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,27 @@ export async function mergeExternalLinks(
1717
auth: AuthInfo,
1818
options: MergeExternalLinksOptions
1919
): Promise<StatementResult> {
20-
const { signal, mergeStreamToExternalLink } = options
20+
const { signal, mergeStreamToExternalLink, forceMerge } = options
2121

2222
// If not external links, return original as-is
2323
if (!statementResult.result?.external_links)
2424
return statementResult
2525

26+
if (!forceMerge) {
27+
const totalChunks = statementResult.manifest?.total_chunk_count
28+
const externalLinks = statementResult.result.external_links
29+
const isSingleChunk = totalChunks === undefined ? externalLinks.length <= 1 : totalChunks <= 1
30+
31+
// Skip merging when a single external link already exists unless forced.
32+
if (isSingleChunk && externalLinks.length <= 1)
33+
return statementResult
34+
}
35+
2636
// Get merged stream via fetchStream
27-
const stream = fetchStream(statementResult, auth, signal ? { signal } : {})
37+
const stream = fetchStream(statementResult, auth, {
38+
...signal ? { signal } : {},
39+
...forceMerge !== undefined ? { forceMerge } : {},
40+
})
2841

2942
// Upload via callback
3043
const uploadResult = await mergeStreamToExternalLink(stream)

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,4 @@ export type * from './types.js'
55
export * from './errors.js'
66

77
// Core functions
8-
export * from './api'
8+
export * from './api/index.js'

src/types.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,10 @@ export type RowObject = Record<string, unknown>
150150
export type FetchRowFormat = 'JSON_ARRAY' | 'JSON_OBJECT'
151151

152152
/** Options for fetchStream */
153-
export type FetchStreamOptions = SignalOptions
153+
export type FetchStreamOptions = SignalOptions & {
154+
/** Force merge even when there is only a single external link */
155+
forceMerge?: boolean
156+
}
154157

155158
/** Options for fetchRow */
156159
export type FetchRowsOptions = SignalOptions & {
@@ -180,6 +183,8 @@ export type MergeExternalLinksResult = {
180183
export type MergeExternalLinksOptions = SignalOptions & {
181184
/** Callback to upload merged stream to external link */
182185
mergeStreamToExternalLink: (stream: Readable) => Promise<MergeExternalLinksResult>
186+
/** Force merge even when there is only a single external link chunk */
187+
forceMerge?: boolean
183188
}
184189

185190
/**

src/util.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import { Readable } from 'node:stream'
2+
import { pipeline } from 'node:stream/promises'
3+
import type { ReadableStream as WebReadableStream } from 'node:stream/web'
14
import type { StatementResult, StatementManifest } from './types.js'
25
import { AbortError, DatabricksSqlError } from './errors.js'
36

@@ -80,3 +83,34 @@ export function validateSucceededResult(
8083

8184
return statementResult.manifest
8285
}
86+
87+
function isWebReadableStream(body: unknown): body is WebReadableStream {
88+
return typeof (body as WebReadableStream).getReader === 'function'
89+
}
90+
91+
export async function pipeUrlToOutput(
92+
url: string,
93+
output: NodeJS.WritableStream,
94+
signal?: AbortSignal
95+
): Promise<void> {
96+
// Uses Node 20+ global fetch with Web streams.
97+
if (signal?.aborted)
98+
throw new AbortError('Aborted while streaming')
99+
100+
const response = await fetch(url, signal ? { signal } : undefined)
101+
if (!response.ok) {
102+
throw new Error(
103+
`Failed to fetch external link: ${response.status} ${response.statusText}`
104+
)
105+
}
106+
107+
if (!response.body)
108+
return void output.end()
109+
110+
const body = response.body
111+
const input = isWebReadableStream(body)
112+
? Readable.fromWeb(body)
113+
: (body as NodeJS.ReadableStream)
114+
115+
await pipeline(input, output)
116+
}

test/mergeExternalLinks.spec.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ describe('mergeExternalLinks', () => {
6262

6363
const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
6464
mergeStreamToExternalLink: mockCallback,
65+
forceMerge: true,
6566
})
6667

6768
// Verify callback was called
@@ -113,6 +114,7 @@ describe('mergeExternalLinks', () => {
113114

114115
await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
115116
mergeStreamToExternalLink: mockCallback,
117+
forceMerge: true,
116118
})
117119

118120
// Verify callback received the merged data
@@ -154,6 +156,7 @@ describe('mergeExternalLinks', () => {
154156
mergeExternalLinks(mockExternalLinksResult, mockAuth, {
155157
signal: controller.signal,
156158
mergeStreamToExternalLink: mockCallback,
159+
forceMerge: true,
157160
})
158161
).rejects.toThrow(/abort/i)
159162
})
@@ -170,6 +173,7 @@ describe('mergeExternalLinks', () => {
170173
await expect(
171174
mergeExternalLinks(mockExternalLinksResult, mockAuth, {
172175
mergeStreamToExternalLink: mockCallback,
176+
forceMerge: true,
173177
})
174178
).rejects.toThrow('Upload failed')
175179
})
@@ -189,6 +193,7 @@ describe('mergeExternalLinks', () => {
189193

190194
const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
191195
mergeStreamToExternalLink: vi.fn().mockResolvedValue(uploadResult),
196+
forceMerge: true,
192197
})
193198

194199
// Verify original manifest properties are preserved
@@ -198,4 +203,15 @@ describe('mergeExternalLinks', () => {
198203
mockExternalLinksResult.manifest?.total_row_count
199204
)
200205
})
206+
207+
it('should return original for single external link by default', async () => {
208+
const mockCallback = vi.fn()
209+
210+
const result = await mergeExternalLinks(mockExternalLinksResult, mockAuth, {
211+
mergeStreamToExternalLink: mockCallback,
212+
})
213+
214+
expect(result).toBe(mockExternalLinksResult)
215+
expect(mockCallback).not.toHaveBeenCalled()
216+
})
201217
})

test/s3.integration.spec.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
248248

249249
// Merge and upload to S3
250250
const mergedResult = await mergeExternalLinks(result, auth, {
251+
forceMerge: true,
251252
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'),
252253
})
253254

@@ -296,6 +297,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
296297
}
297298

298299
const mergedResult = await mergeExternalLinks(result, auth, {
300+
forceMerge: true,
299301
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'JSON_ARRAY'),
300302
})
301303

@@ -332,6 +334,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
332334
}
333335

334336
const mergedResult = await mergeExternalLinks(result, auth, {
337+
forceMerge: true,
335338
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'ARROW_STREAM'),
336339
})
337340

@@ -368,6 +371,7 @@ describe.skipIf(shouldSkip)('S3 Integration Tests', () => {
368371
}
369372

370373
const mergedResult = await mergeExternalLinks(result, auth, {
374+
forceMerge: true,
371375
mergeStreamToExternalLink: (stream) => uploadToS3(stream, 'CSV'),
372376
})
373377

0 commit comments

Comments
 (0)