Skip to content

Commit 6c5c5b7

Browse files
committed
feat: add query metrics fetching in executeStatement and update related types
1 parent 5b344f3 commit 6c5c5b7

10 files changed

Lines changed: 300 additions & 36 deletions

src/api/executeStatement.ts

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import type {
44
ExecuteStatementRequest,
55
StatementResult,
66
StatementState,
7+
QueryMetrics,
78
} from '../types.js'
8-
import { postStatement, getStatement, cancelStatement } from '../databricks-api.js'
9+
import { postStatement, getStatement, cancelStatement, getQueryMetrics } from '../databricks-api.js'
910
import { extractWarehouseId, throwIfAborted, delay } from '../util.js'
1011
import {
1112
DatabricksSqlError,
@@ -19,8 +20,21 @@ const TERMINAL_STATES = new Set<StatementState>([
1920
'CANCELED',
2021
'CLOSED',
2122
])
22-
const POLL_INTERVAL_MS = 500
23-
const MAX_POLL_INTERVAL_MS = 5000
23+
const POLL_INTERVAL_MS = 5000
24+
25+
async function fetchMetrics(
26+
auth: AuthInfo,
27+
statementId: string,
28+
signal?: AbortSignal
29+
): Promise<QueryMetrics | undefined> {
30+
try {
31+
const queryInfo = await getQueryMetrics(auth, statementId, signal)
32+
return queryInfo.metrics
33+
} catch {
34+
// Ignore metrics fetch errors - non-critical
35+
return undefined
36+
}
37+
}
2438

2539
/**
2640
* Execute SQL statement and poll until completion
@@ -31,13 +45,17 @@ export async function executeStatement(
3145
options: ExecuteStatementOptions = {}
3246
): Promise<StatementResult> {
3347
const warehouseId = options.warehouse_id ?? extractWarehouseId(auth.httpPath)
34-
const { signal, onProgress } = options
48+
const { signal, onProgress, enableMetrics } = options
3549

3650
// Check if already aborted
3751
throwIfAborted(signal, 'executeStatement')
3852

53+
// Helper to call onProgress with optional metrics
54+
const emitProgress = onProgress
55+
? async (statementId: string) => onProgress(result.status, enableMetrics ? await fetchMetrics(auth, statementId, signal) : undefined)
56+
: undefined
57+
3958
// 1. Build request (filter out undefined values)
40-
// Keep payload small and aligned with the REST API contract.
4159
const request = Object.fromEntries(
4260
Object.entries({
4361
warehouse_id: warehouseId,
@@ -58,31 +76,19 @@ export async function executeStatement(
5876
let result = await postStatement(auth, request, signal)
5977

6078
// 3. Poll until terminal state
61-
let pollInterval = POLL_INTERVAL_MS
62-
6379
while (!TERMINAL_STATES.has(result.status.state)) {
64-
// Check abort signal
6580
if (signal?.aborted) {
66-
// Try to cancel on server
67-
await cancelStatement(auth, result.statement_id).catch(() => {
68-
// Ignore cancel errors
69-
})
81+
await cancelStatement(auth, result.statement_id).catch(() => { })
7082
throw new AbortError('Aborted during polling')
7183
}
7284

73-
// Call progress callback
74-
onProgress?.(result.status)
75-
76-
// Wait before next poll (exponential backoff)
77-
await delay(pollInterval, signal)
78-
pollInterval = Math.min(pollInterval * 1.5, MAX_POLL_INTERVAL_MS)
79-
80-
// Get current status
85+
await emitProgress?.(result.statement_id)
86+
await delay(POLL_INTERVAL_MS, signal)
8187
result = await getStatement(auth, result.statement_id, signal)
8288
}
8389

8490
// 4. Final progress callback
85-
onProgress?.(result.status)
91+
await emitProgress?.(result.statement_id)
8692

8793
// 5. Handle terminal states
8894
if (result.status.state === 'SUCCEEDED')

src/api/fetchAll.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
RowObject,
77
StatementResult,
88
} from '../types.js'
9+
910
import { fetchRow } from './fetchRow.js'
1011

1112
/**

src/api/fetchRow.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import type {
66
RowObject,
77
StatementResult,
88
} from '../types.js'
9+
910
import { parser } from 'stream-json'
1011
import { streamArray } from 'stream-json/streamers/StreamArray'
12+
1113
import { getChunk } from '../databricks-api.js'
12-
import { DatabricksSqlError, AbortError } from '../errors.js'
13-
import { validateSucceededResult } from '../util.js'
1414
import { createRowMapper } from '../createRowMapper.js'
15+
import { AbortError, DatabricksSqlError } from '../errors.js'
16+
import { validateSucceededResult } from '../util.js'
1517
import { fetchStream } from './fetchStream.js'
1618

1719
/**

src/api/fetchStream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1+
import type { MergeFormat } from '@bitofsky/merge-streams'
12
import type {
23
AuthInfo,
34
ExternalLinkInfo,
4-
StatementResult,
55
FetchStreamOptions,
66
StatementManifest,
7+
StatementResult,
78
} from '../types.js'
9+
810
import { PassThrough, Readable } from 'node:stream'
9-
import { mergeStreamsFromUrls, type MergeFormat } from '@bitofsky/merge-streams'
11+
12+
import { mergeStreamsFromUrls } from '@bitofsky/merge-streams'
13+
1014
import { getChunk } from '../databricks-api.js'
1115
import { AbortError, DatabricksSqlError } from '../errors.js'
1216
import { pipeUrlToOutput, validateSucceededResult } from '../util.js'

src/api/mergeExternalLinks.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import type {
22
AuthInfo,
3-
StatementResult,
43
MergeExternalLinksOptions,
4+
StatementResult,
55
} from '../types.js'
6-
import { fetchStream } from './fetchStream.js'
6+
77
import { validateSucceededResult } from '../util.js'
8+
import { fetchStream } from './fetchStream.js'
89

910
/**
1011
* Merge external links from StatementResult into a single stream,

src/databricks-api.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ import type {
33
ExecuteStatementRequest,
44
StatementResult,
55
GetChunkResponse,
6+
QueryInfo,
67
} from './types.js'
78
import { httpRequest } from './http.js'
89

910
// Base path for Databricks SQL Statement Execution API.
1011
const BASE_PATH = '/api/2.0/sql/statements'
12+
// Base path for Query History API.
13+
const HISTORY_BASE_PATH = '/api/2.0/sql/history/queries'
1114

1215
/**
1316
* Execute SQL statement
@@ -74,3 +77,19 @@ export async function getChunk(
7477
...(signal ? { signal } : {}),
7578
})
7679
}
80+
81+
/**
82+
* Get query metrics from Query History API
83+
* GET /api/2.0/sql/history/queries/{query_id}?include_metrics=true
84+
*/
85+
export async function getQueryMetrics(
86+
auth: AuthInfo,
87+
queryId: string,
88+
signal?: AbortSignal
89+
): Promise<QueryInfo> {
90+
return httpRequest<QueryInfo>(auth, {
91+
method: 'GET',
92+
path: `${HISTORY_BASE_PATH}/${queryId}?include_metrics=true`,
93+
...(signal ? { signal } : {}),
94+
})
95+
}

src/types.ts

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,85 @@ export type StatementStatus = {
2828
}
2929
}
3030

31+
/**
32+
* Query execution metrics from Query History API
33+
* @see https://docs.databricks.com/api/workspace/queryhistory/list
34+
*/
35+
export type QueryMetrics = {
36+
/** Total time in milliseconds */
37+
total_time_ms?: number
38+
/** Compilation time in milliseconds */
39+
compilation_time_ms?: number
40+
/** Execution time in milliseconds */
41+
execution_time_ms?: number
42+
/** Result fetch time in milliseconds */
43+
result_fetch_time_ms?: number
44+
/** Query execution time in milliseconds */
45+
query_execution_time_ms?: number
46+
/** Metadata time in milliseconds */
47+
metadata_time_ms?: number
48+
/** Task total time in milliseconds */
49+
task_total_time_ms?: number
50+
/** Photon total time in milliseconds */
51+
photon_total_time_ms?: number
52+
/** Query compilation start timestamp */
53+
query_compilation_start_timestamp?: number
54+
/** Bytes read */
55+
read_bytes?: number
56+
/** Remote bytes read */
57+
read_remote_bytes?: number
58+
/** Remote bytes written */
59+
write_remote_bytes?: number
60+
/** Cache bytes read */
61+
read_cache_bytes?: number
62+
/** Bytes spilled to disk */
63+
spill_to_disk_bytes?: number
64+
/** Network bytes sent */
65+
network_sent_bytes?: number
66+
/** Pruned bytes */
67+
pruned_bytes?: number
68+
/** Rows produced count */
69+
rows_produced_count?: number
70+
/** Rows read count */
71+
rows_read_count?: number
72+
/** Files read count */
73+
read_files_count?: number
74+
/** Partitions read count */
75+
read_partitions_count?: number
76+
/** Pruned files count */
77+
pruned_files_count?: number
78+
/** Whether result is from cache */
79+
result_from_cache?: boolean
80+
/** Percentage of bytes read from cache */
81+
bytes_read_from_cache_percentage?: number
82+
/** Remote rows written */
83+
write_remote_rows?: number
84+
/** Remote files written */
85+
write_remote_files?: number
86+
}
87+
88+
/**
89+
* Query info from Query History API
90+
* @see https://docs.databricks.com/api/workspace/queryhistory/list
91+
*/
92+
export type QueryInfo = {
93+
query_id: string
94+
status: string
95+
query_text: string
96+
query_start_time_ms: number
97+
execution_end_time_ms?: number
98+
query_end_time_ms?: number
99+
user_id: number
100+
user_name: string
101+
endpoint_id: string
102+
warehouse_id: string
103+
rows_produced?: number
104+
metrics?: QueryMetrics
105+
is_final: boolean
106+
duration?: number
107+
statement_type?: string
108+
}
109+
31110
/** Column schema information */
32111
export type ColumnInfo = {
33112
name: string
@@ -109,7 +188,9 @@ export type StatementParameter = {
109188
*/
110189
export type ExecuteStatementOptions = {
111190
/** Progress callback (called on each poll) */
112-
onProgress?: (status: StatementStatus) => void
191+
onProgress?: (status: StatementStatus, metrics?: QueryMetrics) => void
192+
/** Enable query metrics fetching during polling (default: false) */
193+
enableMetrics?: boolean
113194
/** Abort signal for cancellation */
114195
signal?: AbortSignal
115196
/** Result byte limit */

0 commit comments

Comments (0)