1123 lines
44 KiB
Python
1123 lines
44 KiB
Python
from AAA3A_utils import Cog, Menu, Settings, CogsUtils # isort:skip
|
|
from redbot.core import commands, app_commands, Config # isort:skip
|
|
from redbot.core.bot import Red # isort:skip
|
|
from redbot.core.i18n import Translator, cog_i18n # isort:skip
|
|
import discord # isort:skip
|
|
import typing # isort:skip
|
|
|
|
import ast
|
|
import asyncio
|
|
import collections
|
|
import contextlib
|
|
import io
|
|
import random
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
|
|
import aiohttp
|
|
import rich
|
|
from pygments.styles import get_style_by_name
|
|
from redbot.core import dev_commands
|
|
from redbot.core.utils.chat_formatting import box
|
|
from redbot.core.utils.predicates import MessagePredicate
|
|
|
|
from .dashboard_integration import DashboardIntegration
|
|
from .env import DevEnv, DevSpace, Exit, ctxconsole
|
|
from .view import ExecuteView, cleanup_code
|
|
|
|
# Credits:
|
|
# General repo credits.
|
|
# Thanks to Cogs-Creators for the original Dev cog!
|
|
# Thanks to Zeph for many ideas and a big part of the code (code removed from public)!
|
|
|
|
_: Translator = Translator("Dev", __file__)
|
|
|
|
TimeConverter: commands.converter.TimedeltaConverter = commands.converter.TimedeltaConverter(
|
|
minimum=None,
|
|
maximum=None,
|
|
allowed_units=None,
|
|
default_unit="minutes",
|
|
)
|
|
|
|
|
|
class SolarizedCustom(get_style_by_name("solarized-dark")):
|
|
background_color = None
|
|
line_number_background_color = None
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def redirect(**kwargs):
|
|
if "file" not in kwargs:
|
|
kwargs["file"] = file = io.StringIO()
|
|
else:
|
|
file = None
|
|
console = rich.console.Console(**kwargs)
|
|
token = ctxconsole.set(console)
|
|
try:
|
|
yield console
|
|
finally:
|
|
ctxconsole.reset(token)
|
|
if file:
|
|
file.close()
|
|
|
|
|
|
class DevOutput(dev_commands.DevOutput):
|
|
def __init__(self, *args, **kwargs) -> None:
|
|
self._locals: typing.Dict[str, typing.Any] = kwargs.pop("_locals", {})
|
|
self.prints: str = ""
|
|
self.rich_tracebacks: bool = kwargs.pop("rich_tracebacks", False)
|
|
self.exc: typing.Optional[Exception] = None
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def __str__(self, output_mode: typing.Literal["repr", "repr_or_str", "str"] = "repr") -> str:
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = self.env.get(
|
|
"_console_custom",
|
|
{
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
},
|
|
)
|
|
with redirect(**_console_custom_kwargs) as console:
|
|
with console.capture() as captured:
|
|
if formatted_imports := self.env.get_formatted_imports():
|
|
console.print(
|
|
rich.syntax.Syntax(formatted_imports, "pycon", theme=SolarizedCustom)
|
|
)
|
|
if self.prints:
|
|
console.print(self.prints)
|
|
if printed := self._stream.getvalue():
|
|
console.print(printed.strip())
|
|
if self.formatted_exc:
|
|
console.print(self.formatted_exc.strip())
|
|
elif (
|
|
self.result is not None
|
|
or self.always_include_result
|
|
# and not self.prints
|
|
# and not formatted_imports
|
|
# and not printed
|
|
):
|
|
if output_mode == "str":
|
|
result = str(self.result)
|
|
elif (
|
|
isinstance(self.result, collections.abc.Iterable)
|
|
and not (output_mode == "repr" and isinstance(self.result, str))
|
|
or hasattr(self.result, "__dataclass_fields__")
|
|
):
|
|
result = self.result
|
|
else:
|
|
result = repr(self.result)
|
|
try:
|
|
console.print(result)
|
|
except Exception as exc:
|
|
console.print(self.format_exception(exc).strip())
|
|
output = captured.get().strip()
|
|
return CogsUtils.replace_var_paths(dev_commands.sanitize_output(self.ctx, output)).replace(
|
|
"```", "\u02CB\u02CB\u02CB"
|
|
)
|
|
|
|
async def send(
|
|
self,
|
|
*,
|
|
tick: bool = True,
|
|
output_mode: typing.Literal["repr", "repr_or_str", "str"] = "repr",
|
|
ansi_formatting: bool = False,
|
|
send_interactive: bool = False,
|
|
send_dpy_objects: bool = True,
|
|
wait: bool = True,
|
|
) -> None:
|
|
if send_dpy_objects and self.result is not None:
|
|
kwargs = {}
|
|
channel_permissions = self.ctx.channel.permissions_for(self.ctx.me)
|
|
if isinstance(self.result, discord.Embed) and channel_permissions.embed_links:
|
|
kwargs["embed"] = self.result
|
|
elif isinstance(self.result, discord.File) and channel_permissions.attach_files:
|
|
kwargs["file"] = self.result
|
|
elif isinstance(self.result, discord.abc.Iterable):
|
|
kwargs = {"embeds": [], "files": []}
|
|
for element in self.result:
|
|
if isinstance(element, discord.Embed) and channel_permissions.embed_links:
|
|
if (
|
|
len(kwargs["embeds"]) < 10
|
|
and (sum(len(embed) for embed in kwargs["embeds"]) + len(element))
|
|
<= 6000
|
|
):
|
|
kwargs["embeds"].append(element)
|
|
elif isinstance(element, discord.File) and channel_permissions.attach_files:
|
|
if (sum(len(file) for file in kwargs["files"]) + len(element)) <= 6000:
|
|
kwargs["files"].append(element)
|
|
for key in ("embeds", "files"):
|
|
if not kwargs[key]:
|
|
del kwargs[key]
|
|
if kwargs:
|
|
try:
|
|
await Menu(pages=[kwargs]).start(self.ctx, wait=False)
|
|
except discord.HTTPException:
|
|
pass
|
|
if tick and self.exc is not None:
|
|
await self.ctx.react_quietly(
|
|
reaction=(
|
|
"❗"
|
|
if isinstance(self.exc, SyntaxError)
|
|
else (
|
|
"⏰"
|
|
if isinstance(
|
|
self.exc,
|
|
(
|
|
TimeoutError,
|
|
asyncio.TimeoutError,
|
|
aiohttp.ClientTimeout,
|
|
aiohttp.ServerTimeoutError,
|
|
subprocess.TimeoutExpired,
|
|
),
|
|
)
|
|
else "❌"
|
|
)
|
|
)
|
|
)
|
|
box_lang = (
|
|
"ini" if self.ctx.command.name == "eshell" else ("ansi" if ansi_formatting else "py")
|
|
)
|
|
if send_interactive:
|
|
task = self.ctx.send_interactive(
|
|
[
|
|
box(page, lang=box_lang)
|
|
for page in dev_commands.get_pages(
|
|
(
|
|
f"{self.env['prefix_dev_output']}\n\n"
|
|
if "prefix_dev_output" in self.env
|
|
else None
|
|
)
|
|
+ self.__str__(output_mode=output_mode)
|
|
)
|
|
],
|
|
)
|
|
if wait:
|
|
await task
|
|
else:
|
|
await asyncio.create_task(task)
|
|
elif pages := self.__str__(output_mode=output_mode):
|
|
await Menu(
|
|
pages=pages,
|
|
prefix=self.env.get("prefix_dev_output"),
|
|
lang=box_lang,
|
|
).start(
|
|
self.ctx,
|
|
wait=wait,
|
|
)
|
|
if tick and self.exc is None:
|
|
await self.ctx.react_quietly(
|
|
# sourcery skip: swap-if-expression
|
|
reaction=(
|
|
commands.context.TICK
|
|
if not hasattr(commands.context, "MORE_TICKS")
|
|
else random.choice(list(commands.context.MORE_TICKS))
|
|
)
|
|
)
|
|
|
|
@classmethod
|
|
async def from_debug(
|
|
cls,
|
|
ctx: commands.Context,
|
|
*,
|
|
source: str,
|
|
source_cache: dev_commands.SourceCache,
|
|
env: typing.Dict[str, typing.Any],
|
|
**kwargs,
|
|
) -> "DevOutput":
|
|
output = cls(
|
|
ctx,
|
|
source=source,
|
|
source_cache=source_cache,
|
|
filename=f"<debug command - snippet #{source_cache.take_next_index()}>",
|
|
env=env,
|
|
**kwargs,
|
|
)
|
|
await output.run_debug()
|
|
return output
|
|
|
|
@classmethod
|
|
async def from_eval(
|
|
cls,
|
|
ctx: commands.Context,
|
|
*,
|
|
source: str,
|
|
source_cache: dev_commands.SourceCache,
|
|
env: typing.Dict[str, typing.Any],
|
|
**kwargs,
|
|
) -> "DevOutput":
|
|
output = cls(
|
|
ctx,
|
|
source=source,
|
|
source_cache=source_cache,
|
|
filename=f"<eval command - snippet #{source_cache.take_next_index()}>",
|
|
env=env,
|
|
**kwargs,
|
|
)
|
|
await output.run_eval()
|
|
return output
|
|
|
|
@classmethod
|
|
async def from_repl(
|
|
cls,
|
|
ctx: commands.Context,
|
|
*,
|
|
source: str,
|
|
source_cache: dev_commands.SourceCache,
|
|
env: typing.Dict[str, typing.Any],
|
|
**kwargs,
|
|
) -> "DevOutput":
|
|
output = cls(
|
|
ctx,
|
|
source=source,
|
|
source_cache=source_cache,
|
|
filename=f"<repl session - snippet #{source_cache.take_next_index()}>",
|
|
env=env,
|
|
**kwargs,
|
|
)
|
|
await output.run_repl()
|
|
return output
|
|
|
|
async def run_debug(self) -> None:
|
|
async def add_triangle_reaction_after_1_seconds():
|
|
await asyncio.sleep(2)
|
|
try:
|
|
await self.ctx.message.add_reaction("▶")
|
|
except discord.HTTPException:
|
|
pass
|
|
|
|
task = asyncio.create_task(add_triangle_reaction_after_1_seconds())
|
|
|
|
self.env.update({"dev_output": self})
|
|
self.env.update(**self._locals)
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = self.env.get(
|
|
"_console_custom",
|
|
{
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
},
|
|
).copy()
|
|
_console_custom_kwargs["color_system"] = None
|
|
with redirect(**_console_custom_kwargs) as console:
|
|
with console.capture() as captured:
|
|
try:
|
|
await super().run_debug()
|
|
except Exit: # Not a real exception...
|
|
pass
|
|
except (SystemExit, KeyboardInterrupt):
|
|
raise
|
|
except BaseException as exc:
|
|
self.set_exception(exc)
|
|
self.always_include_result: bool = False
|
|
self.prints: str = captured.get().strip()
|
|
|
|
task.cancel()
|
|
|
|
async def run_eval(self) -> None:
|
|
async def add_triangle_reaction_after_1_seconds():
|
|
await asyncio.sleep(2)
|
|
try:
|
|
await self.ctx.message.add_reaction("▶")
|
|
except discord.HTTPException:
|
|
pass
|
|
|
|
task = asyncio.create_task(add_triangle_reaction_after_1_seconds())
|
|
|
|
self.env.update({"dev_output": self})
|
|
try:
|
|
parse = ast.parse("async def func():\n%s" % textwrap.indent(self.raw_source, " "))
|
|
try:
|
|
return_found = [d for d in parse.body[0].body if isinstance(d, ast.Return)][0]
|
|
except IndexError:
|
|
line = len(self.raw_source.split("\n"))
|
|
else:
|
|
line = return_found.lineno - 2
|
|
_raw_source = self.raw_source.split("\n")
|
|
_raw_source.insert(line, textwrap.indent("dev_output._locals.update(**locals())", ""))
|
|
# `yield` like in Jishaku.
|
|
for line, line_text in enumerate(_raw_source.copy()):
|
|
_line_text = textwrap.dedent(line_text)
|
|
if _line_text.startswith("yield "):
|
|
_raw_source[line] = textwrap.indent(
|
|
f"print(repr(({_line_text[6:]})))",
|
|
(len(line_text) - len(_line_text)) * " ",
|
|
)
|
|
self.raw_source = "\n".join(_raw_source)
|
|
except SyntaxError:
|
|
pass
|
|
self.env.update(**self._locals)
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = self.env.get(
|
|
"_console_custom",
|
|
{
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
},
|
|
).copy()
|
|
_console_custom_kwargs["color_system"] = None
|
|
with redirect(**_console_custom_kwargs) as console:
|
|
with console.capture() as captured:
|
|
try:
|
|
await super().run_eval()
|
|
except Exit: # Not a real exception...
|
|
pass
|
|
except (SystemExit, KeyboardInterrupt):
|
|
raise
|
|
except BaseException as exc:
|
|
self.set_exception(exc)
|
|
self.prints: str = captured.get().strip()
|
|
|
|
task.cancel()
|
|
|
|
async def run_repl(self) -> None:
|
|
async def add_triangle_reaction_after_1_seconds():
|
|
await asyncio.sleep(2)
|
|
try:
|
|
await self.ctx.message.add_reaction("▶")
|
|
except discord.HTTPException:
|
|
pass
|
|
|
|
task = asyncio.create_task(add_triangle_reaction_after_1_seconds())
|
|
|
|
self.env.update({"dev_output": self})
|
|
self.env.update(**self._locals)
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = self.env.get(
|
|
"_console_custom",
|
|
{
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
},
|
|
).copy()
|
|
_console_custom_kwargs["color_system"] = None
|
|
with redirect(**_console_custom_kwargs) as console:
|
|
with console.capture() as captured:
|
|
try:
|
|
await super().run_repl()
|
|
except (Exit, SystemExit, KeyboardInterrupt): # `Exit` isn't a real exception...
|
|
raise
|
|
except BaseException as exc:
|
|
self.set_exception(exc)
|
|
self.prints: str = captured.get().strip()
|
|
|
|
task.cancel()
|
|
|
|
def set_exception(self, exc: Exception, *, skip_frames: int = 1) -> None:
|
|
self.exc: Exception = exc
|
|
self.formatted_exc: str = self.format_exception(exc, skip_frames=skip_frames)
|
|
|
|
def format_exception(self, exc: Exception, *, skip_frames: int = 1) -> str:
|
|
if not self.rich_tracebacks:
|
|
return super().format_exception(exc=exc, skip_frames=skip_frames)
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = self.env.get(
|
|
"_console_custom",
|
|
{
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
},
|
|
).copy()
|
|
_console_custom_kwargs["color_system"] = None
|
|
with redirect(**_console_custom_kwargs) as console:
|
|
with console.capture() as captured:
|
|
tb = exc.__traceback__
|
|
for _ in range(skip_frames):
|
|
if tb is None:
|
|
break
|
|
tb = tb.tb_next
|
|
# sometimes SyntaxError.text is None, sometimes it isn't
|
|
if issubclass(type(exc), SyntaxError) and exc.lineno is not None:
|
|
try:
|
|
source_lines, line_offset = self.source_cache[exc.filename]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
if exc.text is None:
|
|
try:
|
|
# line numbers are 1-based, the list indexes are 0-based
|
|
exc.text = source_lines[exc.lineno - 1]
|
|
except IndexError:
|
|
# the frame might be pointing at a different source code, ignore...
|
|
pass
|
|
else:
|
|
exc.lineno -= line_offset
|
|
if sys.version_info >= (3, 10) and exc.end_lineno is not None:
|
|
exc.end_lineno -= line_offset
|
|
else:
|
|
exc.lineno -= line_offset
|
|
if sys.version_info >= (3, 10) and exc.end_lineno is not None:
|
|
exc.end_lineno -= line_offset
|
|
rich_tb = rich.traceback.Traceback.from_exception(
|
|
type(exc), exc, tb, extra_lines=0, theme=SolarizedCustom
|
|
)
|
|
console.print(rich_tb)
|
|
return captured.get().strip()
|
|
|
|
|
|
@cog_i18n(_)
|
|
class Dev(DashboardIntegration, Cog, dev_commands.Dev):
|
|
"""Various development focused utilities!"""
|
|
|
|
__authors__: typing.List[str] = ["Cog-Creators", "Zephyrkul (Zephyrkul#1089)", "AAA3A"]
|
|
|
|
def __init__(self, bot: Red) -> None:
|
|
super().__init__(bot=bot)
|
|
|
|
self.env_extensions: typing.Dict[str, typing.Any] = {}
|
|
self.source_cache: dev_commands.SourceCache = dev_commands.SourceCache()
|
|
self._session: aiohttp.ClientSession = None
|
|
self.dev_space: DevSpace = DevSpace()
|
|
|
|
self._last_result: typing.Optional[typing.Any] = None
|
|
self._last_locals: typing.Dict[
|
|
typing.Union[discord.Member, discord.User], typing.Dict[str, typing.Any]
|
|
] = {}
|
|
self.dev_outputs: typing.Dict[discord.Message, DevOutput] = {}
|
|
self.sessions: typing.Dict[int, bool] = {}
|
|
self._repl_tasks: typing.List[asyncio.Task] = []
|
|
self._bypass_cooldowns_task: typing.Optional[asyncio.Task] = None
|
|
|
|
self.config: Config = Config.get_conf(
|
|
self,
|
|
identifier=205192943327321000143939875896557571750,
|
|
force_registration=True,
|
|
)
|
|
self.dev_global: typing.Dict[
|
|
str, typing.Union[typing.Literal["repr", "repr_or_str", "str"], bool]
|
|
] = {
|
|
"auto_imports": True,
|
|
"output_mode": "repr",
|
|
"rich_tracebacks": False,
|
|
"ansi_formatting": False,
|
|
"send_interactive": False,
|
|
"send_dpy_objects": True,
|
|
"use_last_locals": True,
|
|
"downloader_already_agreed": False,
|
|
"use_extended_environment": True,
|
|
}
|
|
self.config.register_global(
|
|
auto_imports=True,
|
|
output_mode="repr",
|
|
rich_tracebacks=False,
|
|
ansi_formatting=False,
|
|
send_interactive=False,
|
|
send_dpy_objects=True,
|
|
use_last_locals=True,
|
|
downloader_already_agreed=False,
|
|
use_extended_environment=True,
|
|
)
|
|
|
|
_settings: typing.Dict[
|
|
str, typing.Dict[str, typing.Union[typing.List[str], bool, str]]
|
|
] = {
|
|
"auto_imports": {
|
|
"converter": bool,
|
|
"description": "Enable or disable auto imports.",
|
|
},
|
|
"output_mode": {
|
|
"converter": typing.Literal["repr", "repr_or_str", "str"],
|
|
"description": "Set the output mode. `repr` is to display the repr of the result. `repr_or_str` is to display in the same way, but a string as a string. `str` is to display the string of the result.",
|
|
},
|
|
"rich_tracebacks": {
|
|
"converter": bool,
|
|
"description": "Use `rich` to display tracebacks.",
|
|
},
|
|
"ansi_formatting": {
|
|
"converter": bool,
|
|
"description": "Use the `ansi` formatting for results.",
|
|
},
|
|
"send_interactive": {
|
|
"converter": bool,
|
|
"description": "Send results with `commands.Context.send_interactive`, not a Menu.",
|
|
},
|
|
"send_dpy_objects": {
|
|
"converter": bool,
|
|
"description": "If the result is an embed/file/attachment object or an iterable of these, send.",
|
|
},
|
|
"use_last_locals": {
|
|
"converter": bool,
|
|
"description": "Use the last locals for each evals. Locals are only registered for `[p]eval`, but can be used in other commands.",
|
|
},
|
|
"downloader_already_agreed": {
|
|
"converter": bool,
|
|
"description": "If enabled, Downloader will no longer prompt you to type `I agree` when adding a repo, even after a bot restart.",
|
|
},
|
|
"use_extended_environment": {
|
|
"converter": bool,
|
|
"description": "Use my own Dev env with useful values.",
|
|
},
|
|
}
|
|
self.settings: Settings = Settings(
|
|
bot=self.bot,
|
|
cog=self,
|
|
config=self.config,
|
|
group=self.config.GLOBAL,
|
|
settings=_settings,
|
|
global_path=[],
|
|
use_profiles_system=False,
|
|
can_edit=True,
|
|
commands_group=self.configuration,
|
|
)
|
|
|
|
async def cog_load(self) -> None:
|
|
await super().cog_load()
|
|
await self.settings.add_commands()
|
|
if (
|
|
await self.config.downloader_already_agreed()
|
|
and (downloader_cog := self.bot.get_cog("Downloader")) is not None
|
|
):
|
|
downloader_cog.already_agreed = True
|
|
self._session: aiohttp.ClientSession = aiohttp.ClientSession()
|
|
|
|
async def cog_unload(self) -> None:
|
|
if self._session is not None:
|
|
await self._session.close()
|
|
core_dev: dev_commands.Dev = dev_commands.Dev()
|
|
core_dev.env_extensions: typing.Dict[str, typing.Any] = self.env_extensions
|
|
core_dev.source_cache: dev_commands.SourceCache = self.source_cache
|
|
core_dev.dev_space: DevSpace = self.dev_space
|
|
core_dev._last_result: typing.Optional[typing.Any] = self._last_result
|
|
# core_dev.sessions: typing.Dict[int, bool] = self.sessions
|
|
for task in self._repl_tasks:
|
|
task.cancel()
|
|
await self.bot.add_cog(core_dev)
|
|
await super().cog_unload()
|
|
|
|
def get_environment(
|
|
self, ctx: commands.Context, use_extended_environment: bool = True
|
|
) -> DevEnv:
|
|
return DevEnv.get_environment(ctx, use_extended_environment=use_extended_environment)
|
|
|
|
async def my_exec(
|
|
self,
|
|
ctx: commands.Context,
|
|
type: typing.Literal["debug", "eval", "repl"],
|
|
source: str,
|
|
env: typing.Optional[typing.Dict[str, typing.Any]] = None,
|
|
send_result: bool = False,
|
|
wait: bool = True,
|
|
) -> bool:
|
|
tasks: typing.List[asyncio.Task] = [
|
|
asyncio.create_task(
|
|
ctx.bot.wait_for("message", check=MessagePredicate.cancelled(ctx))
|
|
),
|
|
asyncio.create_task(
|
|
self._my_exec(
|
|
ctx, type=type, source=source, env=env, send_result=send_result, wait=wait
|
|
)
|
|
),
|
|
]
|
|
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
|
for task in pending:
|
|
task.cancel()
|
|
result = done.pop().result()
|
|
return result if result is not None else None
|
|
|
|
async def _my_exec(
|
|
self,
|
|
ctx: commands.Context,
|
|
type: typing.Literal["debug", "eval", "repl"],
|
|
source: str,
|
|
env: typing.Optional[typing.Dict[str, typing.Any]] = None,
|
|
send_result: bool = False,
|
|
wait: bool = True,
|
|
) -> DevOutput:
|
|
source = cleanup_code(source)
|
|
if env is None:
|
|
env = self.get_environment(
|
|
ctx, use_extended_environment=await self.config.use_extended_environment()
|
|
)
|
|
env["auto_imports"] = await self.config.auto_imports()
|
|
if (
|
|
isinstance(ctx.author, (discord.Member, discord.User))
|
|
and ctx.author in self._last_locals
|
|
and await self.config.use_last_locals()
|
|
):
|
|
_locals = self._last_locals[ctx.author]
|
|
else:
|
|
_locals = {}
|
|
types = {
|
|
"debug": DevOutput.from_debug,
|
|
"eval": DevOutput.from_eval,
|
|
"repl": DevOutput.from_repl,
|
|
}
|
|
mobile = ctx.author.is_on_mobile() if isinstance(ctx.author, discord.Member) else False
|
|
if await self.config.ansi_formatting():
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = {
|
|
"width": 37 if mobile else 80,
|
|
"no_color": mobile,
|
|
"color_system": None if mobile else "standard",
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
}
|
|
else:
|
|
_console_custom_kwargs: typing.Dict[str, typing.Any] = {
|
|
"width": 80,
|
|
"no_color": True,
|
|
"color_system": None,
|
|
"tab_size": 2,
|
|
"soft_wrap": False,
|
|
}
|
|
if _console_custom := env.get("_console_custom"):
|
|
_console_custom_kwargs.update(_console_custom)
|
|
env["_console_custom"] = _console_custom_kwargs
|
|
output: DevOutput = await types[type](
|
|
ctx,
|
|
source=source,
|
|
source_cache=self.source_cache,
|
|
env=env,
|
|
rich_tracebacks=await self.config.rich_tracebacks(),
|
|
_locals=_locals,
|
|
)
|
|
self._last_result = output.result
|
|
self.dev_outputs[ctx.message] = output
|
|
if (
|
|
type == "eval"
|
|
and isinstance(ctx.author, (discord.Member, discord.User))
|
|
and output._locals
|
|
):
|
|
if ctx.author not in self._last_locals:
|
|
self._last_locals[ctx.author] = {}
|
|
self._last_locals[ctx.author].update(**output._locals)
|
|
if send_result:
|
|
send_interactive = await self.config.send_interactive()
|
|
send_coroutine = output.send(
|
|
tick=type != "repl",
|
|
output_mode=await self.config.output_mode(),
|
|
ansi_formatting=await self.config.ansi_formatting(),
|
|
send_interactive=send_interactive,
|
|
send_dpy_objects=await self.config.send_dpy_objects(),
|
|
wait=wait,
|
|
)
|
|
if wait and not send_coroutine:
|
|
await send_coroutine
|
|
else:
|
|
asyncio.create_task(send_coroutine)
|
|
return output
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command()
|
|
@app_commands.allowed_installs(guilds=True, users=True)
|
|
# @discord.utils.copy_doc(dev_commands.Dev.debug.callback)
|
|
async def debug(self, ctx: commands.Context, *, code: str = None) -> None:
|
|
"""Evaluate a statement of python code.
|
|
|
|
The bot will always respond with the return value of the code.
|
|
If the return value of the code is a coroutine, it will be awaited,
|
|
and the result of that will be the bot's response.
|
|
|
|
Note: Only one statement may be evaluated. Using certain restricted
|
|
keywords, e.g. yield, will result in a syntax error. For multiple
|
|
lines or asynchronous code, see [p]repl or [p]eval.
|
|
|
|
The code can be within a codeblock, inline code or neither, as long as they are not mixed and they are formatted correctly.
|
|
You can upload a file with the code to be executed, or reply to a message containing the command, from any bot.
|
|
|
|
Environment Variables:
|
|
`ctx` - the command invocation context
|
|
`bot` - the bot object
|
|
`channel` - the current channel object
|
|
`author` - the command author's member object
|
|
`guild` - the current guild object
|
|
`message` - the command's message object
|
|
`_` - the result of the last dev command
|
|
`aiohttp` - the aiohttp library
|
|
`asyncio` - the asyncio library
|
|
`discord` - the discord.py library
|
|
`commands` - the redbot.core.commands module
|
|
`cf` - the redbot.core.utils.chat_formatting module
|
|
(See `[p]setdev getenvironment` for more.)
|
|
"""
|
|
if code is None:
|
|
if ctx.message.attachments:
|
|
try:
|
|
code = (await ctx.message.attachments[0].read()).decode(encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("Unreadable attachment with `utf-8`.")
|
|
)
|
|
elif ctx.message.reference is not None and isinstance(
|
|
(reference := ctx.message.reference.resolved), discord.Message
|
|
):
|
|
if (
|
|
match := re.compile(
|
|
r"(debug|(jsk|jishaku) (py|python|eval|ev))(\n)?( )?(?P<code>(.|\n)*)"
|
|
).search(reference.content)
|
|
) is not None and match.groupdict()["code"].strip():
|
|
code = match.groupdict()["code"]
|
|
elif (
|
|
re.compile(r"```py\n(.|\n)*\n```").match(reference.content)
|
|
and reference.content.count("```") == 2
|
|
):
|
|
code = reference.content
|
|
else:
|
|
raise commands.UserFeedbackCheckFailure(_("This message isn't reachable."))
|
|
else:
|
|
return asyncio.create_task(ExecuteView(cog=self).start(ctx))
|
|
source = cleanup_code(code)
|
|
await self.my_exec(
|
|
getattr(ctx, "original_context", ctx),
|
|
type="debug",
|
|
source=source,
|
|
send_result=True,
|
|
)
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command(name="eval")
|
|
@app_commands.allowed_installs(guilds=True, users=True)
|
|
# @discord.utils.copy_doc(dev_commands.Dev._eval.callback)
|
|
async def _eval(self, ctx: commands.Context, *, body: str = None) -> None:
|
|
"""Execute asynchronous code.
|
|
|
|
This command wraps code into the body of an async function and then
|
|
calls and awaits it. The bot will respond with anything printed to
|
|
stdout, as well as the return value of the function.
|
|
|
|
The code can be within a codeblock, inline code or neither, as long as they are not mixed and they are formatted correctly.
|
|
You can upload a file with the code to be executed, or reply to a message containing the command, from any bot.
|
|
|
|
Environment Variables:
|
|
`ctx` - the command invocation context
|
|
`bot` - the bot object
|
|
`channel` - the current channel object
|
|
`author` - the command author's member object
|
|
`guild` - the current guild object
|
|
`message` - the command's message object
|
|
`_` - the result of the last dev command
|
|
`aiohttp` - the aiohttp library
|
|
`asyncio` - the asyncio library
|
|
`discord` - the discord.py library
|
|
`commands` - the redbot.core.commands module
|
|
`cf` - the redbot.core.utils.chat_formatting module
|
|
(See `[p]setdev getenvironment` for more.)
|
|
"""
|
|
if body is None:
|
|
if ctx.message.attachments:
|
|
try:
|
|
body = (await ctx.message.attachments[0].read()).decode(encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("Unreadable attachment with `utf-8`.")
|
|
)
|
|
elif ctx.message.reference is not None and isinstance(
|
|
(reference := ctx.message.reference.resolved), discord.Message
|
|
):
|
|
if (
|
|
match := re.compile(
|
|
r"(eval|ev|e|(jsk|jishaku) (py|python|eval|ev)|(runcode|executecode) (py|python))(\n)?( )?(?P<body>(.|\n)*)"
|
|
).search(reference.content)
|
|
) is not None and match.groupdict()["body"].strip():
|
|
body = match.groupdict()["body"]
|
|
elif (
|
|
re.compile(r"```py\n(.|\n)*\n```").match(reference.content)
|
|
and reference.content.count("```") == 2
|
|
):
|
|
body = reference.content
|
|
else:
|
|
raise commands.UserInputError()
|
|
else:
|
|
return asyncio.create_task(ExecuteView(cog=self).start(ctx))
|
|
source = cleanup_code(body)
|
|
await self.my_exec(
|
|
getattr(ctx, "original_context", ctx),
|
|
type="eval",
|
|
source=source,
|
|
send_result=True,
|
|
)
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command()
|
|
# @discord.utils.copy_doc(dev_commands.Dev.repl.callback)
|
|
async def repl(self, ctx: commands.Context) -> None:
|
|
"""Open an interactive REPL.
|
|
|
|
The REPL will only recognise code as messages which start with a
|
|
backtick. This includes codeblocks, and as such multiple lines can be
|
|
evaluated.
|
|
|
|
Use `exit()` or `quit` to exit the REPL session, prefixed with
|
|
a backtick so they may be interpreted.
|
|
|
|
You can upload a file with the code to be executed, or reply to a message containing the same command, for any bot.
|
|
|
|
Environment Variables:
|
|
`ctx` - the command invocation context
|
|
`bot` - the bot object
|
|
`channel` - the current channel object
|
|
`author` - the command author's member object
|
|
`guild` - the current guild object
|
|
`message` - the command's message object
|
|
`_` - the result of the last dev command
|
|
`aiohttp` - the aiohttp library
|
|
`asyncio` - the asyncio library
|
|
`discord` - the discord.py library
|
|
`commands` - the redbot.core.commands module
|
|
`cf` - the redbot.core.utils.chat_formatting module
|
|
(See `[p]setdev getenvironment` for more.)
|
|
"""
|
|
if ctx.channel.id in self.sessions:
|
|
if self.sessions[ctx.channel.id]:
|
|
await ctx.send(
|
|
_("Already running a REPL session in this channel. Exit it with `quit`.")
|
|
)
|
|
else:
|
|
await ctx.send(
|
|
_(
|
|
"Already running a REPL session in this channel. Resume the REPL with `{prefix}replresume`."
|
|
).format(prefix=ctx.prefix)
|
|
)
|
|
return
|
|
|
|
env = self.get_environment(
|
|
ctx, use_extended_environment=await self.config.use_extended_environment()
|
|
)
|
|
env["_"] = None
|
|
self.sessions[ctx.channel.id] = True
|
|
await ctx.send(
|
|
_(
|
|
"Enter code to execute or evaluate. `exit()` or `quit` to exit. `{prefix}replpause` to pause."
|
|
).format(prefix=ctx.prefix)
|
|
)
|
|
|
|
while True:
|
|
task = asyncio.create_task(
|
|
ctx.bot.wait_for("message", check=MessagePredicate.regex(r"^`", ctx))
|
|
)
|
|
self._repl_tasks.append(task)
|
|
try:
|
|
response = await task
|
|
except asyncio.CancelledError:
|
|
return
|
|
finally:
|
|
self._repl_tasks.remove(task)
|
|
if not self.sessions[ctx.channel.id]:
|
|
continue
|
|
env["message"] = response
|
|
source = cleanup_code(response.content)
|
|
try:
|
|
# if source in ("quit", "exit", "exit()"):
|
|
# raise Exit()
|
|
output = await self._my_exec(
|
|
getattr(ctx, "original_context", ctx),
|
|
type="repl",
|
|
source=source,
|
|
env=env,
|
|
wait=False,
|
|
send_result=True,
|
|
)
|
|
except Exit:
|
|
break
|
|
try:
|
|
if output.formatted_exc:
|
|
await response.add_reaction("❌")
|
|
elif not str(output):
|
|
await response.add_reaction(
|
|
commands.context.TICK
|
|
if not hasattr(commands.context, "MORE_TICKS")
|
|
else random.choice(list(commands.context.MORE_TICKS))
|
|
)
|
|
except discord.HTTPException:
|
|
pass
|
|
|
|
await ctx.send(_("Exiting."))
|
|
del self.sessions[ctx.channel.id]
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command(name="replpause", aliases=["replresume"])
|
|
async def pause(self, ctx: commands.Context, toggle: bool = None) -> None:
|
|
"""Pauses/resumes the REPL running in the current channel."""
|
|
if ctx.channel.id not in self.sessions:
|
|
await ctx.send(_("There is no currently running REPL session in this channel."))
|
|
return
|
|
if toggle is None:
|
|
toggle = not self.sessions[ctx.channel.id]
|
|
self.sessions[ctx.channel.id] = toggle
|
|
if toggle:
|
|
await ctx.send(_("The REPL session in this channel has been resumed."))
|
|
else:
|
|
await ctx.send(_("The REPL session in this channel is now paused."))
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command()
|
|
async def bypasscooldowns(
|
|
self,
|
|
ctx: commands.Context,
|
|
toggle: typing.Optional[bool] = None,
|
|
*,
|
|
time: TimeConverter = None,
|
|
) -> None:
|
|
"""Give bot owners the ability to bypass cooldowns.
|
|
|
|
Does not persist through restarts.
|
|
"""
|
|
if toggle is None:
|
|
toggle = not ctx.bot._bypass_cooldowns
|
|
if self._bypass_cooldowns_task is not None:
|
|
self._bypass_cooldowns_task.cancel()
|
|
ctx.bot._bypass_cooldowns = toggle
|
|
if toggle:
|
|
await ctx.send(
|
|
_(
|
|
"Bot owners will now bypass all commands with cooldowns{optional_duration}."
|
|
).format(
|
|
optional_duration=(
|
|
"" if time is None else f" for {CogsUtils.get_interval_string(time)}"
|
|
)
|
|
)
|
|
)
|
|
else:
|
|
await ctx.send(
|
|
_(
|
|
"Bot owners will no longer bypass all commands with cooldowns{optional_duration}."
|
|
).format(
|
|
optional_duration=(
|
|
"" if time is None else f" for {CogsUtils.get_interval_string(time)}"
|
|
)
|
|
)
|
|
)
|
|
if time is not None:
|
|
task = asyncio.create_task(asyncio.sleep(time.total_seconds()))
|
|
self._bypass_cooldowns_task: asyncio.Task = task
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
return
|
|
finally:
|
|
self._bypass_cooldowns_task = None
|
|
ctx.bot._bypass_cooldowns = not toggle
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_command(name="eshell")
|
|
@app_commands.allowed_installs(guilds=True, users=True)
|
|
async def _eshell(
|
|
self, ctx: commands.Context, silent: typing.Optional[bool] = False, *, command: str = None
|
|
) -> None:
|
|
"""Execute Shell commands.
|
|
|
|
This command wraps the shell command into a Python code to invoke them.
|
|
|
|
The code can be within a codeblock, inline code or neither, as long as they are not mixed and they are formatted correctly.
|
|
You can upload a file with the code to be executed, or reply to a message containing the command, from any bot.
|
|
"""
|
|
if command is None:
|
|
if ctx.message.attachments:
|
|
try:
|
|
command = (await ctx.message.attachments[0].read()).decode(encoding="utf-8")
|
|
except UnicodeDecodeError:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("Unreadable attachment with `utf-8`.")
|
|
)
|
|
elif ctx.message.reference is not None and isinstance(
|
|
(reference := ctx.message.reference.resolved), discord.Message
|
|
):
|
|
if (
|
|
match := re.compile(
|
|
r"(eshell|shell|qshell)(\n)?( )?(?P<command>(.|\n)*)"
|
|
).search(reference.content)
|
|
) is not None and match.groupdict()["command"].strip():
|
|
command = match.groupdict()["command"]
|
|
elif (
|
|
re.compile(r"```py\n(.|\n)*\n```").match(reference.content)
|
|
and reference.content.count("```") == 2
|
|
):
|
|
command = reference.content
|
|
else:
|
|
raise commands.UserInputError()
|
|
else:
|
|
raise commands.UserInputError()
|
|
command = cleanup_code(command)
|
|
|
|
# Thanks Jack for a part of this code!
|
|
source = (
|
|
cleanup_code(
|
|
"""
|
|
import asyncio
|
|
import asyncio.subprocess as asp
|
|
import os
|
|
import sys
|
|
import typing
|
|
|
|
command = '''
|
|
COMMAND
|
|
'''.strip()
|
|
|
|
def get_env() -> typing.Dict[str, str]:
|
|
env = os.environ.copy()
|
|
if hasattr(sys, "real_prefix") or sys.base_prefix != sys.prefix:
|
|
if sys.platform == "win32":
|
|
binfolder = f"{sys.prefix}{os.path.sep}Scripts"
|
|
env["PATH"] = f"{binfolder}{os.pathsep}{env['PATH']}"
|
|
else:
|
|
binfolder = f"{sys.prefix}{os.path.sep}bin"
|
|
env["PATH"] = f"{binfolder}{os.pathsep}{env['PATH']}"
|
|
return env
|
|
|
|
process = await asp.create_subprocess_shell(
|
|
command,
|
|
stdout=asp.PIPE,
|
|
stderr=asp.STDOUT,
|
|
env=get_env(),
|
|
executable=None,
|
|
)
|
|
try:
|
|
await process.wait()
|
|
except asyncio.CancelledError:
|
|
prefix = f"Command was terminated early and this is a partial output:\\n\\n"
|
|
# raise
|
|
else:
|
|
prefix = ""
|
|
finally:
|
|
lines = [line async for line in process.stdout]
|
|
print(prefix + b"".join(lines).decode("utf-8", "replace").strip().replace("\\r", ""))
|
|
"""
|
|
)
|
|
.strip()
|
|
.replace("COMMAND", command)
|
|
)
|
|
if silent:
|
|
source = "\n".join(source.split("\n")[:-3])
|
|
|
|
await self.my_exec(
|
|
getattr(ctx, "original_context", ctx),
|
|
type="eval",
|
|
source=source,
|
|
send_result=True,
|
|
)
|
|
|
|
@commands.is_owner()
|
|
@commands.hybrid_group(name="setdev")
|
|
async def configuration(self, ctx: commands.Context) -> None:
|
|
"""
|
|
Commands to configure Dev.
|
|
"""
|
|
pass
|
|
|
|
@configuration.command(aliases=["getenv", "getformattedenvironment", "getformattedenv"])
|
|
async def getenvironment(self, ctx: commands.Context, show_values: bool = True) -> None:
|
|
"""Display all Dev environment values."""
|
|
env = self.get_environment(
|
|
ctx, use_extended_environment=await self.config.use_extended_environment()
|
|
)
|
|
formatted_env = env.get_formatted_env(show_values=show_values)
|
|
await Menu(pages=formatted_env, lang="py").start(ctx)
|
|
|
|
@configuration.command(aliases=["rlocals"])
|
|
async def resetlocals(self, ctx: commands.Context) -> None:
|
|
"""Reset its own locals in evals."""
|
|
try:
|
|
del self._last_locals[ctx.author]
|
|
except ValueError:
|
|
pass
|