Skip to content

Commit 02d8f56

Browse files
authored
[Dataflow Streaming] Remove rolled out Windmill isolated channels flag (#37844)
1 parent ab56619 commit 02d8f56

10 files changed

Lines changed: 25 additions & 208 deletions

File tree

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
126126

127127
void setWindmillMessagesBetweenIsReadyChecks(int value);
128128

129-
@Description("If true, a most a single active rpc will be used per channel.")
129+
/** @deprecated since 2.73.0 */
130+
@Deprecated
131+
@Description("Unused flag.")
130132
Boolean getUseWindmillIsolatedChannels();
131133

132134
void setUseWindmillIsolatedChannels(Boolean value);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
725725
Function<ComputationConfig.Fetcher, ComputationStateCache> computationStateCacheFactory) {
726726
if (options.isEnableStreamingEngine()) {
727727
GrpcDispatcherClient dispatcherClient =
728-
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
728+
GrpcDispatcherClient.create(new WindmillStubFactoryFactoryImpl(options));
729729
ComputationConfig.Fetcher configFetcher =
730730
StreamingEngineComputationConfigFetcher.create(
731731
options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient);
@@ -753,7 +753,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
753753
if (options.getWindmillServiceEndpoint() != null
754754
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
755755
GrpcDispatcherClient dispatcherClient =
756-
GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options));
756+
GrpcDispatcherClient.create(new WindmillStubFactoryFactoryImpl(options));
757757
GrpcWindmillStreamFactory windmillStreamFactory =
758758
windmillStreamFactoryBuilder
759759
.setHealthCheckIntervalMillis(
@@ -920,7 +920,7 @@ static StreamingDataflowWorker forTesting(
920920
createGrpcwindmillStreamFactoryBuilder(options, 1)
921921
.setProcessHeartbeatResponses(
922922
new WorkHeartbeatResponseProcessor(computationStateCache::get));
923-
GrpcDispatcherClient grpcDispatcherClient = GrpcDispatcherClient.create(options, stubFactory);
923+
GrpcDispatcherClient grpcDispatcherClient = GrpcDispatcherClient.create(stubFactory);
924924
grpcDispatcherClient.consumeWindmillDispatcherEndpoints(
925925
ImmutableSet.<HostAndPort>builder()
926926
.add(HostAndPort.fromHost("StreamingDataflowWorkerTest"))

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java

Lines changed: 7 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@
2828
import java.util.Set;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.concurrent.atomic.AtomicReference;
3332
import javax.annotation.concurrent.GuardedBy;
3433
import javax.annotation.concurrent.ThreadSafe;
35-
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
3634
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
3735
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
3836
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -66,42 +64,25 @@ public class GrpcDispatcherClient {
6664
@GuardedBy("this")
6765
private final Random rand;
6866

69-
private final WindmillStubFactoryFactory windmillStubFactoryFactory;
70-
71-
private final AtomicReference<WindmillStubFactory> windmillStubFactory = new AtomicReference<>();
72-
73-
private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
74-
private final boolean reactToIsolatedChannelsJobSetting;
67+
private final WindmillStubFactory windmillStubFactory;
7568

7669
private GrpcDispatcherClient(
77-
DataflowWorkerHarnessOptions options,
7870
WindmillStubFactoryFactory windmillStubFactoryFactory,
7971
DispatcherStubs initialDispatcherStubs,
8072
Random rand) {
81-
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
82-
if (options.getUseWindmillIsolatedChannels() != null) {
83-
this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
84-
this.reactToIsolatedChannelsJobSetting = false;
85-
} else {
86-
this.useIsolatedChannels.set(false);
87-
this.reactToIsolatedChannelsJobSetting = true;
88-
}
89-
this.windmillStubFactory.set(
90-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
73+
this.windmillStubFactory = windmillStubFactoryFactory.makeWindmillStubFactory();
9174
this.rand = rand;
9275
this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
9376
this.onInitializedEndpoints = new CountDownLatch(1);
9477
}
9578

96-
public static GrpcDispatcherClient create(
97-
DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory windmillStubFactoryFactory) {
79+
public static GrpcDispatcherClient create(WindmillStubFactoryFactory windmillStubFactoryFactory) {
9880
return new GrpcDispatcherClient(
99-
options, windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
81+
windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
10082
}
10183

10284
@VisibleForTesting
10385
public static GrpcDispatcherClient forTesting(
104-
DataflowWorkerHarnessOptions options,
10586
WindmillStubFactoryFactory windmillStubFactoryFactory,
10687
List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
10788
List<CloudWindmillMetadataServiceV1Alpha1Stub> windmillMetadataServiceStubs,
@@ -110,7 +91,6 @@ public static GrpcDispatcherClient forTesting(
11091
dispatcherEndpoints.size() == windmillServiceStubs.size()
11192
&& windmillServiceStubs.size() == windmillMetadataServiceStubs.size());
11293
return new GrpcDispatcherClient(
113-
options,
11494
windmillStubFactoryFactory,
11595
DispatcherStubs.create(
11696
dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs),
@@ -172,31 +152,17 @@ public void onJobConfig(StreamingGlobalConfig config) {
172152
LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
173153
return;
174154
}
175-
boolean forceRecreateStubs = false;
176-
if (reactToIsolatedChannelsJobSetting) {
177-
boolean useIsolatedChannels = config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
178-
if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != useIsolatedChannels) {
179-
windmillStubFactory.set(
180-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
181-
forceRecreateStubs = true;
182-
}
183-
}
184-
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), forceRecreateStubs);
155+
consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
185156
}
186157

187158
public synchronized void consumeWindmillDispatcherEndpoints(
188159
ImmutableSet<HostAndPort> dispatcherEndpoints) {
189-
consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ false);
190-
}
191-
192-
private synchronized void consumeWindmillDispatcherEndpoints(
193-
ImmutableSet<HostAndPort> dispatcherEndpoints, boolean forceRecreateStubs) {
194160
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
195161
dispatcherStubs.get().dispatcherEndpoints();
196162
Preconditions.checkArgument(
197163
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
198164
"Cannot set dispatcher endpoints to nothing.");
199-
if (!forceRecreateStubs && currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
165+
if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
200166
// The endpoints are equal don't recreate the stubs.
201167
return;
202168
}
@@ -207,7 +173,7 @@ private synchronized void consumeWindmillDispatcherEndpoints(
207173
}
208174

209175
LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", dispatcherEndpoints);
210-
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory.get()));
176+
dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory));
211177
onInitializedEndpoints.countDown();
212178
}
213179

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,6 @@ static GrpcWindmillServer newTestInstance(
166166
Set<HostAndPort> dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name));
167167
GrpcDispatcherClient dispatcherClient =
168168
GrpcDispatcherClient.forTesting(
169-
testOptions,
170169
windmillStubFactoryFactory,
171170
windmillServiceStubs,
172171
windmillMetadataServiceStubs,
@@ -198,7 +197,7 @@ static GrpcWindmillServer newApplianceTestInstance(
198197
options,
199198
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
200199
// No-op, Appliance does not use Dispatcher to call Streaming Engine.
201-
GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
200+
GrpcDispatcherClient.create(windmillStubFactoryFactory));
202201
testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel);
203202
return testServer;
204203
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,5 @@
2121

2222
@Internal
2323
public interface WindmillStubFactoryFactory {
24-
WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
24+
WindmillStubFactory makeWindmillStubFactory();
2525
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,19 @@ public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions
3333
}
3434

3535
@Override
36-
public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
36+
public WindmillStubFactory makeWindmillStubFactory() {
3737
ChannelCache channelCache =
3838
ChannelCache.create(
3939
(flowControlSettings, serviceAddress) ->
4040
// IsolationChannel will create and manage separate RPC channels to the same
4141
// serviceAddress via calling the channelFactory, else just directly return the
4242
// RPC channel.
43-
useIsolatedChannels
44-
? IsolationChannel.create(
45-
() ->
46-
remoteChannel(
47-
serviceAddress.getServiceAddress(),
48-
windmillServiceRpcChannelAliveTimeoutSec,
49-
flowControlSettings))
50-
: remoteChannel(
51-
serviceAddress.getServiceAddress(),
52-
windmillServiceRpcChannelAliveTimeoutSec,
53-
flowControlSettings));
43+
IsolationChannel.create(
44+
() ->
45+
remoteChannel(
46+
serviceAddress.getServiceAddress(),
47+
windmillServiceRpcChannelAliveTimeoutSec,
48+
flowControlSettings)));
5449
return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
5550
}
5651
}

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.stream.Collectors;
3838
import javax.annotation.Nullable;
39-
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
4039
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
4140
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
4241
import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -58,7 +57,6 @@
5857
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
5958
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
6059
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
61-
import org.apache.beam.sdk.options.PipelineOptionsFactory;
6260
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
6361
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
6462
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessSocketAddress;
@@ -112,7 +110,6 @@ public class FanOutStreamingEngineWorkerHarnessTest {
112110
() -> grpcCleanup.register(WindmillChannels.inProcessChannel(CHANNEL_NAME)));
113111
private final GrpcDispatcherClient dispatcherClient =
114112
GrpcDispatcherClient.forTesting(
115-
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class),
116113
new FakeWindmillStubFactoryFactory(stubFactory),
117114
new ArrayList<>(),
118115
new ArrayList<>(),

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java

Lines changed: 0 additions & 141 deletions
This file was deleted.

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public FakeWindmillStubFactoryFactory(WindmillStubFactory windmillStubFactory) {
2929
}
3030

3131
@Override
32-
public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
32+
public WindmillStubFactory makeWindmillStubFactory() {
3333
return windmillStubFactory;
3434
}
3535
}

runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -966,16 +966,15 @@ enum ConnectivityType {
966966

967967
// Settings to control runtime behavior of the java runner v1 user worker.
968968
message UserWorkerRunnerV1Settings {
969-
// If true, use separate channels for each windmill RPC.
970-
optional bool use_windmill_isolated_channels = 1 [default = true];
971-
972969
// If true, use separate streaming RPC for windmill heartbeats and state reads.
973970
optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
974971

975972
optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3;
976973

977974
optional ConnectivityType connectivity_type = 4
978975
[default = CONNECTIVITY_TYPE_DEFAULT];
976+
977+
reserved 1;
979978
}
980979

981980
service WindmillAppliance {

0 commit comments

Comments
 (0)