The original PromptPipe application had critical state persistence issues that caused loss of functionality across application restarts:
- Timer State Loss: Timer IDs were stored in flow state data, but actual timers existed only in memory
- Response Handler Loss: Response handlers were registered only in memory
- Application-Aware Recovery: Previous recovery logic was tightly coupled to specific flow types
- RecoveryManager: Orchestrates recovery of all registered components
- RecoveryRegistry: Provides services and callbacks for infrastructure recovery
- Recoverable interface: Components implement this to handle their own recovery logic
- TimerRecoveryInfo & ResponseHandlerRecoveryInfo: Metadata for infrastructure recovery
- ConversationFlowRecovery: Handles conversation participant recovery (the current flow type)
- Each flow manages its own business logic while using generic infrastructure
- Recovery system integrated into server startup
- Infrastructure callbacks provided to avoid import cycles
- Recovery runs after store and timer initialization, before server start
- Recovery infrastructure handles timers and response handlers generically
- Flow logic handles business-specific recovery concerns
- No business logic embedded in recovery infrastructure
- Flows register with recovery manager rather than recovery knowing about flows
- Infrastructure provides callbacks rather than direct dependencies
- Plugin-like architecture for extensibility
- Recovery package doesn't import messaging or flow packages
- Callbacks used to wire up dependencies at application level
- Clean dependency graph maintained
The existing database schema already supports all required state persistence:
```sql
-- Flow states with current state and timer IDs in state_data
CREATE TABLE flow_states (
    participant_id TEXT NOT NULL,
    flow_type TEXT NOT NULL,
    current_state TEXT NOT NULL,
    state_data TEXT,
    ...
);

-- Participant table for recovery enumeration
CREATE TABLE conversation_participants (...);
```

- Store and timer infrastructure initialized
- Recovery manager created with infrastructure callbacks
- Flow recoveries registered with manager
- RecoverAll() called to restore state
- Query database for active participants by flow type
- Register response handlers for active conversation participants
- Scheduler-specific reminders are recovered separately after initialization (see recoverSchedulerReminders in internal/api/api.go)
- Timer Recovery: Uses a generic callback that logs timeouts (business-specific timer recovery is handled by the owning component)
- Response Handler Recovery: Recreates hooks based on flow type
- Error Handling: Continues recovery even if individual components fail
recovery_test.go: Tests generic recovery infrastructure with mocks
- Demonstrates decoupled architecture without dependencies on real flows
- Validates timer and response handler recovery callbacks
- Resilient: Application restarts don't lose participant state
- Extensible: New flow types just implement the Recoverable interface
- Testable: Clear boundaries enable focused unit testing
- Maintainable: Business logic separated from infrastructure concerns
- No Breaking Changes: Uses existing database schema
In addition to the recovery system above, the persistence layer includes:
All time-based behavior that must survive restarts is represented as persisted job records in the jobs table, replacing in-memory time.AfterFunc calls:
- JobRepo: Interface for enqueueing, claiming, completing, failing, and canceling jobs. Implementations exist for both SQLite and Postgres.
- JobRunner: Background worker that polls for due jobs and dispatches them to registered handlers. Supports exponential backoff on failure and crash recovery via RequeueStaleRunningJobs().
- Dedupe keys: Prevent duplicate jobs for the same intent (e.g., one scheduled prompt per participant per schedule).
Outgoing WhatsApp messages are persisted to the outbox_messages table before being sent:
- OutboxRepo: Interface for enqueueing and managing outbound messages. Dedupe keys prevent duplicate sends.
- OutboxSender: Background worker that claims due messages and sends them via the messaging service. Supports retry with backoff.
- On startup, stale `sending` messages are requeued.
The inbound_dedup table prevents reprocessing of inbound WhatsApp messages after restart:
- DedupRepo: Interface for recording and checking message IDs.
- On message arrival, the system checks if the message ID was already processed and skips it if so.
On startup:
- Stale `running` jobs are requeued (RequeueStaleRunningJobs)
- Stale `sending` outbox messages are requeued (RequeueStaleSendingMessages)
- Existing recovery (response handlers, scheduler reminders) continues as before
The docker-compose.yml uses named volumes for both Postgres data and the PromptPipe state directory. `docker compose down && docker compose up -d` preserves all state; only `docker compose down -v` destroys volumes.
See docs/persistence-audit.md for the complete state ledger.
```go
// In server initialization
recoveryManager := recovery.NewRecoveryManager(store, timer)

// Register flow recoveries
stateManager := flow.NewStoreBasedStateManager(store)
recoveryManager.RegisterRecoverable(flow.NewConversationFlowRecovery())

// Register infrastructure callbacks
recoveryManager.RegisterTimerRecovery(recovery.TimerRecoveryHandler(timer))
recoveryManager.RegisterHandlerRecovery(createResponseHandlerCallback(respHandler, msgService))

// Perform recovery
recoveryManager.RecoverAll(context.Background())
```

This solution provides robust state recovery while maintaining clean architecture principles and enabling future extensibility.