Ruby-Cogs/devutils/devutils.py
2025-04-02 22:56:57 -04:00

420 lines
15 KiB
Python

from AAA3A_utils import Cog, CogsUtils, Menu # isort:skip
from redbot.core import commands # isort:skip
from redbot.core.bot import Red # isort:skip
from redbot.core.i18n import Translator, cog_i18n # isort:skip
import discord # isort:skip
import typing # isort:skip
import asyncio
import importlib
import json
import logging
import re
import sys
import time
from discord.http import Route
from red_commons.logging import TRACE, VERBOSE, getLogger
from redbot.core.utils.chat_formatting import humanize_list
# Credits:
# General repo credits.
# Thanks to Phen for the original code (https://github.com/phenom4n4n/phen-cogs/tree/master/phenutils)!
_: Translator = Translator("Devutils", __file__)
SLEEP_FLAG = re.compile(r"(?:--|—)sleep (\d+)$")
class LogLevelConverter(commands.Converter):
async def convert(self, ctx: commands.Context, argument: str) -> int:
levels = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
"VERBOSE": VERBOSE,
"TRACE": TRACE,
}
if argument.upper() in levels:
return levels[argument.upper()]
try:
argument = int(argument)
except ValueError:
pass
else:
try:
return list(levels.values())[argument]
except IndexError:
pass
raise commands.BadArgument(_("No valid log level provided."))
class StrConverter(commands.Converter):
async def convert(self, ctx: commands.Context, argument: str) -> str:
return argument
class RawRequestConverter(commands.Converter):
async def convert(self, ctx: commands.Context, argument: str):
_types = [
discord.Guild,
discord.abc.GuildChannel,
discord.Thread,
discord.Member,
discord.User,
discord.Role,
discord.Emoji,
discord.Message,
discord.Invite,
]
# _types = list(discord.ext.commands.converter.CONVERTER_MAPPING.keys())[1:]
for _type in _types:
try:
return await discord.ext.commands.converter.CONVERTER_MAPPING[_type]().convert(
ctx, argument
)
except commands.BadArgument:
pass
raise commands.BadArgument(_("No valid discord object provided."))
@cog_i18n(_)
class DevUtils(Cog):
"""Various development utilities!"""
__authors__: typing.List[str] = ["PhenoM4n4n", "AAA3A"]
@commands.Cog.listener()
async def on_message_without_command(self, message: discord.Message):
if await self.bot.cog_disabled_in_guild(
cog=self, guild=message.guild
) or not await self.bot.allowed_by_whitelist_blacklist(who=message.author):
return
if message.webhook_id is not None or message.author.bot:
return
context = await self.bot.get_context(message)
if context.prefix is None:
return
command = context.message.content[len(str(context.prefix)) :]
if len(command.split(" ")) == 0:
return
command_name = command.split(" ")[0]
if command_name not in (
"do",
"execute",
"bypass",
"timing",
"reinvoke",
"loglevel",
"stoptyping",
"reloadmodule",
"rawrequest",
):
return
await CogsUtils.invoke_command(
bot=self.bot,
author=context.author,
channel=context.channel,
command=f"devutils {command}",
prefix=context.prefix,
message=context.message,
)
@commands.is_owner()
@commands.hybrid_group()
async def devutils(self, ctx: commands.Context) -> None:
"""Various development utilities."""
pass
@devutils.command()
async def do(
self, ctx, times: int, sequential: typing.Optional[bool] = True, *, command: str
) -> None:
"""
Repeats a command a specified number of times.
`--sleep <int>` is an optional flag specifying how much time to wait between command invocations.
"""
if (match := SLEEP_FLAG.search(command)) is not None:
sleep = int(match.group(1))
command = command[: -len(match.group(0))]
else:
sleep = 1
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=ctx.author,
channel=ctx.channel,
command=command,
prefix=ctx.prefix,
message=ctx.message,
invoke=False,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(_("You have not specified a correct command."))
if not await discord.utils.async_all([check(new_ctx) for check in new_ctx.command.checks]):
raise commands.UserFeedbackCheckFailure(_("You can't execute yourself this command."))
if sequential:
for __ in range(times):
await ctx.bot.invoke(new_ctx)
await asyncio.sleep(sleep)
else:
todo = [ctx.bot.invoke(new_ctx) for _ in range(times)]
await asyncio.gather(*todo)
@devutils.command()
async def execute(
self,
ctx: commands.Context,
sequential: typing.Optional[bool] = True,
*,
commands_list: str,
) -> None:
"""Execute multiple commands at once. Split them using |."""
commands_list = [command.strip() for command in commands_list.split("|")]
if sequential:
for command in commands_list:
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=ctx.author,
channel=ctx.channel,
command=command,
prefix=ctx.prefix,
message=ctx.message,
invoke=True,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(
_("`{command}` isn't a valid command.").format(command=command)
)
if not await discord.utils.async_all(
[check(new_ctx) for check in new_ctx.command.checks]
):
raise commands.UserFeedbackCheckFailure(
_("You can't execute yourself `{command}`.").format(command=command)
)
else:
todo = []
for command in commands_list:
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=ctx.author,
channel=ctx.channel,
command=command,
prefix=ctx.prefix,
message=ctx.message,
invoke=False,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(
_("`{command}` isn't a valid command.").format(command=command)
)
if not await discord.utils.async_all(
[check(new_ctx) for check in new_ctx.command.checks]
):
raise commands.UserFeedbackCheckFailure(
_("You can't execute yourself `{command}`.").format(command=command)
)
todo.append(ctx.bot.invoke(new_ctx))
await asyncio.gather(*todo)
@devutils.command()
async def bypass(self, ctx: commands.Context, *, command: str) -> None:
"""Bypass a command's checks and cooldowns."""
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=ctx.author,
channel=ctx.channel,
command=command,
prefix=ctx.prefix,
message=ctx.message,
invoke=False,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(_("You have not specified a correct command."))
await new_ctx.reinvoke()
@devutils.command()
async def timing(self, ctx: commands.Context, *, command: str) -> None:
"""Run a command timing execution and catching exceptions."""
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=ctx.author,
channel=ctx.channel,
command=command,
prefix=ctx.prefix,
message=ctx.message,
invoke=False,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(_("You have not specified a correct command."))
if not await discord.utils.async_all([check(new_ctx) for check in new_ctx.command.checks]):
raise commands.UserFeedbackCheckFailure(_("You can't execute yourself this command."))
start = time.perf_counter()
await ctx.bot.invoke(new_ctx)
end = time.perf_counter()
return await ctx.send(
_("Command `{command}` finished in `{timing}`s.").format(
command=new_ctx.command.qualified_name, timing=f"{end - start:.3f}"
)
)
@devutils.command()
async def reinvoke(self, ctx: commands.Context, message: discord.Message = None) -> None:
"""Reinvoke a command message.
You may reply to a message to reinvoke it or pass a message ID/link.
The command will be invoked with the author and the channel of the specified message.
"""
if message is None:
if not (
ctx.message.reference is not None
and isinstance((message := ctx.message.reference.resolved), discord.Message)
):
raise commands.UserInputError()
new_ctx = await CogsUtils.invoke_command(
bot=ctx.bot,
author=message.author,
channel=message.channel,
command=(
f"{ctx.prefix}devutils reinvoke{message.content[len(ctx.prefix)+8:]}"
if message.content.startswith(f"{ctx.prefix}reinvoke")
else message.content
),
prefix="",
message=message,
)
if not new_ctx.valid:
raise commands.UserFeedbackCheckFailure(_("The command isn't valid."))
if not await discord.utils.async_all([check(new_ctx) for check in new_ctx.command.checks]):
raise commands.UserFeedbackCheckFailure(_("This command can't be executed."))
@devutils.command()
async def loglevel(
self, ctx: commands.Context, level: LogLevelConverter, logger_name: str = "red"
) -> None:
"""Change the logging level for a logger. If no name is provided, the root logger (`red`) is used.
Levels are the following:
- `0`: `CRITICAL`
- `1`: `ERROR`
- `2`: `WARNING`
- `3`: `INFO`
- `4`: `DEBUG`
- `5`: `VERBOSE`
- `6`: `TRACE`
"""
logger = getLogger(logger_name)
logger.setLevel(level)
await ctx.send(
_("Logger `{logger_name}` level set to `{level}`.").format(
level=logging.getLevelName(logger.level), logger_name=logger_name
)
)
@devutils.command()
async def stoptyping(self, ctx: commands.Context) -> None:
"""Stop all bot typing tasks."""
tasks = []
was_typing = False
for task in asyncio.all_tasks():
if task.get_stack(limit=1)[0].f_code.co_name == "do_typing":
tasks.append(task)
was_typing = True
if not was_typing:
raise commands.UserFeedbackCheckFailure("Hmm, it doesn't look like I'm typing...")
for task in tasks:
task.cancel()
@devutils.command()
async def reloadmodule(
self, ctx: commands.Context, modules: commands.Greedy[StrConverter]
) -> None:
"""Force reload a module (to use code changes without restarting your bot).
⚠️ Please only use this if you know what you're doing.
"""
_modules = []
for module in modules:
_modules.extend(
[
m
for m in sys.modules
if m.split(".")[: len(module.split("."))] == module.split(".")
]
)
modules = sorted(_modules, reverse=True)
if not modules:
raise commands.UserFeedbackCheckFailure(
_("I couldn't find any module with this name.")
)
for module in modules:
importlib.reload(sys.modules[module])
text = _("Module(s) {modules} reloaded.").format(
modules=humanize_list([f"`{module}`" for module in modules])
)
if len(text) <= 2000:
await ctx.send(text)
else:
await ctx.send(_("Modules [...] reloaded."))
@devutils.command(aliases=["rawcontent"])
async def rawrequest(self, ctx: commands.Context, *, thing: RawRequestConverter) -> None:
"""Display the JSON of a Discord object with a raw request."""
if isinstance(thing, discord.Guild):
raw_content = await ctx.bot.http.request(
route=Route(method="GET", path="/guilds/{guild_id}", guild_id=thing.id)
)
elif isinstance(thing, (discord.abc.GuildChannel, discord.Thread)):
raw_content = await ctx.bot.http.request(
route=Route(method="GET", path="/channels/{channel_id}", channel_id=thing.id)
)
elif isinstance(thing, discord.Member):
raw_content = await ctx.bot.http.request(
route=Route(
method="GET",
path="/guilds/{guild_id}/members/{user_id}",
guild_id=thing.guild.id,
user_id=thing.id,
)
)
elif isinstance(thing, discord.User):
raw_content = await ctx.bot.http.request(
route=Route(method="GET", path="/users/{user_id}", user_id=thing.id)
)
elif isinstance(thing, discord.Role):
raw_content = [
role
for role in await ctx.bot.http.request(
route=Route(
method="GET", path="/guilds/{guild_id}/roles", guild_id=thing.guild.id
)
)
if int(role["id"]) == thing.id
][0]
elif isinstance(thing, discord.Emoji):
raw_content = await ctx.bot.http.request(
route=Route(
method="GET",
path="/guilds/{guild_id}/emojis/{emoji_id}",
guild_id=thing.guild.id,
emoji_id=thing.id,
)
)
elif isinstance(thing, discord.Message):
raw_content = await ctx.bot.http.request(
route=Route(
method="GET",
path="/channels/{channel_id}/messages/{message_id}",
channel_id=thing.channel.id,
message_id=thing.id,
)
)
elif isinstance(thing, discord.Invite):
raw_content = await ctx.bot.http.request(
route=Route(method="GET", path="/invites/{invite_code}", invite_code=thing.code)
)
await Menu(json.dumps(raw_content, indent=4), lang="json").start(ctx)