import os import re import logging import asyncio import discord from copy import copy from typing import Optional from yt_dlp import YoutubeDL from yt_dlp.utils import YoutubeDLError from redbot.core import commands, app_commands from redbot.core.bot import Red, Config from redbot.core.commands import Cog from redbot.cogs.audio.core import Audio from redbot.cogs.audio.utils import PlaylistScope from redbot.cogs.audio.converters import PlaylistConverter, ScopeParser from redbot.cogs.audio.apis.playlist_interface import get_all_playlist log = logging.getLogger("red.crab-cogs.audioslash") LANGUAGE = "en" EXTRACT_CONFIG = { "extract_flat": True, "outtmpl": "%(title).85s.mp3", "extractor_args": {"youtube": {"lang": [LANGUAGE]}}, } DOWNLOAD_CONFIG = { "extract_audio": True, "format": "bestaudio", "outtmpl": "%(title).85s.mp3", "extractor_args": {"youtube": {"lang": [LANGUAGE]}}, } DOWNLOAD_FOLDER = "backup" YOUTUBE_LINK_PATTERN = re.compile(r"(https?://)?(www\.)?(youtube.com/watch\?v=|youtu.be/)([\w\-]+)") MAX_VIDEO_LENGTH = 600 MAX_OPTIONS = 25 MAX_OPTION_SIZE = 100 async def extract_info(ydl: YoutubeDL, url: str) -> dict: return await asyncio.to_thread(ydl.extract_info, url, False) # noqa async def download_video(ydl: YoutubeDL, url: str) -> dict: return await asyncio.to_thread(ydl.extract_info, url) # noqa def format_youtube(res: dict) -> str: if res.get("duration", None): m, s = divmod(int(res['duration']), 60) name = f"({m}:{s:02d}) {res['title']}" else: name = f"(🔴LIVE) {res['title']}" author = f" — {res['channel']}" if len(author) > MAX_OPTION_SIZE // 2: author = author[:MAX_OPTION_SIZE//2 - 3] + "..." if len(name) + len(author) > MAX_OPTION_SIZE: return name[:MAX_OPTION_SIZE - len(author) - 3] + "..." + author else: return name + author class AudioSlash(Cog): """Audio cog commands in the form of slash commands, with YouTube and playlist autocomplete.""" def __init__(self, bot: Red, *args, **kwargs): super().__init__(*args, **kwargs) self.bot = bot self.config = Config.get_conf(self, identifier=77241349) self.config.register_guild(**{"backup_mode": False}) async def get_audio_cog(self, inter: discord.Interaction) -> Optional[Audio]: cog: Optional[Audio] = self.bot.get_cog("Audio") if cog: return cog await inter.response.send_message("Audio cog not loaded! Contact the bot owner for more information.", ephemeral=True) async def get_context(self, inter: discord.Interaction, cog: Audio) -> commands.Context: ctx: commands.Context = await self.bot.get_context(inter) # noqa ctx.command.cog = cog return ctx async def can_run_command(self, ctx: commands.Context, command_name: str) -> bool: prefix = await self.bot.get_prefix(ctx.message) prefix = prefix[0] if isinstance(prefix, list) else prefix fake_message = copy(ctx.message) fake_message.content = prefix + command_name command = ctx.bot.get_command(command_name) fake_context: commands.Context = await ctx.bot.get_context(fake_message) # noqa try: can = await command.can_run(fake_context, check_all_parents=True, change_permission_state=False) except commands.CommandError: can = False if not can: await ctx.send("You do not have permission to do this.", ephemeral=True) return can @app_commands.command() @app_commands.guild_only @app_commands.describe(search="Type here to get suggestions, or send anything to get a best match.", when="You can choose when this track will play in the queue.") @app_commands.choices(when=[app_commands.Choice(name="Add to the end of the queue.", value="end"), app_commands.Choice(name="Play after the current song.", value="next"), app_commands.Choice(name="Start playing immediately.", value="now")]) async def play(self, inter: discord.Interaction, search: str, when: Optional[str]): """Search a YouTube video to play in voicechat.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) search = search.strip() if await self.config.guild(ctx.guild).backup_mode(): if not audio.local_folder_current_path: await ctx.send("Connect bot to a voice channel first") return if not search.startswith(DOWNLOAD_FOLDER + "/"): if match := YOUTUBE_LINK_PATTERN.match(search): search = match.group(0) else: search = "ytsearch1:" + search (audio.local_folder_current_path / DOWNLOAD_FOLDER).mkdir(parents=True, exist_ok=True) ydl = YoutubeDL(EXTRACT_CONFIG) video_info = await extract_info(ydl, search) if video_info.get("entries", None): video_info = video_info["entries"][0] if "duration" not in video_info or video_info["duration"] > MAX_VIDEO_LENGTH: await ctx.send("Video too long or invalid!") return filename = ydl.prepare_filename(video_info) if not os.path.exists(filename): await ctx.send(f"Downloading `{filename}` ...") ydl = YoutubeDL(DOWNLOAD_CONFIG) os.chdir(audio.local_folder_current_path / DOWNLOAD_FOLDER) await download_video(ydl, search) search = DOWNLOAD_FOLDER + "/" + filename if when in ("next", "now"): if not await self.can_run_command(ctx, "bumpplay"): return await audio.command_bumpplay(ctx, when == "now", query=search) else: if not await self.can_run_command(ctx, "play"): return await audio.command_play(ctx, query=search) @app_commands.command() @app_commands.guild_only async def pause(self, inter: discord.Interaction): """Pauses or resumes the music in voicechat.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "pause"): return await audio.command_pause(ctx) @app_commands.command() @app_commands.guild_only async def stop(self, inter: discord.Interaction): """Stops playing any music entirely.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "stop"): return await audio.command_stop(ctx) @app_commands.command() @app_commands.guild_only @app_commands.describe(position="Will skip to this track in the queue.") async def skip(self, inter: discord.Interaction, position: Optional[app_commands.Range[int, 1, 1000]]): """Skips a number of tracks in the music queue.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "skip"): return await audio.command_skip(ctx, position) @app_commands.command() @app_commands.guild_only async def queue(self, inter: discord.Interaction): """Show what's currently playing.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "queue"): return await audio.command_queue(ctx) toggle = [app_commands.Choice(name="Enabled", value="1"), app_commands.Choice(name="Disabled", value="0")] @app_commands.command() @app_commands.guild_only @app_commands.describe(volume="New volume value between 1 and 150.") async def volume(self, inter: discord.Interaction, volume: app_commands.Range[int, 1, 150]): """Sets the music volume in voicechat.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "volume"): return await audio.command_volume(ctx, volume) @app_commands.command() @app_commands.guild_only @app_commands.describe(toggle="Enable or disable track shuffling.") @app_commands.choices(toggle=toggle) async def shuffle(self, inter: discord.Interaction, toggle: str): """Sets whether the playlist should be shuffled.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) value = bool(int(toggle)) if value != await audio.config.guild(ctx.guild).shuffle(): if not await self.can_run_command(ctx, "shuffle"): return await audio.command_shuffle(ctx) else: embed = discord.Embed(title="Setting Unchanged", description="Shuffle tracks: " + ("Enabled" if value else "Disabled")) await audio.send_embed_msg(ctx, embed=embed) @app_commands.command() @app_commands.guild_only @app_commands.describe(toggle="Enable or disable track repeating.") @app_commands.choices(toggle=toggle) async def repeat(self, inter: discord.Interaction, toggle: str): """Sets whether the playlist should repeat.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) value = bool(int(toggle)) if value != await audio.config.guild(ctx.guild).repeat(): if not await self.can_run_command(ctx, "repeat"): return await audio.command_repeat(ctx) else: embed = discord.Embed(title="Setting Unchanged", description="Repeat tracks: " + ("Enabled" if value else "Disabled")) await audio.send_embed_msg(ctx, embed=embed) playlist = app_commands.Group(name="playlist", description="Playlist commands", guild_only=True) playlist_scopes = [app_commands.Choice(name="Personal", value="USERPLAYLIST"), app_commands.Choice(name="Server", value="GUILDPLAYLIST"), app_commands.Choice(name="Global", value="GLOBALPLAYLIST")] @staticmethod def get_scope_data(scope: str, ctx: commands.Context) -> ScopeParser: return [scope, ctx.author, ctx.guild, False] # noqa @playlist.command(name="play") @app_commands.describe(playlist="The name of the playlist.", shuffle="Whether to shuffle the playlist before sending it.") async def playlist_play(self, inter: discord.Interaction, playlist: str, shuffle: Optional[bool]): """Starts an existing playlist in voicechat.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) if not await self.can_run_command(ctx, "playlist play"): return enabled = False if shuffle is not None and shuffle != await audio.config.guild(ctx.guild).shuffle(): dj_enabled = audio._dj_status_cache.setdefault(ctx.guild.id, await audio.config.guild(ctx.guild).dj_enabled()) can_skip = await audio._can_instaskip(ctx, ctx.author) if not dj_enabled or can_skip and await self.can_run_command(ctx, "shuffle"): await audio.config.guild(ctx.guild).shuffle.set(shuffle) enabled = shuffle match = await PlaylistConverter().convert(ctx, playlist) await audio.command_playlist_start(ctx, match) if enabled: await audio.config.guild(ctx.guild).shuffle.set(False) @playlist.command(name="create") @app_commands.describe(name="The name of the new playlist. Cannot contain spaces.", make_from_queue="This will fill the playlist with the current queue.", scope="Who this playlist will belong to. You need permissions for Server and Global.") @app_commands.choices(scope=playlist_scopes) async def playlist_create(self, inter: discord.Interaction, name: str, make_from_queue: Optional[bool], scope: Optional[str]): """Creates a new playlist.""" if not (audio := await self.get_audio_cog(inter)): return name = name.replace(" ", "-") ctx = await self.get_context(inter, audio) if make_from_queue: if not await self.can_run_command(ctx, "playlist queue"): return await audio.command_playlist_queue(ctx, name, scope_data=self.get_scope_data(scope, ctx)) else: if not await self.can_run_command(ctx, "playlist create"): return await audio.command_playlist_create(ctx, name, scope_data=self.get_scope_data(scope, ctx)) @playlist.command(name="add") @app_commands.describe(playlist="The name of the playlist.", track="The track to add to the playlist.", scope="You may specify who this playlist belongs to.") @app_commands.choices(scope=playlist_scopes) async def playlist_add(self, inter: discord.Interaction, playlist: str, track: str, scope: Optional[str]): """Adds a track to an existing playlist.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) match = await PlaylistConverter().convert(ctx, playlist) if not await self.can_run_command(ctx, "playlist append"): return await audio.command_playlist_append(ctx, match, track, scope_data=self.get_scope_data(scope, ctx)) @playlist.command(name="remove") @app_commands.describe(playlist="The name of the playlist.", track="The link to the track to remove from the playlist.", scope="You may specify who this playlist belongs to.") @app_commands.choices(scope=playlist_scopes) async def playlist_remove(self, inter: discord.Interaction, playlist: str, track: str, scope: Optional[str]): """Removes a track from an existing playlist.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) match = await PlaylistConverter().convert(ctx, playlist) if not await self.can_run_command(ctx, "playlist remove"): return await audio.command_playlist_remove(ctx, match, track, scope_data=self.get_scope_data(scope, ctx)) @playlist.command(name="info") @app_commands.describe(playlist="The name of the playlist to show.", scope="You may specify who this playlist belongs to.") @app_commands.choices(scope=playlist_scopes) async def playlist_info(self, inter: discord.Interaction, playlist: str, scope: Optional[str]): """Show information about a playlist.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) match = await PlaylistConverter().convert(ctx, playlist) if not await self.can_run_command(ctx, "playlist info"): return await audio.command_playlist_info(ctx, match, scope_data=self.get_scope_data(scope, ctx)) @playlist.command(name="delete") @app_commands.describe(playlist="The name of the playlist to delete.", scope="You may specify who this playlist belongs to.") @app_commands.choices(scope=playlist_scopes) async def playlist_delete(self, inter: discord.Interaction, playlist: str, scope: Optional[str]): """Deletes a playlist entirely.""" if not (audio := await self.get_audio_cog(inter)): return ctx = await self.get_context(inter, audio) match = await PlaylistConverter().convert(ctx, playlist) if not await self.can_run_command(ctx, "playlist delete"): return await audio.command_playlist_delete(ctx, match, scope_data=self.get_scope_data(scope, ctx)) @play.autocomplete("search") @playlist_add.autocomplete("track") async def youtube_autocomplete(self, inter: discord.Interaction, current: str): try: return await self._youtube_autocomplete(inter, current) except Exception: # noqa, reason: user-facing error log.exception("YouTube autocomplete", stack_info=True) return [app_commands.Choice(name="Autocomplete error. Please contact the bot owner.", value=".")] async def _youtube_autocomplete(self, inter: discord.Interaction, current: str): lst = [] if await self.config.guild(inter.guild).backup_mode(): audio = await self.get_audio_cog(inter) if not audio or not audio.local_folder_current_path: return lst folder = (audio.local_folder_current_path / DOWNLOAD_FOLDER) folder.mkdir(parents=True, exist_ok=True) files = [app_commands.Choice(name=filename, value=f"{DOWNLOAD_FOLDER}/{filename}"[:MAX_OPTION_SIZE]) for filename in os.listdir(folder)] if current: lst += [file for file in files if file.name.lower().startswith(current.lower())] lst += [file for file in files if current.lower() in file.name.lower() and not file.name.lower().startswith(current.lower())] else: lst += files if not current or len(current) < 3 or len(lst) >= MAX_OPTIONS: return lst[:MAX_OPTIONS] try: ydl = YoutubeDL(EXTRACT_CONFIG) results = await extract_info(ydl, f"ytsearch{MAX_OPTIONS - len(lst)}:{current}") lst += [app_commands.Choice(name=format_youtube(res), value=res["url"]) for res in results["entries"]] except YoutubeDLError: log.exception("Retrieving youtube results", stack_info=True) return lst[:MAX_OPTIONS] @playlist_play.autocomplete("playlist") @playlist_add.autocomplete("playlist") @playlist_remove.autocomplete("playlist") @playlist_info.autocomplete("playlist") @playlist_delete.autocomplete("playlist") async def playlist_autocomplete(self, inter: discord.Interaction, current: str): try: return await self._playlist_autocomplete(inter, current) except Exception: # noqa, reason: user-facing error log.exception("Playlist autocomplete") return [app_commands.Choice(name="Autocomplete error. Please contact the bot owner.", value=".")] async def _playlist_autocomplete(self, inter: discord.Interaction, current: str): audio: Optional[Audio] = self.bot.get_cog("Audio") if not audio or not audio.playlist_api: return [] global_matches = await get_all_playlist( PlaylistScope.GLOBAL.value, self.bot, audio.playlist_api, inter.guild, inter.user ) guild_matches = await get_all_playlist( PlaylistScope.GUILD.value, self.bot, audio.playlist_api, inter.guild, inter.user ) user_matches = await get_all_playlist( PlaylistScope.USER.value, self.bot, audio.playlist_api, inter.guild, inter.user ) playlists = [*user_matches, *guild_matches, *global_matches] if current: results = [pl.name for pl in playlists if pl.name.lower().startswith(current.lower())] results += [pl.name for pl in playlists if current.lower() in pl.name.lower() and not pl.name.lower().startswith(current.lower())] else: results = [pl.name for pl in playlists] return [app_commands.Choice(name=pl, value=pl) for pl in results][:MAX_OPTIONS] @commands.command(name="audioslashbackupmode", hidden=True) @commands.is_owner() async def audioslashbackupmode(self, ctx: commands.Context, value: Optional[bool]): """Not intended for public use. If audio stopped working, enabling this will download YouTube tracks locally.""" if value is None: value = await self.config.guild(ctx.guild).backup_mode() else: await self.config.guild(ctx.guild).backup_mode.set(value) await ctx.reply(f"Backup mode: `{value}`", mention_author=False)