Enhance RubyAPI cog with new features for user verification and compatibility management. Introduce cooldown settings for verifications, reward credits for successful verifications, and auto-verification for new members. Update commands to manage compatibility with other cogs and improve data handling for user verification history.
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run

This commit is contained in:
Valerie 2025-05-25 04:39:56 -04:00
parent 0b8483511b
commit 6efbd0d291

View file

@ -1,9 +1,12 @@
from typing import Optional
from typing import Optional, Dict, Any
import aiohttp
import discord
from redbot.core import commands, Config, checks
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box
from redbot.core.utils.views import SimpleMenu
from redbot.core import bank
from datetime import datetime
class RubyAPI(commands.Cog):
@ -13,12 +16,20 @@ class RubyAPI(commands.Cog):
self.bot = bot
self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
self.session = aiohttp.ClientSession()
self._last_verify_time: Dict[int, datetime] = {} # Cache for rate limiting
default_global = {
"interaction_url": "https://ruby.valerie.lol/api/verify-user", # Default interaction URL
"verify_url": "https://ruby.valerie.lol/api/verify-user", # Default verify URL
"enabled": True, # Enabled by default
"api_key": None
"interaction_url": "https://ruby.valerie.lol/api/verify-user",
"verify_url": "https://ruby.valerie.lol/api/verify-user",
"enabled": True,
"api_key": None,
"verify_cooldown": 60, # Cooldown in seconds
"reward_credits": 100, # Credits reward for verification
"compatible_cogs": {
"Economy": True,
"Leveler": True,
"CustomCom": True
}
}
self.config.register_global(**default_global)
@ -27,6 +38,24 @@ class RubyAPI(commands.Cog):
if self.session:
self.bot.loop.create_task(self.session.close())
async def red_delete_data_for_user(self, *, requester: str, user_id: int):
"""Handle data deletion requests from users."""
if requester not in ("discord_deleted_user", "owner", "user"):
return
# Clear verification cache for the user
self._last_verify_time.pop(user_id, None)
async def red_get_data_for_user(self, *, user_id: int):
"""Get a user's personal data."""
data = {"verify_history": {}}
# Add verification timestamp if exists
if user_id in self._last_verify_time:
data["verify_history"]["last_verify"] = self._last_verify_time[user_id].isoformat()
return data
async def cog_check(self, ctx: commands.Context) -> bool:
"""Cog-wide check to ensure only bot team members can use commands."""
return await self.bot.is_owner(ctx.author) or await self.bot.is_co_owner(ctx.author)
@ -36,79 +65,61 @@ class RubyAPI(commands.Cog):
"""Ruby API configuration commands. Bot team only."""
pass
@rubyapi.command(name="enable")
async def enable_integration(self, ctx: commands.Context, toggle: bool):
"""Enable or disable Ruby API integration globally."""
await self.config.enabled.set(toggle)
status = "enabled" if toggle else "disabled"
await ctx.send(f"Ruby API integration has been {status} globally.")
@rubyapi.command(name="compatability")
async def toggle_compatibility(self, ctx: commands.Context, cog_name: str, enabled: bool):
"""Toggle compatibility with other cogs."""
async with self.config.compatible_cogs() as compat:
if cog_name not in compat:
return await ctx.send(f"Unknown cog {cog_name}. Available cogs: {', '.join(compat.keys())}")
compat[cog_name] = enabled
await ctx.send(f"Compatibility with {cog_name} {'enabled' if enabled else 'disabled'}.")
@rubyapi.command(name="setkey")
async def set_api_key(self, ctx: commands.Context, api_key: str):
"""Set the API key for authentication."""
await self.config.api_key.set(api_key)
await ctx.send("API key has been set globally. I'll delete your message for security.")
try:
await ctx.message.delete()
except discord.HTTPException:
pass
@rubyapi.command(name="setcooldown")
async def set_cooldown(self, ctx: commands.Context, seconds: int):
"""Set the cooldown between verifications."""
await self.config.verify_cooldown.set(seconds)
await ctx.send(f"Verification cooldown set to {seconds} seconds.")
@rubyapi.command(name="setinteraction")
async def set_interaction_url(self, ctx: commands.Context, url: str):
"""Set the interaction endpoint URL."""
if not url.startswith(("http://", "https://")):
return await ctx.send("Please provide a valid URL starting with http:// or https://")
await self.config.interaction_url.set(url)
await ctx.send("Interaction endpoint URL has been updated globally.")
@rubyapi.command(name="setreward")
async def set_reward(self, ctx: commands.Context, credits: int):
"""Set the credit reward for successful verification."""
await self.config.reward_credits.set(credits)
await ctx.send(f"Verification reward set to {credits} credits.")
@rubyapi.command(name="setverify")
async def set_verify_url(self, ctx: commands.Context, url: str):
"""Set the verification endpoint URL."""
if not url.startswith(("http://", "https://")):
return await ctx.send("Please provide a valid URL starting with http:// or https://")
async def notify_verification(self, guild: discord.Guild, user: discord.Member, roles_added: list):
"""Notify other cogs about successful verification."""
compatible_cogs = await self.config.compatible_cogs()
await self.config.verify_url.set(url)
await ctx.send("Verification endpoint URL has been updated globally.")
if compatible_cogs.get("Economy", False):
reward_credits = await self.config.reward_credits()
try:
await bank.deposit_credits(user, reward_credits)
# You could send a DM to the user about the reward
except Exception as e:
print(f"Failed to reward credits: {e}")
@rubyapi.command(name="settings")
async def show_settings(self, ctx: commands.Context):
"""Show current Ruby API settings."""
config = await self.config.all()
enabled = "Yes" if config["enabled"] else "No"
interaction_url = config["interaction_url"]
verify_url = config["verify_url"]
has_api_key = "Yes" if config["api_key"] else "No"
message = (
"Ruby API Global Settings:\n"
f"Enabled: {enabled}\n"
f"API Key Set: {has_api_key}\n"
f"Interaction URL: {interaction_url}\n"
f"Verify URL: {verify_url}"
)
await ctx.send(box(message))
if compatible_cogs.get("Leveler", False):
leveler = self.bot.get_cog("Leveler")
if leveler:
# Example of how to integrate with Leveler cog
try:
await leveler.add_exp(user, guild, 100)
except Exception:
pass
async def _make_api_request(self, url: str, method: str = "GET", **kwargs):
"""Make an API request with proper headers and error handling."""
headers = kwargs.pop("headers", {})
if api_key := await self.config.api_key():
headers["Authorization"] = f"Bearer {api_key}"
try:
async with self.session.request(method, url, headers=headers, **kwargs) as resp:
if resp.status == 429: # Rate limited
retry_after = float(resp.headers.get("Retry-After", 5))
return {"error": "rate_limited", "retry_after": retry_after}
data = await resp.json()
return data
except aiohttp.ClientError as e:
return {"error": f"API request failed: {str(e)}"}
if compatible_cogs.get("CustomCom", False):
customcom = self.bot.get_cog("CustomCom")
if customcom:
# Trigger any custom commands tagged for verification
try:
ctx = await self.bot.get_context(user.last_message) if user.last_message else None
if ctx:
await customcom.trigger_custom_commands(ctx, "verification")
except Exception:
pass
@commands.command()
@commands.cooldown(1, 60, commands.BucketType.user)
async def verifyuser(self, ctx: commands.Context, user: Optional[discord.Member] = None):
"""Verify a user's linked roles. Bot team only."""
if not await self.config.enabled():
@ -121,6 +132,15 @@ class RubyAPI(commands.Cog):
if not api_key:
return await ctx.send("API key has not been set. Please set it using `[p]rubyapi setkey`")
# Check cooldown
cooldown = await self.config.verify_cooldown()
last_verify = self._last_verify_time.get(user.id)
if last_verify:
time_diff = (datetime.now() - last_verify).total_seconds()
if time_diff < cooldown:
remaining = int(cooldown - time_diff)
return await ctx.send(f"Please wait {remaining} seconds before verifying again.")
payload = {
"user_id": str(user.id),
"guild_id": str(ctx.guild.id)
@ -146,14 +166,46 @@ class RubyAPI(commands.Cog):
try:
await user.add_roles(*roles_to_add, reason="Linked roles verification")
role_names = ", ".join(role.name for role in roles_to_add)
await ctx.send(f"Verified {user.mention}! Added roles: {role_names}")
# Update verification timestamp
self._last_verify_time[user.id] = datetime.now()
# Notify other cogs
await self.notify_verification(ctx.guild, user, roles_to_add)
# Create a nice embed for the response
embed = discord.Embed(
title="Verification Successful!",
description=f"Verified {user.mention}",
color=discord.Color.green()
)
embed.add_field(name="Roles Added", value=role_names)
if await self.config.compatible_cogs.get_raw("Economy"):
reward = await self.config.reward_credits()
embed.add_field(name="Reward", value=f"{reward} credits")
await ctx.send(embed=embed)
except discord.HTTPException as e:
await ctx.send(f"Error adding roles: {str(e)}")
else:
await ctx.send(f"Verified {user.mention}, but no roles needed to be added.")
else:
await ctx.send(f"Could not verify {user.mention}. Please make sure their accounts are properly linked.")
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
"""Auto-verify users when they join if enabled."""
if not await self.config.enabled():
return
try:
ctx = await self.bot.get_context(member.last_message) if member.last_message else None
if ctx:
await self.verifyuser(ctx, member)
except Exception:
pass
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
"""Handle incoming Discord interactions."""
@ -184,8 +236,28 @@ class RubyAPI(commands.Cog):
json=payload
)
# Handle the API response if needed
if "error" in result:
# Log the error but don't respond to the interaction
# as it might have been handled elsewhere
print(f"Error forwarding interaction: {result['error']}")
print(f"Error forwarding interaction: {result['error']}")
@commands.Cog.listener()
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
"""Handle command errors."""
if isinstance(error, commands.CommandOnCooldown):
await ctx.send(f"Please wait {int(error.retry_after)} seconds before using this command again.")
async def _make_api_request(self, url: str, method: str = "GET", **kwargs):
"""Make an API request with proper headers and error handling."""
headers = kwargs.pop("headers", {})
if api_key := await self.config.api_key():
headers["Authorization"] = f"Bearer {api_key}"
try:
async with self.session.request(method, url, headers=headers, **kwargs) as resp:
if resp.status == 429: # Rate limited
retry_after = float(resp.headers.get("Retry-After", 5))
return {"error": "rate_limited", "retry_after": retry_after}
data = await resp.json()
return data
except aiohttp.ClientError as e:
return {"error": f"API request failed: {str(e)}"}