Ruby-Cogs/leaderboard/leaderboard.py
Valerie 02d67baeef
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run
Add automatic leaderboard sync task and enhance user credit retrieval in Leaderboard cog
Implement a background task for automatic leaderboard synchronization every 6 hours, improving data consistency. Update user credit retrieval to first check the current server's balance before aggregating from other servers, ensuring accurate total credits. Enhance error handling and logging for better debugging and user feedback during API interactions.
2025-05-26 05:55:32 -04:00

492 lines
No EOL
20 KiB
Python

from redbot.core import commands, Config, bank
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, pagify, humanize_number
import discord
from discord.ui import Button, View
import aiohttp
import asyncio
import logging
from datetime import datetime, timezone
from typing import Dict, Optional, List
# Module-level logger shared by the view and the cog below.
log = logging.getLogger("red.ruby.leaderboard")
class LeaderboardView(View):
    """Paginated button menu over a list of leaderboard embeds.

    Only the author of the invoking context may operate the buttons. The
    caller is expected to assign the sent message to ``message`` after
    sending, so the buttons can be disabled on that message at timeout.
    """

    def __init__(self, cog, ctx, pages: List[discord.Embed], timeout: float = 180.0):
        """Store the pages and put the buttons in their first-page state.

        Args:
            cog: The cog that created this view (stored, not used here).
            ctx: Invoking context; its author is the only allowed user.
            pages: Embeds to page through, in display order.
            timeout: Seconds of inactivity before ``on_timeout`` fires.
        """
        super().__init__(timeout=timeout)
        self.cog = cog
        self.ctx = ctx
        self.pages = pages
        self.current_page = 0
        # Set by the caller once the message carrying this view is sent.
        self.message = None
        self._sync_buttons()

    def _sync_buttons(self) -> None:
        """Enable or disable each button according to ``current_page``."""
        at_start = self.current_page <= 0
        # ``>=`` also covers an empty or single-page list.
        at_end = self.current_page >= len(self.pages) - 1
        self.first_page.disabled = at_start
        self.prev_page.disabled = at_start
        self.next_page.disabled = at_end
        self.last_page.disabled = at_end

    # A button needs a label or an emoji; the empty labels are replaced
    # with text matching the "Previous"/"Next" buttons.
    @discord.ui.button(label="First", style=discord.ButtonStyle.gray)
    async def first_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the first page."""
        self.current_page = 0
        await self.update_page(interaction)

    @discord.ui.button(label="Previous", style=discord.ButtonStyle.blurple)
    async def prev_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step back one page, stopping at the first."""
        self.current_page = max(0, self.current_page - 1)
        await self.update_page(interaction)

    @discord.ui.button(label="Next", style=discord.ButtonStyle.blurple)
    async def next_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Step forward one page, stopping at the last."""
        self.current_page = min(len(self.pages) - 1, self.current_page + 1)
        await self.update_page(interaction)

    @discord.ui.button(label="Last", style=discord.ButtonStyle.gray)
    async def last_page(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Jump to the last page."""
        self.current_page = len(self.pages) - 1
        await self.update_page(interaction)

    async def update_page(self, interaction: discord.Interaction):
        """Refresh button states and show the embed for ``current_page``."""
        self._sync_buttons()
        await interaction.response.edit_message(embed=self.pages[self.current_page], view=self)

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow only the invoking author; tell anyone else privately."""
        if interaction.user.id == self.ctx.author.id:
            return True
        await interaction.response.send_message("This menu is not for you!", ephemeral=True)
        return False

    async def on_timeout(self):
        """Disable every button and, if a message is known, update it."""
        for item in self.children:
            item.disabled = True
        # The caller may never have assigned the message (e.g. send failed).
        if self.message is None:
            return
        try:
            await self.message.edit(view=self)
        except discord.HTTPException:
            # Best effort: the edit request can fail and there is nothing
            # useful to do about it at timeout.
            pass
class Leaderboard(commands.Cog):
    """Global leaderboard system for Ruby.

    Aggregates bank balances for the non-bot members the bot can see,
    shows them as a paginated leaderboard, and optionally mirrors each
    user's total to an HTTP API when an ``admin_secret`` is present in the
    ``ruby_api`` shared API tokens.
    """

    # Minimum total credits needed to appear on the leaderboard. The
    # user-facing messages below spell this out as "10,000"; keep in sync.
    MIN_CREDITS = 10000

    def __init__(self, bot: Red):
        """Register config defaults; the API connection is set up later."""
        self.bot = bot
        self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
        # Optional API integration
        self.session = None
        self.api_base_url = "https://ruby.valerie.lol/api"
        self.admin_secret = None
        # Handle of the background auto-sync task, if one was started.
        self.auto_sync_task = None
        default_guild = {
            "min_message_length": 5,
            "cooldown": 60,
        }
        default_user = {
            "opted_out": False
        }
        self.config.register_guild(**default_guild)
        self.config.register_user(**default_user)

    def _api_configured(self) -> bool:
        """Return True when a session exists and an admin secret is set."""
        return bool(
            self.session
            and self.admin_secret
            and self.admin_secret.get("admin_secret")
        )

    def _auth_headers(self) -> dict:
        """Build the Authorization header; call only when configured."""
        return {"Authorization": self.admin_secret["admin_secret"]}

    async def initialize(self):
        """Initialize optional API connection and start auto-sync task.

        Does nothing beyond loading the tokens when no ``admin_secret`` is
        configured. The auto-sync task is started only if the test request
        returns HTTP 200. All errors are logged, never raised.
        """
        try:
            self.admin_secret = await self.bot.get_shared_api_tokens("ruby_api")
            if not self.admin_secret.get("admin_secret"):
                return
            # Reuse an existing session if initialize() is called again.
            if self.session is None:
                self.session = aiohttp.ClientSession()
            # Test API connection
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers()
            ) as resp:
                if resp.status != 200:
                    log.error(f"Failed to connect to API: Status {resp.status}")
                    return
            # Start auto-sync task only after a successful connection test.
            if self.auto_sync_task is None:
                self.auto_sync_task = self.bot.loop.create_task(self._auto_sync_task())
                log.info("Started automatic leaderboard sync task")
        except Exception as e:
            log.error(f"Failed to initialize API connection: {e}")

    async def _auto_sync_task(self):
        """Task to automatically sync leaderboard every 6 hours."""
        await self.bot.wait_until_ready()
        while True:
            try:
                await self._perform_full_sync()
                log.info("Completed automatic leaderboard sync")
                await asyncio.sleep(21600)  # 6 hours in seconds
            except asyncio.CancelledError:
                break
            except Exception as e:
                # log.exception keeps the traceback for debugging.
                log.exception(f"Error in automatic sync task: {e}")
                await asyncio.sleep(300)  # Wait 5 minutes before retrying on error

    async def _perform_full_sync(self) -> tuple[int, int]:
        """Perform a full sync of all users to the API.

        Each non-bot user is attempted exactly once, however many guilds
        they share with the bot.

        Returns:
            Tuple of (success_count, fail_count).
        """
        success_count = 0
        fail_count = 0
        processed_users = set()
        for guild in self.bot.guilds:
            for member in guild.members:
                if member.bot or member.id in processed_users:
                    continue
                # Mark before the attempt so a failing user is not retried
                # (and counted again) in every other shared guild.
                processed_users.add(member.id)
                try:
                    credits = await self.get_user_credits(member)
                    if await self._try_api_update(str(member.id), str(member), credits):
                        success_count += 1
                    else:
                        fail_count += 1
                except Exception as e:
                    log.error(f"Error syncing user {member} ({member.id}): {e}")
                    fail_count += 1
        return success_count, fail_count

    async def cog_load(self):
        """Initialize the cog when it's loaded."""
        await self.initialize()

    async def get_user_credits(self, user: discord.Member) -> int:
        """Get a user's total credits from all servers.

        With a global bank the single global balance is returned. Otherwise
        the balance is read once per guild in which the user is a member
        and the values are summed.

        Args:
            user: The user to look up; only ``user.id`` is needed to find
                the matching member in each guild.

        Returns:
            The total credits; failed lookups are logged and count as 0.
        """
        try:
            # A global bank has one balance per user; summing per guild
            # would count that same balance several times.
            if await bank.is_global():
                return await bank.get_balance(user)
        except Exception as e:
            log.error(f"Error getting total credits for {user}: {e}")
            return 0

        total_credits = 0
        for guild in self.bot.guilds:
            # bank.get_balance takes only the member, so resolve the
            # guild-specific Member object instead of passing a guild.
            guild_member = guild.get_member(user.id)
            if guild_member is None:
                continue
            try:
                total_credits += await bank.get_balance(guild_member)
            except Exception as e:
                log.error(f"Error getting bank balance for {user} in {guild}: {e}")
        return total_credits

    async def get_all_balances(self) -> List[dict]:
        """Get all users' credit balances across all servers.

        Opted-out users and users below ``MIN_CREDITS`` are excluded.

        Returns:
            Dicts with ``userId``, ``username`` and ``points`` keys, sorted
            by ``points`` in descending order.
        """
        # One Member object per user id, whichever guild it came from.
        unique_members = {}
        for guild in self.bot.guilds:
            for member in guild.members:
                if not member.bot:
                    unique_members.setdefault(member.id, member)

        all_users = {}
        for member in unique_members.values():
            # Skip opted-out users
            if await self.config.user(member).opted_out():
                continue
            try:
                total_credits = await self.get_user_credits(member)
                if total_credits >= self.MIN_CREDITS:
                    all_users[member.id] = {
                        "userId": str(member.id),
                        "username": str(member),
                        "points": total_credits
                    }
            except Exception as e:
                log.error(f"Error getting balance for {member}: {e}")
                continue

        return sorted(
            all_users.values(),
            key=lambda entry: entry["points"],
            reverse=True
        )

    async def _try_api_update(self, user_id: str, username: str, credits: int) -> bool:
        """Try to update the API if configured.

        Args:
            user_id: Discord user id as a string.
            username: Display string sent to the API.
            credits: Total credits; negative values are sent as 0.

        Returns:
            True only when the API answers 200 with a truthy ``success``
            field; False when unconfigured or on any failure.
        """
        if not self._api_configured():
            return False
        try:
            if credits < 0:
                credits = 0  # API doesn't accept negative values
            # Get opt-out status
            opted_out = await self.config.user_from_id(int(user_id)).opted_out()
            headers = self._auth_headers()
            headers["Content-Type"] = "application/json"
            async with self.session.post(
                f"{self.api_base_url}/leaderboard",
                headers=headers,
                json={
                    "userId": str(user_id),
                    "username": str(username),
                    "points": credits,
                    "optedOut": opted_out
                }
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("success"):
                        log.info(f"Updated leaderboard for {username}: {credits} credits (opted_out: {opted_out})")
                        return True
                    log.error(f"API update failed: {data.get('error', 'Unknown error')}")
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error(f"API update failed: Status {resp.status}")
                return False
        except Exception as e:
            log.error(f"API update failed: {e}")
            return False

    async def _get_api_leaderboard(self) -> Optional[list]:
        """Fetch the leaderboard from the API.

        Returns:
            The ``leaderboard`` value of the JSON response, or None when
            the API is unconfigured, the request fails, or the key is
            missing.
        """
        if not self._api_configured():
            return None
        try:
            async with self.session.get(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers()
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if "leaderboard" in data:
                        return data["leaderboard"]
                    log.error("Invalid API response format")
                elif resp.status == 401:
                    log.error("Unauthorized: Invalid admin secret")
                else:
                    log.error(f"Failed to fetch leaderboard: Status {resp.status}")
                return None
        except Exception as e:
            log.error(f"Error fetching leaderboard: {e}")
            return None

    @commands.group(name="globalboard", aliases=["glb"])
    async def globalboard(self, ctx: commands.Context):
        """Global leaderboard commands."""
        if ctx.invoked_subcommand is None:
            return

    @globalboard.command(name="show")
    async def show_leaderboard(self, ctx: commands.Context, page: int = 1):
        """Show the global credits leaderboard."""
        async with ctx.typing():
            leaderboard_data = await self.get_all_balances()
            if not leaderboard_data:
                return await ctx.send("No users have 10,000 or more credits!")

            items_per_page = 10
            chunks = [
                leaderboard_data[start:start + items_per_page]
                for start in range(0, len(leaderboard_data), items_per_page)
            ]
            embed_color = await ctx.embed_color()

            embeds = []
            for page_num, entries in enumerate(chunks, 1):
                first_rank = (page_num - 1) * items_per_page + 1
                lines = [
                    f"`{rank}.` <@{entry.get('userId', '0')}> • "
                    f"**{humanize_number(entry.get('points', 0))}** credits"
                    for rank, entry in enumerate(entries, start=first_rank)
                ]
                embed = discord.Embed(
                    title="🏆 Global Credits Leaderboard",
                    description="*Only showing users with 10,000+ credits*" + "\n\n" + "\n".join(lines),
                    color=embed_color
                )
                embed.set_footer(text=f"Page {page_num}/{len(chunks)} • Total Users: {len(leaderboard_data)}")
                embeds.append(embed)

            # Honour the requested page, clamped to the pages that exist.
            start_index = min(max(page, 1), len(embeds)) - 1
            last_index = len(embeds) - 1
            view = LeaderboardView(self, ctx, embeds)
            view.current_page = start_index
            view.first_page.disabled = start_index == 0
            view.prev_page.disabled = start_index == 0
            view.next_page.disabled = start_index == last_index
            view.last_page.disabled = start_index == last_index
            view.message = await ctx.send(embed=embeds[start_index], view=view)

    @globalboard.command(name="optout")
    async def opt_out(self, ctx: commands.Context):
        """Opt out of the global leaderboard. Your credits will no longer be visible."""
        user_settings = self.config.user(ctx.author)
        if await user_settings.opted_out():
            await ctx.send("You are already opted out of the global leaderboard.")
            return
        await user_settings.opted_out.set(True)
        # Update API with new opt-out status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted out of the global leaderboard. Your credits will no longer be visible.")

    @globalboard.command(name="optin")
    async def opt_in(self, ctx: commands.Context):
        """Opt back into the global leaderboard."""
        user_settings = self.config.user(ctx.author)
        if not await user_settings.opted_out():
            await ctx.send("You are already opted into the global leaderboard.")
            return
        await user_settings.opted_out.set(False)
        # Update API with new opt-in status
        credits = await self.get_user_credits(ctx.author)
        await self._try_api_update(str(ctx.author.id), str(ctx.author), credits)
        await ctx.send("You have been opted back into the global leaderboard. Your credits will now be visible.")

    @globalboard.command(name="credits")
    async def check_credits(self, ctx: commands.Context, member: commands.MemberConverter = None):
        """Check your credits or another member's credits."""
        member = member or ctx.author
        # Check if the target user has opted out
        if await self.config.user(member).opted_out():
            if member == ctx.author:
                # "[p]" is only substituted in help text, so build the
                # hint from the prefix actually used.
                await ctx.send(
                    "You have opted out of the global leaderboard. "
                    f"Use `{ctx.clean_prefix}globalboard optin` to show your credits again."
                )
            else:
                await ctx.send(f"{member.display_name} has opted out of the global leaderboard.")
            return

        credits = await self.get_user_credits(member)
        embed = discord.Embed(
            title="🏆 Global Credits",
            color=await ctx.embed_color()
        )
        if credits >= self.MIN_CREDITS:
            # The rank is only shown in this branch, so the leaderboard
            # is only built here.
            leaderboard_data = await self.get_all_balances()
            rank = next(
                (position for position, entry in enumerate(leaderboard_data, 1)
                 if entry.get("userId") == str(member.id)),
                None
            )
            embed.description = (
                f"**User:** <@{member.id}>\n"
                f"**Credits:** {humanize_number(credits)}\n"
                f"**Rank:** #{humanize_number(rank) if rank else 'Unranked'}\n"
                f"**ID:** {member.id}"
            )
        else:
            embed.description = f"<@{member.id}> has less than 10,000 credits."
        embed.set_thumbnail(url=member.display_avatar.url)
        await ctx.send(embed=embed)

    @globalboard.command(name="resync")
    @commands.is_owner()
    async def resync_leaderboard(self, ctx: commands.Context):
        """Force a resync of all users' credits with the API (if configured)."""
        if not self._api_configured():
            await ctx.send("API is not configured.")
            return
        async with ctx.typing():
            message = await ctx.send("🔄 Starting global leaderboard resync...")
            try:
                success_count, fail_count = await self._perform_full_sync()
                embed = discord.Embed(
                    title="🔄 Global Leaderboard Resync Complete",
                    description=(
                        f"Successfully updated: **{success_count}** users\n"
                        f"Failed to update: **{fail_count}** users"
                    ),
                    color=await ctx.embed_color(),
                    timestamp=datetime.now(timezone.utc)
                )
                await message.edit(content=None, embed=embed)
            except Exception as e:
                await message.edit(content=f"❌ Error during resync: {str(e)}")

    # NOTE(review): "bank_update" does not look like one of Red's
    # documented bank events, so this listener may never fire - confirm
    # against the bank events reference and rename if needed.
    @commands.Cog.listener()
    async def on_bank_update(self, member: discord.Member, before: int, after: int):
        """Update API when a member's bank balance changes."""
        if member.bot:
            return
        total_credits = await self.get_user_credits(member)
        await self._try_api_update(str(member.id), str(member), total_credits)

    @globalboard.command(name="cleanup")
    @commands.is_owner()
    async def cleanup_leaderboard(self, ctx: commands.Context):
        """Clean up old leaderboard entries (older than 30 days)."""
        if not self._api_configured():
            await ctx.send("API is not configured.")
            return
        try:
            async with self.session.delete(
                f"{self.api_base_url}/leaderboard",
                headers=self._auth_headers()
            ) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    if data.get("success"):
                        await ctx.send(
                            f"Cleaned up {data['removedCount']} old entries. "
                            f"{data['remainingCount']} entries remaining."
                        )
                    else:
                        await ctx.send("Failed to clean up leaderboard.")
                elif resp.status == 401:
                    await ctx.send("Unauthorized: Invalid admin secret")
                else:
                    await ctx.send(f"Failed to clean up leaderboard: Status {resp.status}")
        except Exception as e:
            await ctx.send(f"Error cleaning up leaderboard: {e}")

    async def cog_unload(self):
        """Clean up when cog is unloaded.

        Declared ``async`` so the session close is awaited instead of
        being left to a fire-and-forget task.
        """
        # Stop the sync task first so it cannot use a closed session.
        if self.auto_sync_task:
            self.auto_sync_task.cancel()
            self.auto_sync_task = None
        if self.session:
            await self.session.close()
            self.session = None