from AAA3A_utils import Menu  # isort:skip
from redbot.core import commands  # isort:skip
from redbot.core.i18n import Translator  # isort:skip
import discord  # isort:skip
import typing  # isort:skip

import io
import json
import re
import textwrap

import aiohttp
import yaml
from redbot.core import dev_commands
from redbot.core.utils.chat_formatting import box

_: Translator = Translator("EmbedUtils", __file__)

# Language tags that can remain as the first line of the input once the code
# block fences have been removed (e.g. a block opened with ```json).
_CODE_BLOCK_LANGUAGES: typing.Tuple[str, ...] = ("json", "yaml", "yml")


def cleanup_code(code: str) -> str:
    """Normalise user input before it is parsed as JSON/YAML.

    The text is dedented, passed through ``dev_commands.cleanup_code``
    (presumably to drop code block fences - confirm against Red's dev cog),
    stripped, and a leftover leading language tag line is removed.

    Returns:
        The cleaned text, or the literal string ``"pass"`` when every line is
        blank or starts with ``#``.
    """
    code = dev_commands.cleanup_code(textwrap.dedent(code)).strip()
    for language in _CODE_BLOCK_LANGUAGES:
        prefix = f"{language}\n"
        if code.startswith(prefix):
            code = code[len(prefix) :]
            break
    with io.StringIO(code) as codeio:
        for line in codeio:
            line = line.strip()
            if line and not line.startswith("#"):
                break
        else:
            # No line carries anything but whitespace or a comment.
            return "pass"
    return code


class StringToEmbed(commands.Converter):
    """Convert a JSON or YAML string into ``{"content": ..., "embed": ...}``."""

    def __init__(
        self, *, conversion_type: str = "json", validate: bool = False, allow_content: bool = True
    ) -> None:
        """Configure the converter.

        Args:
            conversion_type: ``"json"`` or ``"yaml"`` (case-insensitive).
            validate: When true, ``convert`` also sends the result to the
                invoking channel to make sure Discord accepts it.
            allow_content: When false, a ``content`` field is rejected.

        Raises:
            ValueError: If ``conversion_type`` is not a known type.
        """
        self.CONVERSION_TYPES: typing.Dict[str, typing.Any] = {
            "json": self.load_from_json,
            "yaml": self.load_from_yaml,
        }
        self.validate: bool = validate
        self.conversion_type: typing.Literal["json", "yaml"] = conversion_type.lower()
        self.allow_content: bool = allow_content
        try:
            self.converter = self.CONVERSION_TYPES[self.conversion_type]
        except KeyError as exc:
            raise ValueError(
                f"`{conversion_type}` is not a valid conversion type for Embed conversion."
            ) from exc

    def _format_error(self, ctx: commands.Context) -> commands.BadArgument:
        """Build the "badly formatted input" error shared by the checks below."""
        return commands.BadArgument(
            _(
                "This doesn't seem to be properly formatted embed {conversion_type}. "
                "Refer to the link on `{ctx.clean_prefix}help {ctx.command.qualified_name}`."
            ).format(conversion_type=self.conversion_type.upper(), ctx=ctx)
        )

    async def convert(
        self, ctx: commands.Context, argument: str
    ) -> typing.Dict[typing.Literal["content", "embed"], typing.Union[discord.Embed, str]]:
        """Parse ``argument`` and build a single embed from it.

        The parsed mapping may be the embed itself, or wrap it under an
        ``embed`` key or as the first entry of an ``embeds`` key.

        Raises:
            commands.BadArgument: If the input cannot be parsed, has the wrong
                shape, or does not produce a valid embed.
        """
        argument = cleanup_code(argument)
        data = await self.converter(ctx, argument=argument)
        content = self.get_content(data) if isinstance(data, dict) else None
        if isinstance(data, list):
            # Only reachable if a loader accepted a list; an empty list falls
            # through to the formatted error below.
            data = data[0] if data else None
        elif "embed" in data:
            data = data["embed"]
        elif "embeds" in data:
            try:
                data = data["embeds"][0]
            except (IndexError, KeyError, TypeError):
                # Empty or non-indexable `embeds`: report it as bad formatting
                # instead of leaking the raw exception.
                data = None
        if not data:
            raise self._format_error(ctx)
        self.check_data_type(ctx, data=data)
        kwargs = await self.create_embed(ctx, data=data, content=content)
        if self.validate:
            await self.validate_embed(ctx, kwargs)
        return kwargs

    def check_data_type(self, ctx: commands.Context, data, *, data_type=dict) -> None:
        """Raise ``commands.BadArgument`` unless ``data`` is a ``data_type`` instance."""
        if not isinstance(data, data_type):
            raise self._format_error(ctx)

    async def load_from_json(
        self, ctx: commands.Context, argument: str, **kwargs
    ) -> typing.Dict[str, typing.Any]:
        """Parse ``argument`` as JSON and type-check the result.

        ``kwargs`` are forwarded to ``check_data_type`` (e.g. ``data_type``).
        Parse failures are reported through ``embed_convert_error`` and then
        raised as ``commands.BadArgument``.
        """
        try:
            data = json.loads(argument)
        except json.decoder.JSONDecodeError as error:
            await self.embed_convert_error(ctx, _("JSON Parse Error"), error)
            raise commands.BadArgument() from error
        self.check_data_type(ctx, data, **kwargs)
        return data

    async def load_from_yaml(
        self, ctx: commands.Context, argument: str, **kwargs
    ) -> typing.Dict[str, typing.Any]:
        """Parse ``argument`` as YAML (``yaml.safe_load``) and type-check the result.

        ``kwargs`` are forwarded to ``check_data_type`` (e.g. ``data_type``).
        Parse failures are reported through ``embed_convert_error`` and then
        raised as ``commands.BadArgument``.
        """
        try:
            data = yaml.safe_load(argument)
        except Exception as error:
            # Deliberately broad: any failure while loading user-supplied YAML
            # is reported to the user as a parse error.
            await self.embed_convert_error(ctx, _("YAML Parse Error"), error)
            raise commands.BadArgument() from error
        self.check_data_type(ctx, data, **kwargs)
        return data

    def get_content(
        self, data: typing.Dict[str, typing.Any], *, content: typing.Optional[str] = None
    ) -> typing.Optional[str]:
        """Pop ``content`` out of ``data``, falling back to the given ``content``.

        Raises:
            commands.BadArgument: If a content is present while the converter
                was created with ``allow_content=False``.
        """
        content = data.pop("content", content)
        if content is not None and not self.allow_content:
            raise commands.BadArgument(_("The `content` field is not supported for this command."))
        return content

    async def create_embed(
        self,
        ctx: commands.Context,
        data: typing.Dict[str, typing.Any],
        *,
        content: typing.Optional[str] = None,
    ) -> typing.Dict[typing.Literal["content", "embed"], typing.Union[discord.Embed, str]]:
        """Build a ``discord.Embed`` from ``data`` (which is modified in place).

        ``None`` values for ``color`` and ``timestamp`` are dropped, and the
        timestamp is turned into a string with any ``Z`` stripped from its ends.

        Raises:
            commands.BadArgument: If the embed cannot be built, or if its
                length exceeds 6000 characters.
        """
        content = self.get_content(data, content=content)
        if data.get("color") is None:
            data.pop("color", None)
        if (timestamp := data.get("timestamp")) is not None:
            data["timestamp"] = (
                timestamp.strip("Z") if isinstance(timestamp, str) else str(timestamp)
            )
        else:
            data.pop("timestamp", None)
        try:
            embed = discord.Embed.from_dict(data)
            length = len(embed)
        except Exception as error:
            await self.embed_convert_error(ctx, _("Embed Parse Error"), error)
            raise commands.BadArgument() from error
        if length > 6000:
            raise commands.BadArgument(
                _("Embed size exceeds Discord limit of 6000 characters ({length}).").format(
                    length=length
                )
            )
        return {"content": content, "embed": embed}

    async def validate_embed(
        self, ctx: commands.Context, kwargs: typing.Dict[str, typing.Union[discord.Embed, str]]
    ) -> None:
        """Send ``kwargs`` to the invoking channel to check that Discord accepts them.

        Raises:
            commands.BadArgument: If the send fails with an HTTP error (the
                error is first reported through ``embed_convert_error``).
        """
        try:
            await ctx.channel.send(**kwargs)  # ignore tips/monkeypatch cogs
        except discord.errors.HTTPException as error:
            await self.embed_convert_error(ctx, _("Embed Send Error"), error)
            raise commands.BadArgument() from error

    @staticmethod
    async def embed_convert_error(
        ctx: commands.Context, error_type: str, error: Exception
    ) -> None:
        """Show a conversion error to the user as an embed in a ``Menu``.

        When ``ctx`` carries a truthy ``__is_mocked__`` attribute, nothing is
        sent and a ``commands.BadArgument`` describing the error is raised.
        """
        if getattr(ctx, "__is_mocked__", False):
            raise commands.BadArgument(f"{error_type}: `{type(error).__name__}`")
        embed: discord.Embed = discord.Embed(
            title=f"{error_type}: `{type(error).__name__}`",
            description=box(str(error), lang="py"),
            color=await ctx.embed_color(),
        )
        embed.set_footer(
            text=_(
                "Use `{ctx.prefix}help {ctx.command.qualified_name}` to see an example."
            ).format(ctx=ctx)
        )
        await Menu(pages=[embed]).start(ctx)


class ListStringToEmbed(StringToEmbed):
    """Convert a JSON or YAML string into ``{"content": ..., "embeds": [...]}``."""

    def __init__(self, *, conversion_type: str = "json", limit: int = 10) -> None:
        """Configure the converter; ``limit`` is capped at 10 embeds."""
        super().__init__(conversion_type=conversion_type, allow_content=False)
        self.limit: int = min(limit, 10)

    async def convert(
        self, ctx: commands.Context, argument: str
    ) -> typing.Dict[
        typing.Literal["content", "embeds"], typing.Union[typing.List[discord.Embed], str]
    ]:
        """Parse ``argument`` and build a list of embeds from it.

        Accepted shapes: a list of embeds, a mapping with an ``embed`` key, a
        mapping with an ``embeds`` key (list, or mapping whose values are the
        embeds), a mapping holding only ``content``, or a bare embed mapping.

        Raises:
            commands.BadArgument: If the input cannot be parsed, has the wrong
                shape, holds more than ``limit`` embeds, or yields neither a
                content nor an embed.
        """
        argument = cleanup_code(argument)
        data = await self.converter(ctx, argument=argument, data_type=(dict, list))
        # The top-level content is read directly, without `get_content`.
        content = data.get("content") if isinstance(data, dict) else None
        if isinstance(data, list):
            pass
        elif "embed" in data:
            data = [data["embed"]]
        elif "embeds" in data:
            data = data["embeds"]
            if isinstance(data, dict):
                data = list(data.values())
        elif "content" in data:
            data = []
        else:
            data = [data]
        self.check_data_type(ctx, data=data, data_type=list)

        # Reject oversized input before building anything.
        if len(data) > self.limit:
            raise commands.BadArgument(
                _("Embed limit reached ({limit}).").format(limit=self.limit)
            )
        embeds = []
        for embed_data in data:
            # Each entry must be a mapping; otherwise `create_embed` would fail
            # with an AttributeError instead of a user-facing error.
            self.check_data_type(ctx, data=embed_data)
            kwargs = await self.create_embed(ctx, data=embed_data)
            embeds.append(kwargs["embed"])
        if content or embeds:
            return {"content": content, "embeds": embeds}
        raise commands.BadArgument(_("Failed to convert input into embeds."))


class MessageableConverter(commands.Converter):
    """Resolve a text channel, voice channel or thread usable for sending embeds."""

    async def convert(
        self, ctx: commands.Context, argument: str
    ) -> typing.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread]:
        """Return the first channel/thread matching ``argument``.

        Raises:
            commands.BadArgument: If nothing matches, if the bot lacks
                ``send_messages``/``embed_links`` there, or if the author lacks
                ``send_messages``/``embed_links``/``manage_messages`` there.
        """
        for converter in (
            commands.TextChannelConverter,
            commands.VoiceChannelConverter,
            commands.ThreadConverter,
        ):
            try:
                channel = await converter().convert(ctx, argument=argument)
            except commands.BadArgument:
                pass
            else:
                break
        else:
            raise commands.BadArgument(_("It's not a valid channel or thread."))
        bot_permissions = channel.permissions_for(ctx.me)
        if not (bot_permissions.send_messages and bot_permissions.embed_links):
            raise commands.BadArgument(
                _("I do not have permissions to send embeds in {channel.mention}.").format(
                    channel=channel
                )
            )
        permissions = channel.permissions_for(ctx.author)
        if not (
            permissions.send_messages and permissions.embed_links and permissions.manage_messages
        ):
            raise commands.BadArgument(
                _("You do not have permissions to send embeds in {channel.mention}.").format(
                    channel=channel
                )
            )
        return channel


class MyMessageConverter(commands.MessageConverter):
    """Resolve a message authored by the bot that the invoker may edit."""

    async def convert(self, ctx: commands.Context, argument: str) -> discord.Message:
        """Return the message matching ``argument``.

        Raises:
            commands.UserFeedbackCheckFailure: If the bot is not the author.
            commands.BadArgument: If the message cannot be resolved, or if the
                checks of the ``embed_edit`` command fail in its channel.
        """
        message = await super().convert(ctx, argument=argument)
        if message.author != ctx.me:
            raise commands.UserFeedbackCheckFailure(
                _(
                    "I have to be the author of the message. You can use the command without providing a message to send one."
                )
            )
        # NOTE(review): this rebinds the channel of the invoking message so the
        # context built below runs the checks against the target channel; the
        # original channel is not restored afterwards.
        ctx.message.channel = message.channel
        fake_context = await ctx.bot.get_context(ctx.message)
        if not await discord.utils.async_all(
            [check(fake_context) for check in ctx.bot.get_cog("EmbedUtils").embed_edit.checks]
        ):
            raise commands.BadArgument(
                _(
                    "You are not allowed to edit embeds of an existing message (bot owner can set the permissions with the cog Permissions on the command `[p]embed edit`)."
                )
            )
        return message


class MessageableOrMessageConverter(commands.Converter):
    """Try ``MessageableConverter`` first, then ``MyMessageConverter``."""

    async def convert(
        self, ctx: commands.Context, argument: str
    ) -> typing.Union[discord.TextChannel, discord.VoiceChannel, discord.Thread, discord.Message]:
        """Return a channel/thread or a message.

        When both converters raise ``commands.BadArgument``, the error of the
        channel converter is the one re-raised.
        """
        try:
            return await MessageableConverter().convert(ctx, argument=argument)
        except commands.BadArgument as e:
            try:
                return await MyMessageConverter().convert(ctx, argument=argument)
            except commands.BadArgument:
                raise e


# The group names below are the keys read from `groupdict()` in `PastebinMixin`.
GITHUB_RE = re.compile(
    r"https://(?:www\.)?github\.com/(?P<repo>[a-zA-Z0-9-]+/[\w.-]+)/blob/(?P<path>[^#>]+)"
)
GITHUB_GIST_RE = re.compile(
    r"https://(?:www\.)?gist\.github\.com/([a-zA-Z0-9-]+)/(?P<gist_id>[a-zA-Z0-9]+)/*"
    r"(?P<revision>[a-zA-Z0-9]*)/*(#file-(?P<file_path>[^#>]+))?"
)
PASTEBIN_RE = re.compile(r"https://(?:www\.)?pastebin\.com/(?P<paste_id>[a-zA-Z0-9]+)/*")
HASTEBIN_RE = re.compile(r"https://(?:www\.)?hastebin\.com/(?P<paste_id>[a-zA-Z0-9]+)/*")
GITHUB_HEADERS = {"Accept": "application/vnd.github.v3.raw"}


class PastebinMixin:
    """Mixin that downloads the text behind a GitHub/Gist/Pastebin/Hastebin link.

    The downloaded text is then handed to the next ``convert`` in the MRO.
    """

    @staticmethod
    async def _fetch_response(
        ctx: commands.Context, url: str, response_format: str, **kwargs
    ) -> typing.Any:
        """GET ``url`` with the ``EmbedUtils`` cog session.

        Returns the body as text (``"text"``), parsed JSON (``"json"``) or
        ``None`` for any other ``response_format``.

        Raises:
            commands.BadArgument: If the request fails.
        """
        if "github.com" in url:
            api_tokens = await ctx.bot.get_shared_api_tokens(service_name="github")
            if (token := api_tokens.get("token")) is not None:
                # Build a new dict: callers pass the module-level
                # GITHUB_HEADERS, which must not end up holding the token.
                kwargs["headers"] = {
                    **(kwargs.get("headers") or {}),
                    "Authorization": f"Token {token}",
                }
        try:
            async with ctx.bot.get_cog("EmbedUtils")._session.get(
                url, raise_for_status=True, **kwargs
            ) as response:
                if response_format == "text":
                    return await response.text()
                return await response.json() if response_format == "json" else None
        except aiohttp.ClientError as error:
            # Only response errors expose `.message`; other client errors
            # (connection failures, ...) are rendered with `str`.
            message = getattr(error, "message", None) or str(error)
            raise commands.BadArgument(
                f"Failed to fetch the content from the URL: {url}\n{box(message, lang='py')}"
            ) from error

    @staticmethod
    def _find_ref(path: str, refs: typing.List[dict]) -> typing.Tuple[str, str]:
        """Split ``path`` into ``(ref, file_path)``.

        A known ref name (which may itself contain ``/``) wins; otherwise the
        first path segment is used as the ref.
        """
        ref, _sep, file_path = path.partition("/")
        for possible_ref in refs:
            name = possible_ref["name"]
            if path.startswith(name + "/"):
                ref = name
                file_path = path[len(name) + 1 :]
                break
        return ref, file_path

    async def convert(
        self, ctx: commands.Context, argument: str
    ) -> typing.Dict[
        typing.Literal["content", "embed", "embeds"],
        typing.Union[discord.Embed, typing.List[discord.Embed], str],
    ]:
        """Download the text behind the link in ``argument`` and convert it.

        Raises:
            commands.BadArgument: If ``argument`` holds no supported link, the
                download fails, the gist file cannot be determined, or the
                downloaded text cannot be converted.
        """
        invalid_link = f"`{argument}` is not a valid code GitHub/Gist/Pastebin/Hastebin link."

        if (match := GITHUB_RE.search(argument)) is not None:
            groups = match.groupdict()
            branches = await self._fetch_response(
                ctx,
                f"https://api.github.com/repos/{groups['repo']}/branches",
                response_format="json",
                headers=GITHUB_HEADERS,
            )
            tags = await self._fetch_response(
                ctx,
                f"https://api.github.com/repos/{groups['repo']}/tags",
                response_format="json",
            )
            ref, file_path = self._find_ref(groups["path"], branches + tags)
            if not file_path:
                # The link names a ref but no file.
                raise commands.BadArgument(invalid_link)
            argument = await self._fetch_response(
                ctx,
                f"https://api.github.com/repos/{groups['repo']}/contents/{file_path}?ref={ref}",
                response_format="text",
                headers=GITHUB_HEADERS,
            )
        elif (match := GITHUB_GIST_RE.search(argument)) is not None:
            groups = match.groupdict()
            gist_url = f"https://api.github.com/gists/{groups['gist_id']}"
            if groups["revision"]:
                gist_url += f"/{groups['revision']}"
            gist_json = await self._fetch_response(
                ctx, gist_url, response_format="json", headers=GITHUB_HEADERS
            )
            files = gist_json["files"]
            file_path = groups["file_path"]
            if file_path is None:
                # Without a `#file-` anchor the choice is only unambiguous
                # when the gist holds a single file.
                if len(files) != 1:
                    raise commands.BadArgument(
                        "This gist contains several files; add a `#file-<name>` anchor "
                        "to the link to select one."
                    )
                file_path = next(iter(files))
            # File names are compared in the anchor form: lower case, with
            # dots replaced by dashes.
            file_path = file_path.lower().replace(".", "-")
            for gist_file, file_data in files.items():
                if file_path == gist_file.lower().replace(".", "-"):
                    argument = await self._fetch_response(
                        ctx, file_data["raw_url"], "text", headers=GITHUB_HEADERS
                    )
                    break
            else:
                raise commands.BadArgument(
                    f"No file matching `{file_path}` was found in this gist."
                )
        elif (match := PASTEBIN_RE.search(argument)) is not None:
            argument = await self._fetch_response(
                ctx,
                f"https://pastebin.com/raw/{match.groupdict()['paste_id']}",
                response_format="text",
            )
        elif (match := HASTEBIN_RE.search(argument)) is not None and (
            token := (await ctx.bot.get_shared_api_tokens(service_name="hastebin")).get("token")
        ) is not None:
            # NOTE(review): a Hastebin link without a configured token falls
            # through to the generic "not a valid link" error below.
            argument = await self._fetch_response(
                ctx,
                f"https://hastebin.com/raw/{match.groupdict()['paste_id']}",
                response_format="text",
                headers={"Authorization": f"Bearer {token}"},
            )
        else:
            raise commands.BadArgument(invalid_link)
        return await super().convert(ctx, argument=argument)


class PastebinConverter(PastebinMixin, StringToEmbed):
    """``StringToEmbed`` fed from a GitHub/Gist/Pastebin/Hastebin link."""


class PastebinListConverter(PastebinMixin, ListStringToEmbed):
    """``ListStringToEmbed`` fed from a GitHub/Gist/Pastebin/Hastebin link."""


class StrConverter(commands.Converter):
    """Converter that returns the argument unchanged."""

    async def convert(self, ctx: commands.Context, argument: str) -> str:
        """Return ``argument`` as is."""
        return argument