import aiohttp
import asyncio
import datetime
import discord
import html
import logging
import math
import random
import time

from redbot.core import bank, checks, commands, Config
from redbot.core.errors import BalanceTooHigh
from redbot.core.utils.chat_formatting import box

log = logging.getLogger("red.aikaterna.quiz")


def check_global_setting_admin():
    """
    Command decorator. If the bank is not global, it checks
    if the author is either a bot admin or has the manage_guild permission.
    If the bank is global, only the bot owner passes.
    """

    async def pred(ctx: commands.Context):
        author = ctx.author
        if await bank.is_global():
            return await ctx.bot.is_owner(author)

        if not isinstance(ctx.channel, discord.abc.GuildChannel):
            return False
        if await ctx.bot.is_owner(author):
            return True
        if author == ctx.guild.owner:
            return True
        if ctx.channel.permissions_for(author).manage_guild:
            return True
        admin_role_ids = await ctx.bot.get_admin_role_ids(ctx.guild.id)
        # Explicit boolean instead of falling off the end with None.
        return any(role.id in admin_role_ids for role in author.roles)

    return commands.check(pred)


class Quiz(commands.Cog):
    """
    Play a kahoot-like trivia game with questions from Open Trivia Database.
    Originally by Keane for Red v2
    """

    async def red_delete_data_for_user(self, **kwargs):
        """ Nothing to delete """
        return

    def __init__(self, bot):
        self.bot = bot

        # Tasks of games currently running, kept so cog_unload can cancel them.
        self.game_tasks = []
        # channel id -> game state dict with the keys
        # "Start", "Started", "Players", "Answers", "Category", "CategoryID".
        self.playing_channels = {}
        self.session = aiohttp.ClientSession()
        self.starter_task = self.bot.loop.create_task(self.start_loop())

        self.config = Config.get_conf(self, 2782511001, force_registration=True)

        default_guild = {
            "afk": 3,
            "multiplier": 100,
            "questions": 20,
            "show_answer": True,
            "token": None,
        }

        self.config.register_guild(**default_guild)

    @commands.guild_only()
    @commands.group()
    async def quiz(self, ctx):
        """Play a kahoot-like trivia game.
        Questions from the Open Trivia Database.

        In this game, you will compete with other players to correctly answer each
        question as quickly as you can. You have 10 seconds to type the answer
        choice before time runs out. The longer you take to say the right answer,
        the fewer points you get. If you get it wrong, you get no points. Only the
        first valid answer (A, B, C, or D) will be recorded - be sure of the
        answer before replying!

        To end the game, stop responding to the questions and the game will time out.
        """
        pass

    @quiz.command(name="play")
    async def quiz_play(self, ctx, *, category_name_or_id=None):
        """
        Create or join a quiz game.

        Specify a category name or ID number, otherwise it will be random.
        Use [p]quiz categories to list category names or id numbers.
        """
        channel = ctx.message.channel
        player = ctx.message.author

        if not category_name_or_id:
            # random
            try:
                category_id = await self.category_selector()
                category_name = await self.category_name_from_id(category_id)
            except RuntimeError:
                return await ctx.send("Could not pick a random category. Please try again.")

        elif category_name_or_id.isdecimal():
            # cat id specified; isdecimal() only accepts what int() can parse
            # OTDB category ids run from 9 to 32 inclusive.
            if not 9 <= int(category_name_or_id) <= 32:
                return await ctx.send(f"Invalid category number. Use `{ctx.prefix}quiz categories` to see a list.")
            category_id = category_name_or_id
            try:
                category_name = await self.category_name_from_id(int(category_name_or_id))
            except RuntimeError:
                return await ctx.send(f"Invalid category ID. Use `{ctx.prefix}quiz categories` to see a list.")
        else:
            # cat name specified
            try:
                category_name = await self.category_name_match(category_name_or_id)
            except RuntimeError:
                return await ctx.send(f"Invalid category name. Use `{ctx.prefix}quiz categories` to see a list.")
            category_id = await self.category_id_from_name(category_name)

        if channel.id not in self.playing_channels:
            self.playing_channels[channel.id] = {
                "Start": datetime.datetime.now(datetime.timezone.utc),
                "Started": False,
                "Players": {player.id: 0},
                "Answers": {},
                "Category": str(category_name),
                "CategoryID": int(category_id),
            }
            return await ctx.send(
                f"{player.display_name} is starting a quiz game!\n"
                f"Category: `{category_name}`\n"
                f"It will start in 30 seconds. Use `{ctx.prefix}quiz play` to join."
            )

        channelinfo = self.playing_channels[channel.id]
        if player.id in channelinfo["Players"]:
            await ctx.send("You are already in the game.")
        elif channelinfo["Started"]:
            await ctx.send("A quiz game is already underway.")
        else:
            channelinfo["Players"][player.id] = 0
            await ctx.send(f"{player.display_name} joined the game.")

    @quiz.command(name="categories")
    async def quiz_cat(self, ctx):
        """List quiz categories."""
        categories = await self._fetch_categories()
        rows = [f"[Category Name]{' ' * 24}[ID]\n"]
        for cat_dict in categories:
            padding = 40 - len(cat_dict["name"])
            rows.append(f"{cat_dict['name']}{' ' * padding}{cat_dict['id']}\n")
        embed = discord.Embed(description=box("".join(rows), lang="ini"))
        await ctx.send(embed=embed)

    @commands.guild_only()
    @commands.group()
    @checks.mod_or_permissions(manage_guild=True)
    async def quizset(self, ctx):
        """Quiz settings."""
        if ctx.invoked_subcommand is None:
            guild_data = await self.config.guild(ctx.guild).all()
            msg = (
                f"[Quiz Settings for {ctx.guild.name}]\n"
                f"AFK questions before end: {guild_data['afk']}\n"
                f"Credit multiplier:        {guild_data['multiplier']}x\n"
                f"Number of questions:      {guild_data['questions']}\n"
                f"Reveal correct answer:    {guild_data['show_answer']}\n"
            )
            await ctx.send(box(msg, lang="ini"))

    @quizset.command(name="afk")
    async def quizset_afk(self, ctx, questions: int):
        """Set number of questions before the game ends due to non-answers."""
        if 1 <= questions <= 10:
            await self.config.guild(ctx.guild).afk.set(questions)
            plural = "" if int(questions) == 1 else "s"
            return await ctx.send(
                f"{questions} question{plural} will be asked before the game times out. "
                "A question will be counted in this afk count if 0 or 1 person answers. "
                "2 or more answers on a question will not trigger this counter."
            )
        await ctx.send("Please use a number between 1 and 10. The default is 3.")

    @quizset.command(name="show")
    async def quizset_show(self, ctx):
        """Toggle revealing the answers."""
        show = await self.config.guild(ctx.guild).show_answer()
        await self.config.guild(ctx.guild).show_answer.set(not show)
        await ctx.send(f"Question answers will be revealed during the game: {not show}")

    @quizset.command(name="questions")
    async def quizset_questions(self, ctx, questions: int):
        """Set number of questions per game."""
        if 5 <= questions <= 50:
            await self.config.guild(ctx.guild).questions.set(questions)
            return await ctx.send(f"Number of questions per game: {questions}")
        await ctx.send("Please use a number between 5 and 50. The default is 20.")

    @check_global_setting_admin()
    @quizset.command(name="multiplier")
    async def quizset_multiplier(self, ctx, multiplier: int):
        """
        Set the credit multiplier.

        The accepted range is 0 - 10000.
        0 will turn credit gain off.
        Credit gain will be based on the number of questions set and user speed.
        1x = A small amount of credits like 1-10.
        100x = A handful of credits: 100-500.
        10000x = Quite a lot of credits, around 10k to 50k.
        """
        if 0 <= multiplier <= 10000:
            await self.config.guild(ctx.guild).multiplier.set(multiplier)
            return await ctx.send(f"Credit multiplier: `{multiplier}x`")
        await ctx.send("Please use a number between 0 and 10000. The default is 100.")

    async def start_loop(self):
        """Starts quiz games when the timeout period ends.

        Polls every 2 seconds. A failure while handling one channel is logged
        and does not stop the loop, so later games can still start.
        """
        while True:
            for channelid in list(self.playing_channels):
                try:
                    await self._start_if_due(channelid)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    log.error("Error in Quiz start loop.", exc_info=True)
            await asyncio.sleep(2)

    async def _start_if_due(self, channelid):
        """Start (or abandon) the pending game in one channel once its 30s lobby is over."""
        # The entry may have been removed while the loop awaited on another channel.
        channelinfo = self.playing_channels.get(channelid)
        if channelinfo is None or channelinfo["Started"]:
            return

        now = datetime.datetime.now(datetime.timezone.utc)
        if (now - channelinfo["Start"]).total_seconds() <= 30:
            return

        channel = self.bot.get_channel(channelid)
        if channel is None:
            # Channel is gone or not cached: nothing to play in.
            self.playing_channels.pop(channelid, None)
            return

        if len(channelinfo["Players"]) > 1:
            channelinfo["Started"] = True
            # Drop finished tasks so the list does not grow without bound.
            self.game_tasks[:] = [task for task in self.game_tasks if not task.done()]
            self.game_tasks.append(self.bot.loop.create_task(self.game(channel)))
        else:
            # Remove first so a failing send cannot leave a stale lobby behind.
            self.playing_channels.pop(channelid, None)
            await channel.send("Nobody else joined the quiz game.")

    async def game(self, channel):
        """Runs a quiz game on a channel.

        Whatever happens, the channel's entry is removed from
        ``playing_channels`` afterwards so a new game can be started there.
        """
        try:
            await self._run_game(channel)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.error("Error in Quiz game.", exc_info=True)
        finally:
            self.playing_channels.pop(channel.id, None)

    async def _run_game(self, channel):
        """Ask every question of one game, keep score, then hand over to end_game."""
        channelinfo = self.playing_channels[channel.id]
        category_name = channelinfo["Category"]

        try:
            response = await self.get_questions(channel.guild, category=channelinfo["CategoryID"])
        except RuntimeError:
            log.error("Quiz: question retrieval failed.", exc_info=True)
            await channel.send("An error occurred in retrieving questions. Please try again.")
            return

        # Introduction
        intro = (
            f"Welcome to the quiz game! Your category is `{category_name}`.\n"
            "Remember to answer correctly as quickly as you can for more points.\n"
            "You have 10 seconds per question: the timer is shown in reactions on each question.\n"
            "The game will begin shortly."
        )
        await channel.send(intro)
        await asyncio.sleep(4)

        # Question and Answer
        numbers = ["1⃣", "2⃣", "3⃣", "4⃣", "5⃣", "6⃣", "7⃣", "8⃣", "9⃣", "🔟"]
        results = response["results"]
        afk_questions = 0
        for index, dictionary in enumerate(results):
            correct_answer = html.unescape(dictionary["correct_answer"])
            answers = [dictionary["correct_answer"]] + dictionary["incorrect_answers"]

            # Display question and countdown
            if len(answers) == 2:  # true/false question
                answers = ["True", "False", "", ""]
            else:
                answers = [html.unescape(answer) for answer in answers]
                random.shuffle(answers)

            message = (
                f"{html.unescape(dictionary['question'])}\n"
                f"A. {answers[0]}\n"
                f"B. {answers[1]}\n"
                f"C. {answers[2]}\n"
                f"D. {answers[3]}\n"
            )

            message_obj = await channel.send(box(message))
            await message_obj.add_reaction("0⃣")
            channelinfo["Answers"].clear()  # clear the previous question's answers
            start_time = time.perf_counter()

            for number in numbers:
                if len(channelinfo["Answers"]) == len(channelinfo["Players"]):
                    break
                await asyncio.sleep(1)
                await message_obj.add_reaction(number)

            # Organize answers
            # Copy channelinfo["Answers"] at this point in time so that
            # answers arriving after the countdown are ignored.
            user_answers = dict(channelinfo["Answers"])
            answerdict = dict(zip("abcd", answers))

            # Check for AFK
            if len(user_answers) < 2:
                afk_questions += 1
                afk_count = await self.config.guild(channel.guild).afk()
                # >= so lowering the setting mid-game still ends the game.
                if afk_questions >= int(afk_count):
                    await channel.send("The game has been cancelled due to lack of participation.")
                    return
            else:
                afk_questions = 0

            # Find and display correct answer
            correct_letter = next(
                (letter for letter, answer in answerdict.items() if answer == correct_answer), None
            )
            if correct_letter is None:
                # Malformed question data: nobody can score, move on.
                log.warning("Quiz: correct answer not found among the choices, skipping question.")
                continue

            if await self.config.guild(channel.guild).show_answer():
                message = f"Correct answer:```{correct_letter.upper()}. {answerdict[correct_letter]}```"
                await channel.send(message)

            # Assign scores
            for playerid, answer in user_answers.items():
                if answer["Choice"] != correct_letter:
                    continue
                time_taken = answer["Time"] - start_time
                if time_taken < 1:
                    points = 1000
                else:
                    # the 20 in the formula below is 2 * 10s (max answer time);
                    # clamp so a slow countdown can never subtract points
                    points = max(0, round(1000 * (1 - (time_taken / 20))))
                channelinfo["Players"][playerid] += points

            # Display top 5 players and their points
            message = self.scoreboard(channel)
            await channel.send("Scoreboard:\n" + message)
            await asyncio.sleep(4)

            if index < len(results) - 1:
                await channel.send("Next question...")
                await asyncio.sleep(1)

        await self.end_game(channel)

    async def end_game(self, channel):
        """Ends a quiz game: announces the winner and pays out credits."""
        channelinfo = self.playing_channels[channel.id]
        players = channelinfo["Players"]
        idlist = sorted(players, key=players.get, reverse=True)

        # A player may have left the guild during the game.
        winner = channel.guild.get_member(idlist[0])
        winner_mention = winner.mention if winner is not None else f"<@{idlist[0]}>"
        await channel.send(f"Game over! {winner_mention} won!")

        multiplier = await self.config.guild(channel.guild).multiplier()
        if multiplier == 0:
            self.playing_channels.pop(channel.id, None)
            return

        # Column widths are based on the amounts actually displayed,
        # i.e. credits after the multiplier has been applied.
        max_credits = self.calculate_credits(players[idlist[0]]) * int(multiplier)
        end_len = len(str(max_credits)) + 1
        rank_len = len(str(len(players)))

        leaderboard = "\n"
        for rank, playerid in enumerate(idlist, start=1):
            player = channel.guild.get_member(playerid)
            display_name = player.display_name if player is not None else "Unknown user"

            if len(display_name) > 25 - rank_len - end_len:
                name = display_name[: 22 - rank_len - end_len] + "..."
            else:
                name = display_name

            creds = self.calculate_credits(players[playerid]) * int(multiplier)
            creds_str = str(creds)

            leaderboard += str(rank)
            leaderboard += " " * (1 + rank_len - len(str(rank)))
            leaderboard += name
            leaderboard += " " * (26 - rank_len - 1 - len(name) - len(creds_str))
            leaderboard += creds_str + "\n"

            if player is None:
                # Nobody to pay; the row is still shown for transparency.
                continue
            try:
                await bank.deposit_credits(player, creds)
            except BalanceTooHigh as e:
                await bank.set_balance(player, e.max_balance)

        await channel.send("Credits earned:\n" + box(leaderboard, lang="py"))
        self.playing_channels.pop(channel.id, None)

    def scoreboard(self, channel):
        """Returns a scoreboard string to be sent to the text channel."""
        channelinfo = self.playing_channels[channel.id]
        players = channelinfo["Players"]
        idlist = sorted(players, key=players.get, reverse=True)
        max_score = players[idlist[0]]
        end_len = len(str(max_score)) + 1

        scoreboard = "\n"
        for rank, playerid in enumerate(idlist[:5], start=1):
            player = channel.guild.get_member(playerid)
            # A player may have left the guild during the game.
            display_name = player.display_name if player is not None else "Unknown user"
            if len(display_name) > 24 - end_len:
                name = display_name[: 21 - end_len] + "..."
            else:
                name = display_name
            score_str = str(players[playerid])
            scoreboard += str(rank) + " " + name
            scoreboard += " " * (24 - len(name) - len(score_str))
            scoreboard += score_str + "\n"
        return box(scoreboard, lang="py")

    def calculate_credits(self, score):
        """Calculates credits earned from a score.

        Non-linear credit earning: .0002x^{2.9} where x is score/100, switching
        to an exponential curve for very high scores. Non-positive scores earn 0.
        """
        adjusted = score / 100
        if adjusted <= 0:
            # A negative base with a fractional exponent yields a complex number.
            return 0
        if adjusted < 156.591:
            result = 0.0002 * (adjusted ** 2.9)
        else:
            result = (0.6625 * math.exp(0.0411 * adjusted)) + 50
        return round(result)

    @commands.Cog.listener()
    async def on_message_without_command(self, message):
        """Record a player's first valid answer (a/b/c/d) for the current question."""
        if not message.guild:
            return
        authorid = message.author.id
        channelid = message.channel.id
        choice = message.content.lower()
        if channelid in self.playing_channels:
            channelinfo = self.playing_channels[channelid]
            if (
                authorid in channelinfo["Players"]
                and authorid not in channelinfo["Answers"]
                and choice in {"a", "b", "c", "d"}
            ):
                channelinfo["Answers"][authorid] = {"Choice": choice, "Time": time.perf_counter()}

    # OpenTriviaDB API functions
    async def get_questions(self, server, category=None, difficulty=None):
        """Gets questions, resetting a token or getting a new one if necessary.

        Raises RuntimeError if OTDB reports response code 1 or 2, or if no
        questions could be retrieved after three attempts.
        """
        questions = await self.config.guild(server).questions()
        parameters = {"amount": questions}
        if category:
            parameters["category"] = category
        if difficulty:
            parameters["difficulty"] = difficulty
        for _ in range(3):
            parameters["token"] = await self.get_token(server)
            async with self.session.get("https://opentdb.com/api.php", params=parameters) as response:
                response_json = await response.json()
                response_code = response_json["response_code"]
                if response_code == 0:
                    return response_json
                if response_code in (1, 2):
                    raise RuntimeError(
                        f"Question retrieval unsuccessful. Response code from OTDB: {response_code}"
                    )
                if response_code == 3:
                    # Token expired. Obtain new one.
                    log.debug("Quiz: Response code from OTDB: 3")
                    await self.config.guild(server).token.set(None)
                elif response_code == 4:
                    # Token empty. Reset it.
                    log.debug("Quiz: Response code from OTDB: 4")
                    await self.reset_token(server)
        raise RuntimeError("Failed to retrieve questions.")

    async def get_token(self, server):
        """Gets the provided server's token, or generates and saves one if one doesn't exist."""
        token = await self.config.guild(server).token()
        if not token:
            async with self.session.get("https://opentdb.com/api_token.php", params={"command": "request"}) as response:
                response_json = await response.json()
                token = response_json["token"]
                await self.config.guild(server).token.set(token)
        return token

    async def reset_token(self, server):
        """Resets the provided server's token.

        Raises RuntimeError if OTDB does not confirm the reset.
        """
        token = await self.config.guild(server).token()
        async with self.session.get(
            "https://opentdb.com/api_token.php", params={"command": "reset", "token": token}
        ) as response:
            response_code = (await response.json())["response_code"]
            if response_code != 0:
                raise RuntimeError(f"Token reset was unsuccessful. Response code from OTDB: {response_code}")

    async def _fetch_categories(self):
        """Return the list of category dicts ("id", "name") published by OTDB."""
        async with self.session.get("https://opentdb.com/api_category.php") as response:
            response_json = await response.json()
        return response_json["trivia_categories"]

    async def category_selector(self):
        """Chooses a random category that has enough questions.

        Raises RuntimeError if no suitable category is found in 10 tries.
        """
        for _ in range(10):
            category = random.randint(9, 32)
            async with self.session.get("https://opentdb.com/api_count.php", params={"category": category}) as response:
                response_json = await response.json()
            if response_json.get("category_id") != category:
                # Unexpected reply for this category; try another one.
                continue
            if response_json["category_question_count"]["total_question_count"] > 39:
                return category
        raise RuntimeError("Failed to select a category.")

    async def category_name_from_id(self, idnum):
        """Finds a category's name from its number."""
        for cat_dict in await self._fetch_categories():
            if cat_dict["id"] == idnum:
                return cat_dict["name"]
        raise RuntimeError("Failed to find category's name.")

    async def category_name_match(self, name):
        """Check if a category name exists (case-insensitive) and return its canonical spelling."""
        wanted = name.lower()
        for cat_dict in await self._fetch_categories():
            if cat_dict["name"].lower() == wanted:
                return cat_dict["name"]
        raise RuntimeError("Failed to find category's name.")

    async def category_id_from_name(self, name):
        """Finds a category's id number from its exact name."""
        for cat_dict in await self._fetch_categories():
            if cat_dict["name"] == name:
                return cat_dict["id"]
        raise RuntimeError("Failed to find category's id.")

    def cog_unload(self):
        """Close the HTTP session and cancel the starter loop and all running games."""
        self.bot.loop.create_task(self.session.close())
        self.starter_task.cancel()
        for task in self.game_tasks:
            task.cancel()