Ruby-Cogs/lootdrop/lootdrop.py

1289 lines
54 KiB
Python

"""LootDrop cog for Red-DiscordBot - Drop random loot in channels for users to grab"""
from typing import Optional, Dict, List, Union, Tuple, Any, cast
import discord
from redbot.core import commands, Config, bank
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
import datetime
import random
from discord.ext import tasks
from collections import defaultdict
import asyncio
from .scenarios import SCENARIOS, Scenario
import time
# Default guild settings
DEFAULT_GUILD_SETTINGS: Dict[str, Any] = {
"enabled": False,
"channels": [],
"min_credits": 100,
"max_credits": 1000,
"bad_outcome_chance": 30,
"drop_timeout": 60,
"min_frequency": 300,
"max_frequency": 1800,
"activity_timeout": 300,
"last_drop": 0,
"next_drop": 0,
"messages": {"expired": "The opportunity has passed..."},
"user_stats": {}, # {user_id: {"good": count, "bad": count}}
"streak_bonus": 10, # Percentage bonus per streak level
"streak_max": 5, # Maximum streak multiplier
"streak_timeout": 24, # Hours before streak resets
"party_drop_chance": 5, # Percentage chance for party drop
"party_drop_min": 50, # Min credits per person
"party_drop_max": 200, # Max credits per person
"party_drop_timeout": 30 # Seconds to claim party drop
}
class ActiveDrop:
"""Represents an active loot drop in a channel
Attributes
----------
message: discord.Message
The message containing the loot drop
view: discord.ui.View
The view containing the claim button
created: int
Unix timestamp of when the drop was created
"""
def __init__(self, message: discord.Message, view: discord.ui.View, created: int) -> None:
self.message: discord.Message = message
self.view: discord.ui.View = view
self.created: int = created
class LootDrop(commands.Cog):
"""Drop random loot in channels for users to grab
This cog creates random loot drops in active channels that users can claim.
It supports both regular drops (single claim) and party drops (multi-claim).
Attributes
----------
bot: Red
The Red instance
config: Config
The config manager for guild settings
active_drops: Dict[int, ActiveDrop]
Currently active drops, keyed by guild ID
tasks: Dict[int, asyncio.Task]
Active timeout tasks, keyed by guild ID
channel_last_message: Dict[int, int]
Last message timestamp per channel
channel_perms_cache: Dict[int, Tuple[int, bool]]
Cache of channel permission checks
"""
def __init__(self, bot: Red) -> None:
self.bot: Red = bot
self.config: Config = Config.get_conf(
self,
identifier=582650109,
force_registration=True
)
self.config.register_guild(**DEFAULT_GUILD_SETTINGS)
self.active_drops: Dict[int, ActiveDrop] = {}
self.tasks: Dict[int, asyncio.Task] = {}
self.channel_last_message: Dict[int, int] = defaultdict(lambda: 0)
self.channel_perms_cache: Dict[int, Tuple[int, bool]] = {}
self.start_drops.start()
def cog_unload(self) -> None:
"""Cleanup when cog is unloaded
Cancels all tasks and removes active drops
"""
# Stop the background task first
self.start_drops.cancel()
try:
# Cancel all timeout tasks
for task in self.tasks.values():
task.cancel()
# Clean up active drops
for drop in self.active_drops.values():
try:
self.bot.loop.create_task(drop.message.delete())
except:
pass
finally:
# Clear all state
self.active_drops.clear()
self.tasks.clear()
self.channel_last_message.clear()
self.channel_perms_cache.clear()
@commands.Cog.listener()
async def on_message(self, message: discord.Message) -> None:
"""Track channel activity"""
if message.guild and not message.author.bot:
# Update activity for both channels and threads
self.channel_last_message[message.channel.id] = int(datetime.datetime.now().timestamp())
async def channel_is_active(self, channel: Union[discord.TextChannel, discord.Thread]) -> bool:
"""Check if a channel has had message activity within the activity timeout"""
now: int = int(datetime.datetime.now().timestamp())
last_message: int = self.channel_last_message[channel.id]
timeout: int = await self.config.guild(channel.guild).activity_timeout()
# For threads, also check if they're still active
if isinstance(channel, discord.Thread) and not channel.parent:
return False
return (now - last_message) < timeout
async def has_channel_permissions(self, channel: Union[discord.TextChannel, discord.Thread]) -> bool:
"""Check if bot has required permissions in channel, with caching"""
now: int = int(datetime.datetime.now().timestamp())
cache_entry: Optional[Tuple[int, bool]] = self.channel_perms_cache.get(channel.id)
if cache_entry and (now - cache_entry[0]) < 300:
return cache_entry[1]
# For threads, check parent channel permissions too
if isinstance(channel, discord.Thread):
if not channel.parent:
return False
parent_perms = channel.parent.permissions_for(channel.guild.me)
if not parent_perms.send_messages:
return False
has_perms: bool = channel.permissions_for(channel.guild.me).send_messages
self.channel_perms_cache[channel.id] = (now, has_perms)
return has_perms
async def get_active_channel(self, guild: discord.Guild) -> Optional[Union[discord.TextChannel, discord.Thread]]:
"""Get a random active channel from the configured channels"""
channels: List[int] = await self.config.guild(guild).channels()
active_channels: List[Union[discord.TextChannel, discord.Thread]] = []
for channel_id in channels:
channel = None
# Try to get as text channel first
channel = guild.get_channel(channel_id)
# If not found, try to get as thread
if not channel:
for thread in guild.threads:
if thread.id == channel_id:
channel = thread
break
if channel and await self.channel_is_active(channel) and await self.has_channel_permissions(channel):
active_channels.append(channel)
return random.choice(active_channels) if active_channels else None
@tasks.loop(seconds=30)
async def start_drops(self) -> None:
"""Check for and create drops in all guilds"""
for guild in self.bot.guilds:
try:
if not await self.config.guild(guild).enabled() or guild.id in self.active_drops:
continue
now: int = int(datetime.datetime.now().timestamp())
if now >= await self.config.guild(guild).next_drop():
if channel := await self.get_active_channel(guild):
await self.create_drop(channel)
await self.schedule_next_drop(guild)
except Exception as e:
if guild.system_channel and guild.system_channel.permissions_for(guild.me).send_messages:
await guild.system_channel.send(f"Error creating loot drop: {e}")
@start_drops.before_loop
async def before_start_drops(self) -> None:
"""Wait for bot to be ready before starting drops"""
await self.bot.wait_until_ready()
async def schedule_next_drop(self, guild: discord.Guild) -> None:
"""Schedule the next drop for a guild"""
if not await self.config.guild(guild).enabled():
return
min_freq: int = await self.config.guild(guild).min_frequency()
max_freq: int = await self.config.guild(guild).max_frequency()
now: int = int(datetime.datetime.now().timestamp())
await self.config.guild(guild).next_drop.set(now + random.randint(min_freq, max_freq))
async def create_drop(self, channel: Union[discord.TextChannel, discord.Thread]) -> None:
"""Create a new loot drop in the specified channel"""
if not channel.permissions_for(channel.guild.me).send_messages:
return
if channel.guild.id in self.active_drops:
return
# Check for party drop
is_party = random.randint(1, 100) <= await self.config.guild(channel.guild).party_drop_chance()
if is_party:
await self.create_party_drop(channel)
else:
scenario = random.choice(SCENARIOS)
timeout = await self.config.guild(channel.guild).drop_timeout()
view = LootDropView(self, scenario, float(timeout))
view.message = await channel.send(scenario["start"], view=view)
self.active_drops[channel.guild.id] = ActiveDrop(
message=view.message,
view=view,
created=int(datetime.datetime.now().timestamp())
)
# Start timeout task
self.tasks[channel.guild.id] = asyncio.create_task(self._handle_drop_timeout(channel.guild.id, timeout))
async def _handle_drop_timeout(self, guild_id: int, timeout: int) -> None:
"""Handle drop timeout and cleanup"""
try:
if guild_id in self.active_drops:
drop = self.active_drops[guild_id]
# Calculate remaining time based on when the drop was created
now = int(datetime.datetime.now().timestamp())
elapsed = now - drop.created
remaining = max(0, timeout - elapsed)
await asyncio.sleep(remaining)
if guild_id in self.active_drops:
drop = self.active_drops[guild_id]
if not drop.view.claimed:
await drop.message.edit(content="The opportunity has passed...", view=None)
del self.active_drops[guild_id]
except Exception:
pass
finally:
if guild_id in self.tasks:
del self.tasks[guild_id]
async def create_party_drop(self, channel: Union[discord.TextChannel, discord.Thread]) -> None:
"""Create a party drop that everyone can claim
Parameters
----------
channel: Union[discord.TextChannel, discord.Thread]
The channel to create the drop in
"""
if channel.guild.id in self.active_drops:
return
timeout = await self.config.guild(channel.guild).party_drop_timeout()
view = PartyDropView(self, timeout)
message = await channel.send(
"🎉 **PARTY DROP!** 🎉\n"
"Everyone who clicks the button in the next "
f"{timeout} seconds gets a prize!\n",
view=view
)
view.message = message
self.active_drops[channel.guild.id] = ActiveDrop(
message=message,
view=view,
created=int(datetime.datetime.now().timestamp())
)
# Start timeout task
self.tasks[channel.guild.id] = asyncio.create_task(self._handle_party_timeout(channel.guild.id, timeout))
async def _handle_party_timeout(self, guild_id: int, timeout: int) -> None:
"""Handle party drop timeout and rewards
Parameters
----------
guild_id: int
The guild ID for this party drop
timeout: int
Seconds to wait before processing rewards
Notes
-----
Rewards are scaled based on reaction time:
- Clicking within first 20% of timeout: 80-100% of max credits
- Clicking within 20-40% of timeout: 60-80% of max credits
- Clicking within 40-60% of timeout: 40-60% of max credits
- Clicking within 60-80% of timeout: 20-40% of max credits
- Clicking within 80-100% of timeout: min credits
"""
try:
await asyncio.sleep(timeout)
if guild_id in self.active_drops:
drop = self.active_drops[guild_id]
view = cast(PartyDropView, drop.view)
if not view.claimed_users:
await drop.message.edit(content="No one joined the party... 😢", view=None)
else:
min_credits = await self.config.guild(drop.message.guild).party_drop_min()
max_credits = await self.config.guild(drop.message.guild).party_drop_max()
credit_range = max_credits - min_credits
# Sort users by click speed
sorted_users = sorted(view.claimed_users.items(), key=lambda x: x[1])
results = []
for position, (user_id, claim_time) in enumerate(sorted_users, 1):
user = drop.message.guild.get_member(int(user_id))
if user:
# Calculate reaction time and percentage of timeout
reaction_time = claim_time - view.start_time
time_percentage = (reaction_time / timeout) * 100
# Calculate credits based on reaction time
if time_percentage <= 20: # Super fast (80-100% of max)
credits = max_credits - int((time_percentage / 20) * (credit_range * 0.2))
elif time_percentage <= 40: # Fast (60-80% of max)
credits = int(max_credits * 0.8) - int(((time_percentage - 20) / 20) * (credit_range * 0.2))
elif time_percentage <= 60: # Medium (40-60% of max)
credits = int(max_credits * 0.6) - int(((time_percentage - 40) / 20) * (credit_range * 0.2))
elif time_percentage <= 80: # Slow (20-40% of max)
credits = int(max_credits * 0.4) - int(((time_percentage - 60) / 20) * (credit_range * 0.2))
else: # Very slow (min credits)
credits = min_credits
await bank.deposit_credits(user, credits)
# Update stats and streak
async with self.config.guild(drop.message.guild).user_stats() as stats:
if user_id not in stats:
stats[user_id] = {"good": 0, "bad": 0, "streak": 0, "highest_streak": 0, "last_claim": 0}
stats[user_id]["good"] += 1
# Update streak if within timeout
now = int(datetime.datetime.now().timestamp())
streak_timeout = await self.config.guild(drop.message.guild).streak_timeout()
if now - stats[user_id]["last_claim"] < streak_timeout * 3600:
stats[user_id]["streak"] += 1
stats[user_id]["highest_streak"] = max(stats[user_id]["streak"], stats[user_id]["highest_streak"])
else:
stats[user_id]["streak"] = 1
stats[user_id]["last_claim"] = now
# Add result with streak and timing info
streak_display = f" (🔥{stats[user_id]['streak']})" if stats[user_id]['streak'] > 1 else ""
position_emoji = ["🥇", "🥈", "🥉"][position-1] if position <= 3 else f"{position}th"
# Add speed indicator based on time percentage
if time_percentage <= 20:
speed = "💨 Super Fast!"
elif time_percentage <= 40:
speed = "⚡ Fast!"
elif time_percentage <= 60:
speed = "👍 Good"
elif time_percentage <= 80:
speed = "🐌 Slow"
else:
speed = "🦥 Very Slow"
currency_name = await bank.get_currency_name(drop.message.guild)
results.append(
f"{position_emoji} {user.mention}{streak_display}: {credits:,} {currency_name}\n"
f"{speed} ({reaction_time:.2f}s)"
)
await drop.message.edit(
content=f"🎊 **Party Drop Results!** 🎊\n"
f"{len(results)} party-goers claimed rewards:\n\n" +
"\n".join(results),
view=None
)
del self.active_drops[guild_id]
except Exception as e:
if guild_id in self.active_drops:
try:
await self.active_drops[guild_id].message.edit(content=f"Error processing party drop: {e}", view=None)
except:
pass
del self.active_drops[guild_id]
finally:
if guild_id in self.tasks:
del self.tasks[guild_id]
async def process_loot_claim(self, interaction: discord.Interaction) -> None:
"""Process a user's loot claim"""
try:
guild: discord.Guild = interaction.guild
user: discord.Member = interaction.user
min_credits: int = await self.config.guild(guild).min_credits()
max_credits: int = await self.config.guild(guild).max_credits()
bad_chance: int = await self.config.guild(guild).bad_outcome_chance()
streak_bonus: int = await self.config.guild(guild).streak_bonus()
streak_max: int = await self.config.guild(guild).streak_max()
streak_timeout: int = await self.config.guild(guild).streak_timeout()
currency_name: str = await bank.get_currency_name(guild)
scenario: Scenario = next(
(s for s in SCENARIOS if s["start"] == interaction.message.content),
SCENARIOS[0]
)
amount: int = random.randint(min_credits, max_credits)
is_bad: bool = random.randint(1, 100) <= bad_chance
# Update user stats and streaks
async with self.config.guild(guild).user_stats() as stats:
now = int(datetime.datetime.now().timestamp())
user_stats = stats.setdefault(str(user.id), {
"good": 0,
"bad": 0,
"streak": 0,
"last_claim": 0,
"highest_streak": 0
})
# Check if streak should reset due to timeout
hours_since_last = (now - user_stats.get("last_claim", 0)) / 3600
if hours_since_last > streak_timeout:
user_stats["streak"] = 0
user_stats["last_claim"] = now
try:
if is_bad:
amount = min(amount, await bank.get_balance(user))
await bank.withdraw_credits(user, amount)
message: str = scenario["bad"].format(user=user.mention, amount=f"{amount:,}", currency=currency_name)
user_stats["bad"] += 1
user_stats["streak"] = 0
else:
# Calculate streak bonus
streak = min(user_stats["streak"], streak_max)
bonus = int(amount * (streak * streak_bonus / 100))
total = amount + bonus
await bank.deposit_credits(user, total)
if bonus > 0:
message: str = (
f"{scenario['good'].format(user=user.mention, amount=f'{total:,}', currency=currency_name)}\n"
f"(Base: {amount:,} + Streak Bonus: {bonus:,} [{streak}x])"
)
else:
message: str = scenario["good"].format(user=user.mention, amount=f"{total:,}", currency=currency_name)
user_stats["good"] += 1
user_stats["streak"] += 1
user_stats["highest_streak"] = max(user_stats["streak"], user_stats.get("highest_streak", 0))
# Add stats to message
message += f"\n-# (✅ {user_stats['good']} | ❌ {user_stats.get('bad', 0)} drops claimed | "
if user_stats["streak"] > 0:
message += f"🔥 {user_stats['streak']} streak"
else:
message += "❄️ streak lost"
position, _ = await self.get_leaderboard_position(guild, str(user.id))
message += f" | Rank #{position})"
# Use followup instead of response since we might have already responded
if interaction.response.is_done():
await interaction.followup.send(message)
else:
await interaction.response.send_message(message)
except Exception as e:
if interaction.response.is_done():
await interaction.followup.send(f"Error processing claim: {e}", ephemeral=True)
else:
await interaction.response.send_message(f"Error processing claim: {e}", ephemeral=True)
except Exception as e:
if interaction.response.is_done():
await interaction.followup.send(f"Error processing claim: {e}", ephemeral=True)
else:
await interaction.response.send_message(f"Error processing claim: {e}", ephemeral=True)
@commands.group()
@commands.guild_only()
async def lootdrop(self, ctx: commands.Context) -> None:
"""Random loot drops that appear in active channels
Drops can be claimed by clicking a button. Features include:
- Regular drops (single claim)
- Party drops (everyone can claim)
- Streak bonuses for consecutive claims
- Risk/reward mechanics with good/bad outcomes
- Leaderboards and statistics
Use `[p]lootdrop set` to configure settings
Use `[p]lootdrop stats` to view your stats
Use `[p]lootdrop leaderboard` to view top claimers
"""
pass
@lootdrop.group(name="set")
@commands.admin_or_permissions(administrator=True)
async def lootdrop_set(self, ctx: commands.Context) -> None:
"""Configure LootDrop settings
All settings are per-server. Available settings:
- Toggle drops on/off
- Add/remove channels
- Credit amounts
- Drop frequency and timeouts
- Streak bonuses and timeouts
- Party drop settings
Use `[p]help LootDrop set` to see all settings
"""
pass
@lootdrop_set.command(name="toggle")
async def lootdrop_set_toggle(self, ctx: commands.Context, on_off: Optional[bool] = None) -> None:
"""Toggle LootDrop on/off"""
curr_state: bool = await self.config.guild(ctx.guild).enabled()
if on_off is None:
on_off = not curr_state
await self.config.guild(ctx.guild).enabled.set(on_off)
if on_off:
await self.schedule_next_drop(ctx.guild)
await ctx.send("LootDrop is now enabled! Drops will begin shortly.")
else:
await ctx.send("LootDrop is now disabled.")
@lootdrop_set.command(name="addchannel")
async def lootdrop_set_addchannel(
self,
ctx: commands.Context,
channel: Union[discord.TextChannel, discord.Thread]
) -> None:
"""Add a channel or thread to the loot drop pool"""
if isinstance(channel, discord.Thread) and not channel.parent:
await ctx.send("That thread no longer exists!")
return
if not channel.permissions_for(ctx.guild.me).send_messages:
await ctx.send("I don't have permission to send messages there!")
return
async with self.config.guild(ctx.guild).channels() as channels:
if channel.id in channels:
await ctx.send(f"{channel.mention} is already in the loot drop pool!")
return
channels.append(channel.id)
await ctx.send(f"Added {channel.mention} to the loot drop pool!")
@lootdrop_set.command(name="removechannel")
async def lootdrop_set_removechannel(
self,
ctx: commands.Context,
channel: Union[discord.TextChannel, discord.Thread]
) -> None:
"""Remove a channel or thread from the loot drop pool"""
async with self.config.guild(ctx.guild).channels() as channels:
if channel.id not in channels:
await ctx.send(f"{channel.mention} is not in the loot drop pool!")
return
channels.remove(channel.id)
await ctx.send(f"Removed {channel.mention} from the loot drop pool!")
@lootdrop_set.command(name="credits")
async def lootdrop_set_credits(self, ctx: commands.Context, min_credits: int, max_credits: int) -> None:
"""Set the credit range for drops"""
currency_name = await bank.get_currency_name(ctx.guild)
if min_credits < 1:
await ctx.send(f"Minimum {currency_name} must be at least 1!")
return
if max_credits < min_credits:
await ctx.send(f"Maximum {currency_name} must be greater than minimum {currency_name}!")
return
await self.config.guild(ctx.guild).min_credits.set(min_credits)
await self.config.guild(ctx.guild).max_credits.set(max_credits)
await ctx.send(f"Drops will now give between {min_credits:,} and {max_credits:,} {currency_name}!")
@lootdrop_set.command(name="badchance")
async def lootdrop_set_badchance(self, ctx: commands.Context, chance: int) -> None:
"""Set the chance of bad outcomes (0-100)"""
if not 0 <= chance <= 100:
await ctx.send("Chance must be between 0 and 100!")
return
await self.config.guild(ctx.guild).bad_outcome_chance.set(chance)
await ctx.send(f"Bad outcome chance set to {chance}%")
@lootdrop_set.command(name="timeout")
async def lootdrop_set_timeout(self, ctx: commands.Context, seconds: int) -> None:
"""Set how long users have to claim a drop
Parameters
----------
seconds: int
Number of seconds before the drop expires
Example: 3600 = 1 hour, 1800 = 30 minutes
"""
if seconds < 10:
await ctx.send("Timeout must be at least 10 seconds!")
return
await self.config.guild(ctx.guild).drop_timeout.set(seconds)
# Convert to a more readable format for the response
if seconds >= 3600:
time_str = f"{seconds // 3600} hours"
if seconds % 3600:
time_str += f" and {(seconds % 3600) // 60} minutes"
elif seconds >= 60:
time_str = f"{seconds // 60} minutes"
if seconds % 60:
time_str += f" and {seconds % 60} seconds"
else:
time_str = f"{seconds} seconds"
await ctx.send(f"Users will now have {time_str} to claim drops!")
@lootdrop_set.command(name="frequency")
async def lootdrop_set_frequency(self, ctx: commands.Context, min_minutes: int, max_minutes: int) -> None:
"""Set how frequently drops appear"""
if min_minutes < 1:
await ctx.send("Minimum frequency must be at least 1 minute!")
return
if max_minutes < min_minutes:
await ctx.send("Maximum frequency must be greater than minimum frequency!")
return
await self.config.guild(ctx.guild).min_frequency.set(min_minutes * 60)
await self.config.guild(ctx.guild).max_frequency.set(max_minutes * 60)
await self.schedule_next_drop(ctx.guild)
await ctx.send(f"Drops will occur randomly between {min_minutes} and {max_minutes} minutes apart.")
@lootdrop_set.command(name="activitytimeout")
async def lootdrop_set_activitytimeout(self, ctx: commands.Context, minutes: int) -> None:
"""Set how long a channel can be inactive before drops are skipped"""
if minutes < 1:
await ctx.send("Timeout must be at least 1 minute!")
return
await self.config.guild(ctx.guild).activity_timeout.set(minutes * 60)
await ctx.send(f"Channels will now be considered inactive after {minutes} minutes without messages.")
@lootdrop_set.command(name="streakbonus")
async def lootdrop_set_streakbonus(self, ctx: commands.Context, percentage: int) -> None:
"""Set the credit bonus percentage per streak level
Example: A bonus of 10 means each streak level adds 10% more credits
So streak 3 would give a 30% bonus
"""
if percentage < 0:
await ctx.send("Bonus percentage must be positive!")
return
await self.config.guild(ctx.guild).streak_bonus.set(percentage)
await ctx.send(f"Streak bonus set to {percentage}% per level")
@lootdrop_set.command(name="streakmax")
async def lootdrop_set_streakmax(self, ctx: commands.Context, max_level: int) -> None:
"""Set the maximum streak multiplier level
Example: A max of 5 means streaks cap at 5x the bonus
"""
if max_level < 1:
await ctx.send("Maximum streak level must be at least 1!")
return
await self.config.guild(ctx.guild).streak_max.set(max_level)
await ctx.send(f"Maximum streak level set to {max_level}")
@lootdrop_set.command(name="streaktimeout")
async def lootdrop_set_streaktimeout(self, ctx: commands.Context, hours: int) -> None:
"""Set how many hours before a streak resets
Example: A timeout of 24 means you must claim within 24 hours to keep streak
"""
if hours < 1:
await ctx.send("Timeout must be at least 1 hour!")
return
await self.config.guild(ctx.guild).streak_timeout.set(hours)
await ctx.send(f"Streak timeout set to {hours} hours")
@lootdrop_set.command(name="partychance")
async def lootdrop_set_partychance(self, ctx: commands.Context, chance: int) -> None:
"""Set the chance of a party drop appearing (0-100)
Party drops allow everyone to claim within a time limit
"""
if not 0 <= chance <= 100:
await ctx.send("Chance must be between 0 and 100!")
return
await self.config.guild(ctx.guild).party_drop_chance.set(chance)
await ctx.send(f"Party drop chance set to {chance}%")
@lootdrop_set.command(name="partycredits")
async def lootdrop_set_partycredits(self, ctx: commands.Context, min_credits: int, max_credits: int) -> None:
"""Set the credit range for party drops
Each person who claims gets this amount
"""
currency_name = await bank.get_currency_name(ctx.guild)
if min_credits < 1:
await ctx.send(f"Minimum {currency_name} must be at least 1!")
return
if max_credits < min_credits:
await ctx.send(f"Maximum {currency_name} must be greater than minimum {currency_name}!")
return
await self.config.guild(ctx.guild).party_drop_min.set(min_credits)
await self.config.guild(ctx.guild).party_drop_max.set(max_credits)
await ctx.send(f"Party drops will now give between {min_credits:,} and {max_credits:,} {currency_name} per person!")
@lootdrop_set.command(name="partytimeout")
async def lootdrop_set_partytimeout(self, ctx: commands.Context, seconds: int) -> None:
"""Set how long users have to claim a party drop"""
if seconds < 5:
await ctx.send("Timeout must be at least 5 seconds!")
return
await self.config.guild(ctx.guild).party_drop_timeout.set(seconds)
await ctx.send(f"Users will now have {seconds} seconds to claim party drops!")
@commands.is_owner()
@lootdrop.command(name="wipedata")
async def lootdrop_wipedata(self, ctx: commands.Context) -> None:
"""⚠️ [Owner Only] Completely wipe all LootDrop data
This will erase:
- All guild settings
- All user stats and streaks
- All active drops
This action cannot be undone!
"""
# Ask for confirmation
msg = await ctx.send(
"⚠️ **WARNING**: This will completely wipe all LootDrop data across all servers!\n"
"This includes all settings, stats, and streaks.\n\n"
"**This action cannot be undone!**\n\n"
"Type `yes, wipe all data` to confirm."
)
try:
response = await self.bot.wait_for(
"message",
timeout=30.0,
check=lambda m: m.author == ctx.author and m.channel == ctx.channel
)
except asyncio.TimeoutError:
await msg.edit(content="Data wipe cancelled - timed out.")
return
if response.content.lower() != "yes, wipe all data":
await msg.edit(content="Data wipe cancelled - incorrect confirmation.")
return
# Stop all active drops and tasks
self.start_drops.cancel()
for task in self.tasks.values():
task.cancel()
try:
# Clean up active drops
for drop in self.active_drops.values():
try:
await drop.message.delete()
except:
pass
# Clear all state
self.active_drops.clear()
self.tasks.clear()
self.channel_last_message.clear()
self.channel_perms_cache.clear()
# Clear all data from config
await self.config.clear_all()
# Restart the drop task
self.start_drops.start()
await msg.edit(content="✅ All LootDrop data has been wiped!")
except Exception as e:
await msg.edit(content=f"Error wiping data: {e}")
@commands.is_owner()
@lootdrop.command(name="wipestats")
async def lootdrop_wipestats(self, ctx: commands.Context) -> None:
"""⚠️ [Owner Only] Wipe all user stats and streaks
This will erase:
- All user stats (good/bad drops)
- All user streaks
- Leaderboard data
This action cannot be undone!
Settings and active drops will be preserved.
"""
# Ask for confirmation
msg = await ctx.send(
"⚠️ **WARNING**: This will wipe all user stats and streaks across all servers!\n"
"This includes all drop counts, streaks, and leaderboard data.\n\n"
"**This action cannot be undone!**\n"
"Server settings and active drops will be preserved.\n\n"
"Type `yes, wipe stats` to confirm."
)
try:
response = await self.bot.wait_for(
"message",
timeout=30.0,
check=lambda m: m.author == ctx.author and m.channel == ctx.channel
)
except asyncio.TimeoutError:
await msg.edit(content="Stats wipe cancelled - timed out.")
return
if response.content.lower() != "yes, wipe stats":
await msg.edit(content="Stats wipe cancelled - incorrect confirmation.")
return
try:
# Get all guild data
all_guilds = await self.config.all_guilds()
# Clear only user_stats for each guild
for guild_id in all_guilds:
async with self.config.guild_from_id(guild_id).user_stats() as stats:
stats.clear()
await msg.edit(content="✅ All user stats and streaks have been wiped!")
except Exception as e:
await msg.edit(content=f"Error wiping stats: {e}")
@lootdrop.command(name="force")
@commands.admin_or_permissions(administrator=True)
async def lootdrop_force(self, ctx: commands.Context, channel: Optional[Union[discord.TextChannel, discord.Thread]] = None) -> None:
"""Force a lootdrop to appear"""
channel = channel or ctx.channel
if channel.guild.id in self.active_drops:
await ctx.send("There's already an active drop in this server! Wait for it to be claimed or expire.")
return
if not channel.permissions_for(channel.guild.me).send_messages:
await ctx.send(f"I don't have permission to send messages in {channel.mention}!")
return
try:
await self.create_drop(channel)
await self.config.guild(channel.guild).last_drop.set(int(datetime.datetime.now().timestamp()))
except Exception as e:
await ctx.send(f"Error creating drop: {e}")
@lootdrop.command(name="forceparty")
@commands.admin_or_permissions(administrator=True)
async def lootdrop_force_party(self, ctx: commands.Context, channel: Optional[Union[discord.TextChannel, discord.Thread]] = None) -> None:
"""Force a party drop to appear
Everyone who clicks the button gets credits!
"""
channel = channel or ctx.channel
if channel.guild.id in self.active_drops:
await ctx.send("There's already an active drop in this server! Wait for it to be claimed or expire.")
return
if not channel.permissions_for(channel.guild.me).send_messages:
await ctx.send(f"I don't have permission to send messages in {channel.mention}!")
return
try:
await self.create_party_drop(channel)
await self.config.guild(channel.guild).last_drop.set(int(datetime.datetime.now().timestamp()))
except Exception as e:
await ctx.send(f"Error creating party drop: {e}")
@lootdrop.command(name="stats")
async def lootdrop_stats(self, ctx: commands.Context, user: Optional[discord.Member] = None) -> None:
"""View loot drop statistics for a user"""
user = user or ctx.author
stats = await self.config.guild(ctx.guild).user_stats()
user_stats = stats.get(str(user.id), {
"good": 0,
"bad": 0,
"streak": 0,
"highest_streak": 0
})
total = user_stats["good"] + user_stats.get("bad", 0)
if total == 0:
await ctx.send(f"{user.mention} hasn't claimed any drops yet!")
return
success_rate = (user_stats["good"] / total) * 100
streak_bonus = await self.config.guild(ctx.guild).streak_bonus()
current_bonus = min(user_stats["streak"], await self.config.guild(ctx.guild).streak_max()) * streak_bonus
embed = discord.Embed(
title="🎲 Drop Statistics",
description=f"Stats for {user.mention}",
color=discord.Color.blue()
)
stats_text = (
f"**Total Drops:** {total:,}\n"
f"**Successful:** ✅ {user_stats['good']:,}\n"
f"**Bad Luck:** ❌ {user_stats.get('bad', 0):,}\n"
f"**Success Rate:** {success_rate:.1f}%\n\n"
f"**Current Streak:** 🔥 {user_stats['streak']}\n"
f"**Highest Streak:** ⭐ {user_stats.get('highest_streak', 0)}\n"
f"**Current Bonus:** +{current_bonus}% credits"
)
embed.add_field(name="📊 Statistics", value=stats_text, inline=False)
await ctx.send(embed=embed)
@lootdrop.command(name="leaderboard", aliases=["lb"])
async def lootdrop_leaderboard(self, ctx: commands.Context) -> None:
"""View the loot drop leaderboard (Top 5)"""
_, leaderboard = await self.get_leaderboard_position(ctx.guild, str(ctx.author.id))
if not leaderboard:
await ctx.send("No drops have been claimed yet!")
return
embed = discord.Embed(
title="🏆 LootDrop Leaderboard",
color=discord.Color.gold()
)
# Get top 5 users
description = []
for i, (user_id, total, good, highest_streak) in enumerate(leaderboard[:5], 1):
user = ctx.guild.get_member(int(user_id))
if not user:
continue
medal = {1: "🥇", 2: "🥈", 3: "🥉"}.get(i, "👑")
success_rate = (good / total) * 100 if total > 0 else 0
description.append(
f"{medal} **#{i}** {user.mention}\n"
f"{total:,} drops (✅ {good:,} | {success_rate:.1f}% success) | ⭐ Highest Streak: {highest_streak}"
)
if description:
embed.description = "\n\n".join(description)
else:
embed.description = "No valid leaderboard entries found"
# Add author's position if not in top 5
author_pos, _ = await self.get_leaderboard_position(ctx.guild, str(ctx.author.id))
if author_pos > 5:
stats = await self.config.guild(ctx.guild).user_stats()
author_stats = stats.get(str(ctx.author.id), {"good": 0, "bad": 0, "highest_streak": 0})
total = author_stats["good"] + author_stats.get("bad", 0)
success_rate = (author_stats["good"] / total) * 100 if total > 0 else 0
embed.add_field(
name="Your Position",
value=(
f"#{author_pos} with {total:,} drops "
f"(✅ {author_stats['good']:,} | {success_rate:.1f}% success) | "
f"⭐ Highest Streak: {author_stats.get('highest_streak', 0)}"
),
inline=False
)
await ctx.send(embed=embed)
@lootdrop.command(name="settings")
async def lootdrop_settings(self, ctx: commands.Context) -> None:
"""View current LootDrop settings"""
settings: Dict[str, Any] = await self.config.guild(ctx.guild).all()
currency_name = await bank.get_currency_name(ctx.guild)
channels: List[str] = []
for channel_id in settings["channels"]:
# Try to get as text channel first
channel = ctx.guild.get_channel(channel_id)
# If not found, try to get as thread
if not channel:
for thread in ctx.guild.threads:
if thread.id == channel_id:
channel = thread
break
if channel:
channels.append(channel.mention)
embed: discord.Embed = discord.Embed(
title="💰 LootDrop Settings",
description=f"**Status:** {'🟢 Enabled' if settings['enabled'] else '🔴 Disabled'}",
color=discord.Color.gold() if settings['enabled'] else discord.Color.greyple()
)
# Channel Settings
if channels:
embed.add_field(
name="📝 Channels",
value=humanize_list(channels),
inline=False
)
else:
embed.add_field(
name="📝 Channels",
value="*No channels configured*\nUse `lootdrop set addchannel` to add channels or threads",
inline=False
)
# Drop Settings
drop_settings = (
f"**{currency_name}:** {settings['min_credits']:,} - {settings['max_credits']:,}\n"
f"**Bad Outcome:** {settings['bad_outcome_chance']}% chance\n"
f"**Claim Time:** {settings['drop_timeout']} seconds\n"
f"**Frequency:** {settings['min_frequency'] // 60} - {settings['max_frequency'] // 60} minutes"
)
embed.add_field(name="⚙️ Drop Settings", value=drop_settings, inline=False)
# Activity Settings
activity_settings = (
f"**Timeout:** {settings['activity_timeout'] // 60} minutes of inactivity\n"
"*Channels must have recent messages to receive drops*"
)
embed.add_field(name="⏰ Activity Settings", value=activity_settings, inline=False)
# Streak Settings
streak_settings = (
f"**Bonus:** {settings['streak_bonus']}% per level\n"
f"**Max Streak:** {settings['streak_max']}x\n"
f"**Timeout:** {settings['streak_timeout']} hours"
)
embed.add_field(name="🔥 Streak Settings", value=streak_settings, inline=False)
# Party Drop Settings
party_settings = (
f"**Chance:** {settings['party_drop_chance']}% chance\n"
f"**{currency_name}:** {settings['party_drop_min']:,} - {settings['party_drop_max']:,}\n"
f"**Timeout:** {settings['party_drop_timeout']} seconds"
)
embed.add_field(name="🎉 Party Drop Settings", value=party_settings, inline=False)
# Next Drop Info
if settings["enabled"]:
now: int = int(datetime.datetime.now().timestamp())
next_drop: int = settings["next_drop"]
if next_drop <= now:
next_drop_text = "**🎉 Drop is due now!**"
else:
minutes_left: int = (next_drop - now) // 60
seconds_left: int = (next_drop - now) % 60
if minutes_left > 0:
next_drop_text = f"**Next drop in {minutes_left}m {seconds_left}s**"
else:
next_drop_text = f"**Next drop in {seconds_left}s**"
embed.add_field(name="⏳ Next Drop", value=next_drop_text, inline=False)
# Footer with command hint
embed.set_footer(text="Use 'help lootdrop' to see all commands")
await ctx.send(embed=embed)
async def get_leaderboard_position(self, guild: discord.Guild, user_id: str) -> Tuple[int, List[Tuple[str, int, int, int]]]:
"""Get user's position and full leaderboard data"""
stats = await self.config.guild(guild).user_stats()
# Convert stats to list of (user_id, total_claims, good_claims, highest_streak)
leaderboard = [
(uid, data["good"] + data.get("bad", 0), data["good"], data.get("highest_streak", 0))
for uid, data in stats.items()
if data["good"] + data.get("bad", 0) > 0 # Only include users with at least one drop
]
# Sort by total claims (desc) and then highest streak (desc)
leaderboard.sort(key=lambda x: (x[1], x[3]), reverse=True)
# Find user's position (1-based index)
# If user has drops but somehow not in leaderboard, give last place instead of 0
user_stats = stats.get(user_id, {"good": 0, "bad": 0})
total_drops = user_stats["good"] + user_stats.get("bad", 0)
if total_drops == 0:
position = 0
else:
position = next((i + 1 for i, (uid, _, _, _) in enumerate(leaderboard) if uid == user_id), len(leaderboard) + 1)
return position, leaderboard
class DropButton(discord.ui.Button['LootDropView']):
"""Button for claiming regular loot drops
This button is used for single-claim drops where only one user can claim the reward.
The button is removed after a successful claim.
Attributes
----------
scenario: Scenario
The scenario this button is for
"""
def __init__(self, scenario: Scenario) -> None:
super().__init__(
style=discord.ButtonStyle.success,
emoji=scenario["button_emoji"],
label=scenario["button_text"]
)
self.scenario = scenario
async def callback(self, interaction: discord.Interaction) -> None:
"""Handle button press
Parameters
----------
interaction: discord.Interaction
The interaction that triggered this callback
"""
if interaction.user.bot:
return
assert self.view is not None
if self.view.claimed:
await interaction.response.send_message("This drop has already been claimed!", ephemeral=True)
return
self.view.claimed = True
await interaction.response.defer()
# Remove from active drops and cancel timeout task
if interaction.guild_id in self.view.cog.active_drops:
del self.view.cog.active_drops[interaction.guild_id]
if interaction.guild_id in self.view.cog.tasks:
self.view.cog.tasks[interaction.guild_id].cancel()
del self.view.cog.tasks[interaction.guild_id]
await self.view.cog.process_loot_claim(interaction)
try:
await interaction.message.edit(view=None)
except:
pass
class LootDropView(discord.ui.View):
"""View for regular loot drops
This view contains a single button that can be claimed once.
After claiming, the button is removed and rewards are distributed.
Attributes
----------
cog: LootDrop
The cog instance that created this view
message: Optional[discord.Message]
The message containing this view
claimed: bool
Whether this drop has been claimed
scenario: Scenario
The scenario for this drop
"""
def __init__(self, cog: 'LootDrop', scenario: Scenario, timeout: float) -> None:
super().__init__(timeout=timeout)
self.cog: 'LootDrop' = cog
self.message: Optional[discord.Message] = None
self.claimed: bool = False
self.scenario: Scenario = scenario
self.add_item(DropButton(scenario))
async def on_timeout(self) -> None:
"""Handle view timeout
Removes the button and updates the message when the drop expires
"""
if self.message and not self.claimed:
try:
await self.message.edit(content="The opportunity has passed...", view=None)
except:
pass
class PartyDropView(discord.ui.View):
"""View for party drops that multiple users can claim
This view allows multiple users to claim rewards within a time window.
The button remains active until timeout, allowing multiple users to join.
Attributes
----------
cog: LootDrop
The cog instance that created this view
claimed_users: Dict[str, float]
Dict of user IDs to their claim timestamps
message: Optional[discord.Message]
The message containing this view
start_time: float
When the party drop was created
"""
def __init__(self, cog: 'LootDrop', timeout: float) -> None:
super().__init__(timeout=timeout)
self.cog: 'LootDrop' = cog
self.claimed_users: Dict[str, float] = {}
self.message: Optional[discord.Message] = None
self.start_time: float = time.time()
@discord.ui.button(label="Claim Party Drop!", emoji="🎊", style=discord.ButtonStyle.success)
async def claim_button(self, interaction: discord.Interaction, button: discord.ui.Button) -> None:
"""Handle party drop claims
Parameters
----------
interaction: discord.Interaction
The interaction that triggered this callback
button: discord.ui.Button
The button that was clicked
"""
if interaction.user.bot:
return
if str(interaction.user.id) in self.claimed_users:
await interaction.response.send_message("You've already claimed this party drop!", ephemeral=True)
return
# Add user to claimed list with timestamp
self.claimed_users[str(interaction.user.id)] = time.time()
# Update button label to show number of participants
button.label = f"Join Party! ({len(self.claimed_users)} joined)"
# Acknowledge the interaction and update button
await interaction.response.edit_message(view=self)
# Send confirmation
await interaction.followup.send("🎉 You've joined the party! Wait for rewards...", ephemeral=True)
async def on_timeout(self) -> None:
"""Handle view timeout
Called when the party drop timer expires. The button will be removed
and rewards will be processed by the cog's timeout handler.
"""
if self.message:
try:
await self.message.edit(view=None)
except:
pass