Skip to content

Commit c9829c7

Browse files
feat(store): add jobs, outbox, and inbound dedup tables with repos and workers
- Add migrations for Postgres and SQLite (jobs, outbox_messages, inbound_dedup) - Implement JobRepo interface with SQLite and Postgres implementations - Implement OutboxRepo interface with SQLite and Postgres implementations - Implement DedupRepo interface with SQLite and Postgres implementations - Add JobRunner worker with handler registry and exponential backoff - Add OutboxSender worker with retry and crash recovery - Add comprehensive tests for all new repos and workers - Create docs/persistence-audit.md state ledger Co-authored-by: MinecraftFuns <25814618+MinecraftFuns@users.noreply.github.com>
1 parent 4a0bc89 commit c9829c7

15 files changed

Lines changed: 1747 additions & 0 deletions

docs/persistence-audit.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Persistence Audit – State Ledger
2+
3+
This document inventories all state that influences user-visible behavior and
4+
must survive a `docker compose down && docker compose up -d` cycle.
5+
6+
## State Items
7+
8+
| # | State Item | Owner (package) | Must Survive Restart? | Current Storage | Proposed Durable Representation | Idempotency / Dedup Strategy |
9+
|---|---|---|---|---|---|---|
10+
| 1 | **In-memory timers** (`SimpleTimer.timers` map) | `flow/timer.go` | Yes | In-memory map of `*timerEntry` keyed by timer ID | `jobs` table (`kind` + `run_at` + `payload_json`) | `dedupe_key` unique constraint; on restart, requeue stale running jobs |
11+
| 2 | **Recurring schedule timers** (daily prompt schedule) | `flow/timer.go` | Yes | In-memory via `time.AfterFunc` with self-rescheduling | `jobs` table with `kind=recurring_prompt`; after execution, enqueue next occurrence | `dedupe_key` per participant + schedule; only one active job per schedule |
12+
| 3 | **Daily prompt pending state** (`dailyPromptPendingState`) | `flow/scheduler_tool.go` | Yes | Persisted in `flow_states.state_data` as JSON (key `daily_prompt_pending`) | Keep in `flow_states`; reminder becomes a `jobs` row instead of `time.AfterFunc` | Check pending state before sending reminder; clear after send |
13+
| 4 | **Pending state transition timer** (delayed `transition_state`) | `flow/state_transition_tool.go` | Yes | Timer ID stored in `flow_states.state_data`, actual timer in memory | `jobs` table with `kind=state_transition` | `dedupe_key` per participant + flow type; cancel = mark job canceled |
14+
| 5 | **Feedback follow-up timer** | `flow/feedback_module.go` | Yes | In-memory timer via `SimpleTimer` | `jobs` table with `kind=feedback_followup` | `dedupe_key` per participant; skip if participant state changed |
15+
| 6 | **Feedback initial timeout timer** | `flow/feedback_module.go` | Yes | In-memory timer via `SimpleTimer` | `jobs` table with `kind=feedback_timeout` | `dedupe_key` per participant; skip if already responded |
16+
| 7 | **LID-by-phone cache** (`lidByPhone` map) | `messaging/whatsapp_service.go` | No | In-memory map | Remains in-memory; repopulated on first message from each contact | N/A – cache miss just means first send uses phone number directly |
17+
| 8 | **Outgoing WhatsApp messages** (sent via `SendMessage`) | `messaging/whatsapp_service.go` | Yes | Fire-and-forget; receipt stored in `receipts` table | `outbox_messages` table; flow enqueues, sender worker delivers | `dedupe_key` prevents duplicate sends on restart |
18+
| 9 | **Inbound WhatsApp messages** (received via event handler) | `messaging/whatsapp_service.go` | Yes (dedup) | Processed immediately; response stored in `responses` table | `inbound_dedup` table keyed by WhatsApp message ID | Insert-or-skip on message ID; prevents reprocessing after restart |
19+
| 10 | **Flow state** (`flow_states` table) | `store/`, `flow/state_manager.go` | Yes | SQLite/Postgres `flow_states` table | Already persisted | N/A – already durable |
20+
| 11 | **Conversation participants** | `store/` | Yes | SQLite/Postgres `conversation_participants` table | Already persisted | N/A – already durable |
21+
| 12 | **Registered hooks** | `store/` | Yes | SQLite/Postgres `registered_hooks` table | Already persisted | N/A – already durable |
22+
| 13 | **Receipts / Responses** | `store/` | Yes | SQLite/Postgres tables | Already persisted | N/A – already durable |
23+
24+
## New Tables
25+
26+
### `jobs`
27+
28+
Replaces all in-memory `time.AfterFunc` timers with durable, restart-safe job records.
29+
30+
| Column | Type | Description |
31+
|---|---|---|
32+
| `id` | TEXT (UUID) | Primary key |
33+
| `kind` | TEXT | Job type (e.g., `recurring_prompt`, `state_transition`, `feedback_followup`, `daily_prompt_reminder`) |
34+
| `run_at` | TIMESTAMP | When the job should execute |
35+
| `payload_json` | TEXT/JSON | Job-specific parameters |
36+
| `status` | TEXT | `queued`, `running`, `done`, `failed`, `canceled` |
37+
| `attempt` | INTEGER | Current attempt number |
38+
| `max_attempts` | INTEGER | Maximum retry attempts |
39+
| `last_error` | TEXT | Last error message (nullable) |
40+
| `locked_at` | TIMESTAMP | When a worker claimed this job (nullable) |
41+
| `dedupe_key` | TEXT | Unique constraint for preventing duplicates (nullable) |
42+
| `created_at` | TIMESTAMP | Row creation time |
43+
| `updated_at` | TIMESTAMP | Last update time |
44+
45+
### `outbox_messages`
46+
47+
Ensures outgoing WhatsApp sends are restart-safe and idempotent.
48+
49+
| Column | Type | Description |
50+
|---|---|---|
51+
| `id` | TEXT (UUID) | Primary key |
52+
| `participant_id` | TEXT | Target participant |
53+
| `kind` | TEXT | Message type (e.g., `prompt`, `reminder`, `feedback_followup`) |
54+
| `payload_json` | TEXT/JSON | Message content and metadata |
55+
| `status` | TEXT | `queued`, `sending`, `sent`, `failed` |
56+
| `attempts` | INTEGER | Send attempt count |
57+
| `next_attempt_at` | TIMESTAMP | When to retry (nullable) |
58+
| `dedupe_key` | TEXT | Unique constraint for preventing duplicate sends |
59+
| `locked_at` | TIMESTAMP | When a worker claimed this message (nullable) |
60+
| `last_error` | TEXT | Last error message (nullable) |
61+
| `created_at` | TIMESTAMP | Row creation time |
62+
| `updated_at` | TIMESTAMP | Last update time |
63+
64+
### `inbound_dedup`
65+
66+
Prevents reprocessing of inbound messages after restart.
67+
68+
| Column | Type | Description |
69+
|---|---|---|
70+
| `message_id` | TEXT | Primary key – stable WhatsApp message ID |
71+
| `participant_id` | TEXT | Sender identifier |
72+
| `received_at` | TIMESTAMP | When the message was first seen |
73+
| `processed_at` | TIMESTAMP | When processing completed (nullable) |
74+
75+
## Invariants
76+
77+
1. Any state transition that implies future work must, in ONE DB transaction:
78+
- Update `flow_states`
79+
- Enqueue `jobs` and/or `outbox_messages`
80+
2. Timers are never used for durable behavior – they become jobs.
81+
3. `dedupe_key` constraints ensure restarts and retries do not duplicate work.
82+
4. On startup: requeue stale `running` jobs/outbox (locked_at older than threshold).

internal/store/dedup_postgres.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package store
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// Compile-time check that PostgresStore implements DedupRepo.
10+
var _ DedupRepo = (*PostgresStore)(nil)
11+
12+
func (s *PostgresStore) IsDuplicate(messageID string) (bool, error) {
13+
var id string
14+
err := s.db.QueryRow(`SELECT message_id FROM inbound_dedup WHERE message_id = $1`, messageID).Scan(&id)
15+
if err == sql.ErrNoRows {
16+
return false, nil
17+
}
18+
if err != nil {
19+
return false, fmt.Errorf("dedup check failed: %w", err)
20+
}
21+
return true, nil
22+
}
23+
24+
func (s *PostgresStore) RecordInbound(messageID, participantID string) (bool, error) {
25+
now := time.Now()
26+
result, err := s.db.Exec(
27+
`INSERT INTO inbound_dedup (message_id, participant_id, received_at) VALUES ($1, $2, $3) ON CONFLICT (message_id) DO NOTHING`,
28+
messageID, participantID, now,
29+
)
30+
if err != nil {
31+
return false, fmt.Errorf("record inbound failed: %w", err)
32+
}
33+
34+
n, err := result.RowsAffected()
35+
if err != nil {
36+
return false, fmt.Errorf("dedup rows affected check failed: %w", err)
37+
}
38+
return n > 0, nil
39+
}
40+
41+
func (s *PostgresStore) MarkProcessed(messageID string) error {
42+
now := time.Now()
43+
_, err := s.db.Exec(
44+
`UPDATE inbound_dedup SET processed_at = $1 WHERE message_id = $2`,
45+
now, messageID,
46+
)
47+
if err != nil {
48+
return fmt.Errorf("mark processed failed: %w", err)
49+
}
50+
return nil
51+
}

internal/store/dedup_repo.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Package store provides the DedupRepo interface for inbound message deduplication.
2+
package store
3+
4+
import (
5+
"time"
6+
)
7+
8+
// DedupRecord represents an inbound message deduplication record.
9+
type DedupRecord struct {
10+
MessageID string `json:"message_id"`
11+
ParticipantID string `json:"participant_id"`
12+
ReceivedAt time.Time `json:"received_at"`
13+
ProcessedAt *time.Time `json:"processed_at"`
14+
}
15+
16+
// DedupRepo defines the interface for inbound message deduplication.
17+
type DedupRepo interface {
18+
// IsDuplicate checks if a message ID has already been processed.
19+
// Returns true if the message was already seen.
20+
IsDuplicate(messageID string) (bool, error)
21+
22+
// RecordInbound inserts a new inbound message record. Returns false if the
23+
// message was already recorded (duplicate).
24+
RecordInbound(messageID, participantID string) (bool, error)
25+
26+
// MarkProcessed sets the processed_at timestamp for a message.
27+
MarkProcessed(messageID string) error
28+
}

internal/store/dedup_sqlite.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package store
2+
3+
import (
4+
"database/sql"
5+
"fmt"
6+
"time"
7+
)
8+
9+
// Compile-time check that SQLiteStore implements DedupRepo.
10+
var _ DedupRepo = (*SQLiteStore)(nil)
11+
12+
func (s *SQLiteStore) IsDuplicate(messageID string) (bool, error) {
13+
var id string
14+
err := s.db.QueryRow(`SELECT message_id FROM inbound_dedup WHERE message_id = ?`, messageID).Scan(&id)
15+
if err == sql.ErrNoRows {
16+
return false, nil
17+
}
18+
if err != nil {
19+
return false, fmt.Errorf("dedup check failed: %w", err)
20+
}
21+
return true, nil
22+
}
23+
24+
func (s *SQLiteStore) RecordInbound(messageID, participantID string) (bool, error) {
25+
// First check if it already exists
26+
exists, err := s.IsDuplicate(messageID)
27+
if err != nil {
28+
return false, err
29+
}
30+
if exists {
31+
return false, nil
32+
}
33+
34+
now := time.Now()
35+
_, err = s.db.Exec(
36+
`INSERT OR IGNORE INTO inbound_dedup (message_id, participant_id, received_at) VALUES (?, ?, ?)`,
37+
messageID, participantID, now,
38+
)
39+
if err != nil {
40+
return false, fmt.Errorf("record inbound failed: %w", err)
41+
}
42+
return true, nil
43+
}
44+
45+
func (s *SQLiteStore) MarkProcessed(messageID string) error {
46+
now := time.Now()
47+
_, err := s.db.Exec(
48+
`UPDATE inbound_dedup SET processed_at = ? WHERE message_id = ?`,
49+
now, messageID,
50+
)
51+
if err != nil {
52+
return fmt.Errorf("mark processed failed: %w", err)
53+
}
54+
return nil
55+
}

0 commit comments

Comments
 (0)