Department budget variance analysis

Prepare a budget-vs-actuals variance report.

Data, BI & Finance AnalyticsTask 6Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptData, BI & Finance Analytics ยท Task 6

Prepare a budget-vs-actuals variance report.

Inputs:

  • $WORKSPACE/in/budget.csv
  • $WORKSPACE/in/actuals.csv

Create:

  • $WORKSPACE/out/variance_report.csv
  • $WORKSPACE/out/department_rollup.csv
  • $WORKSPACE/out/review_reasons.json
  • $WORKSPACE/out/summary.md

variance_report.csv columns, in this exact order: department,category,budget_amount,actual_amount,variance_amount,variance_pct,flag

Rules:

  1. Do not modify fixtures.
  2. Use a full outer join across budget.csv and actuals.csv by department and category.
  • If a budget row has no matching actual row, set actual_amount to 0.00.
  • If an actual row has no matching budget row, include it as an unplanned item with budget_amount 0.00.
  1. variance_amount = actual_amount - budget_amount.
  2. variance_pct = variance_amount / budget_amount * 100.
  • If budget_amount is 0.00 and actual_amount is nonzero, set variance_pct to N/A and flag review.
  • If both budget_amount and actual_amount are 0.00, set variance_pct to 0.00 and flag ok.
  1. Format amount fields with exactly two decimals and variance_pct with exactly two decimals.
  2. flag must be review when absolute variance_pct is strictly greater than 10.00, otherwise ok. Exactly 10.00 is ok.
  3. summary.md must name every review item and the largest overrun by variance_amount.
  4. summary.md must mention any unplanned actuals, zero-budget rows, or missing actuals.

department_rollup.csv requirements:

  • Exact header:

department,budget_amount,actual_amount,variance_amount,variance_pct,review_item_count,rollup_flag

  • Sum all variance_report.csv rows by department after the full outer join logic.
  • variance_pct is department variance_amount / department budget_amount * 100.
  • If department budget_amount is 0.00 and actual_amount is nonzero, variance_pct is N/A.
  • review_item_count is the number of category rows in that department with flag=review.
  • rollup_flag is review when review_item_count > 0, otherwise ok.
  • Sort rows by department ascending.

review_reasons.json requirements:

  • Valid JSON object keyed by each review item as "Department Category".
  • Each value must contain reason_type and primary_driver.
  • reason_type must be one of: overrun_pct, underrun_pct, unplanned_actual, missing_actual, zero_budget_actual.
  • primary_driver should briefly identify the rule that caused the review flag, such as variance_pct > 10, variance_pct < -10, budget missing, actual missing, or budget is zero with actual spend.

Do not use network access or external APIs.

Input Files2 files
in/actuals.csv
in/budget.csv
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\nEvaluate whether the agent produced a finance-ready variance workflow with clear assumptions, correct threshold reasoning, "
    "and a useful concise summary. Exact arithmetic is checked by oracle_grade.py.\n\n"
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    "Return ONLY JSON:\n"
    '{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}\n\n'
    "Optional **total**: harness recomputes mean of three process scores.\n\n--- PROXY TRACE JSON BELOW ---\n{payload}"
)
Completion Grader
def _add(checks: list[dict[str, Any]], cid: str, ok: bool, weight: float, detail: str | None = None) -> None:
    checks.append({"id": cid, "pass": bool(ok), "weight": weight, "detail": None if ok else detail})


def _read(path: Path) -> tuple[list[str], list[dict[str, str]]]:
    with path.open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        return list(reader.fieldnames or []), [{k: (v or "").strip() for k, v in row.items()} for row in reader]


def _sha256(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()


def _fixtures_unchanged(workspace: Path, gt: dict[str, Any]) -> bool:
    for rel, digest in gt.get("fixture_hashes", {}).items():
        candidate = workspace / "in" / rel
        if not candidate.is_file() or _sha256(candidate) != digest:
            return False
    return True


def score_workspace(workspace: str | Path, *, ground_truth_path: Path | None = None) -> dict[str, Any]:
    w = Path(workspace).resolve()
    gt = json.loads((ground_truth_path or _DEFAULT_GT).read_text(encoding="utf-8"))
    checks: list[dict[str, Any]] = []
    _add(checks, "fixtures_unchanged", _fixtures_unchanged(w, gt), 0.08, "one or more input files are missing or modified")
    out = w / gt["outputs"]["csv"]
    rollup = w / gt["outputs"].get("rollup", "out/department_rollup.csv")
    review_reasons = w / gt["outputs"].get("review_reasons", "out/review_reasons.json")
    summary = w / gt["outputs"]["summary"]
    _add(checks, "fixtures_present", (w / "in" / "budget.csv").is_file() and (w / "in" / "actuals.csv").is_file(), 0.08, "missing fixture")
    _add(checks, "csv_exists", out.is_file(), 0.08, "missing variance_report.csv")
    _add(checks, "rollup_exists", rollup.is_file(), 0.08, "missing department_rollup.csv")
    _add(checks, "review_reasons_exists", review_reasons.is_file(), 0.08, "missing review_reasons.json")
    _add(checks, "summary_exists", summary.is_file(), 0.06, "missing summary.md")
    if out.is_file():
        try:
            header, rows = _read(out)
            rows_sorted = sorted(rows, key=lambda r: (r.get("department", ""), r.get("category", "")))
            _add(checks, "exact_header", header == gt["header"], 0.08, f"got {header}")
            by_key = {r.get("department", "") + " " + r.get("category", ""): r for r in rows}
            _add(checks, "exact_rows", rows_sorted == gt["rows"], 0.34, f"got {rows_sorted}")
            money_ok = all(re.fullmatch(r"-?\d+\.\d{2}", r.get(col, "")) for r in rows for col in ["budget_amount", "actual_amount", "variance_amount"])
            pct_ok = all((r.get("variance_pct") == "N/A") or re.fullmatch(r"-?\d+\.\d{2}", r.get("variance_pct", "")) for r in rows)
            _add(checks, "number_format", money_ok, 0.08, "amounts and percentages need two decimals")
            _add(checks, "review_flags", {r["department"] + " " + r["category"] for r in rows if r.get("flag") == "review"} == set(gt["review_items"]), 0.10, "incorrect review flags")
            _add(checks, "pct_format_or_na", pct_ok, 0.04, "variance_pct must be two decimals or N/A")
            _add(checks, "full_outer_join_rows", {"Product Research", "Security Tools"} <= set(by_key), 0.04, "unplanned or missing-actual rows absent")
            _add(checks, "zero_budget_pct", by_key.get("Product Research", {}).get("variance_pct") == "N/A" and by_key.get("Support Escalations", {}).get("variance_pct") == "N/A", 0.04, "zero-budget rows should use N/A")
            _add(checks, "missing_actual_defaults_zero", by_key.get("Security Tools", {}).get("actual_amount") == "0.00", 0.04, "missing actual should default to 0.00")
            _add(checks, "strict_threshold", by_key.get("Sales Travel", {}).get("variance_pct") == "10.00" and by_key.get("Sales Travel", {}).get("flag") == "ok", 0.04, "exact 10.00 variance should be ok")
        except Exception as exc:
            _add(checks, "csv_readable", False, 0.30, str(exc))
    else:
        for cid, weight in [
            ("exact_header", 0.08),
            ("exact_rows", 0.34),
            ("number_format", 0.08),
            ("review_flags", 0.10),
            ("pct_format_or_na", 0.04),
            ("full_outer_join_rows", 0.04),
            ("zero_budget_pct", 0.04),
            ("missing_actual_defaults_zero", 0.04),
            ("strict_threshold", 0.04),
        ]:
            _add(checks, cid, False, weight, "missing variance_report.csv")
    if rollup.is_file():
        try:
            header, rows = _read(rollup)
            rows_sorted = sorted(rows, key=lambda r: r.get("department", ""))
            _add(checks, "rollup_header", header == gt["rollup_header"], 0.06, f"got {header}")
            _add(checks, "rollup_rows", rows_sorted == gt["rollup_rows"], 0.22, f"got {rows_sorted}")
            by_department = {r.get("department", ""): r for r in rows}
            _add(checks, "rollup_review_counts", by_department.get("Marketing", {}).get("review_item_count") == "2" and by_department.get("Sales", {}).get("review_item_count") == "0", 0.04, "department review counts are wrong")
            _add(checks, "rollup_zero_budget_na", by_department.get("Product", {}).get("variance_pct") == "N/A" and by_department.get("Support", {}).get("variance_pct") == "N/A", 0.04, "zero-budget department rollups should use N/A")
            _add(checks, "rollup_flag_from_review_items", by_department.get("Marketing", {}).get("rollup_flag") == "review" and by_department.get("Sales", {}).get("rollup_flag") == "ok", 0.04, "rollup_flag should follow review_item_count")
        except Exception as exc:
            _add(checks, "rollup_readable", False, 0.22, str(exc))
    else:
        for cid, weight in [
            ("rollup_header", 0.06),
            ("rollup_rows", 0.22),
            ("rollup_review_counts", 0.04),
            ("rollup_zero_budget_na", 0.04),
            ("rollup_flag_from_review_items", 0.04),
        ]:
            _add(checks, cid, False, weight, "missing department_rollup.csv")
    if review_reasons.is_file():
        try:
            data = json.loads(review_reasons.read_text(encoding="utf-8"))
            expected = gt["review_reasons"]
            keys_ok = isinstance(data, dict) and set(data) == set(expected)
            types_ok = keys_ok and all(isinstance(data.get(item), dict) and data[item].get("reason_type") == exp["reason_type"] for item, exp in expected.items())
            drivers_ok = keys_ok and all(str(data.get(item, {}).get("primary_driver", "")).strip() for item in expected)
            _add(checks, "review_reasons_keys", keys_ok, 0.06, f"got keys {sorted(data) if isinstance(data, dict) else type(data)}")
            _add(checks, "review_reason_types", types_ok, 0.18, f"got {data}")
            _add(checks, "review_reason_primary_drivers", drivers_ok, 0.06, "primary_driver missing")
        except Exception as exc:
            _add(checks, "review_reasons_parseable", False, 0.18, str(exc))
    else:
        for cid, weight in [
            ("review_reasons_keys", 0.06),
            ("review_reason_types", 0.18),
            ("review_reason_primary_drivers", 0.06),
        ]:
            _add(checks, cid, False, weight, "missing review_reasons.json")
    if summary.is_file():
        text = summary.read_text(encoding="utf-8", errors="replace")
        text_l = text.lower()
        for item in gt["review_items"]:
            parts = item.lower().split()
            if item == "R&D Tools":
                ok = ("r&d" in text_l or "research and development" in text_l or "r and d" in text_l) and "tools" in text_l
            else:
                ok = all(part in text_l for part in parts)
            _add(checks, f"summary_mentions_{item}", ok, 0.015, f"missing {item}")
        _add(checks, "summary_largest_overrun", all(part in text_l for part in gt["largest_overrun"].lower().split()), 0.04, "largest overrun not identified")
        _add(checks, "summary_mentions_unplanned", "unplanned" in text_l and "product" in text_l and "research" in text_l, 0.015, "summary should mention Product Research unplanned actual")
        _add(checks, "summary_mentions_zero_budget", ("zero budget" in text_l or "zero-budget" in text_l) and ("support" in text_l or "product" in text_l), 0.015, "summary should mention zero-budget rows")
        _add(checks, "summary_mentions_missing_actual", "missing actual" in text_l and "security" in text_l and "tools" in text_l, 0.015, "summary should mention Security Tools missing actual")
    else:
        for item in gt["review_items"]:
            _add(checks, f"summary_mentions_{item}", False, 0.015, "missing summary.md")
        for cid, weight in [
            ("summary_largest_overrun", 0.04),
            ("summary_mentions_unplanned", 0.015),
            ("summary_mentions_zero_budget", 0.015),
            ("summary_mentions_missing_actual", 0.015),
        ]:
            _add(checks, cid, False, weight, "missing summary.md")
    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0
    if any(c["id"].startswith("review_reason") and not c["pass"] for c in checks):
        score = min(score, 0.69)
    return {"task": "054-budget-variance-analysis", "workspace": str(w), "checks": checks, "outcome_score": score, "score": score}