405 lines
No EOL
17 KiB
Python
405 lines
No EOL
17 KiB
Python
import asyncio
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional

import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
from redbot.core.utils.predicates import MessagePredicate
|
|
|
|
# Module-level logger shared by every method of the cog.
log = logging.getLogger("red.valerie.extendedaudio")

# Variable name -> short human-readable description of what it stands for.
# NOTE(review): nothing in the visible part of this file reads this mapping;
# confirm where it is consumed before changing the keys.
SUPPORTED_VARIABLES: Dict[str, str] = dict(
    title="Song title",
    requester="User who requested the song",
    duration="Duration of the song",
    author="Song author/artist",
)
|
|
|
|
class ExtendedAudio(commands.Cog):
|
|
"""Extends Red's core Audio cog with additional features"""
|
|
|
|
def __init__(self, bot: Red):
    """Store the bot, register per-guild config defaults and schedule startup.

    ``bot`` is the Red instance the cog is being added to. The Audio cog
    lookup itself happens later, in ``initialize``, once the bot is ready.
    """
    self.bot = bot
    # Assigned before the startup task is scheduled so the attribute exists
    # no matter when ``initialize`` or ``cog_check`` first touches it.
    self.audio = None
    self.config = Config.get_conf(self, identifier=117117117)
    default_guild = {
        "status_channel": None,  # Channel ID to post song updates
        "embed_color": None,  # Custom embed color (optional)
        "show_thumbnail": True,  # Whether to show track thumbnails in embeds
        "show_progress": True,  # Whether to show a progress bar
        "allowed_channels": [],  # List of allowed voice channel IDs
        "bot_managed_only": False,  # Whether to only allow bot-created channels
        "last_status_message": None,  # ID of the last status message (for cleanup)
        "clean_old_messages": True,  # Whether to delete old status messages
        # The voice-state / pause / stop listeners of this cog read the two
        # keys below. Registering them makes their default explicit instead
        # of relying on how Config treats unregistered attributes.
        # NOTE(review): no command in the visible part of the file writes
        # these keys - confirm where they are set.
        "original_name": None,  # Voice channel name to restore
        "channel_status": None,  # Listeners only test this for truthiness
    }
    self.config.register_guild(**default_guild)
    self.task = self.bot.loop.create_task(self.initialize())
|
|
|
|
async def initialize(self):
    """Wait until the bot is ready, then cache a reference to the Audio cog.

    A missing Audio cog is only logged; any unexpected error during the
    lookup is logged as well rather than propagated.
    """
    await self.bot.wait_until_ready()
    try:
        audio_cog = self.bot.get_cog("Audio")
        self.audio = audio_cog
        if audio_cog:
            return
        log.error("Audio cog not loaded! Some features may not work.")
    except Exception as e:
        log.error(f"Failed to initialize ExtendedAudio: {e}")
|
|
|
|
def cog_unload(self):
    """Cancel the startup task, if there is one, when the cog is removed."""
    startup_task = self.task
    if not startup_task:
        return
    startup_task.cancel()
|
|
|
|
async def cog_check(self, ctx: commands.Context):
    """Allow this cog's commands only while the Audio cog is loaded.

    The cached Audio reference is refreshed on demand, so loading Audio after
    this cog is picked up without a reload. Returns ``True`` when Audio is
    available; otherwise tells the user how to load it and returns ``False``.
    """
    if not self.audio:
        self.audio = self.bot.get_cog("Audio")
    if self.audio:
        return True

    # "[p]" is only a placeholder and would be sent to the user verbatim, so
    # show the prefix that was actually used to invoke the command.
    await ctx.send(
        f"The Audio cog is not loaded. Please load it first with `{ctx.clean_prefix}load audio`"
    )
    return False
|
|
|
|
@commands.group(aliases=["eaudioset"])
@commands.guild_only()
@commands.admin_or_permissions(manage_guild=True)
async def extendedaudioset(self, ctx: commands.Context):
    """Configure ExtendedAudio settings"""
    # Invoked without a subcommand, the group posts an embed summarising the
    # guild's current settings; with a subcommand there is nothing to do here.
    if ctx.invoked_subcommand:
        return

    conf = await self.config.guild(ctx.guild).all()

    channel_id = conf["status_channel"]
    status_channel = ctx.guild.get_channel(channel_id) if channel_id else None

    # Compare against None instead of relying on truthiness: a stored value
    # of 0 (#000000) is a real colour choice, not "unset".
    stored_color = conf["embed_color"]
    if stored_color is not None:
        embed_color = discord.Color(stored_color)
        color_label = str(embed_color)
    else:
        embed_color = discord.Color.from_rgb(224, 17, 95)  # #e0115f
        # Label the fallback with its real value instead of calling it blue.
        color_label = f"Default ({embed_color})"

    embed = discord.Embed(
        title="ExtendedAudio Settings",
        color=embed_color,
        # Timezone-aware, so the displayed time does not depend on how the
        # library interprets naive datetimes.
        timestamp=datetime.now(timezone.utc),
    )
    embed.add_field(
        name="Status Channel",
        value=status_channel.mention if status_channel else "Not Set",
        inline=False,
    )
    embed.add_field(name="Embed Color", value=color_label, inline=True)

    # The boolean settings all render the same way: field label -> config key.
    toggles = (
        ("Show Thumbnails", "show_thumbnail"),
        ("Show Progress Bar", "show_progress"),
        ("Clean Old Messages", "clean_old_messages"),
    )
    for label, key in toggles:
        embed.add_field(
            name=label,
            value="Enabled" if conf[key] else "Disabled",
            inline=True,
        )

    await ctx.send(embed=embed)
|
|
|
|
@extendedaudioset.command(name="statuschannel")
async def set_status_channel(self, ctx: commands.Context, channel: Optional[discord.TextChannel] = None):
    """Set the channel where song updates will be posted.

    Leave empty to use the current channel.
    """
    # Without an explicit channel, use the one the command was typed in.
    target = channel if channel else ctx.channel
    guild_conf = self.config.guild(ctx.guild)
    await guild_conf.status_channel.set(target.id)
    await ctx.send(f"Song updates will now be posted in {target.mention}")
|
|
|
|
@extendedaudioset.command(name="embedcolor")
async def set_embed_color(self, ctx: commands.Context, color: discord.Color = None):
    """Set the color for song update embeds.

    Leave empty to reset to default (#e0115f).
    Use hex colors like #FF0000 for red.
    """
    # The fallback colour is #e0115f; the help text and the reset message
    # used to describe it as blue, which it is not.
    default_color = discord.Color.from_rgb(224, 17, 95)  # #e0115f
    guild_conf = self.config.guild(ctx.guild)

    if color is not None:
        # Only the integer value is persisted; it is wrapped in a Color again
        # when read back.
        await guild_conf.embed_color.set(color.value)
        msg = f"Embed color set to {color}"
        preview = color
    else:
        await guild_conf.embed_color.set(None)
        msg = f"Embed color reset to default ({default_color})"
        preview = default_color

    # The confirmation embed is tinted with the colour now in effect.
    embed = discord.Embed(title="Color Updated", description=msg, color=preview)
    await ctx.send(embed=embed)
|
|
|
|
@extendedaudioset.command(name="thumbnail")
async def set_show_thumbnail(self, ctx: commands.Context, enabled: bool):
    """Set whether to show track thumbnails in embeds."""
    await self.config.guild(ctx.guild).show_thumbnail.set(enabled)
    # Pick the wording for the confirmation from the new value.
    if enabled:
        state = "will"
    else:
        state = "will not"
    await ctx.send(f"Track thumbnails {state} be shown in song updates.")
|
|
|
|
@extendedaudioset.command(name="progress")
async def set_show_progress(self, ctx: commands.Context, enabled: bool):
    """Set whether to show a progress bar in embeds."""
    guild_conf = self.config.guild(ctx.guild)
    await guild_conf.show_progress.set(enabled)
    # Index 1 is the wording for "enabled", index 0 for "disabled".
    state = ("will not", "will")[1 if enabled else 0]
    await ctx.send(f"Progress bars {state} be shown in song updates.")
|
|
|
|
@extendedaudioset.command(name="cleanmessages")
async def set_clean_messages(self, ctx: commands.Context, enabled: bool):
    """Set whether to delete old status messages.

    If enabled, only the current song's status message will be kept.
    If disabled, all status messages will remain in the channel.
    """
    setting = self.config.guild(ctx.guild).clean_old_messages
    await setting.set(enabled)
    # Confirmation wording depends on the value that was just stored.
    state = "will not"
    if enabled:
        state = "will"
    await ctx.send(f"Old status messages {state} be deleted when new songs play.")
|
|
|
|
def format_duration(self, milliseconds: int) -> str:
    """Format a millisecond count as ``M:SS``, or ``H:MM:SS`` from one hour up.

    Fractional input is truncated to whole milliseconds and negative input is
    treated as zero, so the result is always a well-formed clock string.
    """
    # int() keeps the ``:02d`` format specs valid for float input; the clamp
    # stops floor division from producing values like "-1:59".
    total_seconds = max(int(milliseconds), 0) // 1000
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)

    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"
|
|
|
|
def create_progress_bar(self, current: int, total: int, length: int = 20) -> str:
    """Render playback progress as ``<elapsed> <bar> <total>``.

    ``current`` and ``total`` are passed to ``format_duration`` for the two
    labels; ``length`` is the number of bar cells. The filled share of the
    bar is clamped to the range 0..``length``, and a non-positive ``total``
    yields an empty bar instead of a division error.
    """
    if total > 0:
        ratio = min(max(current / total, 0.0), 1.0)
    else:
        # Nothing to divide by - show an empty bar.
        ratio = 0.0

    filled = int(ratio * length)
    bar = "▰" * filled + "▱" * (length - filled)
    return f"{self.format_duration(current)} {bar} {self.format_duration(total)}"
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_track_start(self, guild: discord.Guild, track: dict, requester: discord.Member):
    """Post a "Now Playing" embed in the guild's status channel.

    Does nothing when no status channel is configured or it cannot be
    resolved. When cleanup is enabled the previously posted status message
    is deleted first (best effort); the new message's ID is then stored so
    it can be cleaned up later. Any unexpected error is logged, never raised.
    """
    # NOTE(review): ``track`` is annotated as dict but is only ever read
    # through attributes here - confirm the real type against the Audio cog.
    if not guild or not track:
        return

    try:
        # One read of the guild settings covers everything needed below.
        guild_settings = await self.config.guild(guild).all()

        channel_id = guild_settings["status_channel"]
        if not channel_id:
            return
        channel = guild.get_channel(channel_id)
        if not channel:
            return

        # ``is not None`` so a stored 0 (#000000) is honoured as a colour.
        stored_color = guild_settings["embed_color"]
        if stored_color is not None:
            embed_color = discord.Color(stored_color)
        else:
            embed_color = discord.Color.from_rgb(224, 17, 95)  # #e0115f

        embed = discord.Embed(
            title="Now Playing",
            color=embed_color,
            # Timezone-aware, so the displayed time does not depend on how
            # the library interprets naive datetimes.
            timestamp=datetime.now(timezone.utc),
        )

        # Embed field values must not be empty, so blank metadata either
        # falls back to a placeholder or is left out.
        embed.add_field(
            name="Title",
            value=getattr(track, "title", None) or "Unknown",
            inline=False,
        )
        author = getattr(track, "author", None)
        if author:
            embed.add_field(name="Artist", value=author, inline=True)
        embed.add_field(
            name="Requested By",
            value=requester.mention if requester else "Unknown",
            inline=True,
        )

        duration = getattr(track, "length", None)
        if duration is not None:
            # A progress bar needs a positive length to divide by; otherwise
            # show the plain duration.
            if guild_settings["show_progress"] and duration > 0:
                embed.add_field(
                    name="Duration",
                    value=self.create_progress_bar(0, duration),
                    inline=False,
                )
            else:
                embed.add_field(
                    name="Duration",
                    value=self.format_duration(duration),
                    inline=True,
                )

        thumbnail = getattr(track, "thumbnail", None)
        if guild_settings["show_thumbnail"] and thumbnail:
            embed.set_thumbnail(url=thumbnail)

        embed.set_footer(text=f"Track {getattr(track, 'track_id', 'Unknown ID')}")

        # Remove the previous status message. This is best effort: a failure
        # here (already deleted, missing permission, other HTTP error) must
        # not prevent the new announcement from being posted.
        last_message_id = guild_settings["last_status_message"]
        if guild_settings["clean_old_messages"] and last_message_id:
            try:
                old_message = await channel.fetch_message(last_message_id)
                await old_message.delete()
            except discord.HTTPException as exc:
                log.debug(
                    "Could not delete previous status message %s in %s: %s",
                    last_message_id,
                    guild.name,
                    exc,
                )

        new_message = await channel.send(embed=embed)
        await self.config.guild(guild).last_status_message.set(new_message.id)

    except Exception:
        # log.exception keeps the traceback, which the plain message lost.
        log.exception("Error posting song update in %s", guild.name)
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_queue_end(self, guild: discord.Guild, *args, **kwargs):
    """Delete the last status message once the queue has finished.

    Only acts when cleanup is enabled, a status channel is configured and
    resolvable, and a message ID was stored. The stored ID is cleared after
    the deletion attempt whether or not the deletion succeeded. Extra
    positional/keyword arguments from the event are accepted and ignored.
    """
    if not guild:
        return

    try:
        guild_conf = self.config.guild(guild)
        if not await guild_conf.clean_old_messages():
            return

        channel_id = await guild_conf.status_channel()
        if not channel_id:
            return
        channel = guild.get_channel(channel_id)
        if not channel:
            return

        last_message_id = await guild_conf.last_status_message()
        if not last_message_id:
            return

        try:
            old_message = await channel.fetch_message(last_message_id)
            await old_message.delete()
        except discord.HTTPException as exc:
            # NotFound and Forbidden are HTTPException subclasses; catching
            # the base class means any failed request still falls through to
            # clearing the stale ID below.
            log.debug(
                "Could not delete status message %s in %s: %s",
                last_message_id,
                guild.name,
                exc,
            )
        await guild_conf.last_status_message.set(None)

    except Exception:
        # log.exception keeps the traceback, which the plain message lost.
        log.exception("Error cleaning up status message in %s", guild.name)
|
|
|
|
@commands.Cog.listener()
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Restore the stored voice channel name after the bot itself disconnects.

    Ignores every voice update except the bot going from a channel to no
    channel. The rename needs the guild-level manage-channels permission and
    a stored original name; the stored name is cleared after a successful
    rename. Errors are logged, not raised.
    """
    guild = member.guild
    if not guild:
        return

    # Anything other than "the bot left voice entirely" is not our concern.
    bot_left_voice = member.id == self.bot.user.id and before.channel and not after.channel
    if not bot_left_voice:
        return

    try:
        if not guild.me.guild_permissions.manage_channels:
            return

        name_setting = self.config.guild(guild).original_name
        saved_name = await name_setting()
        if saved_name and before.channel:
            await before.channel.edit(name=saved_name)
            await name_setting.set(None)
    except Exception as e:
        log.error(f"Error resetting channel name on disconnect: {e}")
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_player_auto_disconnect(self, guild: discord.Guild):
    """Restore the stored voice channel name when the player auto-disconnects.

    Requires the guild-level manage-channels permission, a stored original
    name and a voice client that still reports a channel. The stored name is
    cleared after a successful rename. Errors are logged, not raised.
    """
    try:
        if not guild.me.guild_permissions.manage_channels:
            return

        name_setting = self.config.guild(guild).original_name
        saved_name = await name_setting()
        if not saved_name:
            return

        voice_client = guild.voice_client
        if not (voice_client and voice_client.channel):
            return

        await voice_client.channel.edit(name=saved_name)
        await name_setting.set(None)
    except Exception as e:
        log.error(f"Error handling auto-disconnect cleanup: {e}")
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_player_auto_pause(self, guild: discord.Guild):
    """Intentionally do nothing when the player auto-pauses."""
    # The channel name is deliberately left alone on auto-pause to keep it
    # consistent.
    return None
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_player_pause(self, guild: discord.Guild):
    """Rename the bot's current voice channel to its stored name on pause.

    Only runs when the ``channel_status`` setting is truthy, the bot is in a
    voice channel, an original name is stored and the bot has the guild-level
    manage-channels permission. Errors are logged, not raised.
    """
    try:
        guild_conf = self.config.guild(guild)
        if await guild_conf.channel_status():
            voice_client = guild.voice_client
            if voice_client and voice_client.channel:
                target = voice_client.channel
                saved_name = await guild_conf.original_name()
                if saved_name and guild.me.guild_permissions.manage_channels:
                    await target.edit(name=saved_name)
    except Exception as e:
        log.error(f"Error resetting channel name on pause: {e}")
|
|
|
|
@commands.Cog.listener()
async def on_red_audio_player_stop(self, guild: discord.Guild):
    """Rename the bot's current voice channel to its stored name on stop.

    Only runs when the ``channel_status`` setting is truthy, the bot is in a
    voice channel, an original name is stored and the bot has the guild-level
    manage-channels permission. Errors are logged, not raised.
    """
    try:
        feature_on = await self.config.guild(guild).channel_status()
        if not feature_on:
            return

        voice_client = guild.voice_client
        if not (voice_client and voice_client.channel):
            return
        target = voice_client.channel

        saved_name = await self.config.guild(guild).original_name()
        if not saved_name:
            return
        if guild.me.guild_permissions.manage_channels:
            await target.edit(name=saved_name)
    except Exception as e:
        log.error(f"Error resetting channel name on stop: {e}")
|
|
|
|
async def cog_before_invoke(self, ctx: commands.Context):
    """Check if the voice channel can be used before any audio command.

    Returns ``True`` when the command may proceed: outside a guild, for
    commands that do not belong to the Audio cog, or when
    ``can_play_in_channel`` accepts the author's voice channel. Otherwise
    sends an explanation to the invoking channel and returns ``False``.

    NOTE(review): discord.py ignores the return value of a before-invoke
    hook, so ``False`` does not stop the command when this runs as the cog's
    own hook - confirm how this method is wired up. Also, as a cog-local hook
    ``ctx.cog`` would be this cog rather than Audio, which makes the early
    return below always fire - verify against the intended registration.
    """
    if not ctx.guild:
        return True

    # Only check audio-related commands
    if not self.audio or ctx.cog != self.audio:
        return True

    # ``None`` is passed on when the author is not in a voice channel.
    voice_channel = ctx.author.voice.channel if ctx.author.voice else None
    if await self.can_play_in_channel(voice_channel):
        return True

    guild_conf = self.config.guild(ctx.guild)
    if await guild_conf.bot_managed_only():
        reason = "I can only play music in channels created by me."
    else:
        allowed_channels = await guild_conf.allowed_channels()
        # Resolve each stored ID once and drop channels that no longer exist.
        resolved = (ctx.guild.get_channel(channel_id) for channel_id in allowed_channels)
        mentions = [channel.mention for channel in resolved if channel]
        if mentions:
            reason = f"I can only play music in these channels: {humanize_list(mentions)}"
        else:
            # Also covers the case where every configured channel is gone;
            # humanize_list cannot format an empty sequence.
            reason = "I cannot play music in this channel."

    await ctx.send(reason)
    return False