Refactor scorer for future maintainability

This commit is contained in:
William Jeynes
2026-02-26 10:25:49 +00:00
parent 6c3aa7343d
commit 201176e71c
11 changed files with 601 additions and 356 deletions
+216
View File
@@ -0,0 +1,216 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
+4
View File
@@ -0,0 +1,4 @@
# JSONL file the app loads its entries from and writes annotations back to.
INPUT_FILE = "../../data/results.jsonl"
# JSONL file that receives the merged, score-sorted export.
OUTPUT_FILE = "../../data/ranked.jsonl"
# Title used for both the browser tab and the page heading.
PAGE_TITLE = "Claim Visualizer"
+76
View File
@@ -0,0 +1,76 @@
import json
from pathlib import Path
def load_data(file_path):
    """Read a JSONL file into a list of entry dicts.

    Blank lines are skipped. Each entry's ``output`` is normalised to a list
    (a single dict is wrapped), and every output with a non-empty ``content``
    string gets a ``content_parsed`` key holding the decoded JSON, or ``[]``
    when the content is not valid JSON.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        The list of entries, or an empty list when the file does not exist.
    """
    records = []
    if not Path(file_path).exists():
        return records

    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue

            record = json.loads(raw_line)
            outputs = record.get("output", [])
            if isinstance(outputs, dict):
                outputs = [outputs]

            for item in outputs:
                raw_content = item.get("content")
                if not raw_content:
                    continue
                try:
                    parsed = json.loads(raw_content)
                except json.JSONDecodeError:
                    # Keep the output but mark it as having no usable events.
                    parsed = []
                item["content_parsed"] = parsed

            record["output"] = outputs
            records.append(record)

    return records
def save_data_clean(file_path, data):
    """Write one merged, score-sorted JSONL record per document URL.

    Entries that share a ``documentUrl`` are merged: the first entry seen
    supplies the record's fields (without ``output`` and ``status``), and the
    ``content_parsed`` items of every such entry are concatenated under
    ``events``. Entries without a ``documentUrl`` are dropped.

    Events are sorted by ``human_score`` in descending order; a missing or
    null score is treated as 0 so unscored events sort last instead of
    raising ``TypeError`` when compared with numeric scores.

    Args:
        file_path: Destination JSONL path (overwritten).
        data: Iterable of entry dicts as produced by ``load_data``.
    """
    merged = {}
    for entry in data:
        doc_url = entry.get("documentUrl")
        if not doc_url:
            # Nothing to key the record on; skip before doing any work.
            continue

        events = []
        for o in entry.get("output", []):
            if "content_parsed" in o:
                events.extend(o["content_parsed"])

        if doc_url not in merged:
            new_entry = entry.copy()
            new_entry["events"] = events
            new_entry.pop("output", None)
            new_entry.pop("status", None)
            merged[doc_url] = new_entry
        else:
            merged[doc_url]["events"].extend(events)

    for entry in merged.values():
        entry["events"].sort(
            # `or 0` also covers an explicit null score, not just a missing key.
            key=lambda e: e.get("human_score") or 0,
            reverse=True,
        )

    with open(file_path, "w", encoding="utf-8") as f:
        for entry in merged.values():
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
def save_data(file_path, data):
    """Write every entry to ``file_path`` as one JSON object per line.

    Before an entry is written, each of its outputs that carries
    ``content_parsed`` has its ``content`` string regenerated from it, so the
    serialised text matches the edited events. The entries in ``data`` are
    updated in place.

    Args:
        file_path: Destination JSONL path (overwritten).
        data: Iterable of entry dicts.
    """
    with open(file_path, "w", encoding="utf-8") as sink:
        for record in data:
            for item in record.get("output", []):
                if "content_parsed" not in item:
                    continue
                item["content"] = json.dumps(
                    item["content_parsed"],
                    ensure_ascii=False
                )
            serialised = json.dumps(record, ensure_ascii=False)
            sink.write(serialised + "\n")
+32 -356
View File
@@ -1,366 +1,42 @@
import copy
import importlib
import pkgutil
import streamlit as st
import json
import random
from pathlib import Path
from collections import Counter, defaultdict
import pandas as pd
from streamlit_sortables import sort_items
INPUT_FILE = "../../data/results.jsonl"
OUTPUT_FILE = "../../data/ranked.jsonl"
# --------------------------
# Helper functions
# --------------------------
def load_data(file_path):
    """Read a JSONL file into a list of entry dicts.

    Blank lines are skipped. Each entry's ``output`` is normalised to a list
    (a single dict is wrapped), and every output with a non-empty ``content``
    string gets a ``content_parsed`` key holding the decoded JSON, or ``[]``
    when the content is not valid JSON.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        The list of entries, or an empty list when the file does not exist.
    """
    records = []
    if not Path(file_path).exists():
        return records

    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            if not raw_line.strip():
                continue

            record = json.loads(raw_line)
            outputs = record.get("output", [])
            if isinstance(outputs, dict):
                outputs = [outputs]

            for item in outputs:
                raw_content = item.get("content")
                if not raw_content:
                    continue
                try:
                    parsed = json.loads(raw_content)
                except json.JSONDecodeError:
                    # Keep the output but mark it as having no usable events.
                    parsed = []
                item["content_parsed"] = parsed

            record["output"] = outputs
            records.append(record)

    return records
def save_data_clean(file_path, data):
    """Write one merged, score-sorted JSONL record per document URL.

    Entries that share a ``documentUrl`` are merged: the first entry seen
    supplies the record's fields (without ``output`` and ``status``), and the
    ``content_parsed`` items of every such entry are concatenated under
    ``events``. Entries without a ``documentUrl`` are dropped.

    Events are sorted by ``human_score`` in descending order; a missing or
    null score is treated as 0 so unscored events sort last instead of
    raising ``TypeError`` when compared with numeric scores.

    Args:
        file_path: Destination JSONL path (overwritten).
        data: Iterable of entry dicts as produced by ``load_data``.
    """
    merged = {}
    for entry in data:
        doc_url = entry.get("documentUrl")
        if not doc_url:
            # Nothing to key the record on; skip before doing any work.
            continue

        events = []
        for o in entry.get("output", []):
            if "content_parsed" in o:
                events.extend(o["content_parsed"])

        if doc_url not in merged:
            new_entry = entry.copy()
            new_entry["events"] = events
            new_entry.pop("output", None)
            new_entry.pop("status", None)
            merged[doc_url] = new_entry
        else:
            merged[doc_url]["events"].extend(events)

    for entry in merged.values():
        entry["events"].sort(
            # `or 0` also covers an explicit null score, not just a missing key.
            key=lambda e: e.get("human_score") or 0,
            reverse=True,
        )

    with open(file_path, "w", encoding="utf-8") as f:
        for entry in merged.values():
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
def save_data(file_path, data):
    """Write every entry to ``file_path`` as one JSON object per line.

    Before an entry is written, each of its outputs that carries
    ``content_parsed`` has its ``content`` string regenerated from it, so the
    serialised text matches the edited events. The entries in ``data`` are
    updated in place.

    Args:
        file_path: Destination JSONL path (overwritten).
        data: Iterable of entry dicts.
    """
    with open(file_path, "w", encoding="utf-8") as sink:
        for record in data:
            for item in record.get("output", []):
                if "content_parsed" not in item:
                    continue
                item["content"] = json.dumps(
                    item["content_parsed"],
                    ensure_ascii=False
                )
            serialised = json.dumps(record, ensure_ascii=False)
            sink.write(serialised + "\n")
# --------------------------
# Session State Init
# --------------------------
if "data" not in st.session_state:
st.session_state.data = load_data(INPUT_FILE)
if "current_claim" not in st.session_state:
st.session_state.current_claim = None
if "drag_order" not in st.session_state:
st.session_state.drag_order = None
from config import PAGE_TITLE
from state import init_state
import views
st.set_page_config(
page_title="Claim Visualizer",
page_title=PAGE_TITLE,
layout="wide",
initial_sidebar_state="expanded"
initial_sidebar_state="expanded",
)
st.title("Claim Visualizer")
# --------------------------
# Sidebar
# --------------------------
init_state()
st.title(PAGE_TITLE)
def discover_views():
    """Build the sidebar routing table from the ``views`` package.

    Every module found directly under ``views`` is imported. A module is
    registered only when it defines both ``render`` and ``page_title``; the
    string returned by ``page_title()`` becomes the key and the ``render``
    callable the value.

    Returns:
        dict mapping page title to the module's ``render`` function.
    """
    routes = {}
    for info in pkgutil.iter_modules(views.__path__):
        candidate = importlib.import_module(f"views.{info.name}")
        is_view = hasattr(candidate, "render") and hasattr(candidate, "page_title")
        if not is_view:
            # Helper modules without the view interface are ignored.
            continue
        title = candidate.page_title()
        routes[title] = candidate.render
    return routes
ROUTES = discover_views()
# optional: stable ordering
ROUTES = dict(sorted(ROUTES.items()))
view = st.sidebar.selectbox(
"Choose View",
[
"All Claims",
"Single Claim Random",
"Rank Perfect Events",
"View Rules",
"Statistics"
]
list(ROUTES.keys()),
)
# --------------------------
# View/AllClaims
# --------------------------
if view == "All Claims":
st.header("All Claims")
for entry in st.session_state.data:
st.subheader(entry.get("text"))
for o in entry.get("output", []):
for c in o.get("content_parsed", []):
st.markdown(f"**Event:** {c.get('event')}")
st.markdown(f"**Reasoning:** {c.get('reasoningWhyRelevant')}")
st.markdown(f"**Score:** {c.get('score')}")
st.markdown(f"**Human Score:** {c.get('human_score')}")
st.markdown(f"**Extra Info:** {c.get('extra_info', '')}")
st.markdown("---")
# --------------------------
# View/Annotate
# --------------------------
elif view == "Single Claim Random":
if st.session_state.current_claim is None:
unannotated = []
for entry in st.session_state.data:
claims = []
for o in entry.get("output", []):
for c in o.get("content_parsed", []):
if not c.get("ranked"):
claims.append(c)
if claims:
unannotated.append({"entry": entry, "claims": claims})
if unannotated:
st.session_state.current_claim = random.choice(unannotated)
st.session_state.drag_order = None
bundle = st.session_state.current_claim
if bundle is None:
st.info("All items annotated.")
else:
entry = bundle["entry"]
claims = bundle["claims"]
st.subheader(entry.get("text"))
st.write(entry.get("normalized", ""))
st.subheader("Annotate Events")
for idx, c in enumerate(claims):
with st.container(border=True):
st.markdown(f"**Event:** {c.get('event')}")
st.markdown(f"**Reasoning:** {c.get('reasoningWhyRelevant')}")
cols = st.columns(7)
temp = ""
labels = [
("Rewording", "REWORDING"),
("Not Specific", "NSPECIFIC"),
("Time Incorrect", "TINCORRECT"),
("Story?", "STORY"),
("Duplicate?", "DUPLICATE"),
("Bias Shown", "BIAS"),
("Perfect", "PERFECT"),
]
for i, (name, tag) in enumerate(labels):
with cols[i]:
if st.checkbox(name, key=f"{tag}{idx}{c.get('event')}"):
temp += tag + " "
c["extra_info"] = temp.strip()
c["ranked"] = True
if st.button("Save Annotation"):
save_data(INPUT_FILE, st.session_state.data)
st.session_state.current_claim = None
print("Annotation saved")
st.rerun()
# --------------------------
# View/Rank
# --------------------------
elif view == "Rank Perfect Events":
st.header("Rank PERFECT Events")
candidates = []
for entry in st.session_state.data:
perfect = []
for o in entry.get("output", []):
for c in o.get("content_parsed", []):
if "PERFECT" in c.get("extra_info", "") and not c.get("rank_position"):
perfect.append(c)
if perfect:
candidates.append({"entry": entry, "claims": perfect})
if not candidates:
st.info("No PERFECT events available.")
st.stop()
if "current_bundle" not in st.session_state:
st.session_state.current_bundle = random.choice(candidates)
bundle = st.session_state.current_bundle
entry = bundle["entry"]
claims = bundle["claims"]
st.subheader(entry.get("text"))
# init
if "perfect_order" not in st.session_state:
st.session_state.perfect_order = list(range(len(claims)))
order = st.session_state.perfect_order
# labels shown in sortable UI
labels = [
f"{i+1}. {claims[idx].get('event')}"
for i, idx in enumerate(order)
]
st.markdown("### Drag to reorder:")
# -------------------------
# Drag & drop UI
# -------------------------
new_labels = sort_items(labels)
# Convert reordered labels back → indices
if new_labels != labels:
new_order = []
for lbl in new_labels:
original_pos = labels.index(lbl)
new_order.append(order[original_pos])
st.session_state.perfect_order = new_order
order = new_order
st.markdown("---")
for rank, idx in enumerate(order):
c = claims[idx]
st.markdown(f"**Rank {rank+1}: {c.get('event')}**")
st.markdown(c.get("reasoningWhyRelevant"))
st.markdown("---")
if st.button("Submit PERFECT Ranking"):
n = len(order)
for rank_position, idx in enumerate(order):
claim_obj = claims[idx]
# explicit stored rank
claim_obj["rank_position"] = rank_position + 1
claim_obj["human_score"] = 1
# Auto-scoring
for entry in st.session_state.data:
for o in entry.get("output", []):
for c in o.get("content_parsed", []):
if c.get("human_score") is not None:
continue
extra = c.get("extra_info", "")
if "DUPLICATE" in extra:
c["human_score"] = 0
elif extra:
c["human_score"] = round(
c.get("score", 0) * 0.5, 3
)
save_data(INPUT_FILE, st.session_state.data)
save_data_clean(
OUTPUT_FILE,
copy.deepcopy(st.session_state.data)
)
# reset state for next example
del st.session_state.current_bundle
del st.session_state.perfect_order
print("Ranking saved!")
st.rerun()
# --------------------------
# View/Rules
# --------------------------
elif view == "View Rules":
with open("rules.txt", "r", encoding="utf-8") as f:
st.write(f.read())
# --------------------------
# View/Statistics
# --------------------------
elif view == "Statistics":
st.header("Statistics")
word_counter = Counter()
doc_scores = defaultdict(list)
diff_scores = defaultdict(list)
# ---- collect stats ----
for entry in st.session_state.data:
doc_url = entry.get("documentUrl")
for o in entry.get("output", []):
for c in o.get("content_parsed", []):
# ---- extra_info word counts ----
extra = c.get("extra_info", "")
if extra:
words = extra.strip().split()
word_counter.update(words)
# --------------------------
# Extra Info Word Counts
# --------------------------
st.subheader("Extra Info Label Counts")
if word_counter:
df_words = (
pd.DataFrame(word_counter.items(), columns=["Label", "Count"])
.sort_values("Count", ascending=False)
)
st.dataframe(df_words)
st.bar_chart(df_words.set_index("Label"))
else:
st.info("No extra_info data available yet.")
ROUTES[view]()
+13
View File
@@ -0,0 +1,13 @@
import streamlit as st
from config import INPUT_FILE
from data_utils import load_data
def init_state():
    """Seed the Streamlit session state with the keys the views rely on.

    ``data`` is loaded from ``INPUT_FILE`` only when it is not already in the
    session; ``current_claim`` and ``drag_order`` default to ``None``.
    Existing values are never overwritten.
    """
    state = st.session_state
    if "data" not in state:
        state.data = load_data(INPUT_FILE)
    for key in ("current_claim", "drag_order"):
        if key not in state:
            state[key] = None
View File
+18
View File
@@ -0,0 +1,18 @@
import streamlit as st
def page_title() -> str:
    """Return the label under which this view is listed."""
    title = "All Claims"
    return title
def render():
    """List every entry in the session data together with all of its events.

    For each entry the ``text`` is shown as a subheader, followed by the
    event, reasoning, score, human score and extra info of every parsed
    event, with a horizontal rule after each event.
    """
    st.header("All Claims")
    for record in st.session_state.data:
        st.subheader(record.get("text"))
        # Flatten output -> events lazily; iteration order is unchanged.
        events = (
            event
            for output in record.get("output", [])
            for event in output.get("content_parsed", [])
        )
        for c in events:
            st.markdown(f"**Event:** {c.get('event')}")
            st.markdown(f"**Reasoning:** {c.get('reasoningWhyRelevant')}")
            st.markdown(f"**Score:** {c.get('score')}")
            st.markdown(f"**Human Score:** {c.get('human_score')}")
            st.markdown(f"**Extra Info:** {c.get('extra_info', '')}")
            st.markdown("---")
+75
View File
@@ -0,0 +1,75 @@
import random
import streamlit as st
from config import INPUT_FILE
from data_utils import save_data
def page_title() -> str:
    """Return the label under which this view is listed."""
    title = "Label"
    return title
def render():
    """Show one randomly chosen entry and let the user label its events.

    When no entry is currently selected, a random entry that still has events
    without a truthy ``ranked`` flag is picked and kept in
    ``st.session_state.current_claim``. Each of its unannotated events is
    shown with one checkbox per label.

    The selected labels are written to the events (``extra_info`` as a
    space-separated tag string, ``ranked`` set to ``True``) only when
    "Save Annotation" is pressed. Writing them on every render would mark
    events as annotated before the user confirmed anything, and a save
    triggered from another view would then persist them as reviewed.
    """
    if st.session_state.current_claim is None:
        unannotated = []
        for entry in st.session_state.data:
            claims = [
                c
                for o in entry.get("output", [])
                for c in o.get("content_parsed", [])
                if not c.get("ranked")
            ]
            if claims:
                unannotated.append({"entry": entry, "claims": claims})
        if unannotated:
            st.session_state.current_claim = random.choice(unannotated)
            st.session_state.drag_order = None

    bundle = st.session_state.current_claim
    if bundle is None:
        st.info("All items annotated.")
        return

    entry = bundle["entry"]
    claims = bundle["claims"]
    st.subheader(entry.get("text"))
    st.write(entry.get("normalized", ""))
    st.subheader("Annotate Events")

    # (checkbox caption, tag stored in extra_info); identical for every event.
    labels = [
        ("Rewording", "REWORDING"),
        ("Not Specific", "NSPECIFIC"),
        ("Time Incorrect", "TINCORRECT"),
        ("Story?", "STORY"),
        ("Duplicate?", "DUPLICATE"),
        ("Bias Shown", "BIAS"),
        ("Perfect", "PERFECT"),
    ]

    # Current checkbox selections, applied to the events only on save.
    pending = []
    for idx, c in enumerate(claims):
        with st.container(border=True):
            st.markdown(f"**Event:** {c.get('event')}")
            st.markdown(f"**Reasoning:** {c.get('reasoningWhyRelevant')}")
            cols = st.columns(len(labels))
            selected = []
            for col, (name, tag) in zip(cols, labels):
                with col:
                    if st.checkbox(name, key=f"{tag}{idx}{c.get('event')}"):
                        selected.append(tag)
        pending.append((c, " ".join(selected)))

    if st.button("Save Annotation"):
        for c, extra_info in pending:
            c["extra_info"] = extra_info
            c["ranked"] = True
        save_data(INPUT_FILE, st.session_state.data)
        st.session_state.current_claim = None
        print("Annotation saved")
        st.rerun()
+116
View File
@@ -0,0 +1,116 @@
import streamlit as st
import copy
import random
from streamlit_sortables import sort_items
from config import INPUT_FILE, OUTPUT_FILE
from data_utils import save_data, save_data_clean
def page_title() -> str:
    """Return the label under which this view is listed."""
    title = "Rank"
    return title
def render():
    """Let the user order the PERFECT events of one entry and store scores.

    Candidate entries are those with at least one event whose ``extra_info``
    contains ``PERFECT`` and that has no ``rank_position`` yet. One candidate
    is chosen at random and kept in ``st.session_state.current_bundle`` until
    its ranking is submitted; the working order lives in
    ``st.session_state.perfect_order`` as indices into the bundle's events.

    On submit, the ranked events receive ``rank_position`` (1-based) and
    ``human_score`` 1. Every event in the session data that still has no
    ``human_score`` is then auto-scored: 0 when tagged ``DUPLICATE``, half of
    its ``score`` when it has any other tag, and left untouched when it has
    no tags. Both output files are written and the bundle state is cleared.
    """
    st.header("Rank PERFECT Events")

    candidates = []
    for entry in st.session_state.data:
        perfect = []
        for o in entry.get("output", []):
            for c in o.get("content_parsed", []):
                # `or ""` tolerates an explicit null extra_info.
                extra = c.get("extra_info") or ""
                if "PERFECT" in extra and not c.get("rank_position"):
                    perfect.append(c)
        if perfect:
            candidates.append({"entry": entry, "claims": perfect})

    if not candidates:
        st.info("No PERFECT events available.")
        st.stop()

    if "current_bundle" not in st.session_state:
        st.session_state.current_bundle = random.choice(candidates)

    bundle = st.session_state.current_bundle
    entry = bundle["entry"]
    claims = bundle["claims"]
    st.subheader(entry.get("text"))

    # init
    if "perfect_order" not in st.session_state:
        st.session_state.perfect_order = list(range(len(claims)))
    order = st.session_state.perfect_order

    # labels shown in sortable UI; the numeric prefix keeps them unique
    labels = [
        f"{i+1}. {claims[idx].get('event')}"
        for i, idx in enumerate(order)
    ]
    st.markdown("### Drag to reorder:")

    # -------------------------
    # Drag & drop UI
    # -------------------------
    new_labels = sort_items(labels)

    # Convert reordered labels back → indices
    if new_labels != labels:
        new_order = []
        for lbl in new_labels:
            original_pos = labels.index(lbl)
            new_order.append(order[original_pos])
        st.session_state.perfect_order = new_order
        order = new_order

    st.markdown("---")
    for rank, idx in enumerate(order):
        c = claims[idx]
        st.markdown(f"**Rank {rank+1}: {c.get('event')}**")
        st.markdown(c.get("reasoningWhyRelevant"))
        st.markdown("---")

    if st.button("Submit PERFECT Ranking"):
        for rank_position, idx in enumerate(order):
            claim_obj = claims[idx]
            # explicit stored rank
            claim_obj["rank_position"] = rank_position + 1
            claim_obj["human_score"] = 1

        # Auto-scoring
        for entry in st.session_state.data:
            for o in entry.get("output", []):
                for c in o.get("content_parsed", []):
                    if c.get("human_score") is not None:
                        continue
                    extra = c.get("extra_info") or ""
                    if "DUPLICATE" in extra:
                        c["human_score"] = 0
                    elif extra:
                        # A null score counts as 0 so the submit cannot fail
                        # half-way, after ranks were already assigned.
                        c["human_score"] = round(
                            (c.get("score") or 0) * 0.5, 3
                        )

        save_data(INPUT_FILE, st.session_state.data)
        save_data_clean(
            OUTPUT_FILE,
            copy.deepcopy(st.session_state.data)
        )

        # reset state for next example
        del st.session_state.current_bundle
        del st.session_state.perfect_order
        print("Ranking saved!")
        st.rerun()
+9
View File
@@ -0,0 +1,9 @@
import streamlit as st
def page_title() -> str:
    """Return the label under which this view is listed."""
    title = "View Rules"
    return title
def render():
    """Display the contents of ``rules.txt`` on the page.

    The relative path is resolved against the process's current working
    directory. When the file is missing, a warning is shown instead of
    letting ``FileNotFoundError`` abort the page.
    """
    try:
        with open("rules.txt", "r", encoding="utf-8") as f:
            rules_text = f.read()
    except FileNotFoundError:
        st.warning("rules.txt not found.")
        return
    st.write(rules_text)
+42
View File
@@ -0,0 +1,42 @@
from collections import Counter, defaultdict
import streamlit as st
import pandas as pd
def page_title() -> str:
    """Return the label under which this view is listed."""
    title = "Statistics"
    return title
def render():
    """Show how often each annotation tag occurs across all events.

    The whitespace-separated words of every event's ``extra_info`` are
    counted and presented as a table and a bar chart, most frequent first.
    An info message is shown when no event has any ``extra_info`` yet.
    """
    st.header("Statistics")

    word_counter = Counter()

    # ---- collect stats ----
    for entry in st.session_state.data:
        for o in entry.get("output", []):
            for c in o.get("content_parsed", []):
                # ---- extra_info word counts ----
                extra = c.get("extra_info", "")
                if extra:
                    # split() with no argument already ignores outer whitespace
                    word_counter.update(extra.split())

    # --------------------------
    # Extra Info Word Counts
    # --------------------------
    st.subheader("Extra Info Label Counts")
    if word_counter:
        df_words = (
            pd.DataFrame(word_counter.items(), columns=["Label", "Count"])
            .sort_values("Count", ascending=False)
        )
        st.dataframe(df_words)
        st.bar_chart(df_words.set_index("Label"))
    else:
        st.info("No extra_info data available yet.")