
Commit a6df69f

feat: add docker-compose, e2e restart test script, and update docs
- Add docker-compose.yml with named volumes for state persistence
- Add test-scripts/test-restart-persistence.sh for e2e restart testing
- Update docs/storage.md with new tables documentation
- Update docs/state-persistence-recovery.md with durable jobs/outbox docs
- Update test-scripts/README.md with new script listing
- Add Makefile test-postgres target
- Apply gofmt to all files

Co-authored-by: MinecraftFuns <25814618+MinecraftFuns@users.noreply.github.com>
1 parent c9829c7 commit a6df69f

8 files changed

Lines changed: 279 additions & 18 deletions


Makefile

Lines changed: 5 additions & 1 deletion
```diff
@@ -5,4 +5,8 @@ build:
 test:
 	go test ./...
 
-.PHONY: build test
+test-postgres:
+	@echo "Run with: POSTGRES_DSN_TEST='postgres://user:pass@host:5432/dbname?sslmode=disable' make test-postgres"
+	POSTGRES_DSN_TEST="$${POSTGRES_DSN_TEST}" go test ./internal/store/... -v -count=1
+
+.PHONY: build test test-postgres
```

docker-compose.yml

Lines changed: 49 additions & 0 deletions
```yaml
# docker-compose.yml - PromptPipe development/production stack
#
# Data persistence: All durable state is stored in named volumes.
# "docker compose down && docker compose up -d" preserves all state.
# Use "docker compose down -v" to explicitly destroy volumes.

services:
  promptpipe:
    build:
      context: .
      dockerfile: docker/Dockerfile
    ports:
      - "${API_PORT:-8080}:8080"
    environment:
      - DATABASE_DSN=${DATABASE_DSN:-postgres://promptpipe:promptpipe@postgres:5432/promptpipe?sslmode=disable}
      - WHATSAPP_DB_DSN=${WHATSAPP_DB_DSN:-file:/data/whatsmeow.db?_foreign_keys=on}
      - OPENAI_API_KEY=${OPENAI_API_KEY:-}
      - API_ADDR=:8080
      - PROMPTPIPE_STATE_DIR=/data
      - PROMPTPIPE_DEBUG=${PROMPTPIPE_DEBUG:-false}
      - GENAI_MODEL=${GENAI_MODEL:-}
      - GENAI_TEMPERATURE=${GENAI_TEMPERATURE:-0.1}
      - AUTO_ENROLL_NEW_USERS=${AUTO_ENROLL_NEW_USERS:-false}
      - AUTO_FEEDBACK_AFTER_PROMPT_ENABLED=${AUTO_FEEDBACK_AFTER_PROMPT_ENABLED:-true}
    volumes:
      - promptpipe-data:/data
    depends_on:
      postgres:
        condition: service_healthy
    restart: unless-stopped

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: promptpipe
      POSTGRES_PASSWORD: promptpipe
      POSTGRES_DB: promptpipe
    volumes:
      - postgres-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U promptpipe"]
      interval: 5s
      timeout: 5s
      retries: 5
    restart: unless-stopped

volumes:
  promptpipe-data:
  postgres-data:
```

docs/state-persistence-recovery.md

Lines changed: 40 additions & 0 deletions
````diff
@@ -101,6 +101,46 @@ CREATE TABLE conversation_participants (...);
 4. **Maintainable**: Business logic separated from infrastructure concerns
 5. **No Breaking Changes**: Uses existing database schema
 
+## Durable Jobs and Outbox (Persistence Hardening)
+
+In addition to the recovery system above, the persistence layer includes:
+
+### Durable Jobs (`internal/store/job_repo.go`, `job_runner.go`)
+
+All time-based behavior that must survive restarts is represented as **persisted job records** in the `jobs` table, replacing in-memory `time.AfterFunc` calls:
+
+- **`JobRepo`**: Interface for enqueueing, claiming, completing, failing, and canceling jobs. Implementations exist for both SQLite and Postgres.
+- **`JobRunner`**: Background worker that polls for due jobs and dispatches them to registered handlers. Supports exponential backoff on failure and crash recovery via `RequeueStaleRunningJobs()`.
+- **Dedupe keys**: Prevent duplicate jobs for the same intent (e.g., one scheduled prompt per participant per schedule).
+
+### Outbox Messages (`internal/store/outbox_repo.go`, `outbox_sender.go`)
+
+Outgoing WhatsApp messages are persisted to the `outbox_messages` table before being sent:
+
+- **`OutboxRepo`**: Interface for enqueueing and managing outbound messages. Dedupe keys prevent duplicate sends.
+- **`OutboxSender`**: Background worker that claims due messages and sends them via the messaging service. Supports retry with backoff.
+- On startup, stale `sending` messages are requeued.
+
+### Inbound Dedup (`internal/store/dedup_repo.go`)
+
+The `inbound_dedup` table prevents reprocessing of inbound WhatsApp messages after restart:
+
+- **`DedupRepo`**: Interface for recording and checking message IDs.
+- On message arrival, the system checks if the message ID was already processed and skips it if so.
+
+### Startup Reconciliation
+
+On startup:
+1. Stale `running` jobs are requeued (`RequeueStaleRunningJobs`)
+2. Stale `sending` outbox messages are requeued (`RequeueStaleSendingMessages`)
+3. Existing recovery (response handlers, scheduler reminders) continues as before
+
+### Docker Compose Persistence
+
+The `docker-compose.yml` uses named volumes for both Postgres data and the PromptPipe state directory. `docker compose down && docker compose up -d` preserves all state. Only `docker compose down -v` destroys volumes.
+
+See `docs/persistence-audit.md` for the complete state ledger.
+
 ## Integration Example
 
 ```go
````

docs/storage.md

Lines changed: 25 additions & 0 deletions
```diff
@@ -50,12 +50,37 @@ Tables used by the current Go service:
 | `flow_states` | Flow state + JSON state data |
 | `conversation_participants` | Enrolled conversation participants |
 | `registered_hooks` | Persisted response hooks |
+| `jobs` | Durable job queue (replaces in-memory timers) |
+| `outbox_messages` | Restart-safe outgoing message queue |
+| `inbound_dedup` | Inbound message deduplication |
 
 **Legacy tables** (present in migrations but unused by current code):
 
 - `intervention_participants`
 - `intervention_responses`
 
+## Durable jobs (`jobs` table)
+
+All time-based behavior (scheduled prompts, feedback timeouts, state transitions) is represented as persisted job records instead of in-memory timers. Each job has:
+
+- `kind` – handler type (e.g., `recurring_prompt`, `state_transition`, `feedback_followup`)
+- `run_at` – when the job should execute
+- `payload_json` – job-specific parameters
+- `status` – lifecycle state (`queued` → `running` → `done`/`failed`/`canceled`)
+- `dedupe_key` – prevents duplicate jobs for the same intent
+
+On startup, stale `running` jobs (locked before a configurable threshold) are requeued.
+
+## Outbox messages (`outbox_messages` table)
+
+Outgoing WhatsApp messages are first persisted to the outbox, then sent by a background worker. This ensures messages are not lost on crash. Each message has a `dedupe_key` to prevent duplicate sends.
+
+## Inbound dedup (`inbound_dedup` table)
+
+Inbound WhatsApp messages are deduplicated by message ID to prevent reprocessing after restart.
+
+See `docs/persistence-audit.md` for a full state ledger.
+
 ## Flow state data
 
 `flow_states.state_data` stores JSON (SQLite) or JSONB (Postgres) key/value pairs for conversation flow. Keys are defined in `internal/models/flow_types.go` (e.g., `conversationHistory`, `userProfile`, `scheduleRegistry`).
```

internal/store/job_repo.go

Lines changed: 11 additions & 11 deletions
```diff
@@ -18,18 +18,18 @@ const (
 
 // Job represents a durable job record that replaces in-memory timers.
 type Job struct {
-	ID          string    `json:"id"`
-	Kind        string    `json:"kind"`
-	RunAt       time.Time `json:"run_at"`
-	PayloadJSON string    `json:"payload_json"`
-	Status      JobStatus `json:"status"`
-	Attempt     int       `json:"attempt"`
-	MaxAttempts int       `json:"max_attempts"`
-	LastError   string    `json:"last_error"`
+	ID          string     `json:"id"`
+	Kind        string     `json:"kind"`
+	RunAt       time.Time  `json:"run_at"`
+	PayloadJSON string     `json:"payload_json"`
+	Status      JobStatus  `json:"status"`
+	Attempt     int        `json:"attempt"`
+	MaxAttempts int        `json:"max_attempts"`
+	LastError   string     `json:"last_error"`
 	LockedAt    *time.Time `json:"locked_at"`
-	DedupeKey   string    `json:"dedupe_key"`
-	CreatedAt   time.Time `json:"created_at"`
-	UpdatedAt   time.Time `json:"updated_at"`
+	DedupeKey   string     `json:"dedupe_key"`
+	CreatedAt   time.Time  `json:"created_at"`
+	UpdatedAt   time.Time  `json:"updated_at"`
 }
 
 // JobRepo defines the interface for durable job persistence.
```

internal/store/job_runner.go

Lines changed: 6 additions & 6 deletions
```diff
@@ -15,12 +15,12 @@ type JobHandler func(ctx context.Context, payload string) error
 
 // JobRunner periodically claims due jobs from the database and dispatches them
 // to registered handlers.
 type JobRunner struct {
-	repo            JobRepo
-	handlers        map[string]JobHandler
-	mu              sync.RWMutex
-	pollInterval    time.Duration
-	staleThreshold  time.Duration
-	claimLimit      int
+	repo           JobRepo
+	handlers       map[string]JobHandler
+	mu             sync.RWMutex
+	pollInterval   time.Duration
+	staleThreshold time.Duration
+	claimLimit     int
 }
 
 // NewJobRunner creates a new JobRunner.
```

test-scripts/README.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -156,6 +156,7 @@ All tests passed!
 - `test-receipts.sh` - Test receipt tracking
 - `run-all-tests.sh` - Complete test suite with integration tests
 - `quick-test.sh` - Fast health check
+- `test-restart-persistence.sh` - End-to-end restart persistence test (requires Docker)
 
 ## Notes
```

test-scripts/test-restart-persistence.sh

Lines changed: 142 additions & 0 deletions

```bash
#!/bin/bash
# test-restart-persistence.sh
#
# End-to-end test demonstrating that "docker compose down && docker compose up -d"
# does not lose durable state.
#
# Requirements:
#   - Docker and docker compose installed
#   - curl and jq installed
#   - Run from the repository root
#
# What this script tests:
#   1. Start the stack
#   2. Create a participant and schedule a job
#   3. Stop the stack (docker compose down)
#   4. Restart the stack (docker compose up -d)
#   5. Verify that flow state and the scheduled job persist
#
# Usage:
#   ./test-scripts/test-restart-persistence.sh

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

TESTS_PASSED=0
TESTS_FAILED=0

# Plain arithmetic assignment is used instead of ((VAR++)): ((VAR++)) returns
# a non-zero status when VAR is 0, which would abort the script under "set -e".
success() { echo -e "${GREEN}✓${NC} $1"; TESTS_PASSED=$((TESTS_PASSED + 1)); }
error()   { echo -e "${RED}✗${NC} $1"; TESTS_FAILED=$((TESTS_FAILED + 1)); }
info()    { echo -e "${YELLOW}ℹ${NC} $1"; }

API_BASE_URL="${API_BASE_URL:-http://localhost:8080}"
TEST_PHONE="${TEST_PHONE:-15551234567}"

wait_for_api() {
    local max_attempts=30
    local attempt=0
    info "Waiting for API at $API_BASE_URL..."
    while [ $attempt -lt $max_attempts ]; do
        if curl -sf "$API_BASE_URL/receipts" >/dev/null 2>&1; then
            success "API is ready"
            return 0
        fi
        attempt=$((attempt + 1))
        sleep 2
    done
    error "API did not become ready after $((max_attempts * 2))s"
    return 1
}

cleanup() {
    info "Cleaning up..."
    cd "$PROJECT_DIR"
    docker compose down --timeout 10 2>/dev/null || true
}

# ---- Main ----
cd "$PROJECT_DIR"
trap cleanup EXIT

info "Step 1: Starting stack"
docker compose up -d --build --wait 2>&1 || { error "docker compose up failed"; exit 1; }
wait_for_api

info "Step 2: Creating a conversation participant"
ENROLL_RESPONSE=$(curl -sf -X POST -H "Content-Type: application/json" \
    -d "{\"phone_number\": \"$TEST_PHONE\", \"name\": \"Restart Test\"}" \
    "$API_BASE_URL/conversation/participants" 2>&1) || true

if echo "$ENROLL_RESPONSE" | jq -e '.result.id' >/dev/null 2>&1; then
    PARTICIPANT_ID=$(echo "$ENROLL_RESPONSE" | jq -r '.result.id')
    success "Participant created: $PARTICIPANT_ID"
else
    # Participant may already exist; try to look it up by phone number
    LOOKUP_RESPONSE=$(curl -sf "$API_BASE_URL/conversation/participants" 2>&1) || true
    PARTICIPANT_ID=$(echo "$LOOKUP_RESPONSE" | jq -r ".result[] | select(.phone_number==\"$TEST_PHONE\") | .id" 2>/dev/null) || true
    if [ -n "$PARTICIPANT_ID" ] && [ "$PARTICIPANT_ID" != "null" ]; then
        success "Participant already exists: $PARTICIPANT_ID"
    else
        error "Could not create or find participant"
        info "Enroll response: $ENROLL_RESPONSE"
    fi
fi

info "Step 3: Checking receipts count (baseline)"
RECEIPTS_BEFORE=$(curl -sf "$API_BASE_URL/receipts" | jq 'if type == "array" then length elif .result then (.result | length) else 0 end' 2>/dev/null || echo "0")
info "Receipts before restart: $RECEIPTS_BEFORE"

info "Step 4: Stopping stack (docker compose down)"
docker compose down --timeout 10
success "Stack stopped"

info "Step 5: Restarting stack (docker compose up -d)"
docker compose up -d --wait 2>&1 || { error "docker compose up after restart failed"; exit 1; }
wait_for_api

info "Step 6: Verifying state persisted after restart"

# Check participant still exists
PARTICIPANTS=$(curl -sf "$API_BASE_URL/conversation/participants" 2>&1) || true
FOUND_PARTICIPANT=$(echo "$PARTICIPANTS" | jq -r ".result[] | select(.phone_number==\"$TEST_PHONE\") | .id" 2>/dev/null) || true

if [ -n "$FOUND_PARTICIPANT" ] && [ "$FOUND_PARTICIPANT" != "null" ]; then
    success "Participant persisted after restart: $FOUND_PARTICIPANT"
else
    error "Participant NOT found after restart"
fi

# Check receipts still exist
RECEIPTS_AFTER=$(curl -sf "$API_BASE_URL/receipts" | jq 'if type == "array" then length elif .result then (.result | length) else 0 end' 2>/dev/null || echo "0")
info "Receipts after restart: $RECEIPTS_AFTER"

if [ "$RECEIPTS_AFTER" -ge "$RECEIPTS_BEFORE" ]; then
    success "Receipts persisted after restart ($RECEIPTS_BEFORE -> $RECEIPTS_AFTER)"
else
    error "Receipts lost after restart ($RECEIPTS_BEFORE -> $RECEIPTS_AFTER)"
fi

# ---- Summary ----
echo
echo "=================================="
echo "Restart Persistence Test Summary"
echo "=================================="
echo -e "${GREEN}Passed: $TESTS_PASSED${NC}"
echo -e "${RED}Failed: $TESTS_FAILED${NC}"
echo "Total: $((TESTS_PASSED + TESTS_FAILED))"

if [ $TESTS_FAILED -eq 0 ]; then
    echo -e "${GREEN}All tests passed! State survives restart.${NC}"
    exit 0
else
    echo -e "${RED}Some tests failed!${NC}"
    exit 1
fi
```
