Ruby-Cogs/discordsearch/discordsearch.py
2025-04-02 22:56:57 -04:00

349 lines
14 KiB
Python

from AAA3A_utils import Cog, Menu # isort:skip
from redbot.core import commands # isort:skip
from redbot.core.bot import Red # isort:skip
from redbot.core.i18n import Translator, cog_i18n # isort:skip
import discord # isort:skip
import typing # isort:skip
import argparse
import asyncio
import datetime
import functools
import multiprocessing
import re
from time import monotonic
import dateparser
from redbot.core.utils.chat_formatting import bold, box, underline
from redbot.core.utils.common_filters import URL_RE
# Credits:
# General repo credits.
# Thanks to Trusty for the secure way to manage user Regexes (https://github.com/TrustyJAID/Trusty-cogs/blob/master/retrigger/triggerhandler.py#L542-L606)!
_: Translator = Translator("DiscordSearch", __file__)
class StrConverter(commands.Converter):
async def convert(self, ctx: commands.Context, argument: str) -> str:
return argument
@cog_i18n(_)
class DiscordSearch(Cog):
"""A cog to edit roles!"""
def __init__(self, bot: Red) -> None:
super().__init__(bot=bot)
self.re_pool: multiprocessing.Pool = multiprocessing.Pool()
@commands.guild_only()
@commands.admin_or_permissions(administrator=True)
@commands.cooldown(rate=3, per=30, type=commands.BucketType.user)
@commands.bot_has_permissions(embed_links=True)
@commands.hybrid_command(name="discordsearch", aliases=["dsearch"])
async def discordsearch(
self,
ctx: commands.Context,
channel: typing.Optional[discord.TextChannel],
args: commands.Greedy[StrConverter],
) -> None:
"""Search for a message on Discord in a channel.
Warning: The bot uses the api for each search.
Arguments:
`--author @user1 --author user2#1234 --author 0123456789`
`--mention @user1 --mention user2#1234 --mention 0123456789`
`--before now`
`--after "25/12/2000 00h00"`
`--pinned true`
`--content "AAA3A-cogs"`
`--regex "\\[p\\]"`
`--contain link --contain embed --contain file`
`--limit 100` (It's the limit of the number of messages taken into account in the search, not the number of results.)
"""
if not args:
raise commands.UserInputError()
try:
args = await SearchArgs().convert(ctx, args)
except commands.BadArgument as e:
await ctx.send(e)
return
authors = args.authors
mentions = args.mentions
before = args.before
after = args.after
pinned = args.pinned
content = args.content
regex = args.regex
contains = args.contains
limit = args.limit
if channel is None:
channel = ctx.channel
if all(
setting is None
for setting in (
authors,
mentions,
before,
after,
pinned,
content,
regex,
contains,
limit,
)
):
raise commands.UserFeedbackCheckFailure(_("You must provide at least one parameter."))
args_str = [
underline("--- Settings of search ---"),
bold("Authors:")
+ " "
+ (
", ".join([author.mention for author in authors])
if authors is not None
else "None"
),
bold("Mentions:")
+ " "
+ (
", ".join([mention.mention for mention in mentions])
if mentions is not None
else "None"
),
bold("Before:") + " " + f"{before}",
bold("After:") + " " + f"{after}",
bold("Pinned:") + " " + f"{pinned}",
bold("Content:") + " " + (f"`{content}`" if content is not None else "None"),
bold("Regex:") + " " + f"{regex}",
bold("Contains:")
+ " "
+ (", ".join(list(contains)) if contains is not None else "None"),
bold("Limit:") + " " + f"{limit}",
]
start = monotonic()
messages: typing.List[discord.Message] = []
async for message in channel.history(
limit=limit, oldest_first=False, before=before, after=after
):
if message.id == ctx.message.id:
continue
if authors is not None and message.author not in authors:
continue
if mentions is not None and not any(
True for mention in message.mentions if mention in mentions
):
continue
if pinned is not None and message.pinned != pinned:
continue
if (
content is not None
and content.lower() not in message.content.lower()
and all(
content.lower() not in str(embed.to_dict()).lower() for embed in message.embeds
)
):
continue
if regex is not None and message.content is not None:
# Thanks Trusty for this.
try:
trigger_timeout = 1
process = self.re_pool.apply_async(regex.findall, (message.content,))
task = functools.partial(process.get, timeout=trigger_timeout)
loop = asyncio.get_running_loop()
new_task = loop.run_in_executor(None, task)
search = await asyncio.wait_for(new_task, timeout=trigger_timeout + 5)
except (multiprocessing.TimeoutError, asyncio.TimeoutError):
raise commands.UserFeedbackCheckFailure(
_("Your regex process took too long. Removing from memory.")
)
except ValueError:
continue
except Exception as e:
raise commands.UserFeedbackCheckFailure(
_("There is an error in your regex.\n{e}").format(e=box(str(e), lang="py"))
)
else:
if not search:
continue
if contains is not None:
if "link" in contains:
regex = URL_RE.findall(message.content.lower())
if regex == []:
continue
if "embed" in contains and len(message.embeds) == 0:
continue
if "file" in contains and len(message.attachments) == 0:
continue
messages.append(message)
embeds = []
not_found = len(messages) == 0
args_str = "\n".join(args_str)
if not not_found:
for i, message in enumerate(messages, start=1):
embed: discord.Embed = discord.Embed()
embed.title = f"Search in #{channel.name} ({channel.id})"
embed.description = args_str
embed.url = message.jump_url
embed.set_author(name=f"{message.author.display_name} ({message.author.id})")
embed.add_field(
name=f"Message ({message.id}) content:",
value=(
(
message.content
if len(message.content) < 1025
else (message.content[:1020] + "\n...")
)
if message.content
else "None"
),
inline=False,
)
embed.add_field(
name="Embed(s):",
value=(
_("Look at the original message.") if len(message.embeds) > 0 else "None"
),
inline=False,
)
embed.timestamp = message.created_at
embed.set_thumbnail(
url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg"
)
embed.set_footer(
text=f"Page {i}/{len(messages)}",
icon_url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg",
)
embeds.append(embed)
else:
embed: discord.Embed = discord.Embed()
embed.title = _("Search in #{channel.name} ({channel.id})").format(channel=channel)
embed.add_field(name="Result:", value=_("Sorry, I could not find any results."))
embed.timestamp = datetime.datetime.now()
embed.set_thumbnail(
url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg"
)
embed.set_footer(
text="Page 1/1",
icon_url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg",
)
embeds.append(embed)
end = monotonic()
total = round(end - start, 1)
for embed in embeds:
embed.title = _("Search in #{channel.name} ({channel.id}) in {total}s").format(
channel=channel, total=total
)
await Menu(pages=embeds).start(ctx)
class DateConverter(commands.Converter):
"""Date converter which uses dateparser.parse()."""
async def convert(self, ctx: commands.Context, argument: str) -> datetime.datetime:
parsed = dateparser.parse(argument)
if parsed is None:
raise commands.BadArgument(_("Unrecognized date/time."))
return parsed
# class SearchArgs(commands.FlagConverter, case_insensitive=False, prefix="--", delimiter=" "):
# authors: commands.Greedy[discord.Member]
# mentions: commands.Greedy[discord.Member]
# before: DateConverter
# after: DateConverter
# pinned: bool
# content: str
# regex: str
# contains: commands.Greedy[str]
# limit: int
class NoExitParser(argparse.ArgumentParser):
def error(self, message) -> None:
raise commands.BadArgument(message)
class SearchArgs:
def parse_arguments(self, arguments: str) -> argparse.Namespace:
parser = NoExitParser(description="Selection args for DiscordSearch.", add_help=False)
parser.add_argument("--author", dest="authors", nargs="+")
parser.add_argument("--mention", dest="mentions", nargs="+")
parser.add_argument("--before", dest="before")
parser.add_argument("--after", dest="after")
parser.add_argument("--pinned", dest="pinned")
parser.add_argument("--content", dest="content", nargs="*")
parser.add_argument("--regex", dest="regex", nargs="*")
parser.add_argument("--contain", dest="contains", nargs="+")
parser.add_argument("--limit", dest="limit")
return parser.parse_args(arguments)
async def convert(self, ctx: commands.Context, arguments) -> typing.Any:
self.ctx = ctx
args = self.parse_arguments(arguments)
if args.authors is not None:
self.authors = []
for author in args.authors:
author = await commands.MemberConverter().convert(ctx, author)
if author is None:
raise commands.BadArgument("`--author` must be a member.")
self.authors.append(author)
else:
self.authors = None
if args.mentions is not None:
self.mentions = []
for mention in args.mentions:
mention = await commands.MemberConverter().convert(ctx, mention)
if mention is None:
raise commands.BadArgument("`--mention` must be a member.")
self.mentions.append(mention)
else:
self.mentions = None
self.before = (
await DateConverter().convert(ctx, args.before)
if args.before is not None
else args.before
)
self.after = (
await DateConverter().convert(ctx, args.after)
if args.after is not None
else args.after
)
if args.pinned is not None:
args.pinned = str(args.pinned)
if args.pinned.lower() in ("true", "y", "yes"):
self.pinned = True
elif args.pinned.lower() in ("false", "n", "no"):
self.pinned = False
else:
raise commands.BadArgument("`--pinned` must be a bool.")
else:
self.pinned = args.pinned
self.content = "".join(args.content) if args.content is not None else args.content
if args.regex is not None:
try:
self.regex = re.compile("".join(args.regex))
except Exception as e:
raise commands.BadArgument(f"`{args.regex}` is not a valid regex pattern.\n{e}")
else:
self.regex = None
if args.contains is not None:
self.contains = []
for contain in args.contains:
if contain.lower() not in ("link", "embed", "file"):
raise commands.BadArgument("`--contain` must be `link`, `embed` or `file`.")
self.contains.append(contain.lower())
else:
self.contains = None
if args.limit is not None:
try:
self.limit = int(args.limit)
except ValueError:
raise commands.BadArgument("`--limit` must be a int.")
else:
self.limit = int(args.limit)
else:
self.limit = None
return self