diff --git a/.claude/skills/pr-polish/SKILL.md b/.claude/skills/pr-polish/SKILL.md new file mode 100644 index 000000000000..3b36adee141e --- /dev/null +++ b/.claude/skills/pr-polish/SKILL.md @@ -0,0 +1,245 @@ +--- +name: pr-polish +description: Alternate /pr-review and /pr-address on a PR until the PR is truly mergeable — no new review findings, zero unresolved inline threads, zero unaddressed top-level reviews or issue comments, all CI checks green, and two consecutive quiet polls after CI settles. Use when the user wants a PR polished to merge-ready without setting a fixed number of rounds. +user-invocable: true +argument-hint: "[PR number or URL] — if omitted, finds PR for current branch." +metadata: + author: autogpt-team + version: "1.0.0" +--- + +# PR Polish + +**Goal.** Drive a PR to merge-ready by alternating `/pr-review` and `/pr-address` until **all** of the following hold: + +1. The most recent `/pr-review` produces **zero new findings** (no new inline comments, no new top-level reviews with a non-empty body). +2. Every inline review thread reachable via GraphQL reports `isResolved: true`. +3. Every non-bot, non-author top-level review has been acknowledged (replied-to) OR resolved via a thread it spawned. +4. Every non-bot, non-author issue comment has been acknowledged (replied-to). +5. Every CI check is `conclusion: "success"` or `"skipped"` / `"neutral"` — none `"failure"` or still pending. +6. **Two consecutive post-CI polls** (≥60s apart) stay clean — no new threads, no new non-empty reviews, no new issue comments. Bots (coderabbitai, sentry, autogpt-reviewer) frequently post late after CI settles; a single green snapshot is not sufficient. + +**Do not stop at a fixed number of rounds.** If round N introduces new comments, round N+1 is required. Cap at `_MAX_ROUNDS = 10` as a safety valve, but expect 2–5 in practice. + +## TodoWrite + +Before starting, write two todos so the user can see the loop progression: + +- `Round {current}: /pr-review + /pr-address on PR #{N}` — current iteration. +- `Final polish polling: 2 consecutive clean polls, CI green, 0 unresolved` — runs after the last non-empty review round. + +Update the `current` round counter at the start of each iteration; mark `completed` only when the round's address step finishes (all new threads addressed + resolved). + +## Find the PR + +```bash +ARG_PR="${ARG:-}" +# Normalize URL → numeric ID if the skill arg is a pull-request URL. +if [[ "$ARG_PR" =~ ^https?://github\.com/[^/]+/[^/]+/pull/([0-9]+) ]]; then + ARG_PR="${BASH_REMATCH[1]}" +fi +PR="${ARG_PR:-$(gh pr list --head "$(git branch --show-current)" --repo Significant-Gravitas/AutoGPT --json number --jq '.[0].number')}" +if [ -z "$PR" ] || [ "$PR" = "null" ]; then + echo "No PR found for current branch. Provide a PR number or URL as the skill arg." + exit 1 +fi +echo "Polishing PR #$PR" +``` + +## The outer loop + +```text +round = 0 +while round < _MAX_ROUNDS: + round += 1 + baseline = snapshot_state(PR) # see "Snapshotting state" below + invoke_skill("pr-review", PR) # posts findings as inline comments / top-level review + findings = diff_state(PR, baseline) + if findings.total == 0: + break # no new findings → go to polish polling + invoke_skill("pr-address", PR) # resolves every unresolved thread + CI failure +# Post-loop: polish polling (see below). 
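+# snapshot_state / diff_state / polish_polling are specified below under
+# "Snapshotting state", "Diffing after a review", and "Polish polling";
+# findings.total sums the four buckets listed in "Diffing after a review".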
+polish_polling(PR)
+```
+
+### Snapshotting state
+
+Before each `/pr-review`, capture a baseline so the diff after the review reflects **only** what the review just added (not pre-existing threads):
+
+```bash
+# Inline threads — total count + latest databaseId per thread
+gh api graphql -f query="
+{
+  repository(owner: \"Significant-Gravitas\", name: \"AutoGPT\") {
+    pullRequest(number: ${PR}) {
+      reviewThreads(first: 100) {
+        totalCount
+        nodes {
+          id
+          isResolved
+          comments(last: 1) { nodes { databaseId } }
+        }
+      }
+    }
+  }
+}" > /tmp/baseline_threads.json
+
+# Top-level reviews — count + latest id per non-empty review
+gh api "repos/Significant-Gravitas/AutoGPT/pulls/${PR}/reviews" --paginate \
+  --jq '[.[] | select((.body // "") != "") | {id, user: .user.login, state, submitted_at}]' \
+  > /tmp/baseline_reviews.json
+
+# Issue comments — count + latest id per non-bot, non-author comment.
+# Bots are filtered by User.type == "Bot" (GitHub sets this for app/bot
+# accounts like coderabbitai, github-actions, sentry-io). The author is
+# filtered by comparing login to the PR author. gh's built-in --jq cannot
+# take jq arguments, so pipe to real jq and pass the author with --arg.
+AUTHOR=$(gh api "repos/Significant-Gravitas/AutoGPT/pulls/${PR}" --jq '.user.login')
+gh api "repos/Significant-Gravitas/AutoGPT/issues/${PR}/comments" --paginate |
+  jq --arg author "$AUTHOR" \
+    '[.[] | select(.user.type != "Bot" and .user.login != $author)
+        | {id, user: .user.login, created_at}]' \
+  > /tmp/baseline_issue_comments.json
+```
+
+### Diffing after a review
+
+After `/pr-review` runs, each of the following counts as a "new finding" and means another address round is needed:
+
+- A new inline thread `id` not in the baseline.
+- An existing thread whose latest comment `databaseId` is higher than the baseline's (new reply on an old thread).
+- A new top-level review `id` with a non-empty body.
+- A new issue comment `id` from a non-bot, non-author user.
+
+If any of the four buckets is non-empty → not done; invoke `/pr-address` and loop.
+
+## Polish polling
+
+Once `/pr-review` produces zero new findings, do **not** exit yet. Bots (coderabbitai, sentry, autogpt-reviewer) commonly post late reviews after CI settles — 30–90 seconds after the final push. Poll at 60-second intervals:
+
+```text
+NON_SUCCESS_TERMINAL = {"failure", "cancelled", "timed_out", "action_required", "startup_failure"}
+clean_polls = 0
+required_clean = 2
+while clean_polls < required_clean:
+    # 1. CI gate — any terminal non-success conclusion (not just "failure")
+    #    must trigger /pr-address. "success", "skipped", "neutral" are clean;
+    #    anything else (including cancelled, timed_out, action_required) is a
+    #    blocker that won't self-resolve.
+    ci = fetch_check_runs(PR)
+    if any ci.conclusion in NON_SUCCESS_TERMINAL:
+        invoke_skill("pr-address", PR)   # address failures + any new comments
+        baseline = snapshot_state(PR)    # reset — push during address invalidates old baseline
+        clean_polls = 0
+        continue
+    if any ci.conclusion is None (still in_progress):
+        sleep 60; continue               # wait without counting this as clean
+
+    # 2. Comment / thread gate
+    threads = fetch_unresolved_threads(PR)
+    new_issue_comments = diff_against_baseline(issue_comments)
+    new_reviews = diff_against_baseline(reviews)
+    if threads or new_issue_comments or new_reviews:
+        invoke_skill("pr-address", PR)
+        baseline = snapshot_state(PR)    # reset — the address loop just dealt with these,
+                                         # otherwise they stay "new" relative to the old baseline forever
+        clean_polls = 0
+        continue
+
+    # 3. Mergeability gate
+    mergeable = gh api repos/.../pulls/${PR} --jq '.mergeable'
+    if mergeable == false (CONFLICTING):
+        resolve_conflicts(PR)            # see pr-address skill
+        clean_polls = 0
+        continue
+    if mergeable is null (UNKNOWN):
+        sleep 60; continue
+
+    clean_polls += 1
+    sleep 60
+```
+
+Only after `clean_polls == 2` do you report `ORCHESTRATOR:DONE`.
+
+### Why 2 clean polls, not 1
+
+A single green snapshot can be misleading — the final CI check often completes ~30s before a bot posts its delayed review. One quiet cycle does not prove the PR is stable; two consecutive cycles with no new threads, reviews, or issue comments arriving give high confidence nothing else is incoming.
+
+### Why check every source each poll
+
+`/pr-address` polling inside a single round already re-checks its own comments, but `/pr-polish` sits a level above and must also catch:
+
+- New top-level reviews (autogpt-reviewer sometimes posts structured feedback only after several CI green cycles).
+- Issue comments from human reviewers (not caught by inline thread polling).
+- Sentry bug predictions that land on new line numbers post-push.
+- Merge conflicts introduced by a race between your push and a merge to `dev`.
+
+## Invocation pattern
+
+Delegate to existing skills with the `Skill` tool; do not re-implement the review or address logic inline. This keeps the polish loop focused on orchestration and lets the child skills evolve independently.
+
+```python
+Skill(skill="pr-review", args=pr_url)
+Skill(skill="pr-address", args=pr_url)
+```
+
+After each child invocation, re-query GitHub state directly — never trust a summary for the stop condition. The orchestrator's `ORCHESTRATOR:DONE` is verified against actual GraphQL / REST responses per the rules in `pr-address`'s "Verify actual count before outputting ORCHESTRATOR:DONE" section.
+
+### **Auto-continue: do NOT end your response between child skills**
+
+`/pr-polish` is a single orchestration task — one invocation drives the PR all the way to merge-ready. When a child `Skill()` call returns control to you:
+
+- Do NOT summarize and stop.
+- Do NOT wait for user confirmation to continue.
+- Immediately, in the same response, perform the next loop step: state diff → decide next action → next `Skill()` call or polling sleep.
+
+The child skill returning is a **loop iteration boundary**, not a conversation turn boundary. You are expected to keep going until one of the exit conditions in the opening section is met (2 consecutive clean polls, `_MAX_ROUNDS` hit, or an unrecoverable error).
+
+If the user needs to approve a risky action mid-loop (e.g., a force-push or a destructive git operation), pause there — but not at the routine "round N finished, round N+1 needed" boundary. Those are silent transitions.
+
+## GitHub rate limits
+
+This skill issues many GraphQL calls (one review-thread query per outer iteration plus per-poll queries inside polish polling). Expect the GraphQL budget to be tight on large PRs. When `gh api rate_limit --jq .resources.graphql.remaining` drops below ~200, back off (see the sketch after this list):
+
+- Fall back to REST for reads (flat `/pulls/{N}/comments`, `/pulls/{N}/reviews`, `/issues/{N}/comments`) per the `pr-address` skill's GraphQL-fallback section.
+- Queue thread resolutions (GraphQL-only) until the budget resets; keep making progress on fixes + REST replies meanwhile.
+- `sleep 5` between any batch of ≥20 writes to avoid secondary rate limits.
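+
+A minimal sketch of that budget gate, reusing the `gh` calls already used in this skill (the ~200 threshold is this skill's convention, not a GitHub constant, and the `/tmp/poll_*.json` paths are placeholder scratch files):
+
+```bash
+# Check the GraphQL budget once per poll cycle; when it runs low, do this
+# cycle's reads over REST and postpone GraphQL-only writes (thread resolution).
+REMAINING=$(gh api rate_limit --jq '.resources.graphql.remaining')
+if [ "${REMAINING:-0}" -lt 200 ]; then
+  echo "GraphQL budget low (${REMAINING}); REST reads only this cycle"
+  gh api "repos/Significant-Gravitas/AutoGPT/pulls/${PR}/comments" --paginate \
+    > /tmp/poll_inline_comments.json
+  gh api "repos/Significant-Gravitas/AutoGPT/issues/${PR}/comments" --paginate \
+    > /tmp/poll_issue_comments.json
+else
+  # Budget is fine; run the usual GraphQL reviewThreads query
+  # (see "Snapshotting state" above).
+  :
+fi
+```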
+
+## Safety valves
+
+- `_MAX_ROUNDS = 10` — if review+address rounds exceed this, stop and escalate to the user with a summary of what's still unresolved. A PR that cannot converge in 10 rounds has systemic issues that need human judgment.
+- After each commit, run `poetry run format` / `pnpm format && pnpm lint && pnpm types` per the target codebase's conventions. A failing format check is a CI `failure` that will never self-resolve.
+- Every `/pr-review` round checks for **duplicate** concerns first (via `pr-review`'s own "Fetch existing review comments" step) so the loop does not re-post the same finding that a prior round already resolved.
+
+## Reporting
+
+When the skill finishes (either via two clean polls or hitting `_MAX_ROUNDS`), produce a compact summary:
+
+```
+PR #{N} polish complete ({rounds_completed} rounds):
+- {X} inline threads opened and resolved
+- {Y} CI failures fixed
+- {Z} new commits pushed
+Final state: CI green, {total} threads all resolved, mergeable.
+```
+
+If exiting via `_MAX_ROUNDS`, flag explicitly:
+
+```
+PR #{N} polish stopped at {_MAX_ROUNDS} rounds — NOT merge-ready:
+- {N} threads still unresolved: {titles}
+- CI status: {summary}
+Needs human review.
+```
+
+## When to use this skill
+
+Use when the user says any of:
+- "polish this PR"
+- "keep reviewing and addressing until it's mergeable"
+- "loop /pr-review + /pr-address until done"
+- "make sure the PR is actually merge-ready"
+
+Do **not** use when:
+- User wants just one review pass (→ `/pr-review`).
+- User wants to address already-posted comments without further self-review (→ `/pr-address`).
+- A fixed round count is explicitly requested (e.g., "do 3 rounds") — honour the count instead of converging.
diff --git a/.claude/skills/pr-test/SKILL.md b/.claude/skills/pr-test/SKILL.md
index b368fb7f0d39..09699ec546b5 100644
--- a/.claude/skills/pr-test/SKILL.md
+++ b/.claude/skills/pr-test/SKILL.md
@@ -260,6 +260,32 @@ Use a `trap` so release runs even on `exit 1`:
 trap 'kill "$HEARTBEAT_PID" 2>/dev/null; rm -f "$LOCK"' EXIT INT TERM
 ```
 
+### **Release the lock AS SOON AS the test run is done**
+
+The lock guards **test execution**, not **app lifecycle**. Once Step 5 (record results) and Step 6 (post PR comment) are complete, release the lock IMMEDIATELY — even if:
+
+- The native `poetry run app` / `pnpm dev` processes are still running so the user can keep poking at the app manually.
+- You're leaving docker containers up.
+- You're tailing logs for a minute or two.
+
+Keeping the lock held past the test run is the single most common way `/pr-test` stalls other agents. **The app staying up is orthogonal to the lock; don't conflate them.** Sibling worktrees running their own `/pr-test` will kill the stray processes and free the ports themselves (Step 3c/3e-native handles that) — they just need the lock file gone.
+
+Concretely, the sequence at the end of every `/pr-test` run (success or failure) is:
+
+```bash
+# 1. Write the final report + post PR comment — done above in Step 5/6.
+# 2. Release the lock right now, even if the app is still up.
+kill "$HEARTBEAT_PID" 2>/dev/null
+rm -f "$LOCK" /tmp/pr-test-heartbeat.pid
+echo "$(date -u +%Y-%m-%dT%H:%MZ) [pr-${PR_NUMBER}] released lock (app may still be running)" \
+  >> /Users/majdyz/Code/AutoGPT/.ign.testing.log
+# 3. Optionally leave the app running and note it so the user knows:
+echo "Native stack still running on :3000 / :8006 for manual poking. Kill with:"
+echo "  pkill -9 -f 'poetry run app'; pkill -9 -f 'next-server|next dev'"
+```
+
+If a sibling agent's `/pr-test` needs to take over, it'll do the kill+rebuild dance from Step 3c/3e-native on its own — your only job is to not hold the lock file past the end of your test.
+
 ### Shared status log
 
 `/Users/majdyz/Code/AutoGPT/.ign.testing.log` is an append-only channel any agent can read/write. Use it for "I'm waiting", "I'm done, resources free", or post-run notes:
@@ -755,6 +781,19 @@ Upload screenshots to the PR using the GitHub Git API (no local git operations
 
 **CRITICAL — NEVER post a bare directory link like `https://github.com/.../tree/...`.** Every screenshot MUST appear as `![name](raw_url)` inline in the PR comment so reviewers can see them without clicking any links. After posting, the verification step below greps the comment for `![` tags and exits 1 if none are found — the test run is considered incomplete until this passes.
 
+**CRITICAL — NEVER paste absolute local paths into the PR comment.** Strings like `/Users/…`, `/home/…`, `C:\…` are useless to every reviewer except you. Before posting, grep the final body for `/Users/`, `/home/`, `/tmp/`, `/private/`, `C:\`, `~/` and either drop those lines entirely or rewrite them as repo-relative paths (`autogpt_platform/backend/…`). The PR comment is an artifact reviewers read on GitHub — it must be self-contained on github.com. Keep local paths in `$RESULTS_DIR/test-report.md` for yourself; only copy the *content* they reference (excerpts, test names, log lines) into the PR comment, not the path.
+
+**Pre-post sanity check** (paste after building the comment body, before `gh api ... comments`):
+
+```bash
+# Reject any local-looking absolute path or home-dir shortcut in the body
+if grep -nE '(^|[^A-Za-z])(/Users/|/home/|/tmp/|/private/|C:\\|~/)[A-Za-z0-9]' "$COMMENT_FILE"; then
+  echo "ABORT: local filesystem paths detected in PR comment body."
+  echo "Remove or rewrite as repo-relative (autogpt_platform/...) before posting."
+  exit 1
+fi
+```
+
 ```bash
 # Upload screenshots via GitHub Git API (creates blobs, tree, commit, and ref remotely)
 REPO="Significant-Gravitas/AutoGPT"
diff --git a/autogpt_platform/backend/backend/copilot/executor/manager.py b/autogpt_platform/backend/backend/copilot/executor/manager.py
index 02a291388378..08baf73c0596 100644
--- a/autogpt_platform/backend/backend/copilot/executor/manager.py
+++ b/autogpt_platform/backend/backend/copilot/executor/manager.py
@@ -105,25 +105,46 @@ def run(self):
         time.sleep(1e5)
 
     def cleanup(self):
-        """Graceful shutdown with active execution waiting."""
+        """Graceful shutdown — mirrors ``backend.executor.manager`` pattern.
+
+        1. Stop consumer immediately (both the Python flag that gates
+           ``_handle_run_message`` and ``channel.stop_consuming()`` at
+           the broker), so no new work enters.
+        2. Passively wait for ``active_tasks`` to drain — each turn's
+           own ``finally`` publishes its terminal state via
+           ``mark_session_completed``. When a turn exits, ``on_run_done``
+           removes it from ``active_tasks`` and releases its cluster lock.
+        3. Stop message consumer threads + disconnect pika clients.
+        4. Shut down the thread-pool executor (cancels pending, leaves
+           running threads alone — process exit handles them).
+        5. Release any cluster locks still held (defensive — on_run_done's
+           finally should have already released them).
+ + The zombie-session bug this PR targets is handled inside each + turn's own lifecycle by :func:`sync_fail_close_session`, NOT by + cleanup — so cleanup can stay as a simple "wait, then teardown" + and matches agent-executor's proven pattern. + """ pid = os.getpid() - logger.info(f"[cleanup {pid}] Starting graceful shutdown...") + prefix = f"[cleanup {pid}]" + logger.info(f"{prefix} Starting graceful shutdown...") - # Signal the consumer thread to stop + # 1. Stop consumer — flag AND broker-side try: self.stop_consuming.set() run_channel = self.run_client.get_channel() run_channel.connection.add_callback_threadsafe( lambda: run_channel.stop_consuming() ) - logger.info(f"[cleanup {pid}] Consumer has been signaled to stop") + logger.info(f"{prefix} Consumer has been signaled to stop") except Exception as e: - logger.error(f"[cleanup {pid}] Error stopping consumer: {e}") + logger.error(f"{prefix} Error stopping consumer: {e}") - # Wait for active executions to complete + # 2. Wait for in-flight turns to finish naturally if self.active_tasks: logger.info( - f"[cleanup {pid}] Waiting for {len(self.active_tasks)} active tasks to complete (timeout: {GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s)..." + f"{prefix} Waiting for {len(self.active_tasks)} active tasks " + f"to complete (timeout: {GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s)..." ) start_time = time.monotonic() @@ -138,38 +159,42 @@ def cleanup(self): if not self.active_tasks: break - # Refresh cluster locks periodically - current_time = time.monotonic() - if current_time - last_refresh >= lock_refresh_interval: + now = time.monotonic() + if now - last_refresh >= lock_refresh_interval: for lock in list(self._task_locks.values()): try: lock.refresh() except Exception as e: - logger.warning( - f"[cleanup {pid}] Failed to refresh lock: {e}" - ) - last_refresh = current_time + logger.warning(f"{prefix} Failed to refresh lock: {e}") + last_refresh = now logger.info( - f"[cleanup {pid}] {len(self.active_tasks)} tasks still active, waiting..." + f"{prefix} {len(self.active_tasks)} tasks still active, waiting..." ) time.sleep(10.0) - # Stop message consumers + if self.active_tasks: + logger.warning( + f"{prefix} {len(self.active_tasks)} tasks still running after " + f"{GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS}s — process exit will " + f"abandon them; RabbitMQ redelivery handles the message." + ) + + # 3. Stop message consumer threads if self._run_thread: self._stop_message_consumers( - self._run_thread, self.run_client, "[cleanup][run]" + self._run_thread, self.run_client, f"{prefix} [run]" ) if self._cancel_thread: self._stop_message_consumers( - self._cancel_thread, self.cancel_client, "[cleanup][cancel]" + self._cancel_thread, self.cancel_client, f"{prefix} [cancel]" ) - # Clean up worker threads (closes per-loop workspace storage sessions) + # 4. Worker cleanup + executor shutdown if self._executor: from .processor import cleanup_worker - logger.info(f"[cleanup {pid}] Cleaning up workers...") + logger.info(f"{prefix} Cleaning up workers...") futures = [] for _ in range(self._executor._max_workers): futures.append(self._executor.submit(cleanup_worker)) @@ -177,22 +202,20 @@ def cleanup(self): try: f.result(timeout=10) except Exception as e: - logger.warning(f"[cleanup {pid}] Worker cleanup error: {e}") + logger.warning(f"{prefix} Worker cleanup error: {e}") - logger.info(f"[cleanup {pid}] Shutting down executor...") + logger.info(f"{prefix} Shutting down executor...") self._executor.shutdown(wait=False) - # Release any remaining locks + # 5. 
Release any cluster locks still held for session_id, lock in list(self._task_locks.items()): try: lock.release() - logger.info(f"[cleanup {pid}] Released lock for {session_id}") + logger.info(f"{prefix} Released lock for {session_id}") except Exception as e: - logger.error( - f"[cleanup {pid}] Failed to release lock for {session_id}: {e}" - ) + logger.error(f"{prefix} Failed to release lock for {session_id}: {e}") - logger.info(f"[cleanup {pid}] Graceful shutdown completed") + logger.info(f"{prefix} Graceful shutdown completed") # ============ RabbitMQ Consumer Methods ============ # @@ -387,13 +410,12 @@ def ack_message(reject: bool, requeue: bool): # Execute the task try: - self._task_locks[session_id] = cluster_lock - logger.info( f"Acquired cluster lock for {session_id}, " f"executor_id={self.executor_id}" ) + self._task_locks[session_id] = cluster_lock cancel_event = threading.Event() future = self.executor.submit( execute_copilot_turn, entry, cancel_event, cluster_lock @@ -425,7 +447,6 @@ def on_run_done(f: Future): error_msg = str(e) or type(e).__name__ logger.exception(f"Error in run completion callback: {error_msg}") finally: - # Release the cluster lock if session_id in self._task_locks: logger.info(f"Releasing cluster lock for {session_id}") self._task_locks[session_id].release() diff --git a/autogpt_platform/backend/backend/copilot/executor/processor.py b/autogpt_platform/backend/backend/copilot/executor/processor.py index f40264b70b6b..383830250433 100644 --- a/autogpt_platform/backend/backend/copilot/executor/processor.py +++ b/autogpt_platform/backend/backend/copilot/executor/processor.py @@ -5,6 +5,7 @@ """ import asyncio +import concurrent.futures import logging import os import subprocess @@ -30,6 +31,87 @@ logger = TruncatedLogger(logging.getLogger(__name__), prefix="[CoPilotExecutor]") +SHUTDOWN_ERROR_MESSAGE = ( + "Copilot executor shut down before this turn finished. Please retry." +) + +# Max time execute() blocks after calling future.cancel() / when draining a +# soon-to-be-cancelled future. Gives _execute_async's own finally a chance to +# publish the accurate terminal state over the Redis CAS; long enough to let +# an in-flight Redis call settle, short enough that shutdown doesn't stall. +_CANCEL_GRACE_SECONDS = 5.0 + +# Max time the sync safety net itself spends on a single Redis CAS. Without +# this bound the whole point of ``sync_fail_close_session`` is defeated — +# ``mark_session_completed`` would hang on the same broken Redis that caused +# the original failure. On timeout we give up silently; worst case the +# session stays ``running`` until the stale-session watchdog reaps it, but +# at least the pool worker thread isn't blocked forever. +_FAIL_CLOSE_REDIS_TIMEOUT = 10.0 + + +# Module-level symbol preserved for backward-compat with callers that import +# ``sync_fail_close_session``; the real implementation now lives on +# ``CoPilotProcessor`` so it can reuse ``self.execution_loop`` (same +# pattern as ``backend.executor.manager``'s ``node_execution_loop`` bridge +# at :meth:`ExecutionProcessor.on_graph_execution`). + + +def sync_fail_close_session( + session_id: str, + log: "CoPilotLogMetadata | TruncatedLogger", + execution_loop: asyncio.AbstractEventLoop, +) -> None: + """Synchronously mark *session_id* as failed from the pool worker thread. 
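+
+    Invoked from :meth:`CoPilotProcessor.execute`'s ``finally`` on every
+    exit path (normal completion, exception, or grace-window timeout).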
+ + Submits the CAS coroutine to the long-lived *execution_loop* via + ``run_coroutine_threadsafe`` — the same shape agent-executor uses at + :meth:`backend.executor.manager.ExecutionProcessor.on_graph_execution` + to reach its ``node_execution_loop`` from the pool worker. Reusing the + persistent loop means: + + * no fresh TCP connection per turn (the ``@thread_cached`` + ``AsyncRedis`` on the execution thread stays bound to the same loop + and is reused across every turn); + * no loop-teardown overhead; + * no ``clear_cache()`` gymnastics to dodge the "loop is closed" pitfall. + + ``mark_session_completed`` is an atomic CAS on ``status == "running"``, + so when the async path already wrote a terminal state the sync call is + a cheap no-op. The inner ``asyncio.wait_for`` bounds the Redis call so + a wedged Redis can't hang the safety net for the full redis-py default + TCP timeout; the outer ``.result(timeout=...)`` is a belt-and-braces + upper bound for the cross-thread wait. + """ + + async def _bounded() -> None: + await asyncio.wait_for( + stream_registry.mark_session_completed( + session_id, error_message=SHUTDOWN_ERROR_MESSAGE + ), + timeout=_FAIL_CLOSE_REDIS_TIMEOUT, + ) + + try: + future = asyncio.run_coroutine_threadsafe(_bounded(), execution_loop) + except RuntimeError as e: + # execution_loop is closed — happens if cleanup() already ran the + # per-worker teardown. Nothing we can do; let the stale-session + # watchdog reap it. + log.warning(f"sync fail-close skipped (execution_loop closed): {e}") + return + try: + future.result(timeout=_FAIL_CLOSE_REDIS_TIMEOUT + 2) + except concurrent.futures.TimeoutError: + log.warning( + f"sync fail-close timed out after {_FAIL_CLOSE_REDIS_TIMEOUT}s " + f"(session={session_id})" + ) + future.cancel() + except Exception as e: + log.warning(f"sync fail-close mark_session_completed failed: {e}") + + # ============ Mode Routing ============ # @@ -252,12 +334,13 @@ def execute( ): """Execute a CoPilot turn. - Runs the async logic in the worker's event loop and handles errors. - - Args: - entry: The turn payload containing session and message info - cancel: Threading event to signal cancellation - cluster_lock: Distributed lock to prevent duplicate execution + Thin wrapper around :meth:`_execute`. The ``try/finally`` here + guarantees :func:`sync_fail_close_session` runs on every exit + path — normal completion, exception, or a wedged event loop + that escapes via :data:`_CANCEL_GRACE_SECONDS` timeout. + ``mark_session_completed`` is an atomic CAS on + ``status == "running"``, so when the async path already wrote a + terminal state the sync call is a cheap no-op. """ log = CoPilotLogMetadata( logging.getLogger(__name__), @@ -265,10 +348,28 @@ def execute( user_id=entry.user_id, ) log.info("Starting execution") - start_time = time.monotonic() + try: + self._execute(entry, cancel, cluster_lock, log) + finally: + sync_fail_close_session(entry.session_id, log, self.execution_loop) + elapsed = time.monotonic() - start_time + log.info(f"Execution completed in {elapsed:.2f}s") + + def _execute( + self, + entry: CoPilotExecutionEntry, + cancel: threading.Event, + cluster_lock: ClusterLock, + log: CoPilotLogMetadata, + ): + """Submit the async turn to ``self.execution_loop`` and drive it. 
- # Run the async execution in our event loop + Handles the sync/async boundary (cancel-event checks, cluster-lock + refresh, bounded waits) without any Redis-state cleanup logic — + that lives in :func:`sync_fail_close_session` which the outer + :meth:`execute` always invokes on exit. + """ future = asyncio.run_coroutine_threadsafe( self._execute_async(entry, cancel, cluster_lock, log), self.execution_loop, @@ -282,16 +383,27 @@ def execute( if cancel.is_set(): log.info("Cancellation requested") future.cancel() - break - # Refresh cluster lock to maintain ownership + # Give _execute_async's own finally a short window to + # publish its accurate terminal state before the outer + # sync safety net fires. + try: + future.result(timeout=_CANCEL_GRACE_SECONDS) + except BaseException: + pass + return cluster_lock.refresh() if not future.cancelled(): - # Get result to propagate any exceptions - future.result() - - elapsed = time.monotonic() - start_time - log.info(f"Execution completed in {elapsed:.2f}s") + # Bounded timeout so a wedged event loop can't trap us here — + # on timeout we escape to execute()'s finally and the sync + # safety net fires. + try: + future.result(timeout=_CANCEL_GRACE_SECONDS) + except concurrent.futures.TimeoutError: + log.warning( + "Future did not complete within grace window; " + "falling through to sync fail-close" + ) async def _execute_async( self, diff --git a/autogpt_platform/backend/backend/copilot/executor/processor_test.py b/autogpt_platform/backend/backend/copilot/executor/processor_test.py index 554164874705..cdc393e5b1c5 100644 --- a/autogpt_platform/backend/backend/copilot/executor/processor_test.py +++ b/autogpt_platform/backend/backend/copilot/executor/processor_test.py @@ -10,6 +10,8 @@ has meaningful coverage. """ +import asyncio +import concurrent.futures import logging import threading from unittest.mock import AsyncMock, MagicMock, patch @@ -20,6 +22,7 @@ CoPilotProcessor, resolve_effective_mode, resolve_use_sdk_for_mode, + sync_fail_close_session, ) from backend.copilot.executor.utils import CoPilotExecutionEntry, CoPilotLogMetadata @@ -275,3 +278,221 @@ async def test_cancel_break_calls_aclose(self) -> None: await proc._execute_async(_make_entry(), cancel, cluster_lock, _make_log()) assert published.aclose_called is True + + +@pytest.fixture +def exec_loop(): + """Long-lived asyncio loop on a daemon thread — mirrors the layout + ``CoPilotProcessor`` sets up (``execution_loop`` + ``execution_thread``) + so ``sync_fail_close_session`` has a real cross-thread loop to submit + into via ``run_coroutine_threadsafe``.""" + loop = asyncio.new_event_loop() + thread = threading.Thread(target=loop.run_forever, daemon=True) + thread.start() + try: + yield loop + finally: + loop.call_soon_threadsafe(loop.stop) + thread.join(timeout=5) + loop.close() + + +class TestSyncFailCloseSession: + """``sync_fail_close_session`` is the last-line-of-defense invoked from + ``CoPilotProcessor.execute``'s ``finally``. 
It must call + ``mark_session_completed`` via the processor's long-lived + ``execution_loop`` (cross-thread submit) and must swallow Redis + failures so a transient outage doesn't propagate out of the finally.""" + + def test_invokes_mark_session_completed_with_shutdown_message( + self, exec_loop + ) -> None: + mock_mark = AsyncMock() + with patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=mock_mark, + ): + sync_fail_close_session("sess-1", _make_log(), exec_loop) + + mock_mark.assert_awaited_once() + assert mock_mark.await_args is not None + assert mock_mark.await_args.args[0] == "sess-1" + assert "shut down" in mock_mark.await_args.kwargs["error_message"].lower() + + def test_swallows_redis_error(self, exec_loop) -> None: + # Raising from the mock ensures the helper catches the exception + # instead of propagating it back into execute()'s finally block. + mock_mark = AsyncMock(side_effect=RuntimeError("redis down")) + with patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=mock_mark, + ): + sync_fail_close_session("sess-2", _make_log(), exec_loop) # must not raise + + mock_mark.assert_awaited_once() + + def test_closed_execution_loop_skipped_cleanly(self) -> None: + """If cleanup_worker has already stopped the execution_loop by the + time the safety net fires, ``run_coroutine_threadsafe`` raises + RuntimeError. Expected behavior: log + return without propagating.""" + dead_loop = asyncio.new_event_loop() + dead_loop.close() + + mock_mark = AsyncMock() + with patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=mock_mark, + ): + # Must not raise even though the loop is closed + sync_fail_close_session("sess-closed-loop", _make_log(), dead_loop) + + # mark_session_completed was never scheduled because the loop was dead + mock_mark.assert_not_awaited() + + def test_bounded_timeout_when_redis_hangs(self, exec_loop) -> None: + """Scenario D: Redis unreachable — the inner ``asyncio.wait_for`` + must fire and the helper must return without blocking the worker. + + Simulates a wedged Redis by sleeping past the 10s fail-close budget. + The helper must return within the configured grace (+ a small + scheduler margin) and must not re-raise. + """ + import time as _time + + from backend.copilot.executor.processor import _FAIL_CLOSE_REDIS_TIMEOUT + + async def _hang(*_args, **_kwargs): + await asyncio.sleep(_FAIL_CLOSE_REDIS_TIMEOUT + 5) + + with patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=_hang, + ): + start = _time.monotonic() + sync_fail_close_session( + "sess-hang", _make_log(), exec_loop + ) # must not raise + elapsed = _time.monotonic() - start + + # wait_for fires at _FAIL_CLOSE_REDIS_TIMEOUT; outer future.result + # has +2s slack. If the timeout is missing/broken the helper would + # block the full sleep duration (~15s). + assert elapsed < _FAIL_CLOSE_REDIS_TIMEOUT + 4.0, ( + f"sync_fail_close_session hung for {elapsed:.1f}s — bounded " + f"timeout did not fire" + ) + + +# --------------------------------------------------------------------------- +# End-to-end execute() safety-net coverage — the PR's core invariant +# --------------------------------------------------------------------------- + + +class TestExecuteSafetyNet: + """``CoPilotProcessor.execute`` must always invoke + ``sync_fail_close_session`` in its ``finally`` so a session never stays + ``status=running`` in Redis. 
+ + Validates the four deploy-time scenarios the PR targets: + + * A — SIGTERM mid-turn: ``cancel`` event fires, ``_execute`` returns, + safety net still runs. + * B — happy path: normal completion, safety net runs (cheap CAS no-op). + * C — zombie Redis state: the async ``mark_session_completed`` in + ``_execute_async`` blows up, but the outer safety net marks the + session failed anyway. + * D — covered by ``TestSyncFailCloseSession::test_bounded_timeout…``. + """ + + def _attach_exec_loop(self, proc: CoPilotProcessor, loop) -> None: + """``execute`` dispatches the safety net onto ``self.execution_loop``. + Tests don't call ``on_executor_start`` (which spawns the real + per-worker loop), so wire the shared fixture loop in directly.""" + proc.execution_loop = loop + + def _run_execute_in_thread(self, proc: CoPilotProcessor, cancel: threading.Event): + """``CoPilotProcessor.execute`` expects to be called from a pool + worker thread that has *no* running event loop, so we always run + it off the main thread to preserve that invariant. Returns the + future so callers can inspect both result and exception paths.""" + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + try: + fut = pool.submit(proc.execute, _make_entry(), cancel, MagicMock()) + # Block until execute() returns (or raises) so the safety net + # has run by the time we inspect mocks. + try: + fut.result(timeout=30) + except BaseException: + pass + return fut + finally: + pool.shutdown(wait=True) + + def test_happy_path_invokes_safety_net(self, exec_loop) -> None: + """Scenario B: normal completion still runs the sync safety net. + Proves the ``finally`` always fires, even when nothing went wrong — + ``mark_session_completed``'s atomic CAS makes this a cheap no-op + in production.""" + mock_mark = AsyncMock() + proc = CoPilotProcessor() + self._attach_exec_loop(proc, exec_loop) + with patch.object(proc, "_execute"), patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=mock_mark, + ): + self._run_execute_in_thread(proc, threading.Event()) + + mock_mark.assert_awaited_once() + assert mock_mark.await_args is not None + assert mock_mark.await_args.args[0] == "sess-1" + + def test_sigterm_mid_turn_invokes_safety_net(self, exec_loop) -> None: + """Scenario A: worker raises (simulating future.cancel + grace + timeout escaping ``_execute``); ``execute`` must still reach the + safety net in its ``finally`` and mark the session failed.""" + mock_mark = AsyncMock() + proc = CoPilotProcessor() + self._attach_exec_loop(proc, exec_loop) + with patch.object( + proc, + "_execute", + side_effect=concurrent.futures.TimeoutError("grace expired"), + ), patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=mock_mark, + ): + self._run_execute_in_thread(proc, threading.Event()) + + mock_mark.assert_awaited_once() + + def test_zombie_redis_async_path_still_marks_session_failed( + self, exec_loop + ) -> None: + """Scenario C: ``_execute_async``'s own ``mark_session_completed`` + call is broken (simulating the exact async-Redis hiccup that caused + the original zombie sessions). 
The outer ``sync_fail_close_session`` + runs on the processor's long-lived ``execution_loop`` and succeeds + where the async path failed.""" + call_log: list[str] = [] + + async def _ok(*args, **kwargs): + call_log.append("sync-ok") + + def _broken_execute(entry, cancel, cluster_lock, log): + # Simulate the async path raising because its Redis client is + # wedged (the pre-fix zombie-session scenario). + raise RuntimeError("async Redis client broken") + + proc = CoPilotProcessor() + self._attach_exec_loop(proc, exec_loop) + with patch.object(proc, "_execute", side_effect=_broken_execute), patch( + "backend.copilot.executor.processor.stream_registry.mark_session_completed", + new=_ok, + ): + self._run_execute_in_thread(proc, threading.Event()) + + # The sync safety net must have fired despite the async path + # blowing up — this is the core guarantee of the PR. + assert call_log == [ + "sync-ok" + ], f"expected sync_fail_close_session to run once, got {call_log!r}" diff --git a/autogpt_platform/backend/backend/copilot/executor/utils.py b/autogpt_platform/backend/backend/copilot/executor/utils.py index a2b051d82b94..de1681b55cd4 100644 --- a/autogpt_platform/backend/backend/copilot/executor/utils.py +++ b/autogpt_platform/backend/backend/copilot/executor/utils.py @@ -89,11 +89,16 @@ def get_session_lock_key(session_id: str) -> str: # CoPilot operations can include extended thinking and agent generation -# which may take 30+ minutes to complete -COPILOT_CONSUMER_TIMEOUT_SECONDS = 60 * 60 # 1 hour +# which may take several hours to complete. Matches the pod's +# terminationGracePeriodSeconds in the helm chart so a rolling deploy can let +# the longest legitimate turn finish. Also bounds the stale-session auto- +# complete watchdog in stream_registry (consumer_timeout + 5min buffer). +COPILOT_CONSUMER_TIMEOUT_SECONDS = 6 * 60 * 60 # 6 hours -# Graceful shutdown timeout - allow in-flight operations to complete -GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = 30 * 60 # 30 minutes +# Graceful shutdown timeout - must match COPILOT_CONSUMER_TIMEOUT_SECONDS so +# cleanup can let the longest legitimate turn complete before the pod is +# SIGKILL'd by kubelet. +GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS = COPILOT_CONSUMER_TIMEOUT_SECONDS def create_copilot_queue_config() -> RabbitMQConfig: @@ -113,9 +118,27 @@ def create_copilot_queue_config() -> RabbitMQConfig: durable=True, auto_delete=False, arguments={ - # Extended consumer timeout for long-running LLM operations - # Default 30-minute timeout is insufficient for extended thinking - # and agent generation which can take 30+ minutes + # Consumer timeout matches the pod graceful-shutdown window so a + # rolling deploy never forces redelivery of a turn that the pod + # is still legitimately finishing. + # + # Deploy note: RabbitMQ (verified on 4.1.4) does NOT strictly + # compare ``x-consumer-timeout`` on queue redeclaration, so this + # value can change between deploys without triggering + # PRECONDITION_FAILED. To update the *effective* timeout on an + # already-running queue before the new code deploys (so pods + # mid-shutdown don't have their consumer cancelled at the old + # limit), apply a policy: + # + # rabbitmqctl set_policy copilot-consumer-timeout \ + # "^copilot_execution_queue$" \ + # '{"consumer-timeout": 21600000}' \ + # --apply-to queues + # + # The policy takes effect immediately. 
Once the policy is set + # to match the code's value the policy is redundant for new + # pods and can be removed after a stable deploy if desired — + # but it's harmless to leave in place. "x-consumer-timeout": COPILOT_CONSUMER_TIMEOUT_SECONDS * 1000, }, diff --git a/autogpt_platform/backend/backend/copilot/pending_messages.py b/autogpt_platform/backend/backend/copilot/pending_messages.py index ff6eed8b5978..8e6aa61af973 100644 --- a/autogpt_platform/backend/backend/copilot/pending_messages.py +++ b/autogpt_platform/backend/backend/copilot/pending_messages.py @@ -240,16 +240,15 @@ async def peek_pending_messages(session_id: str) -> list[PendingMessage]: return messages -async def _clear_pending_messages_unsafe(session_id: str) -> None: +async def clear_pending_messages_unsafe(session_id: str) -> None: """Drop the session's pending buffer — **not** the normal turn cleanup. - Named ``_unsafe`` because reaching for this at turn end drops queued - follow-ups on the floor instead of running them (the bug fixed by - commit b64be73). The atomic ``LPOP`` drain at turn start is the - primary consumer; anything pushed after the drain window belongs to - the next turn by definition. Retained only as an operator/debug - escape hatch for manually clearing a stuck session and as a fixture - in the unit tests. + The ``_unsafe`` suffix warns: reaching for this at turn end drops queued + follow-ups on the floor instead of running them (the bug fixed by commit + b64be73). The atomic ``LPOP`` drain at turn start is the primary consumer; + anything pushed after the drain window belongs to the next turn by + definition. Retained only as an operator/debug escape hatch for manually + clearing a stuck session and as a fixture in the unit tests. """ redis = await get_redis_async() await redis.delete(_buffer_key(session_id)) diff --git a/autogpt_platform/backend/backend/copilot/pending_messages_test.py b/autogpt_platform/backend/backend/copilot/pending_messages_test.py index 06f809579f9a..c997d7d9cfb9 100644 --- a/autogpt_platform/backend/backend/copilot/pending_messages_test.py +++ b/autogpt_platform/backend/backend/copilot/pending_messages_test.py @@ -16,7 +16,7 @@ MAX_PENDING_MESSAGES, PendingMessage, PendingMessageContext, - _clear_pending_messages_unsafe, + clear_pending_messages_unsafe, drain_and_format_for_injection, drain_pending_for_persist, drain_pending_messages, @@ -208,15 +208,15 @@ async def test_cap_drops_oldest_when_exceeded(fake_redis: _FakeRedis) -> None: async def test_clear_removes_buffer(fake_redis: _FakeRedis) -> None: await push_pending_message("sess4", PendingMessage(content="x")) await push_pending_message("sess4", PendingMessage(content="y")) - await _clear_pending_messages_unsafe("sess4") + await clear_pending_messages_unsafe("sess4") assert await peek_pending_count("sess4") == 0 @pytest.mark.asyncio async def test_clear_is_idempotent(fake_redis: _FakeRedis) -> None: # Clearing an already-empty buffer should not raise - await _clear_pending_messages_unsafe("sess_empty") - await _clear_pending_messages_unsafe("sess_empty") + await clear_pending_messages_unsafe("sess_empty") + await clear_pending_messages_unsafe("sess_empty") # ── Publish hook ──────────────────────────────────────────────────── diff --git a/autogpt_platform/backend/backend/copilot/stream_registry.py b/autogpt_platform/backend/backend/copilot/stream_registry.py index e4559c46e57d..79deadacc0fc 100644 --- a/autogpt_platform/backend/backend/copilot/stream_registry.py +++ 
b/autogpt_platform/backend/backend/copilot/stream_registry.py @@ -1026,8 +1026,8 @@ async def get_active_session( # Check if session is stale (running beyond tool timeout + buffer). # Auto-complete it to prevent infinite polling loops. - # Synchronous tools can run up to COPILOT_CONSUMER_TIMEOUT_SECONDS (1 hour), - # so we add a 5-minute buffer to avoid false positives during legitimate operations. + # A turn can legitimately run up to COPILOT_CONSUMER_TIMEOUT_SECONDS, so we + # add a 5-minute buffer to avoid false positives during legitimate operations. created_at_str = meta.get("created_at") if created_at_str: try: diff --git a/autogpt_platform/backend/backend/data/redis_client.py b/autogpt_platform/backend/backend/data/redis_client.py index f7d030c62b87..e3675370e5e9 100644 --- a/autogpt_platform/backend/backend/data/redis_client.py +++ b/autogpt_platform/backend/backend/data/redis_client.py @@ -14,6 +14,21 @@ PORT = int(os.getenv("REDIS_PORT", "6379")) PASSWORD = os.getenv("REDIS_PASSWORD", None) +# Default socket timeouts so a wedged Redis endpoint can't hang callers +# indefinitely — long-running code paths (cluster_lock refresh in particular) +# rely on these to fail-fast instead of blocking on no-response TCP. Override +# via env if a specific deployment needs a different budget. +# +# 30s matches the convention in ``backend.data.rabbitmq`` and leaves ~6x +# headroom over the largest ``xread(block=5000)`` wait in stream_registry. +# The connect timeout is shorter (5s) because initial connects should be +# fast; a slow connect usually means the endpoint is genuinely unreachable. +SOCKET_TIMEOUT = float(os.getenv("REDIS_SOCKET_TIMEOUT", "30")) +SOCKET_CONNECT_TIMEOUT = float(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", "5")) +# How often redis-py sends a PING on idle connections to detect half-open +# sockets; cheap and avoids waiting for the OS TCP keepalive (~2h default). +HEALTH_CHECK_INTERVAL = int(os.getenv("REDIS_HEALTH_CHECK_INTERVAL", "30")) + logger = logging.getLogger(__name__) @@ -24,6 +39,10 @@ def connect() -> Redis: port=PORT, password=PASSWORD, decode_responses=True, + socket_timeout=SOCKET_TIMEOUT, + socket_connect_timeout=SOCKET_CONNECT_TIMEOUT, + socket_keepalive=True, + health_check_interval=HEALTH_CHECK_INTERVAL, ) c.ping() return c @@ -46,6 +65,10 @@ async def connect_async() -> AsyncRedis: port=PORT, password=PASSWORD, decode_responses=True, + socket_timeout=SOCKET_TIMEOUT, + socket_connect_timeout=SOCKET_CONNECT_TIMEOUT, + socket_keepalive=True, + health_check_interval=HEALTH_CHECK_INTERVAL, ) await c.ping() return c