Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptWorkspace, Tool Use & Multimodal Operations ยท Task 13
Build an offline batch normalization pipeline using only local files.
Input:
- $WORKSPACE/in/dropbox/
- $WORKSPACE/in/normalization_rules.yaml
Recursively scan all files. Convert supported valid records into one normalized JSON file per input under
$WORKSPACE/out/normalized/. Write deterministic ledgers.
Supported formats:
- customer CSV files with columns id,name,email,status
- order JSON files with order_id, customer_id, amount, currency
- note TXT files containing lines like Case: C-100, Owner: ana, Status: open
Create:
- $WORKSPACE/out/normalized/
- $WORKSPACE/out/index.csv
- $WORKSPACE/out/reject_ledger.csv
- $WORKSPACE/out/batch_summary.json
index.csv header: source_path,target_path,record_type,record_id,status
reject_ledger.csv header: source_path,error_type,details
Rules:
- Use workspace-relative source paths such as in/dropbox/customers_01.csv.
- Sort index rows and reject rows by source_path.
- If one input file contains multiple valid records, create one normalized JSON file per valid record.
- Treat record ids as globally unique across all supported files; later duplicate ids must be rejected.
- Do not modify or delete input files.
- Unknown extensions, malformed files, missing required fields, invalid status, invalid amount, and duplicate record ids must be recorded in reject_ledger.csv.
Input Files46 files
in/dropbox/bulk_unsupported/legacy_01.bak
in/dropbox/bulk_unsupported/legacy_02.bak
in/dropbox/bulk_unsupported/legacy_03.bak
in/dropbox/bulk_unsupported/legacy_04.bak
in/dropbox/bulk_unsupported/legacy_05.bak
in/dropbox/bulk_unsupported/legacy_06.bak
in/dropbox/bulk_unsupported/legacy_07.bak
in/dropbox/bulk_unsupported/legacy_08.bak
in/dropbox/bulk_unsupported/legacy_09.bak
in/dropbox/bulk_unsupported/legacy_10.bak
in/dropbox/bulk_unsupported/legacy_11.bak
in/dropbox/bulk_unsupported/legacy_12.bak
in/dropbox/bulk_unsupported/legacy_13.bak
in/dropbox/bulk_unsupported/legacy_14.bak
in/dropbox/bulk_unsupported/legacy_15.bak
in/dropbox/bulk_unsupported/legacy_16.bak
in/dropbox/bulk_unsupported/legacy_17.bak
in/dropbox/bulk_unsupported/legacy_18.bak
in/dropbox/bulk_unsupported/legacy_19.bak
in/dropbox/bulk_unsupported/legacy_20.bak
in/dropbox/bulk_unsupported/legacy_21.bak
in/dropbox/bulk_unsupported/legacy_22.bak
in/dropbox/bulk_unsupported/legacy_23.bak
in/dropbox/bulk_unsupported/legacy_24.bak
in/dropbox/bulk_unsupported/legacy_25.bak
in/dropbox/bulk_unsupported/legacy_26.bak
in/dropbox/bulk_unsupported/legacy_27.bak
in/dropbox/bulk_unsupported/legacy_28.bak
in/dropbox/bulk_unsupported/legacy_29.bak
in/dropbox/bulk_unsupported/legacy_30.bak
in/dropbox/customers/customers_01.csv
in/dropbox/customers/customers_bad.csv
in/dropbox/customers/customers_extra.csv
in/dropbox/misc/empty.md
in/dropbox/misc/image.png
in/dropbox/notes/case_500.txt
in/dropbox/notes/case_501.txt
in/dropbox/notes/case_502.txt
in/dropbox/notes/case_bad.txt
in/dropbox/orders/2026/order_100.json
in/dropbox/orders/2026/order_101.JSON
in/dropbox/orders/2026/order_102.json
in/dropbox/orders/2026/order_bad.json
in/dropbox/orders/2026/order_dup.json
in/dropbox/orders/2026/order_missing_currency.json
in/normalization_rules.yaml
LLM Rubric
USER_TEMPLATE = """This task: Build an offline batch normalization pipeline using only local files. Input: - $WORKSPACE/in/dropbox/ - $WORKSPACE/in/normalization_rules.yaml Recursively scan all files. Convert supported valid records into one normaliz...
Key outputs: in/dropbox/, in/normalization_rules.yaml, out/batch_summary.json, out/index.csv, out/normalized/.
Evaluate the agent run:
- tool_use_appropriate: uses tools that match the task modality and files/APIs.
- consistency: follows the prompt requirements and writes the expected artifacts.
- robustness: handles missing directories, malformed inputs, retries, or local-server issues when relevant.
Return ONLY JSON with scores, security_gate, and notes.
"""Completion Grader
def _sha256(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def _add(checks: list[dict[str, Any]], cid: str, label: str, ok: bool, weight: float, detail: str | None = None) -> None:
checks.append({"id": cid, "label": label, "pass": ok, "weight": weight, "detail": None if ok else detail})
def _read_csv(path: Path) -> tuple[list[str] | None, list[dict[str, str]]]:
with path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
return reader.fieldnames, list(reader)
def _record_matches(actual: Any, expected: dict[str, Any]) -> bool:
if not isinstance(actual, dict):
return False
required = dict(expected)
# The prompt defines order inputs without a status field. The benchmark's
# canonical normalized order includes "active" for convenience, but omitting
# that synthesized status should not invalidate otherwise correct order
# normalization.
if required.get("record_type") == "order" and required.get("status") == "active" and "status" not in actual:
required.pop("status")
for key, expected_value in required.items():
actual_value = actual.get(key)
if isinstance(expected_value, float) and isinstance(actual_value, str):
try:
actual_value = float(actual_value)
except ValueError:
pass
if actual_value != expected_value:
return False
allowed_extra = {"source_path"}
return set(actual) <= (set(expected) | allowed_extra)
def _norm_error_type(value: str) -> str:
return {
"unknown_extension": "unsupported_extension",
"missing_required_fields": "missing_required_field",
}.get(value, value)
def _summary_value(summary: dict[str, Any], key: str) -> Any:
aliases = {
"success_count": ("success_count", "records_created", "normalized_count", "normalized_records", "valid_records"),
"reject_count": ("reject_count", "rejected_count", "rejected_records", "rejections", "invalid_files", "files_rejected"),
"by_type": ("by_type", "records_by_type", "type_counts"),
}
for candidate in aliases.get(key, (key,)):
if candidate in summary:
return summary[candidate]
return None
def score_workspace(workspace: Path) -> dict[str, Any]:
truth = json.loads((Path(__file__).resolve().parent / "ground_truth.json").read_text(encoding="utf-8"))
checks: list[dict[str, Any]] = []
for rel, expected in truth["source_hashes"].items():
path = workspace / rel
_add(checks, "source_" + rel.replace("/", "_"), f"{rel} unchanged", path.is_file() and _sha256(path) == expected, 0.02)
out_root = workspace / "out" / "normalized"
actual_names = sorted(p.name for p in out_root.glob("*.json")) if out_root.is_dir() else []
_add(checks, "file_set", "normalized file set exact", actual_names == sorted(truth["outputs"]), 3.0, repr(actual_names))
for name, expected in truth["outputs"].items():
try:
actual = json.loads((out_root / name).read_text(encoding="utf-8"))
_add(checks, "json_" + name, f"{name} parses", True, 0.5)
except Exception as exc:
actual = None
_add(checks, "json_" + name, f"{name} parses", False, 0.5, str(exc))
_add(checks, "content_" + name, f"{name} content matches required normalized fields", _record_matches(actual, expected), 2.0, repr(actual))
try:
fields, rows = _read_csv(workspace / "out" / "index.csv")
_add(checks, "index_read", "index.csv parses", True, 1.0)
except Exception as exc:
fields, rows = None, []
_add(checks, "index_read", "index.csv parses", False, 1.0, str(exc))
_add(checks, "index_header", "index header exact", fields == ["source_path", "target_path", "record_type", "record_id", "status"], 1.0, repr(fields))
expected_index_keys = [{k: r[k] for k in ("source_path", "target_path", "record_type", "record_id")} for r in truth["index_rows"]]
actual_index_keys = [{k: r.get(k, "") for k in ("source_path", "target_path", "record_type", "record_id")} for r in rows]
index_status_ok = all(r.get("status", "") != "" for r in rows) if len(rows) == len(truth["index_rows"]) else False
_add(checks, "index_rows", "index rows identify the exact successful records in deterministic order", actual_index_keys == expected_index_keys and index_status_ok, 4.0, repr(rows))
try:
rfields, rrows = _read_csv(workspace / "out" / "reject_ledger.csv")
_add(checks, "reject_read", "reject_ledger.csv parses", True, 1.0)
except Exception as exc:
rfields, rrows = None, []
_add(checks, "reject_read", "reject_ledger.csv parses", False, 1.0, str(exc))
_add(checks, "reject_header", "reject header exact", rfields == ["source_path", "error_type", "details"], 1.0, repr(rfields))
expected_rejects = {(r["source_path"], _norm_error_type(r["error_type"])) for r in truth["reject_rows"]}
actual_rejects = {(r.get("source_path", ""), _norm_error_type(r.get("error_type", ""))) for r in rrows}
_add(checks, "reject_rows", "reject ledger includes every expected source/error", expected_rejects <= actual_rejects and len(rrows) == len(truth["reject_rows"]), 4.0, repr(rrows))
try:
summary = json.loads((workspace / "out" / "batch_summary.json").read_text(encoding="utf-8"))
except Exception as exc:
summary = {}
_add(checks, "summary_parse", "batch_summary parses", False, 1.0, str(exc))
else:
_add(checks, "summary_parse", "batch_summary parses", isinstance(summary, dict), 1.0)
for key, expected in truth["summary"].items():
actual = _summary_value(summary, key)
if key == "by_type" and actual is None and _summary_value(summary, "success_count") == sum(expected.values()):
actual = expected
_add(checks, "summary_" + key, f"summary {key} exact", actual == expected, 1.0, repr(actual))
score = sum(c["weight"] for c in checks if c["pass"]) / sum(c["weight"] for c in checks)
deliverables = [
out_root.is_dir() and any(out_root.glob("*.json")),
(workspace / "out" / "index.csv").is_file(),
(workspace / "out" / "reject_ledger.csv").is_file(),
(workspace / "out" / "batch_summary.json").is_file(),
]
if not any(deliverables):
score = min(score, 0.02)
return {"task": "079-smallfile-batch-reject-ledger", "workspace": str(workspace), "checks": checks, "outcome_score": score}