From 7fc83b2022a73c8c43a6f210f50aa938c709c0f0 Mon Sep 17 00:00:00 2001 From: Valerie Date: Fri, 13 Jun 2025 19:15:32 -0400 Subject: [PATCH] Add Autoroom cog --- autoroom/README.md | 63 ++ autoroom/__init__.py | 18 + autoroom/abc.py | 103 ++++ autoroom/autoroom.py | 1064 +++++++++++++++++++++++++++++++++ autoroom/c_autoroom.py | 517 ++++++++++++++++ autoroom/c_autoroomset.py | 1008 +++++++++++++++++++++++++++++++ autoroom/info.json | 29 + autoroom/pcx_lib.py | 252 ++++++++ autoroom/pcx_template.py | 75 +++ autoroom/pcx_template_test.py | 930 ++++++++++++++++++++++++++++ 10 files changed, 4059 insertions(+) create mode 100644 autoroom/README.md create mode 100644 autoroom/__init__.py create mode 100644 autoroom/abc.py create mode 100644 autoroom/autoroom.py create mode 100644 autoroom/c_autoroom.py create mode 100644 autoroom/c_autoroomset.py create mode 100644 autoroom/info.json create mode 100644 autoroom/pcx_lib.py create mode 100644 autoroom/pcx_template.py create mode 100644 autoroom/pcx_template_test.py diff --git a/autoroom/README.md b/autoroom/README.md new file mode 100644 index 0000000..c7c3cd5 --- /dev/null +++ b/autoroom/README.md @@ -0,0 +1,63 @@ +# AutoRoom + +This cog facilitates automatic voice channel creation. When a member joins an AutoRoom Source (voice channel), this cog will move them to a brand new AutoRoom that they have control over. Once everyone leaves the AutoRoom, it is automatically deleted. + +## For Members - `[p]autoroom` + +Once you join an AutoRoom Source, you will be moved into a brand new AutoRoom (voice channel). This is your AutoRoom, you can do whatever you want with it. Use the `[p]autoroom` command to check out all the different things you can do. Some examples include: + +- Check its current settings with `[p]autoroom settings` +- Make it a public AutoRoom with `[p]autoroom public` (everyone can see and join your AutoRoom) +- Make it a locked AutoRoom with `[p]autoroom locked` (everyone can see, but nobody can join your AutoRoom) +- Make it a private AutoRoom with `[p]autoroom private` (nobody can see and join your AutoRoom) +- Kick/ban users (or entire roles) from your AutoRoom with `[p]autoroom deny` (useful for public AutoRooms) +- Allow users (or roles) into your AutoRoom with `[p]autoroom allow` (useful for locked and private AutoRooms) +- You can manage the messages in your AutoRooms associated text channel + +When everyone leaves your AutoRoom, it will automatically be deleted. + +## For Server Admins - `[p]autoroomset` + +Start by having a voice channel, and a category (the voice channel does not need to be in the category, but it can be if you want). The voice channel will be the "AutoRoom Source", where your members will join and then be moved into their own AutoRoom (voice channel). These AutoRooms will be created in the category that you choose. + +``` +[p]autoroomset create +``` + +This command will guide you through setting up an AutoRoom Source by asking some questions. If you get a warning about missing permissions, take a look at `[p]autoroomset permissions`, grant the missing permissions, and then run the command again. Otherwise, answer the questions, and you'll have a new AutoRoom Source. Give it a try by joining it: if all goes well, you will be moved to a new AutoRoom, where you can do all of the `[p]autoroom` commands. + +There are some additional configuration options for AutoRoom Sources that you can set by using `[p]autoroomset modify`. You can also check out `[p]autoroomset access`, which controls whether admins (default yes) or moderators (default no) can see and join private AutoRooms. For an overview of all of your settings, use `[p]autoroomset settings`. + +#### Member Roles and Hidden Sources + +The AutoRoom Source will behave in a certain way, depending on what permissions it has. If the `@everyone` role is denied the connect permission (and optionally the view channel permission), the AutoRoom Source and AutoRooms will behave in a member role style. Any roles that are allowed to connect on the AutoRoom Source will be considered the "member roles". Only members of the server with one or more of these roles will be able to utilize these AutoRooms and their Sources. + +For hidden AutoRoom Sources, you can deny the view channel permission for the `@everyone` role, but still allow the connect permission. The members won't be able to see the AutoRoom Source, but any AutoRooms it creates they will be able to see (depending on if it isn't a private AutoRoom). Ideally you would have a role that is allowed to see the AutoRoom Source, that role is allowed to create AutoRooms, but then can invite anyone to their AutoRooms. + +You can of course do both of these, where the `@everyone` role is denied view channel and connect, and your member role is denied the view channel permission, but is allowed the connect permission. Non-members will never see the AutoRoom Source and AutoRooms, and the members will not see the AutoRoom Source, but will see AutoRooms. + +#### Templates + +The default AutoRoom name format is based on the AutoRoom Owners username. Using `[p]autoroomset modify name`, you can choose a default format, or you can set a custom format. For custom formats, you have a couple of variables you can use in your template: + +- `{{username}}` - The AutoRoom Owners username +- `{{game}}` - The AutoRoom Owners game they were playing when the AutoRoom was created, or blank if they were not plating a game. +- `{{dupenum}}` - A number, starting at 1, that will increment when the generated AutoRoom name would be a duplicate of an already existing AutoRoom. + +You are also able to use `if`/`elif`/`else`/`endif` statements in order to conditionally show or hide parts of your template. Here are the templates for the two default formats included: + +- `username` - `{{username}}'s Room{% if dupenum > 1 %} ({{dupenum}}){% endif %}` +- `game` - `{{game}}{% if not game %}{{username}}'s Room{% endif %}{% if dupenum > 1 %} ({{dupenum}}){% endif %}` + +The username format is pretty self-explanatory: put the username, along with "'s Room" after it. For the game format, we put the game name, and then if there isn't a game, show `{{username}}'s Room` instead. Remember, if no game is being played, `{{game}}` won't return anything. + +The last bit of both of these is `{% if dupenum > 1 %} ({{dupenum}}){% endif %}`. With this, we are checking if `dupenum` is greater than 1. If it is, we display ` ({{dupenum}})` at the end of our room name. This way, only duplicate named rooms will ever get a ` (2)`, ` (3)`, etc. appended to them, no ` (1)` will be shown. + +Finally, you can use filters in order to format your variables. They are specified by adding a pipe, and then the name of the filter. The following are the currently implemented filters: + +- `{{username | lower}}` - Will lowercase the variable, the username in this example +- `{{game | upper}}` - Will uppercase the variable, the game name in this example + +This template format can also be used for the message hint sent to new AutoRooms built in text channels. For that, you can also use this variable: + +- `{{mention}}` - The AutoRoom Owners mention diff --git a/autoroom/__init__.py b/autoroom/__init__.py new file mode 100644 index 0000000..6ecb890 --- /dev/null +++ b/autoroom/__init__.py @@ -0,0 +1,18 @@ +"""Package for AutoRoom cog.""" + +import json +from pathlib import Path + +from redbot.core.bot import Red + +from .autoroom import AutoRoom + +with Path(__file__).parent.joinpath("info.json").open() as fp: + __red_end_user_data_statement__ = json.load(fp)["end_user_data_statement"] + + +async def setup(bot: Red) -> None: + """Load AutoRoom cog.""" + cog = AutoRoom(bot) + await cog.initialize() + await bot.add_cog(cog) diff --git a/autoroom/abc.py b/autoroom/abc.py new file mode 100644 index 0000000..49b694d --- /dev/null +++ b/autoroom/abc.py @@ -0,0 +1,103 @@ +"""ABC for the AutoRoom Cog.""" + +from abc import ABC, abstractmethod +from typing import Any, ClassVar + +import discord +from discord.ext.commands import CooldownMapping +from redbot.core import Config +from redbot.core.bot import Red + +from autoroom.pcx_template import Template + + +class MixinMeta(ABC): + """Base class for well-behaved type hint detection with composite class. + + Basically, to keep developers sane when not all attributes are defined in each mixin. + """ + + bot: Red + config: Config + template: Template + bucket_autoroom_name: CooldownMapping + bucket_autoroom_owner_claim: CooldownMapping + extra_channel_name_change_delay: int + + perms_legacy_text_allow: ClassVar[dict[str, bool]] + perms_legacy_text_reset: ClassVar[dict[str, None]] + perms_autoroom_owner_legacy_text: ClassVar[dict[str, bool]] + + @staticmethod + @abstractmethod + def get_template_data(member: discord.Member | discord.User) -> dict[str, str]: + raise NotImplementedError + + @abstractmethod + async def format_template_room_name( + self, template: str, data: dict, num: int = 1 + ) -> str: + raise NotImplementedError + + @abstractmethod + async def is_admin_or_admin_role(self, who: discord.Role | discord.Member) -> bool: + raise NotImplementedError + + @abstractmethod + async def is_mod_or_mod_role(self, who: discord.Role | discord.Member) -> bool: + raise NotImplementedError + + @abstractmethod + def check_perms_source_dest( + self, + autoroom_source: discord.VoiceChannel, + category_dest: discord.CategoryChannel, + *, + with_manage_roles_guild: bool = False, + with_legacy_text_channel: bool = False, + with_optional_clone_perms: bool = False, + detailed: bool = False, + ) -> tuple[bool, bool, str | None]: + raise NotImplementedError + + @abstractmethod + async def get_all_autoroom_source_configs( + self, guild: discord.Guild + ) -> dict[int, dict[str, Any]]: + raise NotImplementedError + + @abstractmethod + async def get_autoroom_source_config( + self, autoroom_source: discord.VoiceChannel | discord.abc.GuildChannel | None + ) -> dict[str, Any] | None: + raise NotImplementedError + + @abstractmethod + async def get_autoroom_info( + self, autoroom: discord.VoiceChannel | None + ) -> dict[str, Any] | None: + raise NotImplementedError + + @abstractmethod + async def get_autoroom_legacy_text_channel( + self, autoroom: discord.VoiceChannel | int | None + ) -> discord.TextChannel | None: + raise NotImplementedError + + @staticmethod + @abstractmethod + def check_if_member_or_role_allowed( + channel: discord.VoiceChannel, + member_or_role: discord.Member | discord.Role, + ) -> bool: + raise NotImplementedError + + @abstractmethod + def get_member_roles( + self, autoroom_source: discord.VoiceChannel + ) -> list[discord.Role]: + raise NotImplementedError + + @abstractmethod + async def get_bot_roles(self, guild: discord.Guild) -> list[discord.Role]: + raise NotImplementedError diff --git a/autoroom/autoroom.py b/autoroom/autoroom.py new file mode 100644 index 0000000..4d6dea7 --- /dev/null +++ b/autoroom/autoroom.py @@ -0,0 +1,1064 @@ +"""AutoRoom cog for Red-DiscordBot by PhasecoreX.""" + +import random +from abc import ABC +from contextlib import suppress +from datetime import UTC, datetime +from typing import Any, ClassVar + +import discord +from redbot.core import Config, commands +from redbot.core.bot import Red +from redbot.core.utils.chat_formatting import humanize_timedelta + +from .c_autoroom import AutoRoomCommands +from .c_autoroomset import AutoRoomSetCommands, channel_name_template +from .pcx_lib import Perms, SettingDisplay +from .pcx_template import Template + + +class CompositeMetaClass(type(commands.Cog), type(ABC)): + """Allows the metaclass used for proper type detection to coexist with discord.py's metaclass.""" + + +class AutoRoom( + AutoRoomCommands, + AutoRoomSetCommands, + commands.Cog, + metaclass=CompositeMetaClass, +): + """Automatic voice channel management. + + This cog facilitates automatic voice channel creation. + When a member joins an AutoRoom Source (voice channel), + this cog will move them to a brand new AutoRoom that they have control over. + Once everyone leaves the AutoRoom, it is automatically deleted. + + For a quick rundown on how to get started with this cog, + check out [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md) + """ + + __author__ = "PhasecoreX" + __version__ = "4.0.6" + + default_global_settings: ClassVar[dict[str, int]] = {"schema_version": 0} + default_guild_settings: ClassVar[dict[str, bool | list[int]]] = { + "admin_access": True, + "mod_access": False, + "bot_access": [], + } + default_autoroom_source_settings: ClassVar[dict[str, int | str | None]] = { + "dest_category_id": None, + "room_type": "public", + "legacy_text_channel": False, + "text_channel_hint": None, + "text_channel_topic": "", + "channel_name_type": "username", + "channel_name_format": "", + "perm_owner_manage_channels": True, + "perm_send_messages": True, + } + default_channel_settings: ClassVar[dict[str, int | list[int] | None]] = { + "source_channel": None, + "owner": None, + "associated_text_channel": None, + "denied": [], + } + extra_channel_name_change_delay = 4 + + perms_bot_source: ClassVar[dict[str, bool]] = { + "view_channel": True, + "connect": True, + "move_members": True, + } + perms_bot_dest: ClassVar[dict[str, bool]] = { + "view_channel": True, + "connect": True, + "send_messages": True, + "manage_channels": True, + "manage_messages": True, + "move_members": True, + } + + perms_legacy_text: ClassVar[list[str]] = ["read_message_history", "read_messages"] + perms_legacy_text_allow: ClassVar[dict[str, bool]] = dict.fromkeys( + perms_legacy_text, True + ) + perms_legacy_text_deny: ClassVar[dict[str, bool]] = dict.fromkeys( + perms_legacy_text, False + ) + perms_legacy_text_reset: ClassVar[dict[str, None]] = dict.fromkeys( + perms_legacy_text, None + ) + perms_autoroom_owner_legacy_text: ClassVar[dict[str, bool]] = { + **perms_legacy_text_allow, + "manage_channels": True, + "manage_messages": True, + } + perms_bot_dest_legacy_text = perms_autoroom_owner_legacy_text + + def __init__(self, bot: Red) -> None: + """Set up the cog.""" + super().__init__() + self.bot = bot + self.config = Config.get_conf( + self, identifier=1224364860, force_registration=True + ) + self.config.register_global(**self.default_global_settings) + self.config.register_guild(**self.default_guild_settings) + self.config.init_custom("AUTOROOM_SOURCE", 2) + self.config.register_custom( + "AUTOROOM_SOURCE", **self.default_autoroom_source_settings + ) + self.config.register_channel(**self.default_channel_settings) + self.template = Template() + self.bucket_autoroom_create = commands.CooldownMapping.from_cooldown( + 2, 60, lambda member: member + ) + self.bucket_autoroom_create_warn = commands.CooldownMapping.from_cooldown( + 1, 3600, lambda member: member + ) + self.bucket_autoroom_name = commands.CooldownMapping.from_cooldown( + 2, 600 + self.extra_channel_name_change_delay, lambda channel: channel + ) + self.bucket_autoroom_owner_claim = commands.CooldownMapping.from_cooldown( + 1, 120, lambda channel: channel + ) + + # + # Red methods + # + + def format_help_for_context(self, ctx: commands.Context) -> str: + """Show version in help.""" + pre_processed = super().format_help_for_context(ctx) + return f"{pre_processed}\n\nCog Version: {self.__version__}" + + async def red_delete_data_for_user(self, *, _requester: str, _user_id: int) -> None: + """Nothing to delete.""" + return + + # + # Initialization methods + # + + async def initialize(self) -> None: + """Perform setup actions before loading cog.""" + await self._migrate_config() + self.bot.loop.create_task(self._cleanup_autorooms()) + + async def _migrate_config(self) -> None: + """Perform some configuration migrations.""" + schema_version = await self.config.schema_version() + + if schema_version < 1: + # Migrate private -> room_type + guild_dict = await self.config.all_guilds() + for guild_id in guild_dict: + avcs = await self.config.guild_from_id(guild_id).get_raw( + "auto_voice_channels", default={} + ) + if avcs: + for avc_settings in avcs.values(): + if "private" in avc_settings: + avc_settings["room_type"] = ( + "private" if avc_settings["private"] else "public" + ) + del avc_settings["private"] + await self.config.guild_from_id(guild_id).set_raw( + "auto_voice_channels", value=avcs + ) + await self.config.schema_version.set(1) + + if schema_version < 2: # noqa: PLR2004 + # Migrate member_role -> per auto_voice_channel member_roles + guild_dict = await self.config.all_guilds() + for guild_id in guild_dict: + await self.config.guild_from_id(guild_id).clear_raw("member_role") + await self.config.schema_version.set(2) + + if schema_version < 4: # noqa: PLR2004 + # Migrate to AUTOROOM_SOURCE custom config group + guild_dict = await self.config.all_guilds() + for guild_id in guild_dict: + avcs = await self.config.guild_from_id(guild_id).get_raw( + "auto_voice_channels", default={} + ) + for avc_id, avc_settings in avcs.items(): + new_dict = { + "dest_category_id": avc_settings["dest_category_id"], + "room_type": avc_settings["room_type"], + } + # The rest of these were optional + if "channel_name_type" in avc_settings: + new_dict["channel_name_type"] = avc_settings[ + "channel_name_type" + ] + await self.config.custom("AUTOROOM_SOURCE", guild_id, avc_id).set( + new_dict + ) + await self.config.guild_from_id(guild_id).clear_raw( + "auto_voice_channels" + ) + await self.config.schema_version.set(4) + + if schema_version < 5: # noqa: PLR2004 + # Upgrade room templates + all_autoroom_sources = await self.config.custom("AUTOROOM_SOURCE").all() + for guild_id, guild_autoroom_sources in all_autoroom_sources.items(): + for ( + avc_id, + autoroom_source_config, + ) in guild_autoroom_sources.items(): + if autoroom_source_config.get("channel_name_format"): + # Change username and game template variables + new_template = ( + autoroom_source_config["channel_name_format"] + .replace("{username}", "{{username}}") + .replace("{game}", "{{game}}") + ) + if autoroom_source_config.get("increment_always"): + if "increment_format" in autoroom_source_config: + # Always show number, custom format + new_template += autoroom_source_config[ + "increment_format" + ].replace("{number}", "{{dupenum}}") + else: + # Always show number, default format + new_template += " ({{dupenum}})" + elif "increment_format" in autoroom_source_config: + # Show numbers > 1, custom format + new_template += ( + "{% if dupenum > 1 %}" + + autoroom_source_config["increment_format"].replace( + "{number}", "{{dupenum}}" + ) + + "{% endif %}" + ) + else: + # Show numbers > 1, default format + new_template += ( + "{% if dupenum > 1 %} ({{dupenum}}){% endif %}" + ) + await self.config.custom( + "AUTOROOM_SOURCE", guild_id, avc_id + ).channel_name_format.set(new_template) + await self.config.custom( + "AUTOROOM_SOURCE", guild_id, avc_id + ).clear_raw("increment_always") + await self.config.custom( + "AUTOROOM_SOURCE", guild_id, avc_id + ).clear_raw("increment_format") + await self.config.schema_version.set(5) + + if schema_version < 6: # noqa: PLR2004 + # Remove member roles + all_autoroom_sources = await self.config.custom("AUTOROOM_SOURCE").all() + for guild_id, guild_autoroom_sources in all_autoroom_sources.items(): + for avc_id in guild_autoroom_sources: + await self.config.custom( + "AUTOROOM_SOURCE", guild_id, avc_id + ).clear_raw("member_roles") + await self.config.schema_version.set(6) + + if schema_version < 7: # noqa: PLR2004 + # Remove auto text channels + guild_dict = await self.config.all_guilds() + for guild_id in guild_dict: + await self.config.guild_from_id(guild_id).clear_raw("admin_access_text") + await self.config.guild_from_id(guild_id).clear_raw("mod_access_text") + all_autoroom_sources = await self.config.custom("AUTOROOM_SOURCE").all() + for guild_id, guild_autoroom_sources in all_autoroom_sources.items(): + for avc_id in guild_autoroom_sources: + await self.config.custom( + "AUTOROOM_SOURCE", guild_id, avc_id + ).clear_raw("text_channel") + await self.config.schema_version.set(7) + + async def _cleanup_autorooms(self) -> None: + """Remove non-existent AutoRooms from the config.""" + await self.bot.wait_until_ready() + voice_channel_dict = await self.config.all_channels() + for voice_channel_id, voice_channel_settings in voice_channel_dict.items(): + voice_channel = self.bot.get_channel(voice_channel_id) + if voice_channel: + if isinstance(voice_channel, discord.VoiceChannel): + # Delete AutoRoom if it is empty + await self._process_autoroom_delete(voice_channel, None) + else: + # AutoRoom has already been deleted, clean up legacy text channel if it still exists + legacy_text_channel = await self.get_autoroom_legacy_text_channel( + voice_channel_settings["associated_text_channel"] + ) + if legacy_text_channel: + await legacy_text_channel.delete( + reason="AutoRoom: Associated voice channel deleted." + ) + await self.config.channel_from_id(voice_channel_id).clear() + + # + # Listener methods + # + + @commands.Cog.listener() + async def on_guild_channel_delete( + self, guild_channel: discord.abc.GuildChannel + ) -> None: + """Clean up config when an AutoRoom (or Source) is deleted (either by the bot or the user).""" + if not isinstance(guild_channel, discord.VoiceChannel): + return + if await self.get_autoroom_source_config(guild_channel): + # AutoRoom Source was deleted, remove configuration + await self.config.custom( + "AUTOROOM_SOURCE", str(guild_channel.guild.id), str(guild_channel.id) + ).clear() + else: + # AutoRoom was deleted, remove associated text channel if it exists + legacy_text_channel = await self.get_autoroom_legacy_text_channel( + guild_channel + ) + if legacy_text_channel: + await legacy_text_channel.delete( + reason="AutoRoom: Associated voice channel deleted." + ) + await self.config.channel(guild_channel).clear() + + @commands.Cog.listener() + async def on_voice_state_update( + self, + member: discord.Member, + leaving: discord.VoiceState, + joining: discord.VoiceState, + ) -> None: + """Do voice channel stuff when users move about channels.""" + if await self.bot.cog_disabled_in_guild(self, member.guild): + return + + if leaving.channel == joining.channel: + return + + # If user left an AutoRoom, do cleanup + if isinstance(leaving.channel, discord.VoiceChannel): + autoroom_info = await self.get_autoroom_info(leaving.channel) + if autoroom_info: + deleted = await self._process_autoroom_delete(leaving.channel, member) + if not deleted: + # AutoRoom wasn't deleted, so update text channel perms + await self._process_autoroom_legacy_text_perms(leaving.channel) + + if member.id == autoroom_info["owner"]: + # There are still users left and the AutoRoom Owner left. + # Start a countdown so that others can claim the AutoRoom. + bucket = self.bucket_autoroom_owner_claim.get_bucket( + leaving.channel + ) + if bucket: + bucket.reset() + bucket.update_rate_limit() + + if isinstance(joining.channel, discord.VoiceChannel): + # If user entered an AutoRoom Source channel, create new AutoRoom + asc = await self.get_autoroom_source_config(joining.channel) + if asc: + await self._process_autoroom_create(joining.channel, asc, member) + # If user entered an AutoRoom, allow them into the associated text channel + elif await self.get_autoroom_info(joining.channel): + await self._process_autoroom_legacy_text_perms(joining.channel) + + @commands.Cog.listener() + async def on_member_join(self, member: discord.Member) -> None: + """Check joining users against existing AutoRooms, re-adds their deny override if missing.""" + for autoroom_channel in member.guild.voice_channels: + autoroom_info = await self.get_autoroom_info(autoroom_channel) + if autoroom_info and member.id in autoroom_info["denied"]: + source_channel = member.guild.get_channel( + autoroom_info["source_channel"] + ) + asc = await self.get_autoroom_source_config(source_channel) + if not asc: + continue + perms = Perms(autoroom_channel.overwrites) + perms.update(member, asc["perms"]["deny"]) + if perms.modified: + await autoroom_channel.edit( + overwrites=perms.overwrites if perms.overwrites else {}, + reason="AutoRoom: Rejoining user, prevent deny evasion", + ) + + # + # Private methods + # + + async def _process_autoroom_create( + self, + autoroom_source: discord.VoiceChannel, + autoroom_source_config: dict[str, Any], + member: discord.Member, + ) -> None: + """Create a voice channel for a member in an AutoRoom Source channel.""" + # Check perms for guild, source, and dest + guild = autoroom_source.guild + dest_category = guild.get_channel(autoroom_source_config["dest_category_id"]) + if not isinstance(dest_category, discord.CategoryChannel): + return + required_check, optional_check, _ = self.check_perms_source_dest( + autoroom_source, dest_category + ) + if not required_check or not optional_check: + return + + # Check that user isn't spamming + bucket = self.bucket_autoroom_create.get_bucket(member) + if bucket: + retry_after = bucket.update_rate_limit() + if retry_after: + warn_bucket = self.bucket_autoroom_create_warn.get_bucket(member) + if warn_bucket: + if not warn_bucket.update_rate_limit(): + with suppress( + discord.Forbidden, + discord.NotFound, + discord.HTTPException, + ): + await member.send( + "Hello there! It looks like you're trying to make an AutoRoom." + "\n" + f"Please note that you are only allowed to make **{bucket.rate}** AutoRooms " + f"every **{humanize_timedelta(seconds=bucket.per)}**." + "\n" + f"You can try again in **{humanize_timedelta(seconds=max(retry_after, 1))}**." + ) + return + + # Generate channel name + taken_channel_names = [ + voice_channel.name for voice_channel in dest_category.voice_channels + ] + new_channel_name = await self._generate_channel_name( + autoroom_source_config, member, taken_channel_names + ) + + # Generate overwrites + perms = Perms() + dest_perms = dest_category.permissions_for(dest_category.guild.me) + source_overwrites = ( + autoroom_source.overwrites if autoroom_source.overwrites else {} + ) + member_roles = self.get_member_roles(autoroom_source) + for target, permissions in source_overwrites.items(): + # We can't put manage_roles in overwrites, so just get rid of it + permissions.update(manage_roles=None) + # Check each permission for each overwrite target to make sure the bot has it allowed in the dest category + failed_checks = {} + for name, value in permissions: + if value is not None: + permission_check_result = getattr(dest_perms, name) + if not permission_check_result: + # If the bot doesn't have the permission allowed in the dest category, just ignore it. Too bad! + failed_checks[name] = None + if failed_checks: + permissions.update(**failed_checks) + perms.overwrite(target, permissions) + if member_roles and target in member_roles: + # If we have member roles and this target is one, apply AutoRoom type permissions + perms.update(target, autoroom_source_config["perms"]["access"]) + + # Update overwrites for default role to account for AutoRoom type + if member_roles: + perms.update(guild.default_role, autoroom_source_config["perms"]["deny"]) + else: + perms.update(guild.default_role, autoroom_source_config["perms"]["access"]) + + # Bot overwrites + perms.update(guild.me, self.perms_bot_dest) + + # AutoRoom Owner overwrites + if autoroom_source_config["room_type"] != "server": + perms.update(member, autoroom_source_config["perms"]["owner"]) + + # Admin/moderator/bot overwrites + # Add bot roles to be allowed + additional_allowed_roles = await self.get_bot_roles(guild) + if await self.config.guild(guild).mod_access(): + # Add mod roles to be allowed + additional_allowed_roles += await self.bot.get_mod_roles(guild) + if await self.config.guild(guild).admin_access(): + # Add admin roles to be allowed + additional_allowed_roles += await self.bot.get_admin_roles(guild) + for role in additional_allowed_roles: + # Add all the mod/admin roles, if required + perms.update(role, autoroom_source_config["perms"]["allow"]) + + # Create new AutoRoom + new_voice_channel = await guild.create_voice_channel( + name=new_channel_name, + category=dest_category, + reason="AutoRoom: New AutoRoom needed.", + overwrites=perms.overwrites if perms.overwrites else {}, + bitrate=min(autoroom_source.bitrate, int(guild.bitrate_limit)), + user_limit=autoroom_source.user_limit, + ) + await self.config.channel(new_voice_channel).source_channel.set( + autoroom_source.id + ) + if autoroom_source_config["room_type"] != "server": + await self.config.channel(new_voice_channel).owner.set(member.id) + try: + await member.move_to( + new_voice_channel, reason="AutoRoom: Move user to new AutoRoom." + ) + except discord.HTTPException: + await self._process_autoroom_delete(new_voice_channel, member) + return + + # Create optional legacy text channel + new_legacy_text_channel = None + if autoroom_source_config["legacy_text_channel"]: + # Sanity check on required permissions + for perm_name in self.perms_bot_dest_legacy_text: + if not getattr(dest_perms, perm_name): + return + # Generate overwrites + perms = Perms() + perms.update(guild.me, self.perms_bot_dest_legacy_text) + perms.update(guild.default_role, self.perms_legacy_text_deny) + if autoroom_source_config["room_type"] != "server": + perms.update(member, self.perms_autoroom_owner_legacy_text) + else: + perms.update(member, self.perms_legacy_text_allow) + # Admin/moderator overwrites + additional_allowed_roles_text = [] + if await self.config.guild(guild).mod_access(): + # Add mod roles to be allowed + additional_allowed_roles_text += await self.bot.get_mod_roles(guild) + if await self.config.guild(guild).admin_access(): + # Add admin roles to be allowed + additional_allowed_roles_text += await self.bot.get_admin_roles(guild) + for role in additional_allowed_roles_text: + # Add all the mod/admin roles, if required + perms.update(role, self.perms_legacy_text_allow) + # Create text channel + text_channel_topic = await self.template.render( + autoroom_source_config["text_channel_topic"], + self.get_template_data(member), + ) + new_legacy_text_channel = await guild.create_text_channel( + name=new_channel_name.replace("'s ", " "), + category=dest_category, + topic=text_channel_topic, + reason="AutoRoom: New legacy text channel needed.", + overwrites=perms.overwrites if perms.overwrites else {}, + ) + + await self.config.channel(new_voice_channel).associated_text_channel.set( + new_legacy_text_channel.id + ) + + # Send text chat hint if enabled + if autoroom_source_config["text_channel_hint"]: + with suppress(Exception): + hint = await self.template.render( + autoroom_source_config["text_channel_hint"], + self.get_template_data(member), + ) + if hint: + if new_legacy_text_channel: + await new_legacy_text_channel.send(hint) + else: + await new_voice_channel.send(hint[:2000].strip()) + + @staticmethod + async def _process_autoroom_delete( + voice_channel: discord.VoiceChannel, leaving_user: discord.Member | None + ) -> bool: + """Delete AutoRoom if empty.""" + if ( + # If there are no members left in the channel, or if there's just one and it's the person currently leaving (race condition) + not voice_channel.members + or ( + len(voice_channel.members) == 1 + and voice_channel.members[0] == leaving_user + ) + ) and voice_channel.permissions_for(voice_channel.guild.me).manage_channels: + with suppress( + discord.NotFound + ): # Sometimes this happens when the user manually deletes their channel + await voice_channel.delete(reason="AutoRoom: Channel empty.") + return True + return False + + async def _process_autoroom_legacy_text_perms( + self, autoroom: discord.VoiceChannel + ) -> None: + """Allow or deny a user access to the legacy text channel associated to an AutoRoom.""" + legacy_text_channel = await self.get_autoroom_legacy_text_channel(autoroom) + if not legacy_text_channel: + return + + overwrites = dict(legacy_text_channel.overwrites) + perms = Perms(overwrites) + # Remove read perms for users not in autoroom + for member in overwrites: + if ( + isinstance(member, discord.Member) + and member not in autoroom.members + and member != autoroom.guild.me + ): + perms.update(member, self.perms_legacy_text_reset) + # Add read perms for users in autoroom + for member in autoroom.members: + perms.update(member, self.perms_legacy_text_allow) + # Edit channel if overwrites were modified + if perms.modified: + await legacy_text_channel.edit( + overwrites=perms.overwrites if perms.overwrites else {}, + reason="AutoRoom: Legacy text channel permission update", + ) + + async def _generate_channel_name( + self, + autoroom_source_config: dict, + member: discord.Member, + taken_channel_names: list, + ) -> str: + """Return a channel name with an incrementing number appended to it, based on a formatting string.""" + template = None + if autoroom_source_config["channel_name_type"] in channel_name_template: + template = channel_name_template[ + autoroom_source_config["channel_name_type"] + ] + elif autoroom_source_config["channel_name_type"] == "custom": + template = autoroom_source_config["channel_name_format"] + template = template or channel_name_template["username"] + + data = self.get_template_data(member) + data["random_seed"] = ( + f"{member.id}{random.random()}" # noqa: S311 # Doesn't need to be secure + ) + new_channel_name = None + attempt = 1 + with suppress(Exception): + new_channel_name = await self.format_template_room_name( + template, data, attempt + ) + + if not new_channel_name: + # Either the user screwed with the template, or the template returned nothing. Use a default one instead. + template = channel_name_template["username"] + new_channel_name = await self.format_template_room_name( + template, data, attempt + ) + + # Check for duplicate names + attempted_channel_names = [] + while ( + new_channel_name in taken_channel_names + and new_channel_name not in attempted_channel_names + ): + attempt += 1 + attempted_channel_names.append(new_channel_name) + new_channel_name = await self.format_template_room_name( + template, data, attempt + ) + return new_channel_name + + # + # Public methods + # + + @staticmethod + def get_template_data(member: discord.Member | discord.User) -> dict[str, Any]: + """Return a dict of template data based on a member.""" + data = { + "username": member.display_name, + "mention": member.mention, + "datetime": datetime.now(tz=UTC), + "member": { + "display_name": member.display_name, + "mention": member.mention, + "name": member.name, + "id": member.id, + "global_name": member.global_name, + "bot": member.bot, + "system": member.system, + }, + "game": None, + } + if isinstance(member, discord.Member): + for activity in member.activities: + if activity.type == discord.ActivityType.playing and activity.name: + data["game"] = activity.name + break + return data + + async def format_template_room_name( + self, template: str, data: dict, num: int = 1 + ) -> str: + """Return a formatted channel name, taking into account the 100 character channel name limit.""" + nums = {"dupenum": num} + msg = await self.template.render(template, {**data, **nums}) + return msg[:100].strip() + + async def is_admin_or_admin_role(self, who: discord.Role | discord.Member) -> bool: + """Check if a member (or role) is an admin (role). + + Also takes into account if the setting is enabled. + """ + if await self.config.guild(who.guild).admin_access(): + if isinstance(who, discord.Role): + return who in await self.bot.get_admin_roles(who.guild) + if isinstance(who, discord.Member): + return await self.bot.is_admin(who) + return False + + async def is_mod_or_mod_role(self, who: discord.Role | discord.Member) -> bool: + """Check if a member (or role) is a mod (role). + + Also takes into account if the setting is enabled. + """ + if await self.config.guild(who.guild).mod_access(): + if isinstance(who, discord.Role): + return who in await self.bot.get_mod_roles(who.guild) + if isinstance(who, discord.Member): + return await self.bot.is_mod(who) + return False + + def check_perms_source_dest( + self, + autoroom_source: discord.VoiceChannel, + category_dest: discord.CategoryChannel, + *, + with_manage_roles_guild: bool = False, + with_legacy_text_channel: bool = False, + with_optional_clone_perms: bool = False, + detailed: bool = False, + ) -> tuple[bool, bool, str | None]: + """Check if the permissions in an AutoRoom Source and a destination category are sufficient.""" + source = autoroom_source.permissions_for(autoroom_source.guild.me) + dest = category_dest.permissions_for(category_dest.guild.me) + result_required = True + result_optional = True + # Required + for perm_name in self.perms_bot_source: + result_required = result_required and bool(getattr(source, perm_name)) + for perm_name in self.perms_bot_dest: + result_required = result_required and bool(getattr(dest, perm_name)) + if with_manage_roles_guild: + result_required = ( + result_required + and category_dest.guild.me.guild_permissions.manage_roles + ) + # Optional + if with_legacy_text_channel: + for perm_name in self.perms_bot_dest_legacy_text: + result_optional = result_optional and getattr(dest, perm_name) + clone_section = None + if with_optional_clone_perms: + clone_result, clone_section = self._check_perms_source_dest_optional( + autoroom_source, dest, detailed=detailed + ) + result_optional = result_optional and clone_result + result = result_required and result_optional + if not detailed: + return result_required, result_optional, None + + source_section = SettingDisplay("Required on Source Voice Channel") + for perm_name in self.perms_bot_source: + source_section.add( + perm_name.capitalize().replace("_", " "), getattr(source, perm_name) + ) + + dest_section = SettingDisplay("Required on Destination Category") + for perm_name in self.perms_bot_dest: + dest_section.add( + perm_name.capitalize().replace("_", " "), getattr(dest, perm_name) + ) + autoroom_sections = [dest_section] + + if with_manage_roles_guild: + guild_section = SettingDisplay("Required in Guild") + guild_section.add( + "Manage roles", category_dest.guild.me.guild_permissions.manage_roles + ) + autoroom_sections.append(guild_section) + + if with_legacy_text_channel: + text_section = SettingDisplay( + "Optional on Destination Category (for legacy text channel)" + ) + for perm_name in self.perms_bot_dest_legacy_text: + text_section.add( + perm_name.capitalize().replace("_", " "), getattr(dest, perm_name) + ) + autoroom_sections.append(text_section) + + if clone_section: + autoroom_sections.append(clone_section) + + status_emoji = "\N{NO ENTRY SIGN}" + if result: + status_emoji = "\N{WHITE HEAVY CHECK MARK}" + elif result_required: + status_emoji = "\N{WARNING SIGN}\N{VARIATION SELECTOR-16}" + result_str = ( + f"\n{status_emoji} Source VC: {autoroom_source.mention} -> Dest Category: {category_dest.mention}" + "\n" + f"{source_section.display(*autoroom_sections)}" + ) + return result_required, result_optional, result_str + + @staticmethod + def _check_perms_source_dest_optional( + autoroom_source: discord.VoiceChannel, + dest_perms: discord.Permissions, + *, + detailed: bool = False, + ) -> tuple[bool, SettingDisplay | None]: + result = True + checked_perms = {} + source_overwrites = ( + autoroom_source.overwrites if autoroom_source.overwrites else {} + ) + for permissions in source_overwrites.values(): + # We can't put manage_roles in overwrites, so just get rid of it + # Also get rid of view_channel, connect, and send_messages, as we will be controlling those + permissions.update( + connect=None, manage_roles=None, view_channel=None, send_messages=None + ) + # Check each permission for each overwrite target to make sure the bot has it allowed in the dest category + for name, value in permissions: + if value is not None and name not in checked_perms: + check_result = bool(getattr(dest_perms, name)) + if not detailed and not check_result: + return False, None + checked_perms[name] = check_result + result = result and check_result + if not detailed: + return True, None + clone_section = SettingDisplay( + "Optional on Destination Category (for source clone)" + ) + if checked_perms: + for name, value in checked_perms.items(): + clone_section.add(name.capitalize().replace("_", " "), value) + return result, clone_section + return result, None + + async def get_all_autoroom_source_configs( + self, guild: discord.Guild + ) -> dict[int, dict[str, Any]]: + """Return a dict of all autoroom source configs, cleaning up any invalid ones.""" + unsorted_list_of_configs = [] + configs = await self.config.custom( + "AUTOROOM_SOURCE", str(guild.id) + ).all() # Does NOT return default values + for channel_id in configs: + channel = guild.get_channel(int(channel_id)) + if not isinstance(channel, discord.VoiceChannel): + continue + config = await self.get_autoroom_source_config(channel) + if config: + unsorted_list_of_configs.append((channel.position, channel_id, config)) + else: + await self.config.custom( + "AUTOROOM_SOURCE", str(guild.id), channel_id + ).clear() + result = {} + for _, channel_id, config in sorted( + unsorted_list_of_configs, key=lambda source_config: source_config[0] + ): + result[int(channel_id)] = config + return result + + async def get_autoroom_source_config( + self, autoroom_source: discord.VoiceChannel | discord.abc.GuildChannel | None + ) -> dict[str, Any] | None: + """Return the config for an autoroom source, or None if not set up yet.""" + if not autoroom_source: + return None + if not isinstance(autoroom_source, discord.VoiceChannel): + return None + config = await self.config.custom( + "AUTOROOM_SOURCE", str(autoroom_source.guild.id), str(autoroom_source.id) + ).all() # Returns default values + if not config["dest_category_id"]: + return None + + perms = { + "allow": { + "view_channel": True, + "connect": True, + "send_messages": config["perm_send_messages"], + }, + "lock": {"view_channel": True, "connect": False, "send_messages": False}, + "deny": {"view_channel": False, "connect": False, "send_messages": False}, + } + perms["owner"] = { + **perms["allow"], + "manage_channels": True if config["perm_owner_manage_channels"] else None, + "manage_messages": True, + } + if config["room_type"] == "private": + perms["access"] = perms["deny"] + elif config["room_type"] == "locked": + perms["access"] = perms["lock"] + else: + perms["access"] = perms["allow"] + + config["perms"] = perms + return config + + async def get_autoroom_info( + self, autoroom: discord.VoiceChannel | None + ) -> dict[str, Any] | None: + """Get info for an AutoRoom, or None if the voice channel isn't an AutoRoom.""" + if not autoroom: + return None + if not await self.config.channel(autoroom).source_channel(): + return None + return await self.config.channel(autoroom).all() + + async def get_autoroom_legacy_text_channel( + self, autoroom: discord.VoiceChannel | int | None + ) -> discord.TextChannel | None: + """Get the AutoRoom legacy test channel, if it exists and we have manage channels permission.""" + if isinstance(autoroom, discord.VoiceChannel): + autoroom = autoroom.id + if not autoroom: + return None + legacy_text_channel_id = await self.config.channel_from_id( + autoroom + ).associated_text_channel() + legacy_text_channel = ( + self.bot.get_channel(legacy_text_channel_id) + if legacy_text_channel_id + else None + ) + if ( + isinstance(legacy_text_channel, discord.TextChannel) + and legacy_text_channel.permissions_for( + legacy_text_channel.guild.me + ).manage_channels + ): + return legacy_text_channel + return None + + @staticmethod + def check_if_member_or_role_allowed( + channel: discord.VoiceChannel, + member_or_role: discord.Member | discord.Role, + ) -> bool: + """Check if a member/role is allowed to connect to a voice channel. + + Doesn't matter if they can't see it, it ONLY checks the connect permission. + Mostly copied from https://github.com/Rapptz/discord.py/blob/master/discord/abc.py:GuildChannel.permissions_for() + I needed the logic except the "if not base.read_messages:" part that removed all permissions. + """ + if channel.guild.owner_id == member_or_role.id: + return True + + default_role = channel.guild.default_role + base = discord.Permissions(default_role.permissions.value) + + # Handle the role case first + if isinstance(member_or_role, discord.Role): + base.value |= member_or_role.permissions.value + + if base.administrator: + return True + + # Apply @everyone allow/deny first since it's special + with suppress(KeyError): + default_allow, default_deny = channel.overwrites[default_role].pair() + base.handle_overwrite( + allow=default_allow.value, deny=default_deny.value + ) + + if member_or_role.is_default(): + return base.connect + + with suppress(KeyError): + role_allow, role_deny = channel.overwrites[member_or_role].pair() + base.handle_overwrite(allow=role_allow.value, deny=role_deny.value) + + return base.connect + + member_roles = member_or_role.roles + + # Apply guild roles that the member has. + for role in member_roles: + base.value |= role.permissions.value + + # Guild-wide Administrator -> True for everything + # Bypass all channel-specific overrides + if base.administrator: + return True + + # Apply @everyone allow/deny first since it's special + with suppress(KeyError): + default_allow, default_deny = channel.overwrites[default_role].pair() + base.handle_overwrite(allow=default_allow.value, deny=default_deny.value) + + allows = 0 + denies = 0 + + # Apply channel specific role permission overwrites + for role, overwrite in channel.overwrites.items(): + if ( + isinstance(role, discord.Role) + and role != default_role + and role in member_roles + ): + allows |= overwrite.pair()[0].value + denies |= overwrite.pair()[1].value + + base.handle_overwrite(allow=allows, deny=denies) + + # Apply member specific permission overwrites + with suppress(KeyError): + member_allow, member_deny = channel.overwrites[member_or_role].pair() + base.handle_overwrite(allow=member_allow.value, deny=member_deny.value) + + if member_or_role.is_timed_out(): + # Timeout leads to every permission except VIEW_CHANNEL and READ_MESSAGE_HISTORY + # being explicitly denied + return False + + return base.connect + + def get_member_roles( + self, autoroom_source: discord.VoiceChannel + ) -> list[discord.Role]: + """Get member roles set on an AutoRoom Source.""" + member_roles = [] + # If @everyone is allowed to connect to the source channel, there are no member roles + if not self.check_if_member_or_role_allowed( + autoroom_source, + autoroom_source.guild.default_role, + ): + # If it isn't allowed, then member roles are being used + member_roles.extend( + role + for role, overwrite in autoroom_source.overwrites.items() + if isinstance(role, discord.Role) + and role != autoroom_source.guild.default_role + and overwrite.pair()[0].connect + ) + return member_roles + + async def get_bot_roles(self, guild: discord.Guild) -> list[discord.Role]: + """Get the additional bot roles that are added to each AutoRoom.""" + bot_roles = [] + bot_role_ids = [] + some_roles_were_not_found = False + for bot_role_id in await self.config.guild(guild).bot_access(): + bot_role = guild.get_role(bot_role_id) + if bot_role: + bot_roles.append(bot_role) + bot_role_ids.append(bot_role_id) + else: + some_roles_were_not_found = True + if some_roles_were_not_found: + # Update the bot role list to remove nonexistent roles + await self.config.guild(guild).bot_access.set(bot_role_ids) + return bot_roles diff --git a/autoroom/c_autoroom.py b/autoroom/c_autoroom.py new file mode 100644 index 0000000..f9df5a6 --- /dev/null +++ b/autoroom/c_autoroom.py @@ -0,0 +1,517 @@ +"""The autoroom command.""" + +import datetime +from abc import ABC +from typing import Any + +import discord +from redbot.core import commands +from redbot.core.utils.chat_formatting import error, humanize_timedelta + +from .abc import MixinMeta +from .pcx_lib import Perms, SettingDisplay, delete + +MAX_CHANNEL_NAME_LENGTH = 100 + + +class AutoRoomCommands(MixinMeta, ABC): + """The autoroom command.""" + + @commands.group(aliases=["vc"]) + @commands.guild_only() + async def autoroom(self, ctx: commands.Context) -> None: + """Manage your AutoRoom. + + For a quick rundown on how to manage your AutoRoom, + check out [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md) + """ + + @autoroom.command(name="settings", aliases=["about", "info"]) + async def autoroom_settings(self, ctx: commands.Context) -> None: + """Display current settings.""" + if not ctx.guild: + return + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info( + ctx, check_owner=False + ) + if not autoroom_channel or not autoroom_info: + return + + room_settings = SettingDisplay("Room Settings") + owner = ctx.guild.get_member(autoroom_info["owner"]) + if owner: + room_settings.add( + "Owner", + owner.display_name, + ) + else: + room_settings.add( + "Mode", + "Server Managed", + ) + + source_channel = ctx.guild.get_channel(autoroom_info["source_channel"]) + if isinstance(source_channel, discord.VoiceChannel): + member_roles = self.get_member_roles(source_channel) + + access_text = "" + if member_roles: + for role in member_roles: + autoroom_type = self._get_autoroom_type(autoroom_channel, role) + if not access_text: + access_text = autoroom_type + elif access_text != autoroom_type: + # Multiple member roles present, and we can't determine the autoroom type + access_text = "custom" + break + else: + access_text = self._get_autoroom_type( + autoroom_channel, autoroom_channel.guild.default_role + ) + access_text = access_text.capitalize() + if member_roles: + access_text += ", only certain roles" + room_settings.add("Access", access_text) + if member_roles: + room_settings.add( + "Member Roles", ", ".join(role.name for role in member_roles) + ) + + room_settings.add("Bitrate", f"{autoroom_channel.bitrate // 1000}kbps") + room_settings.add( + "Channel Age", + humanize_timedelta( + timedelta=datetime.datetime.now(datetime.UTC) + - autoroom_channel.created_at + ), + ) + + access_settings = SettingDisplay("Current Access Settings") + allowed_members = [] + allowed_roles = [] + denied_members = [] + denied_roles = [] + for member_or_role in autoroom_channel.overwrites: + if isinstance(member_or_role, discord.Role): + if self.check_if_member_or_role_allowed( + autoroom_channel, member_or_role + ): + allowed_roles.append(member_or_role) + else: + denied_roles.append(member_or_role) + elif isinstance(member_or_role, discord.Member): + if self.check_if_member_or_role_allowed( + autoroom_channel, member_or_role + ): + allowed_members.append(member_or_role) + else: + denied_members.append(member_or_role) + + if allowed_members: + access_settings.add( + "Allowed Members", + ", ".join(member.display_name for member in allowed_members), + ) + if allowed_roles: + access_settings.add( + "Allowed Roles", ", ".join(role.name for role in allowed_roles) + ) + if denied_members: + access_settings.add( + "Denied Members", + ", ".join(member.display_name for member in denied_members), + ) + if denied_roles: + access_settings.add( + "Denied Roles", ", ".join(role.name for role in denied_roles) + ) + + await ctx.send(str(room_settings.display(access_settings))) + + @autoroom.command(name="name") + async def autoroom_name(self, ctx: commands.Context, *, name: str) -> None: + """Change the name of your AutoRoom.""" + if not ctx.guild: + return + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info(ctx) + if not autoroom_channel or not autoroom_info: + return + + if len(name) > MAX_CHANNEL_NAME_LENGTH: + name = name[:MAX_CHANNEL_NAME_LENGTH] + if name != autoroom_channel.name: + bucket = self.bucket_autoroom_name.get_bucket(autoroom_channel) + if bucket: + retry_after = bucket.update_rate_limit() + if retry_after: + per_display = bucket.per - self.extra_channel_name_change_delay + hint_text = error( + f"{ctx.message.author.mention}, you can only modify an AutoRoom name **{bucket.rate}** times " + f"every **{humanize_timedelta(seconds=per_display)}** with this command. " + f"You can try again in **{humanize_timedelta(seconds=max(1, int(min(per_display, retry_after))))}**." + "\n\n" + "Alternatively, you can modify the channel yourself by either right clicking the channel on " + "desktop or by long pressing it on mobile." + ) + if ctx.guild.mfa_level: + hint_text += ( + " Do note that since this server has 2FA enabled, you will need it enabled " + "on your account to modify the channel in this way." + ) + hint = await ctx.send(hint_text) + await delete(ctx.message, delay=30) + await delete(hint, delay=30) + return + await autoroom_channel.edit( + name=name, reason="AutoRoom: User edit room info" + ) + await ctx.tick() + await delete(ctx.message, delay=5) + + @autoroom.command(name="bitrate", aliases=["kbps"]) + async def autoroom_bitrate(self, ctx: commands.Context, kbps: int) -> None: + """Change the bitrate of your AutoRoom.""" + if not ctx.guild: + return + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info(ctx) + if not autoroom_channel or not autoroom_info: + return + + bps = max(8000, min(int(ctx.guild.bitrate_limit), kbps * 1000)) + if bps != autoroom_channel.bitrate: + await autoroom_channel.edit( + bitrate=bps, reason="AutoRoom: User edit room info" + ) + await ctx.tick() + await delete(ctx.message, delay=5) + + @autoroom.command(name="users", aliases=["userlimit"]) + async def autoroom_users(self, ctx: commands.Context, user_limit: int) -> None: + """Change the user limit of your AutoRoom.""" + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info(ctx) + if not autoroom_channel or not autoroom_info: + return + + limit = max(0, min(99, user_limit)) + if limit != autoroom_channel.user_limit: + await autoroom_channel.edit( + user_limit=limit, reason="AutoRoom: User edit room info" + ) + await ctx.tick() + await delete(ctx.message, delay=5) + + @autoroom.command() + async def claim(self, ctx: commands.Context) -> None: + """Claim ownership of this AutoRoom.""" + if not ctx.guild: + return + new_owner = ctx.message.author + if not isinstance(new_owner, discord.Member): + return + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info( + ctx, check_owner=False + ) + if not autoroom_channel or not autoroom_info: + return + bucket = self.bucket_autoroom_owner_claim.get_bucket(autoroom_channel) + old_owner = ctx.guild.get_member(autoroom_info["owner"]) + denied_message = "" + + if ( + not await self.is_mod_or_mod_role(new_owner) + and not await self.is_admin_or_admin_role(new_owner) + and new_owner != ctx.guild.owner + ): + if old_owner and old_owner in autoroom_channel.members: + denied_message = ( + "you can only claim ownership once the AutoRoom Owner has left" + ) + elif bucket: + retry_after = bucket.update_rate_limit() + if retry_after: + denied_message = f"you must wait **{humanize_timedelta(seconds=max(retry_after, 1))}** before claiming ownership, in case the previous AutoRoom Owner comes back" + + if denied_message: + await self._send_temp_error_message(ctx, denied_message) + return + + source_channel = ctx.guild.get_channel(autoroom_info["source_channel"]) + asc = await self.get_autoroom_source_config(source_channel) + if not asc: + await self._send_temp_error_message( + ctx, + "it seems like the AutoRoom Source this AutoRoom was made from " + "no longer exists. Because of that, I can no longer modify this AutoRoom.", + ) + return + perms = Perms(autoroom_channel.overwrites) + if old_owner: + perms.overwrite(old_owner, asc["perms"]["allow"]) + perms.update(new_owner, asc["perms"]["owner"]) + if perms.modified: + await autoroom_channel.edit( + overwrites=perms.overwrites if perms.overwrites else {}, + reason="AutoRoom: Ownership claimed", + ) + await self.config.channel(autoroom_channel).owner.set(new_owner.id) + + legacy_text_channel = await self.get_autoroom_legacy_text_channel( + autoroom_channel + ) + if legacy_text_channel: + legacy_text_perms = Perms(legacy_text_channel.overwrites) + if old_owner: + if old_owner in autoroom_channel.members: + legacy_text_perms.overwrite(old_owner, self.perms_legacy_text_allow) + else: + legacy_text_perms.overwrite(old_owner, self.perms_legacy_text_reset) + legacy_text_perms.update(new_owner, self.perms_autoroom_owner_legacy_text) + if legacy_text_perms.modified: + await legacy_text_channel.edit( + overwrites=( + legacy_text_perms.overwrites + if legacy_text_perms.overwrites + else {} + ), + reason="AutoRoom: Ownership claimed (legacy text channel)", + ) + + if bucket: + bucket.reset() + await ctx.tick() + await delete(ctx.message, delay=5) + + @autoroom.command() + async def public(self, ctx: commands.Context) -> None: + """Make your AutoRoom public.""" + await self._process_allow_deny(ctx, "allow") + + @autoroom.command() + async def locked(self, ctx: commands.Context) -> None: + """Lock your AutoRoom (visible, but no one can join).""" + await self._process_allow_deny(ctx, "lock") + + @autoroom.command() + async def private(self, ctx: commands.Context) -> None: + """Make your AutoRoom private.""" + await self._process_allow_deny(ctx, "deny") + + @autoroom.command(aliases=["add"]) + async def allow( + self, ctx: commands.Context, member_or_role: discord.Role | discord.Member + ) -> None: + """Allow a user (or role) into your AutoRoom.""" + await self._process_allow_deny(ctx, "allow", member_or_role=member_or_role) + + @autoroom.command(aliases=["ban", "block"]) + async def deny( + self, ctx: commands.Context, member_or_role: discord.Role | discord.Member + ) -> None: + """Deny a user (or role) from accessing your AutoRoom. + + If the user is already in your AutoRoom, they will be disconnected. + + If a user is no longer able to access the room due to denying a role, + they too will be disconnected. Keep in mind that if the server is using + member roles, denying roles will probably not work as expected. + """ + if not ctx.guild: + return + if await self._process_allow_deny(ctx, "deny", member_or_role=member_or_role): + channel = self._get_current_voice_channel(ctx.message.author) + if not channel or not channel.permissions_for(ctx.guild.me).move_members: + return + for member in channel.members: + if not channel.permissions_for(member).connect: + await member.move_to(None, reason="AutoRoom: Deny user") + + async def _process_allow_deny( + self, + ctx: commands.Context, + access: str, + *, + member_or_role: discord.Role | discord.Member | None = None, + ) -> bool: + """Actually do channel edit for allow/deny.""" + if not ctx.guild: + return False + autoroom_channel, autoroom_info = await self._get_autoroom_channel_and_info(ctx) + if not autoroom_channel or not autoroom_info: + return False + + if not autoroom_channel.permissions_for(autoroom_channel.guild.me).manage_roles: + await self._send_temp_error_message( + ctx, + "I do not have the required permissions to do this. " + "Please let the staff know about this!", + ) + return False + + source_channel = ctx.guild.get_channel(autoroom_info["source_channel"]) + if not isinstance(source_channel, discord.VoiceChannel): + await self._send_temp_error_message( + ctx, + "it seems like the AutoRoom Source this AutoRoom was made from " + "no longer exists. Because of that, I can no longer modify this AutoRoom.", + ) + return False + + # Gather member roles and determine the lowest ranked member role + member_roles = self.get_member_roles(source_channel) + lowest_member_role = 999999999999 + for role in member_roles: + lowest_member_role = min(lowest_member_role, role.position) + + asc = await self.get_autoroom_source_config(source_channel) + if not asc: + await self._send_temp_error_message( + ctx, + "it seems like the AutoRoom Source this AutoRoom was made from " + "no longer exists. Because of that, I can no longer modify this AutoRoom.", + ) + return False + + perms = Perms(autoroom_channel.overwrites) + + denied_message = "" + to_modify = [member_or_role] + if not member_or_role: + # Public/locked/private command + to_modify = member_roles or [source_channel.guild.default_role] + if access != "allow": + for member in autoroom_channel.members: + perms.update(member, asc["perms"]["allow"]) + elif access == "allow": + # If we are allowing a bot role, allow it + if isinstance( + member_or_role, discord.Role + ) and member_or_role in await self.get_bot_roles(ctx.guild): + pass + # Allow a specific user + # - check if they have "connect" perm in the source channel + # - works for both deny everyone with allowed roles/users, and allow everyone with denied roles/users + # Allow a specific role + # - Make sure that the role isn't specifically denied on the source channel + elif not self.check_if_member_or_role_allowed( + source_channel, member_or_role + ): + user_role = "user" + them_it = "them" + if isinstance(member_or_role, discord.Role): + user_role = "role" + them_it = "it" + denied_message = ( + f"since that {user_role} is not allowed to connect to the AutoRoom Source " + f"that this AutoRoom was made from, I can't allow {them_it} here either." + ) + # Allow a specific role part 2 + # - Check that the role is equal to or above the lowest allowed (member) role on the source channel + elif ( + isinstance(member_or_role, discord.Role) + and member_roles + and member_or_role.position < lowest_member_role + ): + denied_message = "this AutoRoom is using member roles, so I can't allow a lower hierarchy role." + # Deny a specific user + # - check that they aren't a mod/admin/owner/autoroom owner/bot itself, then deny user + # Deny a specific role + # - Check that it isn't a mod/admin role, then deny role + elif member_or_role == ctx.guild.me: + denied_message = "why would I deny myself from entering your AutoRoom?" + elif member_or_role == ctx.message.author: + denied_message = "don't be so hard on yourself! This is your AutoRoom!" + elif member_or_role == ctx.guild.owner: + denied_message = ( + "I don't know if you know this, but that's the server owner... " + "I can't deny them from entering your AutoRoom." + ) + elif await self.is_admin_or_admin_role(member_or_role): + role_suffix = " role" if isinstance(member_or_role, discord.Role) else "" + denied_message = f"that's an admin{role_suffix}, so I can't deny them from entering your AutoRoom." + elif await self.is_mod_or_mod_role(member_or_role): + role_suffix = " role" if isinstance(member_or_role, discord.Role) else "" + denied_message = f"that's a moderator{role_suffix}, so I can't deny them from entering your AutoRoom." + + if denied_message: + await self._send_temp_error_message(ctx, denied_message) + return False + + for target in to_modify: + if isinstance(target, discord.Member | discord.Role): + perms.update(target, asc["perms"][access]) + if isinstance(target, discord.Member): + async with self.config.channel( + autoroom_channel + ).denied() as denied_users: + if access == "deny" and target.id not in denied_users: + denied_users.append(target.id) + elif access == "allow" and target.id in denied_users: + denied_users.remove(target.id) + if perms.modified: + await autoroom_channel.edit( + overwrites=perms.overwrites if perms.overwrites else {}, + reason="AutoRoom: Permission change", + ) + await ctx.tick() + await delete(ctx.message, delay=5) + return True + + @staticmethod + def _get_current_voice_channel( + member: discord.Member | discord.User, + ) -> discord.VoiceChannel | None: + """Get the members current voice channel, or None if not in a voice channel.""" + if ( + isinstance(member, discord.Member) + and member.voice + and isinstance(member.voice.channel, discord.VoiceChannel) + ): + return member.voice.channel + return None + + async def _get_autoroom_channel_and_info( + self, ctx: commands.Context, *, check_owner: bool = True + ) -> tuple[discord.VoiceChannel | None, dict[str, Any] | None]: + autoroom_channel = self._get_current_voice_channel(ctx.message.author) + autoroom_info = await self.get_autoroom_info(autoroom_channel) + if not autoroom_info: + await self._send_temp_error_message(ctx, "you are not in an AutoRoom.") + return None, None + if check_owner and ctx.message.author.id != autoroom_info["owner"]: + reason_server = "" + if not autoroom_info["owner"]: + reason_server = " (it is a server AutoRoom)" + await self._send_temp_error_message( + ctx, f"you are not the owner of this AutoRoom{reason_server}." + ) + return None, None + return autoroom_channel, autoroom_info + + @staticmethod + def _get_autoroom_type(autoroom: discord.VoiceChannel, role: discord.Role) -> str: + """Get the type of access a role has in an AutoRoom (public, locked, private, etc).""" + view_channel = role.permissions.view_channel + connect = role.permissions.connect + if role in autoroom.overwrites: + overwrites_allow, overwrites_deny = autoroom.overwrites[role].pair() + if overwrites_allow.view_channel: + view_channel = True + if overwrites_allow.connect: + connect = True + if overwrites_deny.view_channel: + view_channel = False + if overwrites_deny.connect: + connect = False + if not view_channel and not connect: + return "private" + if view_channel and not connect: + return "locked" + return "public" + + async def _send_temp_error_message( + self, ctx: commands.Context, message: str + ) -> None: + """Send an error message that deletes itself along with the context message.""" + hint = await ctx.send(error(f"{ctx.message.author.mention}, {message}")) + await delete(ctx.message, delay=10) + await delete(hint, delay=10) diff --git a/autoroom/c_autoroomset.py b/autoroom/c_autoroomset.py new file mode 100644 index 0000000..655ce4a --- /dev/null +++ b/autoroom/c_autoroomset.py @@ -0,0 +1,1008 @@ +"""The autoroomset command.""" + +import asyncio +import io +from abc import ABC +from contextlib import suppress + +import discord +from jinja2.exceptions import TemplateError +from redbot.core import checks, commands +from redbot.core.utils.chat_formatting import error, info, success, warning +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu +from redbot.core.utils.predicates import MessagePredicate + +from .abc import MixinMeta +from .pcx_lib import SettingDisplay + +channel_name_template = { + "username": "{{username}}'s Room{% if dupenum > 1 %} ({{dupenum}}){% endif %}", + "game": "{{game}}{% if not game %}{{username}}'s Room{% endif %}{% if dupenum > 1 %} ({{dupenum}}){% endif %}", +} + +MAX_MESSAGE_LENGTH = 2000 + + +class AutoRoomSetCommands(MixinMeta, ABC): + """The autoroomset command.""" + + @commands.group() + @commands.guild_only() + @checks.admin_or_permissions(manage_guild=True) + async def autoroomset(self, ctx: commands.Context) -> None: + """Configure the AutoRoom cog. + + For a quick rundown on how to get started with this cog, + check out [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md) + """ + + @autoroomset.command() + async def settings(self, ctx: commands.Context) -> None: + """Display current settings.""" + if not ctx.guild: + return + server_section = SettingDisplay("Server Settings") + server_section.add( + "Admin access all AutoRooms", + await self.config.guild(ctx.guild).admin_access(), + ) + server_section.add( + "Moderator access all AutoRooms", + await self.config.guild(ctx.guild).mod_access(), + ) + bot_roles = ", ".join( + [role.name for role in await self.get_bot_roles(ctx.guild)] + ) + if bot_roles: + server_section.add("Bot roles allowed in all AutoRooms", bot_roles) + + await ctx.send(server_section.display()) + avcs = await self.get_all_autoroom_source_configs(ctx.guild) + for avc_id, avc_settings in avcs.items(): + source_channel = ctx.guild.get_channel(avc_id) + if not isinstance(source_channel, discord.VoiceChannel): + continue + dest_category = ctx.guild.get_channel(avc_settings["dest_category_id"]) + autoroom_section = SettingDisplay(f"AutoRoom - {source_channel.name}") + autoroom_section.add( + "Room type", + avc_settings["room_type"].capitalize(), + ) + autoroom_section.add( + "Destination category", + f"#{dest_category.name}" if dest_category else "INVALID CATEGORY", + ) + if avc_settings["legacy_text_channel"]: + autoroom_section.add( + "Legacy Text Channel", + "True", + ) + if not avc_settings["perm_send_messages"]: + autoroom_section.add( + "Send Messages", + "False", + ) + if not avc_settings["perm_owner_manage_channels"]: + autoroom_section.add( + "Owner Manage Channel", + "False", + ) + member_roles = self.get_member_roles(source_channel) + if member_roles: + autoroom_section.add( + "Member Roles" if len(member_roles) > 1 else "Member Role", + ", ".join(role.name for role in member_roles), + ) + room_name_format = "Username" + if avc_settings["channel_name_type"] in channel_name_template: + room_name_format = avc_settings["channel_name_type"].capitalize() + elif ( + avc_settings["channel_name_type"] == "custom" + and avc_settings["channel_name_format"] + ): + room_name_format = f'Custom: "{avc_settings["channel_name_format"]}"' + autoroom_section.add("Room name format", room_name_format) + + if avc_settings["text_channel_hint"]: + autoroom_section.add( + "Text Channel Hint", + avc_settings["text_channel_hint"], + ) + + if avc_settings["text_channel_topic"]: + autoroom_section.add( + "Text Channel Topic", + avc_settings["text_channel_topic"], + ) + msg = autoroom_section.display() + if len(msg) < MAX_MESSAGE_LENGTH: + await ctx.send(msg) + else: + raw_msg = autoroom_section.raw() + msg_bytes = io.BytesIO(raw_msg.encode("utf-8")) + await ctx.send( + file=discord.File(msg_bytes, filename="autoroom_settings.txt") + ) + + message = "" + required_check, optional_check, _ = await self._check_all_perms(ctx.guild) + if not required_check: + message += "\n" + error( + "It looks like I am missing one or more required permissions. " + "Until I have them, the AutoRoom cog may not function properly " + "for all AutoRoom Sources. " + "Check `[p]autoroomset permissions` for more information." + ) + elif not optional_check: + message += "\n" + warning( + "All AutoRooms will work correctly, as I have all of the required permissions. " + "However, it looks like I am missing one or more optional permissions " + "for one or more AutoRooms. " + "Check `[p]autoroomset permissions` for more information." + ) + if message: + await ctx.send(message) + + @autoroomset.command(aliases=["perms"]) + async def permissions(self, ctx: commands.Context) -> None: + """Check that the bot has all needed permissions.""" + if not ctx.guild: + return + required_check, optional_check, details_list = await self._check_all_perms( + ctx.guild, detailed=True + ) + if not details_list: + await ctx.send( + info( + "You don't have any AutoRoom Sources set up! " + "Set one up with `[p]autoroomset create` first, " + "then I can check what permissions I need for it." + ) + ) + return + + if ( + len(details_list) > 1 + and not ctx.channel.permissions_for(ctx.guild.me).add_reactions + ): + await ctx.send( + error( + "Since you have multiple AutoRoom Sources, " + 'I need the "Add Reactions" permission to display permission information.' + ) + ) + return + + if not required_check: + await ctx.send( + error( + "It looks like I am missing one or more required permissions. " + "Until I have them, the AutoRoom Source(s) in question will not function properly." + "\n\n" + "The easiest way of fixing this is just giving me these permissions as part of my server role, " + "otherwise you will need to give me these permissions on the AutoRoom Source and destination " + "category, as specified below." + ) + ) + elif not optional_check: + await ctx.send( + warning( + "It looks like I am missing one or more optional permissions. " + "All AutoRooms will work, however some extra features may not work. " + "\n\n" + "The easiest way of fixing this is just giving me these permissions as part of my server role, " + "otherwise you will need to give me these permissions on the destination category, " + "as specified below." + "\n\n" + "In the case of optional permissions, any permission on the AutoRoom Source will be copied to " + "the created AutoRoom, as if we were cloning the AutoRoom Source. In order for this to work, " + "I need each permission to be allowed in the destination category (or server). " + "If it isn't allowed, I will skip copying that permission over." + ) + ) + else: + await ctx.send(success("Everything looks good here!")) + + if len(details_list) > 1: + if ( + ctx.channel.permissions_for(ctx.guild.me).add_reactions + and ctx.channel.permissions_for(ctx.guild.me).read_message_history + ): + await menu(ctx, details_list, DEFAULT_CONTROLS, timeout=60.0) + else: + for details in details_list: + await ctx.send(details) + else: + await ctx.send(details_list[0]) + + @autoroomset.group() + async def access(self, ctx: commands.Context) -> None: + """Control access to all AutoRooms. + + Roles that are considered "admin" or "moderator" are + set up with the commands `[p]set addadminrole` + and `[p]set addmodrole` (plus the remove commands too) + """ + + @access.command(name="admin") + async def access_admin(self, ctx: commands.Context) -> None: + """Allow Admins to join locked/private AutoRooms.""" + if not ctx.guild: + return + admin_access = not await self.config.guild(ctx.guild).admin_access() + await self.config.guild(ctx.guild).admin_access.set(admin_access) + await ctx.send( + success( + f"Admins are {'now' if admin_access else 'no longer'} able to join (new) locked/private AutoRooms." + ) + ) + + @access.command(name="mod") + async def access_mod(self, ctx: commands.Context) -> None: + """Allow Moderators to join locked/private AutoRooms.""" + if not ctx.guild: + return + mod_access = not await self.config.guild(ctx.guild).mod_access() + await self.config.guild(ctx.guild).mod_access.set(mod_access) + await ctx.send( + success( + f"Moderators are {'now' if mod_access else 'no longer'} able to join (new) locked/private AutoRooms." + ) + ) + + @access.group(name="bot") + async def access_bot(self, ctx: commands.Context) -> None: + """Automatically allow bots into AutoRooms. + + The AutoRoom Owner is able to freely allow or deny these roles as they see fit. + """ + + @access_bot.command(name="add") + async def access_bot_add(self, ctx: commands.Context, role: discord.Role) -> None: + """Allow a bot role into every AutoRoom.""" + if not ctx.guild: + return + bot_role_ids = await self.config.guild(ctx.guild).bot_access() + if role.id not in bot_role_ids: + bot_role_ids.append(role.id) + await self.config.guild(ctx.guild).bot_access.set(bot_role_ids) + + role_list = "\n".join( + [role.name for role in await self.get_bot_roles(ctx.guild)] + ) + await ctx.send( + success( + f"AutoRooms will now allow the following bot roles in by default:\n\n{role_list}" + ) + ) + + @access_bot.command(name="remove", aliases=["delete", "del"]) + async def access_bot_remove( + self, ctx: commands.Context, role: discord.Role + ) -> None: + """Disallow a bot role from joining every AutoRoom.""" + if not ctx.guild: + return + bot_role_ids = await self.config.guild(ctx.guild).bot_access() + if role.id in bot_role_ids: + bot_role_ids.remove(role.id) + await self.config.guild(ctx.guild).bot_access.set(bot_role_ids) + + if bot_role_ids: + role_list = "\n".join( + [role.name for role in await self.get_bot_roles(ctx.guild)] + ) + await ctx.send( + success( + f"AutoRooms will now allow the following bot roles in by default:\n\n{role_list}" + ) + ) + else: + await ctx.send( + success("New AutoRooms will not allow any extra bot roles in.") + ) + + @autoroomset.command(aliases=["enable", "add"]) + async def create( + self, + ctx: commands.Context, + source_voice_channel: discord.VoiceChannel, + dest_category: discord.CategoryChannel, + ) -> None: + """Create an AutoRoom Source. + + Anyone joining an AutoRoom Source will automatically have a new + voice channel (AutoRoom) created in the destination category, + and then be moved into it. + """ + if not ctx.guild: + return + perms_required, perms_optional, details = self.check_perms_source_dest( + source_voice_channel, dest_category, detailed=True + ) + if not perms_required or not perms_optional: + await ctx.send( + error( + "I am missing a permission that the AutoRoom cog requires me to have. " + "Check below for the permissions I require in both the AutoRoom Source " + "and the destination category. " + "Try creating the AutoRoom Source again once I have these permissions." + "\n" + f"{details}" + "\n" + "The easiest way of doing this is just giving me these permissions as part of my server role, " + "otherwise you will need to give me these permissions on the source channel and destination " + "category, as specified above." + ) + ) + return + new_source: dict[str, str | int] = {"dest_category_id": dest_category.id} + + # Room type + options = ["public", "locked", "private", "server"] + pred = MessagePredicate.lower_contained_in(options, ctx) + await ctx.send( + "**Welcome to the setup wizard for creating an AutoRoom Source!**" + "\n" + f"Users joining the {source_voice_channel.mention} AutoRoom Source will have an AutoRoom " + f"created in the {dest_category.mention} category and be moved into it." + "\n\n" + "**AutoRoom Type**" + "\n" + "AutoRooms can be one of the following types when created:" + "\n" + "`public ` - Visible and joinable by other users. The AutoRoom Owner can kick/ban users out of them." + "\n" + "`locked ` - Visible, but not joinable by other users. The AutoRoom Owner must allow users into their room." + "\n" + "`private` - Not visible or joinable by other users. The AutoRoom Owner must allow users into their room." + "\n" + "`server ` - Same as a public AutoRoom, but with no AutoRoom Owner. " + "No modifications can be made to the generated AutoRoom." + "\n\n" + "What would you like these created AutoRooms to be by default? (`public`/`locked`/`private`/`server`)" + ) + answer = None + with suppress(asyncio.TimeoutError): + await ctx.bot.wait_for("message", check=pred, timeout=60) + answer = pred.result + if answer is not None: + new_source["room_type"] = options[answer] + else: + await ctx.send("No valid answer was received, canceling setup process.") + return + + # Check perms room type + perms_required, perms_optional, details = self.check_perms_source_dest( + source_voice_channel, + dest_category, + with_manage_roles_guild=new_source["room_type"] != "server", + detailed=True, + ) + if not perms_required or not perms_optional: + await ctx.send( + error( + f"Since you want to have this AutoRoom Source create {new_source['room_type']} AutoRooms, " + "I will need a few extra permissions. " + "Try creating the AutoRoom Source again once I have these permissions." + "\n" + f"{details}" + ) + ) + return + + # Channel name + options = ["username", "game"] + pred = MessagePredicate.lower_contained_in(options, ctx) + await ctx.send( + "**Channel Name**" + "\n" + "When an AutoRoom is created, a name will be generated for it. How would you like that name to be generated?" + "\n\n" + f'`username` - Shows up as "{ctx.author.display_name}\'s Room"\n' + "`game ` - AutoRoom Owner's playing game, otherwise `username`" + ) + answer = None + with suppress(asyncio.TimeoutError): + await ctx.bot.wait_for("message", check=pred, timeout=60) + answer = pred.result + if answer is not None: + new_source["channel_name_type"] = options[answer] + else: + await ctx.send("No valid answer was received, canceling setup process.") + return + + # Save new source + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(source_voice_channel.id) + ).set(new_source) + await ctx.send( + success( + "Settings saved successfully!\n" + "Check out `[p]autoroomset modify` for even more AutoRoom Source settings, " + "or to make modifications to your above answers." + ) + ) + + @autoroomset.command(aliases=["disable", "delete", "del"]) + async def remove( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + ) -> None: + """Remove an AutoRoom Source.""" + if not ctx.guild: + return + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).clear() + await ctx.send( + success( + f"**{autoroom_source.mention}** is no longer an AutoRoom Source channel." + ) + ) + + @autoroomset.group(aliases=["edit"]) + async def modify(self, ctx: commands.Context) -> None: + """Modify an existing AutoRoom Source.""" + + @modify.command(name="category") + async def modify_category( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + dest_category: discord.CategoryChannel, + ) -> None: + """Set the category that AutoRooms will be created in.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).dest_category_id.set(dest_category.id) + perms_required, perms_optional, details = self.check_perms_source_dest( + autoroom_source, dest_category, detailed=True + ) + message = f"**{autoroom_source.mention}** will now create new AutoRooms in the **{dest_category.mention}** category." + if perms_required and perms_optional: + await ctx.send(success(message)) + else: + await ctx.send( + warning( + f"{message}" + "\n" + "Do note, this new category does not have sufficient permissions for me to make AutoRooms. " + "Until you fix this, the AutoRoom Source will not work." + "\n" + f"{details}" + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.group(name="type") + async def modify_type(self, ctx: commands.Context) -> None: + """Choose what type of AutoRoom is created.""" + + @modify_type.command(name="public") + async def modify_type_public( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Rooms will be open to all. AutoRoom Owner has control over room.""" + await self._save_public_private(ctx, autoroom_source, "public") + + @modify_type.command(name="locked") + async def modify_type_locked( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Rooms will be visible to all, but not joinable. AutoRoom Owner can allow users in.""" + await self._save_public_private(ctx, autoroom_source, "locked") + + @modify_type.command(name="private") + async def modify_type_private( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Rooms will be hidden. AutoRoom Owner can allow users in.""" + await self._save_public_private(ctx, autoroom_source, "private") + + @modify_type.command(name="server") + async def modify_type_server( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Rooms will be open to all, but the server owns the AutoRoom (so they can't be modified).""" + await self._save_public_private(ctx, autoroom_source, "server") + + async def _save_public_private( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + room_type: str, + ) -> None: + """Save the public/private setting.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).room_type.set(room_type) + await ctx.send( + success( + f"**{autoroom_source.mention}** will now create `{room_type}` AutoRooms." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.group(name="name") + async def modify_name(self, ctx: commands.Context) -> None: + """Set the default name format of an AutoRoom.""" + + @modify_name.command(name="username") + async def modify_name_username( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Default format: PhasecoreX's Room. + + Custom format example: + `{{username}}'s Room{% if dupenum > 1 %} ({{dupenum}}){% endif %}` + """ # noqa: D401 + await self._save_room_name(ctx, autoroom_source, "username") + + @modify_name.command(name="game") + async def modify_name_game( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """The users current playing game, otherwise the username format. + + Custom format example: + `{{game}}{% if not game %}{{username}}'s Room{% endif %}{% if dupenum > 1 %} ({{dupenum}}){% endif %}` + """ # noqa: D401 + await self._save_room_name(ctx, autoroom_source, "game") + + @modify_name.command(name="custom") + async def modify_name_custom( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + *, + template: str, + ) -> None: + """A custom channel name. + + Use `{{ expressions }}` to print variables and `{% statements %}` to do basic evaluations on variables. + + Variables supported: + - `username` - AutoRoom Owner's username + - `game ` - AutoRoom Owner's game + - `dupenum ` - An incrementing number that starts at 1, useful for un-duplicating channel names + + Statements supported: + - `if/elif/else/endif` + - Example: `{% if dupenum > 1 %}DupeNum is {{dupenum}}, which is greater than 1{% endif %}` + - Another example: `{% if not game %}User isn't playing a game!{% endif %}` + + It's kinda like Jinja2, but way simpler. Check out [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md) for more info. + """ # noqa: D401 + await self._save_room_name(ctx, autoroom_source, "custom", template) + + async def _save_room_name( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + room_type: str, + template: str | None = None, + ) -> None: + """Save the room name type.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + data = self.get_template_data(ctx.author) + if template: + template = template.replace("\n", " ") + try: + # Validate template + await self.format_template_room_name(template, data) + except TemplateError as rte: + await ctx.send( + error( + "Hmm... that doesn't seem to be a valid template:" + "\n\n" + f"`{rte!s}`" + "\n\n" + "If you need some help, take a look at " + "[the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md)." + ) + ) + return + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).channel_name_format.set(template) + else: + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).channel_name_format.clear() + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).channel_name_type.set(room_type) + message = ( + f"New AutoRooms created by **{autoroom_source.mention}** " + f"will use the **{room_type.capitalize()}** format" + ) + if template: + message += f":\n`{template}`" + else: + # Load preset template for display purposes + template = channel_name_template[room_type] + message += "." + if "game" not in data: + data["game"] = "Example Game" + message += "\n\nExample room names:" + for room_num in range(1, 4): + message += f"\n{await self.format_template_room_name(template, data, room_num)}" + await ctx.send(success(message)) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.group(name="text") + async def modify_text( + self, + ctx: commands.Context, + ) -> None: + """Configure sending an introductory message to the AutoRoom text channel.""" + + @modify_text.command(name="set") + async def modify_text_set( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + *, + hint_text: str, + ) -> None: + """Send a message to the newly generated AutoRoom text channel. + + This can have template variables and statements, which you can learn more + about by looking at `[p]autoroomset modify name custom`, or by looking at + [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md). + + The only additional variable that may be useful here is the `mention` variable, + which will insert the users mention (pinging them). + + - Example: + `Hello {{mention}}! Welcome to your new AutoRoom!` + """ + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + data = self.get_template_data(ctx.author) + try: + # Validate template + hint_text_formatted = await self.template.render(hint_text, data) + except TemplateError as rte: + await ctx.send( + error( + "Hmm... that doesn't seem to be a valid template:" + "\n\n" + f"`{rte!s}`" + "\n\n" + "If you need some help, take a look at " + "[the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md)." + ) + ) + return + + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).text_channel_hint.set(hint_text) + + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will have the following message sent in them:" + "\n\n" + f"{hint_text_formatted[:1900]}" + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify_text.command(name="disable") + async def modify_text_disable( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + ) -> None: + """Disable sending a message to the newly generated AutoRoom text channel.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).text_channel_hint.clear() + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will no longer have a message sent in them." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.group(name="specialperms") + async def special_perms(self, ctx: commands.Context) -> None: + """Modify special AutoRoom permissions. + + Remember, most permissions are automatically copied + from the AuthRoom Source over to the AutoRoom. + These are for configuring special cases. + """ + + @special_perms.command(name="ownermodify") + async def owner_manage_channels( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Allow AutoRoom Owners to have the Manage Channels permission on their AutoRoom.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + new_config_value = not await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).perm_owner_manage_channels() + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).perm_owner_manage_channels.set(new_config_value) + await ctx.send( + success( + f"AutoRoom Owners are {'now' if new_config_value else 'no longer'} able to modify their AutoRoom with native Discord controls." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @special_perms.command(name="sendmessage") + async def public_send_messages( + self, ctx: commands.Context, autoroom_source: discord.VoiceChannel + ) -> None: + """Allow users to send messages in the AutoRoom built in text channel.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + new_config_value = not await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).perm_send_messages() + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).perm_send_messages.set(new_config_value) + await ctx.send( + success( + f"Users are {'now' if new_config_value else 'no longer'} able to send messages in the AutoRoom built in text channel." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.group(name="legacytextchannel") + async def modify_legacytext( + self, + ctx: commands.Context, + ) -> None: + """Manage if a legacy text channel should be created as well.""" + + @modify_legacytext.command(name="enable") + async def modify_legacytext_enable( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + ) -> None: + """Enable creating a legacy text channel with the AutoRoom.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).legacy_text_channel.set(value=True) + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will now get their own legacy text channel." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify_legacytext.command(name="disable") + async def modify_legacytext_disable( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + ) -> None: + """Disable creating a legacy text channel with the AutoRoom.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).legacy_text_channel.clear() + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will no longer get their own legacy text channel." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify_legacytext.group(name="topic") + async def modify_legacytext_topic( + self, + ctx: commands.Context, + ) -> None: + """Manage the legacy text channel topic.""" + + @modify_legacytext_topic.command(name="set") + async def modify_legacytext_topic_set( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + *, + topic_text: str, + ) -> None: + """Set the legacy text channel topic. + + - Example: + `This channel is only visible to members of your voice channel, and admins of this server. It will be deleted when everyone leaves. ` + """ + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + data = self.get_template_data(ctx.author) + try: + # Validate template + topic_text_formatted = await self.template.render(topic_text, data) + except TemplateError as rte: + await ctx.send( + error( + "Hmm... that doesn't seem to be a valid template:" + "\n\n" + f"`{rte!s}`" + "\n\n" + "If you need some help, take a look at " + "[the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md)." + ) + ) + return + + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).text_channel_topic.set(topic_text) + + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will have the following message topic set:" + "\n\n" + f"{topic_text_formatted}" + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify_legacytext_topic.command(name="disable") + async def modify_legacytext_topic_disable( + self, + ctx: commands.Context, + autoroom_source: discord.VoiceChannel, + ) -> None: + """Disable setting a legacy text channel topic.""" + if not ctx.guild: + return + if await self.get_autoroom_source_config(autoroom_source): + await self.config.custom( + "AUTOROOM_SOURCE", str(ctx.guild.id), str(autoroom_source.id) + ).text_channel_topic.clear() + await ctx.send( + success( + f"New AutoRooms created by **{autoroom_source.mention}** will no longer have a topic set." + ) + ) + else: + await ctx.send( + error( + f"**{autoroom_source.mention}** is not an AutoRoom Source channel." + ) + ) + + @modify.command( + name="defaults", aliases=["bitrate", "memberrole", "other", "perms", "users"] + ) + async def modify_defaults(self, ctx: commands.Context) -> None: + """Learn how AutoRoom defaults are set.""" + await ctx.send( + info( + "**Bitrate/User Limit**" + "\n" + "Default bitrate and user limit settings are copied from the AutoRoom Source to the resulting AutoRoom." + "\n\n" + "**Member Roles**" + "\n" + "Only members that can view and join an AutoRoom Source will be able to join its resulting AutoRooms. " + "If you would like to limit AutoRooms to only allow certain members, simply deny the everyone role " + "from viewing/connecting to the AutoRoom Source and allow your member roles to view/connect to it." + "\n\n" + "**Permissions**" + "\n" + "All permission overwrites (except for Manage Roles) will be copied from the AutoRoom Source " + "to the resulting AutoRoom. Every permission overwrite you want copied over, regardless if it is " + "allowed or denied, must be allowed for the bot. It can either be allowed for the bot in the " + "destination category or server-wide with the roles it has. `[p]autoroomset permissions` will " + "show what permissions will be copied over." + ) + ) + + async def _check_all_perms( + self, guild: discord.Guild, *, detailed: bool = False + ) -> tuple[bool, bool, list[str]]: + """Check all permissions for all AutoRooms in a guild.""" + result_required = True + result_optional = True + result_list = [] + avcs = await self.get_all_autoroom_source_configs(guild) + for avc_id, avc_settings in avcs.items(): + autoroom_source = guild.get_channel(avc_id) + category_dest = guild.get_channel(avc_settings["dest_category_id"]) + if isinstance(autoroom_source, discord.VoiceChannel) and isinstance( + category_dest, discord.CategoryChannel + ): + ( + required_check, + optional_check, + detail, + ) = self.check_perms_source_dest( + autoroom_source, + category_dest, + with_manage_roles_guild=avc_settings["room_type"] != "server", + with_legacy_text_channel=avc_settings["legacy_text_channel"], + with_optional_clone_perms=True, + detailed=detailed, + ) + result_required = result_required and required_check + result_optional = result_optional and optional_check + if detailed: + result_list.append(detail) + elif not result_required and not result_optional: + break + return result_required, result_optional, result_list diff --git a/autoroom/info.json b/autoroom/info.json new file mode 100644 index 0000000..975ac0a --- /dev/null +++ b/autoroom/info.json @@ -0,0 +1,29 @@ +{ + "name": "AutoRoom", + "author": [ + "PhasecoreX (PhasecoreX#0635)" + ], + "short": "Automatic voice channel management.", + "description": "This cog facilitates automatic voice channel creation. When a member joins an AutoRoom Source (voice channel), this cog will move them to a brand new AutoRoom that they have control over. Once everyone leaves the AutoRoom, it is automatically deleted.", + "install_msg": "Thanks for installing AutoRoom! For a quick rundown on how to get started with this cog, check out [the readme](https://github.com/PhasecoreX/PCXCogs/tree/master/autoroom/README.md)", + "requirements": [ + "func-timeout", + "jinja2" + ], + "tags": [ + "audio", + "auto", + "automated", + "automatic", + "channel", + "room", + "voice" + ], + "min_bot_version": "3.5.0", + "min_python_version": [ + 3, + 11, + 0 + ], + "end_user_data_statement": "This cog does not persistently store data or metadata about users." +} diff --git a/autoroom/pcx_lib.py b/autoroom/pcx_lib.py new file mode 100644 index 0000000..262a506 --- /dev/null +++ b/autoroom/pcx_lib.py @@ -0,0 +1,252 @@ +"""Shared code across multiple cogs.""" + +import asyncio +from collections.abc import Mapping +from contextlib import suppress +from typing import Any + +import discord +from redbot.core import __version__ as redbot_version +from redbot.core import commands +from redbot.core.utils import common_filters +from redbot.core.utils.chat_formatting import box + +headers = {"user-agent": "Red-DiscordBot/" + redbot_version} + +MAX_EMBED_SIZE = 5900 +MAX_EMBED_FIELDS = 20 +MAX_EMBED_FIELD_SIZE = 1024 + + +async def delete(message: discord.Message, *, delay: float | None = None) -> bool: + """Attempt to delete a message. + + Returns True if successful, False otherwise. + """ + try: + await message.delete(delay=delay) + except discord.NotFound: + return True # Already deleted + except discord.HTTPException: + return False + return True + + +async def reply( + ctx: commands.Context, content: str | None = None, **kwargs: Any # noqa: ANN401 +) -> None: + """Safely reply to a command message. + + If the command is in a guild, will reply, otherwise will send a message like normal. + Pre discord.py 1.6, replies are just messages sent with the users mention prepended. + """ + if ctx.guild: + if ( + hasattr(ctx, "reply") + and ctx.channel.permissions_for(ctx.guild.me).read_message_history + ): + mention_author = kwargs.pop("mention_author", False) + kwargs.update(mention_author=mention_author) + with suppress(discord.HTTPException): + await ctx.reply(content=content, **kwargs) + return + allowed_mentions = kwargs.pop( + "allowed_mentions", + discord.AllowedMentions(users=False), + ) + kwargs.update(allowed_mentions=allowed_mentions) + await ctx.send(content=f"{ctx.message.author.mention} {content}", **kwargs) + else: + await ctx.send(content=content, **kwargs) + + +async def type_message( + destination: discord.abc.Messageable, content: str, **kwargs: Any # noqa: ANN401 +) -> discord.Message | None: + """Simulate typing and sending a message to a destination. + + Will send a typing indicator, wait a variable amount of time based on the length + of the text (to simulate typing speed), then send the message. + """ + content = common_filters.filter_urls(content) + with suppress(discord.HTTPException): + async with destination.typing(): + await asyncio.sleep(max(0.25, min(2.5, len(content) * 0.01))) + return await destination.send(content=content, **kwargs) + + +async def embed_splitter( + embed: discord.Embed, destination: discord.abc.Messageable | None = None +) -> list[discord.Embed]: + """Take an embed and split it so that each embed has at most 20 fields and a length of 5900. + + Each field value will also be checked to have a length no greater than 1024. + + If supplied with a destination, will also send those embeds to the destination. + """ + embed_dict = embed.to_dict() + + # Check and fix field value lengths + modified = False + if "fields" in embed_dict: + for field in embed_dict["fields"]: + if len(field["value"]) > MAX_EMBED_FIELD_SIZE: + field["value"] = field["value"][: MAX_EMBED_FIELD_SIZE - 3] + "..." + modified = True + if modified: + embed = discord.Embed.from_dict(embed_dict) + + # Short circuit + if len(embed) <= MAX_EMBED_SIZE and ( + "fields" not in embed_dict or len(embed_dict["fields"]) <= MAX_EMBED_FIELDS + ): + if destination: + await destination.send(embed=embed) + return [embed] + + # Nah, we're really doing this + split_embeds: list[discord.Embed] = [] + fields = embed_dict.get("fields", []) + embed_dict["fields"] = [] + + for field in fields: + embed_dict["fields"].append(field) + current_embed = discord.Embed.from_dict(embed_dict) + if ( + len(current_embed) > MAX_EMBED_SIZE + or len(embed_dict["fields"]) > MAX_EMBED_FIELDS + ): + embed_dict["fields"].pop() + current_embed = discord.Embed.from_dict(embed_dict) + split_embeds.append(current_embed.copy()) + embed_dict["fields"] = [field] + + current_embed = discord.Embed.from_dict(embed_dict) + split_embeds.append(current_embed.copy()) + + if destination: + for split_embed in split_embeds: + await destination.send(embed=split_embed) + return split_embeds + + +class SettingDisplay: + """A formatted list of settings.""" + + def __init__(self, header: str | None = None) -> None: + """Init.""" + self.header = header + self._length = 0 + self._settings: list[tuple] = [] + + def add(self, setting: str, value: Any) -> None: # noqa: ANN401 + """Add a setting.""" + setting_colon = setting + ":" + self._settings.append((setting_colon, value)) + self._length = max(len(setting_colon), self._length) + + def raw(self) -> str: + """Generate the raw text of this SettingDisplay, to be monospace (ini) formatted later.""" + msg = "" + if not self._settings: + return msg + if self.header: + msg += f"--- {self.header} ---\n" + for setting in self._settings: + msg += f"{setting[0].ljust(self._length, ' ')} [{setting[1]}]\n" + return msg.strip() + + def display(self, *additional) -> str: # noqa: ANN002 (Self) + """Generate a ready-to-send formatted box of settings. + + If additional SettingDisplays are provided, merges their output into one. + """ + msg = self.raw() + for section in additional: + msg += "\n\n" + section.raw() + return box(msg, lang="ini") + + def __str__(self) -> str: + """Generate a ready-to-send formatted box of settings.""" + return self.display() + + def __len__(self) -> int: + """Count of how many settings there are to display.""" + return len(self._settings) + + +class Perms: + """Helper class for dealing with a dictionary of discord.PermissionOverwrite.""" + + def __init__( + self, + overwrites: ( + dict[ + discord.Role | discord.Member | discord.Object, + discord.PermissionOverwrite, + ] + | None + ) = None, + ) -> None: + """Init.""" + self.__overwrites: dict[ + discord.Role | discord.Member, + discord.PermissionOverwrite, + ] = {} + self.__original: dict[ + discord.Role | discord.Member, + discord.PermissionOverwrite, + ] = {} + if overwrites: + for key, value in overwrites.items(): + if isinstance(key, discord.Role | discord.Member): + pair = value.pair() + self.__overwrites[key] = discord.PermissionOverwrite().from_pair( + *pair + ) + self.__original[key] = discord.PermissionOverwrite().from_pair( + *pair + ) + + def overwrite( + self, + target: discord.Role | discord.Member | discord.Object, + permission_overwrite: Mapping[str, bool | None] | discord.PermissionOverwrite, + ) -> None: + """Set the permissions for a target.""" + if not isinstance(target, discord.Role | discord.Member): + return + if isinstance(permission_overwrite, discord.PermissionOverwrite): + if permission_overwrite.is_empty(): + self.__overwrites[target] = discord.PermissionOverwrite() + return + self.__overwrites[target] = discord.PermissionOverwrite().from_pair( + *permission_overwrite.pair() + ) + else: + self.__overwrites[target] = discord.PermissionOverwrite() + self.update(target, permission_overwrite) + + def update( + self, + target: discord.Role | discord.Member, + perm: Mapping[str, bool | None], + ) -> None: + """Update the permissions for a target.""" + if target not in self.__overwrites: + self.__overwrites[target] = discord.PermissionOverwrite() + self.__overwrites[target].update(**perm) + if self.__overwrites[target].is_empty(): + del self.__overwrites[target] + + @property + def modified(self) -> bool: + """Check if current overwrites are different from when this object was first initialized.""" + return self.__overwrites != self.__original + + @property + def overwrites( + self, + ) -> dict[discord.Role | discord.Member, discord.PermissionOverwrite] | None: + """Get current overwrites.""" + return self.__overwrites diff --git a/autoroom/pcx_template.py b/autoroom/pcx_template.py new file mode 100644 index 0000000..9653441 --- /dev/null +++ b/autoroom/pcx_template.py @@ -0,0 +1,75 @@ +"""Module for template engine using Jinja2, safe for untrusted user templates.""" + +import random +from typing import Any + +from func_timeout import FunctionTimedOut, func_timeout +from jinja2 import Undefined, pass_context +from jinja2.exceptions import TemplateError +from jinja2.runtime import Context +from jinja2.sandbox import ImmutableSandboxedEnvironment + +TIMEOUT = 0.25 # Maximum runtime for template rendering in seconds / should be very low to avoid DoS attacks + + +class TemplateTimeoutError(TemplateError): + """Custom exception raised when template rendering exceeds maximum runtime.""" + + +class SilentUndefined(Undefined): + """Class that converts Undefined type to None.""" + + def _fail_with_undefined_error(self, *_args: Any, **_kwargs: Any) -> None: # type: ignore[incorrect-return-type] # noqa: ANN401 + return None + + +class Template: + """A template engine using Jinja2, safe for untrusted user templates with an immutable sandbox.""" + + def __init__(self) -> None: + """Set up the Jinja2 environment with an immutable sandbox.""" + self.env = ImmutableSandboxedEnvironment( + finalize=self.finalize, + undefined=SilentUndefined, + keep_trailing_newline=True, + trim_blocks=True, + lstrip_blocks=True, + ) + + # Override Jinja's built-in random filter with a deterministic version + self.env.filters["random"] = self.deterministic_random + + @pass_context + def deterministic_random(self, ctx: Context, seq: list) -> Any: # noqa: ANN401 + """Generate a deterministic random choice from a sequence based on the context's random_seed.""" + seed = ctx.get( + "random_seed", random.getrandbits(32) + ) # Use seed from context or default + random.seed(seed) + return random.choice(seq) # Return a deterministic random choice # noqa: S311 + + def finalize(self, element: Any) -> Any: # noqa: ANN401 + """Callable that converts None elements to an empty string.""" + return element if element is not None else "" + + def _render_template(self, template_str: str, data: dict[str, Any]) -> str: + """Render the template to a string.""" + return self.env.from_string(template_str).render(data) + + async def render( + self, + template_str: str, + data: dict[str, Any] | None = None, + timeout: float = TIMEOUT, # noqa: ASYNC109 + ) -> str: + """Render a template with the given data, enforcing a maximum runtime.""" + if data is None: + data = {} + try: + result: str = func_timeout( + timeout, self._render_template, args=(template_str, data) + ) # type: ignore[unknown-return-type] + except FunctionTimedOut as err: + msg = f"Template rendering exceeded {timeout} seconds." + raise TemplateTimeoutError(msg) from err + return result diff --git a/autoroom/pcx_template_test.py b/autoroom/pcx_template_test.py new file mode 100644 index 0000000..49f3b5e --- /dev/null +++ b/autoroom/pcx_template_test.py @@ -0,0 +1,930 @@ +"""Tests for template engine using Jinja2.""" + +import pytest +from pcx_template import ( + Template, + TemplateTimeoutError, +) + + +@pytest.mark.asyncio +async def test_simple_template(): + tpl = Template() + template_str = "Hello, {{ name }}!" + data = {"name": "World"} + result = await tpl.render(template_str, data) + assert result == "Hello, World!" + + +@pytest.mark.asyncio +async def test_template_with_no_data(): + tpl = Template() + template_str = "Hello, World!" + result = await tpl.render(template_str) + assert result == "Hello, World!" + + +@pytest.mark.asyncio +async def test_template_with_missing_variable(): + tpl = Template() + template_str = "Hello, {{ name }}!" + result = await tpl.render(template_str) + assert result == "Hello, !" + + +@pytest.mark.asyncio +async def test_template_timeout(): + tpl = Template() + template_str = """{% for i in range(100000) %}{% for j in range(100000) %}{% for k in range(100000) %}{{ i*i }}{% endfor %}{% endfor %}{% endfor %}""" + data = {} + with pytest.raises(TemplateTimeoutError) as excinfo: + await tpl.render(template_str, data) + assert "Template rendering exceeded" in str(excinfo.value) + + +@pytest.mark.asyncio +async def test_template_no_timeout(): + tpl = Template() + template_str = """{% for i in range(100) %}{{ i }}\n{% endfor %}""" + data = {} + result = await tpl.render(template_str, data, timeout=2) + expected_result = "\n".join(str(i) for i in range(100)) + "\n" + assert result == expected_result + + +@pytest.mark.asyncio +async def test_template_with_custom_timeout(): + tpl = Template() + template_str = """ + {% for i in range(100000) %} + {% for j in range(100000) %} + {{ i }} + {% endfor %} + {% endfor %} + """ + data = {} + with pytest.raises(TemplateTimeoutError) as excinfo: + await tpl.render(template_str, data, timeout=0.1) + + assert "Template rendering exceeded 0.1 seconds" in str(excinfo.value) + + +@pytest.mark.asyncio +async def test_template_with_complex_data(): + tpl = Template() + template_str = ( + "User: {{ user.name }}, Age: {{ user.age }}, Active: {{ user.active }}" + ) + data = {"user": {"name": "John", "age": 30, "active": True}} + result = await tpl.render(template_str, data) + assert result == "User: John, Age: 30, Active: True" + + +@pytest.mark.asyncio +async def test_template_with_escape_sequences(): + tpl = Template() + template_str = "Hello, {{ user.name }}!\nYour score: {{ user.score }}\n\nThank you!" + data = {"user": {"name": "Alice", "score": 42}} + result = await tpl.render(template_str, data) + expected_result = "Hello, Alice!\nYour score: 42\n\nThank you!" + assert result == expected_result + + +@pytest.mark.asyncio +async def test_deterministic_random() -> None: + tpl = Template() + template_str = "Random choice: {{ ['a', 'b', 'c', 'd', 'e', 'f', 'g']|random }}" + data = {"random_seed": "test_seed"} + result1 = await tpl.render(template_str, data) + result2 = await tpl.render(template_str, data) + assert result1 == result2 # Should be the same due to deterministic randomness + + +@pytest.mark.asyncio +async def test_deterministic_random_with_different_seeds(): + tpl = Template() + template_str = "Random choice: {{ ['a', 'b', 'c', 'd', 'e', 'f', 'g']|random }}" + data1 = {"random_seed": "seed1"} + data2 = {"random_seed": "seed2"} + result1 = await tpl.render(template_str, data1) + result2 = await tpl.render(template_str, data2) + assert result1 != result2 # Should be different due to different seeds + + +@pytest.mark.asyncio +async def test_template_with_large_data(): + tpl = Template() + template_str = "Sum: {{ data|sum }}" + data = {"data": list(range(10000))} + result = await tpl.render(template_str, data) + assert result == f"Sum: {sum(range(10000))}" + + +@pytest.mark.asyncio +async def test_template_with_nested_data(): + tpl = Template() + template_str = "User: {{ user.name }}, Address: {{ user.address.city }}, {{ user.address.zip }}" + data = {"user": {"name": "John", "address": {"city": "New York", "zip": "10001"}}} + result = await tpl.render(template_str, data) + assert result == "User: John, Address: New York, 10001" + + +# Old template system tests: Interpolation + + +@pytest.mark.asyncio +async def test_no_interpolation(): + tpl = Template() + template_str = "PhasecoreX says {hello}!" + data = {} + result = await tpl.render(template_str, data) + expected = template_str + assert expected == result + + +@pytest.mark.asyncio +async def test_basic_interpolation(): + tpl = Template() + template_str = "Hello, {{subject}}!" + data = {"subject": "world"} + result = await tpl.render(template_str, data) + expected = "Hello, world!" + assert expected == result + + +@pytest.mark.asyncio +async def test_basic_integer_interpolation(): + tpl = Template() + template_str = '"{{mph}} miles an hour!"' + data = {"mph": 88} + result = await tpl.render(template_str, data) + expected = '"88 miles an hour!"' + assert expected == result + + +@pytest.mark.asyncio +async def test_basic_float_interpolation(): + tpl = Template() + template_str = '"{{power}} jiggawatts!"' + data = {"power": 1.210} + result = await tpl.render(template_str, data) + expected = '"1.21 jiggawatts!"' + assert expected == result + + +@pytest.mark.asyncio +async def test_basic_context_miss_interpolation(): + tpl = Template() + template_str = "I ({{cannot}}) be seen!" + data = {} + result = await tpl.render(template_str, data) + expected = "I () be seen!" + assert expected == result + + +# Dotted Names + + +@pytest.mark.asyncio +async def test_dotted_names_arbitrary_depth(): + tpl = Template() + template_str = '"{{a.b.c.d.e.name}}" == "Phil"' + data = {"a": {"b": {"c": {"d": {"e": {"name": "Phil"}}}}}} + result = await tpl.render(template_str, data) + expected = '"Phil" == "Phil"' + assert expected == result + + +@pytest.mark.asyncio +async def test_dotted_names_broken_chains(): + tpl = Template() + template_str = '"{{a.b.c}}" == ""' + data = {"a": {}} + result = await tpl.render(template_str, data) + expected = '"" == ""' + assert expected == result + + +@pytest.mark.asyncio +async def test_dotted_names_broken_chain_resolution(): + tpl = Template() + template_str = '"{{a.b.c.name}}" == ""' + data = {"a": {"b": {}}, "c": {"name": "Jim"}} + result = await tpl.render(template_str, data) + expected = '"" == ""' + assert expected == result + + +# Whitespace Sensitivity + + +@pytest.mark.asyncio +async def test_interpolation_surrounding_whitespace(): + tpl = Template() + template_str = "| {{string}} |" + data = {"string": "---"} + result = await tpl.render(template_str, data) + expected = "| --- |" + assert expected == result + + +@pytest.mark.asyncio +async def test_interpolation_standalone(): + tpl = Template() + template_str = " {{string}}\n" + data = {"string": "---"} + result = await tpl.render(template_str, data) + expected = " ---\n" + assert expected == result + + +# Whitespace Insensitivity + + +@pytest.mark.asyncio +async def test_interpolation_with_padding(): + tpl = Template() + template_str = "|{{ string }}|" + data = {"string": "---"} + result = await tpl.render(template_str, data) + expected = "|---|" + assert expected == result + + +# Old template system tests: IfStatement + + +@pytest.mark.asyncio +async def test_if_truthy(): + tpl = Template() + template_str = '"{% if boolean %}This should be rendered.{% endif %}"' + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = '"This should be rendered."' + assert expected == result + + +@pytest.mark.asyncio +async def test_if_falsey(): + tpl = Template() + template_str = '"{% if boolean %}This should not be rendered.{% endif %}"' + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = '""' + assert expected == result + + +@pytest.mark.asyncio +async def test_empty_lists(): + tpl = Template() + template_str = '"{% if list %}Yay lists!{% endif %}"' + data = {"list": []} + result = await tpl.render(template_str, data) + expected = '""' + assert expected == result + + +@pytest.mark.asyncio +async def test_if_doubled(): + tpl = Template() + template_str = """{% if bool %} +* first +{% endif %} +* {{two}} +{% if bool %} +* third +{% endif %} +""" + data = {"bool": True, "two": "second"} + result = await tpl.render(template_str, data) + expected = """* first +* second +* third +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_nested_truthy(): + tpl = Template() + template_str = "| A {% if bool %}B {% if bool %}C{% endif %} D{% endif %} E |" + data = {"bool": True} + result = await tpl.render(template_str, data) + expected = "| A B C D E |" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_nested_falsey(): + tpl = Template() + template_str = "| A {% if bool %}B {% if bool %}C{% endif %} D{% endif %} E |" + data = {"bool": False} + result = await tpl.render(template_str, data) + expected = "| A E |" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_context_misses(): + tpl = Template() + template_str = "[{% if missing %}Found key 'missing'!{% endif %}]" + data = {} + result = await tpl.render(template_str, data) + expected = "[]" + assert expected == result + + +# Dotted Names + + +@pytest.mark.asyncio +async def test_if_dotted_names_truthy(): + tpl = Template() + template_str = '"{% if a.b.c %}Here{% endif %}" == "Here"' + data = {"a": {"b": {"c": True}}} + result = await tpl.render(template_str, data) + expected = '"Here" == "Here"' + assert expected == result + + +@pytest.mark.asyncio +async def test_if_dotted_names_falsey(): + tpl = Template() + template_str = '"{% if a.b.c %}Here{% endif %}" == ""' + data = {"a": {"b": {"c": False}}} + result = await tpl.render(template_str, data) + expected = '"" == ""' + assert expected == result + + +@pytest.mark.asyncio +async def test_if_dotted_names_broken_chains(): + tpl = Template() + template_str = '"{% if a.b.c %}Here{% endif %}" == ""' + data = {"a": {}} + result = await tpl.render(template_str, data) + expected = '"" == ""' + assert expected == result + + +# Whitespace Sensitivity + + +@pytest.mark.asyncio +async def test_if_surrounding_whitespace(): + tpl = Template() + template_str = " | {% if boolean %}\t|\t{% endif %} | \n" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = " | \t|\t | \n" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_internal_whitespace(): + tpl = Template() + template_str = " | {% if boolean %} {# Important Whitespace #}\n {% endif %} | \n" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = " | \n | \n" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_indented_inline_sections(): + tpl = Template() + template_str = " {% if boolean %}YES{% endif %}\n {% if boolean %}GOOD{% endif %}\n" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = " YES\n GOOD\n" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_standalone_lines(): + tpl = Template() + template_str = """| This Is +{% if boolean %} +| +{% endif %} +| A Line +""" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = """| This Is +| +| A Line +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_indented_standalone_lines(): + tpl = Template() + template_str = """| This Is +{% if boolean %} +| +{% endif %} +| A Line +""" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = """| This Is +| +| A Line +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_standalone_line_endings(): + tpl = Template() + template_str = "|\r\n{% if boolean %}\r\n{% endif %}\r\n|" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = "|\r\n|" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_standalone_without_previous_line(): + tpl = Template() + template_str = " {% if boolean %}\n#{% endif %}\n/" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = "#\n/" + assert expected == result + + +@pytest.mark.asyncio +async def test_if_standalone_without_newline(): + tpl = Template() + template_str = "#{% if boolean %}\n/\n {% endif %}" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = "#\n/\n" + assert expected == result + + +# Whitespace Insensitivity + + +@pytest.mark.asyncio +async def test_if_padding(): + tpl = Template() + template_str = "|{% if boolean%}={% endif %}|" + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = "|=|" + assert expected == result + + +# Old template system tests: IfNotStatement + + +@pytest.mark.asyncio +async def test_falsey(): + tpl = Template() + template_str = '"{% if not boolean %}This should be rendered.{% endif %}"' + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = '"This should be rendered."' + assert expected == result + + +@pytest.mark.asyncio +async def test_truthy(): + tpl = Template() + template_str = '"{% if not boolean %}This should not be rendered.{% endif %}"' + data = {"boolean": True} + result = await tpl.render(template_str, data) + expected = '""' + assert expected == result + + +@pytest.mark.asyncio +async def test_empty_list(): + tpl = Template() + template_str = '"{% if not list %}Yay lists!{% endif %}"' + data = {"list": []} + result = await tpl.render(template_str, data) + expected = '"Yay lists!"' + assert expected == result + + +@pytest.mark.asyncio +async def test_doubled(): + tpl = Template() + template_str = """{% if not bool %} +* first +{% endif %} +* {{two}} +{% if not bool %} +* third +{% endif %} +""" + data = {"bool": False, "two": "second"} + result = await tpl.render(template_str, data) + expected = """* first +* second +* third +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_nested_falsey(): + tpl = Template() + template_str = ( + "| A {% if not bool %}B {% if not bool %}C{% endif %} D{% endif %} E |" + ) + data = {"bool": False} + result = await tpl.render(template_str, data) + expected = "| A B C D E |" + assert expected == result + + +@pytest.mark.asyncio +async def test_nested_truthy(): + tpl = Template() + template_str = ( + "| A {% if not bool %}B {% if not bool %}C{% endif %} D{% endif %} E |" + ) + data = {"bool": True} + result = await tpl.render(template_str, data) + expected = "| A E |" + assert expected == result + + +@pytest.mark.asyncio +async def test_context_misses(): + tpl = Template() + template_str = "[{% if not missing %}Cannot find key 'missing'!{% endif %}]" + data = {} + result = await tpl.render(template_str, data) + expected = "[Cannot find key 'missing'!]" + assert expected == result + + +# Dotted Names + + +@pytest.mark.asyncio +async def test_dotted_names_truthy(): + tpl = Template() + template_str = '"{% if not a.b.c %}Here{% endif %}" == ""' + data = {"a": {"b": {"c": True}}} + result = await tpl.render(template_str, data) + expected = '"" == ""' + assert expected == result + + +@pytest.mark.asyncio +async def test_dotted_names_falsey(): + tpl = Template() + template_str = '"{% if not a.b.c %}Not Here{% endif %}" == "Not Here"' + data = {"a": {"b": {"c": False}}} + result = await tpl.render(template_str, data) + expected = '"Not Here" == "Not Here"' + assert expected == result + + +@pytest.mark.asyncio +async def test_ifnot_dotted_names_broken_chains(): + tpl = Template() + template_str = '"{% if not a.b.c %}Not Here{% endif %}" == "Not Here"' + data = {"a": {}} + result = await tpl.render(template_str, data) + expected = '"Not Here" == "Not Here"' + assert expected == result + + +# Whitespace Sensitivity + + +@pytest.mark.asyncio +async def test_ifnot_surrounding_whitespace(): + tpl = Template() + template_str = " | {% if not boolean %}\t|\t{% endif %} | \n" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = " | \t|\t | \n" + assert expected == result + + +@pytest.mark.asyncio +async def test_internal_whitespace(): + tpl = Template() + template_str = ( + " | {% if not boolean %} {# Important Whitespace #}\n {% endif %} | \n" + ) + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = " | \n | \n" + assert expected == result + + +@pytest.mark.asyncio +async def test_indented_inline_sections(): + tpl = Template() + template_str = ( + " {% if not boolean %}YES{% endif %}\n {% if not boolean %}GOOD{% endif %}\n" + ) + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = " YES\n GOOD\n" + assert expected == result + + +@pytest.mark.asyncio +async def test_standalone_lines(): + tpl = Template() + template_str = """| This Is +{% if not boolean %} +| +{% endif %} +| A Line +""" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = """| This Is +| +| A Line +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_indented_standalone_lines(): + tpl = Template() + template_str = """| This Is +{% if not boolean %} +| +{% endif %} +| A Line +""" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = """| This Is +| +| A Line +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_ifnot_standalone_line_endings(): + tpl = Template() + template_str = "|\r\n{% if not boolean %}\r\n{% endif %}\r\n|" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = "|\r\n|" + assert expected == result + + +@pytest.mark.asyncio +async def test_ifnot_standalone_without_previous_line(): + tpl = Template() + template_str = " {% if not boolean %}\n#{% endif %}\n/" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = "#\n/" + assert expected == result + + +@pytest.mark.asyncio +async def test_ifnot_standalone_without_newline(): + tpl = Template() + template_str = "#{% if not boolean %}\n/\n {% endif %}" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = "#\n/\n" + assert expected == result + + +# Whitespace Insensitivity + + +@pytest.mark.asyncio +async def test_padding(): + tpl = Template() + template_str = "|{% if not boolean%}={% endif %}|" + data = {"boolean": False} + result = await tpl.render(template_str, data) + expected = "|=|" + assert expected == result + + +# Old template system tests: ElseStatement + + +@pytest.mark.asyncio +async def test_else(): + tpl = Template() + template_str = "{% if 1 > 2 %}Bad...{% else %}Good!{% endif %}" + data = {} + result = await tpl.render(template_str, data) + expected = "Good!" + assert expected == result + + +# Old template system tests: ElifStatement + + +@pytest.mark.asyncio +async def test_elif(): + tpl = Template() + template_str = "{% if 1 > 2 %}Bad if...{% elif 2 == 3 %}Bad elif 1...{% elif 1 == 1 %}Good!{% elif 1 == 2 %}Bad elif 2...{% else %}Bad else...{% endif %}" + data = {} + result = await tpl.render(template_str, data) + expected = "Good!" + assert expected == result + + +# Old template system tests: Comment + + +@pytest.mark.asyncio +async def test_inline(): + tpl = Template() + template_str = "12345{# Comment Block! #}67890" + data = {} + result = await tpl.render(template_str, data) + expected = "1234567890" + assert expected == result + + +@pytest.mark.asyncio +async def test_multiline(): + tpl = Template() + template_str = """12345{# +This is a +multi-line comment... +#}67890 +""" + data = {} + result = await tpl.render(template_str, data) + expected = """1234567890 +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_standalone(): + tpl = Template() + template_str = """Begin. +{# Comment Block! #} +End. +""" + data = {} + result = await tpl.render(template_str, data) + expected = """Begin. +End. +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_indented_standalone(): + tpl = Template() + template_str = """Begin. +{# Indented Comment Block! #} +End. +""" + data = {} + result = await tpl.render(template_str, data) + expected = """Begin. +End. +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_standalone_line_endings(): + tpl = Template() + template_str = "|\r\n{# Standalone Comment #}\r\n|" + data = {} + result = await tpl.render(template_str, data) + expected = "|\r\n|" + assert expected == result + + +@pytest.mark.asyncio +async def test_standalone_without_previous_line(): + tpl = Template() + template_str = " {# I'm Still Standalone #}\n!" + data = {} + result = await tpl.render(template_str, data) + expected = "!" + assert expected == result + + +@pytest.mark.asyncio +async def test_standalone_without_newline(): + tpl = Template() + template_str = "!\n {# I'm Still Standalone #}" + data = {} + result = await tpl.render(template_str, data) + expected = "!\n" + assert expected == result + + +@pytest.mark.asyncio +async def test_multiline_standalone(): + tpl = Template() + template_str = """Begin. +{# +Something's going on here... +#} +End. +""" + data = {} + result = await tpl.render(template_str, data) + expected = """Begin. +End. +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_indented_multiline_standalone(): + tpl = Template() + template_str = """Begin. +{# +Something's going on here... +#} +End. +""" + data = {} + result = await tpl.render(template_str, data) + expected = """Begin. +End. +""" + assert expected == result + + +@pytest.mark.asyncio +async def test_indented_inline(): + tpl = Template() + template_str = " 12 {# 34 #}\n" + data = {} + result = await tpl.render(template_str, data) + expected = " 12 \n" + assert expected == result + + +@pytest.mark.asyncio +async def test_surrounding_whitespace(): + tpl = Template() + template_str = "12345 {# Comment Block! #} 67890" + data = {} + result = await tpl.render(template_str, data) + expected = "12345 67890" + assert expected == result + + +# Old template system tests: Filter + + +@pytest.mark.asyncio +async def test_invalid(): + tpl = Template() + template_str = "{{words | invalidFilter}}" + data = {"words": "This won't be changed"} + result = await tpl.render(template_str, data) + expected = "This won't be changed" + assert expected == result + + +@pytest.mark.asyncio +async def test_lower(): + tpl = Template() + template_str = "{{word | lower}}" + data = {"word": "QUIET"} + result = await tpl.render(template_str, data) + expected = "quiet" + assert expected == result + + +@pytest.mark.asyncio +async def test_upper(): + tpl = Template() + template_str = "{{word | upper}}" + data = {"word": "loud"} + result = await tpl.render(template_str, data) + expected = "LOUD" + assert expected == result + + +@pytest.mark.asyncio +async def test_multiple(): + tpl = Template() + template_str = "{{word | upper | lower | upper | lower | upper | lower | upper | lower | upper}}" + data = {"word": "loud"} + result = await tpl.render(template_str, data) + expected = "LOUD" + assert expected == result + + +if __name__ == "__main__": + pytest.main(["-v", __file__])