Skip to content

Commit 3b41e8b

Browse files
authored
Remove rolled out Windmill separate heartbeat stream flag (#37845)
1 parent 02d8f56 commit 3b41e8b

7 files changed

Lines changed: 20 additions & 153 deletions

File tree

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
133133

134134
void setUseWindmillIsolatedChannels(Boolean value);
135135

136-
@Description(
137-
"If true, separate streaming rpcs will be used for heartbeats instead of sharing streams with state reads.")
136+
/** @deprecated since beam 2.73.0 */
137+
@Deprecated
138+
@Description("Unused Flag")
138139
Boolean getUseSeparateWindmillHeartbeatStreams();
139140

140141
void setUseSeparateWindmillHeartbeatStreams(Boolean value);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
5555
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
5656
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
57-
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
5857
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
5958
import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
6059
import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
@@ -460,12 +459,7 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
460459
windmillServer::getDataStream);
461460
GetDataClient getDataClient =
462461
new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
463-
HeartbeatSender heartbeatSender =
464-
createStreamingEngineHeartbeatSender(
465-
options,
466-
windmillServer,
467-
getDataStreamPool,
468-
checkNotNull(configFetcher).getGlobalConfigHandle());
462+
HeartbeatSender heartbeatSender = createStreamingEngineHeartbeatSender(windmillServer);
469463
@SuppressWarnings("methodref.receiver.bound")
470464
WorkCommitter workCommitter =
471465
StreamingEngineWorkCommitter.builder()
@@ -603,25 +597,9 @@ private static ChannelzServlet createChannelzServlet(
603597
}
604598

605599
private static HeartbeatSender createStreamingEngineHeartbeatSender(
606-
DataflowWorkerHarnessOptions options,
607-
WindmillServerStub windmillClient,
608-
WindmillStreamPool<GetDataStream> getDataStreamPool,
609-
StreamingGlobalConfigHandle globalConfigHandle) {
610-
// Experiment gates the logic till backend changes are rollback safe
611-
if (!DataflowRunner.hasExperiment(
612-
options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT)
613-
|| options.getUseSeparateWindmillHeartbeatStreams() != null) {
614-
return StreamPoolHeartbeatSender.create(
615-
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
616-
? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream)
617-
: getDataStreamPool);
618-
619-
} else {
620-
return StreamPoolHeartbeatSender.create(
621-
WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream),
622-
getDataStreamPool,
623-
globalConfigHandle);
624-
}
600+
WindmillServerStub windmillClient) {
601+
return StreamPoolHeartbeatSender.create(
602+
WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillClient::getDataStream));
625603
}
626604

627605
public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.concurrent.atomic.AtomicReference;
2121
import javax.annotation.Nonnull;
22-
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
2322
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
2423
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
2524
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -47,32 +46,6 @@ public static StreamPoolHeartbeatSender create(
4746
return new StreamPoolHeartbeatSender(heartbeatStreamPool);
4847
}
4948

50-
/**
51-
* Creates StreamPoolHeartbeatSender that switches between the passed in stream pools depending on
52-
* global config.
53-
*
54-
* @param dedicatedHeartbeatPool stream to use when using separate streams for heartbeat is
55-
* enabled.
56-
* @param getDataPool stream to use when using separate streams for heartbeat is disabled.
57-
*/
58-
public static StreamPoolHeartbeatSender create(
59-
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
60-
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
61-
@Nonnull StreamingGlobalConfigHandle configHandle) {
62-
// Use getDataPool as the default, settings callback will
63-
// switch to the separate pool if enabled before processing any elements are processed.
64-
StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool);
65-
configHandle.registerConfigObserver(
66-
streamingGlobalConfig ->
67-
heartbeatSender.heartbeatStreamPool.set(
68-
streamingGlobalConfig
69-
.userWorkerJobSettings()
70-
.getUseSeparateWindmillHeartbeatStreams()
71-
? dedicatedHeartbeatPool
72-
: getDataPool));
73-
return heartbeatSender;
74-
}
75-
7649
@Override
7750
public void sendHeartbeats(Heartbeats heartbeats) {
7851
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.TimeUnit;
2525
import java.util.concurrent.atomic.AtomicReference;
2626
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
27+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ConnectivityType;
2728
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
2829
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
2930
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
@@ -47,7 +48,7 @@ public void getConfig() {
4748
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
4849
.setUserWorkerJobSettings(
4950
UserWorkerRunnerV1Settings.newBuilder()
50-
.setUseSeparateWindmillHeartbeatStreams(false)
51+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
5152
.build())
5253
.build();
5354
FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
@@ -67,7 +68,7 @@ public void registerConfigObserver() throws InterruptedException {
6768
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
6869
.setUserWorkerJobSettings(
6970
UserWorkerRunnerV1Settings.newBuilder()
70-
.setUseSeparateWindmillHeartbeatStreams(false)
71+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
7172
.build())
7273
.build();
7374
FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.atomic.AtomicReference;
2828
import java.util.function.Supplier;
2929
import org.apache.beam.runners.dataflow.worker.OperationalLimits;
30+
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ConnectivityType;
3031
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
3132
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
3233
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
@@ -51,7 +52,7 @@ public void getConfig() {
5152
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
5253
.setUserWorkerJobSettings(
5354
UserWorkerRunnerV1Settings.newBuilder()
54-
.setUseSeparateWindmillHeartbeatStreams(false)
55+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
5556
.build())
5657
.build();
5758
globalConfigHandle.setConfig(config);
@@ -68,7 +69,7 @@ public void getConfig() {
6869
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1")))
6970
.setUserWorkerJobSettings(
7071
UserWorkerRunnerV1Settings.newBuilder()
71-
.setUseSeparateWindmillHeartbeatStreams(true)
72+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
7273
.build())
7374
.build();
7475
globalConfigHandle.setConfig(updatedConfig);
@@ -91,7 +92,7 @@ public void registerConfigObserver_configSetAfterRegisteringCallback()
9192
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
9293
.setUserWorkerJobSettings(
9394
UserWorkerRunnerV1Settings.newBuilder()
94-
.setUseSeparateWindmillHeartbeatStreams(false)
95+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
9596
.build())
9697
.build();
9798
AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
@@ -128,7 +129,7 @@ public void registerConfigObserver_configSetBeforeRegisteringCallback()
128129
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
129130
.setUserWorkerJobSettings(
130131
UserWorkerRunnerV1Settings.newBuilder()
131-
.setUseSeparateWindmillHeartbeatStreams(false)
132+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
132133
.build())
133134
.build();
134135
AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
@@ -165,7 +166,7 @@ public void registerConfigObserver_configSetBeforeRegisteringCallback_callbackTh
165166
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
166167
.setUserWorkerJobSettings(
167168
UserWorkerRunnerV1Settings.newBuilder()
168-
.setUseSeparateWindmillHeartbeatStreams(false)
169+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
169170
.build())
170171
.build();
171172
AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
@@ -200,7 +201,7 @@ public void registerConfigObserver_configSetAfterRegisteringCallback_callbackThr
200201
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
201202
.setUserWorkerJobSettings(
202203
UserWorkerRunnerV1Settings.newBuilder()
203-
.setUseSeparateWindmillHeartbeatStreams(false)
204+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
204205
.build())
205206
.build();
206207
AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
@@ -237,7 +238,7 @@ public void registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame()
237238
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
238239
.setUserWorkerJobSettings(
239240
UserWorkerRunnerV1Settings.newBuilder()
240-
.setUseSeparateWindmillHeartbeatStreams(false)
241+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
241242
.build())
242243
.build();
243244
globalConfigHandle.registerConfigObserver(
@@ -273,7 +274,7 @@ public void registerConfigObserver_updateConfigWhenCallbackIsRunning()
273274
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
274275
.setUserWorkerJobSettings(
275276
UserWorkerRunnerV1Settings.newBuilder()
276-
.setUseSeparateWindmillHeartbeatStreams(false)
277+
.setConnectivityType(ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH)
277278
.build())
278279
.build();
279280
CopyOnWriteArrayList<StreamingGlobalConfig> configsFromCallback = new CopyOnWriteArrayList<>();

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import java.util.Optional;
2323
import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
24-
import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
25-
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
2624
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
27-
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
2825
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
2926
import org.joda.time.Duration;
3027
import org.junit.Test;
@@ -48,85 +45,4 @@ public void sendsHeartbeatsOnStream() {
4845
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
4946
assertEquals(1, server.getGetDataRequests().size());
5047
}
51-
52-
@Test
53-
public void sendsHeartbeatsOnDedicatedStream() {
54-
FakeWindmillServer dedicatedServer =
55-
new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
56-
FakeWindmillServer getDataServer =
57-
new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
58-
59-
FakeGlobalConfigHandle configHandle =
60-
new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
61-
StreamPoolHeartbeatSender heartbeatSender =
62-
StreamPoolHeartbeatSender.create(
63-
WindmillStreamPool.create(
64-
1, Duration.standardSeconds(10), dedicatedServer::getDataStream),
65-
WindmillStreamPool.create(
66-
1, Duration.standardSeconds(10), getDataServer::getDataStream),
67-
configHandle);
68-
Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
69-
heartbeatsBuilder
70-
.heartbeatRequestsBuilder()
71-
.put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build());
72-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
73-
assertEquals(1, dedicatedServer.getGetDataRequests().size());
74-
assertEquals(0, getDataServer.getGetDataRequests().size());
75-
76-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
77-
assertEquals(2, dedicatedServer.getGetDataRequests().size());
78-
assertEquals(0, getDataServer.getGetDataRequests().size());
79-
80-
// Turn off separate heartbeats
81-
configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
82-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
83-
// request to getDataServer increases and dedicatedServer remains same
84-
assertEquals(2, dedicatedServer.getGetDataRequests().size());
85-
assertEquals(1, getDataServer.getGetDataRequests().size());
86-
}
87-
88-
private static StreamingGlobalConfig getGlobalConfig(boolean useSeparateHeartbeatStreams) {
89-
return StreamingGlobalConfig.builder()
90-
.setUserWorkerJobSettings(
91-
UserWorkerRunnerV1Settings.newBuilder()
92-
.setUseSeparateWindmillHeartbeatStreams(useSeparateHeartbeatStreams)
93-
.build())
94-
.build();
95-
}
96-
97-
@Test
98-
public void sendsHeartbeatsOnGetDataStream() {
99-
FakeWindmillServer dedicatedServer =
100-
new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
101-
FakeWindmillServer getDataServer =
102-
new FakeWindmillServer(new ErrorCollector(), c -> Optional.empty());
103-
104-
FakeGlobalConfigHandle configHandle =
105-
new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
106-
StreamPoolHeartbeatSender heartbeatSender =
107-
StreamPoolHeartbeatSender.create(
108-
WindmillStreamPool.create(
109-
1, Duration.standardSeconds(10), dedicatedServer::getDataStream),
110-
WindmillStreamPool.create(
111-
1, Duration.standardSeconds(10), getDataServer::getDataStream),
112-
configHandle);
113-
Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
114-
heartbeatsBuilder
115-
.heartbeatRequestsBuilder()
116-
.put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build());
117-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
118-
assertEquals(0, dedicatedServer.getGetDataRequests().size());
119-
assertEquals(1, getDataServer.getGetDataRequests().size());
120-
121-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
122-
assertEquals(0, dedicatedServer.getGetDataRequests().size());
123-
assertEquals(2, getDataServer.getGetDataRequests().size());
124-
125-
// Turn on separate heartbeats
126-
configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
127-
heartbeatSender.sendHeartbeats(heartbeatsBuilder.build());
128-
// request to dedicatedServer increases and getDataServer remains same
129-
assertEquals(1, dedicatedServer.getGetDataRequests().size());
130-
assertEquals(2, getDataServer.getGetDataRequests().size());
131-
}
13248
}

runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -966,15 +966,12 @@ enum ConnectivityType {
966966

967967
// Settings to control runtime behavior of the java runner v1 user worker.
968968
message UserWorkerRunnerV1Settings {
969-
// If true, use separate streaming RPC for windmill heartbeats and state reads.
970-
optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
971-
972969
optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3;
973970

974971
optional ConnectivityType connectivity_type = 4
975972
[default = CONNECTIVITY_TYPE_DEFAULT];
976973

977-
reserved 1;
974+
reserved 1, 2;
978975
}
979976

980977
service WindmillAppliance {

0 commit comments

Comments
 (0)