# Ruby-Cogs/leaderboard/leaderboard.py
#
# File-viewer metadata captured with this paste: 541 lines, no EOL, 22 KiB, Python.

from redbot.core import commands, Config, bank
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify, humanize_number
import discord
from discord.ui import Button, View
import aiohttp
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Optional, List
# Module-level logger shared by the view and the cog defined in this file.
log = logging.getLogger("red.ruby.leaderboard")
class LeaderboardView(View):
    """Button-driven paginator over a list of leaderboard embeds.

    Only the author of ``ctx`` may operate the buttons; everyone else gets an
    ephemeral refusal. The caller is expected to assign the sent message to
    ``self.message`` so the buttons can be disabled when the view times out.

    Parameters
    ----------
    cog:
        The owning cog (stored, not otherwise used by the view).
    ctx:
        Invocation context; ``ctx.author`` is the only user allowed to page.
    pages:
        Embeds to page through, in display order.
    timeout:
        Seconds of inactivity before the view stops accepting input.
    """

    def __init__(self, cog, ctx, pages: List[discord.Embed], timeout: float = 180.0):
        super().__init__(timeout=timeout)
        self.cog = cog
        self.ctx = ctx
        self.pages = pages
        self.current_page = 0
        self.message = None
        self._sync_buttons()

    def _sync_buttons(self) -> None:
        """Enable/disable the navigation buttons for the current page."""
        last_index = len(self.pages) - 1
        at_start = self.current_page <= 0
        # ``>=`` also covers an empty or single-page list.
        at_end = self.current_page >= last_index
        self.first_page.disabled = at_start
        self.prev_page.disabled = at_start
        self.next_page.disabled = at_end
        self.last_page.disabled = at_end

    # NOTE(review): the original labels for the first/last buttons were empty
    # strings (possibly lost in extraction). A button needs a label or an
    # emoji, so explicit labels matching "Previous"/"Next" are used here.
    @discord.ui.button(label="First", style=discord.ButtonStyle.gray)
    async def first_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the first page."""
        self.current_page = 0
        await self.update_page(interaction)

    @discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
    async def prev_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Go back one page, stopping at the first."""
        self.current_page = max(0, self.current_page - 1)
        await self.update_page(interaction)

    @discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
    async def next_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Advance one page, stopping at the last."""
        self.current_page = max(0, min(len(self.pages) - 1, self.current_page + 1))
        await self.update_page(interaction)

    @discord.ui.button(label="Last", style=discord.ButtonStyle.gray)
    async def last_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the last page."""
        self.current_page = max(0, len(self.pages) - 1)
        await self.update_page(interaction)

    async def update_page(self, interaction: discord.Interaction):
        """Refresh button states and show the embed for ``current_page``."""
        self._sync_buttons()
        await interaction.response.edit_message(
            embed=self.pages[self.current_page], view=self
        )

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow only the command author to use the menu."""
        if interaction.user.id == self.ctx.author.id:
            return True
        await interaction.response.send_message("This menu is not for you!", ephemeral=True)
        return False

    async def on_timeout(self):
        """Disable every button once the view expires."""
        for item in self.children:
            item.disabled = True
        # The caller assigns ``message`` after sending; if that never
        # happened there is nothing to edit.
        if self.message is None:
            return
        try:
            await self.message.edit(view=self)
        except discord.HTTPException:
            # Best effort: the message may have been deleted meanwhile.
            pass
class Leaderboard(commands.Cog):
    """Global leaderboard system for Ruby."""

    # Minimum total credits required to appear on the leaderboard. The
    # original inline note says this matches the API's MIN_CREDITS. The
    # user-facing "10,000" strings below are literals and must be kept in sync.
    MIN_CREDITS = 10000
    ITEMS_PER_PAGE = 10
    AUTO_SYNC_INTERVAL = 21600  # 6 hours in seconds
    AUTO_SYNC_RETRY_DELAY = 300  # 5 minutes in seconds

    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
        # Optional API integration
        self.session = None
        self.api_base_url = "https://ruby.valerie.lol/api"
        # Holds the whole "ruby_api" shared-token mapping once initialized;
        # the actual secret lives under its "admin_secret" key.
        self.admin_secret = None
        # Handle of the background auto-sync task, if one was started.
        self.auto_sync_task = None
        default_guild = {
            "min_message_length": 5,
            "cooldown": 60,
        }
        default_user = {
            "opted_out": False
        }
        self.config.register_guild(**default_guild)
        self.config.register_user(**default_user)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _api_ready(self) -> bool:
        """Return True when a session exists and an admin secret is set."""
        return bool(
            self.session is not None
            and self.admin_secret
            and self.admin_secret.get("admin_secret")
        )

    def _auth_headers(self) -> Dict[str, str]:
        """Build the Authorization header sent with every API request."""
        return {"Authorization": self.admin_secret["admin_secret"]}

    @staticmethod
    async def _log_api_error_detail(resp, invalid_json_message: Optional[str] = None) -> None:
        """Log the ``error`` field of a failed API response, if it has one.

        ``invalid_json_message`` is logged when the body cannot be decoded as
        JSON; when it is None an undecodable body is silently ignored.
        """
        try:
            data = await resp.json()
        except (aiohttp.ContentTypeError, ValueError):
            # ValueError covers a JSON content type with a malformed body.
            if invalid_json_message:
                log.error(invalid_json_message)
            return
        if isinstance(data, dict) and data.get("error"):
            log.error("Error message: %s", data["error"])

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def initialize(self):
        """Initialize optional API connection and start auto-sync task.

        Reads the ``ruby_api`` shared tokens; when an ``admin_secret`` is
        present, opens an HTTP session, probes the leaderboard endpoint and
        starts the periodic sync task on a 200 response. All failures are
        logged rather than raised so the cog still loads without the API.
        """
        try:
            self.admin_secret = await self.bot.get_shared_api_tokens("ruby_api")
            if not self.admin_secret.get("admin_secret"):
                return
            # Reuse a live session so a repeated call does not leak one.
            if self.session is None or self.session.closed:
                self.session = aiohttp.ClientSession()
            # Test API connection
            try:
                async with self.session.get(
                    f"{self.api_base_url}/leaderboard",
                    headers=self._auth_headers(),
                ) as resp:
                    if resp.status == 200:
                        # Start auto-sync only once, and only after the API
                        # has been reached successfully.
                        if self.auto_sync_task is None or self.auto_sync_task.done():
                            self.auto_sync_task = asyncio.create_task(self._auto_sync_task())
                            log.info("Started automatic leaderboard sync task")
                    elif resp.status == 401:
                        log.error("Failed to initialize API: Invalid admin secret")
                    else:
                        log.error("Failed to initialize API: Status %s", resp.status)
                        await self._log_api_error_detail(
                            resp,
                            "API returned invalid JSON response during initialization",
                        )
            except aiohttp.ClientError as e:
                log.error("Failed to connect to API: %s", e)
        except Exception as e:
            log.error("Failed to initialize API connection: %s", e)

    async def cog_load(self):
        """Initialize the cog when it's loaded."""
        await self.initialize()

    async def cog_unload(self):
        """Clean up when cog is unloaded.

        Cancels the auto-sync task first so it cannot use the session while
        it is being closed, then awaits the close instead of leaving it to an
        unreferenced background task.
        """
        if self.auto_sync_task:
            self.auto_sync_task.cancel()
        if self.session is not None and not self.session.closed:
            await self.session.close()

    # ------------------------------------------------------------------
    # Sync machinery
    # ------------------------------------------------------------------

    async def _auto_sync_task(self):
        """Task to automatically sync leaderboard every 6 hours."""
        await self.bot.wait_until_ready()
        while True:
            try:
                await self._perform_full_sync()
                log.info("Completed automatic leaderboard sync")
                await asyncio.sleep(self.AUTO_SYNC_INTERVAL)
            except asyncio.CancelledError:
                # Propagate so the task is recorded as cancelled.
                raise
            except Exception as e:
                log.error("Error in automatic sync task: %s", e)
                # Wait before retrying on error
                await asyncio.sleep(self.AUTO_SYNC_RETRY_DELAY)

    async def _perform_full_sync(self) -> tuple[int, int]:
        """Perform a full sync of all users to the API.

        Every non-bot member is pushed once, regardless of how many guilds
        they share with the bot.

        Returns tuple of (success_count, fail_count)
        """
        success_count = 0
        fail_count = 0
        processed_users = set()
        for guild in self.bot.guilds:
            for member in guild.members:
                if member.bot or member.id in processed_users:
                    continue
                # Mark up front so a failure is not retried (and counted
                # again) for each further guild the member shares.
                processed_users.add(member.id)
                try:
                    # Get total credits across all guilds
                    credits = await self.get_user_credits(member)
                    if await self._try_api_update(str(member.id), str(member), credits):
                        success_count += 1
                    else:
                        fail_count += 1
                except Exception as e:
                    log.error("Error syncing user %s (%s): %s", member, member.id, e)
                    fail_count += 1
        return success_count, fail_count

    async def get_user_credits(self, user: discord.Member) -> int:
        """Get a user's total credits from all servers.

        With a global bank there is a single balance per user, so it is
        returned as-is; summing it once per shared guild would multiply it.
        With a per-guild bank the balances of every guild the user shares
        with the bot are added up. Lookup errors are logged and the affected
        guild contributes nothing.
        """
        try:
            if await bank.is_global():
                return await bank.get_balance(user)
        except Exception as e:
            log.error("Error getting total credits for %s: %s", user, e)
            return 0

        total_credits = 0
        for guild in self.bot.guilds:
            member = guild.get_member(user.id)
            if member is None:  # user is not in this guild
                continue
            try:
                total_credits += await bank.get_balance(member)
            except Exception as e:
                log.error("Error getting bank balance for %s in %s: %s", user, guild, e)
        return total_credits

    async def get_all_balances(self) -> List[dict]:
        """Get all users' credit balances across all servers.

        Returns a list of ``{"userId", "username", "points"}`` dicts for every
        non-bot, non-opted-out member with at least ``MIN_CREDITS`` total
        credits, sorted by points in descending order.
        """
        min_credits = self.MIN_CREDITS
        all_users = {}
        processed_users = set()
        for guild in self.bot.guilds:
            for member in guild.members:
                if member.bot or member.id in processed_users:
                    continue
                # Each user is evaluated once, whatever the outcome, so
                # opted-out or failing users are not re-checked per guild.
                processed_users.add(member.id)
                # Skip opted-out users
                try:
                    if await self.config.user(member).opted_out():
                        continue
                except Exception as e:
                    log.error("Error checking opt-out status for %s: %s", member, e)
                    continue
                try:
                    total_credits = await self.get_user_credits(member)
                except Exception as e:
                    log.error("Error getting balance for %s: %s", member, e)
                    continue
                if total_credits >= min_credits:
                    all_users[member.id] = {
                        "userId": str(member.id),
                        "username": str(member),
                        "points": total_credits,
                    }
        # Sort by total credits
        sorted_users = sorted(
            all_users.values(),
            key=lambda entry: entry["points"],
            reverse=True,
        )
        # Diagnostic output only; kept at debug level because this runs on
        # every leaderboard/credits command.
        log.debug("Found %s users with %s+ credits", len(sorted_users), min_credits)
        for user in sorted_users[:10]:
            log.debug("User %s has %s credits", user["username"], user["points"])
        return sorted_users

    async def _try_api_update(self, user_id: str, username: str, credits: int) -> bool:
        """Try to update the API if configured.

        Posts the user's id, name, (non-negative) credits and opt-out flag.
        Returns True only when the API answers 200 with ``success`` set;
        every failure is logged and reported as False.
        """
        if not self._api_ready():
            return False
        try:
            # API doesn't accept negative values
            credits = max(credits, 0)
            opted_out = await self.config.user_from_id(int(user_id)).opted_out()
            headers = self._auth_headers()
            headers["Content-Type"] = "application/json"
            payload = {
                "userId": str(user_id),
                "username": str(username),
                "points": credits,
                "optedOut": opted_out,
            }
            async with self.session.post(
                f"{self.api_base_url}/leaderboard",
                headers=headers,
                json=payload,
            ) as resp:
                if resp.status == 200:
                    try:
                        data = await resp.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        log.error("API returned invalid JSON response")
                        return False
                    if data.get("success"):
                        log.info(
                            "Updated leaderboard for %s: %s credits (opted_out: %s)",
                            username, credits, opted_out,
                        )
                        return True
                    log.error("API update failed: %s", data.get("error", "Unknown error"))
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error("API update failed: Status %s", resp.status)
                    await self._log_api_error_detail(resp)
                return False
        except aiohttp.ClientError as e:
            log.error("API request failed: %s", e)
            return False
        except Exception as e:
            log.error("Unexpected error during API update: %s", e)
            return False

    async def _get_api_leaderboard(self) -> Optional[list]:
        """Fetch the leaderboard from the API.

        Returns the ``leaderboard`` list from a 200 response, or None when
        the API is not configured or the request fails (failures are logged).
        """
        if not self._api_ready():
            return None
        try:
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status == 200:
                    try:
                        data = await resp.json()
                    except (aiohttp.ContentTypeError, ValueError):
                        log.error("API returned invalid JSON response")
                        return None
                    if "leaderboard" in data:
                        return data["leaderboard"]
                    log.error("Invalid API response format")
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error("Failed to fetch leaderboard: Status %s", resp.status)
                    await self._log_api_error_detail(resp)
                return None
        except aiohttp.ClientError as e:
            log.error("Error fetching leaderboard: %s", e)
            return None
        except Exception as e:
            log.error("Unexpected error fetching leaderboard: %s", e)
            return None

    async def _push_balance(self, user) -> None:
        """Recompute ``user``'s total credits and push them to the API."""
        if user is None or user.bot:
            return
        total_credits = await self.get_user_credits(user)
        await self._try_api_update(str(user.id), str(user), total_credits)

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    @commands.group(name="globalboard", aliases=["glb"])
    async def globalboard(self, ctx: commands.Context):
        """Global leaderboard commands."""
        # Nothing to do for the bare group.

    @globalboard.command(name="show")
    async def show_leaderboard(self, ctx: commands.Context, page: int = 1):
        """Show the global credits leaderboard.

        `page` selects the page to open first; out-of-range values are
        clamped to the first or last page.
        """
        async with ctx.typing():
            leaderboard_data = await self.get_all_balances()
            if not leaderboard_data:
                return await ctx.send("No users have 10,000 or more credits!")
            items_per_page = self.ITEMS_PER_PAGE
            chunks = [
                leaderboard_data[i:i + items_per_page]
                for i in range(0, len(leaderboard_data), items_per_page)
            ]
            embed_color = await ctx.embed_color()
            embeds = []
            for page_num, entries in enumerate(chunks, 1):
                first_rank = (page_num - 1) * items_per_page + 1
                lines = [
                    f"`{rank}.` <@{entry.get('userId', '0')}> • "
                    f"**{humanize_number(entry.get('points', 0))}** credits"
                    for rank, entry in enumerate(entries, start=first_rank)
                ]
                embed = discord.Embed(
                    title="🏆 Global Credits Leaderboard",
                    description=(
                        "*Only showing users with 10,000+ credits*"
                        + "\n\n"
                        + "\n".join(lines)
                    ),
                    color=embed_color,
                )
                embed.set_footer(
                    text=f"Page {page_num}/{len(chunks)} • Total Users: {len(leaderboard_data)}"
                )
                embeds.append(embed)

        # Send warning message first
        await ctx.send("⚠️ **Note:** The global leaderboard may be slightly inaccurate due to synchronization delays.")

        view = LeaderboardView(self, ctx, embeds)
        # Honour the requested page (1-based), clamped to the valid range.
        start_index = min(max(page, 1), len(embeds)) - 1
        if start_index:
            view.current_page = start_index
            is_last = start_index == len(embeds) - 1
            view.first_page.disabled = False
            view.prev_page.disabled = False
            view.next_page.disabled = is_last
            view.last_page.disabled = is_last
        view.message = await ctx.send(embed=embeds[start_index], view=view)

    @globalboard.command(name="optout")
    async def opt_out(self, ctx: commands.Context):
        """Opt out of the global leaderboard. Your credits will no longer be visible."""
        user_settings = self.config.user(ctx.author)
        if await user_settings.opted_out():
            await ctx.send("You are already opted out of the global leaderboard.")
            return
        await user_settings.opted_out.set(True)
        # Update API with new opt-out status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted out of the global leaderboard. Your credits will no longer be visible.")

    @globalboard.command(name="optin")
    async def opt_in(self, ctx: commands.Context):
        """Opt back into the global leaderboard."""
        user_settings = self.config.user(ctx.author)
        if not await user_settings.opted_out():
            await ctx.send("You are already opted into the global leaderboard.")
            return
        await user_settings.opted_out.set(False)
        # Update API with new opt-in status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted back into the global leaderboard. Your credits will now be visible.")

    @globalboard.command(name="credits")
    async def check_credits(self, ctx: commands.Context, member: discord.Member = None):
        """Check your credits or another member's credits."""
        member = member or ctx.author
        # Check if the target user has opted out
        if await self.config.user(member).opted_out():
            if member == ctx.author:
                await ctx.send(
                    "You have opted out of the global leaderboard. "
                    f"Use `{ctx.clean_prefix}globalboard optin` to show your credits again."
                )
            else:
                await ctx.send(f"{member.display_name} has opted out of the global leaderboard.")
            return

        credits = await self.get_user_credits(member)
        log.debug("Credits check for %s: %s credits", member, credits)

        embed = discord.Embed(
            title="🏆 Global Credits",
            color=await ctx.embed_color(),
        )
        if credits >= self.MIN_CREDITS:
            # The rank is only displayed for users above the threshold, so
            # the full leaderboard is only built in that case.
            leaderboard_data = await self.get_all_balances()
            target_id = str(member.id)
            rank = next(
                (
                    position
                    for position, entry in enumerate(leaderboard_data, 1)
                    if entry.get("userId") == target_id
                ),
                None,
            )
            embed.description = (
                f"**User:** {member.mention}\n"
                f"**Credits:** {humanize_number(credits)}\n"
                f"**Rank:** #{humanize_number(rank) if rank else 'Unranked'}\n"
                f"**ID:** {member.id}"
            )
        else:
            embed.description = (
                f"{member.mention} has less than 10,000 credits "
                f"({humanize_number(credits)} total)"
            )
        embed.set_thumbnail(url=member.display_avatar.url)
        await ctx.send(embed=embed)

    @globalboard.command(name="resync")
    @commands.is_owner()
    async def resync_leaderboard(self, ctx: commands.Context):
        """Force a resync of all users' credits with the API (if configured)."""
        if not self._api_ready():
            await ctx.send("API is not configured.")
            return
        async with ctx.typing():
            message = await ctx.send("🔄 Starting global leaderboard resync...")
            try:
                success_count, fail_count = await self._perform_full_sync()
                embed = discord.Embed(
                    title="🔄 Global Leaderboard Resync Complete",
                    description=(
                        f"Successfully updated: **{success_count}** users\n"
                        f"Failed to update: **{fail_count}** users"
                    ),
                    color=await ctx.embed_color(),
                    timestamp=datetime.now(timezone.utc),
                )
                await message.edit(content=None, embed=embed)
            except Exception as e:
                await message.edit(content=f"❌ Error during resync: {str(e)}")

    @globalboard.command(name="cleanup")
    @commands.is_owner()
    async def cleanup_leaderboard(self, ctx: commands.Context):
        """Clean up old leaderboard entries (older than 30 days)."""
        if not self._api_ready():
            await ctx.send("API is not configured.")
            return
        try:
            async with self.session.delete(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers(),
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("success"):
                        await ctx.send(
                            f"Cleaned up {data.get('removedCount', '?')} old entries. "
                            f"{data.get('remainingCount', '?')} entries remaining."
                        )
                    else:
                        await ctx.send("Failed to clean up leaderboard.")
                elif resp.status == 401:
                    await ctx.send("Unauthorized: Invalid admin secret")
                else:
                    await ctx.send(f"Failed to clean up leaderboard: Status {resp.status}")
        except Exception as e:
            await ctx.send(f"Error cleaning up leaderboard: {e}")

    # ------------------------------------------------------------------
    # Listeners
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_bank_update(self, member: discord.Member, before: int, after: int):
        """Update API when a member's bank balance changes.

        NOTE(review): ``bank_update`` does not appear to be an event that Red
        itself dispatches; this listener is kept for any third-party
        dispatcher. See ``on_red_bank_set_balance`` for the core event.
        """
        await self._push_balance(member)

    @commands.Cog.listener()
    async def on_red_bank_set_balance(self, payload):
        """Update API when Red's bank reports a balance change.

        NOTE(review): assumes the payload exposes the affected account as
        ``payload.recipient`` as described in Red's framework events docs;
        confirm against the installed Red version.
        """
        await self._push_balance(getattr(payload, "recipient", None))