Skip to content

Commit f5234eb

Browse files
Merge pull request #7 from BTreeMap/copilot/wire-in-persistence-infra
Wire persistence infrastructure into flow and messaging layers for restart safety
2 parents 11088b1 + 29083c5 commit f5234eb

12 files changed

Lines changed: 1058 additions & 56 deletions

internal/api/api.go

Lines changed: 108 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package api
66

77
import (
88
"context"
9+
"encoding/json"
910
"errors"
1011
"fmt"
1112
"log/slog"
@@ -52,16 +53,19 @@ type Server struct {
5253
timer models.Timer
5354
defaultSchedule *models.Schedule
5455
gaClient *genai.Client
55-
intakeBotPromptFile string // path to intake bot system prompt file
56-
promptGeneratorPromptFile string // path to prompt generator system prompt file
57-
feedbackTrackerPromptFile string // path to feedback tracker system prompt file
58-
chatHistoryLimit int // limit for number of history messages sent to bot tools
59-
feedbackInitialTimeout string // timeout for initial feedback response
60-
feedbackFollowupDelay string // delay before follow-up feedback session
61-
debugMode bool // enable debug mode for user-facing debug messages
62-
schedulerPrepTimeMinutes int // preparation time in minutes before scheduled habit reminders
63-
autoFeedbackAfterPromptEnabled bool // enable auto feedback session enforcement after scheduled prompt inactivity
64-
autoEnrollNewUsers bool // enable automatic enrollment of new users on first message
56+
intakeBotPromptFile string // path to intake bot system prompt file
57+
promptGeneratorPromptFile string // path to prompt generator system prompt file
58+
feedbackTrackerPromptFile string // path to feedback tracker system prompt file
59+
chatHistoryLimit int // limit for number of history messages sent to bot tools
60+
feedbackInitialTimeout string // timeout for initial feedback response
61+
feedbackFollowupDelay string // delay before follow-up feedback session
62+
debugMode bool // enable debug mode for user-facing debug messages
63+
schedulerPrepTimeMinutes int // preparation time in minutes before scheduled habit reminders
64+
autoFeedbackAfterPromptEnabled bool // enable auto feedback session enforcement after scheduled prompt inactivity
65+
autoEnrollNewUsers bool // enable automatic enrollment of new users on first message
66+
jobRunner *store.JobRunner // durable job runner (nil when in-memory store)
67+
outboxSender *store.OutboxSender // outbox message sender (nil when in-memory store)
68+
workerCancel context.CancelFunc // cancel function for worker goroutines
6569
}
6670

6771
// NewServer creates a new API server instance with the provided dependencies.
@@ -288,6 +292,9 @@ func createAndConfigureServer(waOpts []whatsapp.Option, storeOpts []store.Option
288292
return nil, "", fmt.Errorf("failed to initialize conversation flow: %w", err)
289293
}
290294

295+
// Initialize persistence workers (JobRunner + OutboxSender) if store supports it
296+
server.initializePersistenceWorkers()
297+
291298
// Initialize application state recovery
292299
if err := server.initializeRecovery(); err != nil {
293300
return nil, "", fmt.Errorf("failed to initialize recovery: %w", err)
@@ -393,6 +400,90 @@ func (s *Server) initializeGenAI(genaiOpts []genai.Option) error {
393400
return nil
394401
}
395402

403+
// initializePersistenceWorkers sets up the JobRunner and OutboxSender if the store
404+
// supports durable persistence (implements PersistenceProvider).
405+
func (s *Server) initializePersistenceWorkers() {
406+
pp, ok := s.st.(store.PersistenceProvider)
407+
if !ok {
408+
slog.Info("Store does not support persistence workers (in-memory mode), skipping JobRunner/OutboxSender")
409+
return
410+
}
411+
412+
// Get the conversation flow to wire up job handlers
413+
conversationFlowInterface, exists := flow.Get(models.PromptTypeConversation)
414+
if !exists || conversationFlowInterface == nil {
415+
slog.Warn("Conversation flow not registered, skipping persistence worker setup")
416+
return
417+
}
418+
conversationFlow, ok := conversationFlowInterface.(*flow.ConversationFlow)
419+
if !ok {
420+
slog.Warn("Conversation flow has unexpected type, skipping persistence worker setup")
421+
return
422+
}
423+
424+
// Set durable job repo on all flow tools
425+
jobRepo := pp.JobRepo()
426+
conversationFlow.SetJobRepo(jobRepo)
427+
428+
// Set inbound dedup repo on the messaging service
429+
if waService, ok := s.msgService.(*messaging.WhatsAppService); ok {
430+
waService.SetDedupRepo(pp.DedupRepo())
431+
slog.Debug("Inbound dedup repo wired into WhatsApp service")
432+
}
433+
434+
// Create state manager for job handlers
435+
stateManager := flow.NewStoreBasedStateManager(s.st)
436+
437+
// Create JobRunner and register handlers
438+
jobRunner := store.NewJobRunner(jobRepo, 10*time.Second)
439+
flow.RegisterJobHandlers(
440+
jobRunner,
441+
stateManager,
442+
s.msgService,
443+
conversationFlow.GetSchedulerTool(),
444+
conversationFlow.GetFeedbackModule(),
445+
conversationFlow.GetStateTransitionTool(),
446+
)
447+
448+
// Create OutboxSender with real send callback that parses JSON payload
449+
outboxSender := store.NewOutboxSender(pp.OutboxRepo(), func(ctx context.Context, msg store.OutboxMessage) error {
450+
// Parse the payload to extract the message body
451+
var payload struct {
452+
To string `json:"to"`
453+
Body string `json:"body"`
454+
}
455+
if err := json.Unmarshal([]byte(msg.PayloadJSON), &payload); err != nil {
456+
// If payload is not JSON, treat the whole string as the message body
457+
slog.Warn("OutboxSender: payload is not structured JSON, sending as-is", "participantID", msg.ParticipantID)
458+
return s.msgService.SendMessage(ctx, msg.ParticipantID, msg.PayloadJSON)
459+
}
460+
to := payload.To
461+
if to == "" {
462+
to = msg.ParticipantID
463+
}
464+
return s.msgService.SendMessage(ctx, to, payload.Body)
465+
}, 5*time.Second)
466+
467+
// Recover stale work from previous crash
468+
if err := jobRunner.RecoverStaleJobs(); err != nil {
469+
slog.Error("Failed to recover stale jobs on startup", "error", err)
470+
}
471+
if err := outboxSender.RecoverStaleMessages(); err != nil {
472+
slog.Error("Failed to recover stale outbox messages on startup", "error", err)
473+
}
474+
475+
// Start worker loops
476+
workerCtx, workerCancel := context.WithCancel(context.Background())
477+
go jobRunner.Run(workerCtx)
478+
go outboxSender.Run(workerCtx)
479+
480+
s.jobRunner = jobRunner
481+
s.outboxSender = outboxSender
482+
s.workerCancel = workerCancel
483+
484+
slog.Info("Persistence workers started (JobRunner + OutboxSender)")
485+
}
486+
396487
// initializeConversationFlow sets up the conversation flow with system prompt loading and scheduler tool
397488
func (s *Server) initializeConversationFlow() error {
398489
// Create conversation flow with dependencies and scheduler tool
@@ -642,6 +733,13 @@ func (s *Server) gracefulShutdown(srv *http.Server) error {
642733
s.timer.Stop()
643734
slog.Debug("Timer stopped")
644735

736+
// Stop persistence workers
737+
if s.workerCancel != nil {
738+
slog.Debug("Stopping persistence workers")
739+
s.workerCancel()
740+
slog.Debug("Persistence workers stopped")
741+
}
742+
645743
// Close store to clean up database connections
646744
slog.Debug("Closing store")
647745
if err := s.st.Close(); err != nil {

internal/flow/conversation_flow.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/BTreeMap/PromptPipe/internal/genai"
1414
"github.com/BTreeMap/PromptPipe/internal/models"
15+
"github.com/BTreeMap/PromptPipe/internal/store"
1516
"github.com/BTreeMap/PromptPipe/internal/whatsapp"
1617
"github.com/openai/openai-go"
1718
)
@@ -641,6 +642,29 @@ func (f *ConversationFlow) GetSchedulerTool() *SchedulerTool {
641642
return f.schedulerTool
642643
}
643644

645+
// GetFeedbackModule returns the feedback module for external handler registration.
646+
func (f *ConversationFlow) GetFeedbackModule() *FeedbackModule {
647+
return f.feedbackModule
648+
}
649+
650+
// GetStateTransitionTool returns the state transition tool for external handler registration.
651+
func (f *ConversationFlow) GetStateTransitionTool() *StateTransitionTool {
652+
return f.stateTransitionTool
653+
}
654+
655+
// SetJobRepo sets the durable job repository on all tools that support it.
656+
func (f *ConversationFlow) SetJobRepo(repo store.JobRepo) {
657+
if f.schedulerTool != nil {
658+
f.schedulerTool.SetJobRepo(repo)
659+
}
660+
if f.feedbackModule != nil {
661+
f.feedbackModule.SetJobRepo(repo)
662+
}
663+
if f.stateTransitionTool != nil {
664+
f.stateTransitionTool.SetJobRepo(repo)
665+
}
666+
}
667+
644668
// sendDebugMessage sends a debug message to the user if debug mode is enabled.
645669
func (f *ConversationFlow) sendDebugMessage(ctx context.Context, participantID, message string) {
646670
if !f.debugMode || f.msgService == nil {

internal/flow/durable_jobs.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
// Package flow provides durable job kind constants and payload types
2+
// used to replace in-memory timers with restart-safe database jobs.
3+
package flow
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"fmt"
9+
"log/slog"
10+
11+
"github.com/BTreeMap/PromptPipe/internal/models"
12+
"github.com/BTreeMap/PromptPipe/internal/store"
13+
)
14+
15+
// Job kinds identifying the durable jobs that stand in for in-memory timers.
// Each kind is the key under which RegisterJobHandlers registers its handler.
const (
	// JobKindStateTransition is the kind for jobs carrying a StateTransitionPayload.
	JobKindStateTransition = "state_transition"
	// JobKindFeedbackTimeout is the kind for jobs carrying a FeedbackTimeoutPayload.
	JobKindFeedbackTimeout = "feedback_timeout"
	// JobKindFeedbackFollowup is the kind for jobs carrying a FeedbackFollowupPayload.
	JobKindFeedbackFollowup = "feedback_followup"
	// JobKindDailyPromptReminder is the kind for jobs carrying a DailyPromptReminderPayload.
	JobKindDailyPromptReminder = "daily_prompt_reminder"
	// JobKindAutoFeedbackEnforcement is the kind for jobs carrying an AutoFeedbackEnforcementPayload.
	JobKindAutoFeedbackEnforcement = "auto_feedback_enforcement"
)
23+
24+
// StateTransitionPayload is the JSON body carried by state_transition jobs.
type StateTransitionPayload struct {
	// ParticipantID identifies whose conversation state is to be changed.
	ParticipantID string `json:"participant_id"`
	// TargetState is the conversation state the job should move the participant to.
	TargetState string `json:"target_state"`
	// Reason is an optional explanation passed along with the transition.
	Reason string `json:"reason,omitempty"`
}
30+
31+
// FeedbackTimeoutPayload is the JSON body carried by feedback_timeout jobs.
type FeedbackTimeoutPayload struct {
	// ParticipantID identifies the participant whose feedback timed out.
	ParticipantID string `json:"participant_id"`
	// PhoneNumber is placed in the context handed to the timeout handling.
	PhoneNumber string `json:"phone_number"`
}
36+
37+
// FeedbackFollowupPayload is the JSON body carried by feedback_followup jobs.
type FeedbackFollowupPayload struct {
	// ParticipantID identifies the participant being followed up with.
	ParticipantID string `json:"participant_id"`
	// PhoneNumber is the recipient the follow-up message is sent to.
	PhoneNumber string `json:"phone_number"`
}
42+
43+
// DailyPromptReminderPayload is the JSON body carried by daily_prompt_reminder jobs.
type DailyPromptReminderPayload struct {
	// ParticipantID identifies the participant the reminder is for.
	ParticipantID string `json:"participant_id"`
	// To is the recipient passed to the scheduler tool's reminder logic.
	To string `json:"to"`
	// ExpectedSentAt is forwarded verbatim to the scheduler tool.
	// NOTE(review): presumably the timestamp of the prompt being reminded about — confirm format with the scheduler tool.
	ExpectedSentAt string `json:"expected_sent_at"`
}
49+
50+
// AutoFeedbackEnforcementPayload is the JSON body carried by
// auto_feedback_enforcement jobs.
type AutoFeedbackEnforcementPayload struct {
	// ParticipantID identifies the participant the enforcement check runs for.
	ParticipantID string `json:"participant_id"`
}
54+
55+
// RegisterJobHandlers registers all flow-related job handlers with the given JobRunner.
56+
func RegisterJobHandlers(runner *store.JobRunner, stateManager StateManager, msgService MessagingService, schedulerTool *SchedulerTool, feedbackModule *FeedbackModule, stateTransitionTool *StateTransitionTool) {
57+
runner.RegisterHandler(JobKindStateTransition, makeStateTransitionHandler(stateManager, stateTransitionTool))
58+
runner.RegisterHandler(JobKindFeedbackTimeout, makeFeedbackTimeoutHandler(stateManager, feedbackModule))
59+
runner.RegisterHandler(JobKindFeedbackFollowup, makeFeedbackFollowupHandler(stateManager, msgService))
60+
runner.RegisterHandler(JobKindDailyPromptReminder, makeDailyPromptReminderHandler(schedulerTool))
61+
runner.RegisterHandler(JobKindAutoFeedbackEnforcement, makeAutoFeedbackEnforcementHandler(schedulerTool))
62+
}
63+
64+
func makeStateTransitionHandler(stateManager StateManager, stt *StateTransitionTool) store.JobHandler {
65+
return func(ctx context.Context, payload string) error {
66+
var p StateTransitionPayload
67+
if err := json.Unmarshal([]byte(payload), &p); err != nil {
68+
return fmt.Errorf("invalid state_transition payload: %w", err)
69+
}
70+
slog.Info("JobHandler.state_transition: executing", "participantID", p.ParticipantID, "targetState", p.TargetState)
71+
72+
// Idempotency: check current state - if already at target, skip
73+
currentState, err := stateManager.GetStateData(ctx, p.ParticipantID, models.FlowTypeConversation, models.DataKeyConversationState)
74+
if err != nil {
75+
return fmt.Errorf("failed to read current state: %w", err)
76+
}
77+
if currentState == p.TargetState {
78+
slog.Info("JobHandler.state_transition: already at target state, skipping", "participantID", p.ParticipantID, "targetState", p.TargetState)
79+
return nil
80+
}
81+
82+
// Execute the transition
83+
_, err = stt.executeImmediateTransition(ctx, p.ParticipantID, models.StateType(p.TargetState), p.Reason)
84+
if err != nil {
85+
return fmt.Errorf("state transition failed: %w", err)
86+
}
87+
// Clear the stored job ID reference
88+
_ = stateManager.SetStateData(ctx, p.ParticipantID, models.FlowTypeConversation, models.DataKeyStateTransitionTimerID, "")
89+
return nil
90+
}
91+
}
92+
93+
func makeFeedbackTimeoutHandler(stateManager StateManager, fm *FeedbackModule) store.JobHandler {
94+
return func(ctx context.Context, payload string) error {
95+
var p FeedbackTimeoutPayload
96+
if err := json.Unmarshal([]byte(payload), &p); err != nil {
97+
return fmt.Errorf("invalid feedback_timeout payload: %w", err)
98+
}
99+
slog.Info("JobHandler.feedback_timeout: executing", "participantID", p.ParticipantID)
100+
101+
// Idempotency: check if feedback was already received
102+
feedbackState, err := stateManager.GetStateData(ctx, p.ParticipantID, models.FlowTypeConversation, models.DataKeyFeedbackState)
103+
if err == nil && feedbackState != "waiting_initial" {
104+
slog.Info("JobHandler.feedback_timeout: feedback already received, skipping", "participantID", p.ParticipantID, "currentState", feedbackState)
105+
return nil
106+
}
107+
108+
// Inject phone number into context for the handler
109+
ctxWithPhone := context.WithValue(ctx, phoneNumberContextKey, p.PhoneNumber)
110+
fm.handleInitialFeedbackTimeout(ctxWithPhone, p.ParticipantID)
111+
return nil
112+
}
113+
}
114+
115+
func makeFeedbackFollowupHandler(stateManager StateManager, msgService MessagingService) store.JobHandler {
116+
return func(ctx context.Context, payload string) error {
117+
var p FeedbackFollowupPayload
118+
if err := json.Unmarshal([]byte(payload), &p); err != nil {
119+
return fmt.Errorf("invalid feedback_followup payload: %w", err)
120+
}
121+
slog.Info("JobHandler.feedback_followup: executing", "participantID", p.ParticipantID)
122+
123+
// Idempotency: check if feedback was already completed
124+
feedbackState, err := stateManager.GetStateData(ctx, p.ParticipantID, models.FlowTypeConversation, models.DataKeyFeedbackState)
125+
if err == nil && feedbackState == "completed" {
126+
slog.Info("JobHandler.feedback_followup: feedback already completed, skipping", "participantID", p.ParticipantID)
127+
return nil
128+
}
129+
130+
// Send follow-up message
131+
followupMessage := "Hey! 👋 Just checking in - I sent you a habit suggestion earlier. Even if you didn't try it, I'd love to know what you think! Your feedback helps me learn what works best for you. 😊"
132+
if err := msgService.SendMessage(ctx, p.PhoneNumber, followupMessage); err != nil {
133+
return fmt.Errorf("failed to send follow-up: %w", err)
134+
}
135+
136+
// Update state
137+
_ = stateManager.SetStateData(ctx, p.ParticipantID, models.FlowTypeConversation, models.DataKeyFeedbackState, "followup_sent")
138+
return nil
139+
}
140+
}
141+
142+
func makeDailyPromptReminderHandler(schedulerTool *SchedulerTool) store.JobHandler {
143+
return func(ctx context.Context, payload string) error {
144+
var p DailyPromptReminderPayload
145+
if err := json.Unmarshal([]byte(payload), &p); err != nil {
146+
return fmt.Errorf("invalid daily_prompt_reminder payload: %w", err)
147+
}
148+
slog.Info("JobHandler.daily_prompt_reminder: executing", "participantID", p.ParticipantID, "to", p.To)
149+
150+
// Delegate to existing scheduler tool logic (which already checks pending state)
151+
schedulerTool.sendDailyPromptReminder(p.ParticipantID, p.To, p.ExpectedSentAt)
152+
return nil
153+
}
154+
}
155+
156+
func makeAutoFeedbackEnforcementHandler(schedulerTool *SchedulerTool) store.JobHandler {
157+
return func(ctx context.Context, payload string) error {
158+
var p AutoFeedbackEnforcementPayload
159+
if err := json.Unmarshal([]byte(payload), &p); err != nil {
160+
return fmt.Errorf("invalid auto_feedback_enforcement payload: %w", err)
161+
}
162+
slog.Info("JobHandler.auto_feedback_enforcement: executing", "participantID", p.ParticipantID)
163+
164+
// Delegate to existing scheduler tool logic (which already checks state)
165+
schedulerTool.enforceFeedbackIfNoResponse(p.ParticipantID)
166+
return nil
167+
}
168+
}

0 commit comments

Comments
 (0)