diff --git a/backend/internal/review/review.go b/backend/internal/review/review.go index 4c8c812c..21bd1dac 100644 --- a/backend/internal/review/review.go +++ b/backend/internal/review/review.go @@ -188,12 +188,35 @@ func (e *Engine) Trigger(ctx context.Context, workerID domain.SessionID) (Trigge return TriggerResult{}, err } - // Idempotency: return a non-failed pass as-is. Failed passes stay visible - // but can be retried after the user fixes the underlying issue. + // Idempotency: a commit only counts as reviewed once a pass actually + // finished — Status Complete, which only Submit sets, and only after + // recording a verdict. Any non-Complete pass is retryable: + // - A Failed pass (launch error, or one superseded below) falls through to + // a fresh retry. + // - A Running pass may have been interrupted (its execution stopped, or + // its pane killed) without ever calling Submit, and pane liveness can't + // distinguish "still working" from "pane open, work stopped." So it's + // superseded on retry: marked Failed, then a fresh pass starts below — + // reusing the pane via Notify if it's still alive, or spawning a new one + // if not (#342). if existing, ok, err := e.store.GetReviewRunBySessionAndSHA(ctx, workerID, targetSHA); err != nil { return TriggerResult{}, err - } else if ok && existing.Status != domain.ReviewRunFailed { + } else if ok && existing.Status == domain.ReviewRunComplete { return TriggerResult{Run: existing, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil + } else if ok && existing.Status == domain.ReviewRunRunning { + superseded, err := e.store.UpdateReviewRunResult(ctx, existing.ID, domain.ReviewRunFailed, domain.VerdictNone, "superseded by a new review trigger", "") + if err != nil { + return TriggerResult{}, err + } + if !superseded { + // Lost the race to a concurrent Submit: that pass is now Complete, so + // re-read and return it instead of starting a redundant pass. + if latest, ok, err := e.store.GetReviewRun(ctx, existing.ID); err != nil { + return TriggerResult{}, err + } else if ok { + return TriggerResult{Run: latest, ReviewerHandleID: review.ReviewerHandleID, Created: false}, nil + } + } } harness, err := e.reviewerHarness(ctx, worker) diff --git a/backend/internal/review/review_test.go b/backend/internal/review/review_test.go index 54b0b8d1..4bd942c9 100644 --- a/backend/internal/review/review_test.go +++ b/backend/internal/review/review_test.go @@ -23,6 +23,10 @@ type fakeStore struct { // winner (so a follow-up GetReviewRunBySessionAndSHA finds it) and returns // insertErr instead of recording the caller's run. insertErr error + // beforeUpdate, when set, runs at the start of UpdateReviewRunResult. Tests + // use it to model a concurrent Submit landing on a run between the + // idempotency lookup and the supersede update (the CAS race). + beforeUpdate func(s *fakeStore, id string) } func (f *fakeStore) UpsertReview(_ context.Context, r domain.Review) error { @@ -47,6 +51,9 @@ func (f *fakeStore) InsertReviewRun(_ context.Context, r domain.ReviewRun) error return nil } func (f *fakeStore) UpdateReviewRunResult(_ context.Context, id string, status domain.ReviewRunStatus, verdict domain.ReviewVerdict, body, githubReviewID string) (bool, error) { + if f.beforeUpdate != nil { + f.beforeUpdate(f, id) + } for i := range f.runs { if f.runs[i].ID == id { if f.runs[i].Status != domain.ReviewRunRunning { @@ -129,7 +136,9 @@ func (f *fakeLauncher) Notify(_ context.Context, handleID string, spec LaunchSpe f.gotSpec = spec return f.notifyErr } -func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) { return f.alive, nil } +func (f *fakeLauncher) Alive(_ context.Context, _ string) (bool, error) { + return f.alive || f.spawned, nil +} type fakeMessenger struct { sends int @@ -195,42 +204,37 @@ func TestTriggerSpawnsNewReviewerAndRecordsRunAfterLaunch(t *testing.T) { } } -func TestTriggerConcurrentSameWorkerSpawnsOnce(t *testing.T) { +func TestTriggerConcurrentSameWorkerNeverDoubleSpawns(t *testing.T) { + // Concurrent triggers for the same worker must never end up with two + // independent reviewer panes for the same commit (#242): the loser of + // each race either finds the pane already alive (Notify, not Spawn) or + // loses the unique-insert race outright. Verdict-gated idempotency (#342) + // means a run with no verdict yet is no longer trusted blindly, so repeat + // triggers may notify the live pane again instead of being a pure no-op — + // that's fine; only an actual second Spawn would be a regression. store := &fakeStore{} launcher := &fakeLauncher{handle: "review-mer-1"} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) const n = 8 var wg sync.WaitGroup - results := make([]TriggerResult, n) errs := make([]error, n) wg.Add(n) for i := 0; i < n; i++ { go func(i int) { defer wg.Done() - results[i], errs[i] = eng.Trigger(context.Background(), "mer-1") + _, errs[i] = eng.Trigger(context.Background(), "mer-1") }(i) } wg.Wait() - created := 0 for i := 0; i < n; i++ { if errs[i] != nil { t.Fatalf("Trigger[%d]: %v", i, errs[i]) } - if results[i].Created { - created++ - } - } - // Exactly one trigger does the work; the rest reuse its run. - if created != 1 { - t.Errorf("Created=true count = %d, want exactly 1", created) } if launcher.spawnCount != 1 { - t.Errorf("reviewer spawn count = %d, want 1", launcher.spawnCount) - } - if len(store.runs) != 1 { - t.Errorf("recorded review runs = %d, want 1", len(store.runs)) + t.Errorf("reviewer spawn count = %d, want exactly 1 (no double-spawn)", launcher.spawnCount) } } @@ -257,11 +261,18 @@ func TestTriggerFallsBackToExistingRunOnUniqueConflict(t *testing.T) { } func TestTriggerIsIdempotentForSameCommit(t *testing.T) { + // A Complete pass is the only state that blocks re-review: Status Complete + // is set exclusively by Submit, after it records a verdict. The body is + // deliberately empty here — an approved review may carry none, yet a + // finished pass still blocks. The block is gated on Status, not the body. store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, - runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, + runs: []domain.ReviewRun{{ + ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", + Status: domain.ReviewRunComplete, Verdict: domain.VerdictApproved, Body: "", + }}, } - launcher := &fakeLauncher{} + launcher := &fakeLauncher{alive: true} eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) res, err := eng.Trigger(context.Background(), "mer-1") @@ -279,6 +290,102 @@ func TestTriggerIsIdempotentForSameCommit(t *testing.T) { } } +func TestTriggerReturnsCompletedRunWhenSubmitWinsSupersedeRace(t *testing.T) { + // A retry finds a Running pass and moves to supersede it, but a concurrent + // Submit lands first and marks that pass Complete. The CAS supersede then + // no-ops (the row is no longer Running), so Trigger must re-read and return + // the now-Complete pass rather than starting a redundant one. + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, + } + // Model the concurrent Submit: the moment Trigger tries to supersede run-1, + // flip it Complete with a verdict, exactly as Submit would have. + store.beforeUpdate = func(s *fakeStore, id string) { + for i := range s.runs { + if s.runs[i].ID == id && s.runs[i].Status == domain.ReviewRunRunning { + s.runs[i].Status = domain.ReviewRunComplete + s.runs[i].Verdict = domain.VerdictApproved + } + } + } + launcher := &fakeLauncher{alive: true, handle: "review-mer-1"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if res.Created || res.Run.ID != "run-1" || res.Run.Status != domain.ReviewRunComplete { + t.Fatalf("expected the Submit-completed run returned as-is, got %+v", res) + } + if launcher.spawned || launcher.notified { + t.Fatalf("should not start a redundant pass after losing to Submit: %+v", launcher) + } + if len(store.runs) != 1 { + t.Fatalf("should not insert another run: %+v", store.runs) + } +} + +func TestTriggerRespawnsWhenRunningRowHasNoVerdict(t *testing.T) { + // A prior pass for this commit never produced a verdict — its execution + // may have been stopped (e.g. the user killed the terminal, or just + // interrupted the agent without killing the pane) without ever calling + // Submit. Pane liveness alone can't tell those apart, so Status=Running + // with no verdict is never trusted blindly: the stale row is marked + // Failed and a fresh pass starts, reusing the pane if it's still alive + // (Notify) and spawning a new one only if it's not (#342). + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, + } + launcher := &fakeLauncher{alive: false, handle: "review-mer-2"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if !res.Created { + t.Fatalf("expected a fresh pass when the prior run has no verdict: %+v", res) + } + if !launcher.spawned { + t.Fatalf("expected a fresh spawn since the old pane is dead: %+v", launcher) + } + stale := store.runs[0] + if stale.ID != "run-1" || stale.Status != domain.ReviewRunFailed { + t.Fatalf("expected stale run-1 marked Failed, got %+v", stale) + } +} + +func TestTriggerNotifiesLiveReviewerWhenRunningRowHasNoVerdict(t *testing.T) { + // Same as above, but the pane is still alive (e.g. the user only + // interrupted the agent's work without killing the terminal/pane). The + // stale row is still superseded, but the live pane is re-notified instead + // of spawning a redundant second pane. + store := &fakeStore{ + review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"}, + runs: []domain.ReviewRun{{ID: "run-1", SessionID: "mer-1", TargetSHA: "sha1", Status: domain.ReviewRunRunning}}, + } + launcher := &fakeLauncher{alive: true, handle: "review-mer-1"} + eng := newEngineForTest(store, fakeSessions{rec: liveWorker(), ok: true}, prAt("sha1"), fakeProjects{}, launcher) + + res, err := eng.Trigger(context.Background(), "mer-1") + if err != nil { + t.Fatalf("Trigger: %v", err) + } + if !res.Created { + t.Fatalf("expected a fresh pass even though the pane is reused: %+v", res) + } + if !launcher.notified || launcher.spawned { + t.Fatalf("expected notify on the still-alive pane, not a spawn: %+v", launcher) + } + stale := store.runs[0] + if stale.ID != "run-1" || stale.Status != domain.ReviewRunFailed { + t.Fatalf("expected stale run-1 marked Failed, got %+v", stale) + } +} + func TestTriggerNotifiesLiveReviewerOnNewCommit(t *testing.T) { store := &fakeStore{ review: &domain.Review{ID: "rev-1", SessionID: "mer-1", ReviewerHandleID: "review-mer-1"},