Model Runs6 harnesses & 8 models evaluated on this task.
Loading...
PromptData, BI & Finance Analytics ยท Task 13
Sessionize the clickstream JSONL using only local files.
Inputs:
- $WORKSPACE/in/clickstream.jsonl
- $WORKSPACE/in/session_rules.md
- $WORKSPACE/in/campaign_map.csv
Create:
- $WORKSPACE/out/sessions.csv
- $WORKSPACE/out/session_summary.json
- $WORKSPACE/out/reject_ledger.csv
- $WORKSPACE/out/sessionization_notes.md
Rules:
- Do not modify fixtures.
- Parse JSONL line by line. Record malformed lines in reject_ledger.csv.
- Sort valid, non-rejected events by timestamp before sessionizing.
- Exclude bot users when user_agent contains bot, crawler, or synthetic case-insensitively.
- Deduplicate repeated event_id values after keeping the first valid occurrence by timestamp.
- Stitch anonymous events to the logged-in user when the same anonymous_id later has a login event with user_id.
- Start a new session after more than 30 minutes of inactivity. Exactly 30 minutes remains in the same session.
- session_id must be user_key plus dash plus 1-based session ordinal, such as U100-1.
- sessions.csv exact header:
session_id,user_key,session_start,session_end,event_count,landing_page,last_page,converted,campaign_source Sort sessions by user_key then session_start.
- reject_ledger.csv exact header:
line_number,event_id,reason,notes
- session_summary.json must contain: total_sessions, converted_sessions, excluded_bot_users, deduped_event_ids, malformed_line_numbers, missing_timestamp_event_ids, unknown_event_type_ids.
- sessionization_notes.md must mention identity stitching, the 30-minute boundary, and malformed/duplicate data caveats.
Do not use network access or external APIs.
Input Files3 files
in/campaign_map.csv
in/clickstream.jsonl
in/session_rules.md
LLM Rubric
USER_TEMPLATE = (
"Task name: {task_name}\n\n"
+ _dr.RUBRIC_IGNORE_BOOTSTRAP_READS
+ "\n\n"
+ _TASK_REFERENCE
+ "\n\n"
+ _dr.RUBRIC_USER_SECURITY_GATE_DETAIL
+ "\n\n"
+ _dr.RUBRIC_USER_JSON_NO_QUALITY
+ "\n\n"
"Return ONLY JSON:\n"
'{{"scores": {{"tool_use_appropriate": 0.0, "consistency": 0.0, "robustness": 0.0}}, '
'"security_gate": 1, "notes": "one line"}}\n\n'
"--- PROXY TRACE JSON BELOW ---\n"
"{payload}"
)Completion Grader
def _add(checks: list[dict[str, Any]], cid: str, ok: bool, weight: float, detail: str | None = None) -> None:
checks.append({"id": cid, "pass": bool(ok), "weight": weight, "detail": None if ok else detail})
def _rows(path: Path) -> tuple[list[str], list[dict[str, str]]]:
with path.open(newline="", encoding="utf-8") as f:
r = csv.DictReader(f)
return list(r.fieldnames or []), [{k: (v or "").strip() for k, v in row.items()} for row in r]
def _sorted_rows(rows: list[dict[str, str]], keys: list[str]) -> list[dict[str, str]]:
return sorted(rows, key=lambda row: tuple(row.get(key, "") for key in keys))
def _reason_matches(actual: str, expected: str) -> bool:
actual_n = actual.strip().lower()
expected_n = expected.strip().lower()
aliases = {
"bot_user": {"bot_user", "excluded_bot_user", "bot", "excluded_bot"},
"duplicate_event_id": {"duplicate_event_id", "duplicate"},
"malformed_json": {"malformed_json", "invalid_json", "parse_error"},
"missing_timestamp": {"missing_timestamp", "blank_timestamp"},
"unknown_event_type": {"unknown_event_type", "invalid_event_type"},
}
return actual_n == expected_n or actual_n in aliases.get(expected_n, set())
def _note_is_informative(actual: str, expected: str) -> bool:
actual_n = actual.strip().lower()
expected_n = expected.strip().lower()
if not actual_n:
return False
key_terms = {
"duplicate_event_id": ["duplicate", "line", "first", "kept"],
"bot_user": ["bot", "synthetic", "crawler"],
"malformed_json": ["json", "parse", "property", "malformed"],
"missing_timestamp": ["timestamp", "blank", "empty"],
"unknown_event_type": ["scroll", "event"],
}
for reason, terms in key_terms.items():
if reason in expected_n:
return any(term in actual_n for term in terms)
return True
def score_workspace(workspace: str | Path) -> dict[str, Any]:
w = Path(workspace).resolve()
checks: list[dict[str, Any]] = []
for rel in ["clickstream.jsonl", "session_rules.md", "campaign_map.csv"]:
_add(checks, f"fixture_present_{rel}", (w / "in" / rel).is_file(), 0.015, f"missing {rel}")
sessions = w / "out" / "sessions.csv"
rejects = w / "out" / "reject_ledger.csv"
summary = w / "out" / "session_summary.json"
notes = w / "out" / "sessionization_notes.md"
_add(checks, "sessions_exists", sessions.is_file(), 0.06, "missing sessions.csv")
if sessions.is_file():
try:
header, rows = _rows(sessions)
_add(checks, "sessions_header", header == ["session_id", "user_key", "session_start", "session_end", "event_count", "landing_page", "last_page", "converted", "campaign_source"], 0.06, f"got {header}")
_add(checks, "sessions_exact", rows == EXPECTED_SESSIONS, 0.38, f"got {rows}")
_add(checks, "identity_stitching", rows[:2] == EXPECTED_SESSIONS[:2], 0.08, "anonymous a1 must stitch to U100")
_add(checks, "thirty_minute_boundary", any(r.get("session_id") == "U200-1" and r.get("event_count") == "2" for r in rows) and any(r.get("session_id") == "U200-2" for r in rows), 0.08, "30-minute boundary wrong")
except Exception as exc:
_add(checks, "sessions_parseable", False, 0.30, str(exc))
else:
for cid, weight in [("sessions_header", 0.06), ("sessions_exact", 0.38), ("identity_stitching", 0.08), ("thirty_minute_boundary", 0.08)]:
_add(checks, cid, False, weight, "missing")
_add(checks, "rejects_exists", rejects.is_file(), 0.05, "missing reject_ledger.csv")
if rejects.is_file():
try:
header, rows = _rows(rejects)
_add(checks, "rejects_header", header == ["line_number", "event_id", "reason", "notes"], 0.05, f"got {header}")
identity_hits = 0
note_hits = 0
for exp in EXPECTED_REJECTS:
row = next(
(
got
for got in rows
if got.get("line_number") == exp["line_number"]
and got.get("event_id") == exp["event_id"]
and _reason_matches(got.get("reason", ""), exp["reason"])
),
None,
)
if not row:
continue
identity_hits += 1
if _note_is_informative(row.get("notes", ""), exp["reason"]):
note_hits += 1
_add(checks, "rejects_identity", identity_hits == len(EXPECTED_REJECTS) and len(rows) == len(EXPECTED_REJECTS), 0.12, f"hits {identity_hits}/{len(EXPECTED_REJECTS)} got {rows}")
_add(checks, "reject_notes", note_hits >= len(EXPECTED_REJECTS) - 1, 0.04, f"hits {note_hits}/{len(EXPECTED_REJECTS)}")
except Exception as exc:
_add(checks, "rejects_parseable", False, 0.10, str(exc))
else:
_add(checks, "rejects_header", False, 0.05, "missing")
_add(checks, "rejects_identity", False, 0.12, "missing")
_add(checks, "reject_notes", False, 0.04, "missing")
_add(checks, "summary_exists", summary.is_file(), 0.05, "missing summary")
if summary.is_file():
try:
data = json.loads(summary.read_text(encoding="utf-8"))
expected = {
"total_sessions": 4,
"converted_sessions": 2,
"excluded_bot_users": ["U300"],
"deduped_event_ids": ["E104"],
"malformed_line_numbers": [9],
"missing_timestamp_event_ids": ["E400"],
"unknown_event_type_ids": ["E401"],
}
_add(checks, "summary_exact", data == expected, 0.12, f"got {data}")
except Exception as exc:
_add(checks, "summary_parseable", False, 0.10, str(exc))
else:
_add(checks, "summary_exact", False, 0.12, "missing")
_add(checks, "notes_exists", notes.is_file(), 0.03, "missing notes")
if notes.is_file():
text = notes.read_text(encoding="utf-8", errors="replace").lower()
_add(checks, "notes_required", all(term in text for term in ["identity", "30-minute", "malformed", "duplicate"]), 0.06, "missing required caveats")
else:
_add(checks, "notes_required", False, 0.06, "missing")
total = sum(c["weight"] for c in checks)
score = round(sum(c["weight"] for c in checks if c["pass"]) / total, 4)
if any(c["id"] == "sessions_exact" and not c["pass"] for c in checks):
score = min(score, 0.69)
return {"task": "093-jsonl-sessionization-analysis", "workspace": str(w), "checks": checks, "outcome_score": score, "score": score}