Use only the task-provided mock API at $MOCK_API_BASE. It may be exposed through a public tunnel to a local benchmark server. Do not use any other internet resources.
Read the contract at $WORKSPACE/in/api_contract.md, then fetch all datasets, jobs, and artifacts.
The API uses cursor pagination and may return 429 and 503 responses, plus a single cursor_expired response.
Retry transient failures locally. If a cursor expires, use the checkpoint endpoint described in the contract and continue.
Create:
- $WORKSPACE/out/api_inventory.json
- $WORKSPACE/out/retry_ledger.csv
- $WORKSPACE/out/data_quality_report.md
api_inventory.json format:
{
"source_base_url": "the exact base URL",
"dataset_count": 3,
"artifact_count": 5,
"jobs_by_status": {"failed": 1, "running": 1, "succeeded": 2},
"datasets": [
{
"dataset_id": "ds-001",
"name": "Northwind Orders",
"owner": "ana",
"job_count": 2,
"artifact_count": 2,
"latest_job_status": "succeeded",
"orphan_artifact_count": 0
}
]
}
Sort datasets by dataset_id. Exclude duplicate artifact ids from artifact_count, but mention duplicates in data_quality_report.md.
Exclude orphan artifacts from per-dataset artifact_count, but mention them in data_quality_report.md.
retry_ledger.csv header must be: endpoint,cursor_or_page,status_code,action,attempts
def _pick_local_port(low: int = 36000, high: int = 38000, tries: int = 20) -> int:
    """Return a random port in [low, high] that can currently be bound on 127.0.0.1.

    Falls back to the last random candidate when no bindable port was found
    within ``tries`` attempts; the caller's readiness check then reports the
    failure.
    """
    import socket  # local import: the file's import block is not visible from here

    candidate = random.randint(low, high)
    for _ in range(tries):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            try:
                probe.bind(("127.0.0.1", candidate))
            except OSError:
                candidate = random.randint(low, high)
                continue
        return candidate
    return candidate


def _wait_until_listening(proc: subprocess.Popen, port: int, timeout: float = 5.0) -> None:
    """Block until the server process accepts TCP connections on ``port``.

    Raises:
        RuntimeError: if the process exits first or nothing listens before
            ``timeout`` seconds elapse.
    """
    import socket  # local import: the file's import block is not visible from here

    deadline = time.monotonic() + timeout
    while True:
        exit_code = proc.poll()
        if exit_code is not None:
            raise RuntimeError(f"mock API server exited during startup (code {exit_code})")
        try:
            # A bare connect sends no request, so the handler never reaches
            # do_GET and neither the access log nor the attempt counters change.
            with socket.create_connection(("127.0.0.1", port), timeout=0.2):
                return
        except OSError:
            if time.monotonic() >= deadline:
                raise RuntimeError(f"mock API server not listening on port {port}") from None
            time.sleep(0.05)


def _terminate_and_reap(proc: subprocess.Popen) -> None:
    """Stop a child process and wait for it so it does not linger as a zombie."""
    try:
        proc.terminate()
        proc.wait(timeout=2)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    except OSError:
        # Best effort: the process may already be gone.
        pass


def prepare_runtime(runtime: dict[str, Any]) -> dict[str, Any]:
    """Start the local mock API server and its tunnel for one benchmark run.

    The server is a generated stand-alone script run in a child interpreter.
    It serves ``/datasets``, ``/jobs`` and ``/artifacts`` from JSON files in
    ``<workspace>/in/api_seed`` with two items per page, injects one 429, one
    503 and one 410 ``cursor_expired`` response, answers ``/checkpoint`` with
    cursor ``"2"``, and appends every request to ``<workspace>/out/api_access.log``.

    Args:
        runtime: Run configuration; ``runtime["workspace"]`` is the workspace path.

    Returns:
        A dict with ``MOCK_API_BASE`` (the URL returned by the tunnel helper),
        ``server_pid`` and ``tunnel_pid`` (0 when the helper returned no
        tunnel process).

    Raises:
        RuntimeError: if the mock server does not come up.
        Exception: anything raised by ``_start_public_tunnel``; the server is
            stopped before the exception propagates.
    """
    workspace = Path(runtime["workspace"])
    data_dir = workspace / "in" / "api_seed"
    log_path = workspace / "out" / "api_access.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    port = _pick_local_port()

    # NOTE: this is a non-raw f-string. Literal braces are doubled, and any
    # backslash escape meant for the generated script must itself be escaped
    # (hence "\\n" below); a plain "\n" would become a real newline inside a
    # string literal and the child would die with a SyntaxError.
    script = textwrap.dedent(f"""
        import json
        from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
        from pathlib import Path
        from urllib.parse import parse_qs, urlparse

        DATA_DIR = Path({str(data_dir)!r})
        LOG_PATH = Path({str(log_path)!r})

        def page(items, cursor):
            size = 2
            start = 0 if cursor in ("", "START", None) else int(cursor)
            if start < 0:
                # A negative offset would yield an empty page pointing back to 0.
                raise ValueError("negative cursor")
            chunk = items[start:start + size]
            next_cursor = str(start + size) if start + size < len(items) else None
            return {{"items": chunk, "next_cursor": next_cursor}}

        class Handler(BaseHTTPRequestHandler):
            def _reply_json(self, status, body):
                self.send_response(status)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def do_GET(self):
                parsed = urlparse(self.path)
                qs = parse_qs(parsed.query)
                with LOG_PATH.open("a", encoding="utf-8") as f:
                    f.write(parsed.path + "?" + parsed.query + "\\n")
                attempts = getattr(self.server, "attempts", {{}})
                key = self.path
                attempts[key] = attempts.get(key, 0) + 1
                self.server.attempts = attempts
                if parsed.path == "/checkpoint":
                    body = json.dumps({{"cursor": "2"}}, sort_keys=True).encode()
                    self._reply_json(200, body)
                    return
                mapping = {{"/datasets": "datasets.json", "/jobs": "jobs.json", "/artifacts": "artifacts.json"}}
                if parsed.path not in mapping:
                    self.send_response(404)
                    self.end_headers()
                    return
                cursor = qs.get("cursor", ["START"])[0]
                if parsed.path == "/datasets" and cursor == "START" and attempts[key] == 1:
                    self.send_response(429)
                    self.send_header("Retry-After", "0")
                    self.end_headers()
                    self.wfile.write(b"rate limited")
                    return
                if parsed.path == "/jobs" and cursor == "2" and attempts[key] == 1:
                    self.send_response(503)
                    self.end_headers()
                    self.wfile.write(b"try again")
                    return
                if parsed.path == "/artifacts" and cursor == "2" and attempts[key] == 1:
                    body = json.dumps({{"error": "cursor_expired", "checkpoint": "artifacts-restart"}}).encode()
                    self._reply_json(410, body)
                    return
                items = json.loads((DATA_DIR / mapping[parsed.path]).read_text())
                try:
                    payload = page(items, cursor)
                except ValueError:
                    # Malformed cursor: answer explicitly instead of dropping the connection.
                    body = json.dumps({{"error": "invalid_cursor"}}).encode()
                    self._reply_json(400, body)
                    return
                body = json.dumps(payload, sort_keys=True).encode()
                self._reply_json(200, body)

            def log_message(self, fmt, *args):
                return

        ThreadingHTTPServer(("127.0.0.1", {port}), Handler).serve_forever()
    """)

    proc = subprocess.Popen(
        [sys.executable, "-c", script],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    local_api = f"http://127.0.0.1:{port}"
    try:
        # Poll for readiness instead of sleeping a fixed interval, so a server
        # that failed to start is reported rather than silently tunnelled.
        _wait_until_listening(proc, port)
        public_api, tunnel_proc = _start_public_tunnel(local_api)
    except Exception:
        _terminate_and_reap(proc)
        raise
    return {
        "MOCK_API_BASE": public_api,
        "server_pid": proc.pid,
        "tunnel_pid": tunnel_proc.pid if tunnel_proc else 0,
    }
def cleanup_runtime(runtime: dict[str, Any], state: dict[str, Any]) -> None:
for key in ("tunnel_pid", "server_pid"):
pid = int(state.get(key, 0) or 0)
if not pid:
continue
try:
os.kill(pid, 15)
except OSError:
passUSER_TEMPLATE = """This task: Use only the task-provided mock API at $MOCK_API_BASE. It may be exposed through a public tunnel to a local benchmark server. Do not use any other internet resources. Read the contract at $WORKSPACE/in/api_contract.md, t...
Key outputs: in/api_contract.md, out/api_inventory.json, out/data_quality_report.md, out/retry_ledger.csv.
Evaluate the agent run:
- tool_use_appropriate: uses tools that match the task modality and files/APIs.
- consistency: follows the prompt requirements and writes the expected artifacts.
- robustness: handles missing directories, malformed inputs, retries, or local-server issues when relevant.
Return ONLY JSON with scores, security_gate, and notes.
"""def _looks_like_task_url(value: Any) -> bool:
if not isinstance(value, str):
return False
return value.startswith(("http://127.0.0.1:", "http://localhost:", "https://")) and not value.rstrip("/").endswith("example.com")
def _add(checks: list[dict[str, Any]], cid: str, label: str, ok: bool, weight: float, detail: str | None = None) -> None:
checks.append({"id": cid, "label": label, "pass": ok, "weight": weight, "detail": None if ok else detail})
def _artifact_count_ok(value: Any, truth: dict[str, Any]) -> bool:
# The prompt example says artifact_count is 5 after deduplicating IDs, while
# the original ground truth used 4 by excluding the orphan from the global
# total. Accept either interpretation; per-dataset counts still enforce that
# orphan artifacts are not assigned to a dataset.
return value in {truth["artifact_count"], truth["artifact_count"] + 1}
def score_workspace(workspace: Path) -> dict[str, Any]:
    """Score the artifacts an agent left in ``workspace/out``.

    Compares ``api_inventory.json`` with ``ground_truth.json`` (read from the
    directory containing this file), validates ``retry_ledger.csv``, inspects
    the server's ``api_access.log`` for visited endpoints and retries, and
    looks for the required terms in ``data_quality_report.md``.

    Args:
        workspace: Root directory of the agent workspace.

    Returns:
        A dict with ``task``, ``workspace``, the list of ``checks`` and
        ``outcome_score`` (passed weight divided by total weight).

    Missing or malformed agent outputs fail the relevant checks; they do not
    raise.
    """
    truth = json.loads((Path(__file__).resolve().parent / "ground_truth.json").read_text(encoding="utf-8"))
    out_dir = workspace / "out"
    checks: list[dict[str, Any]] = []

    # --- api_inventory.json -------------------------------------------------
    try:
        inv = json.loads((out_dir / "api_inventory.json").read_text(encoding="utf-8"))
    except Exception as exc:  # scoring boundary: any read/parse failure is a failed check
        inv = {}
        _add(checks, "json_parse", "api_inventory.json parses", False, 1.0, str(exc))
    else:
        is_object = isinstance(inv, dict)
        _add(checks, "json_parse", "api_inventory.json parses", is_object, 1.0, f"top-level JSON value is {type(inv).__name__}, expected an object")
        if not is_object:
            # A list/str/number has no .get(); score the remaining inventory
            # checks against an empty object instead of crashing.
            inv = {}
    _add(checks, "source_url", "source_base_url records the provided mock API", _looks_like_task_url(inv.get("source_base_url")), 1.0, repr(inv.get("source_base_url")))
    _add(checks, "dataset_count", "dataset_count exact", inv.get("dataset_count") == truth["dataset_count"], 2.0, repr(inv.get("dataset_count")))
    _add(checks, "artifact_count", "artifact_count follows a documented global-count interpretation", _artifact_count_ok(inv.get("artifact_count"), truth), 2.0, repr(inv.get("artifact_count")))
    _add(checks, "jobs_by_status", "jobs_by_status exact", inv.get("jobs_by_status") == truth["jobs_by_status"], 2.0, repr(inv.get("jobs_by_status")))
    _add(checks, "datasets", "datasets exact", inv.get("datasets") == truth["datasets"], 3.0, repr(inv.get("datasets")))

    # --- retry_ledger.csv ---------------------------------------------------
    try:
        with (out_dir / "retry_ledger.csv").open("r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            rows = list(reader)
            fields = reader.fieldnames
    except Exception as exc:  # scoring boundary: any read/parse failure is a failed check
        rows, fields = [], None
        _add(checks, "ledger_read", "retry_ledger.csv parses", False, 1.0, str(exc))
    else:
        _add(checks, "ledger_read", "retry_ledger.csv parses", True, 1.0)
    _add(checks, "ledger_header", "retry ledger header exact", fields == ["endpoint", "cursor_or_page", "status_code", "action", "attempts"], 1.0, repr(fields))
    # DictReader fills cells missing from a short row with None, so normalise
    # to "" before using string methods.
    codes = {r.get("status_code") or "" for r in rows}
    actions = " ".join((r.get("action") or "").lower() for r in rows)
    cursor_recovery_logged = "410" in codes or "checkpoint" in actions or "cursor" in actions
    _add(checks, "ledger_events", "ledger records 429, 503, and cursor recovery", {"429", "503"} <= codes and cursor_recovery_logged, 3.0, repr(rows))

    # --- server access log --------------------------------------------------
    access_log = out_dir / "api_access.log"
    seen = access_log.read_text(encoding="utf-8", errors="replace").splitlines() if access_log.is_file() else []
    for endpoint in ("/datasets", "/jobs", "/artifacts", "/checkpoint"):
        weight = 3.0 if endpoint == "/checkpoint" else 1.0
        _add(checks, "visited_" + endpoint.strip("/"), f"server log includes {endpoint}", any(s.startswith(endpoint + "?") for s in seen), weight, repr(seen))
    dataset_hits = sum(1 for s in seen if s.startswith("/datasets?cursor=START"))
    job_hits = sum(1 for s in seen if s.startswith("/jobs?cursor=2"))
    _add(checks, "retry_observed", "server log shows retries", dataset_hits >= 2 and job_hits >= 2, 2.0, repr(seen))

    # --- data_quality_report.md ---------------------------------------------
    report_path = out_dir / "data_quality_report.md"
    report = report_path.read_text(encoding="utf-8", errors="replace").lower() if report_path.is_file() else ""
    missing_terms = [t for t in truth["quality_terms"] if t not in report]
    _add(checks, "quality_terms", "quality report covers duplicate, orphan, cursor, 429, and 503", not missing_terms, 3.0, "missing terms: " + repr(missing_terms))

    score = sum(c["weight"] for c in checks if c["pass"]) / sum(c["weight"] for c in checks)
    return {"task": "078-local-api-cursor-retry-ledger", "workspace": str(workspace), "checks": checks, "outcome_score": score}