Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ infisical-merge
test/infisical-merge
.DS_Store

infisical


/agent-testing
80 changes: 61 additions & 19 deletions packages/cmd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,25 @@ func startProxyServer(cmd *cobra.Command, args []string) {
util.PrintErrorMessageAndExit(fmt.Sprintf("Invalid static-secrets-refresh-interval format '%s'. Use formats like 30m, 1h, 1d", staticSecretsRefreshIntervalStr))
}

useSSE, err := cmd.Flags().GetBool("use-sse")
if err != nil {
util.HandleError(err, "Unable to parse use-sse flag")
}

clientId, err := cmd.Flags().GetString("client-id")
if err != nil {
util.HandleError(err, "Unable to parse client-id flag")
}

clientSecret, err := cmd.Flags().GetString("client-secret")
if err != nil {
util.HandleError(err, "Unable to parse client-secret flag")
}

if useSSE && (clientId == "" || clientSecret == "") {
util.PrintErrorMessageAndExit("--client-id and --client-secret are required when --use-sse is enabled")
}

domainURL, err := url.Parse(domain)
if err != nil {
util.HandleError(err, fmt.Sprintf("Invalid domain URL: %s", domain))
Expand Down Expand Up @@ -175,6 +194,14 @@ func startProxyServer(cmd *cobra.Command, args []string) {
log.Info().Msg("Dev mode enabled: debug endpoint available at /_debug/cache")
}

var sseManager *proxy.SSEManager
var sseAuthState *proxy.SSEAuthState
if useSSE {
sseAuthState = proxy.NewSSEAuthState(clientId, clientSecret, domainURL, httpClient)
Comment thread
adilsitos marked this conversation as resolved.
Outdated
sseManager = proxy.NewSSEManager(context.Background(), cache, domainURL, streamingClient, httpClient, sseAuthState)
log.Info().Msg("SSE manager initialized for demand-driven cache updates")
}

proxyHandler := func(w http.ResponseWriter, r *http.Request) {
// Skip debug endpoints - they're handled by mux
if strings.HasPrefix(r.URL.Path, "/_debug/") {
Expand All @@ -190,26 +217,9 @@ func startProxyServer(cmd *cobra.Command, args []string) {
// -- Cache Check --

if isCacheable && token != "" {
cacheKey := proxy.GenerateCacheKey(r.Method, r.URL.Path, r.URL.RawQuery, token)

if cachedResp, found := cache.Get(cacheKey); found {
log.Info().
Str("hash", cacheKey).
Msg("Cache hit")

proxy.CopyHeaders(w.Header(), cachedResp.Header)
w.WriteHeader(cachedResp.StatusCode)
_, err := io.Copy(w, cachedResp.Body)
if err != nil {
log.Error().Err(err).Msg("Failed to copy cached response body")
return
}
if served := serveCachedResponse(w, r, cache, token); served {
return
}

log.Info().
Str("hash", cacheKey).
Msg("Cache miss")
}

// -- Proxy Request --
Expand Down Expand Up @@ -428,6 +438,12 @@ func startProxyServer(cmd *cobra.Command, args []string) {

cache.Set(cacheKey, r, cachedResp, token, indexEntry)

if sseManager != nil {
log.Info().Str("projectId", indexEntry.ProjectId).Msg("Ensuring SSE subscription for project")
// this will start the subscription using SSE for the project
sseManager.EnsureSubscription(indexEntry.ProjectId)
}

log.Debug().
Str("method", r.Method).
Str("path", r.URL.Path).
Expand Down Expand Up @@ -473,7 +489,7 @@ func startProxyServer(cmd *cobra.Command, args []string) {
resyncCtx, resyncCancel := context.WithCancel(context.Background())
defer resyncCancel()

go proxy.StartBackgroundLoops(resyncCtx, cache, domainURL, httpClient, evictionStrategy, accessTokenCheckInterval, staticSecretsRefreshInterval)
proxy.StartBackgroundLoops(resyncCtx, cache, domainURL, httpClient, evictionStrategy, accessTokenCheckInterval, staticSecretsRefreshInterval, useSSE)

// Handle graceful shutdown
sigCh := make(chan os.Signal, 1)
Expand Down Expand Up @@ -563,6 +579,29 @@ func printCacheDebug(cmd *cobra.Command, args []string) {
fmt.Println(string(output))
}

// serveCachedResponse looks up the cache for the request using the caller's token.
// Returns true if a cached response was written to w.
func serveCachedResponse(w http.ResponseWriter, r *http.Request, c *proxy.Cache, token string) bool {
cacheKey := proxy.GenerateCacheKey(r.Method, r.URL.Path, r.URL.RawQuery, token)

if cachedResp, found := c.Get(cacheKey); found {
log.Info().Str("hash", cacheKey).Msg("Cache hit")
writeCachedResponse(w, cachedResp)
return true
}

log.Info().Str("hash", cacheKey).Msg("Cache miss")
return false
}

func writeCachedResponse(w http.ResponseWriter, cachedResp *http.Response) {
proxy.CopyHeaders(w.Header(), cachedResp.Header)
w.WriteHeader(cachedResp.StatusCode)
if _, err := io.Copy(w, cachedResp.Body); err != nil {
log.Error().Err(err).Msg("Failed to copy cached response body")
}
}

func isStreamingEndpoint(path string) bool {
return strings.HasPrefix(path, "/api/v1/events/")
}
Expand All @@ -576,6 +615,9 @@ func init() {
proxyStartCmd.Flags().String("tls-cert-file", "", "The path to the TLS certificate file for the proxy server. Required when `tls-enabled` is set to true (default)")
proxyStartCmd.Flags().String("tls-key-file", "", "The path to the TLS key file for the proxy server. Required when `tls-enabled` is set to true (default)")
proxyStartCmd.Flags().Bool("tls-enabled", true, "Whether to enable TLS for the proxy server. Defaults to true")
proxyStartCmd.Flags().Bool("use-sse", false, "Enable SSE (Server-Sent Events) mode for real-time cache invalidation. When enabled, the static secrets refresh loop is disabled and --client-id/--client-secret are required.")
Comment thread
adilsitos marked this conversation as resolved.
Outdated
proxyStartCmd.Flags().String("client-id", "", "Universal auth client ID for SSE (required when --use-sse is enabled)")
proxyStartCmd.Flags().String("client-secret", "", "Machine identity client secret for universal auth (required when --use-sse is enabled)")
Comment thread
adilsitos marked this conversation as resolved.
Outdated

proxyDebugCmd.Flags().String("listen-address", "localhost:8081", "The address where the proxy server is listening. Defaults to localhost:8081")

Expand Down
78 changes: 78 additions & 0 deletions packages/proxy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ type PathIndexMarker struct {
CacheKey string `json:"cacheKey"`
}

// CollectedCacheEntry holds the metadata of a cache entry collected before purging,
// so it can be replayed (re-fetched) using the original request and token.
type CollectedCacheEntry struct {
CacheKey string
Request *CachedRequest
Token string
IndexEntry IndexEntry
}

// Cache is an HTTP response cache fully backed by EncryptedStorage
type Cache struct {
storage *cache.EncryptedStorage
Expand Down Expand Up @@ -130,6 +139,18 @@ func IsSecretsEndpoint(path string) bool {
path == "/api/v3/secrets" || path == "/api/v4/secrets"
}

func ExtractSecretNameFromPath(urlPath string) string {
for _, prefix := range []string{"/api/v4/secrets/", "/api/v3/secrets/"} {
if strings.HasPrefix(urlPath, prefix) {
name := strings.TrimPrefix(urlPath, prefix)
if name != "" {
return name
}
}
}
return ""
}

func IsCacheableRequest(path string, method string) bool {
if method != http.MethodGet {
return false
Expand Down Expand Up @@ -163,6 +184,8 @@ func (c *Cache) Get(cacheKey string) (*http.Response, bool) {
return resp, true
}

// when a new key is added, we need to check if the projectID and the env slug are being tracked in the cache
// to refactor: every operation that call set, it checkes the values of indexEntry before, this could happen here.
func (c *Cache) Set(cacheKey string, req *http.Request, resp *http.Response, token string, indexEntry IndexEntry) {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -570,6 +593,60 @@ func (c *Cache) PurgeByMutation(projectID, envSlug, mutationPath string) int {
return purgedCount
}

// CollectAndPurgeByMutation collects cache entries matching the mutation path, then purges them.
// Returns the collected entries so they can be replayed with their original requests/tokens.
func (c *Cache) CollectAndPurgeByMutation(projectID, envSlug, mutationPath string) []CollectedCacheEntry {
c.mu.Lock()
defer c.mu.Unlock()

var collected []CollectedCacheEntry

prefix := buildPathIndexPrefixForProject(projectID, envSlug)
pathKeys, err := c.storage.GetKeysByPrefix(prefix)
if err != nil {
log.Error().Err(err).Msg("Failed to get path index keys for collect-and-purge")
return collected
}

for _, key := range pathKeys {
withoutPrefix := strings.TrimPrefix(key, prefix)
parts := strings.SplitN(withoutPrefix, ":", 3)
if len(parts) < 3 {
continue
}

keySecretPath := strings.ReplaceAll(parts[1], "\\:", ":")
keyCacheKey := parts[2]

if !matchesPath(keySecretPath, mutationPath) {
continue
}

// Load the full entry before evicting
var entry StoredCacheEntry
if err := c.storage.Get(buildEntryKey(keyCacheKey), &entry); err == nil && entry.Request != nil {
requestCopy := &CachedRequest{
Method: entry.Request.Method,
RequestURI: entry.Request.RequestURI,
Headers: make(http.Header),
CachedAt: entry.Request.CachedAt,
}
CopyHeaders(requestCopy.Headers, entry.Request.Headers)

collected = append(collected, CollectedCacheEntry{
CacheKey: keyCacheKey,
Request: requestCopy,
Token: entry.Token,
IndexEntry: entry.Index,
})
}

c.evictEntryUnsafe(keyCacheKey)
}

return collected
}

// CompoundPathIndexDebugInfo represents the compound path index structure
type CompoundPathIndexDebugInfo struct {
Token string `json:"token"`
Expand Down Expand Up @@ -755,3 +832,4 @@ func (c *Cache) GetDebugInfo() CacheDebugInfo {
CompoundPathIndex: compoundPathIndex,
}
}

41 changes: 30 additions & 11 deletions packages/proxy/resync.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,28 +289,47 @@ func runStaticSecretsRefresh(cache *Cache, domainURL *url.URL, httpClient *http.
Msg("Static secrets refresh completed")
}

// StartBackgroundLoops starts the background loops for token validation and secrets refresh
func StartBackgroundLoops(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, evictionStrategy string, accessTokenCheckInterval time.Duration, staticSecretsRefreshInterval time.Duration) {
func startAccessTokenValidation(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, accessTokenCheckInterval time.Duration) {
tokenTicker := time.NewTicker(accessTokenCheckInterval)
secretsTicker := time.NewTicker(staticSecretsRefreshInterval)
defer tokenTicker.Stop()
defer secretsTicker.Stop()

log.Info().
Str("evictionStrategy", evictionStrategy).
Str("accessTokenCheckInterval", accessTokenCheckInterval.String()).
Str("staticSecretsRefreshInterval", staticSecretsRefreshInterval.String()).
Msg("Background loops started")

for {
select {
case <-tokenTicker.C:
runAccessTokenValidation(cache, domainURL, httpClient)
case <-ctx.Done():
return
}
}
}

func startStaticSecretsRefresh(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, staticSecretsRefreshInterval time.Duration) {
secretsTicker := time.NewTicker(staticSecretsRefreshInterval)
defer secretsTicker.Stop()

for {
select {
case <-secretsTicker.C:
runStaticSecretsRefresh(cache, domainURL, httpClient, staticSecretsRefreshInterval)
case <-ctx.Done():
log.Info().Msg("Background loops stopped")
return
}
}
}

// StartBackgroundLoops starts the background loops for token validation and secrets refresh.
// When sseEnabled is true, the static secrets refresh loop is disabled (SSE handles cache invalidation).
func StartBackgroundLoops(ctx context.Context, cache *Cache, domainURL *url.URL, httpClient *http.Client, evictionStrategy string, accessTokenCheckInterval time.Duration, staticSecretsRefreshInterval time.Duration, sseEnabled bool) {
log.Info().
Str("evictionStrategy", evictionStrategy).
Str("accessTokenCheckInterval", accessTokenCheckInterval.String()).
Str("staticSecretsRefreshInterval", staticSecretsRefreshInterval.String()).
Bool("sseEnabled", sseEnabled).
Msg("Background loops started")

if sseEnabled {
go startAccessTokenValidation(ctx, cache, domainURL, httpClient, accessTokenCheckInterval)
Comment thread
adilsitos marked this conversation as resolved.
Outdated
}
go startStaticSecretsRefresh(ctx, cache, domainURL, httpClient, staticSecretsRefreshInterval)

}
Loading
Loading