Periodic Status Rollup with Asynchronous State Injections

Process a periodic status stream.

Long-running Autonomy & State AdaptationTask 7Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptLong-running Autonomy & State Adaptation · Task 7

Process a periodic status stream.

The runner will asynchronously inject JSON status files into in/status_updates/. Some initial files may already be present, and later files may arrive during execution. Poll the directory for at least 25 seconds before finalizing.

Create:

  • $WORKSPACE/out/status_rollup.md
  • $WORKSPACE/out/seen_state.json

Requirements:

  • Include only updates whose timestamp is between 2026-04-20T09:00:00Z and 2026-04-20T10:00:00Z inclusive.
  • Deduplicate by update_id; if a duplicate arrives, keep the first seen update and record the duplicate id in seen_state.json.
  • Cover components api, worker, billing, and search in the rollup.
  • status_rollup.md must include sections for incidents, blockers, healthy components, and ignored updates.
  • seen_state.json must include seen_ids, duplicate_ids, ignored_ids, window_start, window_end, and component_latest_status.

Do not modify input files. Do not use network access.

Input Files7 files
in/batch_1/status_001.json
in/batch_1/status_002.json
in/status_003.json
in/status_004.json
in/status_dup_002.json
in/status_late.json
in/status_old.json
Hooks
def prepare_runtime(context: dict[str, Any]) -> dict[str, Any]:
    workspace = Path(context["workspace"])
    task_dir = Path(context["task"].task_dir) if "task" in context else Path(__file__).resolve().parent
    inbox = workspace / "in" / "status_updates"
    inbox.mkdir(parents=True, exist_ok=True)
    (workspace / "out").mkdir(parents=True, exist_ok=True)

    batch_1 = task_dir / "fixtures" / "in" / "batch_1"
    for src in sorted(batch_1.glob("*.json")):
        shutil.copy2(src, inbox / src.name)

    start = time.time()

    def inject() -> None:
        schedule = [
            (6, "status_003.json"),
            (10, "status_dup_002.json"),
            (14, "status_old.json"),
            (18, "status_004.json"),
            (22, "status_late.json"),
        ]
        source_dir = task_dir / "fixtures" / "in"
        for delay, name in schedule:
            remaining = start + delay - time.time()
            if remaining > 0:
                time.sleep(remaining)
            shutil.copy2(source_dir / name, inbox / name)

    threading.Thread(target=inject, daemon=True).start()
    return {"STATUS_INJECTION_SECONDS": "6,10,14,18,22"}


def after_round(context: dict[str, Any], runtime_state: dict[str, Any], adapter_result: Any) -> dict[str, Any]:
    return runtime_state


def cleanup_runtime(context: dict[str, Any], runtime_state: dict[str, Any]) -> None:
    pass
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    + """Evaluate whether the agent used an appropriate polling loop, handled asynchronous files, deduplicated deterministically, respected the time window, and produced an operationally useful rollup.

"""
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}

--- PROXY TRACE JSON BELOW ---
{payload}"""
)
Completion Grader
def _load_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8"))


def score_workspace(workspace: Path) -> dict[str, Any]:
    w = workspace.resolve()
    gt = _load_json(_GT)
    checks: list[dict[str, Any]] = []

    def add(cid: str, label: str, ok: bool, weight: float, detail: Any = None) -> None:
        checks.append({"id": cid, "label": label, "pass": bool(ok), "weight": weight, "detail": detail})

    state: dict[str, Any] = {}
    try:
        state = _load_json(w / "out" / "seen_state.json")
        add("state_parse", "seen_state.json is valid JSON", True, 0.10)
    except Exception as exc:
        add("state_parse", "seen_state.json is valid JSON", False, 0.10, str(exc))

    if state:
        add("window", "state records exact inclusive time window", state.get("window_start") == gt["window_start"] and state.get("window_end") == gt["window_end"], 0.10)
        
        # Allow the model to include additional state ids while still requiring all in-window updates
        seen_ids = {str(x) for x in state.get("seen_ids", [])}
        required_seen_ids = {str(x) for x in gt["seen_ids"]}
        add("seen_ids", "state includes all unique in-window updates", required_seen_ids.issubset(seen_ids), 0.20, state.get("seen_ids"))
        add("duplicates", "state records duplicate update ids", sorted(state.get("duplicate_ids", [])) == sorted(gt["duplicate_ids"]), 0.15, state.get("duplicate_ids"))
        add("ignored", "state records out-of-window ignored updates", sorted(state.get("ignored_ids", [])) == sorted(gt["ignored_ids"]), 0.15, state.get("ignored_ids"))
        
        # 兼容处理嵌套字典或纯字符串的情况
        model_comps = state.get("component_latest_status", {})
        gt_comps = gt.get("components", {})
        comps_ok = True
        if not isinstance(model_comps, dict) or not model_comps:
            comps_ok = False
        else:
            for comp, expected_status in gt_comps.items():
                actual_val = model_comps.get(comp)
                actual_status = actual_val.get("status") if isinstance(actual_val, dict) else actual_val
                if actual_status != expected_status:
                    comps_ok = False
                    break
        
        add("components", "latest component statuses are correct", comps_ok, 0.20, state.get("component_latest_status"))

    rollup_path = w / "out" / "status_rollup.md"
    if rollup_path.is_file():
        text = rollup_path.read_text(encoding="utf-8", errors="replace")
        low = text.lower()
        ok = all(token.lower() in low for token in gt["required_rollup_tokens"])
        add("rollup_content", "rollup covers sections, included ids, and ignored ids", ok, 0.20)
    else:
        add("rollup_content", "status_rollup.md exists", False, 0.20, "missing")

    total_w = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total_w, 4) if total_w else 0.0
    return {"task": "061-periodic-status-rollup", "workspace": str(w), "outcome_score": score, "checks": checks}