1818package org .apache .beam .runners .dataflow .worker ;
1919
2020import static org .apache .beam .runners .dataflow .worker .windmill .client .grpc .stubs .WindmillChannels .remoteChannel ;
21+ import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkNotNull ;
2122
2223import com .google .api .services .dataflow .model .MapTask ;
2324import com .google .auto .value .AutoValue ;
119120import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableSet ;
120121import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .net .HostAndPort ;
121122import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
123+ import org .checkerframework .checker .initialization .qual .UnderInitialization ;
124+ import org .checkerframework .checker .initialization .qual .UnknownInitialization ;
122125import org .joda .time .Duration ;
123126import org .joda .time .Instant ;
124127import org .slf4j .Logger ;
129132 *
130133 * <p>Implements a Streaming Dataflow worker.
131134 */
132- @ SuppressWarnings ({
133- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
134- })
135135@ Internal
136136public final class StreamingDataflowWorker {
137137
@@ -189,7 +189,7 @@ public final class StreamingDataflowWorker {
189189 private final StreamingWorkerStatusReporter workerStatusReporter ;
190190 private final int numCommitThreads ;
191191 private final Supplier <Instant > clock ;
192- private final GrpcDispatcherClient dispatcherClient ;
192+ private final @ Nullable GrpcDispatcherClient dispatcherClient ;
193193 private final ExecutorService harnessSwitchExecutor ;
194194 private final long clientId ;
195195 private final WindmillServerStub windmillServer ;
@@ -271,7 +271,7 @@ private StreamingDataflowWorker(
271271 streamingWorkScheduler ,
272272 getDataMetricTracker ,
273273 memoryMonitor ,
274- this .dispatcherClient );
274+ checkNotNull ( this .dispatcherClient ) );
275275 } else {
276276 harnessFactoryOutput =
277277 createSingleSourceWorkerHarness (
@@ -330,6 +330,8 @@ private StreamingDataflowWorker(
330330 }
331331
332332 private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness (
333+ @ UnderInitialization ()
334+ StreamingDataflowWorker this , // Use receiver parameter syntax to allow annotation.
333335 long clientId ,
334336 DataflowWorkerHarnessOptions options ,
335337 WindmillServerStub windmillServer ,
@@ -345,6 +347,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
345347
346348 GetDataClient getDataClient = new ApplianceGetDataClient (windmillServer , getDataMetricTracker );
347349 HeartbeatSender heartbeatSender = new ApplianceHeartbeatSender (windmillServer ::getData );
350+ @ SuppressWarnings ("methodref.receiver.bound" )
348351 WorkCommitter workCommitter =
349352 StreamingApplianceWorkCommitter .create (windmillServer ::commitWork , this ::onCompleteCommit );
350353 GetWorkSender getWorkSender = GetWorkSender .forAppliance (() -> windmillServer .getWork (request ));
@@ -355,7 +358,7 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
355358 .setStreamingWorkScheduler (streamingWorkScheduler )
356359 .setWorkCommitter (workCommitter )
357360 .setGetDataClient (getDataClient )
358- .setComputationStateFetcher (this .computationStateCache ::get )
361+ .setComputationStateFetcher (checkNotNull ( this .computationStateCache ) ::get )
359362 .setWaitForResources (() -> memoryMonitor .waitForResources ("GetWork" ))
360363 .setHeartbeatSender (heartbeatSender )
361364 .setGetWorkSender (getWorkSender )
@@ -368,6 +371,8 @@ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
368371 }
369372
370373 private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHarness (
374+ @ UnknownInitialization ()
375+ StreamingDataflowWorker this , // Use receiver parameter syntax to allow annotation.
371376 long clientId ,
372377 DataflowWorkerHarnessOptions options ,
373378 GrpcWindmillStreamFactory windmillStreamFactory ,
@@ -376,7 +381,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
376381 MemoryMonitor memoryMonitor ,
377382 GrpcDispatcherClient dispatcherClient ) {
378383 WeightedSemaphore <Commit > maxCommitByteSemaphore = Commits .maxCommitByteSemaphore ();
379- ChannelCache channelCache = createChannelCache (options , configFetcher );
384+ ChannelCache channelCache = createChannelCache (options , checkNotNull (configFetcher ));
385+ @ SuppressWarnings ("methodref.receiver.bound" )
380386 FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
381387 FanOutStreamingEngineWorkerHarness .create (
382388 createJobHeader (options , clientId ),
@@ -391,7 +397,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
391397 processingContext ,
392398 drainMode ,
393399 getWorkStreamLatencies ) ->
394- computationStateCache
400+ checkNotNull ( computationStateCache )
395401 .get (processingContext .computationId ())
396402 .ifPresent (
397403 computationState -> {
@@ -407,7 +413,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
407413 }),
408414 ChannelCachingRemoteStubFactory .create (options .getGcpCredential (), channelCache ),
409415 GetWorkBudgetDistributors .distributeEvenly (),
410- Preconditions . checkNotNull (dispatcherClient ),
416+ checkNotNull (dispatcherClient ),
411417 commitWorkStream ->
412418 StreamingEngineWorkCommitter .builder ()
413419 // Share the commitByteSemaphore across all created workCommitters.
@@ -433,6 +439,8 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
433439 }
434440
435441 private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness (
442+ @ UnknownInitialization ()
443+ StreamingDataflowWorker this , // Use receiver parameter syntax to allow annotation.
436444 long clientId ,
437445 DataflowWorkerHarnessOptions options ,
438446 WindmillServerStub windmillServer ,
@@ -454,7 +462,11 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
454462 new StreamPoolGetDataClient (getDataMetricTracker , getDataStreamPool );
455463 HeartbeatSender heartbeatSender =
456464 createStreamingEngineHeartbeatSender (
457- options , windmillServer , getDataStreamPool , configFetcher .getGlobalConfigHandle ());
465+ options ,
466+ windmillServer ,
467+ getDataStreamPool ,
468+ checkNotNull (configFetcher ).getGlobalConfigHandle ());
469+ @ SuppressWarnings ("methodref.receiver.bound" )
458470 WorkCommitter workCommitter =
459471 StreamingEngineWorkCommitter .builder ()
460472 .setCommitWorkStreamFactory (
@@ -476,7 +488,7 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
476488 .setStreamingWorkScheduler (streamingWorkScheduler )
477489 .setWorkCommitter (workCommitter )
478490 .setGetDataClient (getDataClient )
479- .setComputationStateFetcher (this .computationStateCache ::get )
491+ .setComputationStateFetcher (checkNotNull ( this .computationStateCache ) ::get )
480492 .setWaitForResources (() -> memoryMonitor .waitForResources ("GetWork" ))
481493 .setHeartbeatSender (heartbeatSender )
482494 .setGetWorkSender (getWorkSender )
@@ -489,17 +501,20 @@ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
489501 }
490502
491503 private void switchStreamingWorkerHarness (ConnectivityType connectivityType ) {
492- if ((connectivityType == ConnectivityType .CONNECTIVITY_TYPE_DIRECTPATH
504+ if (connectivityType == ConnectivityType .CONNECTIVITY_TYPE_DEFAULT ) {
505+ return ;
506+ }
507+ boolean directPath = connectivityType == ConnectivityType .CONNECTIVITY_TYPE_DIRECTPATH ;
508+ if ((directPath
493509 && this .streamingWorkerHarness .get () instanceof FanOutStreamingEngineWorkerHarness )
494- || (connectivityType == ConnectivityType .CONNECTIVITY_TYPE_CLOUDPATH
495- && streamingWorkerHarness .get () instanceof SingleSourceWorkerHarness )) {
510+ || (!directPath && streamingWorkerHarness .get () instanceof SingleSourceWorkerHarness )) {
496511 return ;
497512 }
498513 // Stop the current status pages before switching the harness.
499514 this .statusPages .get ().stop ();
500515 LOG .debug ("Stopped StreamingWorkerStatusPages before switching connectivity type." );
501- StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null ;
502- if (connectivityType == ConnectivityType . CONNECTIVITY_TYPE_DIRECTPATH ) {
516+ StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput ;
517+ if (directPath ) {
503518 // If dataflow experiment `enable_windmill_service_direct_path` is not set for
504519 // the job, do not switch to FanOutStreamingEngineWorkerHarness. This is because
505520 // `enable_windmill_service_direct_path` is tied to SDK version and is only
@@ -524,11 +539,11 @@ private void switchStreamingWorkerHarness(ConnectivityType connectivityType) {
524539 this .streamingWorkScheduler ,
525540 this .getDataMetricTracker ,
526541 this .memoryMonitor .memoryMonitor (),
527- this .dispatcherClient );
542+ checkNotNull ( this .dispatcherClient ) );
528543 this .streamingWorkerHarness .set (newHarnessFactoryOutput .streamingWorkerHarness ());
529544 streamingWorkerHarness .get ().start ();
530545 LOG .debug ("Started FanOutStreamingEngineWorkerHarness" );
531- } else if ( connectivityType == ConnectivityType . CONNECTIVITY_TYPE_CLOUDPATH ) {
546+ } else {
532547 LOG .info ("Switching connectivity type from DIRECTPATH to CLOUDPATH" );
533548 LOG .debug ("Shutting down FanOutStreamingEngineWorkerHarness" );
534549 streamingWorkerHarness .get ().shutdown ();
0 commit comments