Skip to content

Commit e87bb29

Browse files
authored
Bigtable: add an option to pass in row filter text proto for use in template (#37632)
* Bigtable: add row filter text proto for template
* remove redundant code
* chain filters
* add a test
* simplify logic
1 parent 8dc5da5 commit e87bb29

3 files changed

Lines changed: 137 additions & 8 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
2323
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2424
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
25+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2526
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2627

2728
import com.google.api.gax.batching.BatchingException;
@@ -38,6 +39,7 @@
3839
import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord;
3940
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
4041
import com.google.protobuf.ByteString;
42+
import com.google.protobuf.TextFormat;
4143
import java.io.IOException;
4244
import java.util.ArrayDeque;
4345
import java.util.ArrayList;
@@ -610,6 +612,22 @@ public Read withRowFilter(RowFilter filter) {
610612
return withRowFilter(StaticValueProvider.of(filter));
611613
}
612614

615+
/**
616+
* Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
617+
* using the given {@link TextFormat} row filter. If {@link #withRowFilter(RowFilter)} is also
618+
* set, the filters are chained.
619+
*
620+
* <p>Does not modify this object.
621+
*/
622+
public Read withRowFilterTextProto(ValueProvider<String> filter) {
623+
checkNotNull(filter, "filter can not be null");
624+
BigtableReadOptions bigtableReadOptions = getBigtableReadOptions();
625+
return toBuilder()
626+
.setBigtableReadOptions(
627+
bigtableReadOptions.toBuilder().setRowFilterTextProto(filter).build())
628+
.build();
629+
}
630+
613631
/**
614632
* Returns a new {@link BigtableIO.Read} that will break up read requests into smaller batches.
615633
* This function will switch the base BigtableIO.Reader class to using the SegmentReader. If
@@ -1938,8 +1956,28 @@ public List<ByteKeyRange> getRanges() {
19381956
}
19391957

19401958
public @Nullable RowFilter getRowFilter() {
1941-
ValueProvider<RowFilter> rowFilter = readOptions.getRowFilter();
1942-
return rowFilter != null && rowFilter.isAccessible() ? rowFilter.get() : null;
1959+
RowFilter.Chain.Builder chain = RowFilter.Chain.newBuilder();
1960+
ValueProvider<RowFilter> rowFilterValueProvider = readOptions.getRowFilter();
1961+
if (rowFilterValueProvider != null && rowFilterValueProvider.isAccessible()) {
1962+
chain.addFilters(rowFilterValueProvider.get());
1963+
}
1964+
ValueProvider<String> textFilterValueProvider = readOptions.getRowFilterTextProto();
1965+
if (textFilterValueProvider != null && textFilterValueProvider.isAccessible()) {
1966+
try {
1967+
chain.addFilters(TextFormat.parse(textFilterValueProvider.get(), RowFilter.class));
1968+
} catch (TextFormat.ParseException e) {
1969+
throw new RuntimeException("Failed to parse row filter text proto", e);
1970+
}
1971+
}
1972+
1973+
switch (chain.getFiltersCount()) {
1974+
case 0:
1975+
return null;
1976+
case 1:
1977+
return chain.getFilters(0);
1978+
default:
1979+
return RowFilter.newBuilder().setChain(chain.build()).build();
1980+
}
19431981
}
19441982

19451983
public @Nullable Integer getMaxBufferElementCount() {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.gcp.bigtable;
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2122

2223
import com.google.auto.value.AutoValue;
2324
import com.google.bigtable.v2.RowFilter;
@@ -43,6 +44,8 @@ abstract class BigtableReadOptions implements Serializable {
4344
/** Returns the row filter to use. */
4445
abstract @Nullable ValueProvider<RowFilter> getRowFilter();
4546

47+
abstract @Nullable ValueProvider<String> getRowFilterTextProto();
48+
4649
/** Returns the key ranges to read. */
4750
abstract @Nullable ValueProvider<List<ByteKeyRange>> getKeyRanges();
4851

@@ -73,6 +76,8 @@ abstract static class Builder {
7376

7477
abstract Builder setRowFilter(ValueProvider<RowFilter> rowFilter);
7578

79+
abstract Builder setRowFilterTextProto(ValueProvider<String> rowFilter);
80+
7681
abstract Builder setMaxBufferElementCount(@Nullable Integer maxBufferElementCount);
7782

7883
abstract Builder setKeyRanges(ValueProvider<List<ByteKeyRange>> keyRanges);
@@ -110,6 +115,8 @@ void populateDisplayData(DisplayData.Builder builder) {
110115
builder
111116
.addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id"))
112117
.addIfNotNull(DisplayData.item("rowFilter", getRowFilter()).withLabel("Row Filter"))
118+
.addIfNotNull(
119+
DisplayData.item("rowFilterTextProto", getRowFilterTextProto()).withLabel("Row Filter"))
113120
.addIfNotNull(DisplayData.item("keyRanges", getKeyRanges()).withLabel("Key Ranges"))
114121
.addIfNotNull(
115122
DisplayData.item("attemptTimeout", getAttemptTimeout())
@@ -127,6 +134,11 @@ void validate() {
127134
if (getRowFilter() != null && getRowFilter().isAccessible()) {
128135
checkArgument(getRowFilter().get() != null, "rowFilter can not be null");
129136
}
137+
138+
// The provider itself is already known to be non-null inside this branch, so the check must
// target the value it yields: an accessible text proto filter must not resolve to null.
if (getRowFilterTextProto() != null && getRowFilterTextProto().isAccessible()) {
  checkNotNull(getRowFilterTextProto().get(), "rowFilterTextProto can not be null");
}
141+
130142
if (getMaxBufferElementCount() != null) {
131143
checkArgument(
132144
getMaxBufferElementCount() > 0, "maxBufferElementCount can not be zero or negative");

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java

Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,69 @@ public void testReadingWithRuntimeParameterizedFilter() throws Exception {
680680
defaultRead.withTableId(table).withRowFilter(StaticValueProvider.of(filter)),
681681
Lists.newArrayList(filteredRows));
682682
}
683+
684+
/** Tests reading rows using a text proto filter provided through ValueProvider. */
685+
@Test
686+
public void testReadingWithRowFilterTextProto() throws Exception {
687+
final String table = "TEST-FILTER-TABLE";
688+
final int numRows = 1001;
689+
List<Row> testRows = makeTableData(table, numRows);
690+
String regex = ".*17.*";
691+
final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
692+
Iterable<Row> filteredRows =
693+
testRows.stream()
694+
.filter(
695+
input -> {
696+
verifyNotNull(input, "input");
697+
return keyPredicate.apply(input.getKey());
698+
})
699+
.collect(Collectors.toList());
700+
701+
String filter = "row_key_regex_filter: \".*17.*\"";
702+
service.setupSampleRowKeys(table, 5, 10L);
703+
704+
runReadTest(
705+
defaultRead.withTableId(table).withRowFilterTextProto(StaticValueProvider.of(filter)),
706+
Lists.newArrayList(filteredRows));
707+
}
708+
709+
/** Tests reading rows using both filter providers. */
710+
@Test
711+
public void testReadingWithBothTextFilterAndRowFilter() throws Exception {
712+
final String table = "TEST-FILTER-TABLE";
713+
final int numRows = 1001;
714+
List<Row> testRows = makeTableData(table, numRows);
715+
String regex1 = ".*17.*";
716+
String regex2 = ".*2.*";
717+
final KeyMatchesRegex keyPredicate1 = new KeyMatchesRegex(regex1);
718+
final KeyMatchesRegex keyPredicate2 = new KeyMatchesRegex(regex2);
719+
Iterable<Row> filteredRows =
720+
testRows.stream()
721+
.filter(
722+
input -> {
723+
verifyNotNull(input, "input");
724+
return keyPredicate1.apply(input.getKey());
725+
})
726+
.filter(
727+
input -> {
728+
verifyNotNull(input, "input");
729+
return keyPredicate2.apply(input.getKey());
730+
})
731+
.collect(Collectors.toList());
732+
733+
String filter = "row_key_regex_filter: \".*17.*\"";
734+
RowFilter rowFilter =
735+
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex2)).build();
736+
service.setupSampleRowKeys(table, 5, 10L);
737+
738+
runReadTest(
739+
defaultRead
740+
.withTableId(table)
741+
.withRowFilterTextProto(StaticValueProvider.of(filter))
742+
.withRowFilter(rowFilter),
743+
Lists.newArrayList(filteredRows));
744+
}
745+
683746
/** Tests dynamic work rebalancing exhaustively. */
684747
@Test
685748
public void testReadingSplitAtFractionExhaustive() throws Exception {
@@ -1778,17 +1841,27 @@ private static class FakeBigtableReader implements BigtableService.Reader {
17781841
private final FakeBigtableService service;
17791842
private Iterator<Map.Entry<ByteString, ByteString>> rows;
17801843
private Row currentRow;
1781-
private final Predicate<ByteString> filter;
1844+
private final List<Predicate<ByteString>> filters = new ArrayList<>();
17821845

17831846
public FakeBigtableReader(BigtableSource source, FakeBigtableService service) {
17841847
this.source = source;
17851848
this.service = service;
17861849
if (source.getRowFilter() == null) {
1787-
filter = Predicates.alwaysTrue();
1850+
filters.add(Predicates.alwaysTrue());
17881851
} else {
1789-
ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
1790-
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
1791-
filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
1852+
RowFilter rowFilter = source.getRowFilter();
1853+
if (rowFilter.hasChain()) {
1854+
RowFilter.Chain chain = rowFilter.getChain();
1855+
for (RowFilter filter : chain.getFiltersList()) {
1856+
ByteString keyRegex = filter.getRowKeyRegexFilter();
1857+
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
1858+
filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8()));
1859+
}
1860+
} else {
1861+
ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
1862+
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
1863+
filters.add(new KeyMatchesRegex(keyRegex.toStringUtf8()));
1864+
}
17921865
}
17931866
service.verifyTableExists(source.getTableId().get());
17941867
}
@@ -1809,7 +1882,13 @@ public boolean advance() throws IOException {
18091882
Map.Entry<ByteString, ByteString> entry = null;
18101883
while (rows.hasNext()) {
18111884
entry = rows.next();
1812-
if (!filter.apply(entry.getKey())
1885+
boolean predicateMatched = true;
1886+
for (Predicate<ByteString> predicate : filters) {
1887+
if (!predicate.apply(entry.getKey())) {
1888+
predicateMatched = false;
1889+
}
1890+
}
1891+
if (!predicateMatched
18131892
|| !rangesContainsKey(source.getRanges(), makeByteKey(entry.getKey()))) {
18141893
// Does not match row filter or does not match source range. Skip.
18151894
entry = null;

0 commit comments

Comments
 (0)