Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class LocalFlagsProvider extends BaseFlagsProvider<LocalFlagsConfig> impl
private final AtomicBoolean ready;
private final AtomicBoolean closed;

// Guards start/stop of pollingExecutor so concurrent startPollingForDefinitions
// calls don't both create executors (leaking the first).
private final Object pollingLock = new Object();
private ScheduledExecutorService pollingExecutor;

/**
Expand Down Expand Up @@ -75,39 +78,54 @@ public void startPollingForDefinitions() {

// Start background polling if enabled
if (config.isEnablePolling()) {
pollingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "mixpanel-flags-poller");
t.setDaemon(true);
return t;
});
synchronized (pollingLock) {
if (pollingExecutor != null) {
// Idempotent: a previous call already scheduled the poller.
// Without this guard, two concurrent start calls would each
// allocate a new ScheduledExecutorService and the earlier
// one would leak.
return;
}

pollingExecutor.scheduleAtFixedRate(
this::fetchDefinitions,
config.getPollingIntervalSeconds(),
config.getPollingIntervalSeconds(),
TimeUnit.SECONDS
);
pollingExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "mixpanel-flags-poller");
t.setDaemon(true);
return t;
});

pollingExecutor.scheduleAtFixedRate(
this::fetchDefinitions,
config.getPollingIntervalSeconds(),
config.getPollingIntervalSeconds(),
TimeUnit.SECONDS
);

logger.log(Level.INFO, "Started polling for flag definitions every " + config.getPollingIntervalSeconds() + " seconds");
logger.log(Level.INFO, "Started polling for flag definitions every " + config.getPollingIntervalSeconds() + " seconds");
}
}
}

/**
* Stops polling for flag definitions and releases resources.
*/
public void stopPollingForDefinitions() {
if (pollingExecutor != null) {
pollingExecutor.shutdown();
try {
if (!pollingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
pollingExecutor.shutdownNow();
}
} catch (InterruptedException e) {
pollingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
ScheduledExecutorService toShutdown;
synchronized (pollingLock) {
toShutdown = pollingExecutor;
pollingExecutor = null;
}
if (toShutdown == null) {
return;
}
toShutdown.shutdown();
try {
if (!toShutdown.awaitTermination(5, TimeUnit.SECONDS)) {
toShutdown.shutdownNow();
}
} catch (InterruptedException e) {
toShutdown.shutdownNow();
Thread.currentThread().interrupt();
}
}

// #endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,76 @@ public void testDoNotTrackExposureWhenDistinctIdIsMissing() {
assertEquals(0, eventSender.getEvents().size());
}

Comment thread
tylerjroach marked this conversation as resolved.
// #endregion
// #region Polling Lifecycle Thread Safety (SDK-81)

// Counts JVM threads named "mixpanel-flags-poller". Each call to
// startPollingForDefinitions creates one ScheduledExecutorService whose
// worker thread carries that name; before this fix two concurrent starts
// would create two of them and leak the first.
private static int countPollerThreads() {
int count = 0;
for (Thread t : Thread.getAllStackTraces().keySet()) {
if (t.isAlive() && "mixpanel-flags-poller".equals(t.getName())) {
count++;
}
}
return count;
}

@Test
public void testConcurrentStartPollingDoesNotLeakExecutors() throws Exception {
List<Variant> variants = Arrays.asList(new Variant("variant-a", "value-a", false, 1.0f));
List<Rollout> rollouts = Arrays.asList(new Rollout(1.0f));
String response = buildFlagsResponse(flagKey, distinctIdContextKey, variants, rollouts, null);

// Use a polling-enabled config so startPollingForDefinitions actually schedules.
LocalFlagsConfig pollingConfig = LocalFlagsConfig.builder()
.projectToken(TEST_TOKEN)
.enablePolling(true)
.pollingIntervalSeconds(60)
.build();
TestableLocalFlagsProvider pollingProvider = new TestableLocalFlagsProvider(pollingConfig, SDK_VERSION, eventSender);
pollingProvider.setMockResponse("/flags/definitions", response);
provider = pollingProvider;
int baselinePollers = countPollerThreads();

int contenders = 8;
java.util.concurrent.CountDownLatch ready = new java.util.concurrent.CountDownLatch(contenders);
java.util.concurrent.CountDownLatch start = new java.util.concurrent.CountDownLatch(1);
java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(contenders);
java.util.concurrent.ExecutorService pool = java.util.concurrent.Executors.newFixedThreadPool(contenders);
try {
for (int i = 0; i < contenders; i++) {
pool.submit(() -> {
ready.countDown();
try {
start.await();
provider.startPollingForDefinitions();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
done.countDown();
}
});
}
ready.await();
start.countDown();
assertTrue("contenders should finish", done.await(10, java.util.concurrent.TimeUnit.SECONDS));
} finally {
pool.shutdown();
}

// Exactly one poller thread should be running regardless of how many
// callers raced. Without the lock, we'd see N.
assertEquals(baselinePollers + 1, countPollerThreads());

provider.stopPollingForDefinitions();
// And shutdown should clean it up.
Thread.sleep(100);
assertEquals(baselinePollers, countPollerThreads());
}

// #endregion
// #region Readiness Tests

Expand Down
Loading