Schema drift audit across daily data extracts

Audit daily extracts for schema drift and row-level data-quality rejects using only local files.

Data, BI & Finance Analytics · Task 12 · Oracle + LLM scoring
Model Runs: 6 harnesses & 8 models evaluated on this task.
Loading...
Prompt: Data, BI & Finance Analytics · Task 12

Audit daily extracts for schema drift and row-level data-quality rejects using only local files.

Inputs:

  • $WORKSPACE/in/contracts/customer_events_schema.json
  • $WORKSPACE/in/extracts/*/events.csv
  • $WORKSPACE/in/source_change_log.md

Create:

  • $WORKSPACE/out/schema_drift_report.csv
  • $WORKSPACE/out/rejected_rows.csv
  • $WORKSPACE/out/drift_summary.json
  • $WORKSPACE/out/audit_notes.md

Rules:

  1. Do not modify fixtures.
  2. Compare each extract header and values against the approved JSON schema.
  3. schema_drift_report.csv exact header:

extract_date,field_name,drift_type,expected,observed,severity

  4. rejected_rows.csv exact header:

extract_date,source_row,record_id,reason,notes

source_row is the 1-based data row number, excluding the header.

  5. Drift types must be one of:

missing_required_field,new_unapproved_field,type_change,enum_expansion,nullability_violation,date_format_drift

  6. Row rejects must use one of:

missing_required,invalid_enum,invalid_type,invalid_timestamp

  7. drift_summary.json must contain: extract_dates, drift_count_by_date, high_severity_count, rejected_row_count, changelog_mismatches.
  8. audit_notes.md must distinguish schema drift from row-level bad data and mention any source change log mismatch.

Do not use network access or external APIs.

Input Files: 6 files
in/contracts/customer_events_schema.json
in/extracts/2026-04-27/events.csv
in/extracts/2026-04-28/events.csv
in/extracts/2026-04-29/events.csv
in/extracts/2026-04-30/events.csv
in/source_change_log.md
LLM Rubric
# Prompt template for the rubric judge, assembled from shared rubric snippets
# and the task reference. `{task_name}` and `{payload}` are the placeholders;
# the doubled braces around the JSON example are presumably there so the
# template can be rendered with `str.format` -- confirm against the caller.
USER_TEMPLATE = "".join(
    (
        "Task name: {task_name}\n\n",
        _dr.RUBRIC_IGNORE_BOOTSTRAP_READS,
        "\n\n",
        _TASK_REFERENCE,
        "\n\n",
        _dr.RUBRIC_USER_SECURITY_GATE_DETAIL,
        "\n\n",
        _dr.RUBRIC_USER_JSON_NO_QUALITY,
        "\n\n",
        "Return ONLY JSON:\n",
        '{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, ',
        '"security_gate": 1, "notes": "one line"}}\n\n',
        "--- PROXY TRACE JSON BELOW ---\n",
        "{payload}",
    )
)
Completion Grader
def _add(checks: list[dict[str, Any]], cid: str, ok: bool, weight: float, detail: str | None = None) -> None:
    checks.append({"id": cid, "pass": bool(ok), "weight": weight, "detail": None if ok else detail})


def _rows(path: Path) -> tuple[list[str], list[dict[str, str]]]:
    """Read a UTF-8 CSV file and return ``(header, rows)``.

    Args:
        path: CSV file to read.

    Returns:
        The header names exactly as they appear in the file (an empty list
        when the file has no header), and one dict per data row with every
        value stripped of surrounding whitespace. Cells missing from a short
        row come back as ``""``.

    Raises:
        ValueError: a data row has more cells than the header. DictReader
            collects such surplus cells as a list under the key ``None``;
            stripping that list used to fail with an opaque AttributeError,
            so the condition is reported explicitly instead.
    """
    with path.open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        header = list(reader.fieldnames or [])
        records: list[dict[str, str]] = []
        for row in reader:
            overflow = row.pop(None, None)
            if overflow:
                raise ValueError(
                    f"{path.name}: line {reader.line_num} has {len(overflow)} cell(s) "
                    f"beyond the {len(header)}-column header"
                )
            # Short rows are padded with None by DictReader; normalise to "".
            records.append({k: (v or "").strip() for k, v in row.items()})
        return header, records


def _sorted_rows(rows: list[dict[str, str]], keys: list[str]) -> list[dict[str, str]]:
    """Return a new list of *rows* ordered by the values of *keys*.

    Rows are compared on the tuple of their values for each name in *keys*,
    in that order; a row lacking a key sorts as if it held ``""``. The input
    list is left untouched and ties keep their original relative order.
    """

    def sort_key(record: dict[str, str]) -> tuple[str, ...]:
        return tuple([record.get(name, "") for name in keys])

    ordered = list(rows)
    ordered.sort(key=sort_key)
    return ordered


def _norm(value: Any) -> str:
    """Normalise *value* to a lowercase, whitespace-trimmed comparison string.

    ``None`` becomes ``""``; anything else is converted with ``str``. After
    trimming and lowercasing, underscores and hyphens are replaced by spaces
    so that ``"not_present"``, ``"not-present"`` and ``"Not Present"`` all
    compare equal.

    Only ``None`` is treated as "no value": falsy values such as ``0`` or
    ``False`` keep their text ("0", "false") instead of collapsing to the
    empty string.
    """
    text = "" if value is None else str(value)
    return text.strip().lower().replace("_", " ").replace("-", " ")


def _value_matches(actual: str, expected: str) -> bool:
    """Return whether a reported cell value loosely matches the expected one.

    Both values are normalised with ``_norm`` first. Matching rules, in order:

    * An empty *actual* matches only an empty *expected*. Without this guard
      the containment fallback below would accept a blank cell for any
      expectation, because the empty string is a substring of every string.
    * The expected enum ``signup|purchase|cancel`` matches when all three
      member names occur somewhere in *actual*.
    * If *expected* is one of the alias groups, any alias occurring as a
      substring of *actual* counts as a match.
    * Otherwise the two values match when either contains the other.
    """
    actual_n = _norm(actual)
    expected_n = _norm(expected)
    if not actual_n:
        return not expected_n
    aliases = {
        "absent": ["absent", "not present", "notpresent", "not_present", "missing"],
        "blank": ["blank", "empty", "null", "null empty"],
        "present": ["present", "exists", "required"],
        "timestamp iso": ["timestamp iso", "iso", "iso8601", "yyyy mm dd", "yyyy mm ddthh:mm:ssz"],
        "decimal": ["decimal", "numeric", "number"],
        "zero": ["zero", "0"],
    }
    if expected_n == "signup|purchase|cancel":
        return all(token in actual_n for token in ["signup", "purchase", "cancel"])
    if expected_n in aliases:
        # NOTE(review): aliases are substring matches, so e.g. "not present"
        # also satisfies "present" and any text containing the digit 0
        # satisfies "zero" -- confirm whether that leniency is intended.
        return any(alias in actual_n for alias in aliases[expected_n])
    return expected_n in actual_n or actual_n in expected_n


def _note_matches(actual: str, expected: str) -> bool:
    """Return whether a free-text note covers the expected keywords.

    Both strings are normalised with ``_norm``. The expected text is split on
    whitespace into keywords, dropping words of two characters or fewer and
    the word "not". A keyword is covered when it, or one of its listed
    synonyms, occurs as a substring of the note. The note matches when at
    most one keyword is uncovered, with a minimum of one covered keyword.
    When no keywords remain, any non-empty note matches.
    """
    haystack = _norm(actual)
    wanted = [word for word in _norm(expected).split() if len(word) > 2 and word != "not"]
    if not wanted:
        return bool(haystack)

    synonyms = {
        "blank": ["blank", "empty", "null"],
        "iso": ["iso", "iso8601", "timestamp"],
        "zero": ["zero", "0"],
        "trial": ["trial"],
        "pause": ["pause"],
        "column": ["column", "header"],
        "missing": ["missing", "absent"],
    }
    covered = sum(
        1
        for word in wanted
        if any(option in haystack for option in synonyms.get(word, [word]))
    )
    required = max(1, len(wanted) - 1)
    return covered >= required


def score_workspace(workspace: str | Path) -> dict[str, Any]:
    """Grade the schema-drift audit outputs found under *workspace*.

    Four deliverables in ``<workspace>/out`` are inspected -- the drift report
    CSV, the rejected-rows CSV, the JSON summary and the markdown notes -- and
    each contributes a set of weighted pass/fail checks. A missing file fails
    all of its checks; a file that cannot be read or parsed records a single
    ``*_parseable`` failure instead.

    Args:
        workspace: Root directory holding the ``in`` fixtures and ``out``
            deliverables.

    Returns:
        A dict with ``task``, the resolved ``workspace`` path, the list of
        ``checks`` and the final score under both ``outcome_score`` and
        ``score``. The score is passed weight divided by total recorded
        weight, rounded to four places, then capped at 0.69 if a
        ``drift_identity`` check failed and at 0.74 if a ``drift_severity``
        check failed.
    """
    w = Path(workspace).resolve()
    checks: list[dict[str, Any]] = []
    _add(checks, "schema_present", (w / "in" / "contracts" / "customer_events_schema.json").is_file(), 0.04, "missing schema")
    out = w / "out" / "schema_drift_report.csv"
    rejects = w / "out" / "rejected_rows.csv"
    summary = w / "out" / "drift_summary.json"
    notes = w / "out" / "audit_notes.md"

    # --- schema_drift_report.csv -------------------------------------------
    _add(checks, "drift_exists", out.is_file(), 0.06, "missing drift report")
    if out.is_file():
        try:
            header, rows = _rows(out)
            _add(checks, "drift_header", header == ["extract_date", "field_name", "drift_type", "expected", "observed", "severity"], 0.06, f"got {header}")
            # Index reported rows by identity; duplicate identities collapse
            # here, but the row-count comparison below still catches extras.
            by_key = {
                (row.get("extract_date"), row.get("field_name"), row.get("drift_type")): row
                for row in rows
            }
            identity_hits = 0
            detail_hits = 0
            severity_hits = 0
            for exp in EXPECTED_DRIFT:
                key = (exp["extract_date"], exp["field_name"], exp["drift_type"])
                row = by_key.get(key)
                if not row:
                    continue
                identity_hits += 1
                if _value_matches(row.get("expected", ""), exp["expected"]) and _value_matches(row.get("observed", ""), exp["observed"]):
                    detail_hits += 1
                if row.get("severity", "").strip().lower() == exp["severity"]:
                    severity_hits += 1
            drift_identity_score = identity_hits / len(EXPECTED_DRIFT)
            drift_detail_score = detail_hits / len(EXPECTED_DRIFT)
            drift_severity_score = severity_hits / len(EXPECTED_DRIFT)
            # Identity requires every expected row and no additional rows.
            _add(checks, "drift_identity", drift_identity_score == 1.0 and len(rows) == len(EXPECTED_DRIFT), 0.18, f"hits {identity_hits}/{len(EXPECTED_DRIFT)} got {rows}")
            _add(checks, "drift_details", drift_detail_score >= 0.85, 0.08, f"hits {detail_hits}/{len(EXPECTED_DRIFT)}")
            _add(checks, "drift_severity", drift_severity_score >= 0.85, 0.08, f"hits {severity_hits}/{len(EXPECTED_DRIFT)}")
            # Use .get like the lookups above: a report without a drift_type
            # column should simply fail this coverage check, not raise
            # KeyError and add a drift_parseable failure on top of the four
            # drift checks already recorded.
            _add(checks, "schema_vs_data_drift_covered", {r.get("drift_type") for r in rows} >= {"new_unapproved_field", "missing_required_field", "type_change", "enum_expansion", "nullability_violation", "date_format_drift"}, 0.08, "missing drift type")
        except Exception as exc:
            _add(checks, "drift_parseable", False, 0.30, str(exc))
    else:
        for cid, weight in [("drift_header", 0.06), ("drift_identity", 0.18), ("drift_details", 0.08), ("drift_severity", 0.08), ("schema_vs_data_drift_covered", 0.08)]:
            _add(checks, cid, False, weight, "missing")

    # --- rejected_rows.csv --------------------------------------------------
    _add(checks, "rejects_exists", rejects.is_file(), 0.05, "missing rejected_rows.csv")
    if rejects.is_file():
        try:
            header, rows = _rows(rejects)
            _add(checks, "rejects_header", header == ["extract_date", "source_row", "record_id", "reason", "notes"], 0.05, f"got {header}")
            by_key = {
                (row.get("extract_date"), row.get("source_row"), row.get("record_id"), row.get("reason")): row
                for row in rows
            }
            identity_hits = 0
            note_hits = 0
            for exp in EXPECTED_REJECTS:
                key = (exp["extract_date"], exp["source_row"], exp["record_id"], exp["reason"])
                row = by_key.get(key)
                if not row:
                    continue
                identity_hits += 1
                if _note_matches(row.get("notes", ""), exp["notes"]):
                    note_hits += 1
            _add(checks, "rejects_identity", identity_hits == len(EXPECTED_REJECTS) and len(rows) == len(EXPECTED_REJECTS), 0.13, f"hits {identity_hits}/{len(EXPECTED_REJECTS)} got {rows}")
            # One expected reject may have a non-matching note.
            _add(checks, "rejects_notes", note_hits >= len(EXPECTED_REJECTS) - 1, 0.05, f"hits {note_hits}/{len(EXPECTED_REJECTS)}")
        except Exception as exc:
            _add(checks, "rejects_parseable", False, 0.12, str(exc))
    else:
        _add(checks, "rejects_header", False, 0.05, "missing")
        _add(checks, "rejects_identity", False, 0.13, "missing")
        _add(checks, "rejects_notes", False, 0.05, "missing")

    # --- drift_summary.json -------------------------------------------------
    _add(checks, "summary_exists", summary.is_file(), 0.05, "missing summary")
    if summary.is_file():
        try:
            data = json.loads(summary.read_text(encoding="utf-8"))
            _add(checks, "summary_dates_counts", data.get("extract_dates") == ["2026-04-27", "2026-04-28", "2026-04-29", "2026-04-30"] and data.get("drift_count_by_date") == {"2026-04-27": 0, "2026-04-28": 2, "2026-04-29": 2, "2026-04-30": 2}, 0.08, f"got {data}")
            _add(checks, "summary_rejects_mismatches", data.get("high_severity_count") == 4 and data.get("rejected_row_count") == 5 and data.get("changelog_mismatches") == ["2026-04-30 customer_id removed despite changelog saying no required fields removed"], 0.08, f"got {data}")
        except Exception as exc:
            _add(checks, "summary_parseable", False, 0.12, str(exc))
    else:
        _add(checks, "summary_dates_counts", False, 0.08, "missing")
        _add(checks, "summary_rejects_mismatches", False, 0.08, "missing")

    # --- audit_notes.md -----------------------------------------------------
    _add(checks, "notes_exists", notes.is_file(), 0.03, "missing notes")
    if notes.is_file():
        text = notes.read_text(encoding="utf-8", errors="replace").lower()
        _add(checks, "notes_distinguish", all(term in text for term in ["schema drift", "row-level", "change log"]), 0.05, "notes must distinguish schema drift, row-level bad data, and change log mismatch")
    else:
        _add(checks, "notes_distinguish", False, 0.05, "missing")

    # Weights are relative: the score is normalised by whatever was recorded.
    total = sum(c["weight"] for c in checks)
    score = round(sum(c["weight"] for c in checks if c["pass"]) / total, 4)
    # NOTE(review): the caps key off check ids, so a run whose drift report
    # failed to parse (only drift_parseable recorded) is not capped -- confirm
    # that is intended.
    if any(c["id"] == "drift_identity" and not c["pass"] for c in checks):
        score = min(score, 0.69)
    if any(c["id"] == "drift_severity" and not c["pass"] for c in checks):
        score = min(score, 0.74)
    return {"task": "092-schema-drift-audit", "workspace": str(w), "checks": checks, "outcome_score": score, "score": score}