Ruby-Cogs/rubyapi/rubyapi.py

263 lines
No EOL
11 KiB
Python

from typing import Optional, Dict, Any
import aiohttp
import discord
from redbot.core import commands, Config, checks
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box
from redbot.core.utils.views import SimpleMenu
from redbot.core import bank
from datetime import datetime
class RubyAPI(commands.Cog):
"""Ruby API integration for interactions and linked roles verification."""
def __init__(self, bot: Red):
self.bot = bot
self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
self.session = aiohttp.ClientSession()
self._last_verify_time: Dict[int, datetime] = {} # Cache for rate limiting
default_global = {
"interaction_url": "https://ruby.valerie.lol/api/verify-user",
"verify_url": "https://ruby.valerie.lol/api/verify-user",
"enabled": True,
"api_key": None,
"verify_cooldown": 60, # Cooldown in seconds
"reward_credits": 100, # Credits reward for verification
"compatible_cogs": {
"Economy": True,
"Leveler": True,
"CustomCom": True
}
}
self.config.register_global(**default_global)
def cog_unload(self):
if self.session:
self.bot.loop.create_task(self.session.close())
async def red_delete_data_for_user(self, *, requester: str, user_id: int):
"""Handle data deletion requests from users."""
if requester not in ("discord_deleted_user", "owner", "user"):
return
# Clear verification cache for the user
self._last_verify_time.pop(user_id, None)
async def red_get_data_for_user(self, *, user_id: int):
"""Get a user's personal data."""
data = {"verify_history": {}}
# Add verification timestamp if exists
if user_id in self._last_verify_time:
data["verify_history"]["last_verify"] = self._last_verify_time[user_id].isoformat()
return data
async def cog_check(self, ctx: commands.Context) -> bool:
"""Cog-wide check to ensure only bot team members can use commands."""
return await self.bot.is_owner(ctx.author) or await self.bot.is_co_owner(ctx.author)
@commands.group(name="rubyapi")
async def rubyapi(self, ctx: commands.Context):
"""Ruby API configuration commands. Bot team only."""
pass
@rubyapi.command(name="compatability")
async def toggle_compatibility(self, ctx: commands.Context, cog_name: str, enabled: bool):
"""Toggle compatibility with other cogs."""
async with self.config.compatible_cogs() as compat:
if cog_name not in compat:
return await ctx.send(f"Unknown cog {cog_name}. Available cogs: {', '.join(compat.keys())}")
compat[cog_name] = enabled
await ctx.send(f"Compatibility with {cog_name} {'enabled' if enabled else 'disabled'}.")
@rubyapi.command(name="setcooldown")
async def set_cooldown(self, ctx: commands.Context, seconds: int):
"""Set the cooldown between verifications."""
await self.config.verify_cooldown.set(seconds)
await ctx.send(f"Verification cooldown set to {seconds} seconds.")
@rubyapi.command(name="setreward")
async def set_reward(self, ctx: commands.Context, credits: int):
"""Set the credit reward for successful verification."""
await self.config.reward_credits.set(credits)
await ctx.send(f"Verification reward set to {credits} credits.")
async def notify_verification(self, guild: discord.Guild, user: discord.Member, roles_added: list):
"""Notify other cogs about successful verification."""
compatible_cogs = await self.config.compatible_cogs()
if compatible_cogs.get("Economy", False):
reward_credits = await self.config.reward_credits()
try:
await bank.deposit_credits(user, reward_credits)
# You could send a DM to the user about the reward
except Exception as e:
print(f"Failed to reward credits: {e}")
if compatible_cogs.get("Leveler", False):
leveler = self.bot.get_cog("Leveler")
if leveler:
# Example of how to integrate with Leveler cog
try:
await leveler.add_exp(user, guild, 100)
except Exception:
pass
if compatible_cogs.get("CustomCom", False):
customcom = self.bot.get_cog("CustomCom")
if customcom:
# Trigger any custom commands tagged for verification
try:
ctx = await self.bot.get_context(user.last_message) if user.last_message else None
if ctx:
await customcom.trigger_custom_commands(ctx, "verification")
except Exception:
pass
@commands.command()
@commands.cooldown(1, 60, commands.BucketType.user)
async def verifyuser(self, ctx: commands.Context, user: Optional[discord.Member] = None):
"""Verify a user's linked roles. Bot team only."""
if not await self.config.enabled():
return await ctx.send("Ruby API integration is not enabled globally.")
user = user or ctx.author
verify_url = await self.config.verify_url()
api_key = await self.config.api_key()
if not api_key:
return await ctx.send("API key has not been set. Please set it using `[p]rubyapi setkey`")
# Check cooldown
cooldown = await self.config.verify_cooldown()
last_verify = self._last_verify_time.get(user.id)
if last_verify:
time_diff = (datetime.now() - last_verify).total_seconds()
if time_diff < cooldown:
remaining = int(cooldown - time_diff)
return await ctx.send(f"Please wait {remaining} seconds before verifying again.")
payload = {
"user_id": str(user.id),
"guild_id": str(ctx.guild.id)
}
async with ctx.typing():
result = await self._make_api_request(
verify_url,
method="POST",
json=payload
)
if "error" in result:
return await ctx.send(f"Error verifying user: {result['error']}")
if result.get("verified", False):
roles_to_add = []
for role_id in result.get("roles", []):
if role := ctx.guild.get_role(int(role_id)):
roles_to_add.append(role)
if roles_to_add:
try:
await user.add_roles(*roles_to_add, reason="Linked roles verification")
role_names = ", ".join(role.name for role in roles_to_add)
# Update verification timestamp
self._last_verify_time[user.id] = datetime.now()
# Notify other cogs
await self.notify_verification(ctx.guild, user, roles_to_add)
# Create a nice embed for the response
embed = discord.Embed(
title="Verification Successful!",
description=f"Verified {user.mention}",
color=discord.Color.green()
)
embed.add_field(name="Roles Added", value=role_names)
if await self.config.compatible_cogs.get_raw("Economy"):
reward = await self.config.reward_credits()
embed.add_field(name="Reward", value=f"{reward} credits")
await ctx.send(embed=embed)
except discord.HTTPException as e:
await ctx.send(f"Error adding roles: {str(e)}")
else:
await ctx.send(f"Verified {user.mention}, but no roles needed to be added.")
else:
await ctx.send(f"Could not verify {user.mention}. Please make sure their accounts are properly linked.")
@commands.Cog.listener()
async def on_member_join(self, member: discord.Member):
"""Auto-verify users when they join if enabled."""
if not await self.config.enabled():
return
try:
ctx = await self.bot.get_context(member.last_message) if member.last_message else None
if ctx:
await self.verifyuser(ctx, member)
except Exception:
pass
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
"""Handle incoming Discord interactions."""
if not await self.config.enabled():
return
interaction_url = await self.config.interaction_url()
# Forward the interaction to the API
payload = {
"type": interaction.type.value,
"guild_id": str(interaction.guild_id),
"channel_id": str(interaction.channel_id),
"data": interaction.data,
"member": {
"user": {
"id": str(interaction.user.id),
"username": interaction.user.name,
"discriminator": interaction.user.discriminator,
},
"roles": [str(role.id) for role in interaction.user.roles],
} if interaction.user else None
}
result = await self._make_api_request(
interaction_url,
method="POST",
json=payload
)
if "error" in result:
print(f"Error forwarding interaction: {result['error']}")
@commands.Cog.listener()
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
"""Handle command errors."""
if isinstance(error, commands.CommandOnCooldown):
await ctx.send(f"Please wait {int(error.retry_after)} seconds before using this command again.")
async def _make_api_request(self, url: str, method: str = "GET", **kwargs):
"""Make an API request with proper headers and error handling."""
headers = kwargs.pop("headers", {})
if api_key := await self.config.api_key():
headers["Authorization"] = f"Bearer {api_key}"
try:
async with self.session.request(method, url, headers=headers, **kwargs) as resp:
if resp.status == 429: # Rate limited
retry_after = float(resp.headers.get("Retry-After", 5))
return {"error": "rate_limited", "retry_after": retry_after}
data = await resp.json()
return data
except aiohttp.ClientError as e:
return {"error": f"API request failed: {str(e)}"}