341 lines
15 KiB
Python
341 lines
15 KiB
Python
import asyncio
|
|
import base64
|
|
import logging
|
|
import random
|
|
import typing as t
|
|
from contextlib import suppress
|
|
from io import BytesIO
|
|
|
|
import aiohttp
|
|
import discord
|
|
from redbot.core.i18n import Translator
|
|
|
|
from ..abc import MixinMeta
|
|
from ..common import utils
|
|
from ..common.models import GuildSettings, Profile
|
|
from ..generator import levelalert
|
|
|
|
log = logging.getLogger("red.vrt.levelup.shared.levelups")
|
|
_ = Translator("LevelUp", __file__)
|
|
|
|
|
|
class LevelUps(MixinMeta):
    """Mixin that detects level-ups, announces them and keeps reward roles in sync."""

    async def check_levelups(
        self,
        guild: discord.Guild,
        member: discord.Member,
        profile: Profile,
        conf: GuildSettings,
        message: t.Optional[discord.Message] = None,
        channel: t.Optional[
            t.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.ForumChannel]
        ] = None,
    ) -> bool:
        """Check if a user has leveled up and award roles if needed

        When the level computed from ``profile.xp`` differs from ``profile.level``
        (and is non-zero) this updates ``profile.level``, calls ``ensure_roles``,
        sends the configured DM / channel / log-channel announcements and finally
        dispatches the ``member_levelup`` bot event.

        Args:
            guild (discord.Guild): The guild where the leveling up occurred.
            member (discord.Member): The member who leveled up.
            profile (Profile): The profile of the member.
            conf (GuildSettings): The guild settings.
            message (t.Optional[discord.Message], optional): The message that triggered the leveling up. Defaults to None.
            channel (t.Optional[t.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.ForumChannel]], optional): The channel where the leveling up occurred. Defaults to None.

        Returns:
            bool: True if the user leveled up, False otherwise.
        """
        calculated_level = conf.algorithm.get_level(profile.xp)
        if calculated_level == profile.level:
            # No action needed, user hasn't leveled up
            return False
        if not calculated_level:
            # User hasn't reached level 1 yet
            return False

        log.debug("%s has reached level %s in %s", member, calculated_level, guild)
        profile.level = calculated_level
        # User has reached a new level, time to log and award roles if needed
        await self.ensure_roles(member, conf)

        current_channel = channel or (message.channel if message else None)
        log_channel = guild.get_channel(conf.notifylog) if conf.notifylog else None

        role = None
        if calculated_level in conf.levelroles:
            role = guild.get_role(conf.levelroles[calculated_level])

        placeholders: t.Dict[str, t.Any] = {
            "username": member.name,
            "displayname": member.display_name,
            "mention": member.mention,
            "level": profile.level,
            "server": guild.name,
        }
        username = member.display_name if profile.show_displayname else member.name
        mention = member.mention if conf.notifymention else username

        if role:
            # The "role" placeholder only exists when a role was actually awarded
            placeholders["role"] = role.name
            dm_txt = self._format_levelup_text(
                conf.role_awarded_dm,
                _("You just reached level {} in {} and obtained the {} role!").format(
                    profile.level, guild.name, role.mention
                ),
                placeholders,
            )
            msg_txt = self._format_levelup_text(
                conf.role_awarded_msg,
                _("{} just reached level {} and obtained the {} role!").format(
                    mention, profile.level, role.mention
                ),
                placeholders,
            )
        else:
            dm_txt = self._format_levelup_text(
                conf.levelup_dm,
                _("You just reached level {} in {}!").format(profile.level, guild.name),
                placeholders,
            )
            msg_txt = self._format_levelup_text(
                conf.levelup_msg,
                _("{} just reached level {}!").format(mention, profile.level),
                placeholders,
            )

        # Work out the destinations up front so no work is done for nobody
        send_dm = bool(conf.notifydm)
        send_channel = bool(current_channel and conf.notify)
        current_channel_id = current_channel.id if current_channel else 0
        # The log channel is skipped when it is the channel already being notified
        send_log = bool(log_channel and log_channel.id != current_channel_id)

        if conf.use_embeds or self.db.force_embeds:
            if send_dm:
                dm_embed = discord.Embed(
                    description=dm_txt,
                    color=member.color,
                ).set_thumbnail(url=member.display_avatar)
                with suppress(discord.HTTPException):
                    await member.send(embed=dm_embed)

            if send_channel or send_log:
                embed = discord.Embed(
                    description=msg_txt,
                    color=member.color,
                ).set_author(name=username, icon_url=member.display_avatar)
                # Passing content=None is the same as sending the embed alone
                content = member.mention if conf.notifymention else None
                if send_channel:
                    with suppress(discord.HTTPException):
                        await current_channel.send(content, embed=embed)
                if send_log:
                    with suppress(discord.HTTPException):
                        await log_channel.send(content, embed=embed)

        elif send_dm or send_channel or send_log:
            # Rendering is expensive (HTTP round trip or worker thread), so it
            # only happens when at least one destination will receive the image.
            img_bytes, animated = await self._render_levelup_image(member, profile)
            filename = "levelup.{}".format("gif" if animated else "webp")

            # A discord.File wraps a stream, so a fresh one is built per send
            if send_dm:
                file = discord.File(BytesIO(img_bytes), filename=filename)
                with suppress(discord.HTTPException):
                    await member.send(dm_txt, file=file)

            if send_channel:
                file = discord.File(BytesIO(img_bytes), filename=filename)
                with suppress(discord.HTTPException):
                    if conf.notifymention and message is not None:
                        await message.reply(msg_txt, file=file, mention_author=True)
                    else:
                        await current_channel.send(msg_txt, file=file)

            if send_log:
                file = discord.File(BytesIO(img_bytes), filename=filename)
                with suppress(discord.HTTPException):
                    await log_channel.send(msg_txt, file=file)

        payload = {
            "guild": guild,  # discord.Guild
            "member": member,  # discord.Member
            "message": message,  # Optional[discord.Message] = None
            "channel": channel,  # Optional[TextChannel | VoiceChannel | Thread | ForumChannel] = None
            "new_level": profile.level,  # int
        }
        self.bot.dispatch("member_levelup", **payload)
        return True

    @staticmethod
    def _format_levelup_text(
        template: t.Optional[str],
        default: str,
        placeholders: t.Dict[str, t.Any],
    ) -> str:
        """Fill a configured announcement template, falling back to ``default``.

        Args:
            template: The guild-configured template; empty/None means "use default".
            default: The already formatted default announcement text.
            placeholders: Names available to the template via ``str.format``.

        Returns:
            str: The formatted template, or ``default`` when no template is set
            or the template cannot be formatted with the available placeholders.
        """
        if not template:
            return default
        try:
            return template.format(**placeholders)
        except (KeyError, IndexError, ValueError, AttributeError) as e:
            # A bad template must not abort the level-up (roles are already
            # applied and the member_levelup event still has to be dispatched).
            log.warning("Invalid level up message template %r (%r), using default text", template, e)
            return default

    @staticmethod
    async def _request_levelup_image(
        url: str,
        payload: aiohttp.FormData,
        source: str,
    ) -> t.Optional[t.Tuple[bytes, bool]]:
        """POST ``payload`` to an image API and decode its response.

        Args:
            url: Full URL of the ``/levelup`` endpoint.
            payload: Form fields describing the image to render.
            source: "external" or "internal"; only used in the error log message.

        Returns:
            ``(image_bytes, animated)`` on a 200 response with a non-empty image,
            otherwise None (non-200 status, empty image, or any error, which is logged).
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(url, data=payload) as response:
                    if response.status != 200:
                        return None
                    data = await response.json()
                    img_b64, animated = data["b64"], data["animated"]
                    img_bytes = base64.b64decode(img_b64)
        except Exception as e:
            log.error("Failed to fetch levelup image from %s API", source, exc_info=e)
            return None
        if not img_bytes:
            return None
        return img_bytes, animated

    async def _render_levelup_image(
        self,
        member: discord.Member,
        profile: Profile,
    ) -> t.Tuple[bytes, bool]:
        """Render the level-up image for ``member``.

        Uses the external API when ``self.db.external_api_url`` is set, else the
        internal API when ``self.db.internal_api_port`` and ``self.api_proc`` are
        set. If no API is configured, or the API yields no image, the image is
        generated locally in a worker thread.

        Returns:
            ``(image_bytes, animated)`` where ``animated`` selects gif vs webp.
        """
        # Prefer the font chosen on the profile, otherwise pick a random one
        font = None
        if profile.font:
            if (self.fonts / profile.font).exists():
                font = str(self.fonts / profile.font)
            elif (self.custom_fonts / profile.font).exists():
                font = str(self.custom_fonts / profile.font)
        if font is None:
            fonts = list(self.fonts.glob("*.ttf")) + list(self.custom_fonts.iterdir())
            font = str(random.choice(fonts))

        color = utils.string_to_rgb(profile.statcolor) if profile.statcolor else member.color.to_rgb()
        if color == (0, 0, 0):
            color = utils.string_to_rgb(profile.namecolor) if profile.namecolor else None

        external_url = self.db.external_api_url
        use_internal = self.db.internal_api_port and self.api_proc
        if external_url or use_internal:
            # The API is handed URLs where possible
            banner = await self.get_profile_background(member.id, profile, try_return_url=True)
            avatar = member.display_avatar.url

            payload = aiohttp.FormData()
            payload.add_field(
                "background_bytes", BytesIO(banner) if isinstance(banner, bytes) else banner, filename="data"
            )
            payload.add_field("avatar_bytes", avatar)
            payload.add_field("level", str(profile.level))
            payload.add_field("color", str(color))
            payload.add_field("font_path", font)
            payload.add_field("render_gif", str(self.db.render_gifs))

            # The external API takes precedence; the internal one is not tried after it
            if external_url:
                result = await self._request_levelup_image(f"{external_url}/levelup", payload, "external")
            else:
                result = await self._request_levelup_image(
                    f"http://127.0.0.1:{self.db.internal_api_port}/levelup", payload, "internal"
                )
            if result is not None:
                return result
        else:
            avatar = await member.display_avatar.read()
            banner = await self.get_profile_background(member.id, profile)

        # Local fallback, run off the event loop
        img_bytes, animated = await asyncio.to_thread(
            levelalert.generate_level_img,
            background_bytes=banner,
            avatar_bytes=avatar,
            level=profile.level,
            color=color,
            font_path=font,
            render_gif=self.db.render_gifs,
        )
        return img_bytes, animated

    async def ensure_roles(
        self,
        member: discord.Member,
        conf: t.Optional[GuildSettings] = None,
        reason: t.Optional[str] = None,
    ) -> t.Tuple[t.List[discord.Role], t.List[discord.Role]]:
        """Ensure a user has the correct level roles based on their level and the guild's settings

        Also reconciles prestige roles and the weekly winner role. Nothing is
        done when the guild has no level roles, the bot lacks the manage_roles
        permission, or the member has no entry in ``conf.users``.

        Side effects: ``conf.levelroles`` is re-sorted (highest level first), and
        role ids that cannot be found or sit at/above the bot's top role are
        dropped from ``conf.levelroles`` followed by ``self.save()``.

        Args:
            member (discord.Member): The member whose roles should be reconciled.
            conf (t.Optional[GuildSettings], optional): Guild settings; looked up from ``self.db`` when None.
            reason (t.Optional[str], optional): Audit log reason. Defaults to the translated "Level Up".

        Returns:
            t.Tuple[t.List[discord.Role], t.List[discord.Role]]: The roles that were
            added and the roles that were removed. A list is empty when its
            Discord request failed; removals are not attempted if adding failed.
        """
        if conf is None:
            conf = self.db.get_conf(member.guild)
        if not conf.levelroles:
            return [], []
        if not member.guild.me.guild_permissions.manage_roles:
            return [], []
        if member.id not in conf.users:
            return [], []
        if reason is None:
            reason = _("Level Up")

        conf.levelroles = dict(sorted(conf.levelroles.items(), key=lambda x: x[0], reverse=True))
        to_add: t.Set[int] = set()
        to_remove: t.Set[int] = set()
        user_role_ids = {role.id for role in member.roles}
        profile = conf.get_profile(member)

        using_prestige = bool(
            profile.prestige and conf.prestigelevel and conf.prestigedata and conf.keep_level_roles
        )

        if using_prestige:
            # User has prestiges and thus must meet the requirements for any level role inherently
            valid_levels = conf.levelroles
        else:
            valid_levels = {k: v for k, v in conf.levelroles.items() if k <= profile.level}

        all_level_role_ids = set(conf.levelroles.values())
        if conf.autoremove:
            # Add highest level role and remove the rest
            highest_role_id = 0
            if valid_levels:
                highest_role_id = valid_levels[max(valid_levels)]
                if highest_role_id not in user_role_ids:
                    to_add.add(highest_role_id)
            to_remove |= (all_level_role_ids - {highest_role_id}) & user_role_ids
        else:
            # Ensure user has all roles up to their level. Working on role ids
            # keeps a role that is mapped to both an earned and an unearned
            # level from being queued for removal.
            earned_role_ids = set(valid_levels.values())
            to_add |= earned_role_ids - user_role_ids
            to_remove |= (all_level_role_ids - earned_role_ids) & user_role_ids

        if profile.prestige and conf.prestigedata:
            # Assign prestige roles
            if conf.stackprestigeroles:
                for prestige_level, pdata in conf.prestigedata.items():
                    if profile.prestige >= prestige_level and pdata.role not in user_role_ids:
                        to_add.add(pdata.role)
            else:
                # Remove all prestige roles except the highest
                for prestige_level, pdata in conf.prestigedata.items():
                    if prestige_level == profile.prestige:
                        if pdata.role not in user_role_ids:
                            to_add.add(pdata.role)
                    elif pdata.role in user_role_ids:
                        to_remove.add(pdata.role)

        if weekly_role_id := conf.weeklysettings.role:
            role_winners = conf.weeklysettings.last_winners
            if not conf.weeklysettings.role_all and role_winners:
                # Only the first listed winner keeps the role
                role_winners = [role_winners[0]]

            if member.id in role_winners and weekly_role_id not in user_role_ids:
                to_add.add(weekly_role_id)
            elif member.id not in role_winners and weekly_role_id in user_role_ids:
                to_remove.add(weekly_role_id)

        bad_roles: t.Set[int] = set()  # Roles that the bot can't manage or can't find
        top_position = member.guild.me.top_role.position

        def _resolve(role_ids: t.Set[int]) -> t.List[discord.Role]:
            """Turn role ids into manageable Role objects, recording the rest as bad."""
            resolved = []
            for role_id in role_ids:
                role = member.guild.get_role(role_id)
                if role and role.position < top_position:
                    resolved.append(role)
                else:
                    bad_roles.add(role_id)
            return resolved

        add_roles: t.List[discord.Role] = _resolve(to_add)
        remove_roles: t.List[discord.Role] = _resolve(to_remove)

        if bad_roles:
            conf.levelroles = {k: v for k, v in conf.levelroles.items() if v not in bad_roles}
            self.save()

        if add_roles:
            try:
                await member.add_roles(*add_roles, reason=reason)
            except discord.HTTPException:
                log.warning("Failed to add roles for %s", member)
                # Nothing changed on Discord and removals are not attempted
                return [], []
        if remove_roles:
            try:
                await member.remove_roles(*remove_roles, reason=reason)
            except discord.HTTPException:
                log.warning("Failed to remove roles for %s", member)
                # Additions above (if any) did succeed, so keep reporting them
                remove_roles = []
        return add_roles, remove_roles
|