Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions backend/internal/review/review.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,35 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge
return TriggerResult{}, err
}

// Idempotency: return a non-failed pass as-is. Failed passes stay visible
// but can be retried after the user fixes the underlying issue.
// Idempotency: a commit only counts as reviewed once a pass actually
// finished — Status Complete, which only Submit sets, and only after
// recording a verdict. Any non-Complete pass is retryable:
// - A Failed pass (launch error, or one superseded below) falls through to
// a fresh retry.
// - A Running pass may have been interrupted (its execution stopped, or
// its pane killed) without ever calling Submit, and pane liveness can't
// distinguish "still working" from "pane open, work stopped." So it's
// superseded on retry: marked Failed, then a fresh pass starts below —
// reusing the pane via Notify if it's still alive, or spawning a new one
// if not (#342).
if existing, ok, err := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); err != nil {
return TriggerResult{}, err
} else if ok && existing.Status != domain.ReviewRunFailed {
} else if ok && existing.Status == domain.ReviewRunComplete {
return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil
} else if ok && existing.Status == domain.ReviewRunRunning {

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking, just flagging the behavior change: previously a repeat Trigger against a genuinely live, in-progress review (Running, pane alive, not yet submitted) was a pure no-op that returned the existing run. Now it always supersedes — the in-progress run is marked Failed ("superseded by a new review trigger") and a fresh run is created, re-Notifying the live pane. Since the only caller is the user-initiated POST /reviews/trigger button, this reads as "re-run the review," which is reasonable, and the test comments call it out. Worth confirming nothing auto-polls this endpoint, since on a poll loop it would keep killing an in-flight review before it can submit. From what I can see it's button-only, so this is fine.

superseded, err := e.store.UpdateReviewRunResult(ctx, existing.ID, domain.ReviewRunFailed, domain.VerdictNone, "superseded by a new review trigger")
if err != nil {
return TriggerResult{}, err
}
if !superseded {
// Lost the race to a concurrent Submit: that pass is now Complete, so
// re-read and return it instead of starting a redundant pass.
if latest, ok, err := e.store.GetReviewRun(ctx, existing.ID); err != nil {
return TriggerResult{}, err
} else if ok {
return TriggerResult{Run: latest, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil
}
}
}

harness, err := e.reviewerHarness(ctx, worker)
Expand Down
143 changes: 125 additions & 18 deletions backend/internal/review/review_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type fakeStore struct {
// winner (so a follow-up GetReviewRunBySessionAndSHA finds it) and returns
// insertErr instead of recording the caller's run.
insertErr error
// beforeUpdate, when set, runs at the start of UpdateReviewRunResult. Tests
// use it to model a concurrent Submit landing on a run between the
// idempotency lookup and the supersede update (the CAS race).
beforeUpdate func(s *fakeStore, id string)
}

func (f *fakeStore) UpsertReview(_ context.Context, r domain.Review) error {
Expand All @@ -47,6 +51,9 @@ func (f *fakeStore) InsertReviewRun(_ context.Context, r domain.ReviewRun) error
return nil
}
func (f *fakeStore) UpdateReviewRunResult(_ context.Context, id string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body string) (bool, error) {
if f.beforeUpdate != nil {
f.beforeUpdate(f, id)
}
for i := range f.runs {
if f.runs[i].ID == id {
if f.runs[i].Status != domain.ReviewRunRunning {
Expand Down Expand Up @@ -128,7 +135,9 @@ func (f *fakeLauncher) Notify(_ context.Context, handleID string, spec LaunchSpe
f.gotSpec = spec
return f.notifyErr
}
func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) { return f.alive, nil }
func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) {
return f.alive || f.spawned, nil
}

func liveWorker() domain.SessionRecord {
return domain.SessionRecord{
Expand Down Expand Up @@ -180,42 +189,37 @@ func TestTriggerSpawnsNewReviewerAndRecordsRunAfterLaunch(t *testing.T) {
}
}

func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) {
func TestTriggerConcurrentSameWorkerNeverDoubleSpawns(t *testing.T) {
// Concurrent triggers for the same worker must never end up with two
// independent reviewer panes for the same commit (#242): the loser of
// each race either finds the pane already alive (Notify, not Spawn) or
// loses the unique-insert race outright. Verdict-gated idempotency (#342)
// means a run with no verdict yet is no longer trusted blindly, so repeat
// triggers may notify the live pane again instead of being a pure no-op —
// that's fine; only an actual second Spawn would be a regression.
store := &fakeStore{}
launcher := &fakeLauncher{handle: "review-mer-1"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

const n = 8
var wg sync.WaitGroup
results := make([]TriggerResult, n)
errs := make([]error, n)
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
defer wg.Done()
results[i], errs[i] = eng.Trigger(context.Background(), "mer-1")
_, errs[i] = eng.Trigger(context.Background(), "mer-1")
}(i)
}
wg.Wait()

created := 0
for i := 0; i < n; i++ {
if errs[i] != nil {
t.Fatalf("Trigger[%d]: %v", i, errs[i])
}
if results[i].Created {
created++
}
}
// Exactly one trigger does the work; the rest reuse its run.
if created != 1 {
t.Errorf("Created=true count = %d, want exactly 1", created)
}
if launcher.spawnCount != 1 {
t.Errorf("reviewer spawn count = %d, want 1", launcher.spawnCount)
}
if len(store.runs) != 1 {
t.Errorf("recorded review runs = %d, want 1", len(store.runs))
t.Errorf("reviewer spawn count = %d, want exactly 1 (no double-spawn)", launcher.spawnCount)
}
}

Expand All @@ -242,11 +246,18 @@ func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) {
}

func TestTriggerIsIdempotentForSameCommit(t *testing.T) {
// A Complete pass is the only state that blocks re-review: Status Complete
// is set exclusively by Submit, after it records a verdict. The body is
// deliberately empty here — an approved review may carry none, yet a
// finished pass still blocks. The block is gated on Status, not the body.
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}},
runs: []domain.ReviewRun{{
ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1",
Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, Body: "",
}},
}
launcher := &fakeLauncher{}
launcher := &fakeLauncher{alive: true}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

res, err := eng.Trigger(context.Background(), "mer-1")
Expand All @@ -264,6 +275,102 @@ func TestTriggerIsIdempotentForSameCommit(t *testing.T) {
}
}

func TestTriggerReturnsCompletedRunWhenSubmitWinsSupersedeRace(t *testing.T) {
// A retry finds a Running pass and moves to supersede it, but a concurrent
// Submit lands first and marks that pass Complete. The CAS supersede then
// no-ops (the row is no longer Running), so Trigger must re-read and return
// the now-Complete pass rather than starting a redundant one.
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}},
}
// Model the concurrent Submit: the moment Trigger tries to supersede run-1,
// flip it Complete with a verdict, exactly as Submit would have.
store.beforeUpdate = func(s *fakeStore, id string) {
for i := range s.runs {
if s.runs[i].ID == id && s.runs[i].Status == domain.ReviewRunRunning {
s.runs[i].Status = domain.ReviewRunComplete
s.runs[i].Verdict = domain.VerdictApproved
}
}
}
launcher := &fakeLauncher{alive: true, handle: "review-mer-1"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

res, err := eng.Trigger(context.Background(), "mer-1")
if err != nil {
t.Fatalf("Trigger: %v", err)
}
if res.Created || res.Run.ID != "run-1" || res.Run.Status != domain.ReviewRunComplete {
t.Fatalf("expected the Submit-completed run returned as-is, got %+v", res)
}
if launcher.spawned || launcher.notified {
t.Fatalf("should not start a redundant pass after losing to Submit: %+v", launcher)
}
if len(store.runs) != 1 {
t.Fatalf("should not insert another run: %+v", store.runs)
}
}

func TestTriggerRespawnsWhenRunningRowHasNoVerdict(t *testing.T) {
// A prior pass for this commit never produced a verdict — its execution
// may have been stopped (e.g. the user killed the terminal, or just
// interrupted the agent without killing the pane) without ever calling
// Submit. Pane liveness alone can't tell those apart, so Status=Running
// with no verdict is never trusted blindly: the stale row is marked
// Failed and a fresh pass starts, reusing the pane if it's still alive
// (Notify) and spawning a new one only if it's not (#342).
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}},
}
launcher := &fakeLauncher{alive: false, handle: "review-mer-2"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

res, err := eng.Trigger(context.Background(), "mer-1")
if err != nil {
t.Fatalf("Trigger: %v", err)
}
if !res.Created {
t.Fatalf("expected a fresh pass when the prior run has no verdict: %+v", res)
}
if !launcher.spawned {
t.Fatalf("expected a fresh spawn since the old pane is dead: %+v", launcher)
}
stale := store.runs[0]
if stale.ID != "run-1" || stale.Status != domain.ReviewRunFailed {
t.Fatalf("expected stale run-1 marked Failed, got %+v", stale)
}
}

func TestTriggerNotifiesLiveReviewerWhenRunningRowHasNoVerdict(t *testing.T) {
// Same as above, but the pane is still alive (e.g. the user only
// interrupted the agent's work without killing the terminal/pane). The
// stale row is still superseded, but the live pane is re-notified instead
// of spawning a redundant second pane.
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}},
}
launcher := &fakeLauncher{alive: true, handle: "review-mer-1"}
eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher)

res, err := eng.Trigger(context.Background(), "mer-1")
if err != nil {
t.Fatalf("Trigger: %v", err)
}
if !res.Created {
t.Fatalf("expected a fresh pass even though the pane is reused: %+v", res)
}
if !launcher.notified || launcher.spawned {
t.Fatalf("expected notify on the still-alive pane, not a spawn: %+v", launcher)
}
stale := store.runs[0]
if stale.ID != "run-1" || stale.Status != domain.ReviewRunFailed {
t.Fatalf("expected stale run-1 marked Failed, got %+v", stale)
}
}

func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) {
store := &fakeStore{
review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},
Expand Down
Loading