Skip to content

Commit ae6d624

Browse files
Add a configuration to wait for session creation (#37625)
* Add a configuration to wait for session creation * Address spotless comments * Add session wait time to tests * Fix checkstyle issue * Add an description * Set wait time to 0 * Increase wait time * Increase wait time * Reduce time increment to 5 seconds * Reduce time increment to 2 seconds
1 parent 7b33e1c commit ae6d624

5 files changed

Lines changed: 54 additions & 0 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.spanner.DatabaseAdminClient;
3131
import com.google.cloud.spanner.DatabaseClient;
3232
import com.google.cloud.spanner.DatabaseId;
33+
import com.google.cloud.spanner.SessionPoolOptions;
3334
import com.google.cloud.spanner.Spanner;
3435
import com.google.cloud.spanner.SpannerOptions;
3536
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
@@ -38,6 +39,7 @@
3839
import com.google.spanner.v1.ExecuteSqlRequest;
3940
import com.google.spanner.v1.PartialResultSet;
4041
import java.util.HashSet;
42+
import java.util.Optional;
4143
import java.util.Set;
4244
import java.util.concurrent.ConcurrentHashMap;
4345
import org.apache.beam.sdk.options.ValueProvider;
@@ -61,6 +63,9 @@ public class SpannerAccessor implements AutoCloseable {
6163
*/
6264
private static final String USER_AGENT_PREFIX = "Apache_Beam_Java";
6365

66+
// Default wait time for session creation
67+
static final java.time.Duration DEFAULT_SESSION_WAIT_DURATION = java.time.Duration.ofMinutes(5);
68+
6469
/** Instance ID to use when connecting to an experimental host. */
6570
public static final String EXPERIMENTAL_HOST_INSTANCE_ID = "default";
6671

@@ -270,6 +275,15 @@ static SpannerOptions buildSpannerOptions(SpannerConfig spannerConfig) {
270275
builder.setCredentials(credentials.get());
271276
}
272277

278+
ValueProvider<java.time.Duration> waitForSessionCreationDuration =
279+
spannerConfig.getWaitForSessionCreationDuration();
280+
java.time.Duration waitDuration =
281+
Optional.ofNullable(waitForSessionCreationDuration)
282+
.map(ValueProvider::get)
283+
.orElse(DEFAULT_SESSION_WAIT_DURATION);
284+
builder.setSessionPoolOption(
285+
SessionPoolOptions.newBuilder().setWaitForMinSessionsDuration(waitDuration).build());
286+
273287
return builder.build();
274288
}
275289

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ public String getHostValue() {
104104

105105
public abstract @Nullable ValueProvider<Credentials> getCredentials();
106106

107+
public abstract @Nullable ValueProvider<java.time.Duration> getWaitForSessionCreationDuration();
108+
107109
abstract Builder toBuilder();
108110

109111
public static SpannerConfig create() {
@@ -189,6 +191,9 @@ abstract Builder setExecuteStreamingSqlRetrySettings(
189191

190192
abstract Builder setPlainText(ValueProvider<Boolean> plainText);
191193

194+
abstract Builder setWaitForSessionCreationDuration(
195+
ValueProvider<java.time.Duration> waitForSessionCreationDuration);
196+
192197
public abstract SpannerConfig build();
193198
}
194199

@@ -389,4 +394,24 @@ public SpannerConfig withUsingPlainTextChannel(ValueProvider<Boolean> plainText)
389394
public SpannerConfig withUsingPlainTextChannel(boolean plainText) {
390395
return withUsingPlainTextChannel(ValueProvider.StaticValueProvider.of(plainText));
391396
}
397+
398+
/**
399+
* Sets the wait time for a multiplexed session to be available when creating a database client.
400+
*
401+
* <p>Setting this will block the {@link com.google.cloud.spanner.DatabaseClient} creation.
402+
*
403+
* @param waitForSessionCreationDuration The duration to wait. Defaults to {@link
404+
* SpannerAccessor#DEFAULT_SESSION_WAIT_DURATION}.
405+
* @return {@link SpannerConfig}
406+
*/
407+
public SpannerConfig withWaitForSessionCreationDuration(
408+
ValueProvider<java.time.Duration> waitForSessionCreationDuration) {
409+
return toBuilder().setWaitForSessionCreationDuration(waitForSessionCreationDuration).build();
410+
}
411+
412+
public SpannerConfig withWaitForSessionCreationDuration(
413+
java.time.Duration waitForSessionCreationDuration) {
414+
return withWaitForSessionCreationDuration(
415+
ValueProvider.StaticValueProvider.of(waitForSessionCreationDuration));
416+
}
392417
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.spanner.DatabaseId;
2727
import com.google.cloud.spanner.Dialect;
2828
import com.google.cloud.spanner.Mutation;
29+
import com.google.cloud.spanner.SessionPoolOptions;
2930
import com.google.cloud.spanner.Spanner;
3031
import com.google.cloud.spanner.SpannerOptions;
3132
import com.google.cloud.spanner.Struct;
@@ -118,6 +119,10 @@ public void setUp() throws Exception {
118119
SpannerOptions.newBuilder()
119120
.setProjectId(project)
120121
.disableGrpcGcpExtension()
122+
.setSessionPoolOption(
123+
SessionPoolOptions.newBuilder()
124+
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
125+
.build())
121126
.build()
122127
.getService();
123128

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.spanner.Dialect;
2929
import com.google.cloud.spanner.Mutation;
3030
import com.google.cloud.spanner.ResultSet;
31+
import com.google.cloud.spanner.SessionPoolOptions;
3132
import com.google.cloud.spanner.Spanner;
3233
import com.google.cloud.spanner.SpannerOptions;
3334
import com.google.cloud.spanner.Statement;
@@ -125,6 +126,10 @@ public void setUp() throws Exception {
125126
SpannerOptions.newBuilder()
126127
.setProjectId(project)
127128
.disableGrpcGcpExtension()
129+
.setSessionPoolOption(
130+
SessionPoolOptions.newBuilder()
131+
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
132+
.build())
128133
.build()
129134
.getService();
130135

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/IntegrationTestEnv.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.cloud.spanner.DatabaseClient;
2222
import com.google.cloud.spanner.DatabaseId;
2323
import com.google.cloud.spanner.Dialect;
24+
import com.google.cloud.spanner.SessionPoolOptions;
2425
import com.google.cloud.spanner.Spanner;
2526
import com.google.cloud.spanner.SpannerOptions;
2627
import java.util.ArrayList;
@@ -81,6 +82,10 @@ protected void before() throws Throwable {
8182
.setProjectId(projectId)
8283
.setHost(host)
8384
.disableGrpcGcpExtension()
85+
.setSessionPoolOption(
86+
SessionPoolOptions.newBuilder()
87+
.setWaitForMinSessionsDuration(java.time.Duration.ofMinutes(5))
88+
.build())
8489
.build()
8590
.getService();
8691
databaseAdminClient = spanner.getDatabaseAdminClient();

0 commit comments

Comments
 (0)