Ruby-Cogs/autoplay/autoplay.py
Valerie e700af61cb
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run
Upload 2 Cogs
2025-04-27 08:06:54 -04:00

240 lines
9.3 KiB
Python

from typing import Optional
import discord
from discord import AppCommandType
from redbot.core import Config, commands
from redbot.core.i18n import Translator, cog_i18n
from pylav.events.player import PlayerDisconnectedEvent
from pylav.logging import getLogger
from pylav.players.player import Player
from pylav.players.query.obj import Query
from pylav.type_hints.bot import DISCORD_BOT_TYPE, DISCORD_INTERACTION_TYPE
log = getLogger("PyLav.3rdpt.karlo-cogs.autoplay")
_ = Translator("AutoPlay", __file__)
@cog_i18n(_)
class AutoPlay(commands.Cog):
"""Automatically play music that a guild member is listening to on Spotify."""
def __init__(self, bot: DISCORD_BOT_TYPE):
self.bot: DISCORD_BOT_TYPE = bot
self.config = Config.get_conf(self, identifier=87446677010550784, force_registration=True)
default_guild = {"tracked_member": None, "autoplaying": False, "paused_track": None}
self.config.register_guild(**default_guild)
self.context_user_autoplay = discord.app_commands.ContextMenu(
name=_("Start AutoPlay"),
callback=self._context_user_autoplay,
type=AppCommandType.user,
)
self.bot.tree.add_command(self.context_user_autoplay)
@commands.hybrid_command()
@commands.guild_only()
async def autoplay(self, ctx, member: discord.Member = None):
"""Toggle autoplay for a member.
This will cause the bot to automatically play music that
the member is listening to on Spotify.
To stop it, use `[p]autoplay` without a member, or
use a player command like `[p]stop` or `[p]play`.
"""
if ctx.interaction:
await ctx.defer(ephemeral=True)
if member is None:
await self.config.guild(ctx.guild).tracked_member.set(None)
return
else:
await self.config.guild(ctx.guild).tracked_member.set(member.id)
msg = await self._prepare_autoplay(ctx.guild, ctx.author)
await ctx.send(embed=await self.pylav.construct_embed(description=msg, messageable=ctx))
async def _context_user_autoplay(
self, interaction: DISCORD_INTERACTION_TYPE, member: discord.Member
):
await interaction.response.defer(ephemeral=True)
if not interaction.guild:
await interaction.followup.send(
embed=await self.pylav.construct_embed(
description=_("This can only be used in a guild."), messageable=interaction
)
)
return
await self.config.guild(interaction.guild).tracked_member.set(member.id)
msg = await self._prepare_autoplay(interaction.guild, interaction.user)
await interaction.followup.send(
embed=await self.pylav.construct_embed(
description=msg.format(member=member.mention),
messageable=interaction,
)
)
async def _prepare_autoplay(self, guild, author) -> str:
player: Player = self.bot.pylav.get_player(guild.id)
if not player and author.voice:
await self.bot.pylav.connect_player(author, author.voice.channel)
player: Player = self.bot.pylav.get_player(guild.id)
elif not player:
return _(
"I am not in a voice channel. Please connect to a voice channel and then use this command."
)
elif player and player.channel.id != author.voice.channel.id:
await player.move_to(author, author.voice.channel)
if player.is_playing or player.paused:
await player.stop(author)
if player.queue.size:
player.queue.clear()
return _(
"I'll now play whatever {member} is listening to.\n"
"To stop autoplay, use a player command like `stop`"
)
@commands.Cog.listener()
async def on_presence_update(
self, member_before: discord.Member, member_after: discord.Member
):
if await self._member_checks(member_after):
return
player: Player = self.bot.lavalink.get_player(member_after.guild.id)
if player is None:
log.verbose("No player found.")
return
current_activity = self._get_spotify_activity(member_after)
past_activity = self._get_spotify_activity(member_before)
if not current_activity:
# Member is no longer listening to Spotify.
autoplaying = await self.config.guild(member_after.guild).autoplaying()
if autoplaying and player.is_playing:
await player.set_pause(True, member_after)
await self.config.guild(member_after.guild).paused_track.set(
past_activity.track_id
)
return
log.verbose(f"Presence update detected. {current_activity.track_url}")
if past_activity and past_activity.track_id == current_activity.track_id:
# Same track, no need to do anything.
return
if current_activity.track_id == await self.config.guild(member_after.guild).paused_track():
# If the track is the same as when the activity stopped, it was probably paused,
# so we'll resume it.
log.verbose("Resuming track.")
await player.set_pause(False, member_after)
return
log.verbose(f"Querying {current_activity.track_url}")
query = await Query.from_string(current_activity.track_url)
response = await self.bot.lavalink.search_query(query=query)
if response.loadType == "error":
log.verbose(f"No tracks found. Response: {response}")
return
log.verbose(f"Query successful: {response.data}")
if player.paused:
# To prevent overlapping tracks, we'll stop the player first to clear the paused track.
await player.stop(member_after)
if player.queue.size():
log.verbose("Queue is not empty, clearing.")
player.queue.clear()
log.verbose(f"Playing {response.data.info.title}.")
await player.play(
query=query,
track=response.data,
requester=member_after,
)
await self.config.guild(member_after.guild).paused_track.set(None)
await self.config.guild(member_after.guild).autoplaying.set(True)
async def _member_checks(self, member: discord.Member) -> bool:
"""Check if the member is valid for autoplay."""
return (
member.id != tracked_member
if (tracked_member := await self.config.guild(member.guild).tracked_member())
else True
)
@staticmethod
def _get_spotify_activity(member: discord.Member) -> Optional[discord.Spotify]:
"""Get the Spotify activity of a member."""
activity = next(
(
activity
for activity in member.activities
if activity.type == discord.ActivityType.listening
and hasattr(activity, "track_id")
and hasattr(activity, "track_url")
),
None,
)
return activity if activity and isinstance(activity, discord.Spotify) else None
@commands.Cog.listener()
async def on_command(self, ctx: commands.Context):
"""Stop autoplay when a player command is used."""
log.verbose(f"Command {ctx.command.name}, {ctx.command.qualified_name} used.")
player_commands = [
"play",
"skip",
"stop",
"playlist play",
"dc",
"prev",
"repeat",
"shuffle",
"pause",
"resume",
]
if ctx.command.name in player_commands and ctx.guild:
await self._stop_autoplay(ctx.guild)
@commands.Cog.listener()
async def on_interaction(self, interaction: discord.Interaction):
"""Stop autoplay when a player interaction is used."""
if interaction.type != discord.InteractionType.application_command:
return
log.verbose(
f"Interaction {interaction.type}, "
f"{interaction.command.name if interaction.command else None} used."
)
player_commands = [
"play",
"skip",
"stop",
"playlist play",
"dc",
"prev",
"repeat",
"shuffle",
"pause",
"resume",
]
if interaction.command.name in player_commands and interaction.guild:
await self._stop_autoplay(interaction.guild)
@commands.Cog.listener()
async def on_pylav_player_disconnected_event(self, event: PlayerDisconnectedEvent):
"""Stop autoplay when the player is disconnected."""
guild = event.player.channel.guild
log.verbose(f"Player in {guild} disconnected.")
if await self.config.guild(guild).autoplaying():
await self.pylav.player_state_db_manager.delete_player(guild.id)
await self._stop_autoplay(guild)
async def _stop_autoplay(self, guild: discord.Guild):
if not await self.config.guild(guild).autoplaying():
return
log.verbose("Stopping autoplay.")
await self.config.guild(guild).autoplaying.set(False)
await self.config.guild(guild).tracked_member.set(None)
await self.config.guild(guild).paused_track.set(None)