Ruby-Cogs/extendedaudio/extendedaudio.py

333 lines
No EOL
14 KiB
Python

import asyncio
import logging
import time
from typing import Optional, List, Dict
from collections import defaultdict
import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
from redbot.core.utils.predicates import MessagePredicate
# Module-level logger used by every handler in this cog.
log = logging.getLogger("red.valerie.extendedaudio")

# Placeholders accepted in the status format string, each mapped to the
# short description shown to users.
SUPPORTED_VARIABLES: Dict[str, str] = {
    "title": "Song title",
    "requester": "User who requested the song",
    "duration": "Duration of the song",
    "author": "Song author/artist",
}
class ExtendedAudio(commands.Cog):
    """Extends Red's core Audio cog with additional features"""

    def __init__(self, bot: Red):
        """Store the bot, register per-guild defaults and schedule start-up.

        Args:
            bot: The Red instance this cog is being attached to.
        """
        self.bot = bot
        # Initialised before the start-up task is scheduled so the attribute
        # exists whenever ``initialize`` or ``cog_check`` runs.
        self.audio = None
        self.config = Config.get_conf(self, identifier=117117117)
        default_guild = {
            # Channel ID to post song updates
            "status_channel": None,
            # Format for the status message
            "status_format": (
                "🎵 Now Playing: **{title}**\n"
                "Requested by: {requester}\n"
                "Duration: {duration}\n"
                "Artist: {author}"
            ),
            # List of allowed voice channel IDs
            "allowed_channels": [],
            # Whether to only allow bot-created channels
            "bot_managed_only": False,
            # ID of the last status message (for cleanup)
            "last_status_message": None,
            # Whether to delete old status messages
            "clean_old_messages": True,
            # The two keys below are read by the voice-state, pause and stop
            # listeners; they are registered here so those reads resolve to
            # explicit, documented defaults instead of unregistered values.
            "channel_status": False,
            "original_name": None,
        }
        self.config.register_guild(**default_guild)
        # Looks up the Audio cog once the bot is ready; cancelled in cog_unload.
        self.task = self.bot.loop.create_task(self.initialize())
async def initialize(self):
    """Wait until the bot is ready, then cache a reference to the Audio cog.

    An error is logged when the Audio cog cannot be found, or when the
    lookup itself raises; nothing is propagated to the caller.
    """
    await self.bot.wait_until_ready()
    try:
        audio_cog = self.bot.get_cog("Audio")
        self.audio = audio_cog
        if not audio_cog:
            log.error("Audio cog not loaded! Some features may not work.")
    except Exception as exc:
        log.error(f"Failed to initialize ExtendedAudio: {exc}")
def cog_unload(self):
    """Cancel the start-up task, if there is one, when the cog is unloaded."""
    startup_task = self.task
    if not startup_task:
        return
    startup_task.cancel()
async def cog_check(self, ctx: commands.Context):
    """Allow this cog's commands only while the Audio cog is loaded.

    The Audio cog is looked up on every call rather than trusted from the
    cached attribute, so unloading Audio after this cog started is noticed.

    Args:
        ctx: Invocation context of the command being checked.

    Returns:
        ``True`` when the Audio cog is available.

    Raises:
        commands.UserFeedbackCheckFailure: When the Audio cog is not loaded.
            Raising, instead of sending a message from inside the check,
            leaves it to Red's error handling to show the text on a real
            invocation, so check evaluation outside of an invocation (for
            example while building help output) does not post messages.
    """
    self.audio = self.bot.get_cog("Audio")
    if self.audio:
        return True
    # ``[p]`` is only substituted in help text, so use the real prefix here.
    raise commands.UserFeedbackCheckFailure(
        f"The Audio cog is not loaded. Please load it first with `{ctx.clean_prefix}load audio`"
    )
@commands.group(aliases=["eaudioset"])
@commands.guild_only()
@commands.admin_or_permissions(manage_guild=True)
async def extendedaudioset(self, ctx: commands.Context):
    """Configure ExtendedAudio settings"""
    # When a subcommand was invoked it produces its own output; the settings
    # summary is only shown for the bare group command.
    if ctx.invoked_subcommand:
        return
    conf = await self.config.guild(ctx.guild).all()
    channel_id = conf["status_channel"]
    status_channel = ctx.guild.get_channel(channel_id) if channel_id else None
    # The summary is sent inside a code block, where channel mentions are not
    # rendered, so the plain channel name is shown instead.
    channel_text = f"#{status_channel.name}" if status_channel else "Not Set"
    cleaning = "Enabled" if conf["clean_old_messages"] else "Disabled"
    # Wrap every variable in exactly one pair of braces, e.g. ``{title}``.
    variables = humanize_list(["{" + name + "}" for name in SUPPORTED_VARIABLES])
    msg = (
        f"**Current Settings**\n"
        f"Status Channel: {channel_text}\n"
        f"Status Format: {conf['status_format']}\n"
        f"Clean Old Messages: {cleaning}\n\n"
        f"Available Variables: {variables}\n"
        f"Use `{ctx.prefix}extendedaudioset statuschannel` to set the channel for song updates."
    )
    await ctx.send(box(msg))
@extendedaudioset.command(name="statuschannel")
async def set_status_channel(self, ctx: commands.Context, channel: Optional[discord.TextChannel] = None):
    """Set the channel where song updates will be posted.
    Leave empty to use the current channel.
    """
    # Fall back to the channel the command was used in when none was given.
    target = channel if channel else ctx.channel
    guild_settings = self.config.guild(ctx.guild)
    await guild_settings.status_channel.set(target.id)
    await ctx.send(f"Song updates will now be posted in {target.mention}")
@extendedaudioset.command(name="statusformat")
async def set_status_format(self, ctx: commands.Context, *, format_str: str):
    """Set the format for song update messages.
    Available variables:
    {title} - Song title
    {requester} - User who requested the song
    {duration} - Duration of the song
    {author} - Song author/artist
    Example: [p]extendedaudioset statusformat 🎵 Now Playing: **{title}** by {author}
    """
    # Dry-run the template against sample values so a broken format is
    # rejected here rather than failing later when a track starts.
    test_data = {
        "title": "Test Song",
        "requester": "Test User",
        "duration": "3:45",
        "author": "Test Artist",
    }
    try:
        format_str.format(**test_data)
    except KeyError as e:
        # The missing placeholder name is the KeyError's only argument.
        invalid_var = e.args[0]
        # Wrap every variable in exactly one pair of braces, e.g. ``{title}``.
        supported = humanize_list(["{" + name + "}" for name in SUPPORTED_VARIABLES])
        await ctx.send(
            f"Invalid variable `{invalid_var}` in format string.\n"
            f"Supported variables are: {supported}"
        )
        return
    except Exception as e:
        # Anything else str.format rejects (unbalanced braces, positional
        # fields, bad attribute access, ...) is reported verbatim.
        await ctx.send(f"Invalid format string: {e}")
        return
    await self.config.guild(ctx.guild).status_format.set(format_str)
    # Show the caller what a real status message will look like.
    example = format_str.format(
        title="Never Gonna Give You Up",
        requester="Rick Astley",
        duration="3:32",
        author="Rick Astley",
    )
    await ctx.send(
        f"Status format set! Example:\n"
        f"{example}"
    )
@extendedaudioset.command(name="cleanmessages")
async def set_clean_messages(self, ctx: commands.Context, enabled: bool):
    """Set whether to delete old status messages.
    If enabled, only the current song's status message will be kept.
    If disabled, all status messages will remain in the channel.
    """
    guild_settings = self.config.guild(ctx.guild)
    await guild_settings.clean_old_messages.set(enabled)
    # Pick the wording that matches the value that was just stored.
    if enabled:
        state = "will"
    else:
        state = "will not"
    await ctx.send(f"Old status messages {state} be deleted when new songs play.")
@commands.Cog.listener()
async def on_red_audio_track_start(self, guild: discord.Guild, track: dict, requester: discord.Member):
    """Post song information to the status channel when a track starts.

    Args:
        guild: Guild whose player started the track.
        track: The track that started. It is annotated as ``dict`` but is
            read through attribute access (``title``, ``author``, ``length``,
            ``is_stream``); missing attributes fall back to defaults.
            NOTE(review): presumably a Lavalink track object - confirm.
        requester: Member who requested the track. A missing requester is
            rendered as "Unknown" instead of aborting the post.

    Nothing is posted when no status channel is configured or the configured
    channel can no longer be found. Any error is logged, never raised.
    """
    if not guild or not track:
        return
    try:
        guild_settings = self.config.guild(guild)
        channel_id = await guild_settings.status_channel()
        if not channel_id:
            return
        channel = guild.get_channel(channel_id)
        if not channel:
            return

        format_str = await guild_settings.status_format()

        # Streams have no meaningful length, so show a label rather than a
        # duration computed from whatever placeholder length they carry.
        if getattr(track, "is_stream", False):
            duration_str = "LIVE"
        else:
            # ``length`` is treated as milliseconds; a missing or None
            # length is shown as 0:00.
            duration = getattr(track, "length", 0) or 0
            minutes = duration // 60000
            seconds = (duration % 60000) // 1000
            duration_str = f"{minutes}:{seconds:02d}"

        requester_name = getattr(requester, "display_name", None) or "Unknown"
        message = format_str.format(
            title=getattr(track, "title", "Unknown"),
            requester=requester_name,
            duration=duration_str,
            author=getattr(track, "author", "Unknown"),
        )

        # Remove the previous status message when clean-up is enabled.
        if await guild_settings.clean_old_messages():
            last_message_id = await guild_settings.last_status_message()
            if last_message_id:
                try:
                    old_message = await channel.fetch_message(last_message_id)
                    await old_message.delete()
                except discord.HTTPException:
                    # Best effort: a message that is already gone, missing
                    # permissions, or a transient API failure must not stop
                    # the new status message from being posted.
                    pass

        new_message = await channel.send(message)
        await guild_settings.last_status_message.set(new_message.id)
    except Exception as exc:
        # log.exception keeps the traceback alongside the original message.
        log.exception("Error posting song update in %s: %s", guild.name, exc)
@commands.Cog.listener()
async def on_red_audio_queue_end(self, guild: discord.Guild, *args, **kwargs):
    """Delete the last posted status message once the queue has ended.

    Only acts when message clean-up is enabled and both the status channel
    and a stored message ID are available. Errors are logged, not raised.
    """
    if not guild:
        return
    try:
        guild_settings = self.config.guild(guild)
        if not await guild_settings.clean_old_messages():
            return
        channel_id = await guild_settings.status_channel()
        if not channel_id:
            return
        channel = guild.get_channel(channel_id)
        if not channel:
            return
        stale_id = await guild_settings.last_status_message()
        if not stale_id:
            return
        try:
            stale_message = await channel.fetch_message(stale_id)
            await stale_message.delete()
        except (discord.NotFound, discord.Forbidden):
            # Already deleted, or not allowed to delete: nothing more to do.
            pass
        # Forget the stored ID so it is not looked up again.
        await guild_settings.last_status_message.set(None)
    except Exception as exc:
        log.error(f"Error cleaning up status message in {guild.name}: {exc}")
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Restore the stored voice channel name when the bot leaves voice.

    Acts only for the bot's own transition from a channel to no channel,
    and only when the bot has the Manage Channels permission. Errors are
    logged, not raised.
    """
    guild = member.guild
    if not guild:
        return
    # Ignore every voice update that is not the bot itself disconnecting.
    bot_disconnected = member.id == self.bot.user.id and before.channel and not after.channel
    if not bot_disconnected:
        return
    try:
        if not guild.me.guild_permissions.manage_channels:
            return
        stored_name = self.config.guild(guild).original_name
        original_name = await stored_name()
        if original_name:
            await before.channel.edit(name=original_name)
            # Clear the stored name once it has been restored.
            await stored_name.set(None)
    except Exception as exc:
        log.error(f"Error resetting channel name on disconnect: {exc}")
@commands.Cog.listener()
async def on_red_audio_player_auto_disconnect(self, guild: discord.Guild):
    """Restore the stored voice channel name on an automatic disconnect.

    Requires the Manage Channels permission, a stored original name and a
    voice client that is still attached to a channel. Errors are logged,
    not raised.
    """
    try:
        if not guild.me.guild_permissions.manage_channels:
            return
        stored_name = self.config.guild(guild).original_name
        original_name = await stored_name()
        if not original_name:
            return
        voice_client = guild.voice_client
        if voice_client and voice_client.channel:
            await voice_client.channel.edit(name=original_name)
            # Clear the stored name once it has been restored.
            await stored_name.set(None)
    except Exception as exc:
        log.error(f"Error handling auto-disconnect cleanup: {exc}")
@commands.Cog.listener()
async def on_red_audio_player_auto_pause(self, guild: discord.Guild):
    """Listener for automatic pauses; intentionally performs no action."""
    # The channel name is deliberately left unchanged on an automatic pause
    # to maintain consistency.
@commands.Cog.listener()
async def on_red_audio_player_pause(self, guild: discord.Guild):
    """Restore the stored voice channel name when playback is paused.

    Does nothing unless the ``channel_status`` setting is truthy, the bot is
    connected to a voice channel, a name is stored and the bot may manage
    channels. Errors are logged, not raised.
    """
    try:
        guild_settings = self.config.guild(guild)
        if not await guild_settings.channel_status():
            return
        voice_client = guild.voice_client
        channel = voice_client.channel if voice_client else None
        if not channel:
            return
        original_name = await guild_settings.original_name()
        if original_name and guild.me.guild_permissions.manage_channels:
            await channel.edit(name=original_name)
    except Exception as exc:
        log.error(f"Error resetting channel name on pause: {exc}")
@commands.Cog.listener()
async def on_red_audio_player_stop(self, guild: discord.Guild):
    """Restore the stored voice channel name when playback is stopped.

    Does nothing unless the ``channel_status`` setting is truthy, the bot is
    connected to a voice channel, a name is stored and the bot may manage
    channels. Errors are logged, not raised.
    """
    try:
        settings = self.config.guild(guild)
        enabled = await settings.channel_status()
        if not enabled:
            return
        client = guild.voice_client
        if not (client and client.channel):
            return
        target_channel = client.channel
        original_name = await settings.original_name()
        if not original_name:
            return
        if guild.me.guild_permissions.manage_channels:
            await target_channel.edit(name=original_name)
    except Exception as exc:
        log.error(f"Error resetting channel name on stop: {exc}")
async def cog_before_invoke(self, ctx: commands.Context):
    """Check whether the author's voice channel may be used for audio.

    Returns ``True`` when no restriction applies and ``False`` after telling
    the user why their channel is not allowed.

    NOTE(review): ``can_play_in_channel`` is not defined in the part of the
    file visible here - confirm it exists elsewhere on the class.
    NOTE(review): this hook belongs to this cog, so ``ctx.cog`` may never be
    the Audio cog, and the framework may ignore the value returned from a
    before-invoke hook - verify that this restriction is actually enforced.
    """
    if not ctx.guild:
        return True
    # Only commands that belong to the Audio cog are restricted.
    if not self.audio or ctx.cog != self.audio:
        return True

    voice_state = ctx.author.voice
    voice_channel = voice_state.channel if voice_state else None
    if await self.can_play_in_channel(voice_channel):
        return True

    guild_settings = self.config.guild(ctx.guild)
    if await guild_settings.bot_managed_only():
        await ctx.send("I can only play music in channels created by me.")
        return False

    allowed_channels = await guild_settings.allowed_channels()
    if not allowed_channels:
        await ctx.send("I cannot play music in this channel.")
        return False

    # List only the allowed channels that still exist in this guild.
    channels = []
    for allowed_id in allowed_channels:
        allowed = ctx.guild.get_channel(allowed_id)
        if allowed:
            channels.append(allowed.mention)
    await ctx.send(f"I can only play music in these channels: {humanize_list(channels)}")
    return False