451 lines
19 KiB
Python
451 lines
19 KiB
Python
from AAA3A_utils import Cog, Loop, Menu # isort:skip
|
|
from redbot.core import commands, Config # isort:skip
|
|
from redbot.core.bot import Red # isort:skip
|
|
from redbot.core.i18n import Translator, cog_i18n # isort:skip
|
|
import discord # isort:skip
|
|
import typing # isort:skip
|
|
import typing_extensions # isort:skip
|
|
|
|
import asyncio
|
|
import base64
|
|
import re
|
|
from io import BytesIO
|
|
from uuid import UUID
|
|
|
|
import aiohttp
|
|
from mcstatus import JavaServer
|
|
from redbot.core.utils.chat_formatting import box, pagify
|
|
|
|
# Credits:
|
|
# General repo credits.
|
|
# Thanks to Fixator for the code to get informations about Minecraft servers (https://github.com/fixator10/Fixator10-Cogs/blob/V3/minecraftdata/minecraftdata.py)!
|
|
|
|
_: Translator = Translator("Minecraft", __file__)
|
|
|
|
|
|
class MCPlayer:
|
|
def __init__(self, name: str, uuid: str) -> None:
|
|
self.name: str = name
|
|
self.uuid: str = uuid
|
|
self.dashed_uuid: str = str(UUID(self.uuid))
|
|
|
|
def __str__(self) -> str:
|
|
return self.name
|
|
|
|
@classmethod
|
|
async def convert(cls, ctx: commands.Context, argument: str) -> typing_extensions.Self:
|
|
cog = ctx.bot.get_cog("Minecraft")
|
|
try:
|
|
async with cog._session.get(
|
|
f"https://api.mojang.com/users/profiles/minecraft/{argument}",
|
|
raise_for_status=True,
|
|
) as r:
|
|
response_data = await r.json()
|
|
except aiohttp.ContentTypeError:
|
|
response_data = None
|
|
except aiohttp.ClientResponseError as e:
|
|
raise commands.BadArgument(
|
|
_("Unable to get data from Minecraft API: {e.message}.").format(e=e)
|
|
)
|
|
if response_data is None or "id" not in response_data:
|
|
raise commands.BadArgument(
|
|
_("{argument} not found on Mojang servers.").format(argument=argument)
|
|
)
|
|
uuid = str(response_data["id"])
|
|
name = str(response_data["name"])
|
|
try:
|
|
return cls(name=name, uuid=uuid)
|
|
except ValueError:
|
|
raise commands.BadArgument(
|
|
_("{argument} is found, but has incorrect UUID.").format(argument=argument)
|
|
)
|
|
|
|
|
|
@cog_i18n(_)
|
|
class Minecraft(Cog):
|
|
"""A cog to display informations about Minecraft Java users and servers, and notify for each change of a server!"""
|
|
|
|
def __init__(self, bot: Red) -> None:
|
|
super().__init__(bot=bot)
|
|
|
|
self._session: aiohttp.ClientSession = None
|
|
self.cache: typing.Dict[int, typing.Dict[str, dict]] = {}
|
|
|
|
self.config: Config = Config.get_conf(
|
|
self,
|
|
identifier=205192943327321000143939875896557571750,
|
|
force_registration=True,
|
|
)
|
|
self.config.register_channel(
|
|
servers={},
|
|
check_players=False,
|
|
edit_last_message=False,
|
|
)
|
|
|
|
async def cog_load(self) -> None:
|
|
await super().cog_load()
|
|
self._session: aiohttp.ClientSession = aiohttp.ClientSession()
|
|
self.loops.append(
|
|
Loop(
|
|
cog=self,
|
|
name="Check Minecraft Servers",
|
|
function=self.check_servers,
|
|
minutes=1,
|
|
)
|
|
)
|
|
|
|
async def cog_unload(self) -> None:
|
|
await self._session.close()
|
|
await super().cog_unload()
|
|
|
|
async def check_servers(self) -> None:
|
|
all_channels = await self.config.all_channels()
|
|
for channel_id in all_channels:
|
|
channel = self.bot.get_channel(channel_id)
|
|
if channel is None:
|
|
continue
|
|
if channel.id not in self.cache:
|
|
self.cache[channel.id] = {}
|
|
servers = all_channels[channel_id]["servers"]
|
|
check_players = all_channels[channel_id]["check_players"]
|
|
for server_url in servers:
|
|
try:
|
|
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
|
|
status = await server.async_status()
|
|
except (asyncio.CancelledError, TimeoutError):
|
|
continue
|
|
except Exception as e:
|
|
self.logger.error(
|
|
f"No data found for {server_url} server in {channel.id} channel in {channel.guild.id} guild.",
|
|
exc_info=e,
|
|
)
|
|
continue
|
|
if check_players and "sample" in status.raw["players"]:
|
|
players = {player["id"]: player for player in status.raw["players"]["sample"]}
|
|
players = [players[_id] for _id in set(list(players.keys()))]
|
|
else:
|
|
players = {}
|
|
status.raw["players"]["sample"] = players
|
|
if server_url not in self.cache[channel.id]:
|
|
self.cache[channel.id][server_url] = {"server": server, "status": status}
|
|
continue
|
|
if status.raw != self.cache[channel.id][server_url]["status"].raw:
|
|
if "This server is offline." in (
|
|
await self.clear_mcformatting(status.description)
|
|
) and "This server is offline." in (
|
|
await self.clear_mcformatting(
|
|
self.cache[channel.id][server_url]["status"].description
|
|
)
|
|
): # Minecraft ADS
|
|
continue
|
|
embed, icon = await self.get_embed(server, status)
|
|
servers = await self.config.channel(channel).servers()
|
|
if isinstance(servers, typing.List):
|
|
servers = {server: None for server in servers}
|
|
if (
|
|
await self.config.channel(channel).edit_last_message()
|
|
and servers[server_url] is not None
|
|
):
|
|
try:
|
|
message = await channel.get_partial_message(servers[server_url]).edit(
|
|
embed=embed, attachments=[icon]
|
|
)
|
|
except discord.HTTPException:
|
|
message = await channel.send(embed=embed, file=icon)
|
|
else:
|
|
message = await channel.send(embed=embed, file=icon)
|
|
servers[server_url] = message.id
|
|
await self.config.channel(channel).servers.set(servers)
|
|
self.cache[channel.id][server_url] = {"server": server, "status": status}
|
|
|
|
async def get_embed(self, server: JavaServer, status) -> discord.Embed:
|
|
server_description = await self.clear_mcformatting(status.description)
|
|
embed: discord.Embed = discord.Embed(
|
|
title=f"{server.address.host}:{server.address.port}",
|
|
description=box(server_description),
|
|
)
|
|
embed.color = (
|
|
discord.Color.red()
|
|
if "This server is offline." in server_description
|
|
else (
|
|
discord.Color.orange()
|
|
if "This server is currently stopping." in server_description
|
|
else discord.Color.green()
|
|
)
|
|
)
|
|
icon_file = None
|
|
icon = (
|
|
discord.File(
|
|
icon_file := BytesIO(
|
|
base64.b64decode(status.icon.removeprefix("data:image/png;base64,"))
|
|
),
|
|
filename="icon.png",
|
|
)
|
|
if status.icon
|
|
else None
|
|
)
|
|
if icon:
|
|
embed.set_thumbnail(url="attachment://icon.png")
|
|
embed.add_field(name=_("Latency"), value=f"{status.latency:.2f} ms")
|
|
embed.add_field(
|
|
name=_("Players"),
|
|
value="{status.players.online}/{status.players.max}\n{players_list}".format(
|
|
status=status,
|
|
players_list=(
|
|
box(
|
|
list(
|
|
pagify(
|
|
await self.clear_mcformatting(
|
|
"\n".join([p.name for p in status.players.sample])
|
|
),
|
|
page_length=992,
|
|
)
|
|
)[0]
|
|
)
|
|
if status.players.sample
|
|
else ""
|
|
),
|
|
),
|
|
)
|
|
embed.add_field(
|
|
name=_("Version"),
|
|
value=f"{status.version.name}\nProtocol: {status.version.protocol}",
|
|
)
|
|
if icon_file is not None:
|
|
icon_file.close()
|
|
return embed, icon
|
|
|
|
async def clear_mcformatting(self, formatted_str) -> str:
|
|
"""Remove Minecraft-formatting"""
|
|
if not isinstance(formatted_str, dict):
|
|
return re.sub(r"\xA7[0-9A-FK-OR]", "", formatted_str, flags=re.IGNORECASE)
|
|
clean = ""
|
|
async for text in self.gen_dict_extract("text", formatted_str):
|
|
clean += text
|
|
return re.sub(r"\xA7[0-9A-FK-OR]", "", clean, flags=re.IGNORECASE)
|
|
|
|
async def gen_dict_extract(self, key: str, var: dict) -> str:
|
|
if not hasattr(var, "items"):
|
|
return
|
|
for k, v in var.items():
|
|
if k == key:
|
|
yield v
|
|
if isinstance(v, typing.Dict):
|
|
async for result in self.gen_dict_extract(key, v):
|
|
yield result
|
|
elif isinstance(v, typing.List):
|
|
for d in v:
|
|
async for result in self.gen_dict_extract(key, d):
|
|
yield result
|
|
|
|
@commands.hybrid_group()
|
|
async def minecraft(self, ctx: commands.Context):
|
|
"""Get informations about Minecraft Java."""
|
|
pass
|
|
|
|
@commands.bot_has_permissions(attach_files=True, embed_links=True)
|
|
@minecraft.command()
|
|
async def getplayerskin(
|
|
self, ctx: commands.Context, player: MCPlayer, overlay: bool = False
|
|
) -> None:
|
|
"""Get Minecraft Java player skin by name."""
|
|
uuid = player.uuid
|
|
stripname = player.name.strip("_")
|
|
files = []
|
|
try:
|
|
async with self._session.get(
|
|
f"https://crafatar.com/renders/head/{uuid}",
|
|
params="overlay" if overlay else None,
|
|
) as s:
|
|
files.append(
|
|
discord.File(BytesIO(await s.read()), filename=f"{stripname}_head.png")
|
|
)
|
|
async with self._session.get(f"https://crafatar.com/skins/{uuid}") as s:
|
|
files.append(discord.File(BytesIO(await s.read()), filename=f"{stripname}.png"))
|
|
async with self._session.get(
|
|
f"https://crafatar.com/renders/body/{uuid}.png",
|
|
params="overlay" if overlay else None,
|
|
) as s:
|
|
files.append(
|
|
discord.File(BytesIO(await s.read()), filename=f"{stripname}_body.png")
|
|
)
|
|
except aiohttp.ClientResponseError as e:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("Unable to get data from Crafatar: {}").format(e.message)
|
|
)
|
|
embed: discord.Embed = discord.Embed(
|
|
timestamp=ctx.message.created_at, color=await ctx.embed_color()
|
|
)
|
|
embed.set_author(
|
|
name=player.name,
|
|
icon_url=f"attachment://{stripname}_head.png",
|
|
url=f"https://crafatar.com/skins/{uuid}",
|
|
)
|
|
embed.set_thumbnail(url=f"attachment://{stripname}.png")
|
|
embed.set_image(url=f"attachment://{stripname}_body.png")
|
|
embed.set_footer(text=_("Provided by Crafatar."), icon_url="https://crafatar.com/logo.png")
|
|
await ctx.send(embed=embed, files=files)
|
|
|
|
@commands.bot_has_permissions(attach_files=True, embed_links=True)
|
|
@minecraft.command()
|
|
async def getserver(self, ctx: commands.Context, server_url: str) -> None:
|
|
"""Get informations about a Minecraft Java server."""
|
|
try:
|
|
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
|
|
status = await server.async_status()
|
|
except Exception:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_(
|
|
"No data found for this Minecraft server. Maybe it doesn't exist or its data are temporarily unavailable."
|
|
)
|
|
)
|
|
embed, icon = await self.get_embed(server, status)
|
|
await ctx.send(embed=embed, file=icon)
|
|
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@minecraft.command(aliases=["add", "+"])
|
|
async def addserver(
|
|
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], server_url: str
|
|
) -> None:
|
|
"""Add a Minecraft Java server in Config to get automatically new status."""
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
channel_permissions = channel.permissions_for(ctx.me)
|
|
if (
|
|
not channel_permissions.view_channel
|
|
or not channel_permissions.read_messages
|
|
or not channel_permissions.read_message_history
|
|
or not channel_permissions.embed_links
|
|
):
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_(
|
|
"I don't have sufficient permissions in this channel to send messages with embeds."
|
|
)
|
|
)
|
|
servers = await self.config.channel(channel).servers()
|
|
if server_url.lower() in servers:
|
|
raise commands.UserFeedbackCheckFailure(_("This server has already been added."))
|
|
try:
|
|
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
|
|
await server.async_status()
|
|
except Exception:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_(
|
|
"No data found for this Minecraft server. Maybe it doesn't exist or its data are temporarily unavailable."
|
|
)
|
|
)
|
|
if isinstance(servers, typing.List):
|
|
servers = {server: None for server in servers}
|
|
servers[server_url.lower()] = None # last message
|
|
await self.config.channel(channel).servers.set(servers)
|
|
await ctx.send(_("Server added to this channel."))
|
|
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@minecraft.command(aliases=["remove", "-"])
|
|
async def removeserver(
|
|
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], server_url: str
|
|
) -> None:
|
|
"""Remove a Minecraft Java server in Config."""
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
servers = await self.config.channel(channel).servers()
|
|
if server_url.lower() not in servers:
|
|
raise commands.UserFeedbackCheckFailure(_("This server isn't in the Config."))
|
|
if isinstance(servers, typing.List):
|
|
servers = {server: None for server in servers}
|
|
del servers[server_url.lower()]
|
|
await self.config.channel(channel).servers.set(servers)
|
|
await ctx.send(_("Server removed from this channel."))
|
|
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@minecraft.command()
|
|
async def checkplayers(
|
|
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], state: bool
|
|
) -> None:
|
|
"""Include players joining or leaving the server in notifications."""
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
await self.config.channel(channel).check_players.set(state)
|
|
if not state:
|
|
for server_url in self.cache[channel.id]:
|
|
self.cache[channel.id][server_url]["status"].raw["players"]["sample"] = {}
|
|
await ctx.send(_("I will not check players for the notifications."))
|
|
else:
|
|
await ctx.send(_("I will check players for the notifications."))
|
|
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@minecraft.command()
|
|
async def editlastmessage(
|
|
self, ctx: commands.Context, channel: typing.Optional[discord.TextChannel], state: bool
|
|
) -> None:
|
|
"""Edit the last message sent for changes."""
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
await self.config.channel(channel).edit_last_message.set(state)
|
|
if not state:
|
|
await ctx.send(_("I will not edit my last message for the notifications."))
|
|
else:
|
|
await ctx.send(_("I will edit my last message for the notifications."))
|
|
|
|
@commands.is_owner()
|
|
@minecraft.command(hidden=True)
|
|
async def forcecheck(self, ctx: commands.Context) -> None:
|
|
"""Force check Minecraft Java servers in Config."""
|
|
await self.check_servers()
|
|
await ctx.send(_("Servers checked."))
|
|
|
|
@commands.is_owner()
|
|
@commands.bot_has_permissions(embed_links=True)
|
|
@minecraft.command(hidden=True)
|
|
async def getdebugloopsstatus(self, ctx: commands.Context):
|
|
"""Get an embed for check loop status."""
|
|
embeds = [loop.get_debug_embed() for loop in self.loops]
|
|
await Menu(pages=embeds).start(ctx)
|
|
|
|
@commands.Cog.listener()
|
|
async def on_assistant_cog_add(
|
|
self, assistant_cog: typing.Optional[commands.Cog] = None
|
|
) -> None: # Vert's Assistant integration/third party.
|
|
if assistant_cog is None:
|
|
return self.get_minecraft_java_server_for_assistant
|
|
schema = {
|
|
"name": "get_minecraft_java_server_for_assistant",
|
|
"description": "Get informations about a Minecraft Java server.",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"server_url": {
|
|
"type": "string",
|
|
"description": "The URL of the Minecraft Java server.",
|
|
},
|
|
},
|
|
"required": ["server_url"],
|
|
},
|
|
}
|
|
await assistant_cog.register_function(cog_name=self.qualified_name, schema=schema)
|
|
|
|
async def get_minecraft_java_server_for_assistant(self, server_url: str, *args, **kwargs):
|
|
try:
|
|
server: JavaServer = await JavaServer.async_lookup(address=server_url.lower())
|
|
status = await server.async_status()
|
|
except Exception:
|
|
return "No data found for this Minecraft Java server."
|
|
server_description = await self.clear_mcformatting(status.description)
|
|
data = {
|
|
"Host & Port": f"{server.address.host}:{server.address.port}",
|
|
"Description": box(server_description),
|
|
"Status": (
|
|
"Offline."
|
|
if "This server is offline." in server_description
|
|
else (
|
|
"Currently stopping."
|
|
if "This server is currently stopping." in server_description
|
|
else "Online."
|
|
)
|
|
),
|
|
"Latency": f"{status.latency:.2f} ms",
|
|
"Players": f"{status.players.online}/{status.players.max}",
|
|
"Version": status.version.name,
|
|
"Protocol": status.version.protocol,
|
|
}
|
|
return [f"{key}: {value}\n" for key, value in data.items() if value is not None]
|