Rule-based anomalous transaction detection

Detect suspicious transactions using the local rulebook.

Data, BI & Finance AnalyticsTask 5Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptData, BI & Finance Analytics ยท Task 5

Detect suspicious transactions using the local rulebook.

Inputs:

  • $WORKSPACE/in/transactions.csv
  • $WORKSPACE/in/rules.md

Create:

  • $WORKSPACE/out/suspicious_transactions.csv
  • $WORKSPACE/out/rule_audit.json
  • $WORKSPACE/out/case_notes.md

suspicious_transactions.csv columns, in this exact order: transaction_id,customer_id,rule_id,risk_level,reason

Rules:

  1. Do not modify fixtures.
  2. Apply only the rules in rules.md.
  3. Include every transaction that triggers at least one rule.
  4. If a transaction triggers multiple rules, use the highest-risk rule and mention other triggered rule ids in reason.
  5. Do not include transactions that do not trigger any rule.
  6. For card velocity, evaluate transactions by timestamp, not CSV row order.
  7. case_notes.md must summarize the number of suspicious transactions and mention each triggered rule_id.

rule_audit.json requirements:

  • Valid JSON with keys: suspicious_transaction_ids, non_suspicious_transaction_ids, rule_counts, secondary_rule_ids.
  • suspicious_transaction_ids and non_suspicious_transaction_ids must be sorted arrays.
  • rule_counts maps each rule_id to the number of suspicious transactions whose primary rule_id is that rule.
  • secondary_rule_ids maps only multi-rule transaction_ids to sorted arrays of the other triggered rule ids mentioned in the CSV reason.

Do not use network access or external APIs.

Input Files2 files
in/rules.md
in/transactions.csv
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\nEvaluate process quality for rule-based transaction monitoring: faithful rule application, low false-positive mindset, "
    "clear case notes, and no invented rules. Exact detections are checked by oracle_grade.py.\n\n"
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    "Return ONLY JSON:\n"
    '{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}\n\n'
    "Optional **total**: harness recomputes mean of three process scores.\n\n--- PROXY TRACE JSON BELOW ---\n{payload}"
)
Completion Grader
def _add(checks: list[dict[str, Any]], cid: str, ok: bool, weight: float, detail: str | None = None) -> None:
    checks.append({"id": cid, "pass": bool(ok), "weight": weight, "detail": None if ok else detail})


def _read(path: Path) -> tuple[list[str], list[dict[str, str]]]:
    with path.open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = []
        for row in reader:
            rows.append({k: ("" if v is None else str(v).strip()) for k, v in row.items() if k is not None})
        return list(reader.fieldnames or []), rows


def _sha256(path: Path) -> str:
    return hashlib.sha256(path.read_bytes()).hexdigest()


def _fixtures_unchanged(workspace: Path, gt: dict[str, Any]) -> bool:
    for rel, digest in gt.get("fixture_hashes", {}).items():
        candidate = workspace / "in" / rel
        if not candidate.is_file() or _sha256(candidate) != digest:
            return False
    return True


def _reason_explains_rule(row: dict[str, str]) -> bool:
    reason = row.get("reason", "").lower()
    rule_id = row.get("rule_id", "")
    if rule_id.lower() in reason:
        return True
    terms = _REASON_TERMS.get(rule_id, ())
    return bool(reason) and sum(term in reason for term in terms) >= 2


def _notes_count_ok(text: str) -> bool:
    t = " ".join(text.lower().split())
    return (
        any(phrase in t for phrase in ("18 suspicious", "eighteen suspicious", "18 transactions", "eighteen transactions"))
        or re.search(r"suspicious transactions[^0-9a-z]+detected[^0-9a-z]+18\b", t) is not None
        or re.search(r"total[^0-9a-z]+suspicious transactions[^0-9a-z]+18\b", t) is not None
    )


def score_workspace(workspace: str | Path, *, ground_truth_path: Path | None = None) -> dict[str, Any]:
    w = Path(workspace).resolve()
    gt = json.loads((ground_truth_path or _DEFAULT_GT).read_text(encoding="utf-8"))
    checks: list[dict[str, Any]] = []
    _add(checks, "fixtures_present", (w / "in" / "transactions.csv").is_file() and (w / "in" / "rules.md").is_file(), 0.08, "missing fixture")
    _add(checks, "fixtures_unchanged", _fixtures_unchanged(w, gt), 0.08, "one or more input files are missing or modified")
    out = w / gt["outputs"]["csv"]
    audit = w / gt["outputs"].get("audit", "out/rule_audit.json")
    notes = w / gt["outputs"]["notes"]
    _add(checks, "csv_exists", out.is_file(), 0.08, "missing suspicious_transactions.csv")
    _add(checks, "rule_audit_exists", audit.is_file(), 0.08, "missing rule_audit.json")
    _add(checks, "notes_exists", notes.is_file(), 0.06, "missing case_notes.md")
    if out.is_file():
        try:
            header, rows = _read(out)
            slim = sorted(
                [{k: r.get(k, "") for k in ["transaction_id", "customer_id", "rule_id", "risk_level"]} for r in rows],
                key=lambda r: r["transaction_id"],
            )
            expected = sorted(gt["rows"], key=lambda r: r["transaction_id"])
            ids = {r["transaction_id"] for r in rows}
            _add(checks, "exact_header", header == gt["header"], 0.08, f"got {header}")
            by_id = {r.get("transaction_id", ""): r for r in rows}
            _add(checks, "expected_suspicious_set", slim == expected, 0.30, f"got {slim}")
            _add(checks, "no_false_positives", not (ids & set(gt["non_suspicious"])), 0.10, f"false positives {sorted(ids & set(gt['non_suspicious']))}")
            _add(checks, "reason_explains_rule", all(_reason_explains_rule(r) for r in rows), 0.08, "each reason must explain the triggered rule")
            _add(checks, "velocity_inclusive_boundary", {"T010", "T011", "T012"} <= ids and by_id.get("T010", {}).get("rule_id") == "R3_CARD_VELOCITY", 0.06, "inclusive 10-minute same-card window missed")
            _add(checks, "velocity_exclusive_boundary", not ({"T013", "T014", "T015"} & ids), 0.04, "10 minutes and 1 second should not qualify")
            _add(checks, "velocity_by_card_not_customer", not ({"T016", "T017", "T018"} & ids), 0.04, "velocity must group by card_id, not customer_id")
            _add(checks, "timestamp_order_velocity", {"T019", "T020", "T021"} <= ids, 0.06, "velocity must use timestamp order, not CSV order")
            _add(checks, "second_unsorted_velocity_window", {"T023", "T024", "T025"} <= ids, 0.05, "second out-of-order same-card velocity window missed")
            _add(checks, "near_miss_velocity_and_thresholds", not ({"T026", "T027", "T028", "T029", "T030"} & ids), 0.05, "near-miss velocity or threshold transactions should not be suspicious")
            _add(checks, "multi_rule_highest_risk", by_id.get("T011", {}).get("rule_id") == "R1_HIGH_VALUE" and by_id.get("T012", {}).get("rule_id") == "R2_GEO_AMOUNT", 0.04, "highest-risk rule not selected")
            t022_reason = by_id.get("T022", {}).get("reason", "").lower()
            _add(checks, "high_risk_tiebreak", by_id.get("T022", {}).get("rule_id") == "R1_HIGH_VALUE" and "r2" in t022_reason and "r4" in t022_reason, 0.04, "same-risk tie or secondary rule mention wrong for T022")
            t031_reason = by_id.get("T031", {}).get("reason", "").lower()
            _add(checks, "second_high_risk_tiebreak", by_id.get("T031", {}).get("rule_id") == "R1_HIGH_VALUE" and "r2" in t031_reason and "r4" in t031_reason, 0.04, "same-risk tie or secondary rule mention wrong for T031")
            secondary_ok = True
            secondary_detail = {}
            for txid, rules in gt.get("secondary_rules", {}).items():
                reason = by_id.get(txid, {}).get("reason", "").lower()
                missing = [rid for rid in rules if rid.lower() not in reason and rid.split("_", 1)[0].lower() not in reason]
                secondary_detail[txid] = missing
                secondary_ok = secondary_ok and not missing
            _add(checks, "secondary_rules_mentioned", secondary_ok, 0.05, f"missing secondary rules {secondary_detail}")
        except Exception as exc:
            _add(checks, "csv_readable", False, 0.30, str(exc))
    else:
        for cid, weight in [
            ("exact_header", 0.08),
            ("expected_suspicious_set", 0.30),
            ("no_false_positives", 0.10),
            ("reason_explains_rule", 0.08),
            ("velocity_inclusive_boundary", 0.06),
            ("velocity_exclusive_boundary", 0.04),
            ("velocity_by_card_not_customer", 0.04),
            ("timestamp_order_velocity", 0.06),
            ("second_unsorted_velocity_window", 0.05),
            ("near_miss_velocity_and_thresholds", 0.05),
            ("multi_rule_highest_risk", 0.04),
            ("high_risk_tiebreak", 0.04),
            ("second_high_risk_tiebreak", 0.04),
            ("secondary_rules_mentioned", 0.05),
        ]:
            _add(checks, cid, False, weight, "missing suspicious_transactions.csv")
    if notes.is_file():
        text = notes.read_text(encoding="utf-8", errors="replace")
        for rid in gt["rule_ids"]:
            _add(checks, f"notes_mentions_{rid}", rid in text, 0.015, f"missing {rid}")
        _add(checks, "notes_count", _notes_count_ok(text), 0.04, "notes should mention thirteen suspicious transactions")
    if audit.is_file():
        try:
            data = json.loads(audit.read_text(encoding="utf-8"))
            expected = gt["rule_audit_expected"]
            _add(checks, "rule_audit_keys", isinstance(data, dict) and set(data) == set(expected), 0.04, f"got keys {sorted(data) if isinstance(data, dict) else type(data)}")
            _add(checks, "rule_audit_suspicious_ids", data.get("suspicious_transaction_ids") == expected["suspicious_transaction_ids"], 0.08, f"got {data.get('suspicious_transaction_ids')}")
            _add(checks, "rule_audit_non_suspicious_ids", data.get("non_suspicious_transaction_ids") == expected["non_suspicious_transaction_ids"], 0.08, f"got {data.get('non_suspicious_transaction_ids')}")
            _add(checks, "rule_audit_counts", data.get("rule_counts") == expected["rule_counts"], 0.08, f"got {data.get('rule_counts')}")
            _add(checks, "rule_audit_secondary_rules", data.get("secondary_rule_ids") == expected["secondary_rule_ids"], 0.08, f"got {data.get('secondary_rule_ids')}")
        except Exception as exc:
            _add(checks, "rule_audit_parseable", False, 0.20, str(exc))
    else:
        for cid, weight in [
            ("rule_audit_keys", 0.04),
            ("rule_audit_suspicious_ids", 0.08),
            ("rule_audit_non_suspicious_ids", 0.08),
            ("rule_audit_counts", 0.08),
            ("rule_audit_secondary_rules", 0.08),
        ]:
            _add(checks, cid, False, weight, "missing rule_audit.json")
    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0
    if any(c["id"].startswith("rule_audit_") and not c["pass"] for c in checks):
        score = min(score, 0.69)
    return {"task": "053-anomalous-transaction-detect", "workspace": str(w), "checks": checks, "outcome_score": score, "score": score}