Autonomous Task Decomposition And Execution

Task: autonomous decomposition and execution

Long-running Autonomy & State AdaptationTask 2Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptLong-running Autonomy & State Adaptation · Task 2

Task: autonomous decomposition and execution

Goal: plan and run a virtual product launch for the ClawMind AI assistant.

Do the following; keep all artifacts under $WORKSPACE/:

  1. Decomposition
  • Create a subtasks/ directory.
  • Break the goal into at least 3 subtasks (e.g. market research, event planning, promotional copy, budget, timeline).
  • One .md file per subtask (e.g. subtask_01_research.md), each containing:
  • objective
  • concrete steps or deliverables
  • status line: ## STATUS: pending initially, then ## STATUS: done when finished
  1. Execution and tracking
  • Execute subtasks in order; you may update statuses as you go.
  • Create progress.md with start/end times and a short note per subtask.
  • If you replan, explain why in progress.md.
  1. Final rollup
  • Write final_report.md including:
  • overall launch plan (time, venue/format, audience)
  • at least one full promotional piece (copy)
  • budget estimate with major line items
  • success metrics (how you measure impact)

You may use any tools available (browser, editing, scripts). Scoring emphasizes sensible decomposition, completed subtasks, report quality, and accurate progress tracking.

Input FilesNo static fixture files

No static fixture files.

Hooks
def prepare_runtime(context: dict[str, Any]) -> dict[str, Any]:
    workspace = Path(context["workspace"])
    # 创建必要的目录
    (workspace / "subtasks").mkdir(parents=True, exist_ok=True)
    (workspace / "out").mkdir(parents=True, exist_ok=True)
    return {
        "PRODUCT_NAME": "ClawMind",
        "GOAL": "Plan and execute a virtual product launch"
    }


def after_round(context: dict[str, Any], runtime_state: dict[str, Any], adapter_result: Any) -> dict[str, Any]:
    return runtime_state


def cleanup_runtime(context: dict[str, Any], runtime_state: dict[str, Any]) -> None:
    pass
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\n"
    + _dr.RUBRIC_USER_OUTCOME_NOTICE
    + "\n\n"
    "Task type: decompose and execute a complex virtual-event style goal; outputs include subtasks/, progress.md, final_report.md.\n"
    "Oracle checks files/format; this rubric scores process from the proxy trace.\n\n"
    "Score these three dimensions (each 0.0–1.0); keys must match exactly:\n\n"
    "- **tool_use_appropriate**: Tools for creating/editing subtasks, progress, and reports match decompose→execute→summarize.\n\n"
    "- **consistency**: Decompose before executing and summarizing; ordering respects dependencies (research→plan→execute→review).\n\n"
    "- **robustness**: High score if no failures; on failure, replan, retry, or explain blockers.\n\n"
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + "Return ONLY JSON (no markdown fences):\n"
    '{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, '
    '"security_gate": 1, "notes": "one line"}}\n'
    "Optional **total**: Harness may recompute the mean.\n\n"
    "--- PROXY TRACE JSON BELOW ---\n"
    "{payload}"
)
Completion Grader
def _to_native(obj: Any) -> Any:
    if hasattr(obj, "item"):
        return obj.item()
    if isinstance(obj, dict):
        return {k: _to_native(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_to_native(v) for v in obj]
    return obj


def score_workspace(workspace: Path) -> dict[str, Any]:
    w = workspace.resolve()
    task_dir = w.parent.parent
    gt_path = task_dir / "ground_truth.json"
    gt = json.loads(gt_path.read_text(encoding="utf-8")) if gt_path.exists() else {}

    subtasks_dir = w / "subtasks"
    progress_file = w / "progress.md"
    report_file = w / "final_report.md"

    checks = []

    # ---------- 1. 分解合理性 (25%) ----------
    decomposition_score = 0.0
    if subtasks_dir.exists() and subtasks_dir.is_dir():
        subtask_files = list(subtasks_dir.glob("*.md"))
        num_subtasks = len(subtask_files)
        min_subtasks = gt.get("min_subtasks", 3)
        if num_subtasks >= min_subtasks:
            decomposition_score = min(1.0, num_subtasks / (min_subtasks + 2))
        else:
            decomposition_score = num_subtasks / min_subtasks

        # 检查是否涵盖预期主题
        expected_topics = gt.get("expected_subtask_topics", [])
        all_content = " ".join([f.read_text(encoding="utf-8") for f in subtask_files if f.exists()])
        found_topics = sum(1 for topic in expected_topics if topic.lower() in all_content.lower())
        if expected_topics:
            decomposition_score = (decomposition_score + found_topics / len(expected_topics)) / 2

        checks.append({
            "id": "decomposition",
            "label": f"子任务数量: {num_subtasks}, 主题覆盖: {found_topics}/{len(expected_topics)}",
            "pass": bool(decomposition_score >= 0.7),
            "weight": 0.25,
            "detail": {"num": num_subtasks, "topics_covered": found_topics, "total_topics": len(expected_topics)}
        })
    else:
        checks.append({"id": "decomposition_missing", "label": "subtasks/ directory missing", "pass": False, "weight": 0.25, "detail": None})
        decomposition_score = 0.0

    # ---------- 2. 执行完整性 (40%) ----------
    execution_score = 0.0
    if subtasks_dir.exists():
        done_count = 0
        total_count = 0
        for f in subtasks_dir.glob("*.md"):
            total_count += 1
            content = f.read_text(encoding="utf-8")
            if re.search(r"STATUS:\s*done", content, re.IGNORECASE):
                done_count += 1
        if total_count > 0:
            execution_score = done_count / total_count
        checks.append({
            "id": "execution",
            "label": f"子任务完成: {done_count}/{total_count}",
            "pass": bool(execution_score >= 0.8),
            "weight": 0.40,
            "detail": {"done": done_count, "total": total_count}
        })
    else:
        execution_score = 0.0

    # ---------- 3. 最终报告质量 (25%) ----------
    report_score = 0.0
    if report_file.exists():
        content = report_file.read_text(encoding="utf-8")
        # 长度分(至少 500 字)
        length_score = min(1.0, len(content) / 800)
        # 关键词检查
        cl = content.lower()
        has_budget = "budget" in cl
        has_timeline = "timeline" in cl
        has_copy = "copy" in cl
        has_metrics = "metric" in cl or "kpi" in cl
        element_score = (has_budget + has_timeline + has_copy + has_metrics) / 4.0
        report_score = (length_score + element_score) / 2.0
        checks.append({
            "id": "report_quality",
            "label": f"report length {len(content)} chars, budget:{has_budget}, timeline:{has_timeline}, copy:{has_copy}, metrics:{has_metrics}",
            "pass": bool(report_score >= 0.7),
            "weight": 0.25,
            "detail": {"length": len(content), "has_budget": has_budget, "has_timeline": has_timeline, "has_copy": has_copy, "has_metrics": has_metrics}
        })
    else:
        checks.append({"id": "report_missing", "label": "final_report.md missing", "pass": False, "weight": 0.25, "detail": None})

    # ---------- 4. 过程监控 (10%) ----------
    progress_score = 0.0
    if progress_file.exists():
        content = progress_file.read_text(encoding="utf-8")
        # 检查是否包含状态变化标记
        has_pending = "pending" in content.lower()
        has_done = "done" in content.lower()
        has_start = "start" in content.lower()
        has_end = "end" in content.lower()
        if has_pending and has_done and has_start and has_end:
            progress_score = 1.0
        elif has_pending and has_done:
            progress_score = 0.7
        elif has_pending or has_done:
            progress_score = 0.4
        else:
            progress_score = 0.1
        checks.append({
            "id": "progress_tracking",
            "label": "progress.md 包含状态标记",
            "pass": bool(progress_score >= 0.5),
            "weight": 0.10,
            "detail": {"has_pending": has_pending, "has_done": has_done, "has_start": has_start, "has_end": has_end}
        })
    else:
        checks.append({"id": "progress_missing", "label": "progress.md missing", "pass": False, "weight": 0.10, "detail": None})

    total_score = (decomposition_score * 0.25 +
                   execution_score * 0.40 +
                   report_score * 0.25 +
                   progress_score * 0.10)

    thresholds = gt.get("scoring", {}).get("thresholds", {"excellent": 0.90, "good": 0.75, "pass": 0.60})
    if total_score >= thresholds["excellent"]:
        level = "excellent"
    elif total_score >= thresholds["good"]:
        level = "good"
    elif total_score >= thresholds["pass"]:
        level = "pass"
    else:
        level = "fail"

    result = {
        "task": "014-task-decomposition",
        "workspace": str(w),
        "outcome_score": round(float(total_score), 4),
        "level": level,
        "checks": _to_native(checks),
        "summary": {
            "decomposition": round(float(decomposition_score), 4),
            "execution": round(float(execution_score), 4),
            "report_quality": round(float(report_score), 4),
            "progress_tracking": round(float(progress_score), 4)
        }
    }
    return json.loads(json.dumps(result, default=str))


def score_workspace_safe(workspace: Path) -> dict[str, Any]:
    try:
        return score_workspace(workspace)
    except Exception as e:
        return {
            "task": "014-task-decomposition",
            "workspace": str(workspace),
            "outcome_score": 0.0,
            "level": "error",
            "error": str(e),
            "checks": [],
            "summary": {}
        }