349 lines
14 KiB
Python
349 lines
14 KiB
Python
from AAA3A_utils import Cog, Menu # isort:skip
|
|
from redbot.core import commands # isort:skip
|
|
from redbot.core.bot import Red # isort:skip
|
|
from redbot.core.i18n import Translator, cog_i18n # isort:skip
|
|
import discord # isort:skip
|
|
import typing # isort:skip
|
|
|
|
import argparse
|
|
import asyncio
|
|
import datetime
|
|
import functools
|
|
import multiprocessing
|
|
import re
|
|
from time import monotonic
|
|
|
|
import dateparser
|
|
from redbot.core.utils.chat_formatting import bold, box, underline
|
|
from redbot.core.utils.common_filters import URL_RE
|
|
|
|
# Credits:
|
|
# General repo credits.
|
|
# Thanks to Trusty for the secure way to manage user Regexes (https://github.com/TrustyJAID/Trusty-cogs/blob/master/retrigger/triggerhandler.py#L542-L606)!
|
|
|
|
_: Translator = Translator("DiscordSearch", __file__)
|
|
|
|
|
|
class StrConverter(commands.Converter):
|
|
async def convert(self, ctx: commands.Context, argument: str) -> str:
|
|
return argument
|
|
|
|
|
|
@cog_i18n(_)
|
|
class DiscordSearch(Cog):
|
|
"""A cog to edit roles!"""
|
|
|
|
def __init__(self, bot: Red) -> None:
|
|
super().__init__(bot=bot)
|
|
|
|
self.re_pool: multiprocessing.Pool = multiprocessing.Pool()
|
|
|
|
@commands.guild_only()
|
|
@commands.admin_or_permissions(administrator=True)
|
|
@commands.cooldown(rate=3, per=30, type=commands.BucketType.user)
|
|
@commands.bot_has_permissions(embed_links=True)
|
|
@commands.hybrid_command(name="discordsearch", aliases=["dsearch"])
|
|
async def discordsearch(
|
|
self,
|
|
ctx: commands.Context,
|
|
channel: typing.Optional[discord.TextChannel],
|
|
args: commands.Greedy[StrConverter],
|
|
) -> None:
|
|
"""Search for a message on Discord in a channel.
|
|
|
|
Warning: The bot uses the api for each search.
|
|
Arguments:
|
|
`--author @user1 --author user2#1234 --author 0123456789`
|
|
`--mention @user1 --mention user2#1234 --mention 0123456789`
|
|
`--before now`
|
|
`--after "25/12/2000 00h00"`
|
|
`--pinned true`
|
|
`--content "AAA3A-cogs"`
|
|
`--regex "\\[p\\]"`
|
|
`--contain link --contain embed --contain file`
|
|
`--limit 100` (It's the limit of the number of messages taken into account in the search, not the number of results.)
|
|
"""
|
|
if not args:
|
|
raise commands.UserInputError()
|
|
try:
|
|
args = await SearchArgs().convert(ctx, args)
|
|
except commands.BadArgument as e:
|
|
await ctx.send(e)
|
|
return
|
|
authors = args.authors
|
|
mentions = args.mentions
|
|
before = args.before
|
|
after = args.after
|
|
pinned = args.pinned
|
|
content = args.content
|
|
regex = args.regex
|
|
contains = args.contains
|
|
limit = args.limit
|
|
if channel is None:
|
|
channel = ctx.channel
|
|
if all(
|
|
setting is None
|
|
for setting in (
|
|
authors,
|
|
mentions,
|
|
before,
|
|
after,
|
|
pinned,
|
|
content,
|
|
regex,
|
|
contains,
|
|
limit,
|
|
)
|
|
):
|
|
raise commands.UserFeedbackCheckFailure(_("You must provide at least one parameter."))
|
|
args_str = [
|
|
underline("--- Settings of search ---"),
|
|
bold("Authors:")
|
|
+ " "
|
|
+ (
|
|
", ".join([author.mention for author in authors])
|
|
if authors is not None
|
|
else "None"
|
|
),
|
|
bold("Mentions:")
|
|
+ " "
|
|
+ (
|
|
", ".join([mention.mention for mention in mentions])
|
|
if mentions is not None
|
|
else "None"
|
|
),
|
|
bold("Before:") + " " + f"{before}",
|
|
bold("After:") + " " + f"{after}",
|
|
bold("Pinned:") + " " + f"{pinned}",
|
|
bold("Content:") + " " + (f"`{content}`" if content is not None else "None"),
|
|
bold("Regex:") + " " + f"{regex}",
|
|
bold("Contains:")
|
|
+ " "
|
|
+ (", ".join(list(contains)) if contains is not None else "None"),
|
|
bold("Limit:") + " " + f"{limit}",
|
|
]
|
|
start = monotonic()
|
|
messages: typing.List[discord.Message] = []
|
|
async for message in channel.history(
|
|
limit=limit, oldest_first=False, before=before, after=after
|
|
):
|
|
if message.id == ctx.message.id:
|
|
continue
|
|
if authors is not None and message.author not in authors:
|
|
continue
|
|
if mentions is not None and not any(
|
|
True for mention in message.mentions if mention in mentions
|
|
):
|
|
continue
|
|
if pinned is not None and message.pinned != pinned:
|
|
continue
|
|
if (
|
|
content is not None
|
|
and content.lower() not in message.content.lower()
|
|
and all(
|
|
content.lower() not in str(embed.to_dict()).lower() for embed in message.embeds
|
|
)
|
|
):
|
|
continue
|
|
if regex is not None and message.content is not None:
|
|
# Thanks Trusty for this.
|
|
try:
|
|
trigger_timeout = 1
|
|
process = self.re_pool.apply_async(regex.findall, (message.content,))
|
|
task = functools.partial(process.get, timeout=trigger_timeout)
|
|
loop = asyncio.get_running_loop()
|
|
new_task = loop.run_in_executor(None, task)
|
|
search = await asyncio.wait_for(new_task, timeout=trigger_timeout + 5)
|
|
except (multiprocessing.TimeoutError, asyncio.TimeoutError):
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("Your regex process took too long. Removing from memory.")
|
|
)
|
|
except ValueError:
|
|
continue
|
|
except Exception as e:
|
|
raise commands.UserFeedbackCheckFailure(
|
|
_("There is an error in your regex.\n{e}").format(e=box(str(e), lang="py"))
|
|
)
|
|
else:
|
|
if not search:
|
|
continue
|
|
if contains is not None:
|
|
if "link" in contains:
|
|
regex = URL_RE.findall(message.content.lower())
|
|
if regex == []:
|
|
continue
|
|
if "embed" in contains and len(message.embeds) == 0:
|
|
continue
|
|
if "file" in contains and len(message.attachments) == 0:
|
|
continue
|
|
messages.append(message)
|
|
embeds = []
|
|
not_found = len(messages) == 0
|
|
args_str = "\n".join(args_str)
|
|
if not not_found:
|
|
for i, message in enumerate(messages, start=1):
|
|
embed: discord.Embed = discord.Embed()
|
|
embed.title = f"Search in #{channel.name} ({channel.id})"
|
|
embed.description = args_str
|
|
embed.url = message.jump_url
|
|
embed.set_author(name=f"{message.author.display_name} ({message.author.id})")
|
|
embed.add_field(
|
|
name=f"Message ({message.id}) content:",
|
|
value=(
|
|
(
|
|
message.content
|
|
if len(message.content) < 1025
|
|
else (message.content[:1020] + "\n...")
|
|
)
|
|
if message.content
|
|
else "None"
|
|
),
|
|
inline=False,
|
|
)
|
|
embed.add_field(
|
|
name="Embed(s):",
|
|
value=(
|
|
_("Look at the original message.") if len(message.embeds) > 0 else "None"
|
|
),
|
|
inline=False,
|
|
)
|
|
embed.timestamp = message.created_at
|
|
embed.set_thumbnail(
|
|
url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg"
|
|
)
|
|
embed.set_footer(
|
|
text=f"Page {i}/{len(messages)}",
|
|
icon_url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg",
|
|
)
|
|
embeds.append(embed)
|
|
else:
|
|
embed: discord.Embed = discord.Embed()
|
|
embed.title = _("Search in #{channel.name} ({channel.id})").format(channel=channel)
|
|
embed.add_field(name="Result:", value=_("Sorry, I could not find any results."))
|
|
embed.timestamp = datetime.datetime.now()
|
|
embed.set_thumbnail(
|
|
url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg"
|
|
)
|
|
embed.set_footer(
|
|
text="Page 1/1",
|
|
icon_url="https://us.123rf.com/450wm/sommersby/sommersby1610/sommersby161000062/66918773-recherche-ic%C3%B4ne-plate-recherche-ic%C3%B4ne-conception-recherche-ic%C3%B4ne-web-vecteur-loupe.jpg",
|
|
)
|
|
embeds.append(embed)
|
|
end = monotonic()
|
|
total = round(end - start, 1)
|
|
for embed in embeds:
|
|
embed.title = _("Search in #{channel.name} ({channel.id}) in {total}s").format(
|
|
channel=channel, total=total
|
|
)
|
|
await Menu(pages=embeds).start(ctx)
|
|
|
|
|
|
class DateConverter(commands.Converter):
|
|
"""Date converter which uses dateparser.parse()."""
|
|
|
|
async def convert(self, ctx: commands.Context, argument: str) -> datetime.datetime:
|
|
parsed = dateparser.parse(argument)
|
|
if parsed is None:
|
|
raise commands.BadArgument(_("Unrecognized date/time."))
|
|
return parsed
|
|
|
|
|
|
# class SearchArgs(commands.FlagConverter, case_insensitive=False, prefix="--", delimiter=" "):
|
|
# authors: commands.Greedy[discord.Member]
|
|
# mentions: commands.Greedy[discord.Member]
|
|
# before: DateConverter
|
|
# after: DateConverter
|
|
# pinned: bool
|
|
# content: str
|
|
# regex: str
|
|
# contains: commands.Greedy[str]
|
|
# limit: int
|
|
|
|
|
|
class NoExitParser(argparse.ArgumentParser):
|
|
def error(self, message) -> None:
|
|
raise commands.BadArgument(message)
|
|
|
|
|
|
class SearchArgs:
|
|
def parse_arguments(self, arguments: str) -> argparse.Namespace:
|
|
parser = NoExitParser(description="Selection args for DiscordSearch.", add_help=False)
|
|
parser.add_argument("--author", dest="authors", nargs="+")
|
|
parser.add_argument("--mention", dest="mentions", nargs="+")
|
|
parser.add_argument("--before", dest="before")
|
|
parser.add_argument("--after", dest="after")
|
|
parser.add_argument("--pinned", dest="pinned")
|
|
parser.add_argument("--content", dest="content", nargs="*")
|
|
parser.add_argument("--regex", dest="regex", nargs="*")
|
|
parser.add_argument("--contain", dest="contains", nargs="+")
|
|
parser.add_argument("--limit", dest="limit")
|
|
|
|
return parser.parse_args(arguments)
|
|
|
|
async def convert(self, ctx: commands.Context, arguments) -> typing.Any:
|
|
self.ctx = ctx
|
|
args = self.parse_arguments(arguments)
|
|
if args.authors is not None:
|
|
self.authors = []
|
|
for author in args.authors:
|
|
author = await commands.MemberConverter().convert(ctx, author)
|
|
if author is None:
|
|
raise commands.BadArgument("`--author` must be a member.")
|
|
self.authors.append(author)
|
|
else:
|
|
self.authors = None
|
|
if args.mentions is not None:
|
|
self.mentions = []
|
|
for mention in args.mentions:
|
|
mention = await commands.MemberConverter().convert(ctx, mention)
|
|
if mention is None:
|
|
raise commands.BadArgument("`--mention` must be a member.")
|
|
self.mentions.append(mention)
|
|
else:
|
|
self.mentions = None
|
|
self.before = (
|
|
await DateConverter().convert(ctx, args.before)
|
|
if args.before is not None
|
|
else args.before
|
|
)
|
|
self.after = (
|
|
await DateConverter().convert(ctx, args.after)
|
|
if args.after is not None
|
|
else args.after
|
|
)
|
|
if args.pinned is not None:
|
|
args.pinned = str(args.pinned)
|
|
if args.pinned.lower() in ("true", "y", "yes"):
|
|
self.pinned = True
|
|
elif args.pinned.lower() in ("false", "n", "no"):
|
|
self.pinned = False
|
|
else:
|
|
raise commands.BadArgument("`--pinned` must be a bool.")
|
|
else:
|
|
self.pinned = args.pinned
|
|
self.content = "".join(args.content) if args.content is not None else args.content
|
|
if args.regex is not None:
|
|
try:
|
|
self.regex = re.compile("".join(args.regex))
|
|
except Exception as e:
|
|
raise commands.BadArgument(f"`{args.regex}` is not a valid regex pattern.\n{e}")
|
|
else:
|
|
self.regex = None
|
|
if args.contains is not None:
|
|
self.contains = []
|
|
for contain in args.contains:
|
|
if contain.lower() not in ("link", "embed", "file"):
|
|
raise commands.BadArgument("`--contain` must be `link`, `embed` or `file`.")
|
|
self.contains.append(contain.lower())
|
|
else:
|
|
self.contains = None
|
|
if args.limit is not None:
|
|
try:
|
|
self.limit = int(args.limit)
|
|
except ValueError:
|
|
raise commands.BadArgument("`--limit` must be a int.")
|
|
else:
|
|
self.limit = int(args.limit)
|
|
else:
|
|
self.limit = None
|
|
return self
|