Ruby-Cogs/levelup/shared/levelups.py
Valerie 477974d53c
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run
Upload 2 Cogs & Update README
2025-05-23 01:30:53 -04:00

341 lines
15 KiB
Python

import asyncio
import base64
import logging
import random
import typing as t
from contextlib import suppress
from io import BytesIO
import aiohttp
import discord
from redbot.core.i18n import Translator
from ..abc import MixinMeta
from ..common import utils
from ..common.models import GuildSettings, Profile
from ..generator import levelalert
log = logging.getLogger("red.vrt.levelup.shared.levelups")
_ = Translator("LevelUp", __file__)
class LevelUps(MixinMeta):
async def check_levelups(
self,
guild: discord.Guild,
member: discord.Member,
profile: Profile,
conf: GuildSettings,
message: t.Optional[discord.Message] = None,
channel: t.Optional[
t.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.ForumChannel]
] = None,
) -> bool:
"""Check if a user has leveled up and award roles if needed
Args:
guild (discord.Guild): The guild where the leveling up occurred.
member (discord.Member): The member who leveled up.
profile (Profile): The profile of the member.
conf (GuildSettings): The guild settings.
message (t.Optional[discord.Message], optional): The message that triggered the leveling up. Defaults to None.
channel (t.Optional[t.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.ForumChannel]], optional): The channel where the leveling up occurred. Defaults to None.
Returns:
bool: True if the user leveled up, False otherwise.
"""
calculated_level = conf.algorithm.get_level(profile.xp)
if calculated_level == profile.level:
# No action needed, user hasn't leveled up
return False
if not calculated_level:
# User hasnt reached level 1 yet
return False
log.debug(f"{member} has reached level {calculated_level} in {guild}")
profile.level = calculated_level
# User has reached a new level, time to log and award roles if needed
await self.ensure_roles(member, conf)
current_channel = channel or (message.channel if message else None)
log_channel = guild.get_channel(conf.notifylog) if conf.notifylog else None
role = None
if calculated_level in conf.levelroles:
role_id = conf.levelroles[calculated_level]
role = guild.get_role(role_id)
placeholders = {
"username": member.name,
"displayname": member.display_name,
"mention": member.mention,
"level": profile.level,
"role": role.name if role else None,
"server": guild.name,
}
username = member.display_name if profile.show_displayname else member.name
mention = member.mention if conf.notifymention else username
if role:
if dm_txt_raw := conf.role_awarded_dm:
dm_txt = dm_txt_raw.format(**placeholders)
else:
dm_txt = _("You just reached level {} in {} and obtained the {} role!").format(
profile.level, guild.name, role.mention
)
if msg_txt_raw := conf.role_awarded_msg:
msg_txt = msg_txt_raw.format(**placeholders)
else:
msg_txt = _("{} just reached level {} and obtained the {} role!").format(
mention, profile.level, role.mention
)
else:
placeholders.pop("role")
if dm_txt_raw := conf.levelup_dm:
dm_txt = dm_txt_raw.format(**placeholders)
else:
dm_txt = _("You just reached level {} in {}!").format(profile.level, guild.name)
if msg_txt_raw := conf.levelup_msg:
msg_txt = msg_txt_raw.format(**placeholders)
else:
msg_txt = _("{} just reached level {}!").format(mention, profile.level)
if conf.use_embeds or self.db.force_embeds:
if conf.notifydm:
embed = discord.Embed(
description=dm_txt,
color=member.color,
).set_thumbnail(url=member.display_avatar)
with suppress(discord.HTTPException):
await member.send(embed=embed)
embed = discord.Embed(
description=msg_txt,
color=member.color,
).set_author(
name=member.display_name if profile.show_displayname else member.name,
icon_url=member.display_avatar,
)
if current_channel and conf.notify:
with suppress(discord.HTTPException):
if conf.notifymention:
await current_channel.send(member.mention, embed=embed)
else:
await current_channel.send(embed=embed)
current_channel_id = current_channel.id if current_channel else 0
if log_channel and log_channel.id != current_channel_id:
with suppress(discord.HTTPException):
if conf.notifymention:
await log_channel.send(member.mention, embed=embed)
else:
await log_channel.send(embed=embed)
else:
fonts = list(self.fonts.glob("*.ttf")) + list(self.custom_fonts.iterdir())
font = str(random.choice(fonts))
if profile.font:
if (self.fonts / profile.font).exists():
font = str(self.fonts / profile.font)
elif (self.custom_fonts / profile.font).exists():
font = str(self.custom_fonts / profile.font)
color = utils.string_to_rgb(profile.statcolor) if profile.statcolor else member.color.to_rgb()
if color == (0, 0, 0):
color = utils.string_to_rgb(profile.namecolor) if profile.namecolor else None
payload = aiohttp.FormData()
if self.db.external_api_url or (self.db.internal_api_port and self.api_proc):
banner = await self.get_profile_background(member.id, profile, try_return_url=True)
avatar = member.display_avatar.url
payload.add_field(
"background_bytes", BytesIO(banner) if isinstance(banner, bytes) else banner, filename="data"
)
payload.add_field("avatar_bytes", avatar)
payload.add_field("level", str(profile.level))
payload.add_field("color", str(color))
payload.add_field("font_path", font)
payload.add_field("render_gif", str(self.db.render_gifs))
else:
avatar = await member.display_avatar.read()
banner = await self.get_profile_background(member.id, profile)
img_bytes, animated = None, None
if external_url := self.db.external_api_url:
try:
url = f"{external_url}/levelup"
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payload) as response:
if response.status == 200:
data = await response.json()
img_b64, animated = data["b64"], data["animated"]
img_bytes = base64.b64decode(img_b64)
except Exception as e:
log.error("Failed to fetch levelup image from external API", exc_info=e)
elif self.db.internal_api_port and self.api_proc:
try:
url = f"http://127.0.0.1:{self.db.internal_api_port}/levelup"
async with aiohttp.ClientSession() as session:
async with session.post(url, data=payload) as response:
if response.status == 200:
data = await response.json()
img_b64, animated = data["b64"], data["animated"]
img_bytes = base64.b64decode(img_b64)
except Exception as e:
log.error("Failed to fetch levelup image from internal API", exc_info=e)
def _run() -> t.Tuple[bytes, bool]:
img_bytes, animated = levelalert.generate_level_img(
background_bytes=banner,
avatar_bytes=avatar,
level=profile.level,
color=color,
font_path=font,
render_gif=self.db.render_gifs,
)
return img_bytes, animated
if not img_bytes:
img_bytes, animated = await asyncio.to_thread(_run)
ext = "gif" if animated else "webp"
if conf.notifydm:
file = discord.File(BytesIO(img_bytes), filename=f"levelup.{ext}")
with suppress(discord.HTTPException):
await member.send(dm_txt, file=file)
if current_channel and conf.notify:
file = discord.File(BytesIO(img_bytes), filename=f"levelup.{ext}")
with suppress(discord.HTTPException):
if conf.notifymention and message is not None:
await message.reply(msg_txt, file=file, mention_author=True)
else:
await current_channel.send(msg_txt, file=file)
current_channel_id = current_channel.id if current_channel else 0
if log_channel and log_channel.id != current_channel_id:
file = discord.File(BytesIO(img_bytes), filename=f"levelup.{ext}")
with suppress(discord.HTTPException):
await log_channel.send(msg_txt, file=file)
payload = {
"guild": guild, # discord.Guild
"member": member, # discord.Member
"message": message, # Optional[discord.Message] = None
"channel": channel, # Optional[TextChannel | VoiceChannel | Thread | ForumChannel] = None
"new_level": profile.level, # int
}
self.bot.dispatch("member_levelup", **payload)
return True
async def ensure_roles(
self,
member: discord.Member,
conf: t.Optional[GuildSettings] = None,
reason: t.Optional[str] = None,
) -> t.Tuple[t.List[discord.Role], t.List[discord.Role]]:
"""Ensure a user has the correct level roles based on their level and the guild's settings"""
if conf is None:
conf = self.db.get_conf(member.guild)
if not conf.levelroles:
return [], []
if not member.guild.me.guild_permissions.manage_roles:
return [], []
if member.id not in conf.users:
return [], []
if reason is None:
reason = _("Level Up")
conf.levelroles = dict(sorted(conf.levelroles.items(), key=lambda x: x[0], reverse=True))
to_add = set()
to_remove = set()
user_roles = member.roles
user_role_ids = [role.id for role in user_roles]
profile = conf.get_profile(member)
using_prestige = all([profile.prestige, conf.prestigelevel, conf.prestigedata, conf.keep_level_roles])
if using_prestige:
# User has prestiges and thus must meet the requirements for any level role inherently
valid_levels = conf.levelroles
else:
valid_levels = {k: v for k, v in conf.levelroles.items() if k <= profile.level}
valid_levels = dict(sorted(valid_levels.items(), key=lambda x: x[0]))
if conf.autoremove:
# Add highest level role and remove the rest
highest_role_id = 0
if valid_levels:
highest_role_id = valid_levels[max(list(valid_levels.keys()))]
if highest_role_id not in user_role_ids:
to_add.add(highest_role_id)
for role_id in conf.levelroles.values():
if role_id != highest_role_id and role_id in user_role_ids:
to_remove.add(role_id)
else:
# Ensure user has all roles up to their level
for level, role_id in conf.levelroles.items():
if level <= profile.level or using_prestige:
if role_id not in user_role_ids:
to_add.add(role_id)
elif role_id in user_role_ids:
to_remove.add(role_id)
if profile.prestige and conf.prestigedata:
# Assign prestige roles
if conf.stackprestigeroles:
for prestige_level, pdata in conf.prestigedata.items():
if profile.prestige < prestige_level:
continue
if pdata.role in user_role_ids:
continue
to_add.add(pdata.role)
else:
# Remove all prestige roles except the highest
for prestige_level, pdata in conf.prestigedata.items():
if prestige_level == profile.prestige:
if pdata.role not in user_role_ids:
to_add.add(pdata.role)
continue
if pdata.role in user_role_ids:
to_remove.add(pdata.role)
if weekly_role_id := conf.weeklysettings.role:
role_winners = conf.weeklysettings.last_winners
if not conf.weeklysettings.role_all and role_winners:
role_winners = [role_winners[0]]
if member.id in role_winners and weekly_role_id not in user_role_ids:
to_add.add(weekly_role_id)
elif member.id not in role_winners and weekly_role_id in user_role_ids:
to_remove.add(weekly_role_id)
add_roles: t.List[discord.Role] = []
remove_roles: t.List[discord.Role] = []
bad_roles = set() # Roles that the bot can't manage or cant find
for role_id in to_add:
role = member.guild.get_role(role_id)
if role and role.position < member.guild.me.top_role.position:
add_roles.append(role)
else:
bad_roles.add(role_id)
for role_id in to_remove:
role = member.guild.get_role(role_id)
if role and role.position < member.guild.me.top_role.position:
remove_roles.append(role)
else:
bad_roles.add(role_id)
if bad_roles:
conf.levelroles = {k: v for k, v in conf.levelroles.items() if v not in bad_roles}
self.save()
try:
if add_roles:
await member.add_roles(*add_roles, reason=reason)
if remove_roles:
await member.remove_roles(*remove_roles, reason=reason)
except discord.HTTPException:
log.warning(f"Failed to add/remove roles for {member}")
add_roles = []
remove_roles = []
return add_roles, remove_roles