Ruby-Cogs/voicelogs/voicelogs.py
2025-02-19 22:02:13 -05:00

250 lines
10 KiB
Python

# This cog was originally by ZeLarpMaster for Red v2, and can be found at:
# https://github.com/ZeLarpMaster/ZeCogs/blob/master/voice_logs/voice_logs.py
import asyncio
import contextlib
import discord
import logging
from datetime import date, datetime, timedelta, timezone
from types import SimpleNamespace
from typing import Literal, Union
from redbot.core import checks, commands, Config
from redbot.core.utils.chat_formatting import bold
log = logging.getLogger("red.aikaterna.voicelogs")
class VoiceLogs(commands.Cog):
"""Logs information about voice channel connection times."""
__author__ = ["ZeLarpMaster#0818", "aikaterna"]
__version__ = "0.1.1"
TIME_FORMATS = ["{} seconds", "{} minutes", "{} hours", "{} days", "{} weeks"]
TIME_FRACTIONS = [60, 60, 24, 7]
ENTRY_TIME_LIMIT = timedelta(weeks=1)
CLEANUP_DELAY = timedelta(days=1).total_seconds()
async def red_delete_data_for_user(
self,
*,
requester: Literal["discord", "owner", "user", "user_strict"],
user_id: int,
):
await self.config.user_from_id(user_id).clear()
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, 2708181003, force_registration=True)
default_guild = {"toggle": False}
default_user = {"history": []}
# history is a list of dict entries
# {"channel_id": int,
# "channel_name": str,
# "joined_at": datetime,
# "left_at": datetime}
self.config.register_guild(**default_guild)
self.config.register_user(**default_user)
asyncio.ensure_future(self.cleanup_loop())
# Commands
@commands.group(name="voicelog", aliases=["voicelogs"])
async def _command_voicelog(self, ctx):
"""
Access voice activity data.
You must have the bot Mod or Admin role or View Audit Log permissions to view and use the commands.
"""
pass
@_command_voicelog.command(name="user", aliases=["u"])
@checks.mod_or_permissions(view_audit_log=True)
async def _command_voicelog_user(self, ctx: commands.Context, *, user: discord.Member):
"""
Look up the voice activity of a user.
Timestamps are in UTC.
"""
entries = await self.config.user(user).history()
embed = discord.Embed(description=f"**Voice Activity for** {user.mention}")
for entry in self.process_entries(entries, limit=25):
joined_at = self.format_time(entry["joined_at"])
left_at = entry.get("left_at")
left_at = self.format_time(left_at) if left_at is not None else "now"
embed.add_field(
name=f"#{entry['channel_name']} ({entry['channel_id']})",
value=f"**{joined_at}** until **{left_at}**",
inline=False,
)
if len(embed.fields) == 0:
embed.description = f"No voice activity for {user.mention}"
await ctx.channel.send(embed=embed)
@_command_voicelog.command(name="channel", aliases=["c"])
@checks.mod_or_permissions(view_audit_log=True)
async def _command_voicelog_channel(self, ctx: commands.Context, *, voice_channel_name_or_id: discord.VoiceChannel):
"""
Look up the voice activity on a voice channel.
`voice_channel_name_or_id` is either the exact name of the target voice channel (proper case), or its ID.
"""
entries = []
all_entries = await self.config.all_users()
for user_id, user_entries in all_entries.items():
for history_key, entry_list in user_entries.items():
for entry in entry_list:
if entry["channel_id"] == voice_channel_name_or_id.id:
entry["user_id"] = user_id
entries.append(entry)
embed = discord.Embed(title=f"Voice Activity in {voice_channel_name_or_id.name}", description="")
for entry in self.process_entries(entries, limit=25):
time_spent = ""
left_at = entry.get("left_at", None)
if left_at is None:
time_spent = "+"
left_at = datetime.now(timezone.utc)
time_diff = left_at - entry["joined_at"]
time_spent = self.humanize_time(round(time_diff.total_seconds())) + time_spent
user_obj = ctx.guild.get_member(entry["user_id"])
if not user_obj:
user_obj = SimpleNamespace(name="Unknown User", id=entry["user_id"])
embed.description += f"**{user_obj.name}** ({user_obj.id}) for **{time_spent}**\n"
if len(embed.description) == 0:
embed.description = f"No voice activity in {voice_channel_name_or_id.mention}"
await ctx.send(embed=embed)
@_command_voicelog.command(name="toggle")
@checks.mod_or_permissions(view_audit_log=True)
async def _command_voicelog_toggle(self, ctx: commands.Context):
"""Toggle voice activity recording on and off."""
toggle = await self.config.guild(ctx.guild).toggle()
await self.config.guild(ctx.guild).toggle.set(not toggle)
await ctx.send(f"Voice channel watching is now toggled {bold('ON') if toggle == False else bold('OFF')}")
# Events
@commands.Cog.listener()
async def on_voice_state_update(
self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState
):
if before.channel == after.channel:
return
toggle = await self.config.guild(member.guild).toggle()
if not toggle:
return
try:
# Left that channel
if before.channel is not None:
async with self.config.user(member).history() as user_data:
entry = discord.utils.find(
lambda e: e["channel_id"] == before.channel.id and "left_at" not in e, user_data
)
if entry is not None:
entry["left_at"] = datetime.now(timezone.utc).timestamp()
# Joined that channel
if after.channel is not None:
async with self.config.user(member).history() as user_info:
entry = {
"channel_id": after.channel.id,
"channel_name": after.channel.name,
"joined_at": datetime.now(timezone.utc).timestamp(),
}
user_info.append(entry)
except Exception as e:
log.error(f"Error in on_voice_state_update:\n{e}", exc_info=True)
async def cleanup_loop(self):
await self.bot.wait_until_red_ready()
# Suppress the "Event loop is closed" error
with contextlib.suppress(RuntimeError, asyncio.CancelledError):
while True:
try:
await self.cleanup_entries()
await asyncio.sleep(self.CLEANUP_DELAY)
except Exception as e:
log.error(f"Error in cleanup_loop:\n{e}", exc_info=True)
async def cleanup_entries(self):
try:
delete_threshold = datetime.now(timezone.utc) - self.ENTRY_TIME_LIMIT
to_delete = {"history": []}
user_data = await self.config.all_users()
for user_id, history in user_data.items():
for dict_title, entry_list in history.items():
for entry in entry_list:
left_at = entry.get("left_at", None)
if left_at is not None and datetime.fromtimestamp(left_at, timezone.utc) < delete_threshold:
entry_list_index = [i for i, d in enumerate(entry_list) if left_at in d.values()]
entry_list.pop(entry_list_index[0])
await self.config.user_from_id(user_id).history.set(entry_list)
except Exception as e:
log.error(f"Error in cleanup_entries:\n{e}", exc_info=True)
def process_entries(self, entries, *, limit=None):
return sorted(self.map_entries(entries), key=lambda o: o["joined_at"], reverse=True)[:limit]
def map_entries(self, entries):
for entry in entries:
new_entry = entry.copy()
joined_at = datetime.fromtimestamp(entry["joined_at"], timezone.utc)
new_entry["joined_at"] = joined_at
left_at = entry.get("left_at")
if left_at is not None:
new_entry["left_at"] = datetime.fromtimestamp(left_at, timezone.utc)
yield new_entry
def format_time(self, moment: datetime):
if date.today() == moment.date():
return "today " + moment.strftime("%X")
else:
return moment.strftime("%c")
def humanize_time(self, time: int) -> str:
"""
Returns a string of the humanized given time keeping only the 2 biggest formats.
Examples:
1661410 --> 2 weeks 5 days (hours, mins, seconds are ignored)
30 --> 30 seconds
"""
times = []
# 90 --> divmod(90, 60) --> (1, 30) --> (1m + 30s)
for time_f in zip(self.TIME_FRACTIONS, self.TIME_FORMATS):
time, units = divmod(time, time_f[0])
if units > 0:
times.append(self.plural_format(units, time_f[1]))
if time > 0:
times.append(self.plural_format(time, self.TIME_FORMATS[-1]))
return " ".join(reversed(times[-2:]))
def plural_format(self, raw_amount: Union[int, float], format_string: str, *, singular_format: str = None) -> str:
"""
Formats a string for plural and singular forms of an amount.
The amount given is rounded.
`raw_amount` is an integer (rounded if something else is given)
`format_string` is the string to use when formatting in plural
`singular_format` is the string to use for singular
By default uses the plural and removes the last character.
"""
amount = round(raw_amount)
result = format_string.format(raw_amount)
if singular_format is None:
result = format_string.format(raw_amount)[: -1 if amount == 1 else None]
elif amount == 1:
result = singular_format.format(raw_amount)
return result