-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_pipeline.py
More file actions
executable file
·383 lines (299 loc) · 12.8 KB
/
run_pipeline.py
File metadata and controls
executable file
·383 lines (299 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/env python3
"""CLI pipeline runner for agent invocation.
Usage:
python generate/run_pipeline.py # full pipeline (fetch + extract + assemble + validate)
python generate/run_pipeline.py --stage extract # run single stage
python generate/run_pipeline.py --validate-only # validate existing candidate
python generate/run_pipeline.py --json # JSON summary only
"""
import argparse
import json
import re
import sys
import time
from pathlib import Path
ROOT = Path(__file__).parent
sys.path.insert(0, str(ROOT))
from src.sanitizer import Sanitizer
from src.markdown_extractor import MarkdownExtractor
from src.assembler import Assembler
from src.llm import LLM
from src.code_validator import Validator
from src.syntax_validator import SyntaxValidator
def load_config():
    """Load and parse the pipeline configuration from config/config.yaml."""
    import yaml  # deferred: yaml only needed when the full pipeline runs

    config_path = ROOT / "config" / "config.yaml"
    with config_path.open() as fh:
        return yaml.safe_load(fh)
def log(msg, quiet=False):
    """Print a progress message to stdout (unbuffered) unless quiet is set."""
    if quiet:
        return
    print(msg, flush=True)
def run_fetch(config, quiet=False):
    """Sanitize raw source docs into output/0_sanitized.

    Returns a stage-summary dict with status, duration, and file counts.
    """
    log("[FETCH] Starting...", quiet)
    started = time.time()
    source_dir = ROOT / config.get("source_dir", "docs")
    target_dir = ROOT / "output" / "0_sanitized"

    def progress(source_id, current, total):
        # Per-source progress line during sanitization.
        log(f"[FETCH] Source {current}/{total}: {source_id}", quiet)

    stats = Sanitizer(config).run(source_dir, target_dir, progress)
    elapsed = time.time() - started
    kept = stats.get("kept_files", 0)
    excluded = stats.get("excluded_files", 0)
    log(f"[FETCH] Complete: {kept} files kept, {excluded} excluded ({elapsed:.1f}s)", quiet)
    return {
        "status": "complete",
        "duration": round(elapsed, 1),
        "files_kept": kept,
        "files_excluded": excluded,
    }
def run_extract(config, quiet=False):
    """Extract signatures, examples, and keywords from the sanitized docs.

    Returns a 3-tuple: (extracted result, the extractor instance, summary dict).
    The extractor is returned because the assemble stage reuses it.
    """
    log("[EXTRACT] Starting...", quiet)
    started = time.time()
    extractor = MarkdownExtractor(config)
    extracted = extractor.extract_from_directory(ROOT / "output" / "0_sanitized")
    elapsed = time.time() - started
    log(
        f"[EXTRACT] Signatures: {extracted.total_signatures}, "
        f"Examples: {extracted.total_examples}, "
        f"Keywords: {len(extracted.keywords_found)}",
        quiet,
    )
    log(f"[EXTRACT] Complete ({elapsed:.1f}s)", quiet)
    summary = {
        "status": "complete",
        "duration": round(elapsed, 1),
        "signatures": extracted.total_signatures,
        "examples": extracted.total_examples,
        "keywords": len(extracted.keywords_found),
    }
    return extracted, extractor, summary
def ensure_rules_jsonl(quiet=False):
    """Generate config/rules.jsonl from config/rag_rules.txt if missing or stale.

    Staleness is judged by file mtime: the prompt file being newer than the
    generated jsonl triggers a rebuild. Returns the path to rules.jsonl, which
    may still be absent when rag_rules.txt does not exist (caller handles that).
    """
    rules_path = ROOT / "config" / "rules.jsonl"
    prompt_path = ROOT / "config" / "rag_rules.txt"
    if not prompt_path.exists():
        # No source prompt file: nothing to build from.
        return rules_path
    needs_rebuild = (
        not rules_path.exists()
        or prompt_path.stat().st_mtime > rules_path.stat().st_mtime
    )
    if needs_rebuild:
        log("[RAG] Generating rules.jsonl from rag_rules.txt...", quiet)
        # Fix: guard the insert so repeated calls don't pile duplicate
        # entries onto sys.path (the original inserted unconditionally).
        scripts_dir = str(ROOT / "scripts")
        if scripts_dir not in sys.path:
            sys.path.insert(0, scripts_dir)
        from split_rules import main as split_main
        split_main()
    return rules_path
def init_rag(config, extracted, quiet=False):
    """Initialize and index the RAG retriever.

    Raises RuntimeError when RAG is disabled or its dependencies are missing
    -- there is deliberately no monolithic fallback.
    """
    if not config.get("rag", {}).get("enabled", True):
        raise RuntimeError("RAG is disabled in config but is required for pipeline execution")
    from src.rag import RAGRetriever

    retriever = RAGRetriever(config)
    if not retriever.available:
        raise RuntimeError(
            "RAG dependencies not installed (sentence-transformers, chromadb, numpy). "
            "Install with: pip install sentence-transformers chromadb numpy"
        )
    # Index the curated rules first, then the examples pulled from the docs.
    rules_file = ensure_rules_jsonl(quiet)
    indexed_rules = retriever.ensure_rules_indexed(rules_file)
    log(f"[RAG] Rules indexed: {indexed_rules}", quiet)
    indexed_examples = retriever.index_extracted_examples(extracted)
    log(f"[RAG] Examples indexed: {indexed_examples}", quiet)
    return retriever
def fetch_jaclang_version():
    """Fetch the current jaclang major.minor version from upstream.

    Reads jac/pyproject.toml from the jaseci repo on GitHub and returns the
    "X.Y" prefix of the version string, or None on any fetch/parse failure.
    """
    import urllib.request

    url = "https://raw.githubusercontent.com/jaseci-labs/jaseci/main/jac/pyproject.toml"
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            body = resp.read().decode()
            for line in body.splitlines():
                if line.startswith("version"):
                    full_version = line.split('"')[1]
                    return ".".join(full_version.split(".")[:2])
    except Exception:
        return None
    return None  # no version line found in the file
def check_version_and_archive(quiet=False):
    """Check the jaclang version; archive the previous candidate on a change.

    When the upstream version differs from release/VERSION, the current
    candidate doc and its validation report are copied into release/<old>/
    before VERSION is updated. Returns the recorded version string (the old
    one when the upstream fetch fails).
    """
    release_dir = ROOT / "release"
    version_file = release_dir / "VERSION"
    recorded = version_file.read_text().strip() if version_file.exists() else ""
    upstream = fetch_jaclang_version()
    if not upstream:
        # Best-effort: network failure must not block the pipeline.
        log("[VERSION] Could not fetch jaclang version, skipping check", quiet)
        return recorded
    log(f"[VERSION] Current: {recorded or '<none>'}, Upstream: {upstream}", quiet)
    if recorded and recorded != upstream:
        # Version bump: snapshot the outgoing candidate and its report.
        archive_dir = release_dir / recorded
        archive_dir.mkdir(parents=True, exist_ok=True)
        candidate = release_dir / "jac-llmdocs.md"
        validation = ROOT / "jac-llmdocs.validation.json"
        if candidate.exists():
            (archive_dir / "jac-llmdocs.md").write_text(candidate.read_text())
        if validation.exists():
            (archive_dir / "jac-llmdocs.validation.json").write_text(validation.read_text())
        log(f"[VERSION] Archived release/{recorded}/", quiet)
    version_file.write_text(upstream + "\n")
    return upstream
def run_assemble(config, extracted, extractor, quiet=False):
    """Run the LLM assembly stage and write the final doc.

    Writes the assembled text to output/2_final/jac_reference.txt and
    release/jac-llmdocs.md, stamping the recorded version into the header.
    Returns (result_text, stage_summary).
    """
    log("[ASSEMBLE] Starting LLM assembly...", quiet)
    started = time.time()
    retriever = init_rag(config, extracted, quiet)
    llm = LLM(config, config.get("assembly", {}))
    streamed = [0]  # one-element list so the closure can mutate the count

    def on_token(token):
        streamed[0] += 1
        # Dot-progress every 100 streamed tokens (suppressed in quiet mode).
        if streamed[0] % 100 == 0 and not quiet:
            print(".", end="", flush=True)

    def on_progress(current, total, msg):
        log(f"[ASSEMBLE] {msg}", quiet)

    assembler = Assembler(llm, config, on_progress=on_progress, on_token=on_token, rag_retriever=retriever)
    result = assembler.assemble(extracted, extractor)
    if streamed[0] >= 100 and not quiet:
        print(flush=True)  # terminate the dot-progress line
    # Stamp the correct version into the header deterministically.
    version_file = ROOT / "release" / "VERSION"
    if version_file.exists():
        ver = version_file.read_text().strip()
        result = re.sub(
            r"^(# Jac Language Reference\s*)\(v[\d.]+\)",
            rf"\1(v{ver})",
            result,
            count=1,
        )
    output_dir = ROOT / "output" / "2_final"
    output_dir.mkdir(parents=True, exist_ok=True)
    (output_dir / "jac_reference.txt").write_text(result)
    release_dir = ROOT / "release"
    release_dir.mkdir(exist_ok=True)
    (release_dir / "jac-llmdocs.md").write_text(result)
    elapsed = time.time() - started
    mode = "RAG-enhanced" if retriever else "monolithic"
    log(
        f"[ASSEMBLE] Output: {len(result):,} bytes saved to release/jac-llmdocs.md ({elapsed:.1f}s, {mode})",
        quiet,
    )
    return result, {
        "status": "complete",
        "duration": round(elapsed, 1),
        "output_size": len(result),
        "tokens_streamed": streamed[0],
        "mode": mode,
    }
def run_validate(text, quiet=False):
    """Validate the assembled doc: final patterns, strict checks, syntax cross-check.

    Writes the full report to jac-llmdocs.validation.json and returns it.
    Recommendation is PASS only when strict checks have zero failures and
    every syntax construct found in the output matches the docs.
    """
    log("[VALIDATE] Running validation...", quiet)
    started = time.time()
    validator = Validator()
    final_result = validator.validate_final(text)
    patterns = validator.find_patterns(text)

    def strict_progress(current, total, msg):
        # Only report the terminal progress tick to keep logs short.
        if current == total:
            log(f"[VALIDATE] Strict check progress: {current}/{total}", quiet)

    strict_result = validator.validate_strict(text, fail_on_error=False, on_progress=strict_progress)
    syntax_checks = SyntaxValidator().validate_syntax_in_output(text)
    incorrect = [c for c in syntax_checks if c.found_in_output and not c.matches_docs]
    syntax_ok = not incorrect
    strict_ok = strict_result.failed == 0
    is_valid = final_result.is_valid and strict_ok and syntax_ok
    recommendation = "PASS" if (strict_ok and syntax_ok) else "REVIEW"
    elapsed = time.time() - started
    log(
        f"[VALIDATE] Strict: {strict_result.passed} passed, "
        f"{strict_result.failed} failed, {strict_result.skipped} skipped",
        quiet,
    )
    log(f"[VALIDATE] Syntax: {'all correct' if syntax_ok else f'{len(incorrect)} incorrect'}", quiet)
    log(f"[VALIDATE] Recommendation: {recommendation} ({elapsed:.1f}s)", quiet)
    validation_data = {
        "is_valid": is_valid,
        "strict": {
            "total": strict_result.total_blocks,
            "passed": strict_result.passed,
            "failed": strict_result.failed,
            "skipped": strict_result.skipped,
            "pass_rate": strict_result.pass_rate,
            # Cap error detail at 10; older result objects may lack the attr.
            "errors": strict_result.errors[:10] if hasattr(strict_result, "errors") else [],
        },
        "syntax": {
            "all_correct": syntax_ok,
            "incorrect": [
                {"construct": c.construct, "expected": c.expected}
                for c in incorrect
            ],
        },
        "patterns": {
            "found": len(patterns),
            "total": len(validator.CRITICAL_PATTERNS),
            "missing": final_result.missing_patterns,
        },
        "recommendation": recommendation,
    }
    (ROOT / "jac-llmdocs.validation.json").write_text(json.dumps(validation_data, indent=2))
    return validation_data
def main():
    """CLI entry point: parse arguments, run the requested stages, print a JSON summary.

    Exit codes: 0 = success, 1 = validation did not PASS, 2 = error, missing
    candidate, or user interrupt.
    """
    parser = argparse.ArgumentParser(description="Run the Jac docs generation pipeline")
    parser.add_argument("--stage", choices=["fetch", "extract", "assemble"], help="Run single stage")
    parser.add_argument("--validate-only", action="store_true", help="Validate existing release/jac-llmdocs.md")
    parser.add_argument("--json", action="store_true", help="Output JSON summary only")
    args = parser.parse_args()
    # --json suppresses progress logging; only the summary block is printed.
    quiet = args.json
    summary = {"success": False, "stages": {}, "validation": None, "output_path": None}
    try:
        if args.validate_only:
            # Validate an existing candidate without regenerating anything.
            candidate = ROOT / "release" / "jac-llmdocs.md"
            if not candidate.exists():
                log("[ERROR] release/jac-llmdocs.md not found", quiet)
                summary["error"] = "release/jac-llmdocs.md not found"
                print_summary(summary)
                sys.exit(2)
            text = candidate.read_text()
            log(f"[VALIDATE] Loaded {len(text):,} bytes from release/jac-llmdocs.md", quiet)
            summary["validation"] = run_validate(text, quiet)
            summary["output_path"] = str(candidate)
            summary["success"] = summary["validation"]["recommendation"] == "PASS"
            print_summary(summary)
            sys.exit(0 if summary["success"] else 1)
        config = load_config()
        version = check_version_and_archive(quiet)
        summary["jaclang_version"] = version
        # Default: the full fetch -> extract -> assemble pipeline.
        stages_to_run = ["fetch", "extract", "assemble"]
        if args.stage:
            stages_to_run = [args.stage]
        extracted = None
        extractor = None
        result_text = None
        for stage in stages_to_run:
            if stage == "fetch":
                summary["stages"]["fetch"] = run_fetch(config, quiet)
            elif stage == "extract":
                extracted, extractor, stats = run_extract(config, quiet)
                summary["stages"]["extract"] = stats
            elif stage == "assemble":
                if extracted is None:
                    # Assemble depends on extraction output; run it implicitly
                    # when --stage assemble was requested on its own.
                    extracted, extractor, stats = run_extract(config, quiet)
                    summary["stages"]["extract"] = stats
                result_text, stats = run_assemble(config, extracted, extractor, quiet)
                summary["stages"]["assemble"] = stats
        if result_text:
            # Only the assemble stage yields text to validate.
            summary["validation"] = run_validate(result_text, quiet)
            summary["output_path"] = "release/jac-llmdocs.md"
            summary["success"] = summary["validation"]["recommendation"] == "PASS"
        else:
            summary["success"] = True
        print_summary(summary)
        sys.exit(0 if summary.get("success") else 1)
    except KeyboardInterrupt:
        log("\n[ABORT] Interrupted by user", quiet)
        summary["error"] = "interrupted"
        print_summary(summary)
        sys.exit(2)
    except Exception as e:
        # Top-level boundary: record the error in the summary so agents
        # consuming --json still get structured output.
        log(f"[ERROR] {e}", quiet)
        summary["error"] = str(e)
        print_summary(summary)
        sys.exit(2)
def print_summary(summary):
    """Emit the machine-readable summary between sentinel markers on stdout.

    The markers let an invoking agent extract the JSON payload reliably
    from mixed log output. default=str keeps non-JSON types serializable.
    """
    payload = json.dumps(summary, indent=2, default=str)
    print("\n---JSON_SUMMARY---")
    print(payload)
    print("---END_SUMMARY---", flush=True)
# Allow direct execution: `python generate/run_pipeline.py ...`
if __name__ == "__main__":
    main()