Iterative Code Repair And Verification

Task: iterative code repair and verification

Software Engineering & Codebase Maintenance · Task 2 · Oracle + LLM scoring
Model Runs: 6 harnesses & 8 models evaluated on this task.

Prompt

Task: iterative code repair and verification

You receive buggy code at $WORKSPACE/in/buggy_code.py.

Mechanism: the code has multiple layered bugs, but only the most urgent layer is visible each round. Fix what is currently exposed; after your fix passes validation, the next layer appears.

Current state: layer $CURRENT_LAYER / $TOTAL_LAYERS total.

Each round:

  1. Read buggy_code.py and analyze the currently visible error.
  2. Fix it (edit the file in place), preserving the intended behavior.
  3. Validate: python $WORKSPACE/in/buggy_code.py
  4. On success, tell me "Layer X fixed" (English, with the correct layer number); on failure, keep fixing.

After you finish layer $CURRENT_LAYER and validation passes, stop this conversation immediately—do not attempt other layers. The next layer will appear in a new conversation round.

Repair rules:

  • Minimal change: only fix what is currently exposed; do not preempt deeper bugs.
  • Add a comment: # FIX: ... describing what you fixed.
  • Preserve design: no wholesale refactor.

Final artifacts (after all layers are fixed across rounds):

  • $WORKSPACE/out/buggy_code_fixed.py (final code)
  • $WORKSPACE/out/fix_log.md (log: what broke each layer, how many rounds per layer)
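
One round of the loop described in the prompt could be sketched as follows. This is a minimal illustration, not the harness's code: `validate_layer` is a hypothetical helper, and both the 5-second timeout and the failure message format are assumptions. Only the success message "Layer X fixed" comes from the prompt.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def validate_layer(code_path: Path, layer: int, timeout: float = 5.0) -> str:
    """Run the file as in step 3 and build the status message for step 4.

    Hypothetical helper: the name, the timeout, and the failure message
    format are illustrative assumptions.
    """
    try:
        result = subprocess.run(
            [sys.executable, str(code_path)],
            capture_output=True, text=True, timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"Layer {layer} still failing: timed out"
    if result.returncode == 0:
        return f"Layer {layer} fixed"
    # Surface the last stderr line so the next edit targets the visible error.
    lines = result.stderr.strip().splitlines()
    last_line = lines[-1] if lines else "unknown error"
    return f"Layer {layer} still failing: {last_line}"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        demo = Path(tmp) / "buggy_code.py"
        # Illustrative repaired file; the FIX comment follows the repair rules.
        demo.write_text(
            "x = 1\n"
            "# FIX: added the missing colon after the condition\n"
            "if x > 0:\n"
            "    print('ok')\n",
            encoding="utf-8",
        )
        print(validate_layer(demo, 1))
```

Returning the message instead of printing it keeps the helper easy to check, and the failure branch reports only the last stderr line, which is usually the exception that names the currently exposed bug.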

Input Files: 1 file
in/test_cases.py
Hooks
import json
import subprocess
import sys
from pathlib import Path
from typing import Any


def prepare_runtime(context: dict[str, Any]) -> dict[str, Any]:
    workspace = Path(context["workspace"])
    task_dir = Path(context["task"].task_dir)

    gt_path = task_dir / "ground_truth.json"
    gt = json.loads(gt_path.read_text(encoding="utf-8"))

    in_dir = workspace / "in"
    in_dir.mkdir(parents=True, exist_ok=True)
    # Also create the out directory up front; later rounds write to it layer by layer
    out_dir = workspace / "out"
    out_dir.mkdir(parents=True, exist_ok=True)

    layer1 = gt["bug_layers"][0]
    code_path = in_dir / "buggy_code.py"
    code_path.write_text(layer1["code"], encoding="utf-8")

    return {
        "CURRENT_LAYER": "1",
        "TOTAL_LAYERS": str(gt["total_layers"]),
        "MAX_ROUNDS": str(gt["max_rounds"]),
        "LAYER_1_EXPOSED": "true",
    }


def after_round(context: dict[str, Any], runtime_state: dict[str, Any], adapter_result: Any) -> dict[str, Any]:
    workspace = Path(context["workspace"])
    task_dir = Path(context["task"].task_dir)
    round_idx = context["round_index"]

    gt = json.loads((task_dir / "ground_truth.json").read_text(encoding="utf-8"))

    current_layer = int(runtime_state.get("CURRENT_LAYER", 1))
    total_layers = gt["total_layers"]
    max_rounds = gt["max_rounds"]

    code_path = workspace / "in" / "buggy_code.py"
    if not code_path.exists():
        runtime_state[f"round_{round_idx+1}_error"] = "code_file_missing"
        return runtime_state

    current_code = code_path.read_text(encoding="utf-8")
    layer_fixed = _verify_layer_fixed(workspace, current_layer, gt, current_code)

    if layer_fixed:
        runtime_state[f"layer_{current_layer}_fixed_round"] = str(round_idx + 1)
        runtime_state[f"layer_{current_layer}_fixed"] = "true"

        # Core change: archive this layer's fixed code as its own snapshot
        out_dir = workspace / "out"
        out_dir.mkdir(parents=True, exist_ok=True)
        layer_snapshot = out_dir / f"layer_{current_layer}_fixed.py"
        layer_snapshot.write_text(current_code, encoding="utf-8")

        # Update runtime_summary.json as soon as a layer passes so the oracle can read it at any time
        _write_runtime_summary(out_dir, runtime_state, gt)

        if current_layer < total_layers:
            next_layer = current_layer + 1
            next_layer_data = gt["bug_layers"][next_layer - 1]
            code_path.write_text(next_layer_data["code"], encoding="utf-8")
            runtime_state["CURRENT_LAYER"] = str(next_layer)
            runtime_state[f"LAYER_{next_layer}_EXPOSED"] = "true"
        else:
            runtime_state["ALL_LAYERS_FIXED"] = "true"
            runtime_state["STATUS"] = "completed"
    else:
        runtime_state[f"layer_{current_layer}_round_{round_idx+1}_status"] = "failed"

    if round_idx + 1 >= max_rounds:
        runtime_state["MAX_ROUNDS_REACHED"] = "true"
        if runtime_state.get("ALL_LAYERS_FIXED") != "true":
            runtime_state["STATUS"] = "incomplete_max_rounds"

    return runtime_state


def _write_runtime_summary(out_dir: Path, runtime_state: dict, gt: dict) -> None:
    """Write runtime_summary.json after each completed layer so the oracle always reads the latest state."""
    total_layers = gt["total_layers"]
    layers_fixed = [
        i for i in range(1, total_layers + 1)
        if (out_dir / f"layer_{i}_fixed.py").exists()
    ]
    summary = {
        "layers_fixed": layers_fixed,
        "layers_fixed_count": len(layers_fixed),
        "total_layers": total_layers,
        "all_layers_fixed": len(layers_fixed) == total_layers,
        "runtime_state_snapshot": {
            k: v for k, v in runtime_state.items()
            if k.startswith("layer_") or k in ("ALL_LAYERS_FIXED", "STATUS", "CURRENT_LAYER")
        },
    }
    (out_dir / "runtime_summary.json").write_text(
        json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    # Keep the final artifact in sync: it always points at the snapshot of the last completed layer
    if layers_fixed:
        import shutil
        last_snap = out_dir / f"layer_{max(layers_fixed)}_fixed.py"
        shutil.copy2(last_snap, out_dir / "buggy_code_fixed.py")


def _verify_layer_fixed(workspace: Path, layer: int, gt: dict, current_code: str) -> bool:
    layer_data = gt["bug_layers"][layer - 1]
    validation_type = layer_data.get("validation", "syntax")
    code_path = workspace / "in" / "buggy_code.py"

    try:
        if validation_type == "syntax":
            compile(current_code, str(code_path), 'exec')
            return True
        elif validation_type in ["import", "runtime", "assertion"]:
            result = subprocess.run(
                [sys.executable, str(code_path)],
                capture_output=True, text=True, timeout=5, cwd=str(workspace)
            )
            return result.returncode == 0
        elif validation_type == "performance":
            result = subprocess.run(
                [sys.executable, str(code_path)],
                capture_output=True, text=True, timeout=2, cwd=str(workspace)
            )
            return result.returncode == 0 and result.stdout.strip() != ""
    except Exception:
        return False
    return False


def cleanup_runtime(context: dict[str, Any], runtime_state: dict[str, Any]) -> None:
    workspace = Path(context["workspace"])
    task_dir = Path(context["task"].task_dir)
    out_dir = workspace / "out"
    out_dir.mkdir(parents=True, exist_ok=True)

    gt = json.loads((task_dir / "ground_truth.json").read_text(encoding="utf-8"))
    total_layers = gt["total_layers"]

    # cleanup_runtime is the final safety net: write the summary once more so the state is current
    _write_runtime_summary(out_dir, runtime_state, gt)
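
The hooks above read only a handful of fields from `ground_truth.json`. The sketch below shows one shape that would satisfy those reads. The real file is not shown on this page, so the layer code and the numeric values are invented for illustration, and `check_ground_truth` is a hypothetical sanity check rather than part of the harness.

```python
# Validation types that _verify_layer_fixed knows how to handle.
VALIDATION_TYPES = {"syntax", "import", "runtime", "assertion", "performance"}

# Illustrative content only: every value below is an assumption.
EXAMPLE_GROUND_TRUTH = {
    "total_layers": 2,
    "max_rounds": 10,
    "bug_layers": [
        {"validation": "syntax", "code": "x = 1\nif x > 0\n    print(x)\n"},
        {"validation": "import", "code": "import jsonn\nprint(jsonn.dumps({}))\n"},
    ],
}


def check_ground_truth(gt: dict) -> list[str]:
    """Return problems with the fields the hooks read; an empty list means usable."""
    problems = []
    layers = gt.get("bug_layers", [])
    total = gt.get("total_layers")
    # after_round indexes bug_layers up to total_layers, so the list must be long enough.
    if not isinstance(total, int) or len(layers) < total:
        problems.append("bug_layers has fewer entries than total_layers")
    if not isinstance(gt.get("max_rounds"), int):
        problems.append("max_rounds must be an integer")
    for i, layer in enumerate(layers, 1):
        if "code" not in layer:
            problems.append(f"layer {i} has no code")
        # An unknown type makes _verify_layer_fixed return False on every round.
        if layer.get("validation", "syntax") not in VALIDATION_TYPES:
            problems.append(f"layer {i} has an unknown validation type")
    return problems


if __name__ == "__main__":
    print(check_ground_truth(EXAMPLE_GROUND_TRUTH))
```

A check of this kind would catch a misspelled validation type early; in the hooks as written, such a layer never validates and the run ends with `incomplete_max_rounds`.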
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    "Task type: layered debugging (bugs exposed one layer at a time). Final correctness is judged by Oracle; "
    "this rubric scores process quality from the proxy trace only.\n\n"
    "Score these three dimensions (each 0.0–1.0); keys must match exactly:\n\n"
    "- **tool_use_appropriate**: Appropriate read/write/exec/diff tools; edits target the exposed bug and right files; "
    "avoid unrelated or destructive ops.\n\n"
    "- **consistency**: Coherent debugging order and grasp of “layered exposure”; sensible transitions; "
    "avoid pointless repeats of the same failure.\n\n"
    "- **robustness**: High score if no tool/runtime failures; if failures occur, diagnose, adjust, or explain—not infinite retry loops.\n\n"
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + "Return ONLY JSON (no markdown fences):\n"
    '{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, '
    '"security_gate": 1, "notes": "one-line summary"}}\n'
    "Optional **total**: Harness may average the three scores; **security_gate** semantics follow the default rubric.\n\n"
    "--- PROXY TRACE JSON BELOW ---\n"
    "{payload}"
)
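
A harness consuming the judge's reply might parse it along these lines. This is a sketch under assumptions: `parse_rubric_response` is a hypothetical name, clamping out-of-range scores is an added safeguard, and `security_gate` is passed through untouched because its semantics are defined by the default rubric, which is not shown here.

```python
import json

# The three keys the rubric requires, spelled exactly as in the template.
SCORE_KEYS = ("tool_use_appropriate", "consistency", "robustness")


def parse_rubric_response(raw: str) -> dict:
    """Parse the judge's JSON reply and average the three dimension scores."""
    data = json.loads(raw)
    scores = data["scores"]
    missing = [k for k in SCORE_KEYS if k not in scores]
    if missing:
        raise ValueError(f"missing score keys: {missing}")
    # Clamp to the documented 0.0-1.0 range before averaging (assumed safeguard).
    clamped = [min(max(float(scores[k]), 0.0), 1.0) for k in SCORE_KEYS]
    return {
        "total": sum(clamped) / len(clamped),
        "security_gate": data.get("security_gate"),
        "notes": data.get("notes", ""),
    }


if __name__ == "__main__":
    reply = (
        '{"scores": {"tool_use_appropriate": 1.0, "consistency": 0.5, '
        '"robustness": 0.0}, "security_gate": 1, "notes": "one-line summary"}'
    )
    print(parse_rubric_response(reply))
```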
Completion Grader
import json
import subprocess
import sys
from pathlib import Path
from typing import Any


def score_workspace(workspace: Path) -> dict[str, Any]:
    w = workspace.resolve()
    out_dir = w / "out"

    # task_dir may sit two levels above the workspace (sandbox layout) or next to it, so try each candidate
    gt = {}
    for candidate in [w.parent.parent, w.parent, Path(__file__).parent]:
        gt_path = candidate / "ground_truth.json"
        if gt_path.exists():
            gt = json.loads(gt_path.read_text(encoding="utf-8"))
            break

    weights = gt.get("scoring", {}).get("weights", {
        "layers_fixed": 0.60,
        "rounds_efficiency": 0.25,
        "fix_quality": 0.15
    })
    total_layers = gt.get("total_layers", 5)

    # ══════════════════════════════════════════════
    # 1. Read the authoritative summary written by the hooks
    # ══════════════════════════════════════════════
    summary_path = out_dir / "runtime_summary.json"
    if summary_path.exists():
        summary = json.loads(summary_path.read_text(encoding="utf-8"))
        layers_fixed_list = summary.get("layers_fixed", [])
        layers_fixed_count = summary.get("layers_fixed_count", len(layers_fixed_list))
        state_snapshot = summary.get("runtime_state_snapshot", {})

        # Verify each layer by actually running its snapshot (not by heuristics)
        verified_count = 0
        layer_details = []
        for i in range(1, total_layers + 1):
            snap = out_dir / f"layer_{i}_fixed.py"
            if snap.exists():
                passed = _run_verify(snap, gt["bug_layers"][i - 1])
                if passed:
                    verified_count += 1
                layer_details.append({"layer": i, "snapshot_exists": True, "run_passed": passed})
            else:
                layer_details.append({"layer": i, "snapshot_exists": False, "run_passed": False})

        # Take the minimum of the hooks' count and the verified count (a layer counts only if both agree)
        layers_fixed = min(layers_fixed_count, verified_count)

        # Compute efficiency precisely: read each layer's fix round from state_snapshot
        round_numbers = []
        for i in range(1, total_layers + 1):
            r = state_snapshot.get(f"layer_{i}_fixed_round")
            if r is not None:
                round_numbers.append(int(r))
        total_rounds_used = max(round_numbers) if round_numbers else layers_fixed

    else:
        # ══════════════════════════════════════════
        # Fallback: runtime_summary.json does not exist.
        # Only heuristics are available, so scores skew low, as expected.
        # ══════════════════════════════════════════
        layers_fixed = 0
        layer_details = []
        final_code_path = out_dir / "buggy_code_fixed.py"
        final_code = final_code_path.read_text(encoding="utf-8") if final_code_path.exists() else ""
        for i, layer in enumerate(gt.get("bug_layers", []), 1):
            passed = _check_layer_fixed_heuristic(final_code, layer, i)
            if passed:
                layers_fixed += 1
            layer_details.append({"layer": i, "snapshot_exists": False, "heuristic_passed": passed})

        fix_log = out_dir / "fix_log.md"
        layers_from_log = _parse_fix_log(fix_log) if fix_log.exists() else {}
        layers_fixed = max(layers_fixed, len(layers_from_log))
        total_rounds_used = layers_fixed  # cannot be estimated precisely

    checks = []

    # ══════════════════════════════════════════════
    # Dimension 1: layers fixed (60%)
    # ══════════════════════════════════════════════
    layer_score = min(layers_fixed / total_layers, 1.0) * weights["layers_fixed"]
    checks.append({
        "id": "layers_fixed",
        "label": f"Layers fixed: {layers_fixed}/{total_layers}",
        "pass": layers_fixed >= total_layers,
        "weight": weights["layers_fixed"],
        "detail": {"layers_fixed": layers_fixed, "total": total_layers, "per_layer": layer_details}
    })

    # ══════════════════════════════════════════════
    # Dimension 2: efficiency (25%)
    # ══════════════════════════════════════════════
    optimal = gt.get("scoring", {}).get("efficiency", {}).get("optimal_rounds", 5)
    max_acceptable = gt.get("scoring", {}).get("efficiency", {}).get("max_acceptable_rounds", 10)

    if total_rounds_used <= optimal:
        efficiency_score = 1.0
    elif total_rounds_used >= max_acceptable:
        efficiency_score = 0.0
    else:
        efficiency_score = 1.0 - (total_rounds_used - optimal) / (max_acceptable - optimal)

    # If not every layer was fixed, scale the efficiency score by the completed fraction
    if layers_fixed < total_layers:
        efficiency_score *= (layers_fixed / total_layers)

    efficiency_weighted = efficiency_score * weights["rounds_efficiency"]
    checks.append({
        "id": "rounds_efficiency",
        "label": f"Efficiency: {total_rounds_used} rounds (optimal {optimal})",
        "pass": efficiency_score > 0.5,
        "weight": weights["rounds_efficiency"],
        "detail": {"rounds_used": total_rounds_used, "efficiency": round(efficiency_score, 4)}
    })

    # ══════════════════════════════════════════════
    # Dimension 3: fix quality (15%)
    # ══════════════════════════════════════════════
    quality_score = 0.0
    final_code_path = out_dir / "buggy_code_fixed.py"
    final_code = final_code_path.read_text(encoding="utf-8") if final_code_path.exists() else ""
    fix_log = out_dir / "fix_log.md"

    if fix_log.exists():
        log_content = fix_log.read_text(encoding="utf-8")
        has_comments = "# FIX:" in final_code or "FIX:" in log_content
        has_log_structure = all(h in log_content.lower() for h in ["layer", "fix", "issue"])
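        # Each of the three checks contributes one third, so quality_score spans 0.0-1.0 before weighting
        if has_comments:
            quality_score += 1 / 3
        if has_log_structure:
            quality_score += 1 / 3
    if layers_fixed == total_layers:
        quality_score += 1 / 3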

    quality_weighted = quality_score * weights["fix_quality"]
    checks.append({
        "id": "fix_quality",
        "label": "Fix quality: comments + log + completeness",
        "pass": quality_score > 0.5,
        "weight": weights["fix_quality"],
        "detail": {"quality_score": round(quality_score, 4)}
    })

    total_score = layer_score + efficiency_weighted + quality_weighted

    level = "fail"
    if total_score >= 0.90:
        level = "excellent"
    elif total_score >= 0.75:
        level = "good"
    elif total_score >= 0.60:
        level = "pass"

    return {
        "task": "011-code-debug",
        "workspace": str(w),
        "outcome_score": round(total_score, 4),
        "level": level,
        "checks": checks,
        "summary": {
            "layers_fixed": f"{layers_fixed}/{total_layers}",
            "rounds_used": total_rounds_used,
            "quality_score": round(quality_score, 4),
            "all_layers_fixed": layers_fixed >= total_layers,
            "used_runtime_summary": summary_path.exists()
        }
    }


def _run_verify(code_path: Path, layer_data: dict) -> bool:
    """Verify layer_N_fixed.py by actually running it, mirroring hooks._verify_layer_fixed."""
    validation_type = layer_data.get("validation", "syntax")
    code = code_path.read_text(encoding="utf-8")
    try:
        if validation_type == "syntax":
            compile(code, str(code_path), 'exec')
            return True
        elif validation_type in ["import", "runtime", "assertion"]:
            result = subprocess.run(
                [sys.executable, str(code_path)],
                capture_output=True, text=True, timeout=5
            )
            return result.returncode == 0
        elif validation_type == "performance":
            result = subprocess.run(
                [sys.executable, str(code_path)],
                capture_output=True, text=True, timeout=2
            )
            return result.returncode == 0 and result.stdout.strip() != ""
    except Exception:
        return False
    return False


def _check_layer_fixed_heuristic(code: str, layer: dict, layer_num: int) -> bool:
    """Heuristic check for the degraded fallback; used only when runtime_summary.json is missing."""
    if layer_num == 1:
        return "if x > 0:" in code
    elif layer_num == 2:
        return "import json" in code and "jsonn" not in code
    elif layer_num == 3:
        return "str(score)" in code
    elif layer_num == 4:
        return "score <= 100" in code
    elif layer_num == 5:
        return "seen = set()" in code or "duplicates = set()" in code
    return False


def _parse_fix_log(log_path: Path) -> dict:
    if not log_path.exists():
        return {}
    content = log_path.read_text(encoding="utf-8")
    import re
    layers = []
    for m in re.finditer(r"(?i)layer\s*(\d+)", content):
        layers.append(int(m.group(1)))
    return {f"L{i}": True for i in set(layers)}
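
The efficiency dimension in `score_workspace` is a linear interpolation between the optimal and maximum acceptable round counts, scaled down when layers remain unfixed. The standalone sketch below restates that arithmetic. `efficiency_score` is a hypothetical name, and the defaults of 5 and 10 are the grader's fallback values, which the real `ground_truth.json` may override.

```python
def efficiency_score(
    rounds_used: int,
    layers_fixed: int,
    total_layers: int,
    optimal: int = 5,
    max_acceptable: int = 10,
) -> float:
    """Return the unweighted efficiency score in the range 0.0-1.0."""
    if rounds_used <= optimal:
        score = 1.0
    elif rounds_used >= max_acceptable:
        score = 0.0
    else:
        # Linear falloff between the optimal and maximum acceptable round counts.
        score = 1.0 - (rounds_used - optimal) / (max_acceptable - optimal)
    # Incomplete runs are discounted by the fraction of layers fixed.
    if layers_fixed < total_layers:
        score *= layers_fixed / total_layers
    return score


if __name__ == "__main__":
    # All 5 layers fixed in 7 rounds: 1 - (7 - 5) / (10 - 5)
    print(round(efficiency_score(7, 5, 5), 4))
    # Only 3 of 5 layers fixed within 4 rounds: 1.0 * 3 / 5
    print(round(efficiency_score(4, 3, 5), 4))
```

With the default 0.25 weight, a run that fixes all five layers in seven rounds would earn 0.6 × 0.25 = 0.15 from this dimension.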