Skip to content

Commit ffcd336

Browse files
Merge pull request #6 from BTreeMap/copilot/harden-persistence-layer
feat(store): durable jobs, outbox, and inbound dedup for restart-safe persistence
2 parents 65ed60c + 99cf77a commit ffcd336

22 files changed

Lines changed: 2018 additions & 1 deletion

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,8 @@ build:
55
test:
66
go test ./...
77

8-
.PHONY: build test
8+
test-postgres:
9+
@echo "Run with: POSTGRES_DSN_TEST='postgres://user:pass@host:5432/dbname?sslmode=disable' make test-postgres"
10+
POSTGRES_DSN_TEST="$${POSTGRES_DSN_TEST}" go test ./internal/store/... -v -count=1
11+
12+
.PHONY: build test test-postgres

docker-compose.yml

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# docker-compose.yml - PromptPipe development/production stack
2+
#
3+
# Data persistence: All durable state is stored in named volumes.
4+
# "docker compose down && docker compose up -d" preserves all state.
5+
# Use "docker compose down -v" to explicitly destroy volumes.
6+
7+
services:
8+
promptpipe:
9+
build:
10+
context: .
11+
dockerfile: docker/Dockerfile
12+
ports:
13+
- "${API_PORT:-8080}:8080"
14+
environment:
15+
- DATABASE_DSN=${DATABASE_DSN:-postgres://promptpipe:promptpipe@postgres:5432/promptpipe?sslmode=disable}
16+
- WHATSAPP_DB_DSN=${WHATSAPP_DB_DSN:-file:/data/whatsmeow.db?_foreign_keys=on}
17+
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
18+
- API_ADDR=:8080
19+
- PROMPTPIPE_STATE_DIR=/data
20+
- PROMPTPIPE_DEBUG=${PROMPTPIPE_DEBUG:-false}
21+
- GENAI_MODEL=${GENAI_MODEL:-}
22+
- GENAI_TEMPERATURE=${GENAI_TEMPERATURE:-0.1}
23+
- AUTO_ENROLL_NEW_USERS=${AUTO_ENROLL_NEW_USERS:-false}
24+
- AUTO_FEEDBACK_AFTER_PROMPT_ENABLED=${AUTO_FEEDBACK_AFTER_PROMPT_ENABLED:-true}
25+
volumes:
26+
- promptpipe-data:/data
27+
depends_on:
28+
postgres:
29+
condition: service_healthy
30+
restart: unless-stopped
31+
32+
postgres:
33+
image: postgres:16-alpine
34+
environment:
35+
POSTGRES_USER: promptpipe
36+
POSTGRES_PASSWORD: promptpipe
37+
POSTGRES_DB: promptpipe
38+
volumes:
39+
- postgres-data:/var/lib/postgresql/data
40+
healthcheck:
41+
test: ["CMD-SHELL", "pg_isready -U promptpipe"]
42+
interval: 5s
43+
timeout: 5s
44+
retries: 5
45+
restart: unless-stopped
46+
47+
volumes:
48+
promptpipe-data:
49+
postgres-data:

docs/persistence-audit.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Persistence Audit – State Ledger
2+
3+
This document inventories all state that influences user-visible behavior and
4+
must survive a `docker compose down && docker compose up -d` cycle.
5+
6+
## State Items
7+
8+
| # | State Item | Owner (package) | Must Survive Restart? | Current Storage | Proposed Durable Representation | Idempotency / Dedup Strategy |
9+
|---|---|---|---|---|---|---|
10+
| 1 | **In-memory timers** (`SimpleTimer.timers` map) | `flow/timer.go` | Yes | In-memory map of `*timerEntry` keyed by timer ID | `jobs` table (`kind` + `run_at` + `payload_json`) | `dedupe_key` unique constraint; on restart, requeue stale running jobs |
11+
| 2 | **Recurring schedule timers** (daily prompt schedule) | `flow/timer.go` | Yes | In-memory via `time.AfterFunc` with self-rescheduling | `jobs` table with `kind=recurring_prompt`; after execution, enqueue next occurrence | `dedupe_key` per participant + schedule; only one active job per schedule |
12+
| 3 | **Daily prompt pending state** (`dailyPromptPendingState`) | `flow/scheduler_tool.go` | Yes | Persisted in `flow_states.state_data` as JSON (key `daily_prompt_pending`) | Keep in `flow_states`; reminder becomes a `jobs` row instead of `time.AfterFunc` | Check pending state before sending reminder; clear after send |
13+
| 4 | **Pending state transition timer** (delayed `transition_state`) | `flow/state_transition_tool.go` | Yes | Timer ID stored in `flow_states.state_data`, actual timer in memory | `jobs` table with `kind=state_transition` | `dedupe_key` per participant + flow type; cancel = mark job canceled |
14+
| 5 | **Feedback follow-up timer** | `flow/feedback_module.go` | Yes | In-memory timer via `SimpleTimer` | `jobs` table with `kind=feedback_followup` | `dedupe_key` per participant; skip if participant state changed |
15+
| 6 | **Feedback initial timeout timer** | `flow/feedback_module.go` | Yes | In-memory timer via `SimpleTimer` | `jobs` table with `kind=feedback_timeout` | `dedupe_key` per participant; skip if already responded |
16+
| 7 | **LID-by-phone cache** (`lidByPhone` map) | `messaging/whatsapp_service.go` | No | In-memory map | Remains in-memory; repopulated on first message from each contact | N/A – cache miss just means first send uses phone number directly |
17+
| 8 | **Outgoing WhatsApp messages** (sent via `SendMessage`) | `messaging/whatsapp_service.go` | Yes | Fire-and-forget; receipt stored in `receipts` table | `outbox_messages` table; flow enqueues, sender worker delivers | `dedupe_key` prevents duplicate sends on restart |
18+
| 9 | **Inbound WhatsApp messages** (received via event handler) | `messaging/whatsapp_service.go` | Yes (dedup) | Processed immediately; response stored in `responses` table | `inbound_dedup` table keyed by WhatsApp message ID | Insert-or-skip on message ID; prevents reprocessing after restart |
19+
| 10 | **Flow state** (`flow_states` table) | `store/`, `flow/state_manager.go` | Yes | SQLite/Postgres `flow_states` table | Already persisted | N/A – already durable |
20+
| 11 | **Conversation participants** | `store/` | Yes | SQLite/Postgres `conversation_participants` table | Already persisted | N/A – already durable |
21+
| 12 | **Registered hooks** | `store/` | Yes | SQLite/Postgres `registered_hooks` table | Already persisted | N/A – already durable |
22+
| 13 | **Receipts / Responses** | `store/` | Yes | SQLite/Postgres tables | Already persisted | N/A – already durable |
23+
24+
## New Tables
25+
26+
### `jobs`
27+
28+
Replaces all in-memory `time.AfterFunc` timers with durable, restart-safe job records.
29+
30+
| Column | Type | Description |
31+
|---|---|---|
32+
| `id` | TEXT (UUID) | Primary key |
33+
| `kind` | TEXT | Job type (e.g., `recurring_prompt`, `state_transition`, `feedback_followup`, `daily_prompt_reminder`) |
34+
| `run_at` | TIMESTAMP | When the job should execute |
35+
| `payload_json` | TEXT/JSON | Job-specific parameters |
36+
| `status` | TEXT | `queued`, `running`, `done`, `failed`, `canceled` |
37+
| `attempt` | INTEGER | Current attempt number |
38+
| `max_attempts` | INTEGER | Maximum retry attempts |
39+
| `last_error` | TEXT | Last error message (nullable) |
40+
| `locked_at` | TIMESTAMP | When a worker claimed this job (nullable) |
41+
| `dedupe_key` | TEXT | Unique constraint for preventing duplicates (nullable) |
42+
| `created_at` | TIMESTAMP | Row creation time |
43+
| `updated_at` | TIMESTAMP | Last update time |
44+
45+
### `outbox_messages`
46+
47+
Ensures outgoing WhatsApp sends are restart-safe and idempotent.
48+
49+
| Column | Type | Description |
50+
|---|---|---|
51+
| `id` | TEXT (UUID) | Primary key |
52+
| `participant_id` | TEXT | Target participant |
53+
| `kind` | TEXT | Message type (e.g., `prompt`, `reminder`, `feedback_followup`) |
54+
| `payload_json` | TEXT/JSON | Message content and metadata |
55+
| `status` | TEXT | `queued`, `sending`, `sent`, `failed` |
56+
| `attempts` | INTEGER | Send attempt count |
57+
| `next_attempt_at` | TIMESTAMP | When to retry (nullable) |
58+
| `dedupe_key` | TEXT | Unique constraint for preventing duplicate sends |
59+
| `locked_at` | TIMESTAMP | When a worker claimed this message (nullable) |
60+
| `last_error` | TEXT | Last error message (nullable) |
61+
| `created_at` | TIMESTAMP | Row creation time |
62+
| `updated_at` | TIMESTAMP | Last update time |
63+
64+
### `inbound_dedup`
65+
66+
Prevents reprocessing of inbound messages after restart.
67+
68+
| Column | Type | Description |
69+
|---|---|---|
70+
| `message_id` | TEXT | Primary key – stable WhatsApp message ID |
71+
| `participant_id` | TEXT | Sender identifier |
72+
| `received_at` | TIMESTAMP | When the message was first seen |
73+
| `processed_at` | TIMESTAMP | When processing completed (nullable) |
74+
75+
## Invariants
76+
77+
1. Any state transition that implies future work must, in ONE DB transaction:
78+
- Update `flow_states`
79+
- Enqueue `jobs` and/or `outbox_messages`
80+
2. Timers are never used for durable behavior – they become jobs.
81+
3. `dedupe_key` constraints ensure restarts and retries do not duplicate work.
82+
4. On startup: requeue stale `running` jobs/outbox (locked_at older than threshold).

docs/state-persistence-recovery.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,46 @@ CREATE TABLE conversation_participants (...);
101101
4. **Maintainable**: Business logic separated from infrastructure concerns
102102
5. **No Breaking Changes**: Uses existing database schema
103103

104+
## Durable Jobs and Outbox (Persistence Hardening)
105+
106+
In addition to the recovery system above, the persistence layer includes:
107+
108+
### Durable Jobs (`internal/store/job_repo.go`, `job_runner.go`)
109+
110+
All time-based behavior that must survive restarts is represented as **persisted job records** in the `jobs` table, replacing in-memory `time.AfterFunc` calls:
111+
112+
- **`JobRepo`**: Interface for enqueueing, claiming, completing, failing, and canceling jobs. Implementations exist for both SQLite and Postgres.
113+
- **`JobRunner`**: Background worker that polls for due jobs and dispatches them to registered handlers. Supports exponential backoff on failure and crash recovery via `RequeueStaleRunningJobs()`.
114+
- **Dedupe keys**: Prevent duplicate jobs for the same intent (e.g., one scheduled prompt per participant per schedule).
115+
116+
### Outbox Messages (`internal/store/outbox_repo.go`, `outbox_sender.go`)
117+
118+
Outgoing WhatsApp messages are persisted to the `outbox_messages` table before being sent:
119+
120+
- **`OutboxRepo`**: Interface for enqueueing and managing outbound messages. Dedupe keys prevent duplicate sends.
121+
- **`OutboxSender`**: Background worker that claims due messages and sends them via the messaging service. Supports retry with backoff.
122+
- On startup, stale `sending` messages are requeued.
123+
124+
### Inbound Dedup (`internal/store/dedup_repo.go`)
125+
126+
The `inbound_dedup` table prevents reprocessing of inbound WhatsApp messages after restart:
127+
128+
- **`DedupRepo`**: Interface for recording and checking message IDs.
129+
- On message arrival, the system checks if the message ID was already processed and skips if so.
130+
131+
### Startup Reconciliation
132+
133+
On startup:
134+
1. Stale `running` jobs are requeued (`RequeueStaleRunningJobs`)
135+
2. Stale `sending` outbox messages are requeued (`RequeueStaleSendingMessages`)
136+
3. Existing recovery (response handlers, scheduler reminders) continues as before
137+
138+
### Docker Compose Persistence
139+
140+
The `docker-compose.yml` uses named volumes for both Postgres data and the PromptPipe state directory. `docker compose down && docker compose up -d` preserves all state. Only `docker compose down -v` destroys volumes.
141+
142+
See `docs/persistence-audit.md` for the complete state ledger.
143+
104144
## Integration Example
105145

106146
```go

docs/storage.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,37 @@ Tables used by the current Go service:
5050
| `flow_states` | Flow state + JSON state data |
5151
| `conversation_participants` | Enrolled conversation participants |
5252
| `registered_hooks` | Persisted response hooks |
53+
| `jobs` | Durable job queue (replaces in-memory timers) |
54+
| `outbox_messages` | Restart-safe outgoing message queue |
55+
| `inbound_dedup` | Inbound message deduplication |
5356

5457
**Legacy tables** (present in migrations but unused by current code):
5558

5659
- `intervention_participants`
5760
- `intervention_responses`
5861

62+
## Durable jobs (`jobs` table)
63+
64+
All time-based behavior (scheduled prompts, feedback timeouts, state transitions) is represented as persisted job records instead of in-memory timers. Each job has:
65+
66+
- `kind` – handler type (e.g., `recurring_prompt`, `state_transition`, `feedback_followup`)
67+
- `run_at` – when the job should execute
68+
- `payload_json` – job-specific parameters
69+
- `status` – lifecycle state (`queued``running``done`/`failed`/`canceled`)
70+
- `dedupe_key` – prevents duplicate jobs for the same intent
71+
72+
On startup, stale `running` jobs (locked before a configurable threshold) are requeued.
73+
74+
## Outbox messages (`outbox_messages` table)
75+
76+
Outgoing WhatsApp messages are first persisted to the outbox, then sent by a background worker. This ensures messages are not lost on crash. Each message has a `dedupe_key` to prevent duplicate sends.
77+
78+
## Inbound dedup (`inbound_dedup` table)
79+
80+
Inbound WhatsApp messages are deduplicated by message ID to prevent reprocessing after restart.
81+
82+
See `docs/persistence-audit.md` for a full state ledger.
83+
5984
## Flow state data
6085

6186
`flow_states.state_data` stores JSON (SQLite) or JSONB (Postgres) key/value pairs for conversation flow. Keys are defined in `internal/models/flow_types.go` (e.g., `conversationHistory`, `userProfile`, `scheduleRegistry`).

internal/store/dedup_postgres.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package store
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// Compile-time check that PostgresStore implements DedupRepo.
10+
var _ DedupRepo = (*PostgresStore)(nil)
11+
12+
func (s *PostgresStore) IsDuplicate(messageID string) (bool, error) {
13+
var id string
14+
err := s.db.QueryRow(`SELECT message_id FROM inbound_dedup WHERE message_id = $1`, messageID).Scan(&id)
15+
if err == sql.ErrNoRows {
16+
return false, nil
17+
}
18+
if err != nil {
19+
return false, fmt.Errorf("dedup check failed: %w", err)
20+
}
21+
return true, nil
22+
}
23+
24+
func (s *PostgresStore) RecordInbound(messageID, participantID string) (bool, error) {
25+
now := time.Now()
26+
result, err := s.db.Exec(
27+
`INSERT INTO inbound_dedup (message_id, participant_id, received_at) VALUES ($1, $2, $3) ON CONFLICT (message_id) DO NOTHING`,
28+
messageID, participantID, now,
29+
)
30+
if err != nil {
31+
return false, fmt.Errorf("record inbound failed: %w", err)
32+
}
33+
34+
n, err := result.RowsAffected()
35+
if err != nil {
36+
return false, fmt.Errorf("dedup rows affected check failed: %w", err)
37+
}
38+
return n > 0, nil
39+
}
40+
41+
func (s *PostgresStore) MarkProcessed(messageID string) error {
42+
now := time.Now()
43+
_, err := s.db.Exec(
44+
`UPDATE inbound_dedup SET processed_at = $1 WHERE message_id = $2`,
45+
now, messageID,
46+
)
47+
if err != nil {
48+
return fmt.Errorf("mark processed failed: %w", err)
49+
}
50+
return nil
51+
}

internal/store/dedup_repo.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Package store provides the DedupRepo interface for inbound message deduplication.
2+
package store
3+
4+
import (
5+
"time"
6+
)
7+
8+
// DedupRecord represents an inbound message deduplication record.
9+
type DedupRecord struct {
10+
MessageID string `json:"message_id"`
11+
ParticipantID string `json:"participant_id"`
12+
ReceivedAt time.Time `json:"received_at"`
13+
ProcessedAt *time.Time `json:"processed_at"`
14+
}
15+
16+
// DedupRepo defines the interface for inbound message deduplication.
17+
type DedupRepo interface {
18+
// IsDuplicate checks if a message ID has already been processed.
19+
// Returns true if the message was already seen.
20+
IsDuplicate(messageID string) (bool, error)
21+
22+
// RecordInbound inserts a new inbound message record. Returns false if the
23+
// message was already recorded (duplicate).
24+
RecordInbound(messageID, participantID string) (bool, error)
25+
26+
// MarkProcessed sets the processed_at timestamp for a message.
27+
MarkProcessed(messageID string) error
28+
}

internal/store/dedup_sqlite.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package store
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// Compile-time check that SQLiteStore implements DedupRepo.
10+
var _ DedupRepo = (*SQLiteStore)(nil)
11+
12+
func (s *SQLiteStore) IsDuplicate(messageID string) (bool, error) {
13+
var id string
14+
err := s.db.QueryRow(`SELECT message_id FROM inbound_dedup WHERE message_id = ?`, messageID).Scan(&id)
15+
if err == sql.ErrNoRows {
16+
return false, nil
17+
}
18+
if err != nil {
19+
return false, fmt.Errorf("dedup check failed: %w", err)
20+
}
21+
return true, nil
22+
}
23+
24+
func (s *SQLiteStore) RecordInbound(messageID, participantID string) (bool, error) {
25+
// First check if it already exists
26+
exists, err := s.IsDuplicate(messageID)
27+
if err != nil {
28+
return false, err
29+
}
30+
if exists {
31+
return false, nil
32+
}
33+
34+
now := time.Now()
35+
_, err = s.db.Exec(
36+
`INSERT OR IGNORE INTO inbound_dedup (message_id, participant_id, received_at) VALUES (?, ?, ?)`,
37+
messageID, participantID, now,
38+
)
39+
if err != nil {
40+
return false, fmt.Errorf("record inbound failed: %w", err)
41+
}
42+
return true, nil
43+
}
44+
45+
func (s *SQLiteStore) MarkProcessed(messageID string) error {
46+
now := time.Now()
47+
_, err := s.db.Exec(
48+
`UPDATE inbound_dedup SET processed_at = ? WHERE message_id = ?`,
49+
now, messageID,
50+
)
51+
if err != nil {
52+
return fmt.Errorf("mark processed failed: %w", err)
53+
}
54+
return nil
55+
}

0 commit comments

Comments
 (0)