Skip to content

Commit b987d37

Browse files
Add table information to BigQueryStorageApiInsertError (#36832)
* Add table information to BigQueryStorageApiInsertError This change adds table identification (project, dataset, table) to BigQueryStorageApiInsertError to help users identify which table failed during Storage Write API operations. Changes: - Add tableUrn field to BigQueryStorageApiInsertError - Add getProjectId(), getDatasetId(), getTableId() convenience methods that parse the tableUrn on first access (lazy initialization with caching) - Update StorageApiWriteUnshardedRecords to pass tableUrn to error objects - Update StorageApiWritesShardedRecords to pass tableUrn to error objects - Update StorageApiConvertMessages to pass tableUrn to error objects - Update BigQueryStorageApiInsertErrorCoder to serialize/deserialize tableUrn This makes the API consistent with BigQueryInsertError (used by STREAMING_INSERTS method), which provides table information via TableReference. The tableUrn uses the format: "projects/{project}/datasets/{dataset}/tables/{table}" which is the standard format returned by TableDestination.getTableUrn(). Tested: - All StorageApi-related tests pass - Verified table information is correctly captured in error outputs * Use TableReference instead of String for table identification in BigQueryStorageApiInsertError Following reviewer feedback, this change updates BigQueryStorageApiInsertError to use TableReference instead of String tableUrn for better consistency with BigQueryInsertError. Changes: - Changed BigQueryStorageApiInsertError to use TableReference field instead of String tableUrn - Updated BigQueryStorageApiInsertErrorCoder to use BigQueryHelpers.toTableSpec() and parseTableSpec() - Added null safety checks in coder to prevent NullPointerException when table is unknown - Updated all calling sites to pass TableReference from TableDestination.getTableReference(): - StorageApiWriteUnshardedRecords.java (3 locations) - StorageApiWritesShardedRecords.java (3 locations + added tableReference variable) - StorageApiConvertMessages.java (1 location) - Added TableReference imports to modified files The null checks in the coder ensure job stability even when table information is unavailable during error handling, preventing pipeline failures in error reporting scenarios. * Add table field to toString in BigQueryStorageApiInsertError Added the table field to the toString() method for better debugging and logging visibility. This ensures all fields are represented in the string output, making error investigation easier. Change: - Updated toString() to include ", table=" + table * Add unit tests for BigQueryStorageApiInsertErrorCoder Added comprehensive unit tests for BigQueryStorageApiInsertErrorCoder to verify encode/decode functionality: - testDecodeEncodeEqual: all fields populated - testDecodeEncodeWithNullTable: table is null - testDecodeEncodeWithNullErrorMessage: errorMessage is null - testDecodeEncodeWithAllNullableFieldsNull: both nullable fields are null These tests ensure the coder correctly handles all combinations of nullable fields (errorMessage and table).
1 parent 87de10f commit b987d37

6 files changed

Lines changed: 172 additions & 10 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertError.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableReference;
2021
import com.google.api.services.bigquery.model.TableRow;
2122
import javax.annotation.Nullable;
2223

@@ -25,13 +26,21 @@ public class BigQueryStorageApiInsertError {
2526

2627
private @Nullable String errorMessage;
2728

29+
private @Nullable TableReference table;
30+
2831
public BigQueryStorageApiInsertError(TableRow row) {
29-
this.row = row;
32+
this(row, null, null);
3033
}
3134

3235
public BigQueryStorageApiInsertError(TableRow row, @Nullable String errorMessage) {
36+
this(row, errorMessage, null);
37+
}
38+
39+
public BigQueryStorageApiInsertError(
40+
TableRow row, @Nullable String errorMessage, @Nullable TableReference table) {
3341
this.row = row;
3442
this.errorMessage = errorMessage;
43+
this.table = table;
3544
}
3645

3746
public TableRow getRow() {
@@ -43,6 +52,11 @@ public String getErrorMessage() {
4352
return errorMessage;
4453
}
4554

55+
@Nullable
56+
public TableReference getTable() {
57+
return table;
58+
}
59+
4660
@Override
4761
public String toString() {
4862
return "BigQueryStorageApiInsertError{"
@@ -51,6 +65,8 @@ public String toString() {
5165
+ ", errorMessage='"
5266
+ errorMessage
5367
+ '\''
68+
+ ", table="
69+
+ table
5470
+ '}';
5571
}
5672
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageApiInsertErrorCoder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableReference;
2021
import com.google.api.services.bigquery.model.TableRow;
2122
import java.io.IOException;
2223
import java.io.InputStream;
@@ -42,12 +43,18 @@ public void encode(BigQueryStorageApiInsertError value, OutputStream outStream)
4243
throws IOException {
4344
TABLE_ROW_CODER.encode(value.getRow(), outStream);
4445
STRING_CODER.encode(value.getErrorMessage(), outStream);
46+
TableReference table = value.getTable();
47+
String tableSpec = table != null ? BigQueryHelpers.toTableSpec(table) : null;
48+
STRING_CODER.encode(tableSpec, outStream);
4549
}
4650

4751
@Override
4852
public BigQueryStorageApiInsertError decode(InputStream inStream)
4953
throws CoderException, IOException {
50-
return new BigQueryStorageApiInsertError(
51-
TABLE_ROW_CODER.decode(inStream), STRING_CODER.decode(inStream));
54+
TableRow row = TABLE_ROW_CODER.decode(inStream);
55+
String errorMessage = STRING_CODER.decode(inStream);
56+
String tableSpec = STRING_CODER.decode(inStream);
57+
TableReference table = tableSpec != null ? BigQueryHelpers.parseTableSpec(tableSpec) : null;
58+
return new BigQueryStorageApiInsertError(row, errorMessage, table);
5259
}
5360
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
2121

22+
import com.google.api.services.bigquery.model.TableReference;
2223
import com.google.api.services.bigquery.model.TableRow;
2324
import java.io.IOException;
2425
import org.apache.beam.sdk.coders.Coder;
@@ -186,10 +187,15 @@ public void processElement(
186187
badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
187188
return;
188189
}
190+
TableReference tableReference = null;
191+
TableDestination tableDestination = dynamicDestinations.getTable(element.getKey());
192+
if (tableDestination != null) {
193+
tableReference = tableDestination.getTableReference();
194+
}
189195
o.get(failedWritesTag)
190196
.output(
191197
new BigQueryStorageApiInsertError(
192-
failsafeTableRow, conversionException.toString()));
198+
failsafeTableRow, conversionException.toString(), tableReference));
193199
} catch (Exception e) {
194200
badRecordRouter.route(
195201
o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ void addMessage(
608608
org.joda.time.Instant timestamp = payload.getTimestamp();
609609
rowsSentToFailedRowsCollection.inc();
610610
failedRowsReceiver.outputWithTimestamp(
611-
new BigQueryStorageApiInsertError(tableRow, e.toString()),
611+
new BigQueryStorageApiInsertError(
612+
tableRow, e.toString(), tableDestination.getTableReference()),
612613
timestamp != null ? timestamp : elementTs);
613614
return;
614615
}
@@ -668,7 +669,9 @@ long flush(
668669
org.joda.time.Instant timestamp = insertTimestamps.get(i);
669670
failedRowsReceiver.outputWithTimestamp(
670671
new BigQueryStorageApiInsertError(
671-
failedRow, "Row payload too large. Maximum size " + maxRequestSize),
672+
failedRow,
673+
"Row payload too large. Maximum size " + maxRequestSize,
674+
tableDestination.getTableReference()),
672675
timestamp);
673676
}
674677
int numRowsFailed = inserts.getSerializedRowsCount();
@@ -753,7 +756,9 @@ long flush(
753756
}
754757
element =
755758
new BigQueryStorageApiInsertError(
756-
failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
759+
failedRow,
760+
error.getRowIndexToErrorMessage().get(failedIndex),
761+
tableDestination.getTableReference());
757762
} catch (Exception e) {
758763
LOG.error("Failed to insert row and could not parse the result!", e);
759764
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
24+
import com.google.api.services.bigquery.model.TableReference;
2425
import com.google.api.services.bigquery.model.TableRow;
2526
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
2627
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -481,6 +482,7 @@ public void process(
481482
});
482483
final String tableId = tableDestination.getTableUrn(bigQueryOptions);
483484
final String shortTableId = tableDestination.getShortTableUrn();
485+
final TableReference tableReference = tableDestination.getTableReference();
484486
final DatasetService datasetService = getDatasetService(pipelineOptions);
485487
final WriteStreamService writeStreamService = getWriteStreamService(pipelineOptions);
486488

@@ -619,7 +621,8 @@ public void process(
619621
(failedRow, errorMessage) -> {
620622
o.get(failedRowsTag)
621623
.outputWithTimestamp(
622-
new BigQueryStorageApiInsertError(failedRow.getValue(), errorMessage),
624+
new BigQueryStorageApiInsertError(
625+
failedRow.getValue(), errorMessage, tableReference),
623626
failedRow.getTimestamp());
624627
rowsSentToFailedRowsCollection.inc();
625628
BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -739,7 +742,9 @@ public void process(
739742
o.get(failedRowsTag)
740743
.outputWithTimestamp(
741744
new BigQueryStorageApiInsertError(
742-
failedRow, error.getRowIndexToErrorMessage().get(failedIndex)),
745+
failedRow,
746+
error.getRowIndexToErrorMessage().get(failedIndex),
747+
tableReference),
743748
timestamp);
744749
}
745750
int failedRows = failedRowIndices.size();
@@ -910,7 +915,9 @@ public void process(
910915
o.get(failedRowsTag)
911916
.outputWithTimestamp(
912917
new BigQueryStorageApiInsertError(
913-
failedRow, "Row payload too large. Maximum size " + maxRequestSize),
918+
failedRow,
919+
"Row payload too large. Maximum size " + maxRequestSize,
920+
tableReference),
914921
timestamp);
915922
}
916923
int numRowsFailed = splitValue.getProtoRows().getSerializedRowsCount();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.bigquery;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertNull;
22+
23+
import com.google.api.services.bigquery.model.TableReference;
24+
import com.google.api.services.bigquery.model.TableRow;
25+
import java.io.ByteArrayInputStream;
26+
import java.io.ByteArrayOutputStream;
27+
import org.apache.beam.sdk.coders.Coder;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.junit.runners.JUnit4;
31+
32+
/** Test case for {@link BigQueryStorageApiInsertErrorCoder}. */
33+
@RunWith(JUnit4.class)
34+
public class BigQueryStorageApiInsertErrorCoderTest {
35+
36+
private static final Coder<BigQueryStorageApiInsertError> TEST_CODER =
37+
BigQueryStorageApiInsertErrorCoder.of();
38+
39+
@Test
40+
public void testDecodeEncodeEqual() throws Exception {
41+
TableRow row = new TableRow().set("field1", "value1").set("field2", 123);
42+
BigQueryStorageApiInsertError value =
43+
new BigQueryStorageApiInsertError(
44+
row,
45+
"An error message",
46+
new TableReference()
47+
.setProjectId("dummy-project-id")
48+
.setDatasetId("dummy-dataset-id")
49+
.setTableId("dummy-table-id"));
50+
51+
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
52+
TEST_CODER.encode(value, outStream);
53+
54+
ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
55+
BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
56+
57+
assertEquals(value.getRow(), decoded.getRow());
58+
assertEquals(value.getErrorMessage(), decoded.getErrorMessage());
59+
assertEquals("dummy-project-id", decoded.getTable().getProjectId());
60+
assertEquals("dummy-dataset-id", decoded.getTable().getDatasetId());
61+
assertEquals("dummy-table-id", decoded.getTable().getTableId());
62+
}
63+
64+
@Test
65+
public void testDecodeEncodeWithNullTable() throws Exception {
66+
TableRow row = new TableRow().set("field1", "value1");
67+
BigQueryStorageApiInsertError value =
68+
new BigQueryStorageApiInsertError(row, "An error message", null);
69+
70+
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
71+
TEST_CODER.encode(value, outStream);
72+
73+
ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
74+
BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
75+
76+
assertEquals(value.getRow(), decoded.getRow());
77+
assertEquals(value.getErrorMessage(), decoded.getErrorMessage());
78+
assertNull(decoded.getTable());
79+
}
80+
81+
@Test
82+
public void testDecodeEncodeWithNullErrorMessage() throws Exception {
83+
TableRow row = new TableRow().set("field1", "value1");
84+
BigQueryStorageApiInsertError value =
85+
new BigQueryStorageApiInsertError(
86+
row,
87+
null,
88+
new TableReference()
89+
.setProjectId("dummy-project-id")
90+
.setDatasetId("dummy-dataset-id")
91+
.setTableId("dummy-table-id"));
92+
93+
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
94+
TEST_CODER.encode(value, outStream);
95+
96+
ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
97+
BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
98+
99+
assertEquals(value.getRow(), decoded.getRow());
100+
assertNull(decoded.getErrorMessage());
101+
assertEquals("dummy-project-id", decoded.getTable().getProjectId());
102+
assertEquals("dummy-dataset-id", decoded.getTable().getDatasetId());
103+
assertEquals("dummy-table-id", decoded.getTable().getTableId());
104+
}
105+
106+
@Test
107+
public void testDecodeEncodeWithAllNullableFieldsNull() throws Exception {
108+
TableRow row = new TableRow().set("field1", "value1");
109+
BigQueryStorageApiInsertError value = new BigQueryStorageApiInsertError(row, null, null);
110+
111+
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
112+
TEST_CODER.encode(value, outStream);
113+
114+
ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
115+
BigQueryStorageApiInsertError decoded = TEST_CODER.decode(inStream);
116+
117+
assertEquals(value.getRow(), decoded.getRow());
118+
assertNull(decoded.getErrorMessage());
119+
assertNull(decoded.getTable());
120+
}
121+
}

0 commit comments

Comments
 (0)