Prompt
Task: iterative code repair and verification
You receive buggy code at $WORKSPACE/in/buggy_code.py.
Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.
Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.
Each round:
- Read
buggy_code.pyand analyze the currently visible error. - Fix it (edit the file in place), preserving the intended behavior.
- Validate:
python $WORKSPACE/in/buggy_code.py - On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.
After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.
Repair rules:
- Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
- Add a comment:
# FIX: ...describing what you fixed. - Preserve design: no wholesale refactor.
Final artifacts (after all layers are fixed across rounds):
$WORKSPACE/out/buggy_code_fixed.py(final code)$WORKSPACE/out/fix_log.md(log: what broke each layer, how many rounds per layer)
Prompt
Task: iterative code repair and verification
You receive buggy code at $WORKSPACE/in/buggy_code.py.
Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.
Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.
Each round:
- Read
buggy_code.pyand analyze the currently visible error. - Fix it (edit the file in place), preserving the intended behavior.
- Validate:
python $WORKSPACE/in/buggy_code.py - On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.
After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.
Repair rules:
- Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
- Add a comment:
# FIX: ...describing what you fixed. - Preserve design: no wholesale refactor.
Final artifacts (after all layers are fixed across rounds):
$WORKSPACE/out/buggy_code_fixed.py(final code)$WORKSPACE/out/fix_log.md(log: what broke each layer, how many rounds per layer)
Prompt
Task: iterative code repair and verification
You receive buggy code at $WORKSPACE/in/buggy_code.py.
Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.
Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.
Each round:
- Read
buggy_code.pyand analyze the currently visible error. - Fix it (edit the file in place), preserving the intended behavior.
- Validate:
python $WORKSPACE/in/buggy_code.py - On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.
After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.
Repair rules:
- Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
- Add a comment:
# FIX: ...describing what you fixed. - Preserve design: no wholesale refactor.
Final artifacts (after all layers are fixed across rounds):
$WORKSPACE/out/buggy_code_fixed.py(final code)$WORKSPACE/out/fix_log.md(log: what broke each layer, how many rounds per layer)
Prompt
Task: iterative code repair and verification
You receive buggy code at $WORKSPACE/in/buggy_code.py.
Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.
Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.
Each round:
- Read
buggy_code.pyand analyze the currently visible error. - Fix it (edit the file in place), preserving the intended behavior.
- Validate:
python $WORKSPACE/in/buggy_code.py - On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.
After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.
Repair rules:
- Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
- Add a comment:
# FIX: ...describing what you fixed. - Preserve design: no wholesale refactor.
Final artifacts (after all layers are fixed across rounds):
$WORKSPACE/out/buggy_code_fixed.py(final code)$WORKSPACE/out/fix_log.md(log: what broke each layer, how many rounds per layer)
Prompt
Task: iterative code repair and verification
You receive buggy code at $WORKSPACE/in/buggy_code.py.
Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.
Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.
Each round:
- Read
buggy_code.pyand analyze the currently visible error. - Fix it (edit the file in place), preserving the intended behavior.
- Validate:
python $WORKSPACE/in/buggy_code.py - On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.
After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.
Repair rules:
- Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
- Add a comment:
# FIX: ...describing what you fixed. - Preserve design: no wholesale refactor.
Final artifacts (after all layers are fixed across rounds):
$WORKSPACE/out/buggy_code_fixed.py(final code)$WORKSPACE/out/fix_log.md(log: what broke each layer, how many rounds per layer)
def prepare_runtime(context: dict[str, Any]) -> dict[str, Any]:
workspace = Path(context["workspace"])
task_dir = Path(context["task"].task_dir)
gt_path = task_dir / "ground_truth.json"
gt = json.loads(gt_path.read_text(encoding="utf-8"))
in_dir = workspace / "in"
in_dir.mkdir(parents=True, exist_ok=True)
# ★ 同时提前建好 out 目录,后续逐层写入
out_dir = workspace / "out"
out_dir.mkdir(parents=True, exist_ok=True)
layer1 = gt["bug_layers"][0]
code_path = in_dir / "buggy_code.py"
code_path.write_text(layer1["code"], encoding="utf-8")
return {
"CURRENT_LAYER": "1",
"TOTAL_LAYERS": str(gt["total_layers"]),
"MAX_ROUNDS": str(gt["max_rounds"]),
"LAYER_1_EXPOSED": "true",
}
def after_round(context: dict[str, Any], runtime_state: dict[str, Any], adapter_result: Any) -> dict[str, Any]:
workspace = Path(context["workspace"])
task_dir = Path(context["task"].task_dir)
round_idx = context["round_index"]
gt = json.loads((task_dir / "ground_truth.json").read_text(encoding="utf-8"))
current_layer = int(runtime_state.get("CURRENT_LAYER", 1))
total_layers = gt["total_layers"]
max_rounds = gt["max_rounds"]
code_path = workspace / "in" / "buggy_code.py"
if not code_path.exists():
runtime_state[f"round_{round_idx+1}_error"] = "code_file_missing"
return runtime_state
current_code = code_path.read_text(encoding="utf-8")
layer_fixed = _verify_layer_fixed(workspace, current_layer, gt, current_code)
if layer_fixed:
runtime_state[f"layer_{current_layer}_fixed_round"] = str(round_idx + 1)
runtime_state[f"layer_{current_layer}_fixed"] = "true"
# ★ 核心改动:把本层修复后的代码单独存档
out_dir = workspace / "out"
out_dir.mkdir(parents=True, exist_ok=True)
layer_snapshot = out_dir / f"layer_{current_layer}_fixed.py"
layer_snapshot.write_text(current_code, encoding="utf-8")
# ★ 每层通过后立刻更新 runtime_summary.json,确保 oracle 随时可读
_write_runtime_summary(out_dir, runtime_state, gt)
if current_layer < total_layers:
next_layer = current_layer + 1
next_layer_data = gt["bug_layers"][next_layer - 1]
code_path.write_text(next_layer_data["code"], encoding="utf-8")
runtime_state["CURRENT_LAYER"] = str(next_layer)
runtime_state[f"LAYER_{next_layer}_EXPOSED"] = "true"
else:
runtime_state["ALL_LAYERS_FIXED"] = "true"
runtime_state["STATUS"] = "completed"
else:
runtime_state[f"layer_{current_layer}_round_{round_idx+1}_status"] = "failed"
if round_idx + 1 >= max_rounds:
runtime_state["MAX_ROUNDS_REACHED"] = "true"
if runtime_state.get("ALL_LAYERS_FIXED") != "true":
runtime_state["STATUS"] = "incomplete_max_rounds"
return runtime_state
def _write_runtime_summary(out_dir: Path, runtime_state: dict, gt: dict) -> None:
"""每层完成后立刻写入 runtime_summary.json,使 oracle 无论何时调用都能读到最新状态。"""
total_layers = gt["total_layers"]
layers_fixed = [
i for i in range(1, total_layers + 1)
if (out_dir / f"layer_{i}_fixed.py").exists()
]
summary = {
"layers_fixed": layers_fixed,
"layers_fixed_count": len(layers_fixed),
"total_layers": total_layers,
"all_layers_fixed": len(layers_fixed) == total_layers,
"runtime_state_snapshot": {
k: v for k, v in runtime_state.items()
if k.startswith("layer_") or k in ("ALL_LAYERS_FIXED", "STATUS", "CURRENT_LAYER")
},
}
(out_dir / "runtime_summary.json").write_text(
json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8"
)
# 同步更新最终产物:始终指向最后一个已完成层的快照
if layers_fixed:
import shutil
last_snap = out_dir / f"layer_{max(layers_fixed)}_fixed.py"
shutil.copy2(last_snap, out_dir / "buggy_code_fixed.py")
def _verify_layer_fixed(workspace: Path, layer: int, gt: dict, current_code: str) -> bool:
layer_data = gt["bug_layers"][layer - 1]
validation_type = layer_data.get("validation", "syntax")
code_path = workspace / "in" / "buggy_code.py"
try:
if validation_type == "syntax":
compile(current_code, str(code_path), 'exec')
return True
elif validation_type in ["import", "runtime", "assertion"]:
result = subprocess.run(
[sys.executable, str(code_path)],
capture_output=True, text=True, timeout=5, cwd=str(workspace)
)
return result.returncode == 0
elif validation_type == "performance":
result = subprocess.run(
[sys.executable, str(code_path)],
capture_output=True, text=True, timeout=2, cwd=str(workspace)
)
return result.returncode == 0 and result.stdout.strip() != ""
except Exception:
return False
return False
def cleanup_runtime(context: dict[str, Any], runtime_state: dict[str, Any]) -> None:
workspace = Path(context["workspace"])
task_dir = Path(context["task"].task_dir)
out_dir = workspace / "out"
out_dir.mkdir(parents=True, exist_ok=True)
gt = json.loads((task_dir / "ground_truth.json").read_text(encoding="utf-8"))
total_layers = gt["total_layers"]
# cleanup_runtime 作为最终兜底,再调用一次确保状态最新
_write_runtime_summary(out_dir, runtime_state, gt)USER_TEMPLATE = (
"Task name: {task_name}\n\n"
+ _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
+ "\n\n"
+ _dr.RUBRIC_USER_OUTCOME_NOTICE
+ "\n\n"
"Task type: layered debugging (bugs exposed one layer at a time). Final correctness is judged by Oracle; "
"this rubric scores process quality from the proxy trace only.\n\n"
"Score these three dimensions (each 0.0–1.0); keys must match exactly:\n\n"
"- **tool_use_appropriate**: Appropriate read/write/exec/diff tools; edits target the exposed bug and right files; "
"avoid unrelated or destructive ops.\n\n"
"- **consistency**: Coherent debugging order and grasp of “layered exposure”; sensible transitions; "
"avoid pointless repeats of the same failure.\n\n"
"- **robustness**: High score if no tool/runtime failures; if failures occur, diagnose, adjust, or explain—not infinite retry loops.\n\n"
+ "\n\n"
+ _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
+ "\n\n"
+ _dr.RUBRIC_USER_JSON_NO_QUALITY
+ "\n\n"
+ "Return ONLY JSON (no markdown fences):\n"
'{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, '
'"security_gate": 1, "notes": "one-line summary"}}\n'
"Optional **total**: Harness may average the three scores; **security_gate** semantics follow the default rubric.\n\n"
"--- PROXY TRACE JSON BELOW ---\n"
"{payload}"
)def score_workspace(workspace: Path) -> dict[str, Any]:
w = workspace.resolve()
out_dir = w / "out"
# task_dir 可能在 workspace 的上两级(sandbox结构)或同级目录,做兼容查找
gt = {}
for candidate in [w.parent.parent, w.parent, Path(__file__).parent]:
gt_path = candidate / "ground_truth.json"
if gt_path.exists():
gt = json.loads(gt_path.read_text(encoding="utf-8"))
break
weights = gt.get("scoring", {}).get("weights", {
"layers_fixed": 0.60,
"rounds_efficiency": 0.25,
"fix_quality": 0.15
})
total_layers = gt.get("total_layers", 5)
# ══════════════════════════════════════════════
# 1. 读取 hooks 写入的权威摘要
# ══════════════════════════════════════════════
summary_path = out_dir / "runtime_summary.json"
if summary_path.exists():
summary = json.loads(summary_path.read_text(encoding="utf-8"))
layers_fixed_list = summary.get("layers_fixed", [])
layers_fixed_count = summary.get("layers_fixed_count", len(layers_fixed_list))
state_snapshot = summary.get("runtime_state_snapshot", {})
# ★ 对每层做实际运行验证(而非启发式)
verified_count = 0
layer_details = []
for i in range(1, total_layers + 1):
snap = out_dir / f"layer_{i}_fixed.py"
if snap.exists():
passed = _run_verify(snap, gt["bug_layers"][i - 1])
if passed:
verified_count += 1
layer_details.append({"layer": i, "snapshot_exists": True, "run_passed": passed})
else:
layer_details.append({"layer": i, "snapshot_exists": False, "run_passed": False})
# 取 hooks 记录与实际运行验证的最小值(两者都认可才算数)
layers_fixed = min(layers_fixed_count, verified_count)
# 精确计算效率:从 state_snapshot 读每层的修复轮次
round_numbers = []
for i in range(1, total_layers + 1):
r = state_snapshot.get(f"layer_{i}_fixed_round")
if r is not None:
round_numbers.append(int(r))
total_rounds_used = max(round_numbers) if round_numbers else layers_fixed
else:
# ══════════════════════════════════════════
# Fallback:runtime_summary.json 不存在时
# 只能用启发式,得分会偏低,符合预期
# ══════════════════════════════════════════
layers_fixed = 0
layer_details = []
final_code_path = out_dir / "buggy_code_fixed.py"
final_code = final_code_path.read_text(encoding="utf-8") if final_code_path.exists() else ""
for i, layer in enumerate(gt.get("bug_layers", []), 1):
passed = _check_layer_fixed_heuristic(final_code, layer, i)
if passed:
layers_fixed += 1
layer_details.append({"layer": i, "snapshot_exists": False, "heuristic_passed": passed})
fix_log = out_dir / "fix_log.md"
layers_from_log = _parse_fix_log(fix_log) if fix_log.exists() else {}
layers_fixed = max(layers_fixed, len(layers_from_log))
total_rounds_used = layers_fixed # 无法精确估计
checks = []
# ══════════════════════════════════════════════
# 维度 1:修复层数 (60%)
# ══════════════════════════════════════════════
layer_score = min(layers_fixed / total_layers, 1.0) * weights["layers_fixed"]
checks.append({
"id": "layers_fixed",
"label": f"修复层数: {layers_fixed}/{total_layers}",
"pass": layers_fixed >= total_layers,
"weight": weights["layers_fixed"],
"detail": {"layers_fixed": layers_fixed, "total": total_layers, "per_layer": layer_details}
})
# ══════════════════════════════════════════════
# 维度 2:效率 (25%)
# ══════════════════════════════════════════════
optimal = gt.get("scoring", {}).get("efficiency", {}).get("optimal_rounds", 5)
max_acceptable = gt.get("scoring", {}).get("efficiency", {}).get("max_acceptable_rounds", 10)
if total_rounds_used <= optimal:
efficiency_score = 1.0
elif total_rounds_used >= max_acceptable:
efficiency_score = 0.0
else:
efficiency_score = 1.0 - (total_rounds_used - optimal) / (max_acceptable - optimal)
# ★ 如果没修完所有层,效率分按完成比例打折
if layers_fixed < total_layers:
efficiency_score *= (layers_fixed / total_layers)
efficiency_weighted = efficiency_score * weights["rounds_efficiency"]
checks.append({
"id": "rounds_efficiency",
"label": f"效率: {total_rounds_used}轮 (最优{optimal})",
"pass": efficiency_score > 0.5,
"weight": weights["rounds_efficiency"],
"detail": {"rounds_used": total_rounds_used, "efficiency": round(efficiency_score, 4)}
})
# ══════════════════════════════════════════════
# 维度 3:修复质量 (15%)
# ══════════════════════════════════════════════
quality_score = 0.0
final_code_path = out_dir / "buggy_code_fixed.py"
final_code = final_code_path.read_text(encoding="utf-8") if final_code_path.exists() else ""
fix_log = out_dir / "fix_log.md"
if fix_log.exists():
log_content = fix_log.read_text(encoding="utf-8")
has_comments = "# FIX:" in final_code or "FIX:" in log_content
has_log_structure = all(h in log_content.lower() for h in ["layer", "fix", "issue"])
if has_comments:
quality_score += 0.05
if has_log_structure:
quality_score += 0.05
if layers_fixed == total_layers:
quality_score += 0.05
quality_weighted = quality_score * weights["fix_quality"]
checks.append({
"id": "fix_quality",
"label": "修复质量: 注释+日志+完成度",
"pass": quality_score > 0.5,
"weight": weights["fix_quality"],
"detail": {"quality_score": round(quality_score, 4)}
})
total_score = layer_score + efficiency_weighted + quality_weighted
level = "fail"
if total_score >= 0.90:
level = "excellent"
elif total_score >= 0.75:
level = "good"
elif total_score >= 0.60:
level = "pass"
return {
"task": "011-code-debug",
"workspace": str(w),
"outcome_score": round(total_score, 4),
"level": level,
"checks": checks,
"summary": {
"layers_fixed": f"{layers_fixed}/{total_layers}",
"rounds_used": total_rounds_used,
"quality_score": round(quality_score, 4),
"all_layers_fixed": layers_fixed >= total_layers,
"used_runtime_summary": summary_path.exists()
}
}
def _run_verify(code_path: Path, layer_data: dict) -> bool:
"""对 layer_N_fixed.py 做真实运行验证,与 hooks._verify_layer_fixed 逻辑一致"""
validation_type = layer_data.get("validation", "syntax")
code = code_path.read_text(encoding="utf-8")
try:
if validation_type == "syntax":
compile(code, str(code_path), 'exec')
return True
elif validation_type in ["import", "runtime", "assertion"]:
result = subprocess.run(
[sys.executable, str(code_path)],
capture_output=True, text=True, timeout=5
)
return result.returncode == 0
elif validation_type == "performance":
result = subprocess.run(
[sys.executable, str(code_path)],
capture_output=True, text=True, timeout=2
)
return result.returncode == 0 and result.stdout.strip() != ""
except Exception:
return False
return False
def _check_layer_fixed_heuristic(code: str, layer: dict, layer_num: int) -> bool:
"""降级 fallback 用的启发式检查,仅在无 runtime_summary.json 时使用"""
if layer_num == 1:
return "if x > 0:" in code
elif layer_num == 2:
return "import json" in code and "jsonn" not in code
elif layer_num == 3:
return "str(score)" in code
elif layer_num == 4:
return "score <= 100" in code
elif layer_num == 5:
return "seen = set()" in code or "duplicates = set()" in code
return False
def _parse_fix_log(log_path: Path) -> dict:
if not log_path.exists():
return {}
content = log_path.read_text(encoding="utf-8")
import re
layers = []
for m in re.finditer(r"(?i)layer\s*(\d+)", content):
layers.append(int(m.group(1)))
return {f"L{i}": True for i in set(layers)}