Alert deduplication and noise reduction from topology-aware alerts

You are reducing an offline alert storm into actionable incidents.

SRE, DevOps & Release OpsTask 3Oracle + LLM scoring
Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptSRE, DevOps & Release Ops ยท Task 3

You are reducing an offline alert storm into actionable incidents.

Inputs:

  • $WORKSPACE/in/alerts.jsonl
  • $WORKSPACE/in/topology.json

Create exactly these output files:

  • $WORKSPACE/out/deduped_incidents.json
  • $WORKSPACE/out/noise_report.md

deduped_incidents.json requirements:

  • Top-level object with key incidents.
  • incidents must be a list of objects.
  • Each incident must include: incident_id, root_alert_id, root_service, cluster_alert_ids, impact_services, summary, evidence.
  • cluster_alert_ids must include the root alert and downstream symptom alerts.
  • impact_services must name customer-facing or dependent services affected by the same root problem.

noise_report.md requirements:

  • List alerts filtered as noise and explain why.
  • Distinguish duplicates/downstream symptoms from true noise.
  • Mention maintenance windows, synthetic heartbeat flaps, and staging-only alerts when applicable.

Do not modify input files. Do not use external services. Base the result only on the supplied alerts and topology.

Input Files2 files
in/alerts.jsonl
in/topology.json
LLM Rubric
USER_TEMPLATE = (
    "Task name: {task_name}\n\n"
    + _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
    + "\n\nScore only process quality: topology-aware reasoning, clear noise explanations, and avoidance of fabricated production actions. "
    "Deterministic checks own the incident IDs and exact grouping.\n\n"
    + "\n\n"
    + _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
    + "\n\n"
    + _dr.RUBRIC_USER_JSON_NO_QUALITY
    + "\n\n"
    + """Return ONLY JSON:
{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, "security_gate": 1, "notes": "one line"}}

--- PROXY TRACE JSON BELOW ---
{payload}"""
)
Completion Grader
def _load_json(path: Path) -> Any:
    return json.loads(path.read_text(encoding="utf-8"))


def _norm_set(values: Any) -> set[str]:
    if not isinstance(values, list):
        return set()
    return {str(v).strip().lower() for v in values}


def _source_unchanged(workspace: Path) -> bool:
    root = _TASK_DIR / "fixtures" / "in"
    src = workspace.resolve()
    if not src.is_dir():
        return True
    for original in root.rglob("*"):
        if not original.is_file():
            continue
        rel = original.relative_to(root)
        candidate = src / "in" / rel
        if candidate.is_file() and candidate.read_bytes() != original.read_bytes():
            return False
    return True


def score_workspace(workspace: Path) -> dict[str, Any]:
    w = workspace.resolve()
    gt = _load_json(_GT)
    weights = gt["scoring"]["weights"]
    checks: list[dict[str, Any]] = []

    def add(cid: str, label: str, ok: bool, weight: float, detail: Any = None) -> None:
        checks.append({"id": cid, "label": label, "pass": bool(ok), "weight": weight, "detail": detail})

    incident_score = 0.0
    path = w / "out" / "deduped_incidents.json"
    if path.is_file():
        try:
            data = _load_json(path)
            incidents = data.get("incidents", []) if isinstance(data, dict) else []
            by_root = {str(i.get("root_alert_id", "")).strip().lower(): i for i in incidents if isinstance(i, dict)}
            per = 1.0 / len(gt["expected_incidents"])
            for exp in gt["expected_incidents"]:
                inc = by_root.get(exp["root_alert_id"].lower(), {})
                cluster = _norm_set(inc.get("cluster_alert_ids"))
                impact = _norm_set(inc.get("impact_services"))
                evidence_text = json.dumps(inc.get("evidence", ""), ensure_ascii=False).lower() + str(inc.get("summary", "")).lower()
                incident_score += per * (
                    0.20 * (str(inc.get("root_service", "")).lower() == exp["root_service"])
                    + 0.35 * (len(cluster & {x.lower() for x in exp["cluster_alert_ids"]}) / len(exp["cluster_alert_ids"]))
                    + 0.25 * (len(impact & {x.lower() for x in exp["impact_services"]}) / len(exp["impact_services"]))
                    + 0.20 * bool(re.search(r"topology|depends|downstream|fingerprint|timeout|eviction|connection", evidence_text))
                )
            add("deduped_incidents", "deduped_incidents.json groups alerts by root cause and topology", incident_score >= 0.70, weights["incidents"], {"score": round(incident_score, 4), "incidents": len(incidents)})
        except Exception as exc:
            add("deduped_incidents_parse", "deduped_incidents.json parseable", False, weights["incidents"], str(exc))
    else:
        add("deduped_incidents_missing", "deduped_incidents.json exists", False, weights["incidents"], "missing")

    noise_score = 0.0
    report = w / "out" / "noise_report.md"
    if report.is_file():
        text = report.read_text(encoding="utf-8", errors="replace")
        low = text.lower()
        id_hits = sum(1 for aid in gt["noise_alert_ids"] if aid.lower() in low)
        keyword_hits = sum(1 for kw in gt["noise_keywords"] if kw.lower() in low)
        symptom_ok = "duplicate" in low or "downstream" in low or "symptom" in low
        noise_score = 0.45 * (id_hits / len(gt["noise_alert_ids"])) + 0.40 * (keyword_hits / len(gt["noise_keywords"])) + 0.15 * symptom_ok
        add("noise_report", "noise_report.md explains filtered noise and duplicate symptoms", noise_score >= 0.70, weights["noise_report"], {"score": round(noise_score, 4), "id_hits": id_hits})
    else:
        add("noise_report_missing", "noise_report.md exists", False, weights["noise_report"], "missing")

    unchanged = _source_unchanged(w)
    add("source_unchanged", "fixtures are not modified", unchanged, weights["source_unchanged"])
    total = incident_score * weights["incidents"] + noise_score * weights["noise_report"] + float(unchanged) * weights["source_unchanged"]
    thresholds = gt["scoring"]["thresholds"]
    level = "excellent" if total >= thresholds["excellent"] else "good" if total >= thresholds["good"] else "pass" if total >= thresholds["pass"] else "fail"
    return {"task": "063-alert-dedup-noise", "workspace": str(w), "outcome_score": round(total, 4), "level": level, "checks": checks}