Ruby-Cogs/embedutils/converters.py
2025-04-02 22:56:57 -04:00

402 lines
16 KiB
Python

from AAA3A_utils import Menu # isort:skip
from redbot.core import commands # isort:skip
from redbot.core.i18n import Translator # isort:skip
import discord # isort:skip
import typing # isort:skip
import io
import json
import re
import textwrap
import aiohttp
import yaml
from redbot.core import dev_commands
from redbot.core.utils.chat_formatting import box
_: Translator = Translator("EmbedUtils", __file__)
def cleanup_code(code: str) -> str:
code = dev_commands.cleanup_code(textwrap.dedent(code)).strip()
if code.startswith("json\n"):
code = code[5:]
with io.StringIO(code) as codeio:
for line in codeio:
line = line.strip()
if line and not line.startswith("#"):
break
else:
return "pass"
return code
class StringToEmbed(commands.Converter):
def __init__(
self, *, conversion_type: str = "json", validate: bool = False, allow_content: bool = True
) -> None:
self.CONVERSION_TYPES: typing.Dict[str, typing.Any] = {
"json": self.load_from_json,
"yaml": self.load_from_yaml,
}
self.validate: bool = validate
self.conversion_type: typing.Literal["json", "yaml"] = conversion_type.lower()
self.allow_content: bool = allow_content
try:
self.converter = self.CONVERSION_TYPES[self.conversion_type]
except KeyError as exc:
raise ValueError(
f"`{conversion_type}` is not a valid conversion type for Embed conversion."
) from exc
async def convert(
self, ctx: commands.Context, argument: str
) -> typing.Dict[typing.Literal["content", "embed"], typing.Union[discord.Embed, str]]:
argument = cleanup_code(argument)
data = await self.converter(ctx, argument=argument)
content = self.get_content(data) if isinstance(data, typing.Dict) else None
if isinstance(data, typing.List):
data = data[0]
elif "embed" in data:
data = data["embed"]
elif "embeds" in data:
data = data.get("embeds")[0]
if not data:
raise commands.BadArgument(
_(
"This doesn't seem to be properly formatted embed {conversion_type}. "
"Refer to the link on `{ctx.clean_prefix}help {ctx.command.qualified_name}`."
).format(conversion_type=self.conversion_type.upper(), ctx=ctx)
)
self.check_data_type(ctx, data=data)
kwargs = await self.create_embed(ctx, data=data, content=content)
if self.validate:
await self.validate_embed(ctx, kwargs)
return kwargs
def check_data_type(self, ctx: commands.Context, data, *, data_type=dict) -> None:
if not isinstance(data, data_type):
raise commands.BadArgument(
_(
"This doesn't seem to be properly formatted embed {conversion_type}. "
"Refer to the link on `{ctx.clean_prefix}help {ctx.command.qualified_name}`."
).format(conversion_type=self.conversion_type.upper(), ctx=ctx)
)
async def load_from_json(
self, ctx: commands.Context, argument: str, **kwargs
) -> typing.Dict[str, typing.Any]:
try:
data = json.loads(argument)
except json.decoder.JSONDecodeError as error:
await self.embed_convert_error(ctx, _("JSON Parse Error"), error)
raise commands.BadArgument()
self.check_data_type(ctx, data, **kwargs)
return data
async def load_from_yaml(
self, ctx: commands.Context, argument: str, **kwargs
) -> typing.Dict[str, typing.Any]:
try:
data = yaml.safe_load(argument)
except Exception as error:
await self.embed_convert_error(ctx, _("YAML Parse Error"), error)
raise commands.BadArgument()
self.check_data_type(ctx, data, **kwargs)
return data
def get_content(
self, data: typing.Dict[str, typing.Any], *, content: str = None
) -> typing.Optional[str]:
content = data.pop("content", content)
if content is not None and not self.allow_content:
raise commands.BadArgument(_("The `content` field is not supported for this command."))
return content
async def create_embed(
self, ctx: commands.Context, data: typing.Dict[str, typing.Any], *, content: str = None
) -> typing.Dict[typing.Literal["content", "embed"], typing.Union[discord.Embed, str]]:
content = self.get_content(data, content=content)
if data.get("color") is None:
data.pop("color", None)
if (timestamp := data.get("timestamp")) is not None:
data["timestamp"] = (
timestamp.strip("Z") if isinstance(timestamp, str) else str(timestamp)
)
else:
data.pop("timestamp", None)
try:
embed = discord.Embed.from_dict(data)
length = len(embed)
except Exception as error:
await self.embed_convert_error(ctx, _("Embed Parse Error"), error)
raise commands.BadArgument()
if length > 6000:
raise commands.BadArgument(
_("Embed size exceeds Discord limit of 6000 characters ({length}).").format(
length=length
)
)
return {"content": content, "embed": embed}
async def validate_embed(
self, ctx: commands.Context, kwargs: typing.Dict[str, typing.Union[discord.Embed, str]]
) -> None:
try:
await ctx.channel.send(**kwargs) # ignore tips/monkeypatch cogs
except discord.errors.HTTPException as error:
await self.embed_convert_error(ctx, _("Embed Send Error"), error)
raise commands.BadArgument()
@staticmethod
async def embed_convert_error(
ctx: commands.Context, error_type: str, error: Exception
) -> None:
if getattr(ctx, "__is_mocked__", False):
raise commands.BadArgument(f"{error_type}: `{type(error).__name__}`")
embed: discord.Embed = discord.Embed(
title=f"{error_type}: `{type(error).__name__}`",
description=box(str(error), lang="py"),
color=await ctx.embed_color(),
)
embed.set_footer(
text=_(
"Use `{ctx.prefix}help {ctx.command.qualified_name}` to see an example."
).format(ctx=ctx)
)
await Menu(pages=[embed]).start(ctx)
class ListStringToEmbed(StringToEmbed):
def __init__(self, *, conversion_type: str = "json", limit: int = 10) -> None:
super().__init__(conversion_type=conversion_type, allow_content=False)
self.limit: int = min(limit, 10)
async def convert(
self, ctx: commands.Context, argument: str
) -> typing.Dict[
typing.Literal["content", "embeds"], typing.Union[typing.List[discord.Embed], str]
]:
argument = cleanup_code(argument)
data = await self.converter(ctx, argument=argument, data_type=(dict, list))
content = data.get("content") if isinstance(data, typing.Dict) else None
if isinstance(data, typing.List):
pass
elif "embed" in data:
data = [data["embed"]]
elif "embeds" in data:
data = data["embeds"]
if isinstance(data, typing.Dict):
data = list(data.values())
elif "content" in data:
data = []
else:
data = [data]
self.check_data_type(ctx, data=data, data_type=list)
embeds = []
for i, embed_data in enumerate(data, 1):
kwargs = await self.create_embed(ctx, data=embed_data)
embed = kwargs["embed"]
embeds.append(embed)
if i > self.limit:
raise commands.BadArgument(
_("Embed limit reached ({limit}).").format(limit=self.limit)
)
if content or embeds:
return {"content": content, "embeds": embeds}
else:
raise commands.BadArgument(_("Failed to convert input into embeds."))
class MessageableConverter(commands.Converter):
async def convert(
self, ctx: commands.Context, argument: str
) -> typing.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread]:
for converter in (
commands.TextChannelConverter,
commands.VoiceChannelConverter,
commands.ThreadConverter,
):
try:
channel = await converter().convert(ctx, argument=argument)
except commands.BadArgument:
pass
else:
break
else:
raise commands.BadArgument(_("It's not a valid channel or thread."))
bot_permissions = channel.permissions_for(ctx.me)
if not (bot_permissions.send_messages and bot_permissions.embed_links):
raise commands.BadArgument(
_("I do not have permissions to send embeds in {channel.mention}.").format(
channel=channel
)
)
permissions = channel.permissions_for(ctx.author)
if not (
permissions.send_messages and permissions.embed_links and permissions.manage_messages
):
raise commands.BadArgument(
_("You do not have permissions to send embeds in {channel.mention}.").format(
channel=channel
)
)
return channel
class MyMessageConverter(commands.MessageConverter):
async def convert(self, ctx: commands.Context, argument: str) -> discord.Message:
message = await super().convert(ctx, argument=argument)
if message.author != ctx.me:
raise commands.UserFeedbackCheckFailure(
_(
"I have to be the author of the message. You can use the command without providing a message to send one."
)
)
ctx.message.channel = message.channel
fake_context = await ctx.bot.get_context(ctx.message)
if not await discord.utils.async_all(
[check(fake_context) for check in ctx.bot.get_cog("EmbedUtils").embed_edit.checks]
):
raise commands.BadArgument(
_(
"You are not allowed to edit embeds of an existing message (bot owner can set the permissions with the cog Permissions on the command `[p]embed edit`)."
)
)
return message
class MessageableOrMessageConverter(commands.Converter):
async def convert(
self, ctx: commands.Context, argument: str
) -> typing.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.Message]:
try:
return await MessageableConverter().convert(ctx, argument=argument)
except commands.BadArgument as e:
try:
return await MyMessageConverter().convert(ctx, argument=argument)
except commands.BadArgument:
raise e
GITHUB_RE = re.compile(
r"https://(?:www\.)?github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)/blob/(?P<path>[^#>]+)"
)
GITHUB_GIST_RE = re.compile(
r"https://(?:www\.)?gist\.github\.com/([a-zA-Z0-9-]+)/(?P<gist_id>[a-zA-Z0-9]+)/*"
r"(?P<revision>[a-zA-Z0-9]*)/*(#file-(?P<file_path>[^#>]+))?"
)
PASTEBIN_RE = re.compile(r"https://(?:www\.)?pastebin\.com/(?P<paste_id>[a-zA-Z0-9]+)/*")
HASTEBIN_RE = re.compile(r"https://(?:www\.)?hastebin\.com/(?P<paste_id>[a-zA-Z0-9]+)/*")
GITHUB_HEADERS = {"Accept": "application/vnd.github.v3.raw"}
class PastebinMixin:
async def convert(
self, ctx: commands.Context, argument: str
) -> typing.Dict[
typing.Literal["content", "embed", "embeds"],
typing.Union[discord.Embed, typing.List[discord.Embed], str],
]:
async def _fetch_response(url: str, response_format: str, **kwargs) -> typing.Any:
if "github.com" in url:
api_tokens = await ctx.bot.get_shared_api_tokens(service_name="github")
if (token := api_tokens.get("token")) is not None:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Token {token}"
try:
async with ctx.bot.get_cog("EmbedUtils")._session.get(
url, raise_for_status=True, **kwargs
) as response:
if response_format == "text":
return await response.text()
return await response.json() if response_format == "json" else None
except (aiohttp.ClientResponseError, aiohttp.ClientError) as error:
raise commands.BadArgument(
f"Failed to fetch the content from the URL: {url}\n{box(error.message, lang='py')}"
)
def _find_ref(path: str, refs: typing.List[dict]) -> typing.Tuple[str, str]:
ref, file_path = path.split("/", 1)
for possible_ref in refs:
if path.startswith(possible_ref["name"] + "/"):
ref = possible_ref["name"]
file_path = path[len(ref) + 1 :]
break
return ref, file_path
if _match := list(GITHUB_RE.finditer(argument)):
_match = _match[0].groupdict()
branches = await _fetch_response(
f"https://api.github.com/repos/{_match['repo']}/branches",
response_format="json",
headers=GITHUB_HEADERS,
)
tags = await _fetch_response(
f"https://api.github.com/repos/{_match['repo']}/tags", response_format="json"
)
refs = branches + tags
ref, file_path = _find_ref(_match["path"], refs)
argument = await _fetch_response(
f"https://api.github.com/repos/{_match['repo']}/contents/{file_path}?ref={ref}",
response_format="text",
headers=GITHUB_HEADERS,
)
elif _match := list(GITHUB_GIST_RE.finditer(argument)):
_match = _match[0].groupdict()
revision = _match["revision"]
gist_json = await _fetch_response(
f"https://api.github.com/gists/{_match['gist_id']}{f'/{revision}' if revision != '' else ''}",
response_format="json",
headers=GITHUB_HEADERS,
)
if len(gist_json["files"]) == 1 and _match["file_path"] is None:
file_path = list(gist_json["files"])[0].lower().replace(".", "-")
for gist_file in gist_json["files"]:
if file_path == gist_file.lower().replace(".", "-"):
argument = await _fetch_response(
gist_json["files"][gist_file]["raw_url"],
"text",
headers=GITHUB_HEADERS,
)
elif _match := list(PASTEBIN_RE.finditer(argument)):
_match = _match[0].groupdict()
argument = await _fetch_response(
f"https://pastebin.com/raw/{_match['paste_id']}", response_format="text"
)
elif (_match := list(HASTEBIN_RE.finditer(argument))) and (
token := (await ctx.bot.get_shared_api_tokens(service_name="hastebin")).get("token")
) is not None:
_match = _match[0].groupdict()
argument = await _fetch_response(
f"https://hastebin.com/raw/{_match['paste_id']}",
response_format="text",
headers={"Authentification": f"Bearer {token}"},
)
else:
raise commands.BadArgument(
f"`{argument}` is not a valid code GitHub/Gist/Pastebin/Hastebin link."
)
return await super().convert(ctx, argument=argument)
class PastebinConverter(PastebinMixin, StringToEmbed):
pass
class PastebinListConverter(PastebinMixin, ListStringToEmbed):
pass
class StrConverter(commands.Converter):
async def convert(self, ctx: commands.Context, argument: str) -> str:
return argument