Skip to content

Commit 4cdcf04

Browse files
authored
Add connectivity type tracking to endpoint consumption logic (#38249)
* Add connectivity type tracking to endpoint consumption logic
* Addressing review comments
* Removing redundant version check
1 parent 357862a commit 4cdcf04

5 files changed

Lines changed: 186 additions & 60 deletions

File tree

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
109109
@GuardedBy("metadataLock")
110110
private long pendingMetadataVersion;
111111

112+
@GuardedBy("this")
113+
private WindmillEndpoints.Type activeMetadataType;
114+
115+
@GuardedBy("metadataLock")
116+
private WindmillEndpoints.Type pendingMetadataType;
117+
112118
@GuardedBy("this")
113119
private boolean started;
114120

@@ -141,6 +147,8 @@ private FanOutStreamingEngineWorkerHarness(
141147
this.getWorkBudgetDistributor = getWorkBudgetDistributor;
142148
this.totalGetWorkBudget = totalGetWorkBudget;
143149
this.activeMetadataVersion = Long.MIN_VALUE;
150+
this.activeMetadataType = WindmillEndpoints.Type.UNKNOWN;
151+
this.pendingMetadataType = WindmillEndpoints.Type.UNKNOWN;
144152
this.workCommitterFactory = workCommitterFactory;
145153
}
146154

@@ -271,8 +279,13 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
271279
synchronized (metadataLock) {
272280
// Only process versions greater than what we currently have to prevent double processing of
273281
// metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
274-
if (windmillEndpoints.version() > pendingMetadataVersion) {
282+
// The endpoints are also consumed, regardless of version, when the server reports a known
283+
// (non-UNKNOWN) endpoint type that differs from the pending type.
284+
if (windmillEndpoints.version() > pendingMetadataVersion
285+
|| (windmillEndpoints.type() != WindmillEndpoints.Type.UNKNOWN
286+
&& windmillEndpoints.type() != pendingMetadataType)) {
275287
pendingMetadataVersion = windmillEndpoints.version();
288+
pendingMetadataType = windmillEndpoints.type();
276289
workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
277290
}
278291
}
@@ -283,16 +296,19 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
283296
// queued up while a previous version of the windmillEndpoints were being consumed. Only consume
284297
// the endpoints if they are the most current version.
285298
synchronized (metadataLock) {
286-
if (newWindmillEndpoints.version() < pendingMetadataVersion) {
299+
if (newWindmillEndpoints.version() < pendingMetadataVersion
300+
|| newWindmillEndpoints.type() != pendingMetadataType) {
287301
return;
288302
}
289303
}
290304

291-
LOG.debug(
292-
"Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
305+
LOG.info(
306+
"Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}, previous endpoint type: {}, current endpoint type: {}",
293307
newWindmillEndpoints,
294308
activeMetadataVersion,
295-
newWindmillEndpoints.version());
309+
newWindmillEndpoints.version(),
310+
activeMetadataType,
311+
newWindmillEndpoints.type());
296312
closeStreamsNotIn(newWindmillEndpoints).join();
297313
ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
298314
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
@@ -305,6 +321,7 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
305321
backends.set(newBackends);
306322
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
307323
activeMetadataVersion = newWindmillEndpoints.version();
324+
activeMetadataType = newWindmillEndpoints.type();
308325
}
309326

310327
/** Close the streams that are no longer valid asynchronously. */

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,31 @@
3939
*/
4040
@AutoValue
4141
public abstract class WindmillEndpoints {
42+
/**
 * Endpoint type of a {@link WindmillEndpoints} instance, derived from {@code
 * Windmill.WorkerMetadataResponse.EndpointType} via {@link #fromProto}.
 */
public enum Type {
43+
UNKNOWN,
44+
CLOUDPATH,
45+
DIRECTPATH;
46+
47+
/**
 * Converts the proto endpoint type into the matching {@link Type}.
 *
 * @param protoType endpoint type read from a worker metadata response.
 * @return {@link #CLOUDPATH} or {@link #DIRECTPATH} for the same-named proto values, and {@link
 *     #UNKNOWN} for every other proto value.
 */
static Type fromProto(Windmill.WorkerMetadataResponse.EndpointType protoType) {
48+
switch (protoType) {
49+
case CLOUDPATH:
50+
return CLOUDPATH;
51+
case DIRECTPATH:
52+
return DIRECTPATH;
53+
default:
// Any proto value without an explicit case above is reported as UNKNOWN.
54+
return UNKNOWN;
55+
}
56+
}
57+
}
58+
4259
public static final int DEFAULT_WINDMILL_SERVICE_PORT = 443;
4360
private static final Logger LOG = LoggerFactory.getLogger(WindmillEndpoints.class);
4461
private static final WindmillEndpoints NO_ENDPOINTS =
4562
WindmillEndpoints.builder()
4663
.setVersion(Long.MAX_VALUE)
4764
.setWindmillEndpoints(ImmutableSet.of())
4865
.setGlobalDataEndpoints(ImmutableMap.of())
66+
.setType(Type.UNKNOWN)
4967
.build();
5068

5169
public static WindmillEndpoints none() {
@@ -75,6 +93,7 @@ public static WindmillEndpoints from(
7593
.setVersion(workerMetadataResponseProto.getMetadataVersion())
7694
.setGlobalDataEndpoints(globalDataServers)
7795
.setWindmillEndpoints(windmillServers)
96+
.setType(Type.fromProto(workerMetadataResponseProto.getEndpointType()))
7897
.build();
7998
}
8099

@@ -138,6 +157,8 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
138157
/** Version of the endpoints which increases with every modification. */
139158
public abstract long version();
140159

160+
public abstract Type type();
161+
141162
/**
142163
* Used by GetData GlobalDataRequest(s) to support Beam side inputs. Returns a map where the key
143164
* is a global data tag and the value is the endpoint where the data associated with the global
@@ -221,6 +242,8 @@ public abstract static class Builder {
221242
public abstract static class Builder {
222243
public abstract Builder setVersion(long version);
223244

245+
public abstract Builder setType(Type type);
246+
224247
public abstract Builder setGlobalDataEndpoints(
225248
ImmutableMap<String, WindmillEndpoints.Endpoint> globalDataServers);
226249

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStream.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,22 +114,10 @@ public static GrpcGetWorkerMetadataStream create(
114114
private Optional<WindmillEndpoints> extractWindmillEndpointsFrom(
115115
WorkerMetadataResponse response) {
116116
synchronized (metadataLock) {
117-
if (response.getMetadataVersion() > latestResponse.getMetadataVersion()) {
118-
this.latestResponse = response;
119-
this.latestResponseReceived = Instant.now();
120-
return Optional.of(WindmillEndpoints.from(response));
121-
} else {
122-
// If the currentMetadataVersion is greater than or equal to one in the response, the
123-
// response data is stale, and we do not want to do anything.
124-
LOG.debug(
125-
"Received metadata version={}; Current metadata version={}. "
126-
+ "Skipping update because received stale metadata",
127-
response.getMetadataVersion(),
128-
latestResponse.getMetadataVersion());
129-
}
117+
this.latestResponse = response;
118+
this.latestResponseReceived = Instant.now();
119+
return Optional.of(WindmillEndpoints.from(response));
130120
}
131-
132-
return Optional.empty();
133121
}
134122

135123
@Override

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.streaming.harness;
1919

20+
import static com.google.common.truth.Truth.assertThat;
2021
import static org.junit.Assert.assertEquals;
2122
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertTrue;
@@ -348,6 +349,143 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
348349
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
349350
}
350351

352+
@Test
353+
public void testOnNewWorkerMetadata_alternatesConnectivityTypesAndRemovesStaleEndpoints()
354+
throws InterruptedException {
355+
String workerToken = "workerToken1";
356+
357+
WorkerMetadataResponse cloudPathMetadata =
358+
WorkerMetadataResponse.newBuilder()
359+
.setMetadataVersion(1)
360+
.setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.CLOUDPATH)
361+
.addWorkEndpoints(
362+
WorkerMetadataResponse.Endpoint.newBuilder()
363+
.setBackendWorkerToken(workerToken)
364+
.build())
365+
.putAllGlobalDataEndpoints(DEFAULT)
366+
.build();
367+
WorkerMetadataResponse directPathMetadata =
368+
WorkerMetadataResponse.newBuilder()
369+
.setMetadataVersion(1)
370+
.setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.DIRECTPATH)
371+
.addWorkEndpoints(
372+
WorkerMetadataResponse.Endpoint.newBuilder()
373+
.setBackendWorkerToken(workerToken + "1")
374+
.build())
375+
.addWorkEndpoints(
376+
WorkerMetadataResponse.Endpoint.newBuilder()
377+
.setBackendWorkerToken(workerToken + "2")
378+
.build())
379+
.putAllGlobalDataEndpoints(DEFAULT)
380+
.build();
381+
WorkerMetadataResponse directPathMetadata2 =
382+
WorkerMetadataResponse.newBuilder()
383+
.setMetadataVersion(1)
384+
.setEndpointType(Windmill.WorkerMetadataResponse.EndpointType.DIRECTPATH)
385+
.addWorkEndpoints(
386+
WorkerMetadataResponse.Endpoint.newBuilder()
387+
.setBackendWorkerToken(workerToken + "3")
388+
.build())
389+
.putAllGlobalDataEndpoints(DEFAULT)
390+
.build();
391+
392+
TestGetWorkBudgetDistributor getWorkBudgetDistributor = spy(new TestGetWorkBudgetDistributor());
393+
fanOutStreamingEngineWorkProvider =
394+
newFanOutStreamingEngineWorkerHarness(
395+
GetWorkBudget.builder().setItems(1).setBytes(1).build(),
396+
getWorkBudgetDistributor,
397+
noOpProcessWorkItemFn());
398+
399+
// Sequence: CLOUDPATH -> DIRECTPATH -> CLOUDPATH -> DIRECTPATH -> DIRECTPATH (v1) -> DIRECTPATH (v2)
400+
// Start with CLOUDPATH (version 1, 1 endpoint)
401+
// Verifies: version > pendingMetadataVersion condition triggers consumption
402+
fakeGetWorkerMetadataStub.injectWorkerMetadata(cloudPathMetadata);
403+
verify(getWorkBudgetDistributor, times(1)).distributeBudget(any(), any());
404+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
405+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams()).hasSize(1);
406+
assertThat(
407+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
408+
.map(endpoint -> endpoint.workerToken().orElse(""))
409+
.collect(Collectors.toSet()))
410+
.contains(workerToken);
411+
412+
// Switch to DIRECTPATH (same version 1, 2 endpoints, different type)
413+
// Verifies: a type change at the same version triggers consumption (the endpoint type check
414+
// in consumeWorkerMetadata)
415+
fakeGetWorkerMetadataStub.injectWorkerMetadata(directPathMetadata);
416+
verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any());
417+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
418+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().values())
419+
.hasSize(2);
420+
// Verifies: the stale CLOUDPATH endpoint is no longer among the current backends
421+
Set<String> directPathTokens =
422+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
423+
.map(endpoint -> endpoint.workerToken().orElse(""))
424+
.collect(Collectors.toSet());
425+
assertThat(directPathTokens).contains(workerToken + "1");
426+
assertThat(directPathTokens).contains(workerToken + "2");
427+
assertThat(directPathTokens).containsNoneIn(java.util.Arrays.asList(workerToken));
428+
429+
// Switch back to CLOUDPATH (same version 1, 1 endpoint, different type)
430+
fakeGetWorkerMetadataStub.injectWorkerMetadata(cloudPathMetadata);
431+
verify(getWorkBudgetDistributor, times(3)).distributeBudget(any(), any());
432+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
433+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().values())
434+
.hasSize(1);
435+
// Verifies: the stale DIRECTPATH endpoints are no longer among the current backends
436+
Set<String> cloudPathTokens =
437+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
438+
.map(endpoint -> endpoint.workerToken().orElse(""))
439+
.collect(Collectors.toSet());
440+
assertThat(cloudPathTokens).contains(workerToken);
441+
assertThat(cloudPathTokens)
442+
.containsNoneIn(java.util.Arrays.asList(workerToken + "1", workerToken + "2"));
443+
444+
// Switch to DIRECTPATH (same version 1, 2 endpoints, different type)
445+
// Verifies: type change works in both directions
446+
fakeGetWorkerMetadataStub.injectWorkerMetadata(directPathMetadata);
447+
verify(getWorkBudgetDistributor, times(4)).distributeBudget(any(), any());
448+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
449+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams()).hasSize(2);
450+
directPathTokens =
451+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
452+
.map(endpoint -> endpoint.workerToken().orElse(""))
453+
.collect(Collectors.toSet());
454+
assertThat(directPathTokens).contains(workerToken + "1");
455+
assertThat(directPathTokens).contains(workerToken + "2");
456+
assertThat(directPathTokens).containsNoneIn(java.util.Arrays.asList(workerToken));
457+
458+
// Inject another DIRECTPATH update (same version 1, 1 endpoint, same type)
459+
// Verifies: same version same type does not trigger consumption, endpoints remain the same
460+
fakeGetWorkerMetadataStub.injectWorkerMetadata(directPathMetadata2);
461+
verify(getWorkBudgetDistributor, times(4)).distributeBudget(any(), any());
462+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
463+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams()).hasSize(2);
464+
directPathTokens =
465+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
466+
.map(endpoint -> endpoint.workerToken().orElse(""))
467+
.collect(Collectors.toSet());
468+
assertThat(directPathTokens).contains(workerToken + "1");
469+
assertThat(directPathTokens).contains(workerToken + "2");
470+
assertThat(directPathTokens).containsNoneIn(java.util.Arrays.asList(workerToken + "3"));
471+
472+
directPathMetadata2 = directPathMetadata2.toBuilder().setMetadataVersion(2).build();
473+
474+
// Final DIRECTPATH update (new version 2, 1 endpoint, same type)
475+
// Verifies: version change triggers consumption even if type is the same.
476+
fakeGetWorkerMetadataStub.injectWorkerMetadata(directPathMetadata2);
477+
verify(getWorkBudgetDistributor, times(5)).distributeBudget(any(), any());
478+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
479+
assertThat(fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams()).hasSize(1);
480+
directPathTokens =
481+
fanOutStreamingEngineWorkProvider.currentBackends().windmillStreams().keySet().stream()
482+
.map(endpoint -> endpoint.workerToken().orElse(""))
483+
.collect(Collectors.toSet());
484+
assertThat(directPathTokens)
485+
.containsNoneIn(java.util.Arrays.asList(workerToken + "1", workerToken + "2"));
486+
assertThat(directPathTokens).contains(workerToken + "3");
487+
}
488+
351489
private static class WindmillServiceFakeStub
352490
extends CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1ImplBase {
353491
@Override

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkerMetadataStreamTest.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.Assert.assertTrue;
2222
import static org.mockito.Mockito.verify;
23-
import static org.mockito.Mockito.verifyNoMoreInteractions;
2423

2524
import java.io.IOException;
2625
import java.util.HashMap;
@@ -196,45 +195,6 @@ public void testGetWorkerMetadata_consumesSubsequentResponseMetadata() {
196195
.collect(Collectors.toList()));
197196
}
198197

199-
@Test
200-
public void testGetWorkerMetadata_doesNotConsumeResponseIfMetadataStale() {
201-
WorkerMetadataResponse freshEndpoints =
202-
WorkerMetadataResponse.newBuilder()
203-
.setMetadataVersion(2)
204-
.addAllWorkEndpoints(DIRECT_PATH_ENDPOINTS)
205-
.putAllGlobalDataEndpoints(GLOBAL_DATA_ENDPOINTS)
206-
.setExternalEndpoint(AUTHENTICATING_SERVICE)
207-
.build();
208-
209-
TestWindmillEndpointsConsumer testWindmillEndpointsConsumer =
210-
Mockito.spy(new TestWindmillEndpointsConsumer());
211-
GetWorkerMetadataTestStub testStub =
212-
new GetWorkerMetadataTestStub(new TestGetWorkMetadataRequestObserver());
213-
stream = getWorkerMetadataTestStream(testStub, testWindmillEndpointsConsumer);
214-
testStub.injectWorkerMetadata(freshEndpoints);
215-
216-
List<WorkerMetadataResponse.Endpoint> staleDirectPathEndpoints =
217-
Lists.newArrayList(
218-
WorkerMetadataResponse.Endpoint.newBuilder()
219-
.setDirectEndpoint("staleWindmillEndpoint")
220-
.build());
221-
Map<String, WorkerMetadataResponse.Endpoint> staleGlobalDataEndpoints = new HashMap<>();
222-
staleGlobalDataEndpoints.put(
223-
"stale_global_data",
224-
WorkerMetadataResponse.Endpoint.newBuilder().setDirectEndpoint("staleGlobalData").build());
225-
226-
testStub.injectWorkerMetadata(
227-
WorkerMetadataResponse.newBuilder()
228-
.setMetadataVersion(1)
229-
.addAllWorkEndpoints(staleDirectPathEndpoints)
230-
.putAllGlobalDataEndpoints(staleGlobalDataEndpoints)
231-
.build());
232-
233-
// Should have ignored the stale update and only used initial.
234-
verify(testWindmillEndpointsConsumer).accept(WindmillEndpoints.from(freshEndpoints));
235-
verifyNoMoreInteractions(testWindmillEndpointsConsumer);
236-
}
237-
238198
@Test
239199
public void testGetWorkerMetadata_correctlyAddsAndRemovesStreamFromRegistry()
240200
throws InterruptedException {

0 commit comments

Comments
 (0)