From aaa1c69f5cf3c911ccf03f9614cee339985cf31b Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Mon, 29 Jun 2026 13:31:20 -0400 Subject: [PATCH 1/2] fix(flags): guard polling start so concurrent callers don't leak executors startPollingForDefinitions had no lock around the pollingExecutor create-and-assign. Two concurrent callers would each allocate a fresh ScheduledExecutorService and the earlier one's worker thread + queue would leak (still alive, still scheduled to poll, no way to shut it down because the field had been overwritten). Guard the executor lifecycle with a private lock and bail early if a poller is already scheduled. Snapshot the executor under the lock in stop, then run the (potentially blocking) shutdown / awaitTermination outside the lock so a long-running shutdown can't block a concurrent start for 5s. Linear: SDK-85 Co-Authored-By: Claude Opus 4.7 --- .../provider/LocalFlagsProvider.java | 62 ++++++++++------ .../provider/LocalFlagsProviderTest.java | 70 +++++++++++++++++++ 2 files changed, 110 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProvider.java b/src/main/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProvider.java index 50357b0..8acb0b9 100644 --- a/src/main/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProvider.java +++ b/src/main/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProvider.java @@ -39,6 +39,9 @@ public class LocalFlagsProvider extends BaseFlagsProvider impl private final AtomicBoolean ready; private final AtomicBoolean closed; + // Guards start/stop of pollingExecutor so concurrent startPollingForDefinitions + // calls don't both create executors (leaking the first). + private final Object pollingLock = new Object(); private ScheduledExecutorService pollingExecutor; /** @@ -75,20 +78,30 @@ public void startPollingForDefinitions() { // Start background polling if enabled if (config.isEnablePolling()) { - pollingExecutor = Executors.newSingleThreadScheduledExecutor(r -> { - Thread t = new Thread(r, "mixpanel-flags-poller"); - t.setDaemon(true); - return t; - }); + synchronized (pollingLock) { + if (pollingExecutor != null) { + // Idempotent: a previous call already scheduled the poller. + // Without this guard, two concurrent start calls would each + // allocate a new ScheduledExecutorService and the earlier + // one would leak. + return; + } - pollingExecutor.scheduleAtFixedRate( - this::fetchDefinitions, - config.getPollingIntervalSeconds(), - config.getPollingIntervalSeconds(), - TimeUnit.SECONDS - ); + pollingExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "mixpanel-flags-poller"); + t.setDaemon(true); + return t; + }); + + pollingExecutor.scheduleAtFixedRate( + this::fetchDefinitions, + config.getPollingIntervalSeconds(), + config.getPollingIntervalSeconds(), + TimeUnit.SECONDS + ); - logger.log(Level.INFO, "Started polling for flag definitions every " + config.getPollingIntervalSeconds() + " seconds"); + logger.log(Level.INFO, "Started polling for flag definitions every " + config.getPollingIntervalSeconds() + " seconds"); + } } } @@ -96,18 +109,23 @@ public void startPollingForDefinitions() { * Stops polling for flag definitions and releases resources. */ public void stopPollingForDefinitions() { - if (pollingExecutor != null) { - pollingExecutor.shutdown(); - try { - if (!pollingExecutor.awaitTermination(5, TimeUnit.SECONDS)) { - pollingExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - pollingExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } + ScheduledExecutorService toShutdown; + synchronized (pollingLock) { + toShutdown = pollingExecutor; pollingExecutor = null; } + if (toShutdown == null) { + return; + } + toShutdown.shutdown(); + try { + if (!toShutdown.awaitTermination(5, TimeUnit.SECONDS)) { + toShutdown.shutdownNow(); + } + } catch (InterruptedException e) { + toShutdown.shutdownNow(); + Thread.currentThread().interrupt(); + } } // #endregion diff --git a/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java b/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java index 44bae7d..50ff348 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java @@ -888,6 +888,76 @@ public void testDoNotTrackExposureWhenDistinctIdIsMissing() { assertEquals(0, eventSender.getEvents().size()); } + // #endregion + // #region Polling Lifecycle Thread Safety (SDK-85) + + // Counts JVM threads named "mixpanel-flags-poller". Each call to + // startPollingForDefinitions creates one ScheduledExecutorService whose + // worker thread carries that name; before this fix two concurrent starts + // would create two of them and leak the first. + private static int countPollerThreads() { + int count = 0; + for (Thread t : Thread.getAllStackTraces().keySet()) { + if (t.isAlive() && "mixpanel-flags-poller".equals(t.getName())) { + count++; + } + } + return count; + } + + @Test + public void testConcurrentStartPollingDoesNotLeakExecutors() throws Exception { + List variants = Arrays.asList(new Variant("variant-a", "value-a", false, 1.0f)); + List rollouts = Arrays.asList(new Rollout(1.0f)); + String response = buildFlagsResponse(flagKey, distinctIdContextKey, variants, rollouts, null); + + // Use a polling-enabled config so startPollingForDefinitions actually schedules. + LocalFlagsConfig pollingConfig = LocalFlagsConfig.builder() + .projectToken(TEST_TOKEN) + .enablePolling(true) + .pollingIntervalSeconds(60) + .build(); + TestableLocalFlagsProvider pollingProvider = new TestableLocalFlagsProvider(pollingConfig, SDK_VERSION, eventSender); + pollingProvider.setMockResponse("/flags/definitions", response); + provider = pollingProvider; + int baselinePollers = countPollerThreads(); + + int contenders = 8; + java.util.concurrent.CountDownLatch ready = new java.util.concurrent.CountDownLatch(contenders); + java.util.concurrent.CountDownLatch start = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(contenders); + java.util.concurrent.ExecutorService pool = java.util.concurrent.Executors.newFixedThreadPool(contenders); + try { + for (int i = 0; i < contenders; i++) { + pool.submit(() -> { + ready.countDown(); + try { + start.await(); + provider.startPollingForDefinitions(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + } + ready.await(); + start.countDown(); + assertTrue("contenders should finish", done.await(10, java.util.concurrent.TimeUnit.SECONDS)); + } finally { + pool.shutdown(); + } + + // Exactly one poller thread should be running regardless of how many + // callers raced. Without the lock, we'd see N. + assertEquals(baselinePollers + 1, countPollerThreads()); + + provider.stopPollingForDefinitions(); + // And shutdown should clean it up. + Thread.sleep(100); + assertEquals(baselinePollers, countPollerThreads()); + } + // #endregion // #region Readiness Tests From 6116ece3ef4f9a00f73da4c98969566fb8b45e28 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Thu, 2 Jul 2026 12:36:10 -0400 Subject: [PATCH 2/2] test(flags): fix region comment issue key SDK-85 -> SDK-81 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Copy-paste typo — the PR, commit, and Linear link all reference SDK-81; only the test region comment said SDK-85. --- .../featureflags/provider/LocalFlagsProviderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java b/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java index 50ff348..ae66cd5 100644 --- a/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java +++ b/src/test/java/com/mixpanel/mixpanelapi/featureflags/provider/LocalFlagsProviderTest.java @@ -889,7 +889,7 @@ public void testDoNotTrackExposureWhenDistinctIdIsMissing() { } // #endregion - // #region Polling Lifecycle Thread Safety (SDK-85) + // #region Polling Lifecycle Thread Safety (SDK-81) // Counts JVM threads named "mixpanel-flags-poller". Each call to // startPollingForDefinitions creates one ScheduledExecutorService whose