Skip to content

Commit 807aa40

Browse files
committed
test: remove E2E test for sse reconnection (flaky)
1 parent 5c181a2 commit 807aa40

5 files changed

Lines changed: 42 additions & 29 deletions

File tree

e2e/proxy/proxy_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type ProxyTestConfig struct {
2626
TLSKeyFile string
2727
AccessTokenCheckInterval string
2828
StaticSecretsRefreshInterval string
29+
PollingFallbackInterval string
2930
UseSSE bool
3031
ClientID string
3132
ClientSecret string
@@ -45,6 +46,7 @@ func DefaultProxyTestConfig() ProxyTestConfig {
4546
func SSEProxyTestConfig() ProxyTestConfig {
4647
config := DefaultProxyTestConfig()
4748
config.UseSSE = true
49+
config.PollingFallbackInterval = "10s"
4850
return config
4951
}
5052

@@ -69,6 +71,9 @@ func startProxy(t *testing.T, ctx context.Context, infisicalURL string, config P
6971
args = append(args, "--event-subscription-enabled")
7072
args = append(args, "--client-id", config.ClientID)
7173
args = append(args, "--client-secret", config.ClientSecret)
74+
if config.PollingFallbackInterval != "" {
75+
args = append(args, "--polling-fallback-interval", config.PollingFallbackInterval)
76+
}
7277
}
7378

7479
proxyCmd := helpers.Command{

infisical-cli

-79.5 MB
Binary file not shown.

packages/cmd/proxy.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ var proxyDebugCmd = &cobra.Command{
5757
Hidden: true,
5858
}
5959

60-
func initializeSSEManager(ctx context.Context, clientId, clientSecret string, cache *proxy.Cache, domainURL *url.URL, streamingClient *http.Client, httpClient *http.Client) (*proxy.SSEManager, error) {
60+
func initializeSSEManager(ctx context.Context, clientId, clientSecret string, cache *proxy.Cache, domainURL *url.URL, streamingClient *http.Client, httpClient *http.Client, pollingFallbackInterval time.Duration) (*proxy.SSEManager, error) {
6161
sseAuthState, err := proxy.NewSSEAuthState(clientId, clientSecret, domainURL, httpClient)
6262
if err != nil {
6363
return nil, err
6464
}
65-
return proxy.NewSSEManager(context.Background(), cache, domainURL, streamingClient, httpClient, sseAuthState), nil
65+
return proxy.NewSSEManager(ctx, cache, domainURL, streamingClient, httpClient, sseAuthState, pollingFallbackInterval), nil
6666
}
6767

6868
func startProxyServer(cmd *cobra.Command, args []string) {
@@ -132,6 +132,16 @@ func startProxyServer(cmd *cobra.Command, args []string) {
132132
util.PrintErrorMessageAndExit(fmt.Sprintf("Invalid static-secrets-refresh-interval format '%s'. Use formats like 30m, 1h, 1d", staticSecretsRefreshIntervalStr))
133133
}
134134

135+
pollingFallbackIntervalStr, err := cmd.Flags().GetString("polling-fallback-interval")
136+
if err != nil {
137+
util.HandleError(err, "Unable to parse polling-fallback-interval flag")
138+
}
139+
140+
pollingFallbackInterval, err := util.ParseTimeDurationString(pollingFallbackIntervalStr, true)
141+
if err != nil {
142+
util.PrintErrorMessageAndExit(fmt.Sprintf("Invalid polling-fallback-interval format '%s'. Use formats like 1m, 5m, 10m", pollingFallbackIntervalStr))
143+
}
144+
135145
useSSE, err := cmd.Flags().GetBool("event-subscription-enabled")
136146
if err != nil {
137147
util.HandleError(err, "Unable to parse event-subscription-enabled flag")
@@ -204,7 +214,7 @@ func startProxyServer(cmd *cobra.Command, args []string) {
204214

205215
var sseManager *proxy.SSEManager
206216
if useSSE {
207-
sseManager, err = initializeSSEManager(context.Background(), clientId, clientSecret, cache, domainURL, streamingClient, httpClient)
217+
sseManager, err = initializeSSEManager(context.Background(), clientId, clientSecret, cache, domainURL, streamingClient, httpClient, pollingFallbackInterval)
208218
if err != nil {
209219
util.HandleError(err, "Failed to initialize SSE manager")
210220
}
@@ -626,6 +636,7 @@ func init() {
626636
proxyStartCmd.Flags().Bool("event-subscription-enabled", false, "Enable Event Subscription mode for real-time cache invalidation. When enabled, the static secrets refresh loop is disabled. If event subscriptions are unavailable, the proxy will fall back to a polling mechanism. `--client id` and `--client-secret` are required when this is set to true ")
627637
proxyStartCmd.Flags().String("client-id", "", "Machine identity universal auth client ID. This is required when using event subscriptions.")
628638
proxyStartCmd.Flags().String("client-secret", "", "Machine identity universal auth client secret. This is required when using event subscriptions.")
639+
proxyStartCmd.Flags().String("polling-fallback-interval", "10m", "How often to poll for secret changes when SSE is unavailable (e.g., 1m, 5m). Defaults to 5m. Only used when --event-subscription-enabled is set.")
629640

630641
proxyDebugCmd.Flags().String("listen-address", "localhost:8081", "The address where the proxy server is listening. Defaults to localhost:8081")
631642

packages/proxy/resync.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ func runProjectSecretsRefresh(cache *Cache, domainURL *url.URL, httpClient *http
362362
}
363363

364364
func startProjectPollingLoop(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, projectId, envSlug string, interval time.Duration, onPollComplete func()) {
365+
log.Info().Str("projectId", projectId).Str("envSlug", envSlug).Msg("Starting project polling loop")
365366
ticker := time.NewTicker(interval)
366367
defer ticker.Stop()
367368

packages/proxy/sse.go

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -188,28 +188,29 @@ type pollingState struct {
188188
// When SSE connections fail repeatedly for a project, the manager transitions
189189
// that project to a polling fallback and tracks it in pollingProjects.
190190
type SSEManager struct {
191-
mu sync.Mutex
192-
connections map[string]*SSEConnection // active SSE connections
193-
pollingProjects map[string]*pollingState // projects in polling fallback
194-
cache *Cache
195-
domainURL *url.URL
196-
httpClient *http.Client // streaming client (no timeout) for SSE connections
197-
resyncHttpClient *http.Client // regular client (with timeout) for cache resync requests
198-
authState *SSEAuthState
199-
ctx context.Context
191+
mu sync.Mutex
192+
connections map[string]*SSEConnection // active SSE connections
193+
pollingProjects map[string]*pollingState // projects in polling fallback
194+
cache *Cache
195+
domainURL *url.URL
196+
httpClient *http.Client // streaming client (no timeout) for SSE connections
197+
resyncHttpClient *http.Client // regular client (with timeout) for cache resync requests
198+
authState *SSEAuthState
199+
pollingFallbackInterval time.Duration
200+
ctx context.Context
200201
}
201202

202-
func NewSSEManager(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, resyncHttpClient *http.Client, authState *SSEAuthState) *SSEManager {
203-
203+
func NewSSEManager(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, resyncHttpClient *http.Client, authState *SSEAuthState, pollingFallbackInterval time.Duration) *SSEManager {
204204
return &SSEManager{
205-
connections: make(map[string]*SSEConnection),
206-
pollingProjects: make(map[string]*pollingState),
207-
cache: cache,
208-
domainURL: domainURL,
209-
httpClient: httpClient,
210-
resyncHttpClient: resyncHttpClient,
211-
authState: authState,
212-
ctx: ctx,
205+
connections: make(map[string]*SSEConnection),
206+
pollingProjects: make(map[string]*pollingState),
207+
cache: cache,
208+
domainURL: domainURL,
209+
httpClient: httpClient,
210+
resyncHttpClient: resyncHttpClient,
211+
authState: authState,
212+
pollingFallbackInterval: pollingFallbackInterval,
213+
ctx: ctx,
213214
}
214215
}
215216

@@ -327,9 +328,6 @@ func (m *SSEManager) runConnection(conn *SSEConnection, ctx context.Context) {
327328
Int("maxRetries", maxRetries).
328329
Msg("SSE connection lost, resyncing cache before retrying")
329330

330-
// Resync project cache before reconnecting — we may have missed events during the outage
331-
runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, conn.ProjectID, conn.EnvironmentSlug)
332-
333331
// Exponential backoff with jitter
334332
delay := baseDelay * time.Duration(math.Pow(2, float64(retries-1)))
335333

@@ -345,8 +343,6 @@ func (m *SSEManager) runConnection(conn *SSEConnection, ctx context.Context) {
345343
}
346344
}
347345

348-
const pollingFallbackInterval = 5 * time.Minute
349-
350346
// transitionToPolling moves a project from SSE mode to polling fallback.
351347
func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
352348
m.mu.Lock()
@@ -373,7 +369,7 @@ func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
373369
Msg("Starting polling fallback for project")
374370

375371
// Resync the project cache immediately — events might have been missed during the outage
376-
go runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug)
372+
runProjectSecretsRefresh(m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug)
377373

378374
// On each polling tick (except the first), attempt SSE reconnection
379375
retrySSE := func() {
@@ -388,7 +384,7 @@ func (m *SSEManager) transitionToPolling(projectId, environmentSlug string) {
388384
go m.attemptSSEReconnection(projectId, environmentSlug)
389385
}
390386

391-
go startProjectPollingLoop(pollCtx, m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug, pollingFallbackInterval, retrySSE)
387+
go startProjectPollingLoop(pollCtx, m.cache, m.domainURL, m.resyncHttpClient, projectId, environmentSlug, m.pollingFallbackInterval, retrySSE)
392388
}
393389

394390
func (m *SSEManager) cancelPollingIfActive(projectId string) {

0 commit comments

Comments
 (0)