Skip to content

Commit a3ae7cc

Browse files
committed
test: add new E2E test to check polling recovery
1 parent 5c181a2 commit a3ae7cc

5 files changed

Lines changed: 130 additions & 29 deletions

File tree

e2e/proxy/proxy_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ProxyTestConfig struct {
2626
TLSKeyFile string
2727
AccessTokenCheckInterval string
2828
StaticSecretsRefreshInterval string
29+
PollingFallbackInterval string
2930
UseSSE bool
3031
ClientID string
3132
ClientSecret string
@@ -45,6 +46,7 @@ func DefaultProxyTestConfig() ProxyTestConfig {
4546
func SSEProxyTestConfig() ProxyTestConfig {
4647
config := DefaultProxyTestConfig()
4748
config.UseSSE = true
49+
config.PollingFallbackInterval = "10s"
4850
return config
4951
}
5052

@@ -69,6 +71,9 @@ func startProxy(t *testing.T, ctx context.Context, infisicalURL string, config P
6971
args = append(args, "--event-subscription-enabled")
7072
args = append(args, "--client-id", config.ClientID)
7173
args = append(args, "--client-secret", config.ClientSecret)
74+
if config.PollingFallbackInterval != "" {
75+
args = append(args, "--polling-fallback-interval", config.PollingFallbackInterval)
76+
}
7277
}
7378

7479
proxyCmd := helpers.Command{
@@ -884,3 +889,91 @@ func TestProxy_SSEMultipleProjects(t *testing.T) {
884889
}
885890
require.True(t, foundOriginal, "Original secret not found in project 2")
886891
}
892+
// TestProxy_SSEPollingFallbackRecovery exercises the SSE -> polling fallback path.
//
// It starts the proxy with SSEProxyTestConfig, waits for the SSE connection to
// be established after the first proxied fetch, then switches the machine
// identity to the "no-access" role and updates the secret via the API. The test
// then waits for the proxy to log that SSE retries are exhausted and that the
// polling fallback has started. Finally it restores the "member" role and
// checks that the proxy still answers with 200 and is still running.
//
// NOTE(review): the final step does not wait for an SSE reconnection log line
// and does not assert that the updated secret value is served, so "recovery"
// is only verified as "the proxy stays healthy" — confirm whether stronger
// assertions are intended.
func TestProxy_SSEPollingFallbackRecovery(t *testing.T) {
893+
ctx, cancel := context.WithCancel(context.Background())
894+
t.Cleanup(cancel)
895+
896+
infisical, helper, proxyCmd, _, _, identity := setupProxyTest(t, ctx, SSEProxyTestConfig())
897+
898+
result := helpers.WaitForStderr(t, helpers.WaitForStderrOptions{
899+
EnsureCmdRunning: proxyCmd,
900+
ExpectedString: "SSE manager initialized",
901+
Timeout: 30 * time.Second,
902+
})
903+
require.Equal(t, helpers.WaitSuccess, result)
904+
905+
// create secret via API (not proxy)
906+
initialSecret := helper.GenerateSecret(proxyHelpers.GenerateSecretOptions{
907+
Prefix: "SSE_POLLING_FALLBACK_RECOVERY_",
908+
})
909+
helper.CreateSecretWithApi(ctx, initialSecret)
910+
911+
// fetch via proxy -> cache miss, now cached
912+
slog.Info("Fetching secret via proxy (expecting cache miss)")
913+
resp1 := helper.GetSecretsWithProxy(ctx)
914+
require.Equal(t, http.StatusOK, resp1.StatusCode())
915+
require.NotNil(t, resp1.JSON200)
916+
917+
var foundInitial bool
918+
for _, s := range resp1.JSON200.Secrets {
919+
if s.SecretKey == initialSecret.SecretKey {
920+
assert.Equal(t, initialSecret.SecretValue, s.SecretValue)
921+
foundInitial = true
922+
break
923+
}
924+
}
925+
require.True(t, foundInitial, "Initial secret not found in response")
926+
927+
// wait for SSE connection (demand-driven, triggered by first fetch)
928+
result = helpers.WaitForStderr(t, helpers.WaitForStderrOptions{
929+
EnsureCmdRunning: proxyCmd,
930+
ExpectedString: "SSE connection established",
931+
Timeout: 30 * time.Second,
932+
Interval: 200 * time.Millisecond,
933+
})
934+
require.Equal(t, helpers.WaitSuccess, result)
935+
936+
// change identity role to no-access to break SSE connection with auth errors
937+
slog.Info("Changing identity role to no-access to break SSE connection")
938+
infisical.UpdateMachineIdentityRole(t, ctx, identity.Id, "no-access")
939+
940+
// update secret via API so SSE event is triggered
941+
// do it twice so we don't have race conditions to update the role
942+
updatedSecret := helper.GenerateSecret(proxyHelpers.GenerateSecretOptions{
943+
PresetName: initialSecret.SecretKey,
944+
})
945+
helper.UpdateSecretWithApi(ctx, updatedSecret)
946+
947+
updatedSecret2 := helper.GenerateSecret(proxyHelpers.GenerateSecretOptions{
948+
PresetName: updatedSecret.SecretKey,
949+
})
950+
helper.UpdateSecretWithApi(ctx, updatedSecret2)
951+
952+
// wait for SSE retries to exhaust and polling fallback to start
953+
slog.Info("Waiting for SSE retries to exhaust and polling fallback to start")
954+
result = helpers.WaitForStderr(t, helpers.WaitForStderrOptions{
955+
EnsureCmdRunning: proxyCmd,
956+
ExpectedString: "SSE connection retries exhausted, transitioning to polling fallback",
957+
Timeout: 20 * time.Second,
958+
Interval: 200 * time.Millisecond,
959+
})
960+
require.Equal(t, helpers.WaitSuccess, result, "Should transition to polling fallback after retries exhausted")
961+
962+
result = helpers.WaitForStderr(t, helpers.WaitForStderrOptions{
963+
EnsureCmdRunning: proxyCmd,
964+
ExpectedString: "Starting polling fallback for project",
965+
Timeout: 10 * time.Second,
966+
Interval: 200 * time.Millisecond,
967+
})
968+
require.Equal(t, helpers.WaitSuccess, result, "Polling fallback loop should start")
969+
970+
// restore identity role to member to allow SSE reconnection
971+
slog.Info("Restoring identity role to member")
972+
infisical.UpdateMachineIdentityRole(t, ctx, identity.Id, "member")
973+
974+
// verify proxy is still functional
975+
slog.Info("Verifying proxy still works after SSE recovery from polling")
976+
respFinal := helper.GetSecretsWithProxy(ctx)
977+
require.Equal(t, http.StatusOK, respFinal.StatusCode())
978+
require.True(t, proxyCmd.IsRunning(), "Proxy should still be running")
979+
}

infisical-cli

-79.5 MB
Binary file not shown.

packages/cmd/proxy.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ var proxyDebugCmd = &cobra.Command{
5757
Hidden: true,
5858
}
5959

60-
func initializeSSEManager(ctx context.Context, clientId, clientSecret string, cache *proxy.Cache, domainURL *url.URL, streamingClient *http.Client, httpClient *http.Client) (*proxy.SSEManager, error) {
60+
func initializeSSEManager(ctx context.Context, clientId, clientSecret string, cache *proxy.Cache, domainURL *url.URL, streamingClient *http.Client, httpClient *http.Client, pollingFallbackInterval time.Duration) (*proxy.SSEManager, error) {
6161
sseAuthState, err := proxy.NewSSEAuthState(clientId, clientSecret, domainURL, httpClient)
6262
if err != nil {
6363
return nil, err
6464
}
65-
return proxy.NewSSEManager(context.Background(), cache, domainURL, streamingClient, httpClient, sseAuthState), nil
65+
// Pass the caller's ctx through instead of a fresh context.Background(), so the
// ctx parameter of initializeSSEManager is actually honored by the manager.
return proxy.NewSSEManager(ctx, cache, domainURL, streamingClient, httpClient, sseAuthState, pollingFallbackInterval), nil
6666
}
6767

6868
func startProxyServer(cmd *cobra.Command, args []string) {
@@ -132,6 +132,16 @@ func startProxyServer(cmd *cobra.Command, args []string) {
132132
util.PrintErrorMessageAndExit(fmt.Sprintf("Invalid static-secrets-refresh-interval format '%s'. Use formats like 30m, 1h, 1d", staticSecretsRefreshIntervalStr))
133133
}
134134

135+
pollingFallbackIntervalStr, err := cmd.Flags().GetString("polling-fallback-interval")
136+
if err != nil {
137+
util.HandleError(err, "Unable to parse polling-fallback-interval flag")
138+
}
139+
140+
pollingFallbackInterval, err := util.ParseTimeDurationString(pollingFallbackIntervalStr, true)
141+
// Also reject zero/negative intervals: the value is later used to build a
// time.Ticker for the polling loop, and time.NewTicker panics unless d > 0.
if err != nil || pollingFallbackInterval <= 0 {
142+
util.PrintErrorMessageAndExit(fmt.Sprintf("Invalid polling-fallback-interval format '%s'. Use formats like 1m, 5m, 10m", pollingFallbackIntervalStr))
143+
}
144+
135145
useSSE, err := cmd.Flags().GetBool("event-subscription-enabled")
136146
if err != nil {
137147
util.HandleError(err, "Unable to parse event-subscription-enabled flag")
@@ -204,7 +214,7 @@ func startProxyServer(cmd *cobra.Command, args []string) {
204214

205215
var sseManager *proxy.SSEManager
206216
if useSSE {
207-
sseManager, err = initializeSSEManager(context.Background(), clientId, clientSecret, cache, domainURL, streamingClient, httpClient)
217+
sseManager, err = initializeSSEManager(context.Background(), clientId, clientSecret, cache, domainURL, streamingClient, httpClient, pollingFallbackInterval)
208218
if err != nil {
209219
util.HandleError(err, "Failed to initialize SSE manager")
210220
}
@@ -626,6 +636,7 @@ func init() {
626636
// No back quotes in the usage text: pflag treats the first back-quoted span as
// the flag's value placeholder and would render it right after the flag name
// in --help output.
proxyStartCmd.Flags().Bool("event-subscription-enabled", false, "Enable Event Subscription mode for real-time cache invalidation. When enabled, the static secrets refresh loop is disabled. If event subscriptions are unavailable, the proxy will fall back to a polling mechanism. --client-id and --client-secret are required when this is set to true.")
627637
proxyStartCmd.Flags().String("client-id", "", "Machine identity universal auth client ID. This is required when using event subscriptions.")
628638
proxyStartCmd.Flags().String("client-secret", "", "Machine identity universal auth client secret. This is required when using event subscriptions.")
639+
// Default is 5m: this matches the help text below and the previously
// hard-coded fallback interval (5 * time.Minute) that this flag replaces.
proxyStartCmd.Flags().String("polling-fallback-interval", "5m", "How often to poll for secret changes when SSE is unavailable (e.g., 1m, 5m). Defaults to 5m. Only used when --event-subscription-enabled is set.")
629640

630641
proxyDebugCmd.Flags().String("listen-address", "localhost:8081", "The address where the proxy server is listening. Defaults to localhost:8081")
631642

packages/proxy/resync.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ func runProjectSecretsRefresh(cache *Cache, domainURL *url.URL, httpClient *http
362362
}
363363

364364
func startProjectPollingLoop(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, projectId, envSlug string, interval time.Duration, onPollComplete func()) {
365+
log.Info().Str("projectId", projectId).Str("envSlug", envSlug).Msg("Starting project polling loop")
365366
ticker := time.NewTicker(interval)
366367
defer ticker.Stop()
367368

packages/proxy/sse.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -188,28 +188,29 @@ type pollingState struct {
188188
// When SSE connections fail repeatedly for a project, the manager transitions
189189
// that project to a polling fallback and tracks it in pollingProjects.
190190
type SSEManager struct {
191-
mu sync.Mutex
192-
connections map[string]*SSEConnection // active SSE connections
193-
pollingProjects map[string]*pollingState // projects in polling fallback
194-
cache *Cache
195-
domainURL *url.URL
196-
httpClient *http.Client // streaming client (no timeout) for SSE connections
197-
resyncHttpClient *http.Client // regular client (with timeout) for cache resync requests
198-
authState *SSEAuthState
199-
ctx context.Context
191+
mu sync.Mutex
192+
connections map[string]*SSEConnection // active SSE connections
193+
pollingProjects map[string]*pollingState // projects in polling fallback
194+
cache *Cache
195+
domainURL *url.URL
196+
httpClient *http.Client // streaming client (no timeout) for SSE connections
197+
resyncHttpClient *http.Client // regular client (with timeout) for cache resync requests
198+
authState *SSEAuthState
199+
pollingFallbackInterval time.Duration
200+
ctx context.Context
200201
}
201202

202-
func NewSSEManager(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, resyncHttpClient *http.Client, authState *SSEAuthState) *SSEManager {
203-
203+
func NewSSEManager(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, resyncHttpClient *http.Client, authState *SSEAuthState, pollingFallbackInterval time.Duration) *SSEManager {
204204
return &SSEManager{
205-
connections: make(map[string]*SSEConnection),
206-
pollingProjects: make(map[string]*pollingState),
207-
cache: cache,
208-
domainURL: domainURL,
209-
httpClient: httpClient,
210-
resyncHttpClient: resyncHttpClient,
211-
authState: authState,
212-
ctx: ctx,
205+
connections: make(map[string]*SSEConnection),
206+
pollingProjects: make(map[string]*pollingState),
207+
cache: cache,
208+
domainURL: domainURL,
209+
httpClient: httpClient,
210+
resyncHttpClient: resyncHttpClient,
211+
authState: authState,
212+
pollingFallbackInterval: pollingFallbackInterval,
213+
ctx: ctx,
213214
}
214215
}
215216

@@ -327,9 +328,6 @@ func (m *SSEManager) runConnection(conn *SSEConnection, ctx context.Context) {
327328
Int("maxRetries", maxRetries).
328329
// The cache resync before each retry was removed; only the backoff delay
// follows, so the message must not claim a resync is happening.
Msg("SSE connection lost, retrying with backoff")
329330

330-
// Resync project cache before reconnecting — we may have missed events during the outage
331-
runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, conn.ProjectID, conn.EnvironmentSlug)
332-
333331
// Exponential backoff with jitter
334332
delay := baseDelay * time.Duration(math.Pow(2, float64(retries-1)))
335333

@@ -345,8 +343,6 @@ func (m *SSEManager) runConnection(conn *SSEConnection, ctx context.Context) {
345343
}
346344
}
347345

348-
const pollingFallbackInterval = 5 * time.Minute
349-
350346
// transitionToPolling moves a project from SSE mode to polling fallback.
351347
func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
352348
m.mu.Lock()
@@ -373,7 +369,7 @@ func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
373369
Msg("Starting polling fallback for project")
374370

375371
// Resync the project cache immediately — events might have been missed during the outage
376-
go runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug)
372+
runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug)
377373

378374
// On each polling tick (except the first), attempt SSE reconnection
379375
retrySSE := func() {
@@ -388,7 +384,7 @@ func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
388384
go m.attemptSSEReconnection(projectId, environmentSlug)
389385
}
390386

391-
go startProjectPollingLoop(pollCtx, m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug, pollingFallbackInterval, retrySSE)
387+
go startProjectPollingLoop(pollCtx, m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug, m.pollingFallbackInterval, retrySSE)
392388
}
393389

394390
func (m *SSEManager) cancelPollingIfActive(projectId string) {

0 commit comments

Comments
 (0)