Skip to content

Commit 56ced3b

Browse files
committed
feat(pam): real-time session log sync via incremental batch uploads
Replaces end-of-session bulk upload with incremental 10-second batch uploads to enable live monitoring and session intervention. - Add CallUploadPamSessionEventBatch to post raw event bytes to the new batch endpoint - Add sessionUploadState with per-session file offset tracking and mutex - Add RegisterSession/UnregisterSession for active session management - Add readFromOffset to read and decrypt new records from a byte offset - Persist upload progress to a .offset file alongside the .enc recording for crash recovery; resume from saved offset on restart - Flush all active sessions every 10 seconds via a new ticker - Re-register all on-disk session files at startup to resume after crash - Fall back to legacy bulk upload if the batch endpoint returns 404 - Remove end-of-session bulk upload call from CleanupPAMSession
1 parent fa03013 commit 56ced3b

3 files changed

Lines changed: 305 additions & 78 deletions

File tree

packages/api/api.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const (
5454
operationCallGetPamSessionKey = "CallGetPamSessionKey"
5555
operationCallUploadPamSessionLog = "CallUploadPamSessionLog"
5656
operationCallPAMSessionTermination = "CallPAMSessionTermination"
57+
operationCallUploadPamSessionEventBatch = "CallUploadPamSessionEventBatch"
5758
operationCallGetMFASessionStatus = "CallGetMFASessionStatus"
5859
operationCallOrgRelayHeartBeat = "CallOrgRelayHeartBeat"
5960
operationCallInstanceRelayHeartBeat = "CallInstanceRelayHeartBeat"
@@ -1008,6 +1009,23 @@ func CallUploadPamSessionLogs(httpClient *resty.Client, sessionId string, reques
10081009
return nil
10091010
}
10101011

1012+
func CallUploadPamSessionEventBatch(httpClient *resty.Client, sessionId string, startOffset int64, data []byte) error {
1013+
response, err := httpClient.
1014+
R().
1015+
SetHeader("User-Agent", USER_AGENT).
1016+
SetHeader("Content-Type", "application/octet-stream").
1017+
SetBody(data).
1018+
Post(fmt.Sprintf("%v/v1/pam/sessions/%s/event-batches?startOffset=%d", config.INFISICAL_URL, sessionId, startOffset))
1019+
1020+
if err != nil {
1021+
return NewGenericRequestError(operationCallUploadPamSessionEventBatch, err)
1022+
}
1023+
if response.IsError() {
1024+
return NewAPIErrorWithResponse(operationCallUploadPamSessionEventBatch, response, nil)
1025+
}
1026+
return nil
1027+
}
1028+
10111029
func CallPAMSessionTermination(httpClient *resty.Client, sessionId string) error {
10121030
response, err := httpClient.
10131031
R().

packages/pam/pam-proxy.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func HandlePAMProxy(ctx context.Context, conn *tls.Conn, pamConfig *GatewayPAMCo
151151
if err != nil {
152152
return fmt.Errorf("failed to create session logger: %w", err)
153153
}
154+
pamConfig.SessionUploader.RegisterSession(pamConfig.SessionId)
154155

155156
serverName := credentials.Host
156157
if pamConfig.ResourceType == session.ResourceTypeKubernetes {

0 commit comments

Comments
 (0)