Ruby-Cogs/martools/marttools.py
Valerie 48256636da
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run
Fix typo in docstring for Nsfw cog settings command.
2025-05-23 03:03:38 -04:00

511 lines
21 KiB
Python

import asyncio
import logging
import time
from collections import Counter
from datetime import datetime, timezone
from typing import Union
import discord
import lavalink
import sqlite3
from databases import Database
from redbot.cogs.audio.audio_dataclasses import Query
from redbot.core import bank, commands
from redbot.core.bot import Red
from redbot.core.data_manager import cog_data_path
from redbot.core.i18n import Translator, cog_i18n
from redbot.core.utils.chat_formatting import bold, box, humanize_number, humanize_timedelta
from .listeners import Listeners
from .statements import (
CREATE_TABLE,
CREATE_VERSION_TABLE,
DROP_OLD_PERMA,
DROP_OLD_TEMP,
GET_EVENT_VALUE,
INSERT_OR_IGNORE,
SELECT_OLD,
UPSERT,
PRAGMA_journal_mode,
PRAGMA_read_uncommitted,
PRAGMA_wal_autocheckpoint,
)
from .utils import EVENTS_NAMES
log = logging.getLogger("red.predacogs.martools")
_ = Translator("MartTools", __file__)
@cog_i18n(_)
class MartTools(Listeners, commands.Cog):
"""Multiple tools that are originally used on Martine."""
__author__ = ["Predä", "Draper"]
__version__ = "3.0.2"
async def red_delete_data_for_user(self, **kwargs):
    """No-op: there is nothing to delete.

    Any keyword arguments are accepted and ignored.
    """
    return None
def __init__(self, bot: Red):
    """Set up the database handle, the counters cache and the background tasks.

    Args:
        bot: The bot instance this cog is attached to.
    """
    self.bot = bot
    # SQLite database file stored under this cog's data directory.
    database_url = f"sqlite:///{cog_data_path(self)}/MartTools.db"
    self.cursor = Database(database_url)
    # Two independent counter sets, keyed "perma" and "session".
    self.cache = {"perma": Counter(), "session": Counter()}
    # Moment the cog instance was created; used as the session start.
    self.uptime = discord.utils.utcnow()
    loop = self.bot.loop
    self.init_task = loop.create_task(self.initialize())
    self.dump_cache_task = loop.create_task(self._dump_cache_to_db_task())
def cog_unload(self):
    """Stop the background tasks and schedule one last cache dump.

    The periodic dump task and the initialization task are cancelled, then
    a final ``_dump_cache_to_db`` run is scheduled on the running loop.
    """
    self.dump_cache_task.cancel()
    if self.init_task:
        self.init_task.cancel()
    # The event loop only keeps weak references to tasks, so hold a strong
    # reference on the instance; otherwise the final dump could be
    # garbage-collected before it completes.
    self._unload_dump_task = asyncio.create_task(self._dump_cache_to_db())
def format_help_for_context(self, ctx: commands.Context) -> str:
    """Return the parent help text followed by the cog's authors and version.

    Thanks Sinbad!
    """
    base_help = super().format_help_for_context(ctx)
    authors = ", ".join(self.__author__)
    return f"{base_help}\n\nAuthors: {authors}\nCog Version: {self.__version__}"
async def initialize(self):
    """Connect to the database, prepare the schema and load the cache.

    After the setup statements run, the ``bot_stats_perma`` table is probed.
    If it can be read and holds rows, the data is migrated first. The
    in-memory cache is populated in every case.
    """
    db = self.cursor
    await db.connect()
    setup_statements = (
        PRAGMA_journal_mode,
        PRAGMA_wal_autocheckpoint,
        PRAGMA_read_uncommitted,
        CREATE_TABLE,
        CREATE_VERSION_TABLE,
    )
    for statement in setup_statements:
        await db.execute(statement)
    # Record a creation time for the stats (INSERT_OR_IGNORE statement).
    await db.execute(INSERT_OR_IGNORE, {"event": "creation_time", "quantity": time.time()})

    try:
        legacy_rows = list(await db.fetch_all("SELECT * FROM bot_stats_perma"))
    except sqlite3.OperationalError:
        # Presumably the old table does not exist: nothing to migrate.
        legacy_rows = []

    if legacy_rows:
        await self._migrate_data()
    await self._populate_cache()
async def _migrate_data(self):
    """Copy counters selected by ``SELECT_OLD`` into the current table.

    For every name in ``EVENTS_NAMES`` the old value is read and, when it is
    truthy, written with ``UPSERT``. The old ``creation_time`` is carried
    over too, falling back to the current time when none is found.
    Afterwards the ``DROP_OLD_TEMP`` and ``DROP_OLD_PERMA`` statements are
    executed and version 2 is recorded in the ``version`` table.
    """
    for event_name in EVENTS_NAMES:
        result = await self.cursor.fetch_val(SELECT_OLD, {"event": event_name})
        if result:
            await self.cursor.execute(UPSERT, {"event": event_name, "quantity": result})
    # NOTE(review): this call passes an extra "guild_id" value that the
    # SELECT_OLD call in the loop above does not -- confirm which parameter
    # set SELECT_OLD actually expects.
    old_creation_time = await self.cursor.fetch_val(
        SELECT_OLD, {"event": "creation_time", "guild_id": -1000}
    )
    # UPSERT is bound by name everywhere else in this cog, so pass a mapping
    # here as well instead of a positional tuple.
    await self.cursor.execute(
        UPSERT,
        {"event": "creation_time", "quantity": old_creation_time or time.time()},
    )
    await self.cursor.execute(DROP_OLD_TEMP)
    await self.cursor.execute(DROP_OLD_PERMA)
    await self.cursor.execute("INSERT or IGNORE INTO version (version_num) VALUES (2)")
async def _populate_cache(self):
    """Load the stored counters into the ``"perma"`` cache.

    Events without a truthy stored value are left untouched.
    ``creation_time`` is always set, defaulting to the current time when
    the database has no value for it.
    """
    for event_name in EVENTS_NAMES:
        stored = await self.cursor.fetch_val(GET_EVENT_VALUE, {"event": event_name})
        if not stored:
            continue
        self.cache["perma"][event_name] = stored
    created = await self.cursor.fetch_val(GET_EVENT_VALUE, {"event": "creation_time"})
    self.cache["perma"]["creation_time"] = created if created else time.time()
async def _dump_cache_to_db(self):
    """Write every ``"perma"`` counter to the database with ``UPSERT``."""
    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # any code that inserts a new key into the counter meanwhile would make
    # a live iteration fail with "dictionary changed size during iteration".
    snapshot = list(self.cache["perma"].items())
    for event_name, value in snapshot:
        await self.cursor.execute(UPSERT, {"event": event_name, "quantity": value})
async def _dump_cache_to_db_task(self):
    """Dump the cache to the database every 300 seconds, forever.

    Starts once the bot reports ready. A failing dump is logged and the
    loop keeps running.
    """
    await self.bot.wait_until_red_ready()
    dump_interval = 300  # seconds between two dumps
    while True:
        await asyncio.sleep(dump_interval)
        try:
            await self._dump_cache_to_db()
        except Exception:
            log.exception("Something went wrong in _dump_cache_to_db_task:")
def get_value(self, key: str, perma: bool = False, raw: bool = False) -> Union[int, str]:
    """Look up a counter in the cache.

    Args:
        key: Name of the counter.
        perma: Read from the ``"perma"`` counters instead of ``"session"``.
        raw: Return the stored value as-is instead of humanizing it.

    Returns:
        The stored value when ``raw`` is true, otherwise the result of
        ``humanize_number`` applied to it.
    """
    bucket = "perma" if perma else "session"
    count = self.cache[bucket][key]
    return count if raw else humanize_number(count)
def get_bot_uptime(self):
    """Return the time elapsed since ``self.uptime`` as a humanized string."""
    elapsed = discord.utils.utcnow() - self.uptime
    humanized = humanize_timedelta(timedelta=elapsed)
    return str(humanized)
def usage_counts_cpm(self, key: str, time: int = 60):
    """Return the average rate of a session counter since ``self.uptime``.

    Args:
        key: Name of the session counter.
        time: Length of one interval in seconds (60 gives a per-minute
            rate, 3600 a per-hour rate).

    Returns:
        The raw counter value divided by the number of elapsed intervals.
    """
    elapsed_seconds = (discord.utils.utcnow() - self.uptime).total_seconds()
    intervals = elapsed_seconds / time
    return self.get_value(key, raw=True) / intervals
@commands.command()
@commands.guild_only()
@commands.bot_has_permissions(embed_links=True)
async def bankstats(self, ctx: commands.Context):
    """Show stats of the bank."""
    # Sends an embed with the bank totals, plus the caller's share and rank
    # when the caller has a leaderboard position.
    icon = self.bot.user.display_avatar
    user_bal = await bank.get_balance(ctx.author)
    credits_name = await bank.get_currency_name(ctx.guild)
    pos = await bank.get_leaderboard_position(ctx.author)
    bank_name = await bank.get_bank_name(ctx.guild)
    bank_config = bank._config

    # Ask once whether the bank is global and fetch the accounts once; the
    # count and the total are both derived from that single mapping.
    is_global = await bank.is_global()
    if is_global:
        accounts = await bank_config.all_users()
    else:
        accounts = await bank_config.all_members(ctx.guild)
    all_accounts = len(accounts)
    overall = sum(account["balance"] for account in accounts.values())

    member_account = await bank.get_account(ctx.author)
    created_at = str(member_account.created_at)
    # String form of the Unix epoch; the footer is skipped for accounts
    # whose creation date renders as this value.
    epoch_repr = "1970-01-01 00:00:00"

    em = discord.Embed(color=await ctx.embed_colour())
    em.set_author(name=_("{} stats:").format(bank_name), icon_url=icon)
    em.add_field(
        name=_("{} stats:").format("Global" if is_global else "Bank"),
        value=_(
            "Total accounts: **{all_accounts}**\nTotal amount: **{overall} {credits_name}**"
        ).format(
            all_accounts=all_accounts,
            overall=humanize_number(overall),
            credits_name=credits_name,
        ),
    )
    if pos is not None:
        # Guard against a bank whose balances sum to zero.
        percent = round((int(user_bal) / overall * 100), 3) if overall else 0.0
        em.add_field(
            name=_("Your stats:"),
            value=_(
                "You have **{bal} {currency}**.\n"
                "It's **{percent}%** of the {g}amount in the bank.\n"
                "You are **{pos}/{all_accounts}** in the {g}leaderboard."
            ).format(
                bal=humanize_number(user_bal),
                currency=credits_name,
                percent=percent,
                g="global " if is_global else "",
                pos=humanize_number(pos),
                all_accounts=humanize_number(all_accounts),
            ),
            inline=False,
        )
    if created_at != epoch_repr:
        em.set_footer(text=_("Bank account created on: ") + str(created_at))
    await ctx.send(embed=em)
@commands.command(aliases=["usagec"])
async def usagecount(self, ctx: commands.Context):
    """
    Show the usage count of the bot.
    Commands processed, messages received, and music on servers.
    """
    # Session counters and their average rates, keyed by the placeholder
    # names used in the template below.
    stats = {
        "commands_count": self.get_value("processed_commands"),
        "cpm_commands": self.usage_counts_cpm("processed_commands"),
        "errors_count": self.get_value("command_error"),
        "messages_read": self.get_value("messages_read"),
        "cpm_msgs": self.usage_counts_cpm("messages_read"),
        "messages_sent": self.get_value("msg_sent"),
        "cpm_msgs_sent": self.usage_counts_cpm("msg_sent"),
        "ll_players": "`{}/{}`".format(
            humanize_number(len(lavalink.active_players())),
            humanize_number(len(lavalink.all_players())),
        ),
        "tracks_played": self.get_value("tracks_played"),
        "cpm_tracks": self.usage_counts_cpm("tracks_played"),
        "guild_join": self.get_value("guild_join"),
        # Guild joins/leaves are reported per hour (3600 s intervals).
        "cpm_guild_join": self.usage_counts_cpm("guild_join", 3600),
        "guild_leave": self.get_value("guild_remove"),
        "cpm_guild_leave": self.usage_counts_cpm("guild_remove", 3600),
    }
    template = _(
        "**Commands processed:** `{commands_count}` commands. (`{cpm_commands:.2f}`/min)\n"
        "**Commands errors:** `{errors_count}` errors.\n"
        "**Messages received:** `{messages_read}` messages. (`{cpm_msgs:.2f}`/min)\n"
        "**Messages sent:** `{messages_sent}` messages. (`{cpm_msgs_sent:.2f}`/min)\n"
        "**Playing music on:** `{ll_players}` servers.\n"
        "**Tracks played:** `{tracks_played}` tracks. (`{cpm_tracks:.2f}`/min)\n\n"
        "**Servers joined:** `{guild_join}` servers. (`{cpm_guild_join:.2f}`/hour)\n"
        "**Servers left:** `{guild_leave}` servers. (`{cpm_guild_leave:.2f}`/hour)"
    )
    msg = template.format(**stats)

    # Plain-text fallback when an embed is not requested.
    if not await ctx.embed_requested():
        header = _("Usage count of {} since last restart:\n").format(ctx.bot.user.name)
        footer = _("\n\nSince {}").format(self.get_bot_uptime())
        await ctx.send(header + msg + footer)
        return

    em = discord.Embed(
        color=await ctx.embed_colour(),
        title=_("Usage count of {} since last restart:").format(self.bot.user.name),
        description=msg,
    )
    em.set_thumbnail(url=self.bot.user.display_avatar)
    em.set_footer(text=_("Since {}").format(self.get_bot_uptime()))
    await ctx.send(embed=em)
@commands.bot_has_permissions(embed_links=True)
@commands.command(aliases=["advusagec"])
async def advusagecount(self, ctx: commands.Context):
    """
    Permanent stats since first time that the cog has been loaded.
    """

    def perma_stats(*keys):
        # Map each placeholder name to its humanized permanent counter.
        return {key: self.get_value(key, perma=True) for key in keys}

    avatar = self.bot.user.display_avatar
    # "creation_time" holds a Unix timestamp; read it raw so it can be
    # converted back into an aware UTC datetime.
    delta = discord.utils.utcnow() - datetime.fromtimestamp(
        self.get_value("creation_time", perma=True, raw=True), timezone.utc
    )
    uptime = humanize_timedelta(timedelta=delta)
    ll_players = "{}/{}".format(
        humanize_number(len(lavalink.active_players())),
        humanize_number(len(lavalink.all_players())),
    )

    em = discord.Embed(
        title=_("Usage count of {}:").format(ctx.bot.user.name),
        color=await ctx.embed_colour(),
    )
    em.add_field(
        name=_("Message Stats"),
        value=box(
            _(
                "Messages Read : {messages_read}\n"
                "Messages Sent : {msg_sent}\n"
                "Messages Deleted : {messages_deleted}\n"
                "Messages Edited : {messages_edited}\n"
                "DMs Received : {dms_received}\n"
            ).format_map(
                perma_stats(
                    "messages_read",
                    "msg_sent",
                    "messages_deleted",
                    "messages_edited",
                    "dms_received",
                )
            ),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("Commands Stats"),
        value=box(
            _(
                "Commands Processed : {processed_commands}\n"
                "Errors Occured : {command_error}\n"
                "Sessions Resumed : {sessions_resumed}\n"
            ).format_map(
                perma_stats("processed_commands", "command_error", "sessions_resumed")
            ),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("Guild Stats"),
        value=box(
            _(
                "Guilds Joined : {guild_join}\n" "Guilds Left : {guild_remove}\n"
            ).format_map(perma_stats("guild_join", "guild_remove")),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("User Stats"),
        value=box(
            _(
                "New Users : {new_members}\n"
                "Left Users : {members_left}\n"
                "Banned Users : {members_banned}\n"
                "Unbanned Users : {members_unbanned}\n"
            ).format_map(
                perma_stats(
                    "new_members", "members_left", "members_banned", "members_unbanned"
                )
            ),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("Role Stats"),
        value=box(
            _(
                "Roles Added : {roles_added}\n"
                "Roles Removed : {roles_removed}\n"
                "Roles Updated : {roles_updated}\n"
            ).format_map(perma_stats("roles_added", "roles_removed", "roles_updated")),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("Emoji Stats"),
        value=box(
            _(
                "Reacts Added : {reactions_added}\n"
                "Reacts Removed : {reactions_removed}\n"
                "Emoji Added : {emojis_added}\n"
                "Emoji Removed : {emojis_removed}\n"
                "Emoji Updated : {emojis_updated}\n"
            ).format_map(
                perma_stats(
                    "reactions_added",
                    "reactions_removed",
                    "emojis_added",
                    "emojis_removed",
                    "emojis_updated",
                )
            ),
            lang="prolog",
        ),
        inline=False,
    )
    em.add_field(
        name=_("Audio Stats"),
        value=box(
            _(
                "Users Who Joined VC : {users_joined_bot_music_room}\n"
                "Tracks Played : {tracks_played}\n"
                "Number Of Players : {ll_players}"
            ).format(
                ll_players=ll_players,
                **perma_stats("users_joined_bot_music_room", "tracks_played"),
            ),
            lang="prolog",
        ),
        inline=False,
    )
    # Query is imported unconditionally at module level, so this guard is
    # presumably always true; it is kept to preserve the existing flow.
    if Query:
        em.add_field(
            name=_("Track Stats"),
            value=box(
                _(
                    "Streams : {streams_played}\n"
                    "YouTube Streams : {yt_streams_played}\n"
                    "Twitch Streams : {ttv_streams_played}\n"
                    # Uses its own counter; this row previously repeated
                    # {streams_played}.
                    "Other Streams : {other_streams_played}\n"
                    "YouTube Tracks : {youtube_tracks}\n"
                    "Soundcloud Tracks : {soundcloud_tracks}\n"
                    "Bandcamp Tracks : {bandcamp_tracks}\n"
                    "Vimeo Tracks : {vimeo_tracks}\n"
                    "Twitch Tracks : {twitch_tracks}\n"
                    "Other Tracks : {other_tracks}\n"
                ).format_map(
                    perma_stats(
                        "streams_played",
                        "yt_streams_played",
                        "ttv_streams_played",
                        "other_streams_played",
                        "youtube_tracks",
                        "soundcloud_tracks",
                        "bandcamp_tracks",
                        "vimeo_tracks",
                        "twitch_tracks",
                        "other_tracks",
                    )
                ),
                lang="prolog",
            ),
            inline=False,
        )
    em.set_thumbnail(url=avatar)
    em.set_footer(text=_("Since {}").format(uptime))
    await ctx.send(embed=em)
@commands.command(aliases=["prefixes"])
async def prefix(self, ctx: commands.Context):
    """Show all prefixes of the bot"""
    # Shows the server prefixes when the server has any, otherwise the
    # bot's default prefixes; sent as an embed or as plain text.
    default_prefixes = await self.bot._config.prefix()
    try:
        guild_prefixes = await self.bot._config.guild(ctx.guild).prefix()
    except AttributeError:
        # Presumably raised when there is no guild (e.g. in DMs); fall back
        # to the default prefixes.
        guild_prefixes = False
    bot_name = ctx.bot.user.name
    avatar = self.bot.user.display_avatar

    if guild_prefixes:
        shown_prefixes = guild_prefixes
        # Pluralize on the list actually displayed (the server prefixes),
        # not on the default prefixes.
        plural = _("prefixes") if len(guild_prefixes) >= 2 else _("prefix")
        embed_title = _("Server {} of {}:")
        # Positional placeholders only: a named "{name}" field cannot be
        # filled by the positional arguments passed to format() below.
        text_header = _("Server {} of {}:\n")
    else:
        shown_prefixes = default_prefixes
        plural = _("Prefixes") if len(default_prefixes) >= 2 else _("Prefix")
        embed_title = _("{} of {}:")
        text_header = _("{} of {}:\n")

    # Zero-width spaces keep a prefix made of backticks from breaking the
    # inline-code markup.
    to_send = " ".join(f"`\u200b{p}\u200b`" for p in shown_prefixes)

    if await ctx.embed_requested():
        em = discord.Embed(
            color=await ctx.embed_colour(),
            title=embed_title.format(plural, bot_name),
            description=to_send,
        )
        em.set_thumbnail(url=avatar)
        await ctx.send(embed=em)
    else:
        await ctx.send(bold(text_header).format(plural, bot_name) + to_send)
@commands.command(aliases=["serverc", "serversc"])
async def servercount(self, ctx: commands.Context):
    """Send servers stats of the bot."""
    # Reports shard, server, channel and user counts, as an embed when one
    # is requested and as plain text otherwise.
    guilds = self.bot.guilds
    visible_users = sum(len(s.members) for s in guilds)
    # member_count can be None in discord.py 2.x; count such guilds as 0
    # instead of failing the sum.
    total_users = sum(s.member_count or 0 for s in guilds)
    # Avoid dividing by zero when no members are known at all.
    percentage_chunked = visible_users / total_users * 100 if total_users else 0.0
    msg = _(
        "{name} is running on `{shard_count}` {shards}.\n"
        "Serving `{servs}` servers (`{channels}` channels).\n"
        "For a total of `{visible_users}` users (`{unique}` unique).\n"
        "(`{visible_users}` visible now, `{total_users}` total, `{percentage_chunked:.2f}%` chunked)"
    ).format(
        name=ctx.bot.user.name,
        shard_count=humanize_number(self.bot.shard_count),
        shards=_("shards") if self.bot.shard_count > 1 else _("shard"),
        servs=humanize_number(len(guilds)),
        channels=humanize_number(sum(len(s.channels) for s in guilds)),
        visible_users=humanize_number(visible_users),
        unique=humanize_number(len(self.bot.users)),
        total_users=humanize_number(total_users),
        percentage_chunked=percentage_chunked,
    )
    if await ctx.embed_requested():
        em = discord.Embed(color=await ctx.embed_colour(), description=msg)
        await ctx.send(embed=em)
    else:
        await ctx.send(msg)