Local API Cursor Retry Ledger

Use only the task-provided mock API at $MOCK_API_BASE. It may be exposed through a public tunnel to a local benchmark server. Do not use any other internet resources.

Workspace, Tool Use & Multimodal Operations · Task 12 · Oracle + LLM scoring
Model Runs: 6 harnesses & 8 models evaluated on this task.
Loading...
Prompt: Workspace, Tool Use & Multimodal Operations · Task 12

Use only the task-provided mock API at $MOCK_API_BASE. It may be exposed through a public tunnel to a local benchmark server. Do not use any other internet resources.

Read the contract at $WORKSPACE/in/api_contract.md, then fetch all datasets, jobs, and artifacts.
The API uses cursor pagination and may return 429, 503, and one cursor_expired response. Retry transient
failures locally. If a cursor expires, use the checkpoint endpoint described in the contract and continue.

Create:

  • $WORKSPACE/out/api_inventory.json
  • $WORKSPACE/out/retry_ledger.csv
  • $WORKSPACE/out/data_quality_report.md

api_inventory.json format:
{
"source_base_url": "the exact base URL",
"dataset_count": 3,
"artifact_count": 5,
"jobs_by_status": {"failed": 1, "running": 1, "succeeded": 2},
"datasets": [
{
"dataset_id": "ds-001",
"name": "Northwind Orders",
"owner": "ana",
"job_count": 2,
"artifact_count": 2,
"latest_job_status": "succeeded",
"orphan_artifact_count": 0
}
]
}

Sort datasets by dataset_id. Exclude duplicate artifact ids from artifact_count, but mention duplicates in data_quality_report.md.
Exclude orphan artifacts from per-dataset artifact_count, but mention them in data_quality_report.md.

retry_ledger.csv header must be: endpoint,cursor_or_page,status_code,action,attempts

Input Files: 4 files
in/api_contract.md
in/api_seed/artifacts.json
in/api_seed/datasets.json
in/api_seed/jobs.json
Hooks
def prepare_runtime(runtime: dict[str, Any]) -> dict[str, Any]:
    """Start the local mock API for this task and expose it through a tunnel.

    A small HTTP server is launched in a child Python process. It serves the
    seed files under ``<workspace>/in/api_seed`` two items per page and appends
    every requested path and query string to ``<workspace>/out/api_access.log``.
    The server injects one failure per endpoint on the first matching request:

    * ``/datasets`` at the START cursor answers 429 with ``Retry-After: 0``.
    * ``/jobs`` at cursor ``2`` answers 503.
    * ``/artifacts`` at cursor ``2`` answers 410 with a ``cursor_expired``
      JSON body.

    ``/checkpoint`` always answers ``{"cursor": "2"}``.

    Args:
        runtime: Runtime description; only ``runtime["workspace"]`` is read.

    Returns:
        A dict with ``MOCK_API_BASE`` (the URL returned by the tunnel helper),
        ``server_pid`` and ``tunnel_pid`` (0 when the helper returns no tunnel
        process).

    Raises:
        RuntimeError: If the mock server exits during startup or does not
            accept connections before the startup deadline.
        Exception: Whatever ``_start_public_tunnel`` raises; the mock server
            is terminated before the error is re-raised.
    """
    import socket  # Local import: only the readiness probe below needs it.

    workspace = Path(runtime["workspace"])
    data_dir = workspace / "in" / "api_seed"
    log_path = workspace / "out" / "api_access.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    port = 36000 + random.randint(0, 2000)
    # The template below is an f-string: literal braces are doubled, and the
    # newline appended to each access-log entry is spelled "\\n" so that the
    # generated script receives the two-character escape. A single backslash
    # would emit a raw line break here, ending the string literal early in the
    # child script and leaving a column-0 line that stops textwrap.dedent from
    # removing the common margin.
    script = textwrap.dedent(f"""
        import json
        from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
        from pathlib import Path
        from urllib.parse import parse_qs, urlparse

        DATA_DIR = Path({str(data_dir)!r})
        LOG_PATH = Path({str(log_path)!r})

        def page(items, cursor):
            size = 2
            start = 0 if cursor in ("", "START", None) else int(cursor)
            chunk = items[start:start + size]
            next_cursor = str(start + size) if start + size < len(items) else None
            return {{"items": chunk, "next_cursor": next_cursor}}

        class Handler(BaseHTTPRequestHandler):
            def do_GET(self):
                parsed = urlparse(self.path)
                qs = parse_qs(parsed.query)
                with LOG_PATH.open("a", encoding="utf-8") as f:
                    f.write(parsed.path + "?" + parsed.query + "\\n")
                attempts = getattr(self.server, "attempts", {{}})
                key = self.path
                attempts[key] = attempts.get(key, 0) + 1
                self.server.attempts = attempts

                if parsed.path == "/checkpoint":
                    body = json.dumps({{"cursor": "2"}}, sort_keys=True).encode()
                    self.send_response(200)
                    self.send_header("Content-Type", "application/json")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                    return

                mapping = {{"/datasets": "datasets.json", "/jobs": "jobs.json", "/artifacts": "artifacts.json"}}
                if parsed.path not in mapping:
                    self.send_response(404); self.end_headers(); return
                cursor = qs.get("cursor", ["START"])[0]
                if parsed.path == "/datasets" and cursor == "START" and attempts[key] == 1:
                    self.send_response(429)
                    self.send_header("Retry-After", "0")
                    self.end_headers()
                    self.wfile.write(b"rate limited")
                    return
                if parsed.path == "/jobs" and cursor == "2" and attempts[key] == 1:
                    self.send_response(503)
                    self.end_headers()
                    self.wfile.write(b"try again")
                    return
                if parsed.path == "/artifacts" and cursor == "2" and attempts[key] == 1:
                    body = json.dumps({{"error": "cursor_expired", "checkpoint": "artifacts-restart"}}).encode()
                    self.send_response(410)
                    self.send_header("Content-Type", "application/json")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                    return
                items = json.loads((DATA_DIR / mapping[parsed.path]).read_text())
                body = json.dumps(page(items, cursor), sort_keys=True).encode()
                self.send_response(200)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def log_message(self, fmt, *args):
                return

        ThreadingHTTPServer(("127.0.0.1", {port}), Handler).serve_forever()
    """)
    proc = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # Wait until the server actually accepts connections instead of sleeping a
    # fixed interval. A bare connect/close never reaches do_GET, so the probe
    # neither writes to the access log nor consumes an injected failure.
    deadline = time.monotonic() + 5.0
    while True:
        if proc.poll() is not None:
            raise RuntimeError(f"mock API server exited during startup (code {proc.returncode})")
        try:
            socket.create_connection(("127.0.0.1", port), timeout=0.2).close()
            break
        except OSError:
            if time.monotonic() >= deadline:
                proc.terminate()
                raise RuntimeError(f"mock API server did not start listening on port {port}") from None
            time.sleep(0.05)

    local_api = f"http://127.0.0.1:{port}"
    try:
        public_api, tunnel_proc = _start_public_tunnel(local_api)
    except Exception:
        # Do not leave the mock server running when the tunnel cannot start.
        try:
            proc.terminate()
        except OSError:
            pass
        raise
    return {"MOCK_API_BASE": public_api, "server_pid": proc.pid, "tunnel_pid": tunnel_proc.pid if tunnel_proc else 0}

def cleanup_runtime(runtime: dict[str, Any], state: dict[str, Any]) -> None:
    """Send signal 15 to the tunnel and server processes recorded in *state*.

    The tunnel is signalled before the server. Missing, empty, or zero PIDs
    are skipped, and an ``OSError`` from ``os.kill`` is ignored so cleanup
    stays best-effort. *runtime* is accepted but not used.
    """
    for pid_field in ("tunnel_pid", "server_pid"):
        target = int(state.get(pid_field, 0) or 0)
        if target:
            try:
                os.kill(target, 15)
            except OSError:
                # Best-effort: the signal could not be delivered.
                pass
LLM Rubric
# Prompt text for the LLM rubric step (presumably sent as the user message —
# confirm against the caller). It restates the task, lists the key files, and
# asks the judge to rate tool_use_appropriate, consistency, and robustness,
# replying with JSON only (scores, security_gate, notes).
# NOTE(review): the task summary on the first line ends in "t..." and looks
# truncated in this copy — confirm against the full prompt before relying on it.
USER_TEMPLATE = """This task: Use only the task-provided mock API at $MOCK_API_BASE. It may be exposed through a public tunnel to a local benchmark server. Do not use any other internet resources. Read the contract at $WORKSPACE/in/api_contract.md, t...

Key outputs: in/api_contract.md, out/api_inventory.json, out/data_quality_report.md, out/retry_ledger.csv.

Evaluate the agent run:
- tool_use_appropriate: uses tools that match the task modality and files/APIs.
- consistency: follows the prompt requirements and writes the expected artifacts.
- robustness: handles missing directories, malformed inputs, retries, or local-server issues when relevant.

Return ONLY JSON with scores, security_gate, and notes.
"""
Completion Grader
def _looks_like_task_url(value: Any) -> bool:
    if not isinstance(value, str):
        return False
    return value.startswith(("http://127.0.0.1:", "http://localhost:", "https://")) and not value.rstrip("/").endswith("example.com")

def _add(checks: list[dict[str, Any]], cid: str, label: str, ok: bool, weight: float, detail: str | None = None) -> None:
    checks.append({"id": cid, "label": label, "pass": ok, "weight": weight, "detail": None if ok else detail})

def _artifact_count_ok(value: Any, truth: dict[str, Any]) -> bool:
    # The prompt example says artifact_count is 5 after deduplicating IDs, while
    # the original ground truth used 4 by excluding the orphan from the global
    # total. Accept either interpretation; per-dataset counts still enforce that
    # orphan artifacts are not assigned to a dataset.
    return value in {truth["artifact_count"], truth["artifact_count"] + 1}

def score_workspace(workspace: Path) -> dict[str, Any]:
    """Grade the files an agent left under ``<workspace>/out``.

    The grader compares ``api_inventory.json`` with ``ground_truth.json``
    (read from the directory containing this module), inspects
    ``retry_ledger.csv``, the server-side ``api_access.log`` and
    ``data_quality_report.md``, and records one weighted check per criterion.

    Args:
        workspace: Root of the task workspace.

    Returns:
        A dict with the task id, the workspace path, the list of checks, and
        ``outcome_score`` (passed weight divided by total weight).

    Raises:
        OSError, ValueError, KeyError: If ``ground_truth.json`` is missing,
            is not valid JSON, or lacks a key read below. Problems in the
            agent's own files are recorded as failed checks instead.
    """
    truth = json.loads((Path(__file__).resolve().parent / "ground_truth.json").read_text(encoding="utf-8"))
    out_dir = workspace / "out"
    checks: list[dict[str, Any]] = []

    # --- api_inventory.json ---
    try:
        inv = json.loads((out_dir / "api_inventory.json").read_text(encoding="utf-8"))
    except Exception as exc:
        inv = {}
        _add(checks, "json_parse", "api_inventory.json parses", False, 1.0, str(exc))
    else:
        is_object = isinstance(inv, dict)
        _add(checks, "json_parse", "api_inventory.json parses", is_object, 1.0, f"top-level JSON value is {type(inv).__name__}, expected an object")
        if not is_object:
            # Valid JSON that is not an object (list, string, number, ...) has
            # no .get(); grade it as an empty inventory instead of crashing.
            inv = {}
    _add(checks, "source_url", "source_base_url records the provided mock API", _looks_like_task_url(inv.get("source_base_url")), 1.0, repr(inv.get("source_base_url")))
    _add(checks, "dataset_count", "dataset_count exact", inv.get("dataset_count") == truth["dataset_count"], 2.0, repr(inv.get("dataset_count")))
    _add(checks, "artifact_count", "artifact_count follows a documented global-count interpretation", _artifact_count_ok(inv.get("artifact_count"), truth), 2.0, repr(inv.get("artifact_count")))
    _add(checks, "jobs_by_status", "jobs_by_status exact", inv.get("jobs_by_status") == truth["jobs_by_status"], 2.0, repr(inv.get("jobs_by_status")))
    _add(checks, "datasets", "datasets exact", inv.get("datasets") == truth["datasets"], 3.0, repr(inv.get("datasets")))

    # --- retry_ledger.csv ---
    try:
        with (out_dir / "retry_ledger.csv").open("r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            fields = reader.fieldnames
        _add(checks, "ledger_read", "retry_ledger.csv parses", True, 1.0)
    except Exception as exc:
        rows, fields = [], None
        _add(checks, "ledger_read", "retry_ledger.csv parses", False, 1.0, str(exc))
    _add(checks, "ledger_header", "retry ledger header exact", fields == ["endpoint", "cursor_or_page", "status_code", "action", "attempts"], 1.0, repr(fields))
    # csv.DictReader fills the missing cells of a short row with None, so
    # coerce to "" before calling string methods on the value.
    codes = {str(r.get("status_code") or "").strip() for r in rows}
    actions = " ".join(str(r.get("action") or "").lower() for r in rows)
    cursor_recovery_logged = "410" in codes or "checkpoint" in actions or "cursor" in actions
    _add(checks, "ledger_events", "ledger records 429, 503, and cursor recovery", {"429", "503"} <= codes and cursor_recovery_logged, 3.0, repr(rows))

    # --- api_access.log (written by the mock server as "<path>?<query>") ---
    access_log = out_dir / "api_access.log"
    seen = access_log.read_text(encoding="utf-8", errors="replace").splitlines() if access_log.is_file() else []
    for endpoint in ("/datasets", "/jobs", "/artifacts", "/checkpoint"):
        weight = 3.0 if endpoint == "/checkpoint" else 1.0
        _add(checks, "visited_" + endpoint.strip("/"), f"server log includes {endpoint}", any(s.startswith(endpoint + "?") for s in seen), weight, repr(seen))
    # NOTE(review): only requests that spell out cursor=START count as
    # first-page /datasets retries here; confirm this matches the contract.
    datasets_first_page_hits = sum(1 for s in seen if s.startswith("/datasets?cursor=START"))
    jobs_second_page_hits = sum(1 for s in seen if s.startswith("/jobs?cursor=2"))
    _add(checks, "retry_observed", "server log shows retries", datasets_first_page_hits >= 2 and jobs_second_page_hits >= 2, 2.0, repr(seen))

    # --- data_quality_report.md ---
    report_path = out_dir / "data_quality_report.md"
    report = report_path.read_text(encoding="utf-8", errors="replace").lower() if report_path.is_file() else ""
    _add(checks, "quality_terms", "quality report covers duplicate, orphan, cursor, 429, and 503", all(t in report for t in truth["quality_terms"]), 3.0)

    score = sum(c["weight"] for c in checks if c["pass"]) / sum(c["weight"] for c in checks)
    return {"task": "078-local-api-cursor-retry-ledger", "workspace": str(workspace), "checks": checks, "outcome_score": score}