Ruby-Cogs/minecraft/minecraft.py
2025-04-02 22:57:51 -04:00

451 lines
19 KiB
Python

from AAA3A_utils import Cog, Loop, Menu # isort:skip
from redbot.core import commands, Config # isort:skip
from redbot.core.bot import Red # isort:skip
from redbot.core.i18n import Translator, cog_i18n # isort:skip
import discord # isort:skip
import typing # isort:skip
import typing_extensions # isort:skip
import asyncio
import base64
import re
from io import BytesIO
from uuid import UUID
import aiohttp
from mcstatus import JavaServer
from redbot.core.utils.chat_formatting import box, pagify
# Credits:
# General repo credits.
# Thanks to Fixator for the code to get informations about Minecraft servers (https://github.com/fixator10/Fixator10-Cogs/blob/V3/minecraftdata/minecraftdata.py)!
_: Translator = Translator("Minecraft", __file__)
class MCPlayer:
def __init__(self, name: str, uuid: str) -> None:
self.name: str = name
self.uuid: str = uuid
self.dashed_uuid: str = str(UUID(self.uuid))
def __str__(self) -> str:
return self.name
@classmethod
async def convert(cls, ctx: commands.Context, argument: str) -> typing_extensions.Self:
cog = ctx.bot.get_cog("Minecraft")
try:
async with cog._session.get(
f"https://api.mojang.com/users/profiles/minecraft/{argument}",
raise_for_status=True,
) as r:
response_data = await r.json()
except aiohttp.ContentTypeError:
response_data = None
except aiohttp.ClientResponseError as e:
raise commands.BadArgument(
_("Unable to get data from Minecraft API: {e.message}.").format(e=e)
)
if response_data is None or "id" not in response_data:
raise commands.BadArgument(
_("{argument} not found on Mojang servers.").format(argument=argument)
)
uuid = str(response_data["id"])
name = str(response_data["name"])
try:
return cls(name=name, uuid=uuid)
except ValueError:
raise commands.BadArgument(
_("{argument} is found, but has incorrect UUID.").format(argument=argument)
)
@cog_i18n(_)
class Minecraft(Cog):
"""A cog to display informations about Minecraft Java users and servers, and notify for each change of a server!"""
def __init__(self, bot: Red) -> None:
super().__init__(bot=bot)
self._session: aiohttp.ClientSession = None
self.cache: typing.Dict[int, typing.Dict[str, dict]] = {}
self.config: Config = Config.get_conf(
self,
identifier=205192943327321000143939875896557571750,
force_registration=True,
)
self.config.register_channel(
servers={},
check_players=False,
edit_last_message=False,
)
async def cog_load(self) -> None:
await super().cog_load()
self._session: aiohttp.ClientSession = aiohttp.ClientSession()
self.loops.append(
Loop(
cog=self,
name="Check Minecraft Servers",
function=self.check_servers,
minutes=1,
)
)
async def cog_unload(self) -> None:
await self._session.close()
await super().cog_unload()
async def check_servers(self) -> None:
all_channels = await self.config.all_channels()
for channel_id in all_channels:
channel = self.bot.get_channel(channel_id)
if channel is None:
continue
if channel.id not in self.cache:
self.cache[channel.id] = {}
servers = all_channels[channel_id]["servers"]
check_players = all_channels[channel_id]["check_players"]
for server_url in servers:
try:
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
status = await server.async_status()
except (asyncio.CancelledError, TimeoutError):
continue
except Exception as e:
self.logger.error(
f"No data found for {server_url} server in {channel.id} channel in {channel.guild.id} guild.",
exc_info=e,
)
continue
if check_players and "sample" in status.raw["players"]:
players = {player["id"]: player for player in status.raw["players"]["sample"]}
players = [players[_id] for _id in set(list(players.keys()))]
else:
players = {}
status.raw["players"]["sample"] = players
if server_url not in self.cache[channel.id]:
self.cache[channel.id][server_url] = {"server": server, "status": status}
continue
if status.raw != self.cache[channel.id][server_url]["status"].raw:
if "This server is offline." in (
await self.clear_mcformatting(status.description)
) and "This server is offline." in (
await self.clear_mcformatting(
self.cache[channel.id][server_url]["status"].description
)
): # Minecraft ADS
continue
embed, icon = await self.get_embed(server, status)
servers = await self.config.channel(channel).servers()
if isinstance(servers, typing.List):
servers = {server: None for server in servers}
if (
await self.config.channel(channel).edit_last_message()
and servers[server_url] is not None
):
try:
message = await channel.get_partial_message(servers[server_url]).edit(
embed=embed, attachments=[icon]
)
except discord.HTTPException:
message = await channel.send(embed=embed, file=icon)
else:
message = await channel.send(embed=embed, file=icon)
servers[server_url] = message.id
await self.config.channel(channel).servers.set(servers)
self.cache[channel.id][server_url] = {"server": server, "status": status}
async def get_embed(self, server: JavaServer, status) -> discord.Embed:
server_description = await self.clear_mcformatting(status.description)
embed: discord.Embed = discord.Embed(
title=f"{server.address.host}:{server.address.port}",
description=box(server_description),
)
embed.color = (
discord.Color.red()
if "This server is offline." in server_description
else (
discord.Color.orange()
if "This server is currently stopping." in server_description
else discord.Color.green()
)
)
icon_file = None
icon = (
discord.File(
icon_file := BytesIO(
base64.b64decode(status.icon.removeprefix("data:image/png;base64,"))
),
filename="icon.png",
)
if status.icon
else None
)
if icon:
embed.set_thumbnail(url="attachment://icon.png")
embed.add_field(name=_("Latency"), value=f"{status.latency:.2f} ms")
embed.add_field(
name=_("Players"),
value="{status.players.online}/{status.players.max}\n{players_list}".format(
status=status,
players_list=(
box(
list(
pagify(
await self.clear_mcformatting(
"\n".join([p.name for p in status.players.sample])
),
page_length=992,
)
)[0]
)
if status.players.sample
else ""
),
),
)
embed.add_field(
name=_("Version"),
value=f"{status.version.name}\nProtocol: {status.version.protocol}",
)
if icon_file is not None:
icon_file.close()
return embed, icon
async def clear_mcformatting(self, formatted_str) -> str:
"""Remove Minecraft-formatting"""
if not isinstance(formatted_str, dict):
return re.sub(r"\xA7[0-9A-FK-OR]", "", formatted_str, flags=re.IGNORECASE)
clean = ""
async for text in self.gen_dict_extract("text", formatted_str):
clean += text
return re.sub(r"\xA7[0-9A-FK-OR]", "", clean, flags=re.IGNORECASE)
async def gen_dict_extract(self, key: str, var: dict) -> str:
if not hasattr(var, "items"):
return
for k, v in var.items():
if k == key:
yield v
if isinstance(v, typing.Dict):
async for result in self.gen_dict_extract(key, v):
yield result
elif isinstance(v, typing.List):
for d in v:
async for result in self.gen_dict_extract(key, d):
yield result
@commands.hybrid_group()
async def minecraft(self, ctx: commands.Context):
"""Get informations about Minecraft Java."""
pass
@commands.bot_has_permissions(attach_files=True, embed_links=True)
@minecraft.command()
async def getplayerskin(
self, ctx: commands.Context, player: MCPlayer, overlay: bool = False
) -> None:
"""Get Minecraft Java player skin by name."""
uuid = player.uuid
stripname = player.name.strip("_")
files = []
try:
async with self._session.get(
f"https://crafatar.com/renders/head/{uuid}",
params="overlay" if overlay else None,
) as s:
files.append(
discord.File(BytesIO(await s.read()), filename=f"{stripname}_head.png")
)
async with self._session.get(f"https://crafatar.com/skins/{uuid}") as s:
files.append(discord.File(BytesIO(await s.read()), filename=f"{stripname}.png"))
async with self._session.get(
f"https://crafatar.com/renders/body/{uuid}.png",
params="overlay" if overlay else None,
) as s:
files.append(
discord.File(BytesIO(await s.read()), filename=f"{stripname}_body.png")
)
except aiohttp.ClientResponseError as e:
raise commands.UserFeedbackCheckFailure(
_("Unable to get data from Crafatar: {}").format(e.message)
)
embed: discord.Embed = discord.Embed(
timestamp=ctx.message.created_at, color=await ctx.embed_color()
)
embed.set_author(
name=player.name,
icon_url=f"attachment://{stripname}_head.png",
url=f"https://crafatar.com/skins/{uuid}",
)
embed.set_thumbnail(url=f"attachment://{stripname}.png")
embed.set_image(url=f"attachment://{stripname}_body.png")
embed.set_footer(text=_("Provided by Crafatar."), icon_url="https://crafatar.com/logo.png")
await ctx.send(embed=embed, files=files)
@commands.bot_has_permissions(attach_files=True, embed_links=True)
@minecraft.command()
async def getserver(self, ctx: commands.Context, server_url: str) -> None:
"""Get informations about a Minecraft Java server."""
try:
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
status = await server.async_status()
except Exception:
raise commands.UserFeedbackCheckFailure(
_(
"No data found for this Minecraft server. Maybe it doesn't exist or its data are temporarily unavailable."
)
)
embed, icon = await self.get_embed(server, status)
await ctx.send(embed=embed, file=icon)
@commands.admin_or_permissions(manage_guild=True)
@minecraft.command(aliases=["add", "+"])
async def addserver(
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], server_url: str
) -> None:
"""Add a Minecraft Java server in Config to get automatically new status."""
if channel is None:
channel = ctx.channel
channel_permissions = channel.permissions_for(ctx.me)
if (
not channel_permissions.view_channel
or not channel_permissions.read_messages
or not channel_permissions.read_message_history
or not channel_permissions.embed_links
):
raise commands.UserFeedbackCheckFailure(
_(
"I don't have sufficient permissions in this channel to send messages with embeds."
)
)
servers = await self.config.channel(channel).servers()
if server_url.lower() in servers:
raise commands.UserFeedbackCheckFailure(_("This server has already been added."))
try:
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
await server.async_status()
except Exception:
raise commands.UserFeedbackCheckFailure(
_(
"No data found for this Minecraft server. Maybe it doesn't exist or its data are temporarily unavailable."
)
)
if isinstance(servers, typing.List):
servers = {server: None for server in servers}
servers[server_url.lower()] = None # last message
await self.config.channel(channel).servers.set(servers)
await ctx.send(_("Server added to this channel."))
@commands.admin_or_permissions(manage_guild=True)
@minecraft.command(aliases=["remove", "-"])
async def removeserver(
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], server_url: str
) -> None:
"""Remove a Minecraft Java server in Config."""
if channel is None:
channel = ctx.channel
servers = await self.config.channel(channel).servers()
if server_url.lower() not in servers:
raise commands.UserFeedbackCheckFailure(_("This server isn't in the Config."))
if isinstance(servers, typing.List):
servers = {server: None for server in servers}
del servers[server_url.lower()]
await self.config.channel(channel).servers.set(servers)
await ctx.send(_("Server removed from this channel."))
@commands.admin_or_permissions(manage_guild=True)
@minecraft.command()
async def checkplayers(
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], state: bool
) -> None:
"""Include players joining or leaving the server in notifications."""
if channel is None:
channel = ctx.channel
await self.config.channel(channel).check_players.set(state)
if not state:
for server_url in self.cache[channel.id]:
self.cache[channel.id][server_url]["status"].raw["players"]["sample"] = {}
await ctx.send(_("I will not check players for the notifications."))
else:
await ctx.send(_("I will check players for the notifications."))
@commands.admin_or_permissions(manage_guild=True)
@minecraft.command()
async def editlastmessage(
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], state: bool
) -> None:
"""Edit the last message sent for changes."""
if channel is None:
channel = ctx.channel
await self.config.channel(channel).edit_last_message.set(state)
if not state:
await ctx.send(_("I will not edit my last message for the notifications."))
else:
await ctx.send(_("I will edit my last message for the notifications."))
@commands.is_owner()
@minecraft.command(hidden=True)
async def forcecheck(self, ctx: commands.Context) -> None:
"""Force check Minecraft Java servers in Config."""
await self.check_servers()
await ctx.send(_("Servers checked."))
@commands.is_owner()
@commands.bot_has_permissions(embed_links=True)
@minecraft.command(hidden=True)
async def getdebugloopsstatus(self, ctx: commands.Context):
"""Get an embed for check loop status."""
embeds = [loop.get_debug_embed() for loop in self.loops]
await Menu(pages=embeds).start(ctx)
@commands.Cog.listener()
async def on_assistant_cog_add(
self, assistant_cog: typing.Optional[commands.Cog] = None
) -> None: # Vert's Assistant integration/third party.
if assistant_cog is None:
return self.get_minecraft_java_server_for_assistant
schema = {
"name": "get_minecraft_java_server_for_assistant",
"description": "Get informations about a Minecraft Java server.",
"parameters": {
"type": "object",
"properties": {
"server_url": {
"type": "string",
"description": "The URL of the Minecraft Java server.",
},
},
"required": ["server_url"],
},
}
await assistant_cog.register_function(cog_name=self.qualified_name, schema=schema)
async def get_minecraft_java_server_for_assistant(self, server_url: str, *args, **kwargs):
try:
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
status = await server.async_status()
except Exception:
return "No data found for this Minecraft Java server."
server_description = await self.clear_mcformatting(status.description)
data = {
"Host & Port": f"{server.address.host}:{server.address.port}",
"Description": box(server_description),
"Status": (
"Offline."
if "This server is offline." in server_description
else (
"Currently stopping."
if "This server is currently stopping." in server_description
else "Online."
)
),
"Latency": f"{status.latency:.2f} ms",
"Players": f"{status.players.online}/{status.players.max}",
"Version": status.version.name,
"Protocol": status.version.protocol,
}
return [f"{key}: {value}\n" for key, value in data.items() if value is not None]