455 lines
No EOL
19 KiB
Python
455 lines
No EOL
19 KiB
Python
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional, List, Dict

import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
from redbot.core.utils.predicates import MessagePredicate
|
|
|
|
# Module-level logger; the cog's listeners and its initializer report errors through it.
log = logging.getLogger("red.valerie.extendedaudio")
|
|
|
|
# Placeholder names mapped to a human-readable description of what each one
# stands for. NOTE(review): nothing in the visible part of this file reads
# this mapping -- confirm whether another module consumes it.
SUPPORTED_VARIABLES: Dict[str, str] = {
    "title": "Song title",
    "requester": "User who requested the song",
    "duration": "Duration of the song",
    "author": "Song author/artist",
}
|
|
|
|
class ExtendedAudio(commands.Cog):
    """Extends Red's core Audio cog with additional features"""

    # Maintainer overview
    # -------------------
    # * ``extendedaudioset`` and its subcommands store per-guild presentation
    #   settings in Red's Config.
    # * ``on_red_audio_track_start`` / ``on_red_audio_queue_end`` post and
    #   clean up a "Now Playing" embed in the configured status channel.
    # * The remaining listeners restore a stored voice-channel name
    #   (``original_name``) when playback pauses, stops or the bot disconnects.
    #   Nothing in this file ever writes ``original_name`` or
    #   ``channel_status``, so those listeners are no-ops unless something
    #   else populates the config.

    # Colour used for every embed when a guild has not configured one (#e0115f).
    _DEFAULT_EMBED_COLOR = discord.Color.from_rgb(224, 17, 95)

    # Qualified command names that ``cog_after_invoke`` treats as "play" commands.
    _AUDIO_PLAY_COMMANDS = frozenset(
        {
            "play",
            "bumpplay",
            "forceplay",
            "playnow",
            "playnext",
            "playlist play",
            "playlist start",
        }
    )

    def __init__(self, bot: Red):
        self.bot = bot
        # Assigned before the init task is scheduled so the attribute exists
        # no matter when ``initialize`` first runs.
        self.audio = None
        self.config = Config.get_conf(self, identifier=117117117)
        default_guild = {
            "status_channel": None,  # Channel ID to post song updates
            "embed_color": None,  # Custom embed color (optional)
            "show_thumbnail": True,  # Whether to show track thumbnails in embeds
            "show_progress": True,  # Whether to show a progress bar
            "allowed_channels": [],  # List of allowed voice channel IDs
            "bot_managed_only": False,  # Whether to only allow bot-created channels
            "last_status_message": None,  # ID of the last status message (for cleanup)
            "clean_old_messages": True,  # Whether to delete old status messages
            "delete_play_messages": False,  # Whether to delete play command messages
            # The two keys below are read by the channel-name listeners but were
            # never registered. Registering them makes their defaults explicit;
            # the values match what the listeners previously observed (falsy).
            "channel_status": False,  # Gate for restoring the name on pause/stop
            "original_name": None,  # Voice channel name to restore
        }
        default_global = {
            "disable_notify_command": True,  # Whether to disable the audioset notify command
        }
        self.config.register_guild(**default_guild)
        self.config.register_global(**default_global)
        self.task = self.bot.loop.create_task(self.initialize())

    async def initialize(self):
        """Wait for the bot to be ready, then cache a reference to the Audio cog."""
        await self.bot.wait_until_ready()
        try:
            self.audio = self.bot.get_cog("Audio")
            if not self.audio:
                log.error("Audio cog not loaded! Some features may not work.")
        except Exception:
            log.exception("Failed to initialize ExtendedAudio")

    def cog_unload(self):
        """Cancel the pending initialization task when the cog unloads."""
        if self.task:
            self.task.cancel()

    async def cog_check(self, ctx: commands.Context):
        """Allow this cog's commands only while the Audio cog is loaded.

        Re-resolves the Audio cog on demand so that loading Audio after this
        cog is picked up without a reload.
        """
        if not self.audio:
            self.audio = self.bot.get_cog("Audio")
        if not self.audio:
            await ctx.send("The Audio cog is not loaded. Please load it first with `[p]load audio`")
            return False
        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _resolve_embed_color(cls, stored_value) -> discord.Color:
        """Return the embed colour for a stored config value.

        ``None`` means "not configured". ``0`` is a valid colour (#000000), so
        the comparison is against ``None`` rather than a truthiness test.
        """
        if stored_value is None:
            return cls._DEFAULT_EMBED_COLOR
        return discord.Color(stored_value)

    @staticmethod
    def _enabled_label(flag) -> str:
        """Render a boolean setting as "Enabled" or "Disabled"."""
        return "Enabled" if flag else "Disabled"

    async def _delete_status_message(self, channel, message_id) -> None:
        """Best-effort removal of a previously posted status message.

        Any HTTP failure (already deleted, missing permissions, transient API
        error) is logged at debug level and otherwise ignored, so a failed
        cleanup never prevents the caller from continuing.
        """
        if not message_id:
            return
        try:
            old_message = await channel.fetch_message(message_id)
            await old_message.delete()
        except discord.HTTPException:
            log.debug("Could not delete status message %s", message_id, exc_info=True)

    def _build_now_playing_embed(self, track, requester, settings) -> discord.Embed:
        """Assemble the "Now Playing" embed from a track and the guild settings.

        ``track`` is read via attributes (title, author, length, thumbnail,
        track_id); any of them may be missing.
        """
        embed = discord.Embed(
            title="Now Playing",
            color=self._resolve_embed_color(settings["embed_color"]),
            # Timezone-aware so the rendered time does not depend on the host's local zone.
            timestamp=datetime.now(timezone.utc),
        )

        # Discord rejects empty field values, so fall back / skip when blank.
        embed.add_field(
            name="Title",
            value=getattr(track, "title", None) or "Unknown",
            inline=False,
        )
        artist = getattr(track, "author", None)
        if artist:
            embed.add_field(name="Artist", value=artist, inline=True)

        requested_by = requester.mention if requester is not None else "Unknown"
        embed.add_field(name="Requested By", value=requested_by, inline=True)

        length = getattr(track, "length", None)
        if length is not None:
            embed.add_field(name="Duration", value=self.format_duration(length), inline=True)

        thumbnail = getattr(track, "thumbnail", None)
        if settings["show_thumbnail"] and thumbnail:
            embed.set_thumbnail(url=thumbnail)

        embed.set_footer(text=f"Track {getattr(track, 'track_id', 'Unknown ID')}")
        return embed

    async def _restore_channel_name(self, guild: discord.Guild, channel, *, forget: bool) -> None:
        """Rename ``channel`` back to the stored ``original_name``.

        Does nothing when the channel is missing, the bot lacks the
        Manage Channels permission, or no name is stored. When ``forget`` is
        true the stored name is cleared after a successful rename.
        """
        if channel is None or not guild.me.guild_permissions.manage_channels:
            return
        guild_conf = self.config.guild(guild)
        original_name = await guild_conf.original_name()
        if not original_name:
            return
        await channel.edit(name=original_name)
        if forget:
            await guild_conf.original_name.set(None)

    async def _restore_name_of_connected_channel(self, guild: discord.Guild) -> None:
        """Restore the name of the channel the bot is connected to.

        Only acts when the guild's ``channel_status`` setting is truthy. The
        stored name is kept so it can be restored again later.
        """
        if not await self.config.guild(guild).channel_status():
            return
        voice_client = guild.voice_client
        if not voice_client or not voice_client.channel:
            return
        await self._restore_channel_name(guild, voice_client.channel, forget=False)

    # ------------------------------------------------------------------
    # Settings commands (docstrings double as user-facing help text)
    # ------------------------------------------------------------------

    @commands.group(aliases=["eaudioset"])
    @commands.guild_only()
    @commands.admin_or_permissions(manage_guild=True)
    async def extendedaudioset(self, ctx: commands.Context):
        """Configure ExtendedAudio settings"""
        if ctx.invoked_subcommand:
            return

        conf = await self.config.guild(ctx.guild).all()
        channel_id = conf["status_channel"]
        status_channel = ctx.guild.get_channel(channel_id) if channel_id else None

        stored_color = conf["embed_color"]
        embed_color = self._resolve_embed_color(stored_color)
        if stored_color is None:
            color_label = f"Default ({embed_color})"
        else:
            color_label = str(embed_color)

        embed = discord.Embed(
            title="ExtendedAudio Settings",
            color=embed_color,
            timestamp=datetime.now(timezone.utc),
        )
        embed.add_field(
            name="Status Channel",
            value=status_channel.mention if status_channel else "Not Set",
            inline=False,
        )
        embed.add_field(name="Embed Color", value=color_label, inline=True)
        embed.add_field(
            name="Show Thumbnails",
            value=self._enabled_label(conf["show_thumbnail"]),
            inline=True,
        )
        embed.add_field(
            name="Show Progress Bar",
            value=self._enabled_label(conf["show_progress"]),
            inline=True,
        )
        embed.add_field(
            name="Clean Old Messages",
            value=self._enabled_label(conf["clean_old_messages"]),
            inline=True,
        )

        await ctx.send(embed=embed)

    @extendedaudioset.command(name="statuschannel")
    async def set_status_channel(self, ctx: commands.Context, channel: Optional[discord.TextChannel] = None):
        """Set the channel where song updates will be posted.

        Leave empty to use the current channel.
        """
        channel = channel or ctx.channel
        await self.config.guild(ctx.guild).status_channel.set(channel.id)
        await ctx.send(f"Song updates will now be posted in {channel.mention}")

    @extendedaudioset.command(name="embedcolor")
    async def set_embed_color(self, ctx: commands.Context, color: discord.Color = None):
        """Set the color for song update embeds.

        Leave empty to reset to default (#e0115f).
        Use hex colors like #FF0000 for red.
        """
        # The parameter annotation is deliberately not Optional[...]: the plain
        # converter rejects an invalid colour instead of treating it as a reset.
        if color is not None:
            await self.config.guild(ctx.guild).embed_color.set(color.value)
            msg = f"Embed color set to {color}"
            shown_color = color
        else:
            await self.config.guild(ctx.guild).embed_color.set(None)
            shown_color = self._DEFAULT_EMBED_COLOR
            msg = f"Embed color reset to default ({shown_color})"

        embed = discord.Embed(title="Color Updated", description=msg, color=shown_color)
        await ctx.send(embed=embed)

    @extendedaudioset.command(name="thumbnail")
    async def set_show_thumbnail(self, ctx: commands.Context, enabled: bool):
        """Set whether to show track thumbnails in embeds."""
        await self.config.guild(ctx.guild).show_thumbnail.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Track thumbnails {state} be shown in song updates.")

    @extendedaudioset.command(name="progress")
    async def set_show_progress(self, ctx: commands.Context, enabled: bool):
        """Set whether to show a progress bar in embeds."""
        # NOTE(review): "show_progress" is stored and displayed in the settings
        # embed, but nothing in this file renders a progress bar.
        await self.config.guild(ctx.guild).show_progress.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Progress bars {state} be shown in song updates.")

    @extendedaudioset.command(name="cleanmessages")
    async def set_clean_messages(self, ctx: commands.Context, enabled: bool):
        """Set whether to delete old status messages.

        If enabled, only the current song's status message will be kept.
        If disabled, all status messages will remain in the channel.
        """
        await self.config.guild(ctx.guild).clean_old_messages.set(enabled)
        state = "will" if enabled else "will not"
        await ctx.send(f"Old status messages {state} be deleted when new songs play.")

    @extendedaudioset.command(name="deleteplay")
    async def toggle_delete_play(self, ctx: commands.Context):
        """Toggle whether play command messages should be automatically deleted.

        When enabled, messages for play commands (like queue notifications) will be deleted after being sent.
        This helps keep the chat clean when queuing multiple songs.
        """
        setting = self.config.guild(ctx.guild).delete_play_messages
        now_enabled = not await setting()
        await setting.set(now_enabled)
        state = "enabled" if now_enabled else "disabled"
        await ctx.send(f"Auto-deletion of play command messages has been {state}.")

    def format_duration(self, milliseconds: int) -> str:
        """Format milliseconds into a readable duration.

        Returns ``H:MM:SS`` when the duration is at least one hour, otherwise
        ``M:SS``. Negative input is clamped to zero and fractional input is
        truncated so the zero-padded formatting cannot fail.
        """
        total_seconds = max(int(milliseconds), 0) // 1000
        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    # ------------------------------------------------------------------
    # Audio event listeners
    # ------------------------------------------------------------------

    @commands.Cog.listener()
    async def on_red_audio_track_start(self, guild: discord.Guild, track, requester: discord.Member):
        """Post a "Now Playing" embed in the status channel when a track starts.

        Does nothing when no status channel is configured or it no longer
        exists. When ``clean_old_messages`` is on, the previous status message
        is removed first. The new message ID is stored for later cleanup.
        Errors are logged rather than raised so a failed post never affects
        playback.
        """
        if not guild or not track:
            return

        try:
            guild_conf = self.config.guild(guild)
            settings = await guild_conf.all()

            channel_id = settings["status_channel"]
            if not channel_id:
                return
            channel = guild.get_channel(channel_id)
            if not channel:
                return

            embed = self._build_now_playing_embed(track, requester, settings)

            if settings["clean_old_messages"]:
                await self._delete_status_message(channel, settings["last_status_message"])

            new_message = await channel.send(embed=embed)
            await guild_conf.last_status_message.set(new_message.id)
        except Exception:
            log.exception("Error posting song update in %s", guild.name)

    @commands.Cog.listener()
    async def on_red_audio_queue_end(self, guild: discord.Guild, *args, **kwargs):
        """Remove the last status message when the queue ends.

        Only acts when ``clean_old_messages`` is enabled and the status channel
        still exists. The stored message ID is cleared after the attempt.
        """
        if not guild:
            return

        try:
            guild_conf = self.config.guild(guild)
            settings = await guild_conf.all()
            if not settings["clean_old_messages"]:
                return

            channel_id = settings["status_channel"]
            if not channel_id:
                return
            channel = guild.get_channel(channel_id)
            if not channel:
                return

            last_message_id = settings["last_status_message"]
            if last_message_id:
                await self._delete_status_message(channel, last_message_id)
                await guild_conf.last_status_message.set(None)
        except Exception:
            log.exception("Error cleaning up status message in %s", guild.name)

    @commands.Cog.listener()
    async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
        """Reset channel name when bot leaves voice channel"""
        if not member.guild:
            return

        # Only react to the bot itself going from "in a channel" to "in none".
        bot_disconnected = (
            member.id == self.bot.user.id and before.channel and not after.channel
        )
        if not bot_disconnected:
            return

        try:
            await self._restore_channel_name(member.guild, before.channel, forget=True)
        except Exception:
            log.exception("Error resetting channel name on disconnect")

    @commands.Cog.listener()
    async def on_red_audio_player_auto_disconnect(self, guild: discord.Guild):
        """Restore the stored channel name when the player auto-disconnects.

        Uses the guild's current voice client; if it is already gone there is
        no channel to rename and the stored name is left untouched.
        """
        try:
            voice_client = guild.voice_client
            channel = voice_client.channel if voice_client else None
            await self._restore_channel_name(guild, channel, forget=True)
        except Exception:
            log.exception("Error handling auto-disconnect cleanup")

    @commands.Cog.listener()
    async def on_red_audio_player_auto_pause(self, guild: discord.Guild):
        """Handle auto-pause"""
        # We don't change the channel name on pause to maintain consistency

    @commands.Cog.listener()
    async def on_red_audio_player_pause(self, guild: discord.Guild):
        """Reset channel name when audio is paused"""
        try:
            await self._restore_name_of_connected_channel(guild)
        except Exception:
            log.exception("Error resetting channel name on pause")

    @commands.Cog.listener()
    async def on_red_audio_player_stop(self, guild: discord.Guild):
        """Reset channel name when audio is stopped"""
        try:
            await self._restore_name_of_connected_channel(guild)
        except Exception:
            log.exception("Error resetting channel name on stop")

    @commands.command()
    @commands.is_owner()
    async def togglenotify(self, ctx: commands.Context):
        """Toggle whether the audioset notify command should be disabled.

        This command is bot owner only.
        When disabled, users will be directed to use ExtendedAudio's notification features instead.
        """
        now_disabled = not await self.config.disable_notify_command()
        await self.config.disable_notify_command.set(now_disabled)
        state = "disabled" if now_disabled else "enabled"
        await ctx.send(f"The audioset notify command is now {state}.")

    # ------------------------------------------------------------------
    # Invocation hooks
    # ------------------------------------------------------------------

    async def cog_before_invoke(self, ctx: commands.Context):
        """Check if the voice channel can be used before any audio command."""
        # NOTE(review): discord.py runs this hook only for commands that belong
        # to THIS cog and ignores its return value. As written it therefore
        # cannot block "audioset notify" or any Audio-cog command, and the
        # ``ctx.cog != self.audio`` guard below always returns early. Enforcing
        # these rules likely needs a bot-wide check or hook -- confirm intent.
        if not ctx.guild:
            return True

        # Check if we should block audioset notify
        if await self.config.disable_notify_command():
            if ctx.command.qualified_name == "audioset notify":
                await ctx.send("This command is disabled. Please use the ExtendedAudio cog's notification features instead.")
                return False

        # Only check audio-related commands
        if not self.audio or ctx.cog != self.audio:
            return True

        voice_channel = ctx.author.voice.channel if ctx.author.voice else None

        # NOTE(review): ``can_play_in_channel`` is not defined anywhere in this
        # file; it must be provided before this branch can become reachable.
        if await self.can_play_in_channel(voice_channel):
            return True

        guild_conf = self.config.guild(ctx.guild)
        if await guild_conf.bot_managed_only():
            await ctx.send("I can only play music in channels created by me.")
            return False

        allowed_channels = await guild_conf.allowed_channels()
        if allowed_channels:
            # Resolve each ID once and skip channels that no longer exist.
            resolved = (ctx.guild.get_channel(c) for c in allowed_channels)
            channels = [found.mention for found in resolved if found]
            await ctx.send(f"I can only play music in these channels: {humanize_list(channels)}")
        else:
            await ctx.send("I cannot play music in this channel.")
        return False

    async def maybe_delete_response(self, ctx: commands.Context, message: discord.Message):
        """Delete a message if delete_play_messages is enabled."""
        try:
            if await self.config.guild(ctx.guild).delete_play_messages():
                await message.delete(delay=5)  # Delete after 5 seconds
        except (discord.Forbidden, discord.NotFound):
            pass

    async def cog_after_invoke(self, ctx: commands.Context):
        """Handle message deletion after command invocation."""
        # NOTE(review): like ``cog_before_invoke``, this hook only fires for
        # this cog's own commands, none of which are in ``_AUDIO_PLAY_COMMANDS``
        # -- confirm how play commands were meant to be intercepted.
        if not ctx.guild or ctx.command is None:
            return
        if ctx.command.qualified_name.lower() not in self._AUDIO_PLAY_COMMANDS:
            return

        # The invoking message itself.
        invoking_message = getattr(ctx, "message", None)
        if invoking_message:
            await self.maybe_delete_response(ctx, invoking_message)

        # Any responses recorded on the context, when such an attribute exists.
        for message in getattr(ctx, "sent_messages", ()):
            await self.maybe_delete_response(ctx, message)