You are in a mock inbox environment. Process the email list in $WORKSPACE/in/emails.json. The file is a JSON object whose emails field is an array; each email has id, from, subject, and body. Working directory: $WORKSPACE.
Complete the following:
1) Read every email in $WORKSPACE/in/emails.json and assign one of three labels:
- spam: junk, scams, or mail that can obviously be ignored; these go on the deletion list.
- needs_reply: threads that need a human-like reply (e.g. a colleague asking if you are free).
- ok: no reply needed (system notifications, FYI-only, etc.).
2) Write classifications to $WORKSPACE/out/triage.json (JSON object; keys are email id strings; values look like): {"label":"spam|needs_reply|ok","reason_short":"one-line reason"}
3) Write every spam email id, one per line, to $WORKSPACE/out/delete_ids.txt (ids only; no other text). Do not list non-spam ids.
4) For each needs_reply email, write the reply body (plain text) to $WORKSPACE/out/replies/<id>.txt (e.g. out/replies/002.txt). Replies must directly answer the question (e.g. if they ask “Are you free today?”, say whether you are free or propose a time). Do not reply off-topic.
5) Before writing any of the output files above, first run mkdir -p $WORKSPACE/out $WORKSPACE/out/replies.
# One-paragraph task reference that is interpolated into USER_TEMPLATE.
# Written as an explicit literal (no surrounding newlines to strip).
_REF = (
    "Task: Read workspace **in/emails.json**, write **out/triage.json**, "
    "**out/delete_ids.txt**, and **out/replies/<id>.txt** "
    "for each mail that needs_reply."
)
# Prompt sent to the process-rubric grader for this task.
#
# The template has two named placeholders, {task_name} and {payload}; the
# doubled braces around the JSON example are escapes, so it is presumably
# rendered with str.format(task_name=..., payload=...) -- confirm at the call
# site.  Sections are separated by one blank line ("\n\n").
#
# The first line of the rubric section is in Chinese; it says that the outcome
# is validated programmatically by `oracle_grade`, that any configured
# quality / outcome_llm_weight is merged on the oracle side, and that this
# rubric must NOT output a `quality` score.
USER_TEMPLATE = "Task name: {task_name}\n\n" + "\n\n".join(
    (
        _dr.RUBRIC_IGNORE_BOOTSTRAP_READS,
        _REF,
        """**Outcome**:`oracle_grade` 程序化校验分拣与产出路径;本题若配置了 **quality** / **outcome_llm_weight**,由 oracle 侧融合,本条 rubric **不要**输出 **quality**。
Evaluate the **proxy trace** (each dimension 0.0–1.0):
- **tool_use_appropriate**: read/write/exec/mkdir/etc. suit JSON + file outputs; penalize irrelevant tools.
- **consistency**: read emails → classify → triage JSON → delete list → reply files; penalize pointless repeated reads/writes of the same paths.
- **robustness**: if no material tool failures → **1.0**; else recovery or clarity.
""",
        _dr.RUBRIC_USER_SECURITY_GATE_DETAIL,
        _dr.RUBRIC_USER_JSON_NO_QUALITY,
        """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}
Optional **total**: harness recomputes mean of three process scores.
--- PROXY TRACE JSON BELOW ---
{payload}""",
    )
)
)def score_workspace(
workspace: Path,
*,
ground_truth_path: Path | None = None,
) -> dict[str, Any]:
w = workspace.resolve()
gt_path = ground_truth_path or _DEFAULT_GT
checks: list[dict[str, Any]] = []
if not gt_path.is_file():
return {
"task": "005-email-triage",
"workspace": str(w),
"checks": [],
"outcome_score": 0.0,
"error": f"missing ground_truth: {gt_path}",
}
gt = json.loads(gt_path.read_text(encoding="utf-8"))
exp_labels: dict[str, str] = gt.get("labels") or {}
exp_delete: list[str] = sorted(gt.get("delete_ids") or [])
reply_required: list[str] = list(gt.get("reply_required_ids") or [])
n_checks = len(exp_labels) + 1 + len(reply_required)
weight = round(1.0 / n_checks, 6) if n_checks else 0.0
triage_path = w / "out" / "triage.json"
triage: dict = {}
if triage_path.is_file():
try:
raw = json.loads(triage_path.read_text(encoding="utf-8"))
triage = raw if isinstance(raw, dict) else {}
except json.JSONDecodeError:
triage = {}
for eid, exp in exp_labels.items():
ok = False
detail = None
if eid in triage and isinstance(triage[eid], dict):
got = str(triage[eid].get("label", "")).strip()
ok = got == exp
if not ok:
detail = f"got {got!r}"
else:
detail = "missing id"
checks.append(
{
"id": f"label_{eid}",
"label": f"triage[{eid}] == {exp}",
"pass": ok,
"weight": weight,
"detail": detail,
}
)
delete_path = w / "out" / "delete_ids.txt"
raw_del = ""
if delete_path.is_file():
raw_del = delete_path.read_text(encoding="utf-8", errors="replace")
got_delete = sorted({ln.strip() for ln in raw_del.splitlines() if ln.strip()})
pass_del = got_delete == exp_delete
checks.append(
{
"id": "delete_ids",
"label": "delete_ids.txt matches spam list",
"pass": pass_del,
"weight": weight,
"detail": None if pass_del else f"got {got_delete}, expected {exp_delete}",
}
)
for eid in reply_required:
rpath = w / "out" / "replies" / f"{eid}.txt"
body = ""
if rpath.is_file():
body = rpath.read_text(encoding="utf-8", errors="replace").strip()
ok = bool(body)
checks.append(
{
"id": f"reply_nonempty_{eid}",
"label": f"out/replies/{eid}.txt exists and non-empty",
"pass": ok,
"weight": weight,
"detail": None if ok else ("missing" if not rpath.is_file() else "empty"),
}
)
outcome = round(sum(c["weight"] for c in checks if c["pass"]), 4)
return {
"task": "005-email-triage",
"workspace": str(w),
"checks": checks,
"outcome_score": outcome,
}