import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import aiohttp
import discord
from redbot.core import Config, bank, checks, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box
from redbot.core.utils.views import SimpleMenu

log = logging.getLogger("red.rubyapi")


class RubyAPI(commands.Cog):
    """Ruby API integration for interactions and linked roles verification.

    The cog forwards Discord interactions to a configurable HTTP endpoint and
    asks a second endpoint which roles a member should receive. All commands
    are restricted to bot owners by ``cog_check``.
    """

    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=867530999, force_registration=True)
        self.session = aiohttp.ClientSession()
        # In-memory only: user id -> time of the last successful role grant.
        self._last_verify_time: Dict[int, datetime] = {}

        default_global = {
            # NOTE(review): both URLs default to the same endpoint; confirm
            # that the interaction endpoint is really ``/api/verify-user``.
            "interaction_url": "https://ruby.valerie.lol/api/verify-user",
            "verify_url": "https://ruby.valerie.lol/api/verify-user",
            "enabled": True,
            "api_key": None,
            "verify_cooldown": 60,  # Cooldown in seconds
            "reward_credits": 100,  # Credits reward for verification
            "compatible_cogs": {
                "Economy": True,
                "Leveler": True,
                "CustomCom": True,
            },
        }
        self.config.register_global(**default_global)

    async def cog_unload(self):
        """Close the HTTP session when the cog is unloaded."""
        # Awaiting here (instead of scheduling a fire-and-forget task) ensures
        # the connector is really closed before the cog goes away.
        if self.session and not self.session.closed:
            await self.session.close()

    async def red_delete_data_for_user(self, *, requester: str, user_id: int):
        """Handle data deletion requests from users."""
        if requester not in ("discord_deleted_user", "owner", "user"):
            return
        # The only per-user data held is the in-memory verification timestamp.
        self._last_verify_time.pop(user_id, None)

    async def red_get_data_for_user(self, *, user_id: int):
        """Return the verification timestamp stored for ``user_id``, if any."""
        data = {"verify_history": {}}
        last_verify = self._last_verify_time.get(user_id)
        if last_verify is not None:
            data["verify_history"]["last_verify"] = last_verify.isoformat()
        return data

    async def cog_check(self, ctx: commands.Context) -> bool:
        """Cog-wide check to ensure only bot team members can use commands."""
        # ``is_owner`` already covers co-owners and team members; the bot
        # object has no separate ``is_co_owner`` method to call.
        return await self.bot.is_owner(ctx.author)

    @commands.group(name="rubyapi")
    async def rubyapi(self, ctx: commands.Context):
        """Ruby API configuration commands. Bot team only."""

    @rubyapi.command(name="setkey")
    async def set_key(self, ctx: commands.Context, api_key: str):
        """Set the API key sent as a Bearer token to the Ruby API.

        Prefer running this in a DM with the bot so the key is not exposed.
        """
        await self.config.api_key.set(api_key)
        try:
            # Best effort: remove the message that contains the secret.
            await ctx.message.delete()
        except discord.HTTPException:
            # Missing permissions (or a DM channel) - nothing more we can do.
            pass
        await ctx.send("API key has been set.")

    @rubyapi.command(name="compatibility", aliases=["compatability"])
    async def toggle_compatibility(self, ctx: commands.Context, cog_name: str, enabled: bool):
        """Toggle compatibility with other cogs."""
        async with self.config.compatible_cogs() as compat:
            # Match case-insensitively so "economy" finds "Economy".
            wanted = cog_name.lower()
            key = next((name for name in compat if name.lower() == wanted), None)
            if key is None:
                return await ctx.send(
                    f"Unknown cog {cog_name}. Available cogs: {', '.join(compat)}"
                )
            compat[key] = enabled
        await ctx.send(f"Compatibility with {key} {'enabled' if enabled else 'disabled'}.")

    @rubyapi.command(name="setcooldown")
    async def set_cooldown(self, ctx: commands.Context, seconds: int):
        """Set the cooldown between verifications."""
        if seconds < 0:
            return await ctx.send("The cooldown cannot be negative.")
        await self.config.verify_cooldown.set(seconds)
        await ctx.send(f"Verification cooldown set to {seconds} seconds.")

    @rubyapi.command(name="setreward")
    async def set_reward(self, ctx: commands.Context, credits: int):
        """Set the credit reward for successful verification."""
        if credits < 0:
            return await ctx.send("The reward cannot be negative.")
        await self.config.reward_credits.set(credits)
        await ctx.send(f"Verification reward set to {credits} credits.")

    async def notify_verification(self, guild: discord.Guild, user: discord.Member, roles_added: list):
        """Notify other cogs about successful verification.

        Every integration is best effort: a failure is logged and never
        propagates to the caller.
        """
        compatible_cogs = await self.config.compatible_cogs()

        if compatible_cogs.get("Economy", False):
            reward_credits = await self.config.reward_credits()
            if reward_credits > 0:
                try:
                    await bank.deposit_credits(user, reward_credits)
                except Exception:
                    log.exception("Failed to reward credits to user %s", user.id)

        if compatible_cogs.get("Leveler", False):
            leveler = self.bot.get_cog("Leveler")
            # NOTE(review): ``add_exp`` is assumed to exist on the Leveler
            # cog; it is looked up defensively because that API is not
            # defined in this file.
            add_exp = getattr(leveler, "add_exp", None)
            if add_exp is not None:
                try:
                    await add_exp(user, guild, 100)
                except Exception:
                    log.exception("Leveler integration failed for user %s", user.id)

        if compatible_cogs.get("CustomCom", False):
            customcom = self.bot.get_cog("CustomCom")
            # NOTE(review): neither ``trigger_custom_commands`` nor
            # ``Member.last_message`` is guaranteed to exist (the latter was
            # removed in discord.py 2.0), so both are looked up defensively.
            trigger = getattr(customcom, "trigger_custom_commands", None)
            last_message = getattr(user, "last_message", None)
            if trigger is not None and last_message is not None:
                try:
                    ctx = await self.bot.get_context(last_message)
                    await trigger(ctx, "verification")
                except Exception:
                    log.exception("CustomCom integration failed for user %s", user.id)

    @staticmethod
    def _resolve_roles(guild: discord.Guild, role_ids: Any) -> List[discord.Role]:
        """Map role ids from the API to roles that exist in ``guild``."""
        roles = []
        for role_id in role_ids or []:
            try:
                role = guild.get_role(int(role_id))
            except (TypeError, ValueError):
                log.warning("Ignoring malformed role id from API: %r", role_id)
                continue
            if role is not None:
                roles.append(role)
        return roles

    async def _verify_member(self, guild: discord.Guild, member: discord.Member) -> Dict[str, Any]:
        """Ask the API to verify ``member`` and grant the returned roles.

        Returns a dict with ``error`` (message or ``None``), ``verified``
        (bool) and ``roles`` (the roles that were added).

        Raises:
            discord.HTTPException: if adding the roles fails.
        """
        verify_url = await self.config.verify_url()
        payload = {"user_id": str(member.id), "guild_id": str(guild.id)}
        result = await self._make_api_request(verify_url, method="POST", json=payload)

        if "error" in result:
            return {"error": result["error"], "verified": False, "roles": []}
        if not result.get("verified", False):
            return {"error": None, "verified": False, "roles": []}

        roles = self._resolve_roles(guild, result.get("roles"))
        if roles:
            await member.add_roles(*roles, reason="Linked roles verification")
            self._last_verify_time[member.id] = datetime.now(timezone.utc)
            await self.notify_verification(guild, member, roles)
        return {"error": None, "verified": True, "roles": roles}

    @commands.command()
    @commands.guild_only()
    @commands.cooldown(1, 60, commands.BucketType.user)
    async def verifyuser(self, ctx: commands.Context, user: Optional[discord.Member] = None):
        """Verify a user's linked roles. Bot team only."""
        if not await self.config.enabled():
            return await ctx.send("Ruby API integration is not enabled globally.")

        user = user or ctx.author

        if not await self.config.api_key():
            return await ctx.send(
                f"API key has not been set. Please set it using `{ctx.clean_prefix}rubyapi setkey`"
            )

        # Per-target cooldown, separate from the per-invoker command cooldown.
        cooldown = await self.config.verify_cooldown()
        last_verify = self._last_verify_time.get(user.id)
        if last_verify is not None:
            elapsed = (datetime.now(timezone.utc) - last_verify).total_seconds()
            if elapsed < cooldown:
                remaining = int(cooldown - elapsed)
                return await ctx.send(f"Please wait {remaining} seconds before verifying again.")

        async with ctx.typing():
            try:
                outcome = await self._verify_member(ctx.guild, user)
            except discord.HTTPException as e:
                return await ctx.send(f"Error adding roles: {str(e)}")

        if outcome["error"]:
            return await ctx.send(f"Error verifying user: {outcome['error']}")
        if not outcome["verified"]:
            return await ctx.send(
                f"Could not verify {user.mention}. Please make sure their accounts are properly linked."
            )
        if not outcome["roles"]:
            return await ctx.send(f"Verified {user.mention}, but no roles needed to be added.")

        embed = discord.Embed(
            title="Verification Successful!",
            description=f"Verified {user.mention}",
            color=discord.Color.green(),
        )
        embed.add_field(name="Roles Added", value=", ".join(role.name for role in outcome["roles"]))
        compatible_cogs = await self.config.compatible_cogs()
        if compatible_cogs.get("Economy", False):
            reward = await self.config.reward_credits()
            if reward > 0:
                embed.add_field(name="Reward", value=f"{reward} credits")
        await ctx.send(embed=embed)

    @commands.Cog.listener()
    async def on_member_join(self, member: discord.Member):
        """Auto-verify users when they join if enabled."""
        if member.bot:
            return
        if not await self.config.enabled():
            return
        if not await self.config.api_key():
            return

        # There is no invoking message or channel for a join event, so the
        # verification helper is called directly and outcomes are only logged.
        try:
            outcome = await self._verify_member(member.guild, member)
        except discord.HTTPException as e:
            log.warning(
                "Auto-verification could not add roles to %s in guild %s: %s",
                member.id,
                member.guild.id,
                e,
            )
            return
        if outcome["error"]:
            log.warning("Auto-verification failed for %s: %s", member.id, outcome["error"])

    @commands.Cog.listener()
    async def on_interaction(self, interaction: discord.Interaction):
        """Handle incoming Discord interactions by forwarding them to the API."""
        if not await self.config.enabled():
            return

        interaction_url = await self.config.interaction_url()

        user = interaction.user
        member_payload = None
        if user is not None:
            member_payload = {
                "user": {
                    "id": str(user.id),
                    "username": user.name,
                    "discriminator": user.discriminator,
                },
                # In DMs ``interaction.user`` is a plain User without roles.
                "roles": [str(role.id) for role in getattr(user, "roles", [])],
            }

        def as_id(value: Optional[int]) -> Optional[str]:
            # Avoid sending the literal string "None" for missing ids.
            return None if value is None else str(value)

        payload = {
            "type": interaction.type.value,
            "guild_id": as_id(interaction.guild_id),
            "channel_id": as_id(interaction.channel_id),
            "data": interaction.data,
            "member": member_payload,
        }

        result = await self._make_api_request(interaction_url, method="POST", json=payload)
        if "error" in result:
            log.warning("Error forwarding interaction: %s", result["error"])

    @commands.Cog.listener()
    async def on_command_error(self, ctx: commands.Context, error: commands.CommandError):
        """Handle cooldown errors raised by this cog's own commands."""
        # This listener fires for every command in the bot; ignore other cogs.
        if ctx.cog is not self:
            return
        if isinstance(error, commands.CommandOnCooldown):
            await ctx.send(
                f"Please wait {int(error.retry_after)} seconds before using this command again."
            )

    async def _make_api_request(self, url: str, method: str = "GET", **kwargs) -> Dict[str, Any]:
        """Make an API request with proper headers and error handling.

        Extra keyword arguments are passed to ``ClientSession.request``.
        Always returns a dict; any failure is reported under an ``"error"``
        key instead of raising.
        """
        # Copy so the caller's headers mapping is never mutated.
        headers = dict(kwargs.pop("headers", None) or {})
        if api_key := await self.config.api_key():
            headers["Authorization"] = f"Bearer {api_key}"

        try:
            async with self.session.request(method, url, headers=headers, **kwargs) as resp:
                status = resp.status
                if status == 429:  # Rate limited
                    try:
                        retry_after = float(resp.headers.get("Retry-After", 5))
                    except ValueError:
                        # Retry-After may be an HTTP date rather than seconds.
                        retry_after = 5.0
                    return {"error": "rate_limited", "retry_after": retry_after}
                data = await resp.json()
        except asyncio.TimeoutError:
            return {"error": "API request failed: request timed out"}
        except (aiohttp.ClientError, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            return {"error": f"API request failed: {str(e)}"}

        if not isinstance(data, dict):
            return {"error": "API request failed: unexpected response format"}
        if status >= 400 and "error" not in data:
            # Surface HTTP failures whose body carries no explicit error.
            return {**data, "error": f"HTTP {status}"}
        return data