import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
from redbot.core.utils.predicates import MessagePredicate

log = logging.getLogger("red.valerie.extendedaudio")

SUPPORTED_VARIABLES = {
    "title": "Song title",
    "requester": "User who requested the song",
    "duration": "Duration of the song",
    "author": "Song author/artist",
}

# Colour used for embeds when a guild has not configured one (#e0115f).
DEFAULT_EMBED_COLOR = 0xE0115F


class ExtendedAudio(commands.Cog):
    """Extends Red's core Audio cog with additional features"""

    def __init__(self, bot: Red):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=117117117)
        default_guild = {
            "status_channel": None,  # Channel ID to post song updates
            "embed_color": None,  # Custom embed color (optional)
            "show_thumbnail": True,  # Whether to show track thumbnails in embeds
            "show_progress": True,  # Whether to show a progress bar
            "allowed_channels": [],  # List of allowed voice channel IDs
            "bot_managed_only": False,  # Whether to only allow bot-created channels
            "last_status_message": None,  # ID of the last status message (for cleanup)
            "clean_old_messages": True,  # Whether to delete old status messages
            # The two keys below are read by the voice/pause/stop listeners.
            # They were previously unregistered; registering them makes the
            # defaults explicit instead of relying on Config's implicit None.
            "channel_status": False,  # Whether channel-name restoring is active
            "original_name": None,  # Voice channel name to restore, if any
        }
        default_global = {
            "disable_notify_command": True,  # Whether to disable the audioset notify command
        }
        self.config.register_guild(**default_guild)
        self.config.register_global(**default_global)
        self.audio = None
        self.task = self.bot.loop.create_task(self.initialize())

    async def initialize(self):
        """Wait for the bot to be ready, then cache a reference to the Audio cog."""
        await self.bot.wait_until_ready()
        try:
            self.audio = self.bot.get_cog("Audio")
            if not self.audio:
                log.error("Audio cog not loaded! Some features may not work.")
        except Exception:
            log.exception("Failed to initialize ExtendedAudio")

    def cog_unload(self):
        """Cancel the initialization task when the cog unloads."""
        if self.task:
            self.task.cancel()

    async def cog_check(self, ctx: commands.Context):
        """Allow this cog's commands only while the Audio cog is loaded.

        Returns:
            True when the Audio cog is available. Otherwise a notice is sent
            to the invoking channel and False is returned.
        """
        if not self.audio:
            # Audio may have been loaded after this cog; look it up again.
            self.audio = self.bot.get_cog("Audio")
        if not self.audio:
            await ctx.send("The Audio cog is not loaded. Please load it first with `[p]load audio`")
            return False
        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resolve_embed_color(raw_value: Optional[int]) -> discord.Color:
        """Turn a stored colour value into a Color, falling back to the default.

        ``None`` means "not configured". ``0`` is a valid colour (black), so the
        check is against ``None`` rather than truthiness.
        """
        if raw_value is None:
            return discord.Color(DEFAULT_EMBED_COLOR)
        return discord.Color(raw_value)

    async def _delete_status_message(self, channel, message_id: Optional[int]) -> None:
        """Best-effort removal of a previously posted status message."""
        if not message_id:
            return
        try:
            old_message = await channel.fetch_message(message_id)
            await old_message.delete()
        except discord.HTTPException:
            # Covers NotFound and Forbidden (both HTTPException subclasses) as
            # well as transient API errors; a failed cleanup must never stop
            # the caller from continuing.
            log.debug("Could not delete old status message %s", message_id)

    async def _restore_channel_name(self, guild: discord.Guild, channel, *, forget: bool) -> None:
        """Rename *channel* back to the stored original name, if there is one.

        Args:
            guild: Guild whose stored ``original_name`` is used.
            channel: Voice channel to rename; nothing happens when it is None.
            forget: When True, clear the stored name after a successful rename.
        """
        if channel is None or not guild.me.guild_permissions.manage_channels:
            return
        original_name = await self.config.guild(guild).original_name()
        if not original_name:
            return
        await channel.edit(name=original_name)
        if forget:
            await self.config.guild(guild).original_name.set(None)

    def _build_now_playing_embed(self, track: Any, requester, settings: dict) -> discord.Embed:
        """Build the "Now Playing" embed for *track* using the guild *settings*."""
        embed = discord.Embed(
            title="Now Playing",
            color=self._resolve_embed_color(settings["embed_color"]),
            timestamp=datetime.now(timezone.utc),
        )

        # Discord rejects empty field values, so fall back to placeholders.
        embed.add_field(
            name="Title",
            value=getattr(track, "title", None) or "Unknown",
            inline=False,
        )
        author = getattr(track, "author", None)
        if author:
            embed.add_field(name="Artist", value=author, inline=True)
        embed.add_field(
            name="Requested By",
            value=requester.mention if requester is not None else "Unknown",
            inline=True,
        )

        # Duration is rendered either as a progress bar or as plain text.
        if hasattr(track, "length"):
            if settings["show_progress"]:
                embed.add_field(
                    name="Duration",
                    value=self.create_progress_bar(0, track.length),
                    inline=False,
                )
            else:
                embed.add_field(
                    name="Duration",
                    value=self.format_duration(track.length),
                    inline=True,
                )

        thumbnail = getattr(track, "thumbnail", None)
        if settings["show_thumbnail"] and thumbnail:
            embed.set_thumbnail(url=thumbnail)

        embed.set_footer(text=f"Track {getattr(track, 'track_id', 'Unknown ID')}")
        return embed

    # ------------------------------------------------------------------
    # Settings commands
    # ------------------------------------------------------------------

    @commands.group(aliases=["eaudioset"])
    @commands.guild_only()
    @commands.admin_or_permissions(manage_guild=True)
    async def extendedaudioset(self, ctx: commands.Context):
        """Configure ExtendedAudio settings"""
        if ctx.invoked_subcommand:
            return

        conf = await self.config.guild(ctx.guild).all()
        status_channel = (
            ctx.guild.get_channel(conf["status_channel"]) if conf["status_channel"] else None
        )
        embed_color = self._resolve_embed_color(conf["embed_color"])

        def on_off(flag: bool) -> str:
            return "Enabled" if flag else "Disabled"

        embed = discord.Embed(
            title="ExtendedAudio Settings",
            color=embed_color,
            timestamp=datetime.now(timezone.utc),
        )
        embed.add_field(
            name="Status Channel",
            value=status_channel.mention if status_channel else "Not Set",
            inline=False,
        )
        # Decide on the *stored* value: the resolved colour object is always
        # set, so testing it could never reach the "default" branch.
        embed.add_field(
            name="Embed Color",
            value=(
                str(embed_color)
                if conf["embed_color"] is not None
                else f"Default ({embed_color})"
            ),
            inline=True,
        )
        embed.add_field(name="Show Thumbnails", value=on_off(conf["show_thumbnail"]), inline=True)
        embed.add_field(name="Show Progress Bar", value=on_off(conf["show_progress"]), inline=True)
        embed.add_field(
            name="Clean Old Messages", value=on_off(conf["clean_old_messages"]), inline=True
        )
        await ctx.send(embed=embed)

    @extendedaudioset.command(name="statuschannel")
    async def set_status_channel(
        self, ctx: commands.Context, channel: Optional[discord.TextChannel] = None
    ):
        """Set the channel where song updates will be posted.

        Leave empty to use the current channel.
        """
        channel = channel or ctx.channel
        await self.config.guild(ctx.guild).status_channel.set(channel.id)
        await ctx.send(f"Song updates will now be posted in {channel.mention}")

    @extendedaudioset.command(name="embedcolor")
    async def set_embed_color(self, ctx: commands.Context, color: discord.Color = None):
        """Set the color for song update embeds.

        Leave empty to reset to the default color (#e0115f).
        Use hex colors like #FF0000 for red.
        """
        if color is not None:
            await self.config.guild(ctx.guild).embed_color.set(color.value)
            shown_color = color
            msg = f"Embed color set to {color}"
        else:
            await self.config.guild(ctx.guild).embed_color.set(None)
            shown_color = self._resolve_embed_color(None)
            # The default is #e0115f, not blue as the old message claimed.
            msg = f"Embed color reset to default ({shown_color})"

        embed = discord.Embed(title="Color Updated", description=msg, color=shown_color)
        await ctx.send(embed=embed)

    @extendedaudioset.command(name="thumbnail")
    async def set_show_thumbnail(self, ctx: commands.Context, enabled: bool):
        """Set whether to show track thumbnails in embeds."""
        await self.config.guild(ctx.guild).show_thumbnail.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Track thumbnails {state} be shown in song updates.")

    @extendedaudioset.command(name="progress")
    async def set_show_progress(self, ctx: commands.Context, enabled: bool):
        """Set whether to show a progress bar in embeds."""
        await self.config.guild(ctx.guild).show_progress.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Progress bars {state} be shown in song updates.")

    @extendedaudioset.command(name="cleanmessages")
    async def set_clean_messages(self, ctx: commands.Context, enabled: bool):
        """Set whether to delete old status messages.

        If enabled, only the current song's status message will be kept.
        If disabled, all status messages will remain in the channel.
        """
        await self.config.guild(ctx.guild).clean_old_messages.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Old status messages {state} be deleted when new songs play.")

    # ------------------------------------------------------------------
    # Formatting utilities
    # ------------------------------------------------------------------

    def format_duration(self, milliseconds: int) -> str:
        """Format milliseconds as ``M:SS`` or, from one hour up, ``H:MM:SS``.

        Negative input is clamped to zero.
        """
        total_seconds = max(0, int(milliseconds)) // 1000
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def create_progress_bar(self, current: int, total: int, length: int = 20) -> str:
        """Create a text progress bar framed by the elapsed and total time.

        Args:
            current: Elapsed time in milliseconds.
            total: Total time in milliseconds. A value of zero or less yields
                an empty bar instead of raising ZeroDivisionError.
            length: Number of segments in the bar.

        Returns:
            ``"<elapsed> <bar> <total>"`` where the bar has ``length`` segments.
        """
        if total > 0:
            # Clamp so out-of-range positions cannot over- or under-fill the bar.
            ratio = min(max(current / total, 0.0), 1.0)
        else:
            ratio = 0.0
        filled = int(ratio * length)
        bar = "▰" * filled + "▱" * (length - filled)
        return f"{self.format_duration(current)} {bar} {self.format_duration(total)}"

    # ------------------------------------------------------------------
    # Audio event listeners
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_red_audio_track_start(
        self, guild: discord.Guild, track: Any, requester: discord.Member
    ):
        """Post song information to the status channel when a new track starts.

        ``track`` is accessed by attribute (title, author, length, thumbnail,
        track_id); missing attributes are tolerated.
        """
        if not guild or not track:
            return

        try:
            # A single config read supplies both the channel and display options.
            guild_settings = await self.config.guild(guild).all()
            channel_id = guild_settings["status_channel"]
            if not channel_id:
                return
            channel = guild.get_channel(channel_id)
            if not channel:
                return

            embed = self._build_now_playing_embed(track, requester, guild_settings)

            if guild_settings["clean_old_messages"]:
                await self._delete_status_message(channel, guild_settings["last_status_message"])

            new_message = await channel.send(embed=embed)
            await self.config.guild(guild).last_status_message.set(new_message.id)
        except Exception:
            log.exception("Error posting song update in %s", guild.name)

    @commands.Cog.listener()
    async def on_red_audio_queue_end(self, guild: discord.Guild, *args, **kwargs):
        """Remove the last status message when the queue ends (if cleanup is enabled)."""
        if not guild:
            return

        try:
            guild_conf = self.config.guild(guild)
            if not await guild_conf.clean_old_messages():
                return

            channel_id = await guild_conf.status_channel()
            if not channel_id:
                return
            channel = guild.get_channel(channel_id)
            if not channel:
                return

            last_message_id = await guild_conf.last_status_message()
            if last_message_id:
                await self._delete_status_message(channel, last_message_id)
                await guild_conf.last_status_message.set(None)
        except Exception:
            log.exception("Error cleaning up status message in %s", guild.name)

    @commands.Cog.listener()
    async def on_voice_state_update(
        self,
        member: discord.Member,
        before: discord.VoiceState,
        after: discord.VoiceState,
    ):
        """Restore the voice channel name when the bot leaves a voice channel."""
        if not member.guild:
            return

        # Only act when the bot itself went from "in a channel" to "in none".
        bot_left = member.id == self.bot.user.id and before.channel and not after.channel
        if not bot_left:
            return

        try:
            await self._restore_channel_name(member.guild, before.channel, forget=True)
        except Exception:
            log.exception("Error resetting channel name on disconnect")

    @commands.Cog.listener()
    async def on_red_audio_player_auto_disconnect(self, guild: discord.Guild):
        """Restore the voice channel name when the player auto-disconnects."""
        try:
            voice_client = guild.voice_client
            channel = voice_client.channel if voice_client else None
            # The stored name is only cleared after a successful rename, so
            # on_voice_state_update can still restore it when no voice client
            # is available here.
            await self._restore_channel_name(guild, channel, forget=True)
        except Exception:
            log.exception("Error handling auto-disconnect cleanup")

    @commands.Cog.listener()
    async def on_red_audio_player_auto_pause(self, guild: discord.Guild):
        """Handle auto-pause"""
        # We don't change the channel name on pause to maintain consistency

    @commands.Cog.listener()
    async def on_red_audio_player_pause(self, guild: discord.Guild):
        """Restore the voice channel name when audio is paused."""
        try:
            if not await self.config.guild(guild).channel_status():
                return
            voice_client = guild.voice_client
            if not voice_client or not voice_client.channel:
                return
            # Keep the stored name: playback may resume in the same channel.
            await self._restore_channel_name(guild, voice_client.channel, forget=False)
        except Exception:
            log.exception("Error resetting channel name on pause")

    @commands.Cog.listener()
    async def on_red_audio_player_stop(self, guild: discord.Guild):
        """Restore the voice channel name when audio is stopped."""
        try:
            if not await self.config.guild(guild).channel_status():
                return
            voice_client = guild.voice_client
            if not voice_client or not voice_client.channel:
                return
            await self._restore_channel_name(guild, voice_client.channel, forget=False)
        except Exception:
            log.exception("Error resetting channel name on stop")

    # ------------------------------------------------------------------
    # Owner commands and invoke hook
    # ------------------------------------------------------------------

    @commands.command()
    @commands.is_owner()
    async def togglenotify(self, ctx: commands.Context):
        """Toggle whether the audioset notify command should be disabled.

        This command is bot owner only. When disabled, users will be directed
        to use ExtendedAudio's notification features instead.
        """
        current = await self.config.disable_notify_command()
        await self.config.disable_notify_command.set(not current)
        # The new flag value is `not current`; a True flag means "disabled".
        state = "disabled" if not current else "enabled"
        await ctx.send(f"The audioset notify command is now {state}.")

    async def cog_before_invoke(self, ctx: commands.Context):
        """Check if the voice channel can be used before any audio command.

        NOTE(review): discord.py runs ``cog_before_invoke`` only for commands
        that belong to this cog and ignores its return value, so this hook
        looks unable to block Audio's commands as written — confirm, and
        consider a bot-wide check instead.
        """
        if not ctx.guild:
            return True

        # Check if we should block audioset notify
        if await self.config.disable_notify_command():
            if ctx.command.qualified_name == "audioset notify":
                await ctx.send(
                    "This command is disabled. Please use the ExtendedAudio cog's notification features instead."
                )
                return False

        # Only check audio-related commands
        if not self.audio or ctx.cog != self.audio:
            return True

        # Get the voice channel
        voice_channel = ctx.author.voice.channel if ctx.author.voice else None

        # NOTE(review): `can_play_in_channel` is not defined in the portion of
        # the file visible here — verify it exists before relying on this path.
        if await self.can_play_in_channel(voice_channel):
            return True

        guild_conf = self.config.guild(ctx.guild)
        if await guild_conf.bot_managed_only():
            await ctx.send("I can only play music in channels created by me.")
            return False

        # Resolve each configured id once and skip channels that no longer exist.
        mentions = []
        for channel_id in await guild_conf.allowed_channels():
            allowed_channel = ctx.guild.get_channel(channel_id)
            if allowed_channel:
                mentions.append(allowed_channel.mention)

        if mentions:
            await ctx.send(f"I can only play music in these channels: {humanize_list(mentions)}")
        else:
            # humanize_list raises on an empty sequence, so use the generic
            # message when every configured channel has been deleted.
            await ctx.send("I cannot play music in this channel.")
        return False