Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run
This commit introduces a new configuration option, `delete_play_messages`, allowing users to enable or disable the automatic deletion of messages related to play commands. A new command, `deleteplay`, is added to toggle this setting, helping to maintain a cleaner chat environment during music playback. Additionally, the cog is updated to handle message deletion after play commands are invoked.
467 lines
No EOL
20 KiB
Python
467 lines
No EOL
20 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import Optional, List, Dict
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
import discord
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.utils.chat_formatting import box, humanize_list
|
|
from redbot.core.utils.predicates import MessagePredicate
|
|
|
|
log = logging.getLogger("red.valerie.extendedaudio")
|
|
|
|
SUPPORTED_VARIABLES = {
|
|
"title": "Song title",
|
|
"requester": "User who requested the song",
|
|
"duration": "Duration of the song",
|
|
"author": "Song author/artist"
|
|
}
|
|
|
|
class ExtendedAudio(commands.Cog):
|
|
"""Extends Red's core Audio cog with additional features"""
|
|
|
|
def __init__(self, bot: Red):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=117117117)
|
|
default_guild = {
|
|
"status_channel": None, # Channel ID to post song updates
|
|
"embed_color": None, # Custom embed color (optional)
|
|
"show_thumbnail": True, # Whether to show track thumbnails in embeds
|
|
"show_progress": True, # Whether to show a progress bar
|
|
"allowed_channels": [], # List of allowed voice channel IDs
|
|
"bot_managed_only": False, # Whether to only allow bot-created channels
|
|
"last_status_message": None, # ID of the last status message (for cleanup)
|
|
"clean_old_messages": True, # Whether to delete old status messages
|
|
"delete_play_messages": False, # Whether to delete play command messages
|
|
}
|
|
default_global = {
|
|
"disable_notify_command": True, # Whether to disable the audioset notify command
|
|
}
|
|
self.config.register_guild(**default_guild)
|
|
self.config.register_global(**default_global)
|
|
self.task = self.bot.loop.create_task(self.initialize())
|
|
self.audio = None
|
|
|
|
async def initialize(self):
|
|
"""Initialize the cog"""
|
|
await self.bot.wait_until_ready()
|
|
try:
|
|
self.audio = self.bot.get_cog("Audio")
|
|
if not self.audio:
|
|
log.error("Audio cog not loaded! Some features may not work.")
|
|
except Exception as e:
|
|
log.error(f"Failed to initialize ExtendedAudio: {e}")
|
|
|
|
def cog_unload(self):
|
|
"""Cleanup when cog unloads"""
|
|
if self.task:
|
|
self.task.cancel()
|
|
|
|
async def cog_check(self, ctx: commands.Context):
|
|
"""Check if Audio cog is loaded"""
|
|
if not self.audio:
|
|
self.audio = self.bot.get_cog("Audio")
|
|
if not self.audio:
|
|
await ctx.send("The Audio cog is not loaded. Please load it first with `[p]load audio`")
|
|
return False
|
|
return True
|
|
|
|
@commands.group(aliases=["eaudioset"])
|
|
@commands.guild_only()
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
async def extendedaudioset(self, ctx: commands.Context):
|
|
"""Configure ExtendedAudio settings"""
|
|
if not ctx.invoked_subcommand:
|
|
guild = ctx.guild
|
|
conf = await self.config.guild(guild).all()
|
|
status_channel = ctx.guild.get_channel(conf["status_channel"]) if conf["status_channel"] else None
|
|
embed_color = conf["embed_color"]
|
|
if embed_color:
|
|
embed_color = discord.Color(embed_color)
|
|
else:
|
|
embed_color = discord.Color.from_rgb(224, 17, 95) # #e0115f
|
|
|
|
embed = discord.Embed(
|
|
title="ExtendedAudio Settings",
|
|
color=embed_color,
|
|
timestamp=datetime.now()
|
|
)
|
|
embed.add_field(
|
|
name="Status Channel",
|
|
value=status_channel.mention if status_channel else "Not Set",
|
|
inline=False
|
|
)
|
|
embed.add_field(
|
|
name="Embed Color",
|
|
value=str(embed_color) if embed_color else "Default (Blue)",
|
|
inline=True
|
|
)
|
|
embed.add_field(
|
|
name="Show Thumbnails",
|
|
value="Enabled" if conf["show_thumbnail"] else "Disabled",
|
|
inline=True
|
|
)
|
|
embed.add_field(
|
|
name="Show Progress Bar",
|
|
value="Enabled" if conf["show_progress"] else "Disabled",
|
|
inline=True
|
|
)
|
|
embed.add_field(
|
|
name="Clean Old Messages",
|
|
value="Enabled" if conf["clean_old_messages"] else "Disabled",
|
|
inline=True
|
|
)
|
|
|
|
await ctx.send(embed=embed)
|
|
|
|
@extendedaudioset.command(name="statuschannel")
|
|
async def set_status_channel(self, ctx: commands.Context, channel: Optional[discord.TextChannel] = None):
|
|
"""Set the channel where song updates will be posted.
|
|
|
|
Leave empty to use the current channel.
|
|
"""
|
|
channel = channel or ctx.channel
|
|
await self.config.guild(ctx.guild).status_channel.set(channel.id)
|
|
await ctx.send(f"Song updates will now be posted in {channel.mention}")
|
|
|
|
@extendedaudioset.command(name="embedcolor")
|
|
async def set_embed_color(self, ctx: commands.Context, color: discord.Color = None):
|
|
"""Set the color for song update embeds.
|
|
|
|
Leave empty to reset to default (blue).
|
|
Use hex colors like #FF0000 for red.
|
|
"""
|
|
if color:
|
|
await self.config.guild(ctx.guild).embed_color.set(color.value)
|
|
msg = f"Embed color set to {color}"
|
|
else:
|
|
await self.config.guild(ctx.guild).embed_color.set(None)
|
|
msg = "Embed color reset to default (blue)"
|
|
|
|
embed = discord.Embed(
|
|
title="Color Updated",
|
|
description=msg,
|
|
color=color or discord.Color.from_rgb(224, 17, 95) # #e0115f
|
|
)
|
|
await ctx.send(embed=embed)
|
|
|
|
@extendedaudioset.command(name="thumbnail")
|
|
async def set_show_thumbnail(self, ctx: commands.Context, enabled: bool):
|
|
"""Set whether to show track thumbnails in embeds."""
|
|
await self.config.guild(ctx.guild).show_thumbnail.set(enabled)
|
|
state = "will" if enabled else "will not"
|
|
await ctx.send(f"Track thumbnails {state} be shown in song updates.")
|
|
|
|
@extendedaudioset.command(name="progress")
|
|
async def set_show_progress(self, ctx: commands.Context, enabled: bool):
|
|
"""Set whether to show a progress bar in embeds."""
|
|
await self.config.guild(ctx.guild).show_progress.set(enabled)
|
|
state = "will" if enabled else "will not"
|
|
await ctx.send(f"Progress bars {state} be shown in song updates.")
|
|
|
|
@extendedaudioset.command(name="cleanmessages")
|
|
async def set_clean_messages(self, ctx: commands.Context, enabled: bool):
|
|
"""Set whether to delete old status messages.
|
|
|
|
If enabled, only the current song's status message will be kept.
|
|
If disabled, all status messages will remain in the channel.
|
|
"""
|
|
await self.config.guild(ctx.guild).clean_old_messages.set(enabled)
|
|
state = "will" if enabled else "will not"
|
|
await ctx.send(f"Old status messages {state} be deleted when new songs play.")
|
|
|
|
@extendedaudioset.command(name="deleteplay")
|
|
async def toggle_delete_play(self, ctx: commands.Context):
|
|
"""Toggle whether play command messages should be automatically deleted.
|
|
|
|
When enabled, messages for play commands (like queue notifications) will be deleted after being sent.
|
|
This helps keep the chat clean when queuing multiple songs.
|
|
"""
|
|
current = await self.config.guild(ctx.guild).delete_play_messages()
|
|
await self.config.guild(ctx.guild).delete_play_messages.set(not current)
|
|
state = "enabled" if not current else "disabled"
|
|
await ctx.send(f"Auto-deletion of play command messages has been {state}.")
|
|
|
|
def format_duration(self, milliseconds: int) -> str:
|
|
"""Format milliseconds into a readable duration."""
|
|
seconds = milliseconds // 1000
|
|
minutes = seconds // 60
|
|
hours = minutes // 60
|
|
|
|
if hours > 0:
|
|
return f"{hours}:{minutes % 60:02d}:{seconds % 60:02d}"
|
|
else:
|
|
return f"{minutes}:{seconds % 60:02d}"
|
|
|
|
def create_progress_bar(self, current: int, total: int, length: int = 20) -> str:
|
|
"""Create a text progress bar."""
|
|
filled = int((current / total) * length)
|
|
bar = "▰" * filled + "▱" * (length - filled)
|
|
current_time = self.format_duration(current)
|
|
total_time = self.format_duration(total)
|
|
return f"{current_time} {bar} {total_time}"
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_track_start(self, guild: discord.Guild, track: dict, requester: discord.Member):
|
|
"""Post song information when a new track starts"""
|
|
if not guild or not track:
|
|
return
|
|
|
|
try:
|
|
# Get the status channel
|
|
channel_id = await self.config.guild(guild).status_channel()
|
|
if not channel_id:
|
|
return
|
|
|
|
channel = guild.get_channel(channel_id)
|
|
if not channel:
|
|
return
|
|
|
|
# Get guild settings
|
|
guild_settings = await self.config.guild(guild).all()
|
|
embed_color = discord.Color(guild_settings["embed_color"]) if guild_settings["embed_color"] else discord.Color.from_rgb(224, 17, 95) # #e0115f
|
|
|
|
# Create embed
|
|
embed = discord.Embed(
|
|
title="Now Playing",
|
|
color=embed_color,
|
|
timestamp=datetime.now()
|
|
)
|
|
|
|
# Add track info
|
|
embed.add_field(
|
|
name="Title",
|
|
value=track.title if hasattr(track, "title") else "Unknown",
|
|
inline=False
|
|
)
|
|
|
|
if hasattr(track, "author"):
|
|
embed.add_field(name="Artist", value=track.author, inline=True)
|
|
|
|
embed.add_field(name="Requested By", value=requester.mention, inline=True)
|
|
|
|
# Add duration/progress
|
|
if guild_settings["show_progress"] and hasattr(track, "length"):
|
|
duration = track.length
|
|
progress_bar = self.create_progress_bar(0, duration)
|
|
embed.add_field(name="Duration", value=progress_bar, inline=False)
|
|
elif hasattr(track, "length"):
|
|
embed.add_field(
|
|
name="Duration",
|
|
value=self.format_duration(track.length),
|
|
inline=True
|
|
)
|
|
|
|
# Add thumbnail if enabled and available
|
|
if (
|
|
guild_settings["show_thumbnail"]
|
|
and hasattr(track, "thumbnail")
|
|
and track.thumbnail
|
|
):
|
|
embed.set_thumbnail(url=track.thumbnail)
|
|
|
|
# Set footer with additional info
|
|
embed.set_footer(
|
|
text=f"Track {track.track_id if hasattr(track, 'track_id') else 'Unknown ID'}"
|
|
)
|
|
|
|
# Clean up old message if enabled
|
|
if guild_settings["clean_old_messages"]:
|
|
last_message_id = guild_settings["last_status_message"]
|
|
if last_message_id:
|
|
try:
|
|
old_message = await channel.fetch_message(last_message_id)
|
|
await old_message.delete()
|
|
except (discord.NotFound, discord.Forbidden):
|
|
pass
|
|
|
|
# Send new message
|
|
new_message = await channel.send(embed=embed)
|
|
await self.config.guild(guild).last_status_message.set(new_message.id)
|
|
|
|
except Exception as e:
|
|
log.error(f"Error posting song update in {guild.name}: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_queue_end(self, guild: discord.Guild, *args, **kwargs):
|
|
"""Clean up status message when queue ends"""
|
|
if not guild:
|
|
return
|
|
|
|
try:
|
|
if await self.config.guild(guild).clean_old_messages():
|
|
channel_id = await self.config.guild(guild).status_channel()
|
|
if not channel_id:
|
|
return
|
|
|
|
channel = guild.get_channel(channel_id)
|
|
if not channel:
|
|
return
|
|
|
|
last_message_id = await self.config.guild(guild).last_status_message()
|
|
if last_message_id:
|
|
try:
|
|
old_message = await channel.fetch_message(last_message_id)
|
|
await old_message.delete()
|
|
except (discord.NotFound, discord.Forbidden):
|
|
pass
|
|
await self.config.guild(guild).last_status_message.set(None)
|
|
|
|
except Exception as e:
|
|
log.error(f"Error cleaning up status message in {guild.name}: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
|
|
"""Reset channel name when bot leaves voice channel"""
|
|
if not member.guild:
|
|
return
|
|
|
|
# Check if this is the bot disconnecting
|
|
if member.id == self.bot.user.id and before.channel and not after.channel:
|
|
try:
|
|
# Reset channel name if we have the original stored and permissions
|
|
guild = member.guild
|
|
if not guild.me.guild_permissions.manage_channels:
|
|
return
|
|
|
|
original_name = await self.config.guild(guild).original_name()
|
|
if original_name and before.channel:
|
|
await before.channel.edit(name=original_name)
|
|
await self.config.guild(guild).original_name.set(None)
|
|
except Exception as e:
|
|
log.error(f"Error resetting channel name on disconnect: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_player_auto_disconnect(self, guild: discord.Guild):
|
|
"""Handle auto-disconnect cleanup"""
|
|
try:
|
|
if guild.me.guild_permissions.manage_channels:
|
|
original_name = await self.config.guild(guild).original_name()
|
|
if original_name:
|
|
voice_client = guild.voice_client
|
|
if voice_client and voice_client.channel:
|
|
await voice_client.channel.edit(name=original_name)
|
|
await self.config.guild(guild).original_name.set(None)
|
|
except Exception as e:
|
|
log.error(f"Error handling auto-disconnect cleanup: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_player_auto_pause(self, guild: discord.Guild):
|
|
"""Handle auto-pause"""
|
|
# We don't change the channel name on pause to maintain consistency
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_player_pause(self, guild: discord.Guild):
|
|
"""Reset channel name when audio is paused"""
|
|
try:
|
|
if not await self.config.guild(guild).channel_status():
|
|
return
|
|
|
|
voice_client = guild.voice_client
|
|
if not voice_client or not voice_client.channel:
|
|
return
|
|
|
|
channel = voice_client.channel
|
|
original_name = await self.config.guild(guild).original_name()
|
|
|
|
if original_name and guild.me.guild_permissions.manage_channels:
|
|
await channel.edit(name=original_name)
|
|
|
|
except Exception as e:
|
|
log.error(f"Error resetting channel name on pause: {e}")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_red_audio_player_stop(self, guild: discord.Guild):
|
|
"""Reset channel name when audio is stopped"""
|
|
try:
|
|
if not await self.config.guild(guild).channel_status():
|
|
return
|
|
|
|
voice_client = guild.voice_client
|
|
if not voice_client or not voice_client.channel:
|
|
return
|
|
|
|
channel = voice_client.channel
|
|
original_name = await self.config.guild(guild).original_name()
|
|
|
|
if original_name and guild.me.guild_permissions.manage_channels:
|
|
await channel.edit(name=original_name)
|
|
|
|
except Exception as e:
|
|
log.error(f"Error resetting channel name on stop: {e}")
|
|
|
|
@commands.command()
|
|
@commands.is_owner()
|
|
async def togglenotify(self, ctx: commands.Context):
|
|
"""Toggle whether the audioset notify command should be disabled.
|
|
|
|
This command is bot owner only.
|
|
When disabled, users will be directed to use ExtendedAudio's notification features instead.
|
|
"""
|
|
current = await self.config.disable_notify_command()
|
|
await self.config.disable_notify_command.set(not current)
|
|
state = "disabled" if not current else "enabled"
|
|
await ctx.send(f"The audioset notify command is now {state}.")
|
|
|
|
async def cog_before_invoke(self, ctx: commands.Context):
|
|
"""Check if the voice channel can be used before any audio command."""
|
|
if not ctx.guild:
|
|
return True
|
|
|
|
# Check if we should block audioset notify
|
|
if await self.config.disable_notify_command():
|
|
if ctx.command.qualified_name == "audioset notify":
|
|
await ctx.send("This command is disabled. Please use the ExtendedAudio cog's notification features instead.")
|
|
return False
|
|
|
|
# Only check audio-related commands
|
|
if not self.audio or ctx.cog != self.audio:
|
|
return True
|
|
|
|
# Get the voice channel
|
|
voice_channel = None
|
|
if ctx.author.voice:
|
|
voice_channel = ctx.author.voice.channel
|
|
|
|
if not await self.can_play_in_channel(voice_channel):
|
|
if await self.config.guild(ctx.guild).bot_managed_only():
|
|
await ctx.send("I can only play music in channels created by me.")
|
|
else:
|
|
allowed_channels = await self.config.guild(ctx.guild).allowed_channels()
|
|
if allowed_channels:
|
|
channels = [ctx.guild.get_channel(c).mention for c in allowed_channels if ctx.guild.get_channel(c)]
|
|
await ctx.send(f"I can only play music in these channels: {humanize_list(channels)}")
|
|
else:
|
|
await ctx.send("I cannot play music in this channel.")
|
|
return False
|
|
|
|
return True
|
|
|
|
async def maybe_delete_response(self, ctx: commands.Context, message: discord.Message):
|
|
"""Delete a message if delete_play_messages is enabled."""
|
|
try:
|
|
if await self.config.guild(ctx.guild).delete_play_messages():
|
|
await message.delete(delay=5) # Delete after 5 seconds
|
|
except (discord.Forbidden, discord.NotFound):
|
|
pass
|
|
|
|
async def cog_after_invoke(self, ctx: commands.Context):
|
|
"""Handle message deletion after command invocation."""
|
|
if ctx.guild and hasattr(ctx, 'command'):
|
|
# Check if it's an audio play command
|
|
audio_play_commands = [
|
|
"play", "bumpplay", "forceplay", "playnow", "playnext",
|
|
"playlist play", "playlist start"
|
|
]
|
|
|
|
if ctx.command.qualified_name.lower() in audio_play_commands:
|
|
# If there's a stored response message
|
|
if hasattr(ctx, 'message') and ctx.message:
|
|
await self.maybe_delete_response(ctx, ctx.message)
|
|
# If there's a command response
|
|
if hasattr(ctx, 'sent_messages'):
|
|
for message in ctx.sent_messages:
|
|
await self.maybe_delete_response(ctx, message) |