Skip to content

Commit b9a7585

Browse files
authored
[ErrorProne] Enable StaticAssignmentInConstructor check (#37786)
* Update static test state to use atomic variables for thread safety and re-enable StaticAssignmentInConstructor checkstyle rule. * checkstyle
1 parent 0f6c88e commit b9a7585

10 files changed

Lines changed: 83 additions & 72 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1554,7 +1554,6 @@ class BeamModulePlugin implements Plugin<Project> {
15541554
"NonCanonicalType",
15551555
"Slf4jFormatShouldBeConst",
15561556
"Slf4jSignOnlyFormat",
1557-
"StaticAssignmentInConstructor",
15581557
"ThreadPriorityCheck",
15591558
"TimeUnitConversionChecker",
15601559
"UndefinedEquals",

runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.List;
3737
import java.util.NoSuchElementException;
3838
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.atomic.AtomicBoolean;
3940
import org.apache.beam.runners.direct.BoundedReadEvaluatorFactory.BoundedSourceShard;
4041
import org.apache.beam.sdk.coders.BigEndianLongCoder;
4142
import org.apache.beam.sdk.coders.Coder;
@@ -306,7 +307,7 @@ public void boundedSourceEvaluatorClosesReader() throws Exception {
306307
evaluator.finishBundle();
307308
CommittedBundle<Long> committed = output.commit(Instant.now());
308309
assertThat(committed.getElements(), containsInAnyOrder(gw(2L), gw(3L), gw(1L)));
309-
assertThat(TestSource.readerClosed, is(true));
310+
assertThat(TestSource.readerClosed.get(), is(true));
310311
}
311312

312313
@Test
@@ -326,7 +327,7 @@ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
326327
evaluator.finishBundle();
327328
CommittedBundle<Long> committed = output.commit(Instant.now());
328329
assertThat(committed.getElements(), emptyIterable());
329-
assertThat(TestSource.readerClosed, is(true));
330+
assertThat(TestSource.readerClosed.get(), is(true));
330331
}
331332

332333
@Test
@@ -336,7 +337,7 @@ public void cleanupShutsDownExecutor() {
336337
}
337338

338339
private static class TestSource<T> extends OffsetBasedSource<T> {
339-
private static boolean readerClosed;
340+
private static final AtomicBoolean readerClosed = new AtomicBoolean(false);
340341
private final Coder<T> coder;
341342
private final T[] elems;
342343
private final int firstSplitIndex;
@@ -352,7 +353,7 @@ public TestSource(Coder<T> coder, int firstSplitIndex, T... elems) {
352353
this.elems = elems;
353354
this.coder = coder;
354355
this.firstSplitIndex = firstSplitIndex;
355-
readerClosed = false;
356+
readerClosed.set(false);
356357

357358
subrangesCompleted = new CountDownLatch(2);
358359
}
@@ -449,7 +450,7 @@ public T getCurrent() throws NoSuchElementException {
449450

450451
@Override
451452
public void close() throws IOException {
452-
TestSource.readerClosed = true;
453+
TestSource.readerClosed.set(true);
453454
}
454455
}
455456

runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,8 @@ public void evaluatorReusesReaderAndClosesAtTheEnd() throws Exception {
339339
} while (!Iterables.isEmpty(residual.getElements()));
340340

341341
verify(output, times(numElements)).add(any());
342-
assertThat(TestUnboundedSource.readerCreatedCount, equalTo(1));
343-
assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
342+
assertThat(TestUnboundedSource.READER_CREATED_COUNT.get(), equalTo(1));
343+
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(1));
344344
}
345345

346346
@Test
@@ -382,7 +382,7 @@ public void evaluatorClosesReaderAndResumesFromCheckpoint() throws Exception {
382382
secondEvaluator.processElement(Iterables.getOnlyElement(residual.getElements()));
383383
secondEvaluator.finishBundle();
384384

385-
assertThat(TestUnboundedSource.readerClosedCount, equalTo(2));
385+
assertThat(TestUnboundedSource.READER_CLOSED_COUNT.get(), equalTo(2));
386386
assertThat(
387387
Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(),
388388
is(true));
@@ -421,12 +421,12 @@ public void evaluatorThrowsInCloseRethrows() throws Exception {
421421

422422
@Test // before this was throwing a NPE
423423
public void emptySource() throws Exception {
424-
TestUnboundedSource.readerClosedCount = 0;
424+
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
425425
final TestUnboundedSource<String> source = new TestUnboundedSource<>(StringUtf8Coder.of());
426426
source.advanceWatermarkToInfinity = true;
427427
processElement(source);
428-
assertEquals(1, TestUnboundedSource.readerClosedCount);
429-
TestUnboundedSource.readerClosedCount = 0; // reset
428+
assertEquals(1, TestUnboundedSource.READER_CLOSED_COUNT.get());
429+
TestUnboundedSource.READER_CLOSED_COUNT.set(0); // reset
430430
}
431431

432432
@Test(expected = IOException.class)
@@ -472,7 +472,7 @@ private void processElement(final TestUnboundedSource<String> source) throws Exc
472472
final WindowedValue<UnboundedSourceShard<String, TestCheckpointMark>> value =
473473
WindowedValues.of(
474474
shard, BoundedWindow.TIMESTAMP_MAX_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
475-
TestUnboundedSource.readerClosedCount = 0;
475+
TestUnboundedSource.READER_CLOSED_COUNT.set(0);
476476
evaluator.processElement(value);
477477
}
478478

@@ -492,11 +492,15 @@ public Instant apply(Long input) {
492492
}
493493

494494
private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
495-
private static int getWatermarkCalls = 0;
496-
497-
static int readerCreatedCount;
498-
static int readerClosedCount;
499-
static int readerAdvancedCount;
495+
private static final java.util.concurrent.atomic.AtomicInteger getWatermarkCalls =
496+
new java.util.concurrent.atomic.AtomicInteger(0);
497+
498+
static final java.util.concurrent.atomic.AtomicInteger READER_CREATED_COUNT =
499+
new java.util.concurrent.atomic.AtomicInteger(0);
500+
static final java.util.concurrent.atomic.AtomicInteger READER_CLOSED_COUNT =
501+
new java.util.concurrent.atomic.AtomicInteger(0);
502+
static final java.util.concurrent.atomic.AtomicInteger READER_ADVANCED_COUNT =
503+
new java.util.concurrent.atomic.AtomicInteger(0);
500504
private final Coder<T> coder;
501505
private final List<T> elems;
502506
private boolean dedupes = false;
@@ -508,9 +512,9 @@ public TestUnboundedSource(Coder<T> coder, T... elems) {
508512
}
509513

510514
private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
511-
readerCreatedCount = 0;
512-
readerClosedCount = 0;
513-
readerAdvancedCount = 0;
515+
READER_CREATED_COUNT.set(0);
516+
READER_CLOSED_COUNT.set(0);
517+
READER_ADVANCED_COUNT.set(0);
514518
this.coder = coder;
515519
this.elems = elems;
516520
this.throwOnClose = throwOnClose;
@@ -528,7 +532,7 @@ public UnboundedSource.UnboundedReader<T> createReader(
528532
checkState(
529533
checkpointMark == null || checkpointMark.decoded,
530534
"Cannot resume from a checkpoint that has not been decoded");
531-
readerCreatedCount++;
535+
READER_CREATED_COUNT.incrementAndGet();
532536
return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
533537
}
534538

@@ -568,7 +572,7 @@ public boolean start() throws IOException {
568572

569573
@Override
570574
public boolean advance() throws IOException {
571-
readerAdvancedCount++;
575+
READER_ADVANCED_COUNT.incrementAndGet();
572576
if (index + 1 < elems.size()) {
573577
index++;
574578
return true;
@@ -578,11 +582,11 @@ public boolean advance() throws IOException {
578582

579583
@Override
580584
public Instant getWatermark() {
581-
getWatermarkCalls++;
585+
getWatermarkCalls.incrementAndGet();
582586
if (index + 1 == elems.size() && TestUnboundedSource.this.advanceWatermarkToInfinity) {
583587
return BoundedWindow.TIMESTAMP_MAX_VALUE;
584588
} else {
585-
return new Instant(index + getWatermarkCalls);
589+
return new Instant(index + getWatermarkCalls.get());
586590
}
587591
}
588592

@@ -618,7 +622,7 @@ public byte[] getCurrentRecordId() {
618622
@Override
619623
public void close() throws IOException {
620624
try {
621-
readerClosedCount++;
625+
READER_CLOSED_COUNT.incrementAndGet();
622626
// Enforce the AutoCloseable contract. Close is not idempotent.
623627
assertThat(closed, is(false));
624628
if (throwOnClose) {

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4796,17 +4796,18 @@ public void run() {
47964796

47974797
private static class FakeSlowDoFn extends DoFn<String, String> {
47984798

4799-
private static FakeClock clock; // A static variable keeps this DoFn serializable.
4799+
private static final AtomicReference<FakeClock> clock =
4800+
new AtomicReference<>(); // A static variable keeps this DoFn serializable.
48004801
private final Duration sleep;
48014802

48024803
FakeSlowDoFn(FakeClock clock, Duration sleep) {
4803-
FakeSlowDoFn.clock = clock;
4804+
FakeSlowDoFn.clock.set(clock);
48044805
this.sleep = sleep;
48054806
}
48064807

48074808
@ProcessElement
48084809
public void processElement(ProcessContext c) throws Exception {
4809-
clock.sleep(sleep);
4810+
clock.get().sleep(sleep);
48104811
c.output(c.element());
48114812
}
48124813
}

runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/InMemoryMetrics.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,43 @@
2222
import com.codahale.metrics.MetricRegistry;
2323
import java.util.Collection;
2424
import java.util.Properties;
25+
import java.util.concurrent.atomic.AtomicReference;
2526
import org.apache.beam.runners.spark.structuredstreaming.metrics.WithMetricsSupport;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2728
import org.apache.spark.metrics.sink.Sink;
2829

2930
/** An in-memory {@link Sink} implementation for tests. */
3031
public class InMemoryMetrics implements Sink {
3132

32-
private static WithMetricsSupport extendedMetricsRegistry;
33-
private static MetricRegistry internalMetricRegistry;
33+
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
34+
new AtomicReference<>();
35+
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
36+
new AtomicReference<>();
3437

3538
// Constructor for Spark 3.1
3639
@SuppressWarnings("UnusedParameters")
3740
public InMemoryMetrics(
3841
final Properties properties,
3942
final MetricRegistry metricRegistry,
4043
final org.apache.spark.SecurityManager securityMgr) {
41-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
42-
internalMetricRegistry = metricRegistry;
44+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
45+
internalMetricRegistry.set(metricRegistry);
4346
}
4447

4548
// Constructor for Spark >= 3.2
4649
@SuppressWarnings("UnusedParameters")
4750
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
48-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
49-
internalMetricRegistry = metricRegistry;
51+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
52+
internalMetricRegistry.set(metricRegistry);
5053
}
5154

5255
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"}) // because of getGauges
5356
public static <T> T valueOf(final String name) {
5457
// this might fail in case we have multiple aggregators with the same suffix after
5558
// the last dot, but it should be good enough for tests.
56-
if (extendedMetricsRegistry != null) {
57-
Collection<Gauge> matches =
58-
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
59+
WithMetricsSupport extended = extendedMetricsRegistry.get();
60+
if (extended != null) {
61+
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
5962
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
6063
} else {
6164
return null;
@@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {
6467

6568
@SuppressWarnings("WeakerAccess")
6669
public static void clearAll() {
67-
if (internalMetricRegistry != null) {
68-
internalMetricRegistry.removeMatching(MetricFilter.ALL);
70+
MetricRegistry internal = internalMetricRegistry.get();
71+
if (internal != null) {
72+
internal.removeMatching(MetricFilter.ALL);
6973
}
7074
}
7175

runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,43 @@
2222
import com.codahale.metrics.MetricRegistry;
2323
import java.util.Collection;
2424
import java.util.Properties;
25+
import java.util.concurrent.atomic.AtomicReference;
2526
import org.apache.beam.runners.spark.metrics.WithMetricsSupport;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
2728
import org.apache.spark.metrics.sink.Sink;
2829

2930
/** An in-memory {@link Sink} implementation for tests. */
3031
public class InMemoryMetrics implements Sink {
3132

32-
private static WithMetricsSupport extendedMetricsRegistry;
33-
private static MetricRegistry internalMetricRegistry;
33+
private static final AtomicReference<WithMetricsSupport> extendedMetricsRegistry =
34+
new AtomicReference<>();
35+
private static final AtomicReference<MetricRegistry> internalMetricRegistry =
36+
new AtomicReference<>();
3437

3538
// Constructor for Spark 3.1
3639
@SuppressWarnings("UnusedParameters")
3740
public InMemoryMetrics(
3841
final Properties properties,
3942
final MetricRegistry metricRegistry,
4043
final org.apache.spark.SecurityManager securityMgr) {
41-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
42-
internalMetricRegistry = metricRegistry;
44+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
45+
internalMetricRegistry.set(metricRegistry);
4346
}
4447

4548
// Constructor for Spark >= 3.2
4649
@SuppressWarnings("UnusedParameters")
4750
public InMemoryMetrics(final Properties properties, final MetricRegistry metricRegistry) {
48-
extendedMetricsRegistry = WithMetricsSupport.forRegistry(metricRegistry);
49-
internalMetricRegistry = metricRegistry;
51+
extendedMetricsRegistry.set(WithMetricsSupport.forRegistry(metricRegistry));
52+
internalMetricRegistry.set(metricRegistry);
5053
}
5154

5255
@SuppressWarnings({"TypeParameterUnusedInFormals", "rawtypes"})
5356
public static <T> T valueOf(final String name) {
5457
// this might fail in case we have multiple aggregators with the same suffix after
5558
// the last dot, but it should be good enough for tests.
56-
if (extendedMetricsRegistry != null) {
57-
Collection<Gauge> matches =
58-
extendedMetricsRegistry.getGauges((n, m) -> n.endsWith(name)).values();
59+
WithMetricsSupport extended = extendedMetricsRegistry.get();
60+
if (extended != null) {
61+
Collection<Gauge> matches = extended.getGauges((n, m) -> n.endsWith(name)).values();
5962
return matches.isEmpty() ? null : (T) Iterables.getOnlyElement(matches).getValue();
6063
} else {
6164
return null;
@@ -64,8 +67,9 @@ public static <T> T valueOf(final String name) {
6467

6568
@SuppressWarnings("WeakerAccess")
6669
public static void clearAll() {
67-
if (internalMetricRegistry != null) {
68-
internalMetricRegistry.removeMatching(MetricFilter.ALL);
70+
MetricRegistry internal = internalMetricRegistry.get();
71+
if (internal != null) {
72+
internal.removeMatching(MetricFilter.ALL);
6973
}
7074
}
7175

runners/spark/src/test/java/org/apache/beam/runners/spark/coders/SparkRunnerKryoRegistratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
* tests requiring a different context have to be forked using separate test classes.
3939
*/
4040
@SuppressWarnings({
41-
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
41+
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
42+
"StaticAssignmentInConstructor" // used for testing purposes
4243
})
4344
@RunWith(Enclosed.class)
4445
public class SparkRunnerKryoRegistratorTest {

sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HiveDatabaseTestHelper.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import java.sql.DriverManager;
2222
import java.sql.SQLException;
2323
import java.sql.Statement;
24+
import java.util.concurrent.atomic.AtomicReference;
2425
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
2526

2627
/** Helper for creating connection and test tables on hive database via JDBC driver. */
2728
class HiveDatabaseTestHelper {
28-
private static Connection con;
29-
private static Statement stmt;
29+
private static final AtomicReference<Connection> con = new AtomicReference<>();
30+
private static final AtomicReference<Statement> stmt = new AtomicReference<>();
3031

3132
HiveDatabaseTestHelper(
3233
String hiveHost,
@@ -36,24 +37,24 @@ class HiveDatabaseTestHelper {
3637
String hivePassword)
3738
throws Exception {
3839
String hiveUrl = String.format("jdbc:hive2://%s:%s/%s", hiveHost, hivePort, hiveDatabase);
39-
con = DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword);
40-
stmt = con.createStatement();
40+
con.set(DriverManager.getConnection(hiveUrl, hiveUsername, hivePassword));
41+
stmt.set(con.get().createStatement());
4142
}
4243

4344
/** Create hive table. */
4445
String createHiveTable(String testIdentifier) throws Exception {
4546
String tableName = DatabaseTestHelper.getTestTableName(testIdentifier);
46-
stmt.execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
47+
stmt.get().execute(" CREATE TABLE IF NOT EXISTS " + tableName + " (id STRING)");
4748
return tableName;
4849
}
4950

5051
/** Delete hive table. */
5152
void dropHiveTable(String tableName) throws SQLException {
52-
stmt.execute(" DROP TABLE " + tableName);
53+
stmt.get().execute(" DROP TABLE " + tableName);
5354
}
5455

5556
void closeConnection() throws Exception {
56-
stmt.close();
57-
con.close();
57+
stmt.get().close();
58+
con.get().close();
5859
}
5960
}

0 commit comments

Comments
 (0)