# Commit 47deed9

feat: Row readers using storage API acceleration can leverage full precision avro values (#7525)
## Description

Copied from https://togithub.com/googleapis/nodejs-bigquery-storage/pull/658. This PR allows users to fetch high-precision timestamps on read calls with Avro readers; prior to this PR there was no way to fetch such timestamps.

## Impact

With a custom Avro reader, users can now fetch and consume data that preserves the backend's high-precision timestamps by including Avro serialization options in the read session request.

## Testing

A system test is added that verifies the timestamp fetched from the server retains its high precision.

## Additional Information

The source code changes land in essentially the same places as those from https://togithub.com/googleapis/nodejs-bigquery-storage/pull/656. The main difference is that the server returns Avro data in a very different shape than Arrow data, so instead of reusing the Arrow reader transforms from the source tree, the test leverages the avsc library to parse server results the same way the quickstart guide demonstrates. This allows the test to produce a high-precision timestamp.
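As the last paragraph notes, the two formats arrive in differently shaped `ReadRowsResponse` messages, which is why the Arrow transforms cannot be reused. A minimal sketch (not from the commit) of how a reader might branch on the two payloads; the import path is assumed, and the field names come from the v1 proto:

```ts
import * as protos from '../protos/protos'; // path is an assumption

type ReadRowsResponse =
  protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;

// Return the raw payload bytes of a ReadRowsResponse, whichever
// serialization the read session used.
function payloadOf(response: ReadRowsResponse): Uint8Array | null {
  if (response.avroRows?.serializedBinaryRows) {
    // Avro: a block of Avro-encoded rows, decoded with avsc.
    return response.avroRows.serializedBinaryRows;
  }
  if (response.arrowRecordBatch?.serializedRecordBatch) {
    // Arrow: an IPC record batch, decoded with apache-arrow's tableFromIPC.
    return response.arrowRecordBatch.serializedRecordBatch;
  }
  return null;
}
```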
1 parent 894c30a · commit 47deed9

4 files changed: 145 additions & 5 deletions

### handwritten/bigquery-storage/package.json

Lines changed: 1 addition & 0 deletions

```diff
@@ -50,6 +50,7 @@
     "@types/node": "^22.13.14",
     "@types/sinon": "^17.0.4",
     "@types/uuid": "^10.0.0",
+    "avsc": "^5.7.9",
     "c8": "^10.1.3",
     "gapic-tools": "^1.0.1",
     "gts": "^6.0.2",
```

### handwritten/bigquery-storage/src/reader/read_client.ts

Lines changed: 8 additions & 2 deletions

```diff
@@ -25,7 +25,7 @@ import {DataFormat} from './data_format';
 type CreateReadSessionRequest =
   protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest;
 type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
-type ArrowSerializationOptions = {
+type AvroArrowSerializationOptions = {
   picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision;
 };

@@ -131,7 +131,8 @@ export class ReadClient {
     table: string;
     dataFormat: DataFormat;
     selectedFields?: string[];
-    arrowSerializationOptions?: ArrowSerializationOptions;
+    arrowSerializationOptions?: AvroArrowSerializationOptions;
+    avroSerializationOptions?: AvroArrowSerializationOptions;
   }): Promise<ReadSession> {
     await this.initialize();
     const {table, parent, dataFormat, selectedFields} = request;
@@ -154,6 +155,11 @@
         arrowSerializationOptions: request.arrowSerializationOptions,
       });
     }
+    if (request.avroSerializationOptions) {
+      Object.assign(createReq.readSession.readOptions, {
+        avroSerializationOptions: request.avroSerializationOptions,
+      });
+    }
     const [response] = await this._client.createReadSession(createReq);
     if (typeof [response] === undefined) {
       throw new gax.GoogleError(`${response}`);
```
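With the widened request type, an Avro session can opt into picosecond precision the same way an Arrow session does. A minimal usage sketch (not from the commit); the import paths and the `AvroFormat` constant are assumptions taken from the system test further down:

```ts
import {ReadClient} from '../src/reader/read_client'; // path assumed
import {AvroFormat} from '../src/reader/data_format'; // path assumed
import * as protos from '../protos/protos'; // path assumed

// Create an Avro read session that asks the backend to preserve
// picosecond timestamp precision.
async function createPicosAvroSession(
  client: ReadClient,
  projectId: string,
  datasetId: string,
  tableId: string,
) {
  const {PicosTimestampPrecision} =
    protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
  return client.createReadSession({
    parent: `projects/${projectId}`,
    table: `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`,
    dataFormat: AvroFormat,
    avroSerializationOptions: {
      picosTimestampPrecision:
        PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
    },
  });
}
```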

### handwritten/bigquery-storage/src/reader/read_session.ts

Lines changed: 8 additions & 3 deletions

```diff
@@ -25,7 +25,7 @@ type ReadRowsResponse =
   protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
 type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
 const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession;
-type ArrowSerializationOptions = {
+type AvroArrowSerializationOptions = {
   picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision;
 };

@@ -39,9 +39,13 @@ export type GetStreamOptions = {
   */
  selectedFields?: string;
  /**
-  * Option to opt into higher precision timestamps.
+  * Option to opt into higher precision timestamps for arrow readers.
   */
- arrowSerializationOptions?: ArrowSerializationOptions;
+ arrowSerializationOptions?: AvroArrowSerializationOptions;
+ /**
+  * Option to opt into higher precision timestamps for avro readers.
+  */
+ avroSerializationOptions?: AvroArrowSerializationOptions;
 };

 /**
@@ -93,6 +97,7 @@ export class ReadSession {
       dataFormat: this._format,
       selectedFields: options?.selectedFields?.split(','),
       arrowSerializationOptions: options?.arrowSerializationOptions,
+      avroSerializationOptions: options?.avroSerializationOptions,
     });
     this.trace(
       'session created',
```
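One design choice worth noting: both option fields share the single `AvroArrowSerializationOptions` shape, because the Avro path reuses the `PicosTimestampPrecision` enum defined on the Arrow proto rather than introducing a parallel Avro enum. A hedged sketch of `GetStreamOptions` for an Avro reader (field values are illustrative; the `protos` import is assumed as in the files above):

```ts
const options: GetStreamOptions = {
  // Comma-separated here; ReadSession splits this into a string[] for the request.
  selectedFields: 'customer_name,row_num,created_at',
  avroSerializationOptions: {
    picosTimestampPrecision:
      protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
        .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
  },
};
```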

### handwritten/bigquery-storage/system-test/reader_client_test.ts

Lines changed: 128 additions & 0 deletions

```diff
@@ -29,6 +29,8 @@ import {RecordBatch, Table, tableFromIPC} from 'apache-arrow';
 import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform';
 import {ResourceStream} from '@google-cloud/paginator';
 import {ArrowTableReader} from '../src/reader';
+import {Transform, TransformCallback} from 'stream';
+type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;

 type ReadRowsResponse =
   protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
@@ -230,6 +232,132 @@ describe('reader.ReaderClient', () => {
     });
   });

+  describe('AvroReader', () => {
+    it('should read high precision timestamps from an avro stream', async () => {
+      const avro = require('avsc');
+      class AvroRawTransform extends Transform {
+        private session: ReadSession;
+
+        constructor(session: ReadSession) {
+          super({
+            objectMode: true,
+          });
+          this.session = session;
+        }
+
+        _transform(
+          serializedRecordBatch: any,
+          _: BufferEncoding,
+          callback: TransformCallback,
+        ): void {
+          const session = this.session;
+          const schema = JSON.parse(session?.avroSchema?.schema as string);
+          const avroType = avro.Type.forSchema(schema);
+          if (
+            !(
+              serializedRecordBatch.avroRows &&
+              serializedRecordBatch.avroRows.serializedBinaryRows
+            )
+          ) {
+            callback(null);
+            return;
+          }
+          const decodedData = avroType.decode(
+            serializedRecordBatch.avroRows.serializedBinaryRows,
+            0,
+          );
+          callback(null, decodedData.value);
+        }
+      }
+
+      const picosTableId = generateUuid();
+      const picosSchema: any = {
+        fields: [
+          {
+            name: 'customer_name',
+            type: 'STRING',
+            mode: 'REQUIRED',
+          },
+          {
+            name: 'row_num',
+            type: 'INTEGER',
+            mode: 'REQUIRED',
+          },
+          {
+            name: 'created_at',
+            type: 'TIMESTAMP',
+            mode: 'NULLABLE',
+            timestampPrecision: 12,
+          },
+        ],
+      };
+      const expectedTsValue = '2024-04-05T15:45:58.981123456789Z';
+      await bigquery
+        .dataset(datasetId)
+        .createTable(picosTableId, {schema: picosSchema});
+      await bigquery
+        .dataset(datasetId)
+        .table(picosTableId)
+        .insert([
+          {
+            customer_name: 'my-name',
+            row_num: 1,
+            created_at: expectedTsValue,
+          },
+        ]);
+
+      bqReadClient.initialize().catch(err => {
+        throw err;
+      });
+      const client = new ReadClient();
+      client.setClient(bqReadClient);
+
+      try {
+        const session = await client.createReadSession({
+          parent: `projects/${projectId}`,
+          table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`,
+          dataFormat: AvroFormat,
+          avroSerializationOptions: {
+            picosTimestampPrecision:
+              protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
+                .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
+          },
+        });
+
+        assert.equal(session.dataFormat, AvroFormat);
+        assert.notEqual(session.streams, null);
+        assert.notEqual(session.streams?.length, 0);
+
+        const readStream = session.streams![0];
+        const connection = await client.createReadStream({
+          session,
+          streamName: readStream.name!,
+        });
+
+        const myStream = connection
+          .getRowsStream()
+          .pipe(new AvroRawTransform(session!));
+        const responses: ReadRowsResponse[] = [];
+        await new Promise((resolve, reject) => {
+          myStream.on('data', (data: ReadRowsResponse) => {
+            responses.push(data);
+          });
+          myStream.on('error', reject);
+          myStream.on('end', () => {
+            resolve(null);
+          });
+        });
+
+        assert.equal(responses.length, 1);
+        assert.equal((responses[0] as any)['created_at'], expectedTsValue);
+
+        connection.close();
+        client.close();
+      } finally {
+        client.close();
+      }
+    });
+  });
   describe('ArrowTableReader', () => {
     it('should allow to read a table as an Arrow byte stream', async () => {
       bqReadClient.initialize().catch(err => {
```
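The test's `AvroRawTransform` decodes exactly one record per `ReadRowsResponse` block. A standalone sketch of the same avsc step, generalized to drain a block that may hold several rows; avsc's `Type.decode` returns `{value, offset}` and reports `offset === -1` when the buffer runs short:

```ts
import * as avro from 'avsc';

// Decode every Avro-encoded row in a serializedBinaryRows buffer, given the
// session's JSON Avro schema (session.avroSchema.schema in the test above).
function decodeAvroRows(schemaJson: string, block: Buffer): unknown[] {
  const type = avro.Type.forSchema(JSON.parse(schemaJson));
  const rows: unknown[] = [];
  let pos = 0;
  while (pos < block.length) {
    const {value, offset} = type.decode(block, pos);
    if (offset < 0) {
      break; // truncated buffer: stop rather than loop forever
    }
    rows.push(value);
    pos = offset;
  }
  return rows;
}
```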
