263 lines
No EOL
11 KiB
Python
263 lines
No EOL
11 KiB
Python
import asyncio
import logging
from datetime import datetime
from typing import Optional, Dict, Any

import aiohttp
import discord
from redbot.core import commands, Config, checks
from redbot.core import bank
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box
from redbot.core.utils.views import SimpleMenu
|
|
|
|
|
|
class RubyAPI(commands.Cog):
|
|
"""Ruby API integration for interactions and linked roles verification."""
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
|
|
self.session = aiohttp.ClientSession()
|
|
self._last_verify_time: Dict[int, datetime] = {} # Cache for rate limiting
|
|
|
|
default_global = {
|
|
"interaction_url": "https://ruby.valerie.lol/api/verify-user",
|
|
"verify_url": "https://ruby.valerie.lol/api/verify-user",
|
|
"enabled": True,
|
|
"api_key": None,
|
|
"verify_cooldown": 60, # Cooldown in seconds
|
|
"reward_credits": 100, # Credits reward for verification
|
|
"compatible_cogs": {
|
|
"Economy": True,
|
|
"Leveler": True,
|
|
"CustomCom": True
|
|
}
|
|
}
|
|
|
|
self.config.register_global(**default_global)
|
|
|
|
def cog_unload(self):
|
|
if self.session:
|
|
self.bot.loop.create_task(self.session.close())
|
|
|
|
async def red_delete_data_for_user(self, *, requester: str, user_id: int):
|
|
"""Handle data deletion requests from users."""
|
|
if requester not in ("discord_deleted_user", "owner", "user"):
|
|
return
|
|
|
|
# Clear verification cache for the user
|
|
self._last_verify_time.pop(user_id, None)
|
|
|
|
async def red_get_data_for_user(self, *, user_id: int):
|
|
"""Get a user's personal data."""
|
|
data = {"verify_history": {}}
|
|
|
|
# Add verification timestamp if exists
|
|
if user_id in self._last_verify_time:
|
|
data["verify_history"]["last_verify"] = self._last_verify_time[user_id].isoformat()
|
|
|
|
return data
|
|
|
|
async def cog_check(self, ctx: commands.Context) -> bool:
|
|
"""Cog-wide check to ensure only bot team members can use commands."""
|
|
return await self.bot.is_owner(ctx.author) or await self.bot.is_co_owner(ctx.author)
|
|
|
|
@commands.group(name="rubyapi")
|
|
async def rubyapi(self, ctx: commands.Context):
|
|
"""Ruby API configuration commands. Bot team only."""
|
|
pass
|
|
|
|
@rubyapi.command(name="compatability")
|
|
async def toggle_compatibility(self, ctx: commands.Context, cog_name: str, enabled: bool):
|
|
"""Toggle compatibility with other cogs."""
|
|
async with self.config.compatible_cogs() as compat:
|
|
if cog_name not in compat:
|
|
return await ctx.send(f"Unknown cog {cog_name}. Available cogs: {', '.join(compat.keys())}")
|
|
compat[cog_name] = enabled
|
|
await ctx.send(f"Compatibility with {cog_name} {'enabled' if enabled else 'disabled'}.")
|
|
|
|
@rubyapi.command(name="setcooldown")
|
|
async def set_cooldown(self, ctx: commands.Context, seconds: int):
|
|
"""Set the cooldown between verifications."""
|
|
await self.config.verify_cooldown.set(seconds)
|
|
await ctx.send(f"Verification cooldown set to {seconds} seconds.")
|
|
|
|
@rubyapi.command(name="setreward")
|
|
async def set_reward(self, ctx: commands.Context, credits: int):
|
|
"""Set the credit reward for successful verification."""
|
|
await self.config.reward_credits.set(credits)
|
|
await ctx.send(f"Verification reward set to {credits} credits.")
|
|
|
|
async def notify_verification(self, guild: discord.Guild, user: discord.Member, roles_added: list):
|
|
"""Notify other cogs about successful verification."""
|
|
compatible_cogs = await self.config.compatible_cogs()
|
|
|
|
if compatible_cogs.get("Economy", False):
|
|
reward_credits = await self.config.reward_credits()
|
|
try:
|
|
await bank.deposit_credits(user, reward_credits)
|
|
# You could send a DM to the user about the reward
|
|
except Exception as e:
|
|
print(f"Failed to reward credits: {e}")
|
|
|
|
if compatible_cogs.get("Leveler", False):
|
|
leveler = self.bot.get_cog("Leveler")
|
|
if leveler:
|
|
# Example of how to integrate with Leveler cog
|
|
try:
|
|
await leveler.add_exp(user, guild, 100)
|
|
except Exception:
|
|
pass
|
|
|
|
if compatible_cogs.get("CustomCom", False):
|
|
customcom = self.bot.get_cog("CustomCom")
|
|
if customcom:
|
|
# Trigger any custom commands tagged for verification
|
|
try:
|
|
ctx = await self.bot.get_context(user.last_message) if user.last_message else None
|
|
if ctx:
|
|
await customcom.trigger_custom_commands(ctx, "verification")
|
|
except Exception:
|
|
pass
|
|
|
|
@commands.command()
|
|
@commands.cooldown(1, 60, commands.BucketType.user)
|
|
async def verifyuser(self, ctx: commands.Context, user: Optional[discord.Member] = None):
|
|
"""Verify a user's linked roles. Bot team only."""
|
|
if not await self.config.enabled():
|
|
return await ctx.send("Ruby API integration is not enabled globally.")
|
|
|
|
user = user or ctx.author
|
|
verify_url = await self.config.verify_url()
|
|
api_key = await self.config.api_key()
|
|
|
|
if not api_key:
|
|
return await ctx.send("API key has not been set. Please set it using `[p]rubyapi setkey`")
|
|
|
|
# Check cooldown
|
|
cooldown = await self.config.verify_cooldown()
|
|
last_verify = self._last_verify_time.get(user.id)
|
|
if last_verify:
|
|
time_diff = (datetime.now() - last_verify).total_seconds()
|
|
if time_diff < cooldown:
|
|
remaining = int(cooldown - time_diff)
|
|
return await ctx.send(f"Please wait {remaining} seconds before verifying again.")
|
|
|
|
payload = {
|
|
"user_id": str(user.id),
|
|
"guild_id": str(ctx.guild.id)
|
|
}
|
|
|
|
async with ctx.typing():
|
|
result = await self._make_api_request(
|
|
verify_url,
|
|
method="POST",
|
|
json=payload
|
|
)
|
|
|
|
if "error" in result:
|
|
return await ctx.send(f"Error verifying user: {result['error']}")
|
|
|
|
if result.get("verified", False):
|
|
roles_to_add = []
|
|
for role_id in result.get("roles", []):
|
|
if role := ctx.guild.get_role(int(role_id)):
|
|
roles_to_add.append(role)
|
|
|
|
if roles_to_add:
|
|
try:
|
|
await user.add_roles(*roles_to_add, reason="Linked roles verification")
|
|
role_names = ", ".join(role.name for role in roles_to_add)
|
|
|
|
# Update verification timestamp
|
|
self._last_verify_time[user.id] = datetime.now()
|
|
|
|
# Notify other cogs
|
|
await self.notify_verification(ctx.guild, user, roles_to_add)
|
|
|
|
# Create a nice embed for the response
|
|
embed = discord.Embed(
|
|
title="Verification Successful!",
|
|
description=f"Verified {user.mention}",
|
|
color=discord.Color.green()
|
|
)
|
|
embed.add_field(name="Roles Added", value=role_names)
|
|
|
|
if await self.config.compatible_cogs.get_raw("Economy"):
|
|
reward = await self.config.reward_credits()
|
|
embed.add_field(name="Reward", value=f"{reward} credits")
|
|
|
|
await ctx.send(embed=embed)
|
|
except discord.HTTPException as e:
|
|
await ctx.send(f"Error adding roles: {str(e)}")
|
|
else:
|
|
await ctx.send(f"Verified {user.mention}, but no roles needed to be added.")
|
|
else:
|
|
await ctx.send(f"Could not verify {user.mention}. Please make sure their accounts are properly linked.")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_member_join(self, member: discord.Member):
|
|
"""Auto-verify users when they join if enabled."""
|
|
if not await self.config.enabled():
|
|
return
|
|
|
|
try:
|
|
ctx = await self.bot.get_context(member.last_message) if member.last_message else None
|
|
if ctx:
|
|
await self.verifyuser(ctx, member)
|
|
except Exception:
|
|
pass
|
|
|
|
@commands.Cog.listener()
|
|
async def on_interaction(self, interaction: discord.Interaction):
|
|
"""Handle incoming Discord interactions."""
|
|
if not await self.config.enabled():
|
|
return
|
|
|
|
interaction_url = await self.config.interaction_url()
|
|
|
|
# Forward the interaction to the API
|
|
payload = {
|
|
"type": interaction.type.value,
|
|
"guild_id": str(interaction.guild_id),
|
|
"channel_id": str(interaction.channel_id),
|
|
"data": interaction.data,
|
|
"member": {
|
|
"user": {
|
|
"id": str(interaction.user.id),
|
|
"username": interaction.user.name,
|
|
"discriminator": interaction.user.discriminator,
|
|
},
|
|
"roles": [str(role.id) for role in interaction.user.roles],
|
|
} if interaction.user else None
|
|
}
|
|
|
|
result = await self._make_api_request(
|
|
interaction_url,
|
|
method="POST",
|
|
json=payload
|
|
)
|
|
|
|
if "error" in result:
|
|
print(f"Error forwarding interaction: {result['error']}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
|
|
"""Handle command errors."""
|
|
if isinstance(error, commands.CommandOnCooldown):
|
|
await ctx.send(f"Please wait {int(error.retry_after)} seconds before using this command again.")
|
|
|
|
async def _make_api_request(self, url: str, method: str = "GET", **kwargs):
|
|
"""Make an API request with proper headers and error handling."""
|
|
headers = kwargs.pop("headers", {})
|
|
if api_key := await self.config.api_key():
|
|
headers["Authorization"] = f"Bearer {api_key}"
|
|
|
|
try:
|
|
async with self.session.request(method, url, headers=headers, **kwargs) as resp:
|
|
if resp.status == 429: # Rate limited
|
|
retry_after = float(resp.headers.get("Retry-After", 5))
|
|
return {"error": "rate_limited", "retry_after": retry_after}
|
|
|
|
data = await resp.json()
|
|
return data
|
|
except aiohttp.ClientError as e:
|
|
return {"error": f"API request failed: {str(e)}"} |