
Commit f4f48b9

authored
Add vault data layer for auto-triage (#64590)
* Draft: vault data layer for auto-triage (direction proposal)

* Phase 1: Persist author profiles across sessions

  Add disk-backed caching for author profiles with a 7-day TTL. Previously
  profiles were stored only in an in-memory dict and re-fetched from the
  GitHub API on every run. Now they are saved to the breeze build cache on
  first fetch and loaded from disk on subsequent runs, falling back to the
  API when the cache entry is missing or expired.

* Phase 2: Materialize PR metadata to vault on fetch

  Save PR metadata (number, title, author, labels, head_sha, checks state,
  etc.) to the breeze build cache after each GraphQL fetch. The vault uses a
  4-hour TTL and validates against head_sha so stale entries for PRs that
  received new commits are discarded. This lays the groundwork for Phase 3,
  where the triage flow can load known PRs from the vault instead of
  re-fetching from the API.

* Address review feedback: top-level imports and strip cached_at

  Move get_cached_author_profile and save_author_profile imports to module
  scope. Strip the internal cached_at field from disk-cached profiles so
  callers get a consistent shape regardless of source.

* Phases 3-6: Hybrid lookups, check/workflow caching, directed review questions

  Phase 3: _fetch_check_status_counts now tries the vault before hitting the
  GraphQL API. Results are keyed by head_sha so they never go stale. Same for
  _find_workflow_runs_by_status with a 10-minute TTL.

  Phase 4: Check status counts are persisted to vault after API fetch. No TTL
  needed since the same SHA always produces the same check results.

  Phase 5: generate_review_questions() in pr_vault.py produces deterministic
  verification questions from the diff (large PR, missing tests, version
  fields, breaking changes, exception consistency). These are appended to the
  LLM user message via assess_pr so the model addresses each one.

  Phase 6: Workflow runs are cached with a 10-minute TTL. This eliminates the
  4+ REST calls per PR on repeated triage runs within the TTL window.

* Address code review: atomic writes, partial check guard, thread safety, false positives

  - Use atomic file writes (temp file + os.replace) in CacheStore.save to
    prevent corrupt reads from concurrent threads.
  - Skip caching check status when IN_PROGRESS/QUEUED/PENDING counts are
    present to avoid persisting incomplete CI results.
  - Add threading.Lock to _author_profile_cache to prevent redundant API
    calls from concurrent workers.
  - Scan only added lines (not removed) in generate_review_questions to avoid
    false positives from removed deprecation notices.
  - Document that review questions are active in sequential mode only
    (diff_text not yet available at background LLM submission time).
  - Document the intentional use of time.time() for persistent TTLs.
  - Add tests for scan_cached_pr_numbers and invalidate_stale_caches covering
    success, corrupt files, stale SHA, and multi-cache scenarios.

* Fix ruff format and mypy type errors in vault layer

  Remove extra blank line in pr_commands.py (ruff format). Fix dataclass
  field annotations in test_pr_vault.py: list -> list | None to match None
  defaults (mypy assignment error).

Signed-off-by: André Ahlert <andre@aex.partners>
1 parent 7efa372 commit f4f48b9

7 files changed

Lines changed: 766 additions & 10 deletions

File tree

dev/breeze/src/airflow_breeze/commands/pr_commands.py

Lines changed: 62 additions & 6 deletions
@@ -58,11 +58,13 @@
 from airflow_breeze.utils.pr_cache import (
     classification_cache as _classification_cache,
     get_cached_assessment as _get_cached_assessment,
+    get_cached_author_profile as _get_cached_author_profile,
     get_cached_classification as _get_cached_classification,
     get_cached_review as _get_cached_review,
     get_cached_status as _get_cached_status,
     review_cache as _review_cache,
     save_assessment_cache as _save_assessment_cache,
+    save_author_profile as _save_author_profile,
     save_classification_cache as _save_classification_cache,
     save_review_cache as _save_review_cache,
     save_status_cache as _save_status_cache,

@@ -210,10 +212,12 @@ def _cached_assess_pr(
     pr_body: str,
     check_status_summary: str,
     llm_model: str,
+    diff_text: str | None = None,
 ) -> PRAssessment:
     """Run assess_pr with caching keyed by PR number + commit hash.

     Returns cached PRAssessment when the commit hash matches, avoiding redundant LLM calls.
+    When *diff_text* is provided, generates directed review questions from it.
     """
     from airflow_breeze.utils.github import PRAssessment, Violation
     from airflow_breeze.utils.llm_utils import assess_pr

@@ -243,6 +247,16 @@ def _cached_assess_pr(
         result._from_cache = True  # type: ignore[attr-defined]
         return result

+    # Generate directed review questions from the diff if available.
+    # Note: diff_text is not yet passed by the background thread-pool submissions
+    # (the diff may not be fetched at LLM submission time). Review questions are
+    # active when diff_text is provided explicitly (e.g. sequential review mode).
+    review_questions: list[str] | None = None
+    if diff_text:
+        from airflow_breeze.utils.pr_vault import generate_review_questions
+
+        review_questions = generate_review_questions(diff_text, pr_body) or None
+
    t_start = time.monotonic()
    last_err: Exception | None = None
    attempts_made = 0

@@ -255,6 +269,7 @@ def _cached_assess_pr(
             pr_body=pr_body,
             check_status_summary=check_status_summary,
             llm_model=llm_model,
+            review_questions=review_questions,
         )
         if not result.error:
             break

@@ -1016,7 +1031,14 @@ def _fetch_check_status_counts(token: str, github_repository: str, head_sha: str
     """Fetch counts of checks by status for a commit. Returns a dict like {"SUCCESS": 5, "FAILURE": 2, ...}.

     Also includes an "IN_PROGRESS" key for checks still running.
+    Tries the local vault first; falls back to the GitHub API.
     """
+    from airflow_breeze.utils.pr_vault import load_check_status, save_check_status
+
+    cached = load_check_status(github_repository, head_sha)
+    if cached is not None:
+        return cached
+
     owner, repo = github_repository.split("/", 1)
     counts: dict[str, int] = {}
     cursor: str | None = None

@@ -1053,6 +1075,10 @@ def _fetch_check_status_counts(token: str, github_repository: str, head_sha: str
             break
         cursor = page_info.get("endCursor")

+    # Persist to vault for reuse (same SHA = same results)
+    if counts:
+        save_check_status(github_repository, head_sha, counts)
+
     return counts


@@ -1788,6 +1814,11 @@ def _fetch_prs_graphql(
             )
         )

+    # Persist fetched PRs to vault for reuse across sessions
+    from airflow_breeze.utils.pr_vault import save_prs_batch
+
+    save_prs_batch(github_repository, prs)
+
     return prs, has_next_page, end_cursor, search_data["issueCount"]


@@ -1829,6 +1860,7 @@ def _fetch_single_pr_graphql(token: str, github_repository: str, pr_number: int)


 _author_profile_cache: dict[str, dict] = {}
+_author_profile_lock = threading.Lock()


 def _compute_author_scoring(

@@ -1904,10 +1936,18 @@ def _compute_author_scoring(
 def _fetch_author_profile(token: str, login: str, github_repository: str) -> dict:
     """Fetch author profile info via GraphQL: account age, PR counts, contributed repos.

-    Results are cached per login so the same author is only queried once.
+    Results are cached in memory (per session) and on disk (across sessions, 7-day TTL).
+    Thread-safe: uses a lock to avoid redundant API calls from concurrent workers.
     """
-    if login in _author_profile_cache:
-        return _author_profile_cache[login]
+    with _author_profile_lock:
+        if login in _author_profile_cache:
+            return _author_profile_cache[login]
+
+        # Try disk cache before hitting the API
+        disk_profile = _get_cached_author_profile(github_repository, login)
+        if disk_profile:
+            _author_profile_cache[login] = disk_profile
+            return disk_profile

     repo_prefix = f"repo:{github_repository} type:pr author:{login}"
     global_prefix = f"type:pr author:{login}"

@@ -1939,7 +1979,8 @@ def _fetch_author_profile(token: str, login: str, github_repository: str) -> dic
             "contributed_repos": [],
             "contributed_repos_total": 0,
         }
-        _author_profile_cache[login] = profile
+        with _author_profile_lock:
+            _author_profile_cache[login] = profile
         return profile
     user_data = data.get("user") or {}
     created_at = user_data.get("createdAt", "unknown")

@@ -1989,7 +2030,12 @@ def _fetch_author_profile(token: str, login: str, github_repository: str) -> dic
             contrib_total,
         ),
     }
-    _author_profile_cache[login] = profile
+    with _author_profile_lock:
+        _author_profile_cache[login] = profile
+
+    # Persist to disk for reuse across sessions
+    _save_author_profile(github_repository, login, profile)
+
     return profile


@@ -7885,7 +7931,14 @@ def _find_workflow_runs_by_status(
     """Find workflow runs with a given status for a commit SHA.

     Common statuses: ``action_required``, ``in_progress``, ``queued``.
+    Tries the local vault first (10-minute TTL); falls back to the GitHub REST API.
     """
+    from airflow_breeze.utils.pr_vault import load_workflow_runs, save_workflow_runs
+
+    cached = load_workflow_runs(github_repository, head_sha, status)
+    if cached is not None:
+        return cached
+
     import requests

     url = f"https://api.github.com/repos/{github_repository}/actions/runs"

@@ -7900,7 +7953,10 @@ def _find_workflow_runs_by_status(
         return []
     if response.status_code != 200:
         return []
-    return response.json().get("workflow_runs", [])
+    runs = response.json().get("workflow_runs", [])
+
+    save_workflow_runs(github_repository, head_sha, status, runs)
+    return runs


 def _find_pending_workflow_runs(token: str, github_repository: str, head_sha: str) -> list[dict]:
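The `_fetch_author_profile` change above uses a common shape: hold a lock while checking the in-memory cache, release it during the slow network call, then re-acquire to store the result. A minimal sketch of that shape under stated assumptions (`fetch_profile` is a stand-in for the real GraphQL call, and the module-level names are illustrative):

```python
import threading

# Illustrative module-level cache and lock, mirroring _author_profile_cache
# and _author_profile_lock in the diff above.
_cache: dict[str, dict] = {}
_lock = threading.Lock()


def get_profile(login: str, fetch_profile) -> dict:
    """Return a memoized profile, fetching at most once per login per session."""
    with _lock:
        if login in _cache:
            return _cache[login]
    # The slow call happens outside the lock so other logins are not blocked.
    profile = fetch_profile(login)
    with _lock:
        # A concurrent worker may have stored the same login meanwhile;
        # last-write-wins is fine because both fetched identical data.
        _cache[login] = profile
    return profile
```

Note this sketch still allows two threads to fetch the *same* login concurrently if both miss before either stores; eliminating that entirely would need a per-key lock or future, which is more machinery than a triage cache warrants.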

dev/breeze/src/airflow_breeze/utils/llm_utils.py

Lines changed: 13 additions & 2 deletions
@@ -151,16 +151,22 @@ def _build_user_message(
     pr_title: str,
     pr_body: str,
     check_status_summary: str,
+    review_questions: list[str] | None = None,
 ) -> str:
     truncated_body = pr_body[:MAX_PR_BODY_CHARS] if pr_body else "(empty)"
     if pr_body and len(pr_body) > MAX_PR_BODY_CHARS:
         truncated_body += "\n... (truncated)"
-    return (
+    msg = (
         f"PR #{pr_number}\n"
         f"Title: {pr_title}\n\n"
         f"Description:\n{truncated_body}\n\n"
         f"Check status summary:\n{check_status_summary}\n"
     )
+    if review_questions:
+        msg += "\nDirected verification questions (address each one):\n"
+        for i, q in enumerate(review_questions, 1):
+            msg += f"  {i}. {q}\n"
+    return msg


 def _extract_json(text: str) -> str:

@@ -645,10 +651,13 @@ def assess_pr(
     pr_body: str,
     check_status_summary: str,
     llm_model: str,
+    review_questions: list[str] | None = None,
 ) -> PRAssessment:
     """Assess a PR using an LLM CLI tool. Returns PRAssessment.

     llm_model must be in "provider/model" format (e.g. "claude/claude-3-opus" or "codex/gpt-5.3-codex").
+    When *review_questions* is provided, they are appended to the user message so the LLM
+    addresses each one in its assessment.
     """
     provider, model = _resolve_cli_provider(llm_model)
     caller = _CLI_CALLERS.get(provider)

@@ -658,7 +667,9 @@ def assess_pr(

     _check_cli_available(provider)
     system_prompt = get_system_prompt()
-    user_message = _build_user_message(pr_number, pr_title, pr_body, check_status_summary)
+    user_message = _build_user_message(
+        pr_number, pr_title, pr_body, check_status_summary, review_questions=review_questions
+    )

     try:
         raw = caller(model, system_prompt, user_message)

dev/breeze/src/airflow_breeze/utils/pr_cache.py

Lines changed: 41 additions & 2 deletions
@@ -65,10 +65,31 @@ def get(self, github_repository: str, key: str, *, match: dict[str, str] | None
         return data

     def save(self, github_repository: str, key: str, data: dict) -> None:
-        """Save *data* as JSON. Automatically adds ``cached_at`` when TTL is configured."""
+        """Save *data* as JSON. Automatically adds ``cached_at`` when TTL is configured.
+
+        Uses atomic write (temp file + os.replace) to avoid corrupt reads when
+        multiple threads write the same key concurrently.
+        """
+        import os
+        import tempfile
+
         if self._ttl_seconds:
+            # time.time() is intentional here: monotonic clocks reset across process
+            # restarts, so wall-clock time is the only option for persistent TTLs.
             data = {**data, "cached_at": time.time()}
-        self._file(github_repository, key).write_text(json.dumps(data, indent=2))
+        target = self._file(github_repository, key)
+        fd, tmp_path = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
+        closed = False
+        try:
+            os.write(fd, json.dumps(data, indent=2).encode())
+            os.close(fd)
+            closed = True
+            os.replace(tmp_path, target)
+        except BaseException:
+            if not closed:
+                os.close(fd)
+            Path(tmp_path).unlink(missing_ok=True)
+            raise


 # Concrete cache stores — one per domain

@@ -77,6 +98,7 @@ def save(self, github_repository: str, key: str, data: dict) -> None:
 triage_cache = CacheStore("triage_cache")
 status_cache = CacheStore("status_cache", ttl_seconds=4 * 3600)
 stats_interaction_cache = CacheStore("stats_interaction_cache")
+author_cache = CacheStore("author_cache", ttl_seconds=7 * 24 * 3600)


 # Convenience functions for common cache operations

@@ -142,6 +164,23 @@ def save_status_cache(github_repository: str, cache_key: str, payload: dict | li
     status_cache.save(github_repository, cache_key, {"payload": payload})


+def get_cached_author_profile(github_repository: str, login: str) -> dict | None:
+    """Load a cached author profile. Returns None if missing or expired (7-day TTL).
+
+    Strips the internal ``cached_at`` field so callers get the same shape
+    regardless of whether the profile came from disk or the API.
+    """
+    data = author_cache.get(github_repository, f"author_{login}")
+    if data is not None:
+        data.pop("cached_at", None)
+    return data
+
+
+def save_author_profile(github_repository: str, login: str, profile: dict) -> None:
+    """Persist an author profile to disk."""
+    author_cache.save(github_repository, f"author_{login}", profile)
+
+
 # PR-keyed caches that store head_sha and should be validated on startup
 _PR_CACHES: list[CacheStore] = [review_cache, classification_cache, triage_cache]
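The atomic-write change in `CacheStore.save` above relies on `os.replace` being an atomic rename on the same filesystem, so a concurrent reader sees either the old file or the complete new one, never a partial write. A self-contained sketch of the same pattern, using `os.fdopen` instead of raw `os.write` for brevity (`atomic_write_json` is an illustrative helper, not part of the real module):

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(target: Path, data: dict) -> None:
    """Write JSON via a temp file + os.replace so readers never see a partial file.

    The temp file is created in target's directory so the final rename stays on
    one filesystem, which is what makes os.replace atomic.
    """
    fd, tmp_path = tempfile.mkstemp(dir=target.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:  # fdopen takes ownership and closes fd
            f.write(json.dumps(data, indent=2))
        os.replace(tmp_path, target)
    except BaseException:
        # Clean up the orphaned temp file on any failure, then re-raise.
        Path(tmp_path).unlink(missing_ok=True)
        raise
```

The plain `write_text` it replaces truncates the file before writing, so a reader racing with the writer could load half-written JSON and fail to parse it; the rename-based version closes that window.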