Model Runs: 6 harnesses & 8 models evaluated on this task.
Loading...
Prompt: Long-running Autonomy & State Adaptation · Task 7
Process a periodic status stream.
The runner will asynchronously inject JSON status files into in/status_updates/. Some initial files may already be present, and later files may arrive during execution. Poll the directory for at least 25 seconds before finalizing.
Create:
- $WORKSPACE/out/status_rollup.md
- $WORKSPACE/out/seen_state.json
Requirements:
- Include only updates whose timestamp is between 2026-04-20T09:00:00Z and 2026-04-20T10:00:00Z inclusive.
- Deduplicate by update_id; if a duplicate arrives, keep the first seen update and record the duplicate id in seen_state.json.
- Cover components api, worker, billing, and search in the rollup.
- status_rollup.md must include sections for incidents, blockers, healthy components, and ignored updates.
- seen_state.json must include seen_ids, duplicate_ids, ignored_ids, window_start, window_end, and component_latest_status.
Do not modify input files. Do not use network access.
Input Files: 7 files
in/batch_1/status_001.json
in/batch_1/status_002.json
in/status_003.json
in/status_004.json
in/status_dup_002.json
in/status_late.json
in/status_old.json
Hooks
def prepare_runtime(context: dict[str, Any]) -> dict[str, Any]:
    """Seed the status inbox and start the background file injector.

    Creates ``<workspace>/in/status_updates`` and ``<workspace>/out``, copies
    every ``*.json`` fixture from ``fixtures/in/batch_1`` into the inbox, then
    starts a daemon thread that copies the remaining fixtures into the inbox
    at fixed offsets (in seconds) from the moment this function armed the
    schedule.

    Args:
        context: Must contain ``"workspace"``. If ``"task"`` is present, its
            ``task_dir`` attribute locates the fixtures; otherwise the
            directory containing this file is used.

    Returns:
        ``{"STATUS_INJECTION_SECONDS": "<comma-separated offsets>"}``, derived
        from the same schedule the injector thread follows.
    """
    workspace = Path(context["workspace"])
    task_dir = Path(context["task"].task_dir) if "task" in context else Path(__file__).resolve().parent
    fixtures = task_dir / "fixtures" / "in"

    inbox = workspace / "in" / "status_updates"
    inbox.mkdir(parents=True, exist_ok=True)
    (workspace / "out").mkdir(parents=True, exist_ok=True)

    # Files that are already present when the agent starts.
    for src in sorted((fixtures / "batch_1").glob("*.json")):
        shutil.copy2(src, inbox / src.name)

    # (offset in seconds from `start`, fixture file name), in firing order.
    schedule = [
        (6, "status_003.json"),
        (10, "status_dup_002.json"),
        (14, "status_old.json"),
        (18, "status_004.json"),
        (22, "status_late.json"),
    ]

    # Monotonic clock: the offsets are elapsed-time deadlines, so they must
    # not shift if the system wall clock is adjusted mid-run.
    start = time.monotonic()

    def inject() -> None:
        """Copy each scheduled fixture into the inbox once its offset elapses."""
        for delay, name in schedule:
            remaining = start + delay - time.monotonic()
            if remaining > 0:
                time.sleep(remaining)
            # NOTE(review): copy2 writes straight into the inbox, so a poller
            # may briefly observe a partially written file — confirm that
            # consumers are expected to tolerate/retry JSON decode errors.
            shutil.copy2(fixtures / name, inbox / name)

    # Daemon thread: it must never keep the runner process alive on exit.
    threading.Thread(target=inject, daemon=True).start()

    # Derived from `schedule` so the advertised offsets cannot drift from it.
    return {"STATUS_INJECTION_SECONDS": ",".join(str(delay) for delay, _ in schedule)}
def after_round(context: dict[str, Any], runtime_state: dict[str, Any], adapter_result: Any) -> dict[str, Any]:
    """Return ``runtime_state`` unchanged.

    ``context`` and ``adapter_result`` are accepted but not used; no per-round
    bookkeeping is done for this task. (Presumably the runner calls this hook
    after each agent round — confirm against the harness.)
    """
    return runtime_state
def cleanup_runtime(context: dict[str, Any], runtime_state: dict[str, Any]) -> None:
    """Do nothing; this task registers no explicit cleanup work.

    Both arguments are accepted but not used.
    """


# LLM Rubric
# Prompt template for the LLM rubric judge. It is assembled from shared rubric
# fragments on `_dr` plus task-specific instructions. `{task_name}` and
# `{payload}` are placeholders and the literal JSON braces are doubled, so the
# template is presumably rendered with `str.format` — confirm at the call site.
# The lines inside the triple-quoted strings are kept at column 0 on purpose:
# any leading whitespace there would become part of the prompt text.
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    + """Evaluate whether the agent used an appropriate polling loop, handled asynchronous files, deduplicated deterministically, respected the time window, and produced an operationally useful rollup.
"""
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}
--- PROXY TRACE JSON BELOW ---
{payload}"""
)
# Completion Grader
def _load_json(path: Path) -> Any:
    """Read *path* as UTF-8 text and return the parsed JSON value.

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    return json.loads(path.read_text(encoding="utf-8"))
def score_workspace(workspace: Path) -> dict[str, Any]:
    """Grade the outputs under ``<workspace>/out`` against the ground truth.

    Checks ``out/seen_state.json`` (parseable, time window, seen / duplicate /
    ignored ids, latest status per component) and ``out/status_rollup.md``
    (contains every required token, case-insensitively).

    Every check is always recorded, so the weight denominator is constant.
    When the state file is missing, unparseable, empty, or not a JSON object,
    the state-dependent checks are recorded as failures. They are never
    skipped: skipping would shrink the denominator and let an empty ``{}``
    state score as a full pass.

    Args:
        workspace: Root of the agent workspace to grade.

    Returns:
        A dict with ``task``, ``workspace`` (resolved path as a string),
        ``outcome_score`` (passed weight / total weight, rounded to 4 places)
        and ``checks`` (one dict per check: ``id``, ``label``, ``pass``,
        ``weight``, ``detail``).
    """
    w = workspace.resolve()
    gt = _load_json(_GT)
    checks: list[dict[str, Any]] = []

    def add(cid: str, label: str, ok: bool, weight: float, detail: Any = None) -> None:
        """Record one weighted check result."""
        checks.append({"id": cid, "label": label, "pass": bool(ok), "weight": weight, "detail": detail})

    def same_ids(actual: Any, expected: Any) -> bool:
        """Order-insensitive comparison that fails instead of raising on bad input."""
        try:
            return sorted(actual) == sorted(expected)
        except TypeError:
            # Non-iterable or unsortable content (None, a number, mixed types).
            return False

    def status_of(entry: Any) -> Any:
        """Accept either a nested ``{"status": ...}`` dict or a bare status value."""
        return entry.get("status") if isinstance(entry, dict) else entry

    state: Any = {}
    try:
        state = _load_json(w / "out" / "seen_state.json")
        add("state_parse", "seen_state.json is valid JSON", True, 0.10)
    except Exception as exc:
        add("state_parse", "seen_state.json is valid JSON", False, 0.10, str(exc))

    # Only a non-empty JSON object can satisfy the state checks; anything else
    # is graded as an empty state so that `.get` stays safe below.
    state_ok = isinstance(state, dict) and bool(state)
    if not state_ok:
        state = {}

    add(
        "window",
        "state records exact inclusive time window",
        state_ok
        and state.get("window_start") == gt["window_start"]
        and state.get("window_end") == gt["window_end"],
        0.10,
    )

    # Allow the model to include additional state ids while still requiring
    # all in-window updates.
    try:
        seen_ids = {str(x) for x in state.get("seen_ids", [])}
    except TypeError:
        seen_ids = set()
    required_seen_ids = {str(x) for x in gt["seen_ids"]}
    add(
        "seen_ids",
        "state includes all unique in-window updates",
        state_ok and required_seen_ids.issubset(seen_ids),
        0.20,
        state.get("seen_ids"),
    )

    add(
        "duplicates",
        "state records duplicate update ids",
        state_ok and same_ids(state.get("duplicate_ids", []), gt["duplicate_ids"]),
        0.15,
        state.get("duplicate_ids"),
    )
    add(
        "ignored",
        "state records out-of-window ignored updates",
        state_ok and same_ids(state.get("ignored_ids", []), gt["ignored_ids"]),
        0.15,
        state.get("ignored_ids"),
    )

    # Component entries may be nested dicts or plain status strings.
    model_comps = state.get("component_latest_status", {})
    gt_comps = gt.get("components", {})
    comps_ok = (
        state_ok
        and isinstance(model_comps, dict)
        and bool(model_comps)
        and all(status_of(model_comps.get(comp)) == expected for comp, expected in gt_comps.items())
    )
    add("components", "latest component statuses are correct", comps_ok, 0.20, state.get("component_latest_status"))

    rollup_path = w / "out" / "status_rollup.md"
    if rollup_path.is_file():
        low = rollup_path.read_text(encoding="utf-8", errors="replace").lower()
        ok = all(token.lower() in low for token in gt["required_rollup_tokens"])
        add("rollup_content", "rollup covers sections, included ids, and ignored ids", ok, 0.20)
    else:
        add("rollup_content", "status_rollup.md exists", False, 0.20, "missing")

    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0
    return {"task": "061-periodic-status-rollup", "workspace": str(w), "outcome_score": score, "checks": checks}