E-commerce support ticket routing and reply templates

You are routing ecommerce support tickets.

Vertical Professional WorkflowsTask 4Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptVertical Professional Workflows · Task 4

You are routing ecommerce support tickets.

Read:

  • $WORKSPACE/in/tickets.json
  • $WORKSPACE/in/order_history.csv
  • $WORKSPACE/in/policy.md

Each ticket includes optional fields such as vip_flag, prior_ticket_refs. Honor VIP-1: VIP speeds information_request (priority=high) but never skips INFO-1 evidence rules or fraud holds.

Create:

  • $WORKSPACE/out/routing_decisions.json
  • $WORKSPACE/out/reply_templates.md

Requirements for routing_decisions.json:

  • Valid JSON array with exactly one object per ticket id present in tickets.json (same ids, same ordering not required).
  • Each object must include these keys (use empty string "" when no order id is supplied): ticket_id, action, policy_basis, order_id, priority, customer_reply_key.
  • action ∈ { refund, reship, escalate_human, information_request }.
  • policy_basis must include the governing clause token (REFUND-1, SHIP-1, ESC-1, INFO-1, FRAUD-1, CONFLICT-1, DISPUTE-1). Multiple citations allowed via semicolon text but must contain the listed primary clause id string for tooling checks.
  • Resolve conflicts conservatively: fraud holds, carrier/system contradictions vs warehouse facts, delivered-status POD disputes inside DISPUTE-1 window — escalate_human even if VIP asks otherwise.
  • Missing verification → information_request + INFO-1.
  • priority ∈ { high, normal }. Use high for every escalate_human ticket and for VIP-backed INFO-1 threads lacking verification (policy VIP-1).
  • customer_reply_key must be exactly one stable snake_case identifier per ticket (templates reference the same key).

Requirements for reply_templates.md:

  • Provide one section per ticket, headed exactly: ### Template <ticket_id> (example: ### Template T-1001).
  • Body content must stay empathetic and non-binding on outcomes already routed away from automation.
  • Never promise refunds/reships/hold removals unless action already guarantees automation handled it — keep language conditional when escalating.

Create also:

  • $WORKSPACE/out/escalation_notes.csv

First row must be exactly: ticket_id,reason,policy_basis,human_team One row per escalate_human ticket only; concise reason (≥8 chars); policy_basis consistent with routing_decisions.json; human_team must be exactly one of:

  • T-1003 → Logistics Escalations
  • T-1005 → Fraud Review
  • T-1006 → Trust and Safety
  • T-1008 → Carrier Claims
  • T-1009 → Logistics Integrity

Do not use the network. Do not modify input files.

Input Files3 files
in/order_history.csv
in/policy.md
in/tickets.json
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    + """Evaluate the agent run for ecommerce support routing. Deterministic checks cover JSON routing contracts (priority/order_id/reply keys), DISPUTE-1 POD windows, CONFLICT-1 follow-ups, VIP INFO-1 handling, structured templates, and escalation team routing. Score only qualitative aspects:
- tool_use_appropriate: reads tickets (vip_flag/prior refs), orders, policy clauses SHIP through DISPUTE.
- consistency: aligns escalating disputes vs informational gaps before authoring templates.
- robustness: refuses VIP/fraud shortcuts and avoids contradictory automation promises.

"""
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}

Optional **total**: harness recomputes mean of three process scores.

--- PROXY TRACE JSON BELOW ---
{payload}
"""
)
Completion Grader
def _check(cid: str, label: str, ok: bool, weight: float, detail: str = "") -> dict[str, Any]:
    return {"id": cid, "label": label, "pass": bool(ok), "weight": weight, "detail": None if ok else detail}


def _first_line(path: Path) -> str:
    raw = path.read_text(encoding="utf-8-sig", errors="replace").splitlines()
    return raw[0].strip() if raw else ""


def _template_sections(md: str) -> dict[str, str]:
    rx = re.compile(r"^###\s+Template\s+(T-\d+)\s*$", re.MULTILINE)
    ms = list(rx.finditer(md))
    out: dict[str, str] = {}
    for i, m in enumerate(ms):
        tid = m.group(1)
        start = m.end()
        end = ms[i + 1].start() if i + 1 < len(ms) else len(md)
        out[tid] = md[start:end]
    return out


def score_workspace(workspace: Path, *, ground_truth_path: Path | None = None) -> dict[str, Any]:
    w = workspace.resolve()
    gt = json.loads((ground_truth_path or TASK_DIR / "ground_truth.json").read_text(encoding="utf-8"))
    decisions_path = w / gt["decisions_path"]
    templates_path = w / gt["templates_path"]
    escalation_path = w / gt["escalation_path"]
    expected: dict[str, Any] = gt["expected"]

    checks: list[dict[str, Any]] = []
    checks.append(_check("decisions_exists", "routing_decisions.json exists", decisions_path.is_file(), 0.05))
    checks.append(_check("templates_exists", "reply_templates.md exists", templates_path.is_file(), 0.05))
    checks.append(_check("escalations_exists", "escalation_notes.csv exists", escalation_path.is_file(), 0.05))

    data: Any = None
    try:
        data = json.loads(decisions_path.read_text(encoding="utf-8")) if decisions_path.exists() else None
    except Exception:
        data = None
    req_keys = {"ticket_id", "action", "policy_basis", "order_id", "priority", "customer_reply_key"}
    schema_ok = (
        isinstance(data, list)
        and len(data) == len(expected)
        and all(isinstance(x, dict) and req_keys.issubset(x) for x in data)
    )
    checks.append(_check("json_schema", "decisions JSON objects include required keys", schema_ok, 0.06))

    by_id = {str(x.get("ticket_id")): x for x in data if isinstance(x, dict)} if isinstance(data, list) else {}
    all_ids = set(by_id) == set(expected)
    checks.append(_check("all_tickets", "one routing row per ticket id", all_ids, 0.06))

    actions_ok = all(by_id.get(tid, {}).get("action") == exp["action"] for tid, exp in expected.items())
    checks.append(_check("actions", "routing actions match ground truth", actions_ok, 0.13))

    basis_ok = all(str(by_id.get(tid, {}).get("policy_basis", "")).find(exp["policy_basis"]) >= 0 for tid, exp in expected.items())
    checks.append(_check("policy_basis", "policy_basis cites expected clause ids", basis_ok, 0.10))

    allowed_ok = bool(by_id) and all(str(x.get("action")) in gt["allowed_actions"] for x in by_id.values())
    checks.append(_check("allowed_actions", "actions stay within taxonomy", allowed_ok, 0.05))

    order_ok = True
    order_detail = ""
    for tid, exp in expected.items():
        got = by_id.get(tid, {}).get("order_id", "")
        if got is None:
            got_s = ""
        else:
            got_s = str(got).strip()
        exp_s = exp.get("order_id", "")
        if exp_s != got_s:
            order_ok = False
            order_detail = f"{tid}: order_id expected {exp_s!r} got {got_s!r}"
            break
    checks.append(_check("order_id_fields", "order_id echoes ticket input", order_ok, 0.08))

    prio_ok = True
    prio_detail = ""
    for tid, exp in expected.items():
        exp_p = str(exp.get("priority", "")).strip().lower()
        got_p = str(by_id.get(tid, {}).get("priority", "")).strip().lower()
        if got_p != exp_p:
            prio_ok = False
            prio_detail = f"{tid}: priority expected {exp_p!r} got {got_p!r}"
            break
    checks.append(_check("priority_map", "priority reflects escalation and vip-info urgency", prio_ok, 0.10))

    keys_ok = True
    keys_detail = ""
    for tid, exp in expected.items():
        exp_k = str(exp.get("customer_reply_key", "")).strip()
        got_k = str(by_id.get(tid, {}).get("customer_reply_key", "")).strip()
        if got_k != exp_k:
            keys_ok = False
            keys_detail = f"{tid}: customer_reply_key expected {exp_k!r} got {got_k!r}"
            break
    checks.append(_check("reply_key_contract", "customer_reply_key matches routing contract", keys_ok, 0.07))

    tmpl_raw = templates_path.read_text(encoding="utf-8", errors="replace") if templates_path.exists() else ""
    sections = _template_sections(tmpl_raw)
    headings_ok = templates_path.is_file() and set(sections.keys()) == set(expected)
    checks.append(_check("template_headings", "reply_templates uses ### Template <ticket_id> sections", headings_ok, 0.07))

    tmpl_lc = tmpl_raw.lower()
    safe_templates = templates_path.is_file() and not any(p.lower() in tmpl_lc for p in gt["forbidden_template_phrases"])
    checks.append(_check("templates_no_overpromise", "templates avoid forbidden promises", safe_templates, 0.05))

    wants_esc = {tid for tid, exp in expected.items() if exp["action"] == "escalate_human"}
    esc_header_ok = escalation_path.is_file() and _first_line(escalation_path) == gt["escalation_header"]

    escalation_ok = False
    esc_detail = ""
    teams_map: dict[str, str] = gt.get("escalation_human_team") or {}
    if escalation_path.exists() and esc_header_ok:
        try:
            with escalation_path.open("r", encoding="utf-8-sig", newline="") as f:
                rows = list(csv.DictReader(f))
            by_esc = {str(r.get("ticket_id", "")).strip(): r for r in rows if r.get("ticket_id")}
            ids_ok = set(by_esc.keys()) == wants_esc
            teams_ok = True
            reasons_ok = True
            for tid in wants_esc:
                row = by_esc.get(tid, {})
                pb = str(row.get("policy_basis", "")).strip()
                rs = str(row.get("reason", "")).strip()
                hm = str(row.get("human_team", "")).strip()
                want_team = teams_map.get(tid, "")
                if len(rs) < 8:
                    reasons_ok = False
                    esc_detail = f"{tid}: reason too short"
                    break
                if not pb:
                    reasons_ok = False
                    esc_detail = f"{tid}: missing policy_basis"
                    break
                if hm != want_team:
                    teams_ok = False
                    esc_detail = f"{tid}: human_team expected {want_team!r} got {hm!r}"
                    break
            escalation_ok = ids_ok and teams_ok and reasons_ok and len(rows) == len(wants_esc)
            if not ids_ok and not esc_detail:
                esc_detail = "escalation_notes.csv ticket_id set mismatch"
        except Exception as exc:
            escalation_ok = False
            esc_detail = str(exc)

    checks.append(_check("escalation_csv_header", "escalation_notes.csv header exact", esc_header_ok, 0.03))
    checks.append(_check("escalation_notes_complete", "escalation rows teams reasons policy_basis", escalation_ok, 0.05))

    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0

    for c in checks:
        if c["id"] == "order_id_fields" and not c["pass"]:
            c["detail"] = order_detail
        if c["id"] == "priority_map" and not c["pass"]:
            c["detail"] = prio_detail
        if c["id"] == "reply_key_contract" and not c["pass"]:
            c["detail"] = keys_detail
        if c["id"] == "escalation_notes_complete" and not c["pass"]:
            c["detail"] = esc_detail or c.get("detail")

    return {"task": "071-ecommerce-support-routing", "workspace": str(w), "outcome_score": score, "checks": checks}