Nested Archive Manifest Defense

Audit the nested archives in $WORKSPACE/in and write deterministic artifacts under $WORKSPACE/out.

Workspace, Tool Use & Multimodal Operations · Task 11 · Oracle + LLM scoring
Model Runs: 6 harnesses & 8 models evaluated on this task.
Loading...
Prompt: Workspace, Tool Use & Multimodal Operations · Task 11

Audit the nested archives in $WORKSPACE/in and write deterministic artifacts under $WORKSPACE/out.

Inputs:

  • $WORKSPACE/in/incoming_a.zip
  • $WORKSPACE/in/incoming_b.tar
  • $WORKSPACE/in/expected_manifest_rules.json
  • $WORKSPACE/in/checksums_hint.csv

Requirements:

  1. Inspect zip/tar/tar.gz contents, including nested archives.
  2. Extract only safe regular files into $WORKSPACE/out/extracted_safe/.
  3. Reject entries with path traversal, absolute paths, control characters, leading/trailing path segment whitespace, duplicate normalized logical paths, unsupported extensions, or files over the size limit.
  4. Do not write anything outside $WORKSPACE/out.
  5. Do not modify files in $WORKSPACE/in.

Create:

  • $WORKSPACE/out/manifest.json
  • $WORKSPACE/out/rejected_entries.csv
  • $WORKSPACE/out/extracted_safe/
  • $WORKSPACE/out/audit_summary.md

manifest.json format:
{
  "files": [
    {
      "archive_chain": ["incoming_a.zip", "nested/payload.tar.gz"],
      "logical_path": "incoming_a/nested_payload/safe/reports/q1.csv",
      "size_bytes": 31,
      "sha256": "...",
      "media_type": "text/csv"
    }
  ]
}

Sort manifest files by logical_path. Sort rejected_entries.csv by archive_chain then raw_name.
rejected_entries.csv header must be: archive_chain,raw_name,reason,normalized_candidate

Input Files: 4 files
in/checksums_hint.csv
in/expected_manifest_rules.json
in/incoming_a.zip
in/incoming_b.tar
LLM Rubric
# Prompt template for the LLM rubric: it restates the task, lists "key outputs",
# and asks for a JSON-only reply containing scores, security_gate, and notes,
# judged on tool_use_appropriate, consistency, and robustness.
# NOTE(review): the task text on the first line ends in "$WOR..." — it looks
# truncated; confirm against the full prompt.
# NOTE(review): the "Key outputs" line names paths under in/ (the inputs), not
# out/ — confirm this is intended.
USER_TEMPLATE = """This task: Audit the nested archives in $WORKSPACE/in and write deterministic artifacts under $WORKSPACE/out. Inputs: - $WORKSPACE/in/incoming_a.zip - $WORKSPACE/in/incoming_b.tar - $WORKSPACE/in/expected_manifest_rules.json - $WOR...

Key outputs: in, in., in/checksums_hint.csv, in/expected_manifest_rules.json, in/incoming_a.zip.

Evaluate the agent run:
- tool_use_appropriate: uses tools that match the task modality and files/APIs.
- consistency: follows the prompt requirements and writes the expected artifacts.
- robustness: handles missing directories, malformed inputs, retries, or local-server issues when relevant.

Return ONLY JSON with scores, security_gate, and notes.
"""
Completion Grader
def _sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is streamed in 64 KiB pieces so large files are never held in
    memory as a whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end of file.
        while piece := stream.read(65536):
            digest.update(piece)
    return digest.hexdigest()

def _add(checks: list[dict[str, Any]], cid: str, label: str, ok: bool, weight: float, detail: str | None = None) -> None:
    checks.append({"id": cid, "label": label, "pass": ok, "weight": weight, "detail": None if ok else detail})

def _norm_media(value: Any) -> Any:
    """Map the ``application/x-yaml`` media type to ``application/yaml``.

    Any other value, of any type, is returned unchanged.
    """
    return "application/yaml" if value == "application/x-yaml" else value

def _norm_manifest_rows(rows: Any) -> Any:
    """Return manifest rows in a canonical form suitable for comparison.

    A value that is not a list is returned as-is, and list elements that are
    not dicts are passed through untouched. Each dict row is shallow-copied,
    its ``media_type`` is run through ``_norm_media`` (the key is created if
    absent), and a string ``logical_path`` has its first
    ``incoming_a/payload/`` rewritten to ``incoming_a/nested_payload/``.
    """
    if not isinstance(rows, list):
        return rows

    def _canonical(entry: Any) -> Any:
        if not isinstance(entry, dict):
            return entry
        fixed = dict(entry)
        fixed["media_type"] = _norm_media(fixed.get("media_type"))
        # The prompt only shows "nested_payload" in an example and never
        # mandates the directory name for nested archive output, so the
        # common "payload" spelling is accepted as the same location.
        # Hashes, sizes, archive chains, and duplicate handling are still
        # verified by other checks.
        location = fixed.get("logical_path")
        if isinstance(location, str):
            fixed["logical_path"] = location.replace("incoming_a/payload/", "incoming_a/nested_payload/", 1)
        return fixed

    return [_canonical(entry) for entry in rows]

def _safe_key(path: str) -> str:
    """Rewrite the first ``incoming_a/payload/`` in *path* to ``incoming_a/nested_payload/``.

    Paths that do not contain that segment are returned unchanged.
    """
    before, found, after = path.partition("incoming_a/payload/")
    if not found:
        return path
    return before + "incoming_a/nested_payload/" + after

def _norm_chain(value: str) -> str:
    """Canonicalise the separator in a nested ``incoming_a.zip`` archive chain.

    A chain that starts with ``incoming_a.zip/nested/`` is rewritten to start
    with ``incoming_a.zip>nested/``; every other value is returned unchanged.
    """
    slash_form = "incoming_a.zip/nested/"
    if not value.startswith(slash_form):
        return value
    return "incoming_a.zip>nested/" + value[len(slash_form):]

def _reject_key(row: dict[str, str]) -> tuple[str, str, str]:
    """Build the comparison key for one rejected-entry row.

    The key is ``(archive_chain, raw_name, reason)``; the
    ``normalized_candidate`` column is not part of it.

    Args:
        row: Mapping with the rejected_entries.csv columns. Missing keys and
            ``None`` values (what ``csv.DictReader`` yields for rows with too
            few fields) are treated as empty strings.

    Returns:
        The archive chain passed through ``_norm_chain``, the raw name with
        every real line feed written as the two characters ``\\n``, and the
        reason.
    """
    chain = row.get("archive_chain") or ""
    # NOTE(review): in this copy of the file the first string literal was
    # broken across two physical lines (a syntax error). It is reconstructed
    # here as "real line feed -> backslash + n", so a name stored with an
    # embedded newline and one stored in escaped form produce the same key.
    # Confirm against the upstream grader.
    raw_name = (row.get("raw_name") or "").replace("\n", "\\n")
    reason = row.get("reason") or ""
    return (_norm_chain(chain), raw_name, reason)

def score_workspace(workspace: Path) -> dict[str, Any]:
    """Score one agent workspace against ``ground_truth.json``.

    The ground truth is loaded from the directory that contains this module
    and must provide the keys ``source_hashes``, ``manifest_files``,
    ``safe_files``, ``rejected_entries`` and ``required_summary_terms``.

    Args:
        workspace: Workspace root. Input files are resolved relative to it
            and agent outputs are read from ``workspace / "out"``.

    Returns:
        A dict with ``task``, ``workspace`` (as a string), the ordered list of
        ``checks`` appended through ``_add``, and ``outcome_score``: the
        summed weight of passing checks divided by the summed weight of all
        checks.

    Raises:
        OSError, ValueError, KeyError: If ``ground_truth.json`` cannot be
            read, cannot be parsed, or lacks one of the required keys.
    """
    truth_path = Path(__file__).resolve().parent / "ground_truth.json"
    truth = json.loads(truth_path.read_text(encoding="utf-8"))
    out_dir = workspace / "out"
    checks: list[dict[str, Any]] = []

    # Every input file must still hash to its recorded value.
    for rel, expected_hash in truth["source_hashes"].items():
        path = workspace / rel
        unchanged = path.is_file() and _sha256(path) == expected_hash
        _add(checks, "source_" + Path(rel).name, f"{rel} unchanged", unchanged, 1.0)

    # manifest.json is agent output and therefore untrusted: any failure to
    # read or parse it is recorded as a failed check, never raised.
    try:
        manifest = json.loads((out_dir / "manifest.json").read_text(encoding="utf-8"))
        files = manifest.get("files")
        _add(checks, "manifest_parse", "manifest.json parses as object", isinstance(manifest, dict), 1.0)
    except Exception as exc:
        files = None
        _add(checks, "manifest_parse", "manifest.json parses as object", False, 1.0, str(exc))

    actual_manifest = _norm_manifest_rows(files)
    expected_manifest = _norm_manifest_rows(truth["manifest_files"])
    if isinstance(actual_manifest, list) and isinstance(expected_manifest, list):
        # Rows are compared as canonical JSON strings so they can live in sets.
        actual_set = {json.dumps(r, sort_keys=True) for r in actual_manifest}
        expected_set = {json.dumps(r, sort_keys=True) for r in expected_manifest}
        required_ok = expected_set <= actual_set
        no_extra_ok = actual_set == expected_set
    else:
        required_ok = no_extra_ok = False
    _add(checks, "manifest_required", "manifest contains all required safe files with hashes, sizes, and chains", required_ok, 6.0, repr(files))
    _add(checks, "manifest_no_extra", "manifest has no extra accepted files", no_extra_ok, 2.0, repr(files))

    # A manifest whose "files" list holds non-dict rows (no .get) or
    # logical_path values that cannot be ordered against each other must fail
    # this check rather than abort the whole grading run.
    try:
        sorted_ok = isinstance(files, list) and files == sorted(files, key=lambda r: r.get("logical_path", ""))
    except (AttributeError, TypeError):
        sorted_ok = False
    _add(checks, "manifest_sorted", "manifest sorted by logical_path", sorted_ok, 1.0)

    # Hash everything under extracted_safe/, keyed by normalised relative path.
    safe_root = out_dir / "extracted_safe"
    actual_safe = {}
    if safe_root.is_dir():
        for p in safe_root.rglob("*"):
            if p.is_file():
                actual_safe[_safe_key(p.relative_to(safe_root).as_posix())] = _sha256(p)
    expected_safe = truth["safe_files"]
    required_safe_ok = all(actual_safe.get(path) == digest for path, digest in expected_safe.items())
    no_extra_safe_ok = actual_safe == expected_safe
    _add(checks, "safe_files_required", "extracted_safe contains all required safe files with hashes", required_safe_ok, 4.0, repr(actual_safe))
    _add(checks, "safe_files_no_extra", "extracted_safe has no extra files", no_extra_safe_ok, 1.0, repr(actual_safe))

    # Locations where a traversal entry would land if it had been extracted.
    escaped = [workspace.parent / "escape.txt", workspace / "escape.txt", out_dir / "escape.txt"]
    _add(checks, "no_escape_file", "path traversal did not materialize escape.txt", not any(p.exists() for p in escaped), 2.0)

    # rejected_entries.csv is untrusted as well; see the manifest handling above.
    try:
        with (out_dir / "rejected_entries.csv").open("r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            fields = reader.fieldnames
        _add(checks, "reject_read", "rejected_entries.csv parses", True, 1.0)
    except Exception as exc:
        rows, fields = [], None
        _add(checks, "reject_read", "rejected_entries.csv parses", False, 1.0, str(exc))
    _add(checks, "reject_header", "reject header exact", fields == ["archive_chain", "raw_name", "reason", "normalized_candidate"], 1.0, repr(fields))

    expected = truth["rejected_entries"]
    expected_keys = {_reject_key(r) for r in expected}
    expected_core_keys = {_reject_key(r) for r in expected if r.get("reason") != "duplicate_logical_path"}
    expected_duplicate_keys = {_reject_key(r) for r in expected if r.get("reason") == "duplicate_logical_path"}
    actual_keys = {_reject_key(r) for r in rows}
    # normalized_candidate is helpful audit metadata, but the prompt does not define how to fill it
    # for rejected absolute/traversal paths. The security-critical contract is the raw entry,
    # archive chain, and rejection reason, with no extra/missing rows.
    _add(checks, "reject_core_required", "reject rows include every required unsafe/unsupported entry and reason", expected_core_keys <= actual_keys, 4.0, repr(rows))
    _add(checks, "reject_duplicate_required", "reject rows include duplicate logical path entries", expected_duplicate_keys <= actual_keys, 1.0, repr(rows))
    _add(checks, "reject_no_extra", "reject rows have no extra rejected entries", actual_keys <= expected_keys, 1.0, repr(rows))

    # The summary is lower-cased before the required terms are searched for;
    # a missing file counts as an empty summary.
    summary_path = out_dir / "audit_summary.md"
    summary = summary_path.read_text(encoding="utf-8", errors="replace").lower() if summary_path.is_file() else ""
    _add(checks, "summary_terms", "audit summary mentions key rejection classes", all(t in summary for t in truth["required_summary_terms"]), 2.0)

    score = sum(c["weight"] for c in checks if c["pass"]) / sum(c["weight"] for c in checks)
    return {"task": "077-archive-manifest-defense", "workspace": str(workspace), "checks": checks, "outcome_score": score}