# Ruby-Cogs/leaderboard/leaderboard.py
#
# (File-viewer metadata captured with this copy: 554 lines, no EOL, 23 KiB, Python.)

from redbot.core import commands, Config, bank
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify, humanize_number
import discord
from discord.ui import Button, View
import aiohttp
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Optional, List
log = logging.getLogger("red.ruby.leaderboard")
class LeaderboardView(View):
    """Button-driven paginator over a list of pre-built leaderboard embeds.

    Only the author of the invoking command may use the buttons; anyone else
    receives an ephemeral refusal. The caller is expected to assign the sent
    message to ``self.message`` so the buttons can be disabled on timeout.
    """

    def __init__(self, cog, ctx, pages: List[discord.Embed], timeout: float = 180.0):
        """Create the paginator, starting on the first page.

        Args:
            cog: The cog that created this view (stored for reference only).
            ctx: Invocation context; ``ctx.author`` is the only user allowed
                to operate the buttons.
            pages: Embeds to page through, in display order.
            timeout: Seconds of inactivity before the buttons are disabled.
        """
        super().__init__(timeout=timeout)
        self.cog = cog
        self.ctx = ctx
        self.pages = pages
        self.current_page = 0
        # Assigned by the caller once the message has actually been sent.
        self.message: Optional[discord.Message] = None
        self._sync_buttons()

    def _sync_buttons(self) -> None:
        """Enable or disable each navigation button to match ``current_page``."""
        at_start = self.current_page <= 0
        # ``>=`` (not ``==``) so an empty or single-page list disables forward
        # navigation instead of leaving buttons that would index out of range.
        at_end = self.current_page >= len(self.pages) - 1
        self.first_page.disabled = at_start
        self.prev_page.disabled = at_start
        self.next_page.disabled = at_end
        self.last_page.disabled = at_end

    # A button needs visible text: an empty label with no emoji gives the
    # user nothing to click on, so the jump buttons are labelled like the
    # step buttons.
    @discord.ui.button(label="First", style=discord.ButtonStyle.gray)
    async def first_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the first page."""
        self.current_page = 0
        await self.update_page(interaction)

    @discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
    async def prev_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step back one page, stopping at the first."""
        self.current_page = max(0, self.current_page - 1)
        await self.update_page(interaction)

    @discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
    async def next_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step forward one page, stopping at the last."""
        self.current_page = min(len(self.pages) - 1, self.current_page + 1)
        await self.update_page(interaction)

    @discord.ui.button(label="Last", style=discord.ButtonStyle.gray)
    async def last_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the last page."""
        self.current_page = len(self.pages) - 1
        await self.update_page(interaction)

    async def update_page(self, interaction: discord.Interaction):
        """Refresh button states and show the embed for ``current_page``."""
        self._sync_buttons()
        await interaction.response.edit_message(embed=self.pages[self.current_page], view=self)

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow only the command author to use the buttons."""
        if interaction.user.id == self.ctx.author.id:
            return True
        await interaction.response.send_message("This menu is not for you!", ephemeral=True)
        return False

    async def on_timeout(self):
        """Disable every button on the sent message once the view expires."""
        for item in self.children:
            item.disabled = True
        if self.message is None:
            # The view timed out before the caller stored the sent message;
            # there is nothing to edit.
            return
        try:
            await self.message.edit(view=self)
        except discord.HTTPException:
            # Best effort: the message may have been deleted in the meantime.
            pass
class Leaderboard(commands.Cog):
    """Global leaderboard system for Ruby."""

    # Minimum balance a user needs to be listed locally or pushed to the API.
    MIN_CREDITS = 10000
    # Entries rendered per embed page by ``globalboard show``.
    ITEMS_PER_PAGE = 10
    # Seconds between automatic full syncs (6 hours) ...
    SYNC_INTERVAL = 21600
    # ... and the back-off (5 minutes) after a sync attempt raised.
    SYNC_RETRY_DELAY = 300

    def __init__(self, bot: Red):
        """Register config defaults; the API connection is set up in ``initialize``."""
        self.bot = bot
        self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
        # Optional API integration: all three stay None/unset until
        # ``initialize`` finds an ``admin_secret`` token.
        self.session: Optional[aiohttp.ClientSession] = None
        self.api_base_url = "https://ruby.valerie.lol/api"
        self.admin_secret = None
        # Background task running ``_auto_sync_task``.
        self.auto_sync_task: Optional[asyncio.Task] = None
        # Keeps the session-closing task referenced until it has run.
        self._session_close_task: Optional[asyncio.Task] = None
        default_guild = {
            "min_message_length": 5,
            "cooldown": 60,
        }
        default_user = {
            "opted_out": False
        }
        self.config.register_guild(**default_guild)
        self.config.register_user(**default_user)

    # ------------------------------------------------------------------
    # API plumbing
    # ------------------------------------------------------------------

    @property
    def _api_configured(self) -> bool:
        """True when a session exists and an ``admin_secret`` token is set."""
        return bool(
            self.session is not None
            and self.admin_secret
            and self.admin_secret.get("admin_secret")
        )

    def _auth_headers(self) -> Dict[str, str]:
        """Return the Authorization header used by every API request."""
        return {"Authorization": self.admin_secret["admin_secret"]}

    async def initialize(self):
        """Initialize optional API connection and start auto-sync task.

        Without an ``admin_secret`` in the ``ruby_api`` shared tokens this is
        a no-op. Otherwise a session is opened and the API is probed; the
        auto-sync task is started only if the probe returns HTTP 200. Any
        failure is logged rather than raised.
        """
        try:
            self.admin_secret = await self.bot.get_shared_api_tokens("ruby_api")
            if not self.admin_secret.get("admin_secret"):
                return
            self.session = aiohttp.ClientSession()
            # Test API connection
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status != 200:
                    log.error("Failed to connect to API: Status %s", resp.status)
                    return
            self.auto_sync_task = asyncio.create_task(self._auto_sync_task())
            log.info("Started automatic leaderboard sync task")
        except Exception as e:
            log.error("Failed to initialize API connection: %s", e)

    async def _auto_sync_task(self):
        """Task to automatically sync leaderboard every 6 hours.

        Runs until cancelled. A failed sync is logged and retried after
        ``SYNC_RETRY_DELAY`` seconds.
        """
        await self.bot.wait_until_ready()
        threshold = f"{self.MIN_CREDITS:,}"
        while True:
            try:
                success_count, fail_count = await self._perform_full_sync()
                log.info(
                    "Completed automatic leaderboard sync: %s users with %s+ credits synced, %s failures",
                    success_count, threshold, fail_count,
                )
                await asyncio.sleep(self.SYNC_INTERVAL)
            except asyncio.CancelledError:
                # Propagate so the task ends in the cancelled state instead
                # of looking like a normal return.
                raise
            except Exception as e:
                log.error("Error in automatic sync task: %s", e)
                await asyncio.sleep(self.SYNC_RETRY_DELAY)

    async def _perform_full_sync(self) -> tuple[int, int]:
        """Perform a full sync of all users to the API.

        Every non-bot member visible to the bot is checked once (de-duplicated
        by user ID); those with at least ``MIN_CREDITS`` are pushed through
        ``_try_api_update``. Afterwards the API leaderboard is fetched and its
        size is logged for comparison.

        Returns tuple of (success_count, fail_count)
        """
        success_count = 0
        fail_count = 0
        seen_ids = set()
        users_to_sync = []  # (member, credits) pairs

        # First, collect all eligible users and their credits
        for guild in self.bot.guilds:
            for member in guild.members:
                if member.bot or member.id in seen_ids:
                    continue
                try:
                    credits = await self.get_user_credits(member)
                except Exception as e:
                    log.error("Error processing user %s (%s): %s", member, member.id, e)
                    fail_count += 1
                    continue
                seen_ids.add(member.id)
                if credits >= self.MIN_CREDITS:
                    users_to_sync.append((member, credits))
                    log.debug("Queued for sync: %s (%s) with %s credits", member, member.id, credits)

        log.info("Found %s users with %s+ credits to sync", len(users_to_sync), self.MIN_CREDITS)

        # Now perform the sync for eligible users
        for member, credits in users_to_sync:
            try:
                synced = await self._try_api_update(str(member.id), str(member), credits)
            except Exception as e:
                log.error("Error syncing user %s (%s): %s", member, member.id, e)
                fail_count += 1
                continue
            if synced:
                success_count += 1
                log.debug("Successfully synced %s (%s) with %s credits", member, member.id, credits)
            else:
                fail_count += 1
                log.error("Failed to sync %s (%s)", member, member.id)

        await self._verify_sync(len(users_to_sync))
        return success_count, fail_count

    async def _verify_sync(self, expected_count: int) -> None:
        """Fetch the API leaderboard and log how its size compares to ``expected_count``."""
        try:
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status != 200:
                    log.error("API verification failed: Status %s", resp.status)
                    return
                data = await resp.json()
            if "leaderboard" not in data:
                log.error("API verification failed: Invalid response format")
                return
            api_count = len(data["leaderboard"])
            log.info("API verification: %s users in leaderboard after sync", api_count)
            if api_count != expected_count:
                log.warning(
                    "Mismatch in user count: %s synced vs %s in API",
                    expected_count, api_count,
                )
        except Exception as e:
            log.error("API verification failed: %s", e)

    async def cog_load(self):
        """Initialize the cog when it's loaded."""
        await self.initialize()

    # ------------------------------------------------------------------
    # Balance collection
    # ------------------------------------------------------------------

    async def get_user_credits(self, user: discord.Member) -> int:
        """Return ``bank.get_balance(user)``, or 0 if the lookup fails.

        NOTE(review): this is a single bank lookup, not a sum over guilds.
        Presumably it is only a cross-server total when Red's bank is set to
        global - confirm against the bank configuration.
        """
        try:
            balance = await bank.get_balance(user)
        except Exception as e:
            log.error("Error getting credits for %s: %s", user, e)
            return 0
        log.debug("Got balance for %s (%s): %s credits", user, user.id, balance)
        return balance

    async def get_all_balances(self) -> List[dict]:
        """Get all users' credit balances across all servers.

        Bots and opted-out users are skipped, and each user ID is looked up
        once. Returns dicts with ``userId``, ``username`` and ``points`` for
        users holding at least ``MIN_CREDITS``, sorted by points descending.
        """
        min_credits = self.MIN_CREDITS
        all_users = {}

        for guild in self.bot.guilds:
            for member in guild.members:
                if member.bot or member.id in all_users:
                    continue
                # Skip opted-out users
                try:
                    if await self.config.user(member).opted_out():
                        continue
                except Exception as e:
                    log.error("Error checking opt-out status for %s: %s", member, e)
                    continue
                try:
                    credits = await self.get_user_credits(member)
                except Exception as e:
                    log.error("Error getting balance for %s: %s", member, e)
                    continue
                log.debug(
                    "Checking credits for %s (%s): %s credits, minimum: %s",
                    member, member.id, credits, min_credits,
                )
                # Stored regardless of amount so the ID is not looked up again
                # in another guild; the threshold is applied below.
                all_users[member.id] = {
                    "userId": str(member.id),
                    "username": str(member),
                    "points": credits,
                }

        sorted_users = sorted(
            (user for user in all_users.values() if user["points"] >= min_credits),
            key=lambda x: x["points"],
            reverse=True,
        )

        log.info(
            "Found %s total users, %s with %s+ credits",
            len(all_users), len(sorted_users), min_credits,
        )
        if sorted_users:
            log.info("Top 10 users:")
            for user in sorted_users[:10]:
                log.info(
                    "User %s (%s) has %s credits",
                    user["username"], user["userId"], user["points"],
                )
        else:
            log.warning("No users found with sufficient credits!")
        return sorted_users

    async def _try_api_update(self, user_id: str, username: str, credits: int) -> bool:
        """Try to update the API if configured.

        Posts the user's ID, name, points (clamped to >= 0) and current
        opt-out flag. Returns True only for an HTTP 200 response whose JSON
        has a truthy ``success``; every other outcome is logged and returns
        False.
        """
        if not self._api_configured:
            return False
        try:
            if credits < 0:
                credits = 0  # API doesn't accept negative values
            opted_out = await self.config.user_from_id(int(user_id)).opted_out()
            log.debug(
                "Sending API update for %s (%s): %s credits, opted_out: %s",
                username, user_id, credits, opted_out,
            )
            headers = self._auth_headers()
            headers["Content-Type"] = "application/json"
            payload = {
                "userId": str(user_id),
                "username": str(username),
                "points": credits,
                "optedOut": opted_out,
            }
            async with self.session.post(
                f"{self.api_base_url}/leaderboard",
                headers=headers,
                json=payload,
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("success"):
                        log.info(
                            "Updated leaderboard for %s: %s credits (opted_out: %s)",
                            username, credits, opted_out,
                        )
                        return True
                    log.error("API update failed: %s", data.get("error", "Unknown error"))
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error("API update failed: Status %s", resp.status)
                return False
        except Exception as e:
            log.error("API update failed: %s", e)
            return False

    async def _get_api_leaderboard(self) -> Optional[list]:
        """Fetch the leaderboard from the API.

        Returns the ``leaderboard`` value of the JSON response, or None when
        the API is not configured or the request fails in any way.
        """
        if not self._api_configured:
            return None
        try:
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if "leaderboard" in data:
                        return data["leaderboard"]
                    log.error("Invalid API response format")
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error("Failed to fetch leaderboard: Status %s", resp.status)
                return None
        except Exception as e:
            log.error("Error fetching leaderboard: %s", e)
            return None

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    @commands.group(name="globalboard", aliases=["glb"])
    async def globalboard(self, ctx: commands.Context):
        """Global leaderboard commands."""
        # Nothing to do when invoked without a subcommand.

    @globalboard.command(name="show")
    async def show_leaderboard(self, ctx: commands.Context, page: int = 1):
        """Show the global credits leaderboard."""
        threshold = f"{self.MIN_CREDITS:,}"
        async with ctx.typing():
            leaderboard_data = await self.get_all_balances()
        if not leaderboard_data:
            # ctx.guild is None when the command is used in DMs.
            log.warning(
                "No leaderboard data returned for guild %s",
                ctx.guild.id if ctx.guild else None,
            )
            return await ctx.send(f"No users have {threshold} or more credits!")

        per_page = self.ITEMS_PER_PAGE
        chunks = [
            leaderboard_data[i:i + per_page]
            for i in range(0, len(leaderboard_data), per_page)
        ]
        embed_color = await ctx.embed_color()
        header = f"*Only showing users with {threshold}+ credits*"

        embeds = []
        for page_num, entries in enumerate(chunks, 1):
            first_rank = (page_num - 1) * per_page + 1
            lines = [
                f"`{rank}.` <@{entry.get('userId', '0')}> • "
                f"**{humanize_number(entry.get('points', 0))}** credits"
                for rank, entry in enumerate(entries, start=first_rank)
            ]
            embed = discord.Embed(
                title="🏆 Global Credits Leaderboard",
                description=header + "\n\n" + "\n".join(lines),
                color=embed_color,
            )
            embed.set_footer(
                text=f"Page {page_num}/{len(chunks)} • Total Users: {len(leaderboard_data)}"
            )
            embeds.append(embed)

        # Honour the requested page, clamped into the valid range.
        start_index = min(max(page, 1), len(embeds)) - 1
        view = LeaderboardView(self, ctx, embeds)
        view.current_page = start_index
        # The view initialises its buttons for page one; align them with the
        # page that is actually shown first.
        view.first_page.disabled = view.prev_page.disabled = start_index == 0
        view.next_page.disabled = view.last_page.disabled = start_index == len(embeds) - 1
        view.message = await ctx.send(embed=embeds[start_index], view=view)

    @globalboard.command(name="optout")
    async def opt_out(self, ctx: commands.Context):
        """Opt out of the global leaderboard. Your credits will no longer be visible."""
        user_settings = self.config.user(ctx.author)
        if await user_settings.opted_out():
            await ctx.send("You are already opted out of the global leaderboard.")
            return
        await user_settings.opted_out.set(True)
        # Update API with new opt-out status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted out of the global leaderboard. Your credits will no longer be visible.")

    @globalboard.command(name="optin")
    async def opt_in(self, ctx: commands.Context):
        """Opt back into the global leaderboard."""
        user_settings = self.config.user(ctx.author)
        if not await user_settings.opted_out():
            await ctx.send("You are already opted into the global leaderboard.")
            return
        await user_settings.opted_out.set(False)
        # Update API with new opt-in status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted back into the global leaderboard. Your credits will now be visible.")

    @globalboard.command(name="credits")
    async def check_credits(self, ctx: commands.Context, member: commands.MemberConverter = None):
        """Check your credits or another member's credits."""
        member = member or ctx.author
        # Opted-out users never have their balance revealed.
        if await self.config.user(member).opted_out():
            if member == ctx.author:
                # "[p]" is only substituted in help text, so the real prefix
                # is interpolated here.
                await ctx.send(
                    "You have opted out of the global leaderboard. "
                    f"Use `{ctx.clean_prefix}globalboard optin` to show your credits again."
                )
            else:
                await ctx.send(f"{member.display_name} has opted out of the global leaderboard.")
            return

        credits = await self.get_user_credits(member)
        log.info("Credits check for %s (%s): %s credits", member, member.id, credits)

        embed = discord.Embed(
            title="🏆 Global Credits",
            color=await ctx.embed_color()
        )
        if credits >= self.MIN_CREDITS:
            # Rank is only shown in this branch, so the full leaderboard is
            # only built when it is needed.
            leaderboard_data = await self.get_all_balances()
            rank = next(
                (i for i, entry in enumerate(leaderboard_data, 1)
                 if entry.get("userId") == str(member.id)),
                None
            )
            rank_text = f"#{humanize_number(rank)}" if rank else "Unranked"
            embed.description = (
                f"**User:** {member.mention}\n"
                f"**Credits:** {humanize_number(credits)}\n"
                f"**Rank:** {rank_text}\n"
                f"**ID:** {member.id}"
            )
        else:
            embed.description = (
                f"{member.mention} has less than {self.MIN_CREDITS:,} credits "
                f"({humanize_number(credits)} total)"
            )
        embed.set_thumbnail(url=member.display_avatar.url)
        await ctx.send(embed=embed)

    @globalboard.command(name="resync")
    @commands.is_owner()
    async def resync_leaderboard(self, ctx: commands.Context):
        """Force a resync of all users with 10,000+ credits to the API."""
        if not self._api_configured:
            await ctx.send("API is not configured.")
            return
        threshold = f"{self.MIN_CREDITS:,}"
        async with ctx.typing():
            message = await ctx.send(
                f"🔄 Starting global leaderboard resync ({threshold}+ credits only)..."
            )
            try:
                success_count, fail_count = await self._perform_full_sync()
                embed = discord.Embed(
                    title="🔄 Global Leaderboard Resync Complete",
                    description=(
                        f"Successfully updated: **{success_count}** users with {threshold}+ credits\n"
                        f"Failed to update: **{fail_count}** users\n\n"
                        "Note: Updates may take a few moments to appear on the website."
                    ),
                    color=await ctx.embed_color(),
                    timestamp=datetime.now(timezone.utc)
                )
                await message.edit(content=None, embed=embed)
            except Exception as e:
                await message.edit(content=f"❌ Error during resync: {str(e)}")

    # NOTE(review): this listens for a ``bank_update`` event. Red core is
    # believed to dispatch ``red_bank_set_balance`` (with a payload object)
    # instead; confirm which event actually fires before relying on it.
    @commands.Cog.listener()
    async def on_bank_update(self, member: discord.Member, before: int, after: int):
        """Update API when a member's bank balance changes."""
        if member.bot:
            return
        total_credits = await self.get_user_credits(member)
        await self._try_api_update(str(member.id), str(member), total_credits)

    @globalboard.command(name="cleanup")
    @commands.is_owner()
    async def cleanup_leaderboard(self, ctx: commands.Context):
        """Clean up old leaderboard entries (older than 30 days)."""
        if not self._api_configured:
            await ctx.send("API is not configured.")
            return
        try:
            async with self.session.delete(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("success"):
                        # .get() so a response missing a counter still yields
                        # a readable message instead of a bare KeyError.
                        await ctx.send(
                            f"Cleaned up {data.get('removedCount', 'unknown')} old entries. "
                            f"{data.get('remainingCount', 'unknown')} entries remaining."
                        )
                    else:
                        await ctx.send("Failed to clean up leaderboard.")
                elif resp.status == 401:
                    await ctx.send("Unauthorized: Invalid admin secret")
                else:
                    await ctx.send(f"Failed to clean up leaderboard: Status {resp.status}")
        except Exception as e:
            await ctx.send(f"Error cleaning up leaderboard: {e}")

    def cog_unload(self):
        """Clean up when cog is unloaded."""
        # Stop the sync task first so it cannot start a request on a session
        # that is about to be closed.
        if self.auto_sync_task:
            self.auto_sync_task.cancel()
            self.auto_sync_task = None
        if self.session and not self.session.closed:
            # cog_unload is synchronous, so closing is scheduled; the task is
            # stored so it stays referenced until it has run.
            self._session_close_task = asyncio.create_task(self.session.close())