Platform governance appeal review

You are reviewing moderation appeals against a platform policy bundle.

Vertical Professional Workflows · Task 8 · Oracle + LLM scoring
Model Runs: 6 harnesses & 8 models evaluated on this task.
Prompt

You are reviewing moderation appeals against a platform policy bundle.

Mandatory execution constraint (non-negotiable): after you finish reading inputs, you must write all three deliverables to disk under $WORKSPACE/out/ before your run ends:

  1. appeal_decisions.csv
  2. explanations.md
  3. redaction_audit.csv

Use file-writing tools or shell redirection so these paths exist with real content; do not only describe them in chat. Create $WORKSPACE/out/ if missing, then write the files.
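
The write step above can be sketched as follows. This is a minimal illustration, not the task's reference solution: the helper name write_deliverables and the empty-content guard are assumptions added here, and the file contents are whatever the reviewer has produced.

```python
from pathlib import Path


def write_deliverables(workspace: Path, files: dict[str, str]) -> list[Path]:
    """Create <workspace>/out/ if missing, then write each file with its content."""
    out_dir = workspace / "out"
    out_dir.mkdir(parents=True, exist_ok=True)  # no error if the directory already exists
    written = []
    for name, content in files.items():
        # Assumption: treat an empty body as a mistake, since the prompt asks for real content.
        if not content.strip():
            raise ValueError(f"{name} would be empty")
        path = out_dir / name
        path.write_text(content, encoding="utf-8")
        written.append(path)
    return written
```

A caller would typically pass Path(os.environ["WORKSPACE"]) and a mapping with the three names appeal_decisions.csv, explanations.md, and redaction_audit.csv.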

Read every input below before deciding:

  • $WORKSPACE/in/moderation_policy.md
  • $WORKSPACE/in/policy_amendments.md
  • $WORKSPACE/in/case_files/*.json

Amendments apply to appeals filed 2026-01-01 or later where noted. When a case JSON includes an appeal_filed value before that date, META-2 blocks PRECVIO-1 even if prior_harass_strikes is elevated; apply EDGE-1 for qualifying harassment-tier appeals.
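
The date gate described above might be checked like this. The sketch covers only the cutoff comparison; the clause texts live in the policy files and are not reproduced here. It assumes appeal_filed is a plain ISO date string (YYYY-MM-DD), and the function name and the handling of a missing date are hypothetical.

```python
from datetime import date

# From the prompt: amendments apply to appeals filed on or after this date.
AMENDMENT_CUTOFF = date(2026, 1, 1)


def precvio_blocked_by_meta2(case: dict) -> bool:
    """Return True when appeal_filed predates the cutoff, so META-2 blocks PRECVIO-1."""
    filed = case.get("appeal_filed")
    if not filed:
        # Assumption: without a filing date there is no date-based block;
        # confirm against the policy text before relying on this.
        return False
    return date.fromisoformat(filed) < AMENDMENT_CUTOFF
```

Note that prior_harass_strikes is deliberately not consulted: per the prompt, an elevated strike count does not lift the block.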

Create:

  • $WORKSPACE/out/appeal_decisions.csv
  • $WORKSPACE/out/explanations.md

Requirements for appeal_decisions.csv:

  • Header row columns exactly (names and order): case_id,decision,final_action,policy_basis,confidence
  • Exactly one row per case JSON file present under case_files/.
  • decision must be one of: uphold, overturn, downgrade.
  • final_action must be consistent with the decision and policy bundle (including amendments).
  • policy_basis must include every applicable clause ID; separate multiple IDs with semicolons (e.g. PRECVIO-1;EDGE-1). IDs must match the policy documents.
  • confidence must match calibration exactly per row using only high, medium, or low: clear-cut hate/counterspeech → high; compromise narratives, EDGE/PRECVIO/MISLABEL interplay, PRECVIO vs EDGE/META-2 date logic, or advisory automation conflicts → typically medium; genuinely ambiguous fact patterns → low.
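
A minimal sketch of producing a file that satisfies these column and vocabulary rules, using the standard csv module. The function name render_decisions is hypothetical, and the final_action value in the usage note is a placeholder, not a value taken from the policy bundle.

```python
import csv
import io

HEADER = ["case_id", "decision", "final_action", "policy_basis", "confidence"]
DECISIONS = {"uphold", "overturn", "downgrade"}
CONFIDENCE = {"high", "medium", "low"}


def render_decisions(rows: list[dict]) -> str:
    """Render rows as CSV text with the exact header; policy_basis is a list of clause IDs."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=HEADER, lineterminator="\n")
    writer.writeheader()
    for row in rows:
        if row["decision"] not in DECISIONS:
            raise ValueError(f"bad decision for {row['case_id']}: {row['decision']}")
        if row["confidence"] not in CONFIDENCE:
            raise ValueError(f"bad confidence for {row['case_id']}: {row['confidence']}")
        # Multiple clause IDs are joined with semicolons, e.g. PRECVIO-1;EDGE-1.
        writer.writerow(dict(row, policy_basis=";".join(row["policy_basis"])))
    return buf.getvalue()
```

For example, a row with case_id A-01, decision uphold, final_action "placeholder action", policy_basis ["PRECVIO-1", "EDGE-1"], and confidence medium renders as A-01,uphold,placeholder action,PRECVIO-1;EDGE-1,medium beneath the header line.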

Requirements for explanations.md:

  • Cover every case id using one Markdown section each, headed exactly: ### Case <case_id> (example: ### Case A-01).
  • Inside each section: cite every clause ID listed for that case in appeal_decisions.csv, summarize facts (including advisory automation where relevant), and include the phrase "rather than" or "instead of" in every section (not only once in the whole file).
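
One way to assemble a section that meets these rules is sketched below. The function name render_section and the section layout beneath the heading are assumptions; only the heading format, the clause citations, and the contrast phrasing come from the requirements above.

```python
def render_section(case_id: str, clause_ids: list[str], facts: str, contrast: str) -> str:
    """Build one '### Case <case_id>' section citing every clause ID."""
    lowered = contrast.lower()
    # Every section needs its own contrast phrase, not just one in the whole file.
    if "rather than" not in lowered and "instead of" not in lowered:
        raise ValueError(f"{case_id}: contrast sentence needs 'rather than' or 'instead of'")
    cited = ", ".join(clause_ids)
    return (
        f"### Case {case_id}\n\n"
        f"Clauses: {cited}\n\n"
        f"{facts}\n\n"
        f"{contrast}\n"
    )
```

Joining the sections for all cases with blank lines between them gives the body of explanations.md.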

Create also:

  • $WORKSPACE/out/redaction_audit.csv

Header columns exactly (names and order): case_id,redacted_field,redaction_reason

Provide enough rows so sensitive payload paths are explicit: when a case JSON contains multiple sensitive keys (reporter, reporter_email, reporter_alias, …), use separate rows naming those paths (minimum one row per case id, more when multiple fields apply).
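
The per-path rows could be collected with a small recursive walk, sketched below. The case JSON layout is not shown in this document, so the nested dict/list handling, the dotted path notation, and the redaction_reason wording are all assumptions. SENSITIVE_KEYS holds only the three keys the prompt names; the prompt's list is open-ended.

```python
# Keys named in the prompt; its list ends with "…", so real cases may need more.
SENSITIVE_KEYS = {"reporter", "reporter_email", "reporter_alias"}


def sensitive_paths(node, prefix: str = "") -> list[str]:
    """Return dotted paths of every sensitive key found in a parsed case JSON."""
    paths = []
    if isinstance(node, dict):
        for key, value in node.items():
            path = f"{prefix}.{key}" if prefix else key
            if key in SENSITIVE_KEYS:
                paths.append(path)
            else:
                paths.extend(sensitive_paths(value, path))
    elif isinstance(node, list):
        for i, item in enumerate(node):
            paths.extend(sensitive_paths(item, f"{prefix}[{i}]"))
    return paths


def redaction_rows(case_id: str, payload: dict) -> list[tuple[str, str, str]]:
    """One (case_id, redacted_field, redaction_reason) row per sensitive path."""
    # Hypothetical reason text; the real wording is up to the reviewer.
    return [(case_id, p, "personal identifier") for p in sensitive_paths(payload)]
```

This helper returns no rows for a case without any of the listed keys, so the caller still has to make sure every case id ends up with at least one row.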

Do not use the network. Do not modify input files.

Input Files (9 files)
in/case_files/case_1.json
in/case_files/case_2.json
in/case_files/case_3.json
in/case_files/case_4.json
in/case_files/case_5.json
in/case_files/case_6.json
in/case_files/case_7.json
in/moderation_policy.md
in/policy_amendments.md
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    + """Evaluate the agent run for platform appeal review. Deterministic checks cover decisions, META-2 appeal_filed scope vs PRECVIO, clause citations per section, confidence calibration, granular redaction paths, and privacy. Score only qualitative aspects:
- tool_use_appropriate: reads base policy, amendments, and every case JSON (including appeal_filed and automation fields).
- consistency: applies META-2/PRECVIO/EDGE ordering before disposition labels.
- robustness: resists misleading automation scores when textual context indicates SAFE or elevated harassment strikes.

"""
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}

Optional **total**: harness recomputes mean of three process scores.

--- PROXY TRACE JSON BELOW ---
{payload}
"""
)
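
The template says the harness recomputes the total as the mean of the three process scores. The harness code is not shown here, so the following is only a sketch of that arithmetic under the assumption that the judge's reply is the JSON object requested above; recompute_total is a hypothetical name.

```python
import json
from statistics import mean

PROCESS_KEYS = ("tool_use_appropriate", "consistency", "robustness")


def recompute_total(raw_reply: str) -> float:
    """Parse the judge's JSON reply and average the three process scores."""
    data = json.loads(raw_reply)
    scores = data["scores"]
    # Any "total" supplied by the judge is ignored; the mean is recomputed here.
    return mean(float(scores[k]) for k in PROCESS_KEYS)
```

For a reply with scores 1.0, 0.5, and 0.0, the recomputed total is 0.5.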
Completion Grader
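import csv
import json
import re
from pathlib import Path
from typing import Any

# TASK_DIR (the directory holding ground_truth.json) is used below but is not defined in this excerpt.


def _check(cid: str, label: str, ok: bool, weight: float, detail: str = "") -> dict[str, Any]: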
    return {"id": cid, "label": label, "pass": bool(ok), "weight": weight, "detail": None if ok else detail}


def _first_line(path: Path) -> str:
    raw = path.read_text(encoding="utf-8-sig", errors="replace").splitlines()
    return raw[0].strip() if raw else ""


def _header_cells(line: str) -> list[str]:
    return [p.strip() for p in line.split(",")]


def _split_basis(cell: str) -> list[str]:
    parts = [p.strip() for p in cell.replace(",", ";").split(";")]
    return [p for p in parts if p]


def _extract_case_sections(text: str) -> dict[str, str]:
    rx = re.compile(r"^###\s*Case\s+([A-Z]-\d+)\s*$", re.MULTILINE)
    ms = list(rx.finditer(text))
    out: dict[str, str] = {}
    for i, m in enumerate(ms):
        cid = m.group(1)
        start = m.end()
        end = ms[i + 1].start() if i + 1 < len(ms) else len(text)
        out[cid] = text[start:end]
    return out


def score_workspace(workspace: Path, *, ground_truth_path: Path | None = None) -> dict[str, Any]:
    w = workspace.resolve()
    gt = json.loads((ground_truth_path or TASK_DIR / "ground_truth.json").read_text(encoding="utf-8"))
    decisions = w / gt["decisions_path"]
    explanations = w / gt["explanations_path"]
    redaction = w / gt["redaction_path"]
    expected = gt["expected"]
    valid_ids = set(gt["valid_clause_ids"])
    checks: list[dict[str, Any]] = []

    checks.append(_check("decisions_exists", "appeal_decisions.csv exists", decisions.is_file(), 0.04))
    checks.append(_check("explanations_exists", "explanations.md exists", explanations.is_file(), 0.04))
    checks.append(_check("redaction_exists", "redaction_audit.csv exists", redaction.is_file(), 0.04))

    want_dec_hdr = _header_cells(gt["decisions_header"])
    dh_ok = decisions.is_file() and _header_cells(_first_line(decisions)) == want_dec_hdr
    checks.append(_check("decisions_header", "appeal_decisions.csv has required columns", dh_ok, 0.03))

    want_red_hdr = _header_cells(gt["redaction_header"])
    rh_ok = redaction.is_file() and _header_cells(_first_line(redaction)) == want_red_hdr
    checks.append(_check("redaction_header", "redaction_audit.csv has required columns", rh_ok, 0.03))

    rows: list[dict[str, str]] = []
    if decisions.exists():
        with decisions.open("r", encoding="utf-8-sig", newline="") as f:
            rows = list(csv.DictReader(f))
    by_id = {r.get("case_id", "").strip(): r for r in rows if r.get("case_id")}
    checks.append(_check("all_cases", "exactly one row per expected case id", set(by_id) == set(expected), 0.06))

    decision_ok = all(by_id.get(cid, {}).get("decision") == exp["decision"] for cid, exp in expected.items())
    checks.append(_check("decisions", "appeal decisions are correct", decision_ok, 0.14))

    basis_tokens_ok = True
    basis_detail = ""
    whitelist_ok = True
    whitelist_detail = ""
    for cid, exp in expected.items():
        cell = by_id.get(cid, {}).get("policy_basis", "")
        tokens = _split_basis(cell)
        req = set(exp["policy_basis_tokens"])
        got = set(tokens)
        if not req <= got:
            basis_tokens_ok = False
            basis_detail = f"{cid}: expected clause tokens {sorted(req)} not all present in policy_basis"
            break
        if not got <= valid_ids:
            whitelist_ok = False
            whitelist_detail = f"{cid}: unknown clause id in policy_basis ({sorted(got - valid_ids)})"
            break

    checks.append(_check("policy_basis_tokens", "policy_basis lists required clause IDs", basis_tokens_ok, 0.09))
    checks.append(_check("policy_basis_whitelist", "policy_basis tokens are from policy/amendments ID set", whitelist_ok, 0.05))

    final_ok = all(
        exp["final_action_contains"].lower() in by_id.get(cid, {}).get("final_action", "").lower()
        for cid, exp in expected.items()
    )
    checks.append(_check("final_action", "final actions match outcomes", final_ok, 0.09))

    conf_ok = all(
        by_id.get(cid, {}).get("confidence", "").strip().lower() == exp["confidence"]
        for cid, exp in expected.items()
    )
    checks.append(_check("confidence_expected", "confidence matches calibration rubric per case", conf_ok, 0.05))

    text = explanations.read_text(encoding="utf-8", errors="replace") if explanations.exists() else ""
    text_l = text.lower()
    sections = _extract_case_sections(text) if text else {}
    headings_ok = explanations.is_file() and set(sections.keys()) == set(expected.keys())
    checks.append(_check("explanation_headings", "each case uses ### Case <id> heading", headings_ok, 0.06))

    sec_tokens_ok = True
    sec_detail = ""
    if headings_ok:
        for cid, exp in expected.items():
            body = sections.get(cid, "")
            bl = body.lower()
            for tok in exp["policy_basis_tokens"]:
                if tok.lower() not in bl:
                    sec_tokens_ok = False
                    sec_detail = f"{cid}: missing clause mention {tok} inside its section"
                    break
            if not sec_tokens_ok:
                break
    else:
        sec_tokens_ok = False
        sec_detail = "fix ### Case headings so every expected case id has its own section"

    checks.append(_check("explanations_clause_sections", "each section cites its row's clause IDs", sec_tokens_ok, 0.06))

    why_ok = False
    why_detail = ""
    if headings_ok:
        why_ok = True
        for cid in expected:
            bl = sections.get(cid, "").lower()
            if "rather than" not in bl and "instead of" not in bl:
                why_ok = False
                why_detail = f"{cid}: section needs contrast phrasing (rather than / instead of)"
                break
    else:
        why_detail = sec_detail or "missing structured sections"

    checks.append(_check("why_not_per_section", "each section contrasts alternatives", why_ok, 0.06))

    privacy_ok = explanations.is_file() and not any(term.lower() in text_l for term in gt["forbidden_terms"])
    checks.append(_check("privacy", "sensitive identifiers not copied into explanations", privacy_ok, 0.05))

    redaction_ok = False
    red_fields_ok = False
    red_detail = ""
    if redaction.exists():
        try:
            with redaction.open("r", encoding="utf-8-sig", newline="") as f:
                rrows = list(csv.DictReader(f))
            ids_seen = {r.get("case_id", "").strip() for r in rrows if r.get("case_id")}
            nonempty = bool(rrows) and all(
                (r.get("redacted_field") or "").strip() and (r.get("redaction_reason") or "").strip() for r in rrows
            )
            min_rows = sum(len(v) for v in gt["redaction_fields_min"].values())
            redaction_ok = ids_seen >= set(gt["redaction_cases"]) and nonempty and len(rrows) >= min_rows

            red_fields_ok = True
            for cid, needles in gt["redaction_fields_min"].items():
                case_rows = [r for r in rrows if r.get("case_id", "").strip() == cid]
                fields_seen = " ".join((r.get("redacted_field") or "").lower() for r in case_rows)
                for needle in needles:
                    if needle.lower() not in fields_seen:
                        red_fields_ok = False
                        red_detail = f"{cid}: redacted_field must reference {needle!r}"
                        break
                if not red_fields_ok:
                    break
        except Exception:
            redaction_ok = False
            red_fields_ok = False

    checks.append(_check("redaction_audit", "redaction_audit covers cases with minimum rows", redaction_ok, 0.06))
    checks.append(_check("redaction_field_coverage", "redaction lists sensitive JSON paths", red_fields_ok, 0.05))

    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0

    if basis_detail and not basis_tokens_ok:
        for c in checks:
            if c["id"] == "policy_basis_tokens" and not c["pass"]:
                c["detail"] = basis_detail
    if whitelist_detail and not whitelist_ok:
        for c in checks:
            if c["id"] == "policy_basis_whitelist" and not c["pass"]:
                c["detail"] = whitelist_detail
    if sec_detail and not sec_tokens_ok:
        for c in checks:
            if c["id"] == "explanations_clause_sections" and not c["pass"]:
                c["detail"] = sec_detail
    if why_detail and not why_ok:
        for c in checks:
            if c["id"] == "why_not_per_section" and not c["pass"]:
                c["detail"] = why_detail
    if red_detail and not red_fields_ok:
        for c in checks:
            if c["id"] == "redaction_field_coverage" and not c["pass"]:
                c["detail"] = red_detail

    return {
        "task": "075-platform-appeal-review",
        "workspace": str(w),
        "outcome_score": score,
        "checks": checks,
        "outcome_llm_weight": 0.0,
    }
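
The outcome score computed above is the weight of the passing checks divided by the total weight, rounded to four places. The standalone sketch below repeats that arithmetic on a three-check subset; weighted_score is a hypothetical name, and the pass/fail values are invented for illustration.

```python
def weighted_score(checks: list[dict]) -> float:
    """Passing weight over total weight, rounded to 4 places (0.0 if no checks)."""
    total_w = sum(c["weight"] for c in checks)
    if not total_w:
        return 0.0
    return round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4)


# Weights copied from the grader; the pass flags are made up for this example.
sample = [
    {"id": "decisions_exists", "pass": True, "weight": 0.04},
    {"id": "decisions", "pass": False, "weight": 0.14},
    {"id": "final_action", "pass": True, "weight": 0.09},
]
print(weighted_score(sample))  # 0.13 / 0.27, prints 0.4815
```

In the full grader the 17 check weights sum to 1.00, so the outcome score is effectively the total weight of the checks that pass.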