Spaces:
Running
Running
from nc_py_api import Nextcloud | |
import json | |
from typing import Dict, Any | |
import time | |
from datetime import datetime | |
import threading | |
import config | |
import math | |
import plotly.graph_objects as go | |
# Shared Nextcloud client; every leaderboard download and upload in this
# module goes through it. Credentials come from the project's config module.
nc = Nextcloud(
    nextcloud_url=config.NEXTCLOUD_URL,
    nc_auth_user=config.NEXTCLOUD_USERNAME,
    nc_auth_pass=config.NEXTCLOUD_PASSWORD,
)

# In-memory ELO ratings keyed by model name. Starts empty and is filled by
# initialize_elo_ratings().
elo_ratings = {}
def load_leaderboard() -> Dict[str, Any]:
    """Download the leaderboard JSON from Nextcloud and return it parsed.

    Any failure (download, UTF-8 decoding, JSON parsing) is printed to stdout
    and an empty dict is returned instead of raising.
    """
    try:
        raw_bytes = nc.files.download(config.NEXTCLOUD_LEADERBOARD_PATH)
        parsed = json.loads(raw_bytes.decode('utf-8'))
    except Exception as e:
        print(f"Error loading leaderboard: {str(e)}")
        return {}
    return parsed
def save_leaderboard(leaderboard_data: Dict[str, Any]) -> bool:
    """Serialize the leaderboard to indented JSON and upload it to Nextcloud.

    Returns True when the upload call completed without raising. Any failure
    is printed to stdout and reported as False.
    """
    try:
        payload = json.dumps(leaderboard_data, indent=2).encode('utf-8')
        nc.files.upload(config.NEXTCLOUD_LEADERBOARD_PATH, payload)
    except Exception as e:
        print(f"Error saving leaderboard: {str(e)}")
        return False
    return True
def get_model_size(model_name):
    """Return a model's size as parsed from its human-readable label.

    The label of the matching approved model is expected to contain the size
    in parentheses followed by "B", e.g. "Some Model (7B ...)"; the text
    between the first "(" and the following "B" is converted to float.

    Returns 1.0 when the model is not in the approved list, or when its label
    has no parsable "(<number>B" part (previously this raised IndexError or
    ValueError and took down whichever leaderboard view called it).
    """
    default_size = 1.0
    for model, human_readable in config.get_approved_models():
        if model != model_name:
            continue
        try:
            return float(human_readable.split('(')[1].split('B')[0])
        except (IndexError, ValueError):
            # Label matched but carries no usable size; fall back to the same
            # default used for unknown models.
            return default_size
    return default_size  # Default size if not found
def calculate_expected_score(rating_a, rating_b):
    """Return the expected score of A against B on the ELO logistic curve.

    Computed as 1 / (1 + 10 ** ((rating_b - rating_a) / 400)), so equal
    ratings give 0.5 and a higher rating_a gives a value closer to 1.
    """
    scaled_gap = (rating_b - rating_a) / 400
    odds_against_a = math.pow(10, scaled_gap)
    return 1 / (1 + odds_against_a)
def update_elo_ratings(winner, loser):
    """Apply the result of one battle to the in-memory ELO ratings.

    If either model has no rating yet, the ratings are first rebuilt via
    initialize_elo_ratings(). The K-factor starts at 32 and is scaled by
    (1 + (loser_size - winner_size) / max_size), capped at 64, so beating a
    larger model moves the ratings more than beating a smaller one.

    Args:
        winner: Name of the model that won the battle.
        loser: Name of the model that lost the battle.
    """
    if winner not in elo_ratings or loser not in elo_ratings:
        initialize_elo_ratings()

    # initialize_elo_ratings() only seeds approved models and models found in
    # the stored leaderboard. A brand-new, non-approved model is still missing
    # at this point, so seed it with the same initial-rating formula instead of
    # letting the lookups below raise KeyError.
    for name in (winner, loser):
        if name not in elo_ratings:
            elo_ratings[name] = 1000 + (get_model_size(name) * 100)

    expected_winner = calculate_expected_score(elo_ratings[winner], elo_ratings[loser])
    expected_loser = 1 - expected_winner

    winner_size = get_model_size(winner)
    loser_size = get_model_size(loser)
    max_size = max(get_model_size(model) for model, _ in config.get_approved_models())

    k_factor = min(64, 32 * (1 + (loser_size - winner_size) / max_size))

    # Actual score is 1 for the winner and 0 for the loser.
    elo_ratings[winner] += k_factor * (1 - expected_winner)
    elo_ratings[loser] += k_factor * (0 - expected_loser)
def initialize_elo_ratings():
    """Rebuild the in-memory ELO ratings from the stored leaderboard.

    Every model starts at 1000 + 100 * size, then the stored head-to-head
    results are replayed through update_elo_ratings().

    Two defects of the earlier version are fixed here:
      * Ratings are reset first, so calling this again does not replay the
        battles on top of ratings that already include them.
      * Each pair of models is replayed once. update_leaderboard() stores a
        battle both as the winner's "wins" and as the loser's "losses", so
        replaying both views applied every battle twice.
    """
    leaderboard = load_leaderboard()

    elo_ratings.clear()
    for model, _ in config.get_approved_models():
        elo_ratings[model] = 1000 + (get_model_size(model) * 100)

    # Replay all battles to update ELO ratings
    replayed_pairs = set()
    for model, data in leaderboard.items():
        if model not in elo_ratings:
            elo_ratings[model] = 1000 + (get_model_size(model) * 100)
        for opponent, results in data.get('opponents', {}).items():
            if opponent not in elo_ratings:
                elo_ratings[opponent] = 1000 + (get_model_size(opponent) * 100)
            if (opponent, model) in replayed_pairs:
                # The mirrored record was already replayed from the
                # opponent's side; skip it to avoid double counting.
                continue
            replayed_pairs.add((model, opponent))
            for _ in range(results.get('wins', 0)):
                update_elo_ratings(model, opponent)
            for _ in range(results.get('losses', 0)):
                update_elo_ratings(opponent, model)
def ensure_elo_ratings_initialized():
    """Build the ELO ratings if none exist yet; otherwise do nothing."""
    if elo_ratings:
        return
    initialize_elo_ratings()
def update_leaderboard(winner: str, loser: str) -> Dict[str, Any]:
    """Record one battle result, persist it, and return the updated leaderboard.

    The battle is counted in both models' totals and in both models'
    per-opponent records (as a win on the winner's side and a loss on the
    loser's side). The in-memory ELO ratings are updated as well.
    """
    board = load_leaderboard()

    # Make sure both participants have an entry before counting.
    for name in (winner, loser):
        if name not in board:
            board[name] = {"wins": 0, "losses": 0, "opponents": {}}

    winner_entry = board[winner]
    loser_entry = board[loser]

    winner_entry["wins"] += 1
    record_vs_loser = winner_entry["opponents"].setdefault(loser, {"wins": 0, "losses": 0})
    record_vs_loser["wins"] += 1

    loser_entry["losses"] += 1
    record_vs_winner = loser_entry["opponents"].setdefault(winner, {"wins": 0, "losses": 0})
    record_vs_winner["losses"] += 1

    # Update ELO ratings
    # NOTE: this runs before the save. If it triggers initialize_elo_ratings(),
    # the replay reads the stored leaderboard, which does not yet contain this
    # battle.
    update_elo_ratings(winner, loser)

    save_leaderboard(board)
    return board
def get_current_leaderboard() -> Dict[str, Any]:
    """Return the leaderboard as currently stored; thin wrapper over load_leaderboard()."""
    current = load_leaderboard()
    return current
def get_human_readable_name(model_name: str) -> str:
    """Return the display label of an approved model.

    Falls back to the raw model name when the model is not in the approved
    list.
    """
    labels_by_model = {model: label for model, label in config.get_approved_models()}
    if model_name in labels_by_model:
        return labels_by_model[model_name]
    return model_name
def get_leaderboard():
    """Render the win-rate leaderboard as an HTML fragment.

    Each model's score is win_rate * (1 - 1 / (total_battles + 1)), or 0 when
    it has no battles. Rows are ordered by score, then by total battles, both
    descending. "Top Rival" is the opponent this model has beaten most often;
    "Toughest Opponent" is the opponent it has lost to most often.

    Returns:
        A string containing the explanation paragraph, a <style> block and
        the leaderboard <table>.
    """
    leaderboard = load_leaderboard()

    # Calculate scores for each model
    for results in leaderboard.values():
        total_battles = results["wins"] + results["losses"]
        if total_battles > 0:
            win_rate = results["wins"] / total_battles
            results["score"] = win_rate * (1 - 1 / (total_battles + 1))
        else:
            results["score"] = 0

    # Sort results by score, then by total battles
    sorted_results = sorted(
        leaderboard.items(),
        key=lambda x: (x[1]["score"], x[1]["wins"] + x[1]["losses"]),
        reverse=True
    )

    # Explanation of the main leaderboard
    explanation = """
    <p style="font-size: 16px; margin-bottom: 20px;">
    This leaderboard uses a scoring system that balances win rate and total battles. The score is calculated using the formula:
    <br>
    <strong>Score = Win Rate * (1 - 1 / (Total Battles + 1))</strong>
    <br>
    This formula rewards models with higher win rates and more battles. As the number of battles increases, the score approaches the win rate.
    </p>
    """

    header_html = f"""
    {explanation}
    <style>
    .leaderboard-table {{
        width: 100%;
        border-collapse: collapse;
        font-family: Arial, sans-serif;
    }}
    .leaderboard-table th, .leaderboard-table td {{
        border: 1px solid #ddd;
        padding: 8px;
        text-align: left;
    }}
    .leaderboard-table th {{
        background-color: rgba(255, 255, 255, 0.1);
        font-weight: bold;
    }}
    .rank-column {{
        width: 60px;
        text-align: center;
    }}
    .opponent-details {{
        font-size: 0.9em;
        color: #888;
    }}
    </style>
    <table class='leaderboard-table'>
    <tr>
    <th class='rank-column'>Rank</th>
    <th>Model</th>
    <th>Score</th>
    <th>Wins</th>
    <th>Losses</th>
    <th>Win Rate</th>
    <th>Total Battles</th>
    <th>Top Rival</th>
    <th>Toughest Opponent</th>
    </tr>
    """

    # Gold / silver / bronze medals for the top three ranks. Written as
    # escapes because the literal emoji had been corrupted into three
    # identical garbled strings, which made ranks 1-3 indistinguishable.
    medals = {1: "\U0001F947", 2: "\U0001F948", 3: "\U0001F949"}

    # Collect the pieces and join once instead of growing a string with +=.
    html_parts = [header_html]
    for index, (model, results) in enumerate(sorted_results, start=1):
        total_battles = results["wins"] + results["losses"]
        win_rate = (results["wins"] / total_battles * 100) if total_battles > 0 else 0

        rank_display = medals.get(index, f"{index}")

        opponents = results.get("opponents", {})

        top_rival = max(opponents.items(), key=lambda x: x[1]["wins"], default=(None, {"wins": 0}))
        top_rival_name = get_human_readable_name(top_rival[0]) if top_rival[0] else "N/A"
        top_rival_wins = top_rival[1]["wins"]

        toughest_opponent = max(opponents.items(), key=lambda x: x[1]["losses"], default=(None, {"losses": 0}))
        toughest_opponent_name = get_human_readable_name(toughest_opponent[0]) if toughest_opponent[0] else "N/A"
        toughest_opponent_losses = toughest_opponent[1]["losses"]

        html_parts.append(f"""
    <tr>
    <td class='rank-column'>{rank_display}</td>
    <td>{get_human_readable_name(model)}</td>
    <td>{results['score']:.4f}</td>
    <td>{results['wins']}</td>
    <td>{results['losses']}</td>
    <td>{win_rate:.2f}%</td>
    <td>{total_battles}</td>
    <td class='opponent-details'>{top_rival_name} (W: {top_rival_wins})</td>
    <td class='opponent-details'>{toughest_opponent_name} (L: {toughest_opponent_losses})</td>
    </tr>
    """)

    html_parts.append("</table>")
    return "".join(html_parts)
def calculate_elo_impact(model):
    """Summarize how much a model's wins and losses "weigh", by opponent size.

    For each opponent, size_difference = (opponent_size - model_size) /
    max_approved_size. A win counts as 1 + max(0, size_difference), so wins
    over larger models weigh more. A loss counts as 1 + max(0,
    -size_difference), so losses to smaller models weigh more.

    Args:
        model: Name of the model to summarize.

    Returns:
        A tuple (positive_impact, negative_impact, initial_rating), each
        rounded to an integer. The impacts are 0 when the model has no
        recorded opponents; initial_rating is 1000 + 100 * model size.
    """
    positive_impact = 0
    negative_impact = 0
    leaderboard = load_leaderboard()

    # The model's own size and the largest approved size do not change per
    # opponent, so they are computed once instead of on every iteration.
    model_size = get_model_size(model)
    initial_rating = 1000 + (model_size * 100)

    opponents = leaderboard.get(model, {}).get('opponents', {})
    if opponents:
        max_size = max(get_model_size(m) for m, _ in config.get_approved_models())
        for opponent, results in opponents.items():
            size_difference = (get_model_size(opponent) - model_size) / max_size

            win_impact = 1 + max(0, size_difference)
            loss_impact = 1 + max(0, -size_difference)

            positive_impact += results['wins'] * win_impact
            negative_impact += results['losses'] * loss_impact

    return round(positive_impact), round(negative_impact), round(initial_rating)
def get_elo_leaderboard():
    """Render the ELO leaderboard as an HTML fragment.

    Lists every approved model plus every model found in the stored
    leaderboard. Each row shows the current rating, the positive and negative
    impact from calculate_elo_impact(), the total battles, and the initial
    rating (1000 + 100 * model size). Rows are ordered by current rating,
    highest first; equal ratings are ordered by model name so the ranking is
    the same on every call.

    Returns:
        A string containing the explanation paragraph, a <style> block and
        the leaderboard <table>.
    """
    ensure_elo_ratings_initialized()
    leaderboard = load_leaderboard()

    # Create a list of all models, including those from APPROVED_MODELS that might not be in the leaderboard yet
    all_models = set(dict(config.get_approved_models()).keys()) | set(leaderboard.keys())

    elo_data = []
    for model in all_models:
        initial_rating = 1000 + (get_model_size(model) * 100)
        current_rating = elo_ratings.get(model, initial_rating)

        # Calculate battle data only if the model exists in the leaderboard
        if model in leaderboard:
            wins = leaderboard[model].get('wins', 0)
            losses = leaderboard[model].get('losses', 0)
            total_battles = wins + losses
            positive_impact, negative_impact, _ = calculate_elo_impact(model)
        else:
            total_battles = positive_impact = negative_impact = 0

        elo_data.append({
            'model': model,
            'current_rating': current_rating,
            'initial_rating': initial_rating,
            'total_battles': total_battles,
            'positive_impact': positive_impact,
            'negative_impact': negative_impact
        })

    # Sort the data by current rating (descending). all_models is a set, so
    # without the name tie-breaker equally rated models could swap ranks
    # between calls.
    sorted_elo_data = sorted(elo_data, key=lambda x: (-x['current_rating'], x['model']))

    # default= keeps an empty model list from raising ValueError; 1000 is the
    # base rating before the size bonus.
    min_initial_rating = min((data['initial_rating'] for data in elo_data), default=1000)
    max_initial_rating = max((data['initial_rating'] for data in elo_data), default=1000)

    explanation_elo = f"""
    <p style="font-size: 16px; margin-bottom: 20px;">
    This leaderboard uses a modified ELO rating system that takes into account both the performance and size of the models.
    Initial ratings range from {round(min_initial_rating)} to {round(max_initial_rating)} points, based on model size, with larger models starting at higher ratings.
    The "Positive Impact" score reflects the significance of wins, with higher scores for defeating larger models.
    The "Negative Impact" score indicates the significance of losses, with higher scores for losing against smaller models.
    The current ELO rating is calculated based on these impacts and the model's performance history.
    </p>
    """

    header_html = f"""
    {explanation_elo}
    <style>
    .elo-leaderboard-table {{
        width: 100%;
        border-collapse: collapse;
        font-family: Arial, sans-serif;
    }}
    .elo-leaderboard-table th, .elo-leaderboard-table td {{
        border: 1px solid #ddd;
        padding: 8px;
        text-align: left;
    }}
    .elo-leaderboard-table th {{
        background-color: rgba(255, 255, 255, 0.1);
        font-weight: bold;
    }}
    .rank-column {{
        width: 60px;
        text-align: center;
    }}
    </style>
    <table class='elo-leaderboard-table'>
    <tr>
    <th class='rank-column'>Rank</th>
    <th>Model</th>
    <th>Current ELO Rating</th>
    <th>Positive Impact</th>
    <th>Negative Impact</th>
    <th>Total Battles</th>
    <th>Initial Rating</th>
    </tr>
    """

    # Gold / silver / bronze medals for the top three ranks. Written as
    # escapes because the literal emoji had been corrupted into three
    # identical garbled strings, which made ranks 1-3 indistinguishable.
    medals = {1: "\U0001F947", 2: "\U0001F948", 3: "\U0001F949"}

    # Collect the pieces and join once instead of growing a string with +=.
    html_parts = [header_html]
    for index, data in enumerate(sorted_elo_data, start=1):
        rank_display = medals.get(index, f"{index}")

        html_parts.append(f"""
    <tr>
    <td class='rank-column'>{rank_display}</td>
    <td>{get_human_readable_name(data['model'])}</td>
    <td><strong>{round(data['current_rating'])}</strong></td>
    <td>{data['positive_impact']}</td>
    <td>{data['negative_impact']}</td>
    <td>{data['total_battles']}</td>
    <td>{round(data['initial_rating'])}</td>
    </tr>
    """)

    html_parts.append("</table>")
    return "".join(html_parts)
def create_backup():
    """Upload a timestamped copy of the leaderboard to Nextcloud every 12 hours.

    Runs forever and never returns; intended to be the target of a background
    thread. A failure in one cycle is printed to stdout and the loop carries
    on with the next cycle after the usual sleep.
    """
    twelve_hours_in_seconds = 12 * 60 * 60
    while True:
        try:
            snapshot = load_leaderboard()
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_path = f"{config.NEXTCLOUD_BACKUP_FOLDER}/leaderboard_backup_{stamp}.json"
            payload = json.dumps(snapshot, indent=2).encode('utf-8')
            nc.files.upload(backup_path, payload)
            print(f"Backup created on Nextcloud: {backup_path}")
        except Exception as e:
            print(f"Error creating backup: {e}")
        time.sleep(twelve_hours_in_seconds)  # Sleep for 12 HOURS
def start_backup_thread():
    """Start create_backup() in a daemon thread, so it does not block interpreter exit."""
    threading.Thread(target=create_backup, daemon=True).start()
def get_leaderboard_chart():
    """Build a Plotly figure summarizing each model's wins, losses and score.

    Wins and losses are drawn as stacked bars on the left axis; the score
    (win_rate * (1 - 1 / (total_battles + 1)), or 0 with no battles) is drawn
    as a line on a secondary right-hand axis. Models are ordered by score,
    then by total battles, both descending.
    """
    battle_results = get_current_leaderboard()

    # Calculate scores and sort results
    for stats in battle_results.values():
        played = stats["wins"] + stats["losses"]
        if played > 0:
            stats["score"] = (stats["wins"] / played) * (1 - 1 / (played + 1))
        else:
            stats["score"] = 0

    def ranking_key(item):
        stats = item[1]
        return (stats["score"], stats["wins"] + stats["losses"])

    ranked = sorted(battle_results.items(), key=ranking_key, reverse=True)

    models, wins, losses, scores = [], [], [], []
    for model, stats in ranked:
        models.append(get_human_readable_name(model))
        wins.append(stats["wins"])
        losses.append(stats["losses"])
        scores.append(stats["score"])

    fig = go.Figure()

    # Stacked Bar chart for Wins and Losses
    bar_series = (
        ('Wins', wins, '#22577a'),
        ('Losses', losses, '#38a3a5'),
    )
    for label, values, colour in bar_series:
        fig.add_trace(go.Bar(x=models, y=values, name=label, marker_color=colour))

    # Line chart for Scores
    score_line = go.Scatter(
        x=models,
        y=scores,
        name='Score',
        yaxis='y2',
        line=dict(color='#ff7f0e', width=2),
    )
    fig.add_trace(score_line)

    # Update layout for full-width, increased height, and secondary y-axis
    layout_options = dict(
        title='Model Performance',
        xaxis_title='Models',
        yaxis_title='Number of Battles',
        yaxis2=dict(title='Score', overlaying='y', side='right'),
        barmode='stack',
        height=800,
        width=1450,
        autosize=True,
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
    )
    fig.update_layout(**layout_options)

    return fig