diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/Ava-Cogs.iml b/.idea/Ava-Cogs.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/Ava-Cogs.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/discord.xml b/.idea/discord.xml new file mode 100644 index 0000000..912db82 --- /dev/null +++ b/.idea/discord.xml @@ -0,0 +1,14 @@ + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..6f29fee --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..37313d7 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/lastfm/__init__.py b/lastfm/__init__.py new file mode 100644 index 0000000..9873a82 --- /dev/null +++ b/lastfm/__init__.py @@ -0,0 +1,9 @@ +from .lastfm import LastFM + +__red_end_user_data_statement__ = "This cog stores a user's last.fm username, their amount of VC scrobbles, and a last.fm session key to scrobble for them. It also stores the user's crowns. This is all data that can be cleared." + + +async def setup(bot): + cog = LastFM(bot) + await bot.add_cog(cog) + await cog.initialize() diff --git a/lastfm/abc.py b/lastfm/abc.py new file mode 100644 index 0000000..69a08c9 --- /dev/null +++ b/lastfm/abc.py @@ -0,0 +1,22 @@ +from abc import ABC + +from redbot.core import Config, commands +from redbot.core.bot import Red + + +class CompositeMetaClass(type(commands.Cog), type(ABC)): + """ + This allows the metaclass used for proper type detection to coexist with discord.py's + metaclass. + """ + + +class MixinMeta(ABC): + """ + Base class for well behaved type hint detection with composite class. + Basically, to keep developers sane when not all attributes are defined in each mixin. + """ + + def __init__(self, *_args): + self.config: Config + self.bot: Red diff --git a/lastfm/charts.py b/lastfm/charts.py new file mode 100644 index 0000000..6875f99 --- /dev/null +++ b/lastfm/charts.py @@ -0,0 +1,421 @@ +import asyncio +from io import BytesIO + +import discord +from PIL import Image, ImageDraw, ImageFile, ImageFont +from redbot.core import commands +from redbot.core.utils import AsyncIter +from redbot.core.utils.chat_formatting import escape + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +NO_IMAGE_PLACEHOLDER = ( + "https://lastfm.freetls.fastly.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png" +) +ImageFile.LOAD_TRUNCATED_IMAGES = True + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class ChartMixin(MixinMeta): + """Chart Commands""" + + async def get_img(self, url): + async with self.session.get(url or NO_IMAGE_PLACEHOLDER) as resp: + if resp.status == 200: + img = await resp.read() + return img + async with self.session.get(NO_IMAGE_PLACEHOLDER) as resp: + img = await resp.read() + return img + + @command_fm.command( + name="chart", usage="[album | artist | recent | track] [timeframe] [width]x[height]" + ) + @commands.max_concurrency(1, commands.BucketType.user) + async def command_chart(self, ctx, *args): + """ + Visual chart of your top albums, tracks or artists. + + Defaults to top albums, weekly, 3x3. + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + arguments = self.parse_chart_arguments(args) + if arguments["width"] + arguments["height"] > 31: # TODO: Figure out a reasonable value. + return await ctx.send( + "Size is too big! Chart `width` + `height` total must not exceed `31`" + ) + msg = await ctx.send("Gathering images and data, this may take some time.") + data = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": arguments["method"], + "period": arguments["period"], + "limit": arguments["amount"], + }, + ) + chart = [] + chart_type = "ERROR" + async with ctx.typing(): + if arguments["method"] == "user.gettopalbums": + chart_type = "top album" + albums = data["topalbums"]["album"] + async for album in AsyncIter(albums[: arguments["width"] * arguments["height"]]): + name = album["name"] + artist = album["artist"]["name"] + plays = album["playcount"] + if album["image"][3]["#text"] in self.chart_data: + chart_img = self.chart_data[album["image"][3]["#text"]] + else: + chart_img = await self.get_img(album["image"][3]["#text"]) + self.chart_data[album["image"][3]["#text"]] = chart_img + chart.append( + ( + f"{plays} {self.format_plays(plays)}\n{name} - {artist}", + chart_img, + ) + ) + img = await self.bot.loop.run_in_executor( + None, + charts, + chart, + arguments["width"], + arguments["height"], + self.data_loc, + ) + + elif arguments["method"] == "user.gettopartists": + chart_type = "top artist" + artists = data["topartists"]["artist"] + if self.login_token: + scraped_images = await self.scrape_artists_for_chart( + ctx, conf["lastfm_username"], arguments["period"], arguments["amount"] + ) + else: + scraped_images = [NO_IMAGE_PLACEHOLDER] * arguments["amount"] + iterator = AsyncIter(artists[: arguments["width"] * arguments["height"]]) + async for i, artist in iterator.enumerate(): + name = artist["name"] + plays = artist["playcount"] + if i < len(scraped_images) and scraped_images[i] in self.chart_data: + chart_img = self.chart_data[scraped_images[i]] + else: + chart_img = await self.get_img(scraped_images[i]) + self.chart_data[scraped_images[i]] = chart_img + chart.append( + ( + f"{plays} {self.format_plays(plays)}\n{name}", + chart_img, + ) + ) + img = await self.bot.loop.run_in_executor( + None, + charts, + chart, + arguments["width"], + arguments["height"], + self.data_loc, + ) + + elif arguments["method"] == "user.getrecenttracks": + chart_type = "recent tracks" + tracks = data["recenttracks"]["track"] + if isinstance(tracks, dict): + tracks = [tracks] + async for track in AsyncIter(tracks[: arguments["width"] * arguments["height"]]): + name = track["name"] + artist = track["artist"]["#text"] + if track["image"][3]["#text"] in self.chart_data: + chart_img = self.chart_data[track["image"][3]["#text"]] + else: + chart_img = await self.get_img(track["image"][3]["#text"]) + self.chart_data[track["image"][3]["#text"]] = chart_img + chart.append( + ( + f"{name} - {artist}", + chart_img, + ) + ) + img = await self.bot.loop.run_in_executor( + None, + track_chart, + chart, + arguments["width"], + arguments["height"], + self.data_loc, + ) + elif arguments["method"] == "user.gettoptracks": + chart_type = "top tracks" + tracks = data["toptracks"]["track"] + scraped_images = await self.scrape_artists_for_chart( + ctx, conf["lastfm_username"], arguments["period"], arguments["amount"] + ) + if isinstance(tracks, dict): + tracks = [tracks] + async for track in AsyncIter(tracks[: arguments["width"] * arguments["height"]]): + name = track["name"] + artist = track["artist"]["name"] + plays = track["playcount"] + if name in self.chart_data: + chart_img = self.chart_data[name] + else: + chart_img = await self.get_img(await self.scrape_artist_image(artist, ctx)) + self.chart_data[name] = chart_img + chart.append( + ( + f"{plays} {self.format_plays(plays)}\n{name} - {artist}", + chart_img, + ) + ) + img = await self.bot.loop.run_in_executor( + None, + charts, + chart, + arguments["width"], + arguments["height"], + self.data_loc, + ) + await msg.delete() + u = conf["lastfm_username"] + try: + await ctx.send( + f"`{u} - {self.humanized_period(arguments['period'])} - {arguments['width']}x{arguments['height']} {chart_type} chart`", + file=img, + ) + except discord.HTTPException: + await ctx.send("File is to big to send, try lowering the size.") + + @command_fm_server.command( + name="chart", usage="[album | artist | tracks] [timeframe] [width]x[height]" + ) + @commands.max_concurrency(1, commands.BucketType.user) + async def server_chart(self, ctx, *args): + """Visual chart of the servers albums, artists or tracks.""" + arguments = self.parse_chart_arguments(args) + if arguments["width"] + arguments["height"] > 31: # TODO: Figure out a reasonable value. + return await ctx.send( + "Size is too big! Chart `width` + `height` total must not exceed `31`" + ) + if arguments["method"] not in [ + "user.gettopalbums", + "user.gettopartists", + "user.gettoptracks", + ]: + return await ctx.send("Only albums, artists and tracks are supported.") + chart_total = arguments["width"] * arguments["height"] + msg = await ctx.send("Gathering images and data, this may take some time.") + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + datatype = { + "user.gettopalbums": "album", + "user.gettopartists": "artist", + "user.gettoptracks": "track", + } + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append( + self.get_server_top( + ctx, + lastfm_username, + datatype.get(arguments["method"]), + arguments["period"], + arguments["amount"], + ) + ) + chart = [] + chart_type = "ERROR" + if not tasks: + return await ctx.send("No users have set their last.fm username yet.") + content_map = {} + async with ctx.typing(): + data = await asyncio.gather(*tasks) + if arguments["method"] == "user.gettopalbums": + chart_type = "top album" + for user_data in data: + if user_data is None: + continue + for album in user_data: + album_name = album["name"] + artist = album["artist"]["name"] + name = f"{album_name} — {artist}" + plays = int(album["playcount"]) + if name in content_map: + content_map[name]["plays"] += plays + else: + content_map[name] = { + "plays": plays, + "link": album["image"][3]["#text"], + } + elif arguments["method"] == "user.gettopartists": + chart_type = "top artist" + for user_data in data: + if user_data is None: + continue + for artist in user_data: + name = artist["name"] + plays = int(artist["playcount"]) + if name in content_map: + content_map[name]["plays"] += plays + else: + content_map[name] = {"plays": plays} + elif arguments["method"] == "user.gettoptracks": + chart_type = "top tracks" + for user in data: + if user is None: + continue + for user_data in user: + name = f'{escape(user_data["artist"]["name"])} — *{escape(user_data["name"])}*' + plays = int(user_data["playcount"]) + if name in content_map: + content_map[name]["plays"] += plays + else: + content_map[name] = { + "plays": plays, + "link": user_data["artist"]["name"], + } + cached_images = {} + for i, (name, content_data) in enumerate( + sorted(content_map.items(), key=lambda x: x[1]["plays"], reverse=True), start=1 + ): + if arguments["method"] == "user.gettopartists": + image = await self.get_img(await self.scrape_artist_image(name, ctx)) + elif arguments["method"] == "user.gettoptracks": + if content_data["link"] in cached_images: + image = cached_images[content_data["link"]] + else: + image = await self.get_img( + await self.scrape_artist_image(content_data["link"], ctx) + ) + cached_images[content_data["link"]] = image + else: + image = await self.get_img(content_data["link"]) + chart.append( + ( + f"{content_data['plays']} {self.format_plays(content_data['plays'])}\n{name}", + image, + ) + ) + if i >= chart_total: + break + img = await self.bot.loop.run_in_executor( + None, + charts, + chart, + arguments["width"], + arguments["height"], + self.data_loc, + ) + await msg.delete() + try: + await ctx.send( + f"`{ctx.guild} - {self.humanized_period(arguments['period'])} - {arguments['width']}x{arguments['height']} {chart_type} chart`", + file=img, + ) + except discord.HTTPException: + await ctx.send("File is to big to send, try lowering the size.") + + +def charts(data, w, h, loc): + fnt_file = f"{loc}/fonts/Arial Unicode.ttf" + fnt = ImageFont.truetype(fnt_file, 18, encoding="utf-8") + imgs = [] + for item in data: + img = BytesIO(item[1]) + image = Image.open(img).convert("RGBA") + draw = ImageDraw.Draw(image) + texts = item[0].split("\n") + if len(texts[1]) > 30: + height = 223 + text = f"{texts[0]}\n{texts[1][:30]}\n{texts[1][30:]}" + else: + height = 247 + text = item[0] + draw.text( + (5, height), + text, + fill=(255, 255, 255, 255), + font=fnt, + stroke_width=1, + stroke_fill=(0, 0, 0), + ) + _file = BytesIO() + image.save(_file, "png") + _file.name = f"{item[0]}.png" + _file.seek(0) + imgs.append(_file) + return create_graph(imgs, w, h) + + +def track_chart(data, w, h, loc): + fnt_file = f"{loc}/fonts/Arial Unicode.ttf" + fnt = ImageFont.truetype(fnt_file, 18, encoding="utf-8") + imgs = [] + for item in data: + img = BytesIO(item[1]) + image = Image.open(img).convert("RGBA") + draw = ImageDraw.Draw(image) + if len(item[0]) > 30: + height = 243 + text = f"{item[0][:30]}\n{item[0][30:]}" + else: + height = 267 + text = item[0] + draw.text( + (5, height), + text, + fill=(255, 255, 255, 255), + font=fnt, + stroke_width=1, + stroke_fill=(0, 0, 0), + ) + _file = BytesIO() + image.save(_file, "png") + _file.name = f"{item[0]}.png" + _file.seek(0) + imgs.append(_file) + return create_graph(imgs, w, h) + + +def chunks(l, n): + """Yield successive n-sized chunks from l.""" + for i in range(0, len(l), n): + yield l[i : i + n] + + +def create_graph(data, w, h): + dimensions = (300 * w, 300 * h) + final = Image.new("RGBA", dimensions) + images = chunks(data, w) + y = 0 + for chunked in images: + x = 0 + for img in chunked: + new = Image.open(img) + w, h = new.size + final.paste(new, (x, y, x + w, y + h)) + x += 300 + y += 300 + w, h = final.size + if w > 2100 and h > 2100: + final = final.resize( + (2100, 2100), resample=Image.ANTIALIAS + ) # Resize cause a 6x6k image is blocking when being sent + file = BytesIO() + final.save(file, "webp") + file.name = f"chart.webp" + file.seek(0) + image = discord.File(file) + return image diff --git a/lastfm/compare.py b/lastfm/compare.py new file mode 100644 index 0000000..4e211fd --- /dev/null +++ b/lastfm/compare.py @@ -0,0 +1,295 @@ +import contextlib +from io import BytesIO + +import discord +import tabulate +from PIL import Image, ImageDraw, ImageFont +from redbot.core.utils import AsyncIter +from redbot.core.utils.chat_formatting import humanize_number + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class CompareMixin(MixinMeta): + """Commands for comparing two users""" + + def make_table_into_image(self, text): + + color = (150, 123, 182) + + lines = 0 + keep_going = True + width = 0 + for line in text: + if keep_going: + width += 6.6 + if line == "\n": + lines += 16.5 + keep_going = False + + img = Image.new("RGBA", (int(width), int(lines)), color=(255, 0, 0, 0)) + + d = ImageDraw.Draw(img) + fnt_file = f"{self.data_loc}/fonts/NotoSansMono-Regular.ttf" + font = ImageFont.truetype(fnt_file, 11, encoding="utf-8") + d.text((0, 0), text, fill=color, font=font) + + final = BytesIO() + img.save(final, "webp") + final.seek(0) + return discord.File(final, "result.webp") + + @command_fm.group(name="compare") + async def command_compare(self, ctx): + """Compare two users music tastes""" + + @command_compare.command(name="artists", aliases=["artist"]) + async def compare_artists(self, ctx, user: discord.Member, period: str = "1month"): + """ + Compare your top artists with someone else. + + `[period]` can be one of: overall, 7day, 1month, 3month, 6month, 12month + The default is 1 month. + """ + if user == ctx.author: + await ctx.send("You need to compare with someone else.") + return + + period, displayperiod = self.get_period(period) + if not period: + await ctx.send( + "Invalid period. Valid periods are: overall, 7day, 1month, 3month, 6month, 12month" + ) + return + + author_conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(author_conf) + user_conf = await self.config.user(user).all() + self.check_if_logged_in(user_conf, False) + async with ctx.typing(): + author_data = await self.api_request( + ctx, + { + "user": author_conf["lastfm_username"], + "method": "user.gettopartists", + "period": period, + "limit": "10", + }, + ) + author_artists = author_data["topartists"]["artist"] + if not author_artists: + if period == "overall": + await ctx.send("You haven't listened to any artists yet.") + else: + await ctx.send( + "You haven't listened to any artists in the last {}s.".format(period) + ) + return + + g = await ctx.send("Gathering data... This might take a while.") + + author_plays = [] + artist_names = [] + for artist in author_artists: + if artist["playcount"] == 1: + author_plays.append(f"{artist['playcount']} Play") + else: + author_plays.append(f"{humanize_number(artist['playcount'])} Plays") + artist_names.append(artist["name"]) + + user_plays = [] + async for artist in AsyncIter(author_artists): + plays = await self.get_playcount( + ctx, user_conf["lastfm_username"], artist["name"], period + ) + if plays == 1: + user_plays.append(f"{plays} Play") + else: + user_plays.append(f"{humanize_number(plays)} Plays") + + data = {"Artist": artist_names, ctx.author: author_plays, user: user_plays} + table = tabulate.tabulate(data, headers="keys", tablefmt="fancy_grid") + color = await ctx.embed_colour() + img = await self.bot.loop.run_in_executor(None, self.make_table_into_image, table) + embed = discord.Embed(color=color, title=f"{ctx.author} vs {user} ({displayperiod})") + embed.set_image(url="attachment://result.webp") + + with contextlib.suppress(discord.NotFound): + await g.delete() + + await ctx.send(file=img, embed=embed) + + @command_compare.command(name="tracks", aliases=["track"]) + async def compare_tracks(self, ctx, user: discord.Member, period: str = "1month"): + """ + Compare your top tracks with someone else. + + `[period]` can be one of: overall, 7day, 1month, 3month, 6month, 12month + The default is 1 month. + """ + if user == ctx.author: + await ctx.send("You need to compare with someone else.") + return + + period, displayperiod = self.get_period(period) + if not period: + await ctx.send( + "Invalid period. Valid periods are: overall, 7day, 1month, 3month, 6month, 12month" + ) + return + + author_conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(author_conf) + user_conf = await self.config.user(user).all() + self.check_if_logged_in(user_conf, False) + async with ctx.typing(): + author_data = await self.api_request( + ctx, + { + "user": author_conf["lastfm_username"], + "method": "user.gettoptracks", + "period": period, + "limit": "10", + }, + ) + author_tracks = author_data["toptracks"]["track"] + if not author_tracks: + if period == "overall": + await ctx.send("You haven't listened to any tracks yet.") + else: + await ctx.send("You haven't listened to any tracks in that time period.") + return + + g = await ctx.send("Gathering data... This might take a while.") + + author_plays = [] + artist_names = [] + track_names = [] + for track in author_tracks: + if track["playcount"] == 1: + author_plays.append(f"{track['playcount']} Play") + else: + author_plays.append(f"{humanize_number(track['playcount'])} Plays") + artist_names.append(track["artist"]["name"]) + track_names.append(track["name"]) + + user_plays = [] + async for track in AsyncIter(author_tracks): + plays = await self.get_playcount_track( + ctx, + user_conf["lastfm_username"], + track["artist"]["name"], + track["name"], + period, + ) + if plays == 1: + user_plays.append(f"{plays} Play") + else: + user_plays.append(f"{humanize_number(plays)} Plays") + + data = { + "Artist": artist_names, + "Track": track_names, + ctx.author: author_plays, + user: user_plays, + } + table = tabulate.tabulate(data, headers="keys", tablefmt="fancy_grid") + color = await ctx.embed_colour() + img = await self.bot.loop.run_in_executor(None, self.make_table_into_image, table) + embed = discord.Embed(color=color, title=f"{ctx.author} vs {user} ({displayperiod})") + embed.set_image(url="attachment://result.webp") + + with contextlib.suppress(discord.NotFound): + await g.delete() + + await ctx.send(file=img, embed=embed) + + @command_compare.command(name="albums", aliases=["album"]) + async def compare_albums(self, ctx, user: discord.Member, period: str = "1month"): + """ + Compare your top albums with someone else. + + `[period]` can be one of: overall, 7day, 1month, 3month, 6month, 12month + The default is 1 month. + """ + if user == ctx.author: + await ctx.send("You need to compare with someone else.") + return + + period, displayperiod = self.get_period(period) + if not period: + await ctx.send( + "Invalid period. Valid periods are: overall, 7day, 1month, 3month, 6month, 12month" + ) + return + + author_conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(author_conf) + user_conf = await self.config.user(user).all() + self.check_if_logged_in(user_conf, False) + async with ctx.typing(): + author_data = await self.api_request( + ctx, + { + "user": author_conf["lastfm_username"], + "method": "user.gettopalbums", + "period": period, + "limit": "10", + }, + ) + author_albums = author_data["topalbums"]["album"] + if not author_albums: + if period == "overall": + await ctx.send("You haven't listened to any albums yet.") + else: + await ctx.send("You haven't listened to any albums in that time period.") + return + + g = await ctx.send("Gathering data... This might take a while.") + + author_plays = [] + artist_names = [] + album_names = [] + for album in author_albums: + if album["playcount"] == 1: + author_plays.append(f"{album['playcount']} Play") + else: + author_plays.append(f"{humanize_number(album['playcount'])} Plays") + artist_names.append(album["artist"]["name"]) + album_names.append(album["name"]) + + user_plays = [] + async for album in AsyncIter(author_albums): + plays = await self.get_playcount_album( + ctx, + user_conf["lastfm_username"], + album["artist"]["name"], + album["name"], + period, + ) + if plays == 1: + user_plays.append(f"{plays} Play") + else: + user_plays.append(f"{humanize_number(plays)} Plays") + + data = { + "Artist": artist_names, + "Album": album_names, + ctx.author: author_plays, + user: user_plays, + } + table = tabulate.tabulate(data, headers="keys", tablefmt="fancy_grid") + color = await ctx.embed_colour() + img = await self.bot.loop.run_in_executor(None, self.make_table_into_image, table) + embed = discord.Embed(color=color, title=f"{ctx.author} vs {user} ({displayperiod})") + embed.set_image(url="attachment://result.webp") + + with contextlib.suppress(discord.NotFound): + await g.delete() + + await ctx.send(file=img, embed=embed) diff --git a/lastfm/data/fonts/Arial Unicode.ttf b/lastfm/data/fonts/Arial Unicode.ttf new file mode 100644 index 0000000..1537c5b Binary files /dev/null and b/lastfm/data/fonts/Arial Unicode.ttf differ diff --git a/lastfm/data/fonts/NotoSansMono-Regular.ttf b/lastfm/data/fonts/NotoSansMono-Regular.ttf new file mode 100644 index 0000000..a850b21 Binary files /dev/null and b/lastfm/data/fonts/NotoSansMono-Regular.ttf differ diff --git a/lastfm/exceptions.py b/lastfm/exceptions.py new file mode 100644 index 0000000..7298a95 --- /dev/null +++ b/lastfm/exceptions.py @@ -0,0 +1,22 @@ +class LastFMError(Exception): + pass + + +class NotLoggedInError(LastFMError): + pass + + +class NeedToReauthorizeError(LastFMError): + pass + + +class SilentDeAuthorizedError(LastFMError): + pass + + +class NoScrobblesError(LastFMError): + pass + + +class NotScrobblingError(LastFMError): + pass diff --git a/lastfm/fmmixin.py b/lastfm/fmmixin.py new file mode 100644 index 0000000..6896dc3 --- /dev/null +++ b/lastfm/fmmixin.py @@ -0,0 +1,24 @@ +from redbot.core import commands + +from .utils.tokencheck import tokencheck + + + + +class FMMixin: + """This is mostly here to easily mess with things...""" + + + @commands.check(tokencheck) + @commands.group(name="fm") + async def command_fm(self, ctx: commands.Context): + """ + LastFM commands + """ + + + @command_fm.group(name="server") + async def command_fm_server(self, ctx: commands.Context): + """ + LastFM Server Commands + """ diff --git a/lastfm/info.json b/lastfm/info.json new file mode 100644 index 0000000..f1c1711 --- /dev/null +++ b/lastfm/info.json @@ -0,0 +1,27 @@ +{ + "author": [ + "flare(flare#0001)", + "joinem", + "fixator10", + "ryan" + ], + "install_msg": "Thanks for installing!\n\nOnce you've loaded the cog run the `[p]lastfmset` command for instructions on how to set your API key!", + "name": "LastFm", + "disabled": false, + "short": "Port of Miso Bot's LastFM Cog", + "description": "Port of Miso Bot's LastFM Cog,", + "tags": [ + "LastFM", + "scrobbler", + "music" + ], + "hidden": false, + "requirements": [ + "humanize", + "bs4", + "tabulate", + "wordcloud", + "arrow" + ], + "min_bot_version": "3.5.0" +} diff --git a/lastfm/lastfm.py b/lastfm/lastfm.py new file mode 100644 index 0000000..d4171df --- /dev/null +++ b/lastfm/lastfm.py @@ -0,0 +1,387 @@ +import asyncio +import urllib.parse +from operator import itemgetter + +import aiohttp +import discord +from redbot.core import Config, commands +from redbot.core.data_manager import bundled_data_path +from redbot.core.utils.chat_formatting import escape, pagify +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import * +from .charts import ChartMixin +from .compare import CompareMixin +from .exceptions import * +from .fmmixin import FMMixin +from .love import LoveMixin +from .nowplaying import NowPlayingMixin +from .profile import ProfileMixin +from .recent import RecentMixin +from .scrobbler import ScrobblerMixin +from .tags import TagsMixin +from .top import TopMixin +from .utils.base import UtilsMixin +from .utils.tokencheck import * +from .whoknows import WhoKnowsMixin +from .wordcloud import WordCloudMixin + +command_fm = FMMixin.command_fm + + +class LastFM( + ChartMixin, + CompareMixin, + FMMixin, + LoveMixin, + NowPlayingMixin, + ProfileMixin, + RecentMixin, + ScrobblerMixin, + TagsMixin, + TopMixin, + UtilsMixin, + WhoKnowsMixin, + WordCloudMixin, + commands.Cog, + metaclass=CompositeMetaClass, +): + """ + Interacts with the last.fm API. + """ + + __version__ = "1.7.2" + + # noinspection PyMissingConstructor + def __init__(self, bot, *args, **kwargs): + super().__init__(*args, **kwargs) + self.bot = bot + self.config = Config.get_conf(self, identifier=95932766180343808, force_registration=True) + defaults = {"lastfm_username": None, "session_key": None, "scrobbles": 0, "scrobble": True} + self.config.register_global(version=1) + self.config.register_user(**defaults) + self.config.register_guild(crowns={}) + self.session = aiohttp.ClientSession( + headers={ + "User-Agent": "Mozilla/5.0 (X11; Arch Linux; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0" + }, + ) + self.token = None + self.wc = None + self.login_token = None + self.wordcloud_create() + self.data_loc = bundled_data_path(self) + self.chart_data = {} + self.chart_data_loop = self.bot.loop.create_task(self.chart_clear_loop()) + + def format_help_for_context(self, ctx): + pre_processed = super().format_help_for_context(ctx) + return f"{pre_processed}\n\nCog Version: {self.__version__}" + + async def red_delete_data_for_user(self, *, requester, user_id): + await self.config.user_from_id(user_id).clear() + + async def chart_clear_loop(self): + await self.bot.wait_until_ready() + while True: + self.chart_data = {} + await asyncio.sleep(1800) + + async def initialize(self): + token = await self.bot.get_shared_api_tokens("lastfm") + self.token = token.get("appid") + self.secret = token.get("secret") + self.login_token = token.get("logintoken") + await self.migrate_config() + + async def migrate_config(self): + if await self.config.version() == 1: + a = {} + conf = await self.config.all_guilds() + for guild in conf: + a[guild] = {"crowns": {}} + for artist in conf[guild]["crowns"]: + a[guild]["crowns"][artist.lower()] = conf[guild]["crowns"][artist] + group = self.config._get_base_group(self.config.GUILD) + async with group.all() as new_data: + for guild in a: + new_data[guild] = a[guild] + await self.config.version.set(2) + + @commands.Cog.listener(name="on_red_api_tokens_update") + async def listener_update_class_tokens(self, service_name, api_tokens): + if service_name == "lastfm": + self.token = api_tokens.get("appid") + self.secret = api_tokens.get("secret") + self.login_token = api_tokens.get("logintoken") + + def cog_unload(self): + self.bot.loop.create_task(self.session.close()) + if self.chart_data_loop: + self.chart_data_loop.cancel() + + @commands.is_owner() + @commands.command(name="lastfmset", aliases=["fmset"]) + async def command_lastfmset(self, ctx): + """Instructions on how to set the api key.""" + message = ( + "1. Visit the [LastFM](https://www.last.fm/api/) website and click on 'Get an API Account'.\n" + "2. Fill in the application. Once completed do not exit the page. - " + "Copy all information on the page and save it.\n" + f"3. Enter the api key via `{ctx.clean_prefix}set api lastfm appid `\n" + f"4. Enter the api secret via `{ctx.clean_prefix}set api lastfm secret `\n" + f"--------\n" + f"Some commands that use webscraping may require a login token.\n" + f"1. Visit [LastFM](https://www.last.fm) site and login.\n" + f"2. Open your browser's developer tools and go to the Storage tab.\n" + f"3. Find the cookie named `sessionid` and copy the value.\n" + f"4. Enter the api secret via `{ctx.clean_prefix}set api lastfm logintoken `\n" + ) + await ctx.maybe_send_embed(message) + + @commands.command(name="crowns") + @commands.check(tokencheck) + @commands.guild_only() + async def command_crowns(self, ctx, user: discord.Member = None): + """Check yourself or another users crowns.""" + user = user or ctx.author + crowns = await self.config.guild(ctx.guild).crowns() + crownartists = [] + for key in crowns: + if crowns[key]["user"] == user.id: + crownartists.append((key, crowns[key]["playcount"])) + if crownartists is None: + return await ctx.send( + "You haven't acquired any crowns yet! " + f"Use the `{ctx.clean_prefix}whoknows` command to claim crowns \N{CROWN}" + ) + + rows = [] + for artist, playcount in sorted(crownartists, key=itemgetter(1), reverse=True): + rows.append(f"**{artist}** with **{playcount}** {self.format_plays(playcount)}") + + content = discord.Embed( + title=f"Artist crowns for {user.name} — Total {len(crownartists)} crowns", + color=user.color, + ) + content.set_footer(text="Playcounts are updated on the whoknows command.") + if not rows: + return await ctx.send("You do not have any crowns.") + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm.command( + name="artist", usage="[timeframe] " + ) + async def command_artist(self, ctx, timeframe, datatype, *, artistname=""): + """Your top tracks or albums for specific artist. + + Usage: + [p]fm artist [timeframe] toptracks + [p]fm artist [timeframe] topalbums + [p]fm artist [timeframe] overview """ + conf = await self.config.user(ctx.author).all() + username = conf["lastfm_username"] + + period, _ = self.get_period(timeframe) + if period in [None, "today"]: + artistname = " ".join([datatype, artistname]).strip() + datatype = timeframe + period = "overall" + + artistname = self.remove_mentions(artistname) + + if artistname == "": + return await ctx.send("Missing artist name!") + + if datatype in ["toptracks", "tt", "tracks", "track"]: + datatype = "tracks" + + elif datatype in ["topalbums", "talb", "albums", "album"]: + datatype = "albums" + + elif datatype in ["overview", "stats", "ov"]: + return await self.artist_overview(ctx, period, artistname, username) + + else: + return await ctx.send_help() + + artist, data = await self.artist_top(ctx, period, artistname, datatype, username) + if artist is None or not data: + artistname = escape(artistname) + if period == "overall": + return await ctx.send(f"You have never listened to **{artistname}**!") + else: + return await ctx.send( + f"You have not listened to **{artistname}** in the past {period}s!" + ) + + total = 0 + rows = [] + for i, (name, playcount) in enumerate(data, start=1): + rows.append( + f"`#{i:2}` **{playcount}** {self.format_plays(playcount)} — **{escape(name)}**" + ) + total += playcount + + artistname = urllib.parse.quote_plus(artistname) + content = discord.Embed(color=await ctx.embed_color()) + content.set_thumbnail(url=artist["image_url"]) + content.set_author( + name=f"{ctx.author.display_name} — " + + (f"{self.humanized_period(period)} " if period != "overall" else "") + + f"Top {datatype} by {artist['formatted_name']}", + icon_url=ctx.author.display_avatar.url, + url=f"https://last.fm/user/{username}/library/music/{artistname}/" + f"+{datatype}?date_preset={self.period_http_format(period)}", + ) + content.set_footer( + text=f"Total {total} {self.format_plays(total)} across {len(rows)} {datatype}" + ) + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm.command(name="last") + async def command_last(self, ctx): + """ + Your weekly listening overview. + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + await self.listening_report(ctx, "week", conf["lastfm_username"]) + + @command_fm.command(name="lyrics", aliases=["lyr"]) + async def command_lyrics(self, ctx, *, track: str = None): + """Currently playing song or most recent song.""" + if track is None: + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + track, artist, albumname, image_url = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + title = ( + f"**{escape(artist, formatting=True)}** — ***{escape(track, formatting=True)} ***" + ) + + results, songtitle = await self.lyrics_musixmatch(f"{artist} {track}") + if results is None: + return await ctx.send(f'No lyrics for "{artist} {track}" found.') + embeds = [] + results = list(pagify(results, page_length=2048)) + for i, page in enumerate(results, 1): + content = discord.Embed( + color=await ctx.embed_color(), + description=page, + title=title, + ) + content.set_thumbnail(url=image_url) + if len(results) > 1: + content.set_footer(text=f"Page {i}/{len(results)}") + + embeds.append(content) + if len(embeds) > 1: + await menu(ctx, embeds, DEFAULT_CONTROLS) + else: + await ctx.send(embed=embeds[0]) + else: + + results, songtitle = await self.lyrics_musixmatch(track) + if results is None: + return await ctx.send(f'No lyrics for "{track}" found.') + embeds = [] + results = list(pagify(results, page_length=2048)) + for i, page in enumerate(results, 1): + content = discord.Embed( + color=await ctx.embed_color(), + title=f"***{escape(songtitle, formatting=True)} ***", + description=page, + ) + if len(results) > 1: + content.set_footer(text=f"Page {i}/{len(results)}") + embeds.append(content) + if len(embeds) > 1: + await menu(ctx, embeds, DEFAULT_CONTROLS) + else: + await ctx.send(embed=embeds[0]) + + @command_fm.command(name="streak") + async def command_streak(self, ctx, user: discord.User = None): + """ + View how many times you've listened to something in a row + + Only the most 200 recent plays are tracked + """ + if not user: + user = ctx.author + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + data = await self.api_request( + ctx, + {"user": conf["lastfm_username"], "method": "user.getrecenttracks", "limit": 200}, + ) + tracks = data["recenttracks"]["track"] + if not tracks or not isinstance(tracks, list): + return await ctx.send("You have not listened to anything yet!") + track_streak = [tracks[0]["name"], 1, True] + artist_streak = [tracks[0]["artist"]["#text"], 1, True] + album_streak = [tracks[0]["album"]["#text"], 1, True] + ignore = True + for x in tracks: + if ignore: + ignore = False + continue + if track_streak[2]: + if x["name"] == track_streak[0]: + track_streak[1] += 1 + else: + track_streak[2] = False + if artist_streak[2]: + if x["artist"]["#text"] == artist_streak[0]: + artist_streak[1] += 1 + else: + artist_streak[2] = False + if album_streak[2]: + if x["album"]["#text"] == album_streak[0]: + album_streak[1] += 1 + else: + album_streak[2] = False + + if not track_streak[2] and not artist_streak[2] and not album_streak[2]: + break + + if track_streak[1] == 1 and artist_streak[1] == 1 and album_streak[1] == 1: + return await ctx.send("You have not listened to anything in a row.") + embed = discord.Embed(color=await ctx.embed_color(), title=f"{user.name}'s streaks") + embed.set_thumbnail(url=tracks[0]["image"][3]["#text"]) + if track_streak[1] > 1: + embed.add_field( + name="Track", value=f"{track_streak[1]} times in a row \n({track_streak[0][:50]})" + ) + if artist_streak[1] > 1: + embed.add_field( + name="Artist", + value=f"{artist_streak[1]} times in a row \n({artist_streak[0][:50]})", + ) + if album_streak[1] > 1: + embed.add_field( + name="Album", value=f"{album_streak[1]} times in a row \n({album_streak[0][:50]})" + ) + + await ctx.send(embed=embed) + + async def cog_command_error(self, ctx, error): + if hasattr(error, "original"): + if isinstance(error.original, SilentDeAuthorizedError): + return + if isinstance(error.original, LastFMError): + await ctx.send(str(error.original)) + return + await ctx.bot.on_command_error(ctx, error, unhandled_by_cog=True) diff --git a/lastfm/love.py b/lastfm/love.py new file mode 100644 index 0000000..d1283ae --- /dev/null +++ b/lastfm/love.py @@ -0,0 +1,145 @@ +import discord +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class LoveMixin(MixinMeta): + """Love Commands""" + + async def love_or_unlove_song(self, track, artist, love, key): + params = { + "api_key": self.token, + "artist": artist, + "sk": key, + "track": track, + } + if love: + params["method"] = "track.love" + else: + params["method"] = "track.unlove" + data = await self.api_post(params=params) + return data + + @command_fm.command(name="love", usage=" | ") + async def command_love(self, ctx, *, track=None): + """ + Love a song on last.fm. + + Usage: + [p]love + [p]love | + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + if track: + try: + trackname, artistname = [x.strip() for x in track.split("|")] + if trackname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + else: + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + data = await self.api_request( + ctx, + { + "username": conf["lastfm_username"], + "method": "track.getInfo", + "track": trackname, + "artist": artistname, + }, + ) + + if data["track"].get("userloved", "0") == "1": + return await ctx.send( + f"This song is already loved. Did you mean to run `{ctx.clean_prefix}fm unlove`?" + ) + + result = await self.love_or_unlove_song( + data["track"]["name"], data["track"]["artist"]["name"], True, conf["session_key"] + ) + await self.maybe_send_403_msg(ctx, result) + await ctx.send(f"Loved **{trackname[:50]}** by **{artistname[:50]}**") + + @command_fm.command(name="unlove", usage=" | ") + async def command_unlove(self, ctx, *, track=None): + """ + Unlove a song on last.fm. + + Usage: + [p]unlove + [p]unlove | + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + if track: + try: + trackname, artistname = [x.strip() for x in track.split("|")] + if trackname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + else: + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + data = await self.api_request( + ctx, + { + "username": conf["lastfm_username"], + "method": "track.getInfo", + "track": trackname, + "artist": artistname, + }, + ) + + if data["track"].get("userloved", "0") == "0": + return await ctx.send( + f"This song is not loved. Did you mean to run `{ctx.clean_prefix}fm love`?" + ) + + result = await self.love_or_unlove_song( + data["track"]["name"], data["track"]["artist"]["name"], False, conf["session_key"] + ) + await self.maybe_send_403_msg(ctx, result) + await ctx.send(f"Unloved **{trackname[:50]}** by **{artistname[:50]}**") + + @command_fm.command(name="loved") + async def command_loved(self, ctx, user: discord.User = None): + """ + Get a list of loved songs for a user. + + Usage: + [p]loved + [p]loved + """ + if not user: + user = ctx.author + conf = await self.config.user(user).all() + self.check_if_logged_in_and_sk(conf) + data = await self.api_request( + ctx, {"user": conf["lastfm_username"], "method": "user.getlovedtracks"} + ) + tracks = data["lovedtracks"]["track"] + if not tracks: + return await ctx.send("You have not loved anything yet!") + tracks = [f"{x['name']} by {x['artist']['name']}\n" for x in tracks] + content = discord.Embed(color=await ctx.embed_color(), title=f"{user.name}'s loved tracks") + + pages = await self.create_pages(content, tracks) + for i, page in enumerate(pages): + page.set_footer(text=f"Page {i + 1}/{len(pages)}") + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) diff --git a/lastfm/nowplaying.py b/lastfm/nowplaying.py new file mode 100644 index 0000000..bc2c1e4 --- /dev/null +++ b/lastfm/nowplaying.py @@ -0,0 +1,159 @@ +import asyncio +from typing import Optional + +import discord +from redbot.core.utils.chat_formatting import escape +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class NowPlayingMixin(MixinMeta): + """NowPlaying Commands""" + + @command_fm.command(name="nowplaying", aliases=["np"]) + async def command_nowplaying(self, ctx, user: Optional[discord.Member] = None): + """Currently playing song or most recent song.""" + user = user or ctx.author + async with ctx.typing(): + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + data = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": "user.getrecenttracks", + "limit": 1, + }, + ) + user_attr = data["recenttracks"]["@attr"] + tracks = data["recenttracks"]["track"] + + if not tracks: + return await ctx.send("You have not listened to anything yet!") + try: + artist = tracks[0]["artist"]["#text"] + album = tracks[0]["album"]["#text"] + track = tracks[0]["name"] + image_url = tracks[0]["image"][-1]["#text"] + url = tracks[0]["url"] + except KeyError: + artist = tracks["artist"]["#text"] + album = tracks["album"]["#text"] + track = tracks["name"] + image_url = tracks["image"][-1]["#text"] + url = tracks["url"] + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel), url=url) + + content.description = f"**{escape(album, formatting=True)}**" if album else "" + content.title = ( + f"**{escape(artist, formatting=True)}** — ***{escape(track, formatting=True)} ***" + ) + content.set_thumbnail(url=image_url) + + # tags and playcount + trackdata = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": "track.getInfo", + "artist": artist, + "track": track, + }, + ) + if trackdata is not None: + loved = trackdata["track"].get("userloved", "0") == "1" + if loved: + content.title += " :heart:" + tags = [] + try: + trackdata = trackdata["track"] + playcount = int(trackdata["userplaycount"]) + if playcount > 0: + content.description += f"\n> {playcount} {self.format_plays(playcount)}" + if isinstance(trackdata["toptags"], dict): + for tag in trackdata["toptags"]["tag"]: + if "name" in tag: + tags.append(tag["name"]) + else: + tags.append(tag) + if tags: + content.set_footer(text=", ".join(tags)) + except KeyError: + pass + + # play state + state = "— Most recent track" + try: + if "@attr" in tracks[0]: + if "nowplaying" in tracks[0]["@attr"]: + state = "— Now Playing" + except KeyError: + if "@attr" in tracks: + if "nowplaying" in tracks["@attr"]: + state = "— Now Playing" + + content.set_author( + name=f"{user_attr['user']} {state}", + icon_url=user.display_avatar.url, + ) + if state == "— Most recent track": + msg = "You aren't currently listening to anything, here is the most recent song found." + else: + msg = None + await ctx.send(msg if msg is not None else None, embed=content) + + @command_fm_server.command(name="nowplaying", aliases=["np"]) + async def command_servernp(self, ctx): + """What people on this server are listening to at the moment.""" + listeners = [] + tasks = [] + async with ctx.typing(): + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append(self.get_current_track(ctx, lastfm_username, member, True)) + + total_linked = len(tasks) + if tasks: + data = await asyncio.gather(*tasks) + data = [i for i in data if i] + for name, artist, album, image, ref in data: + if name is not None: + listeners.append((name, artist, ref)) + else: + return await ctx.send("Nobody on this server has connected their last.fm account yet!") + + if not listeners: + return await ctx.send("Nobody on this server is listening to anything at the moment!") + + total_listening = len(listeners) + rows = [] + for name, artist, member in listeners: + rows.append(f"{member.mention} **{artist}** — ***{name}***") + + content = discord.Embed(color=await ctx.embed_color()) + content.set_author( + name=f"What is {ctx.guild.name} listening to?", + icon_url=ctx.guild.icon.url if ctx.guild.icon else None, + ) + content.set_footer(text=f"{total_listening} / {total_linked} Members") + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) diff --git a/lastfm/profile.py b/lastfm/profile.py new file mode 100644 index 0000000..93106ab --- /dev/null +++ b/lastfm/profile.py @@ -0,0 +1,110 @@ +import asyncio +from typing import Optional + +import discord +from redbot.core import commands +from redbot.core.utils.predicates import MessagePredicate + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server +from .utils.tokencheck import tokencheck_plus_secret + + +class ProfileMixin(MixinMeta): + """Profile Commands""" + + @command_fm.command(name="login", aliases=["set"]) + @commands.check(tokencheck_plus_secret) + async def command_login(self, ctx): + """Authenticates your last.fm account.""" + params = { + "api_key": self.token, + "method": "auth.getToken", + } + hashed = self.hashRequest(params, self.secret) + params["api_sig"] = hashed + response = await self.api_request(ctx, params=params) + + token = response["token"] + link = f"https://www.last.fm/api/auth/?api_key={self.token}&token={token}" + message = ( + f"Please click [here]({link}) to authorize me to access your account.\n\n" + "You have 90 seconds to successfully authenticate." + ) + embed = discord.Embed( + title="Authorization", description=message, color=await ctx.embed_color() + ) + + try: + await ctx.author.send(embed=embed) + except discord.Forbidden: + await ctx.send("I can't DM you.") + return + + if ctx.guild: + await ctx.send("Check your Direct Messages for instructions on how to log in.") + + params = {"api_key": self.token, "method": "auth.getSession", "token": token} + hashed = self.hashRequest(params, self.secret) + params["api_sig"] = hashed + for x in range(6): + try: + data = await self.api_request(ctx, params=params) + break + except LastFMError as e: + if x == 5: + message = "You took to long to log in. Rerun the command to try again." + embed = discord.Embed( + title="Authorization Timeout", + description=message, + color=await ctx.embed_color(), + ) + await ctx.author.send(embed=embed) + return + await asyncio.sleep(15) + + await self.config.user(ctx.author).lastfm_username.set(data["session"]["name"]) + await self.config.user(ctx.author).session_key.set(data["session"]["key"]) + message = f"Your username is now set as: `{data['session']['name']}`" + embed = discord.Embed(title="Success!", description=message, color=await ctx.embed_color()) + await ctx.author.send(embed=embed) + + @command_fm.command(name="logout", aliases=["unset"]) + async def command_logout(self, ctx): + """ + Deauthenticates your last.fm account. + """ + await ctx.send("Are you sure you want to log out? (yes/no)") + try: + pred = MessagePredicate.yes_or_no(ctx, user=ctx.message.author) + await ctx.bot.wait_for("message", check=pred, timeout=60) + except asyncio.TimeoutError: + await ctx.send( + "You took too long! Use the command again if you still want to log out." + ) + return + if pred.result: + await self.config.user(ctx.author).clear() + await ctx.send("Ok, I've logged you out.") + if ctx.guild: + async with self.config.guild(ctx.guild).crowns() as crowns: + crownlist = [] + for crown in crowns: + if crowns[crown]["user"] == ctx.author.id: + crownlist.append(crown) + for crown in crownlist: + del crowns[crown] + else: + await ctx.send("Ok, I won't log you out.") + + @command_fm.command(name="profile") + async def command_profile(self, ctx, user: Optional[discord.Member] = None): + """Lastfm profile.""" + user = user or ctx.author + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + await ctx.send(embed=await self.get_userinfo_embed(ctx, user, conf["lastfm_username"])) diff --git a/lastfm/recent.py b/lastfm/recent.py new file mode 100644 index 0000000..cc45336 --- /dev/null +++ b/lastfm/recent.py @@ -0,0 +1,120 @@ +import asyncio + +import discord +from redbot.core.utils.chat_formatting import escape +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class RecentMixin(MixinMeta): + """Recent Commands""" + + @command_fm.command(name="recent", aliases=["recents", "re"], usage="[amount]") + async def command_recent(self, ctx, size: int = 15): + """Tracks you have recently listened to.""" + conf = await self.config.user(ctx.author).all() + name = conf["lastfm_username"] + self.check_if_logged_in(conf) + async with ctx.typing(): + data = await self.api_request( + ctx, {"user": name, "method": "user.getrecenttracks", "limit": size} + ) + user_attr = data["recenttracks"]["@attr"] + tracks = data["recenttracks"]["track"] + + if not tracks or not isinstance(tracks, list): + return await ctx.send("You have not listened to anything yet!") + + rows = [] + for i, track in enumerate(tracks): + if i >= size: + break + name = escape(track["name"], formatting=True) + track_url = track["url"] + artist_name = escape(track["artist"]["#text"], formatting=True) + if track.get("@attr") and track["@attr"].get("nowplaying"): + extra = ":musical_note:" + else: + extra = f"()" + rows.append(f"[**{artist_name}** — **{name}**]({track_url}) {extra}") + + image_url = tracks[0]["image"][-1]["#text"] + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Total scrobbles: {user_attr['total']}") + content.set_author( + name=f"{user_attr['user']} — Recent tracks", + icon_url=ctx.message.author.display_avatar.url, + ) + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm_server.command(name="recent", aliases=["recents", "re"], usage="[amount]") + async def command_recent_server(self, ctx, size: int = 15): + """Tracks recently listened to in this server.""" + listeners = [] + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append(self.get_lastplayed(ctx, lastfm_username, member)) + + total_linked = len(tasks) + total_listening = 0 + if tasks: + data = await asyncio.gather(*tasks) + for song, member_ref in data: + if song is not None: + if song.get("nowplaying"): + total_listening += 1 + listeners.append((song, member_ref)) + else: + return await ctx.send("Nobody on this server has connected their last.fm account yet!") + + if not listeners: + return await ctx.send("Nobody on this server is listening to anything at the moment!") + + listeners = sorted(listeners, key=lambda l: l[0].get("date"), reverse=True) + rows = [] + for song, member in listeners: + suffix = "" + if song.get("nowplaying"): + suffix = ":musical_note: " + else: + suffix = f"()" + rows.append( + f"{member.mention} [**{escape(song.get('artist'), formatting=True)}** — **{escape(song.get('name'), formatting=True)}**]({song.get('url')}) {suffix}" + ) + + content = discord.Embed(color=await ctx.embed_color()) + content.set_author( + name=f"What has {ctx.guild.name} been listening to?", + icon_url = ctx.guild.icon.url if ctx.guild.icon else None, + ) + content.set_footer( + text=f"{total_listening} / {total_linked} Members are listening to music right now" + ) + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) diff --git a/lastfm/scrobbler.py b/lastfm/scrobbler.py new file mode 100644 index 0000000..95c250b --- /dev/null +++ b/lastfm/scrobbler.py @@ -0,0 +1,162 @@ +import asyncio +import contextlib +import re + +import arrow +import discord +import lavalink +from redbot.core import commands + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class ScrobblerMixin(MixinMeta): + def __init__(self): + # This regex is from GitHub user TheWyn + # Source: https://github.com/TheWyn/Wyn-RedV3Cogs/blob/master/lyrics/lyrics.py#L12-13 + self.regex = re.compile( + ( + r"((\[)|(\()).*(of?ficial|feat\.?|" + r"ft\.?|audio|video|explicit|clean|lyrics?|remix|HD).*(?(2)]|\))" + ), + flags=re.I, + ) + + @commands.command(name="scrobble", usage=" | ") + @commands.cooldown(1, 300, type=commands.BucketType.user) + async def command_scrobble(self, ctx, *, track): + """ + Scrobble a song to last.fm. + + Usage: + [p]scrobble | + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + try: + trackname, artistname = [x.strip() for x in track.split("|")] + if trackname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + + result = await self.scrobble_song( + trackname, artistname, ctx.author, ctx.author, conf["session_key"], False + ) + await self.maybe_send_403_msg(ctx, result) + await ctx.tick() + + @command_fm.command(name="scrobbler") + async def command_scrobbler(self, ctx): + """ + Toggles automatic scrobbling in VC. + + Note: this also toggles the setting of now playing in VC as well. + """ + current = await self.config.user(ctx.author).scrobble() + new = not current + await self.config.user(ctx.author).scrobble.set(new) + if new: + await ctx.send("\N{WHITE HEAVY CHECK MARK} VC scrobbling enabled.") + else: + await ctx.send("\N{CROSS MARK} VC scrobbling disabled.") + + async def scrobble_song(self, track, artist, user, requester, key, is_vc): + params = { + "api_key": self.token, + "artist": artist, + "method": "track.scrobble", + "sk": key, + "timestamp": str(arrow.utcnow().timestamp()), + "track": track, + } + data = await self.api_post(params=params) + if data[0] == 200 and is_vc: + scrobbles = await self.config.user(user).scrobbles() + if not scrobbles: + scrobbles = 0 + scrobbles += 1 + await self.config.user(user).scrobbles.set(scrobbles) + return data + + async def set_nowplaying(self, track, artist, user, key): + params = { + "artist": artist, + "method": "track.updateNowPlaying", + "sk": key, + "track": track, + } + data = await self.api_post(params=params) + if data[0] == 403 and data[1]["error"] == 9: + await self.config.user(user).session_key.clear() + await self.config.user(user).lastfm_username.clear() + with contextlib.suppress(discord.HTTPException): + message = ( + "I was unable to scrobble your last song as it seems you have unauthorized me to do so.\n" + "You can reauthorize me using the `fm login` command, but I have logged you out for now." + ) + embed = discord.Embed( + title="Authorization Failed", + description=message, + color=await self.bot.get_embed_color(user.dm_channel), + ) + await user.send(embed=embed) + + async def maybe_scrobble_song( + self, + user: discord.Member, + guild: discord.Guild, + track: lavalink.Track, + artist_name: str, + track_name: str, + session_key: str, + ): + four_minutes = 240 + half_track_length = int((track.length / 1000) / 2) + + time_to_sleep = min(four_minutes, half_track_length) + await asyncio.sleep(time_to_sleep) + + try: + player = lavalink.get_player(guild.id) + except: + return + + if player.current and player.current.uri == track.uri: + await self.scrobble_song(track_name, artist_name, user, guild.me, session_key, True) + + @commands.Cog.listener(name="on_red_audio_track_start") + async def listener_scrobbler_track_start( + self, guild: discord.Guild, track: lavalink.Track, requester: discord.Member + ): + if ( + not (guild and track) + or int(track.length) <= 30000 + or not guild.me + or not guild.me.voice + ): + return + + renamed_track = self.regex.sub("", track.title).strip() + track_array = renamed_track.split("-") + if len(track_array) == 1: + track_array = (track.author, track_array[0]) + track_artist = track_array[0] + track_title = track_array[1] + voice_members = guild.me.voice.channel.members + for member in voice_members: + if member == guild.me or member.bot is True: + continue + user_settings = await self.config.user(member).all() + if user_settings["scrobble"] and user_settings["session_key"]: + await self.set_nowplaying( + track_title, track_artist, member, user_settings["session_key"] + ) + await self.maybe_scrobble_song( + member, guild, track, track_artist, track_title, user_settings["session_key"] + ) diff --git a/lastfm/tags.py b/lastfm/tags.py new file mode 100644 index 0000000..ca91682 --- /dev/null +++ b/lastfm/tags.py @@ -0,0 +1,439 @@ +import discord +from redbot.core.utils.chat_formatting import humanize_list, pagify +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class TagsMixin(MixinMeta): + """Tag Commands""" + + @command_fm.group(name="tag") + async def command_tag(self, ctx): + """Commands to tag things""" + + @command_tag.group(name="track", aliases=["tracks", "song"]) + async def command_tag_track(self, ctx): + """Commands to tag tracks""" + + @command_tag_track.command(name="add", usage=",[tag] | [track name] | [artist name]") + async def command_tag_track_add(self, ctx, *, args): + """ + Add tags to a track + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 3] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [track] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + trackname = split_args[1] + artistname = split_args[2] + + params = { + "artist": artistname, + "method": "track.addtags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + "track": trackname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Added **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_track.command(name="remove", usage=",[tag] | [track name] | [artist name]") + async def command_tag_track_remove(self, ctx, *, args): + """ + Remove tags from a track + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 3] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [track] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + trackname = split_args[1] + artistname = split_args[2] + params = { + "artist": artistname, + "method": "track.removetags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + "track": trackname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Removed **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_track.command(name="list", usage="[track name] | [artist name]") + async def command_tag_track_list(self, ctx, *, args=None): + """ + List tags for a track + + If no arguments are given, the tags for the last track you listened to will be listed + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + if args: + try: + trackname, artistname = [x.strip() for x in args.split("|")] + if trackname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + else: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + params = { + "artist": artistname, + "method": "track.gettags", + "sk": conf["session_key"], + "track": trackname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + if "tag" not in data[1]["tags"]: + return await ctx.send("This track has no tags.") + trackname = data[1]["tags"]["@attr"]["track"] + artistname = data[1]["tags"]["@attr"]["artist"] + embed = discord.Embed( + title=f"Your tags for {trackname} by {artistname}", + color=await ctx.embed_color(), + ) + nicelooking = [] + for tag in data[1]["tags"]["tag"]: + nicelooking.append(f"[{tag['name']}]({tag['url']})") + message = humanize_list(nicelooking) + pages = [] + for page in pagify(message, delims=[","]): + pages.append(page) + embeds = [] + for num, page in enumerate(pages): + embed.description = page + embeds.append(embed) + if len(pages) > 1: + embed.set_footer(text=f"Page {num + 1}/{len(pages)}") + if len(embeds) == 1: + await ctx.send(embed=embeds[0]) + else: + await menu(ctx, embeds, DEFAULT_CONTROLS) + + @command_tag.group(name="album", aliases=["albums"]) + async def command_tag_album(self, ctx): + """Commands to tag albums""" + + @command_tag_album.command(name="add", usage=",[tag] | [album name] | [artist name]") + async def command_tag_album_add(self, ctx, *, args): + """ + Add tags to an album + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 3] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [album] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + albumname = split_args[1] + artistname = split_args[2] + + if not albumname: + await ctx.send( + "Your currently playing track does not have an album attached on last.fm." + ) + return + + params = { + "artist": artistname, + "method": "album.addtags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + "album": albumname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Added **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_album.command(name="remove", usage=",[tag] | [album name] | [artist name]") + async def command_tag_album_remove(self, ctx, *, args): + """ + Remove tags from an album + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 3] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [album] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + albumname = split_args[1] + artistname = split_args[2] + + if not albumname: + await ctx.send( + "Your currently playing track does not have an album attached on last.fm." + ) + return + + params = { + "artist": artistname, + "method": "album.removetags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + "album": albumname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Removed **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_album.command(name="list", usage="[album name] | [artist name]") + async def command_tag_album_list(self, ctx, *, args=None): + """ + List tags for an album + + If no arguments are given, the tags for the last album you listened to will be listed + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + if args: + try: + albumname, artistname = [x.strip() for x in args.split("|")] + if albumname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + else: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + if not albumname: + await ctx.send( + "Your currently playing track does not have an album attached on last.fm." + ) + return + params = { + "artist": artistname, + "method": "album.gettags", + "sk": conf["session_key"], + "album": albumname, + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + if "tag" not in data[1]["tags"]: + return await ctx.send("This album has no tags.") + albumname = data[1]["tags"]["@attr"]["album"] + artistname = data[1]["tags"]["@attr"]["artist"] + embed = discord.Embed( + title=f"Your tags for {albumname} by {artistname}", + color=await ctx.embed_color(), + ) + nicelooking = [] + for tag in data[1]["tags"]["tag"]: + nicelooking.append(f"[{tag['name']}]({tag['url']})") + message = humanize_list(nicelooking) + pages = [] + for page in pagify(message, delims=[","]): + pages.append(page) + embeds = [] + for num, page in enumerate(pages): + embed.description = page + embeds.append(embed) + if len(pages) > 1: + embed.set_footer(text=f"Page {num + 1}/{len(pages)}") + if len(embeds) == 1: + await ctx.send(embed=embeds[0]) + else: + await menu(ctx, embeds, DEFAULT_CONTROLS) + + @command_tag.group(name="artist") + async def command_tag_artist(self, ctx): + """Commands to tag tracks""" + + @command_tag_artist.command(name="add", usage=",[tag] | [artist name]") + async def command_tag_artist_add(self, ctx, *, args): + """ + Add tags to an artist + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 2] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + artistname = split_args[1] + params = { + "artist": artistname, + "method": "artist.addtags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Added **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_artist.command(name="remove", usage=",[tag] | [artist name]") + async def command_tag_artist_remove(self, ctx, *, args): + """ + Remove tags from an artist + + Tags are inputted as a comma separated list in the first group + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + split_args = [x.strip() for x in args.split("|")] + list_of_tags = [x.strip() for x in split_args[0].split(",")] + list_of_tags = [x for x in list_of_tags if x][:10] + if len(split_args) not in [1, 2] or not list_of_tags: + return await ctx.send( + "\N{WARNING SIGN} Incorrect format! use `,[tag] | [artist]`" + ) + + if len(split_args) == 1: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + else: + artistname = split_args[1] + params = { + "artist": artistname, + "method": "artist.removetags", + "sk": conf["session_key"], + "tags": ",".join(list_of_tags), + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + await ctx.send( + f"Removed **{len(list_of_tags)}** {'tag' if len(list_of_tags) == 1 else 'tags'}." + ) + + @command_tag_artist.command(name="list", usage="[artist name]") + async def command_tag_artist_list(self, ctx, *, artist=None): + """ + List tags for an artist + + If no arguments are given, the tags for the last track you listened to will be listed + """ + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in_and_sk(conf) + if not artist: + + trackname, artistname, albumname, imageurl = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + + params = { + "artist": artistname, + "method": "artist.gettags", + "sk": conf["session_key"], + } + data = await self.api_post(params=params) + await self.maybe_send_403_msg(ctx, data) + if "tag" not in data[1]["tags"]: + return await ctx.send("This artist has no tags.") + artistname = data[1]["tags"]["@attr"]["artist"] + embed = discord.Embed( + title=f"Your tags for {artistname}", + color=await ctx.embed_color(), + ) + nicelooking = [] + for tag in data[1]["tags"]["tag"]: + nicelooking.append(f"[{tag['name']}]({tag['url']})") + message = humanize_list(nicelooking) + pages = [] + for page in pagify(message, delims=[","]): + pages.append(page) + embeds = [] + for num, page in enumerate(pages): + embed.description = page + embeds.append(embed) + if len(pages) > 1: + embed.set_footer(text=f"Page {num + 1}/{len(pages)}") + if len(embeds) == 1: + await ctx.send(embed=embeds[0]) + else: + await menu(ctx, embeds, DEFAULT_CONTROLS) diff --git a/lastfm/top.py b/lastfm/top.py new file mode 100644 index 0000000..7ac25b7 --- /dev/null +++ b/lastfm/top.py @@ -0,0 +1,360 @@ +import asyncio + +import discord +from redbot.core.utils.chat_formatting import escape +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + + +class TopMixin(MixinMeta): + """Top Artist/Album/Track Commands""" + + @command_fm.command(name="topartists", aliases=["ta"], usage="[timeframe] [amount]") + async def command_topartists(self, ctx, *args): + """Most listened artists.""" + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + async with ctx.typing(): + arguments = self.parse_arguments(args) + data = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": "user.gettopartists", + "period": arguments["period"], + "limit": arguments["amount"], + }, + ) + user_attr = data["topartists"]["@attr"] + artists = data["topartists"]["artist"] + + if not artists: + return await ctx.send("You have not listened to any artists yet!") + + rows = [] + for i, artist in enumerate(artists, start=1): + name = escape(artist["name"], formatting=True) + plays = artist["playcount"] + rows.append(f"`#{i:2}` **{plays}** {self.format_plays(plays)} — **{name}**") + + image_url = await self.scrape_artist_image(artists[0]["name"], ctx) + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Total unique artists: {user_attr['total']}") + content.set_author( + name=f"{user_attr['user']} — {self.humanized_period(arguments['period']).capitalize()} top artists", + icon_url=ctx.message.author.display_avatar.url, + ) + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm.command(name="topalbums", aliases=["talb"], usage="[timeframe] [amount]") + async def command_topalbums(self, ctx, *args): + """Most listened albums.""" + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + arguments = self.parse_arguments(args) + data = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": "user.gettopalbums", + "period": arguments["period"], + "limit": arguments["amount"], + }, + ) + user_attr = data["topalbums"]["@attr"] + albums = data["topalbums"]["album"] + + if not albums: + return await ctx.send("You have not listened to any albums yet!") + + rows = [] + for i, album in enumerate(albums, start=1): + name = escape(album["name"], formatting=True) + artist_name = escape(album["artist"]["name"], formatting=True) + plays = album["playcount"] + rows.append( + f"`#{i:2}` **{plays}** {self.format_plays(plays)} — **{artist_name}** — ***{name}***" + ) + + image_url = albums[0]["image"][-1]["#text"] + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Total unique albums: {user_attr['total']}") + content.set_author( + name=f"{user_attr['user']} — {self.humanized_period(arguments['period']).capitalize()} top albums", + icon_url=ctx.message.author.display_avatar.url, + ) + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm.command(name="toptracks", aliases=["tt"], usage="[timeframe] [amount]") + async def command_toptracks(self, ctx, *args): + """Most listened tracks.""" + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + async with ctx.typing(): + arguments = self.parse_arguments(args) + data = await self.api_request( + ctx, + { + "user": conf["lastfm_username"], + "method": "user.gettoptracks", + "period": arguments["period"], + "limit": arguments["amount"], + }, + ) + user_attr = data["toptracks"]["@attr"] + tracks = data["toptracks"]["track"] + + if not tracks: + return await ctx.send("You have not listened to anything yet!") + + rows = [] + for i, track in enumerate(tracks, start=1): + name = escape(track["name"], formatting=True) + artist_name = escape(track["artist"]["name"], formatting=True) + plays = track["playcount"] + rows.append( + f"`#{i:2}` **{plays}** {self.format_plays(plays)} — **{artist_name}** — ***{name}***" + ) + trackdata = await self.api_request( + ctx, + { + "user": name, + "method": "track.getInfo", + "artist": tracks[0]["artist"]["name"], + "track": tracks[0]["name"], + }, + ) + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + try: + if trackdata is None: + raise KeyError + image_url = trackdata["track"]["album"]["image"][-1]["#text"] + # image_url_small = trackdata['track']['album']['image'][1]['#text'] + # image_colour = await color_from_image_url(image_url_small) + except KeyError: + image_url = await self.scrape_artist_image(tracks[0]["artist"]["name"], ctx) + # image_colour = await color_from_image_url(image_url) + + content.set_thumbnail(url=image_url) + + content.set_footer(text=f"Total unique tracks: {user_attr['total']}") + content.set_author( + name=f"{user_attr['user']} — {self.humanized_period(arguments['period']).capitalize()} top tracks", + icon_url=ctx.message.author.display_avatar.url, + ) + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm_server.command(name="topartists", aliases=["ta"]) + async def command_servertopartists(self, ctx): + """Most listened artists in the server.""" + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append( + self.get_server_top( + ctx, + lastfm_username, + "artist", + "overall", + 100, + ) + ) + if not tasks: + return await ctx.send("No users have logged in to LastFM!") + async with ctx.typing(): + mapping = {} + total_users = 0 + total_plays = 0 + data = await asyncio.gather(*tasks) + for user in data: + if user is None: + continue + total_users += 1 + for user_data in user: + artist_name = user_data["name"] + artist_plays = int(user_data["playcount"]) + total_plays += artist_plays + if artist_name in mapping: + mapping[artist_name] += artist_plays + else: + mapping[artist_name] = artist_plays + + rows = [] + for i, (artist, playcount) in enumerate( + sorted(mapping.items(), key=lambda x: x[1], reverse=True), start=1 + ): + name = escape(artist, formatting=True) + plays = playcount + rows.append(f"`#{i:2}` **{plays}** {self.format_plays(plays)} — **{name}**") + + content = discord.Embed( + title=f"Most listened to artists in {ctx.guild}", + color=await self.bot.get_embed_color(ctx.channel), + ) + content.set_footer(text=f"Top 100 artists of {total_users} users.") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm_server.command(name="topalbums", aliases=["talb"]) + async def command_servertopalbums(self, ctx): + """Most listened albums in the server.""" + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append( + self.get_server_top( + ctx, + lastfm_username, + "album", + "overall", + 100, + ) + ) + if not tasks: + return await ctx.send("No users have logged in to LastFM!") + async with ctx.typing(): + mapping = {} + total_users = 0 + total_plays = 0 + data = await asyncio.gather(*tasks) + for user in data: + if user is None: + continue + total_users += 1 + for user_data in user: + name = f'**{escape(user_data["artist"]["name"], formatting=True)}** — **{escape(user_data["name"], formatting=True)}**' + plays = int(user_data["playcount"]) + total_plays += plays + if name in mapping: + mapping[name] += plays + else: + mapping[name] = plays + + rows = [] + for i, (album, playcount) in enumerate( + sorted(mapping.items(), key=lambda x: x[1], reverse=True), start=1 + ): + plays = playcount + rows.append(f"`#{i:2}` **{plays}** {self.format_plays(plays)} — {album}") + + content = discord.Embed( + title=f"Most listened to albums in {ctx.guild}", + color=await self.bot.get_embed_color(ctx.channel), + ) + content.set_footer(text=f"Top 100 albums of {total_users} users.") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @command_fm_server.command(name="toptracks", aliases=["tt"]) + async def command_servertoptracks(self, ctx): + """Most listened tracks in the server.""" + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append( + self.get_server_top( + ctx, + lastfm_username, + "track", + "overall", + 100, + ) + ) + if not tasks: + return await ctx.send("No users have logged in to LastFM!") + async with ctx.typing(): + mapping = {} + total_users = 0 + total_plays = 0 + data = await asyncio.gather(*tasks) + for user in data: + if user is None: + continue + total_users += 1 + for user_data in user: + name = f'**{escape(user_data["artist"]["name"], formatting=True)}** — **{escape(user_data["name"], formatting=True)}**' + plays = int(user_data["playcount"]) + total_plays += plays + if name in mapping: + mapping[name] += plays + else: + mapping[name] = plays + + rows = [] + for i, (track, playcount) in enumerate( + sorted(mapping.items(), key=lambda x: x[1], reverse=True), start=1 + ): + plays = playcount + rows.append(f"`#{i:2}` **{plays}** {self.format_plays(plays)} — {track}") + + content = discord.Embed( + title=f"Most listened to tracks in {ctx.guild}", + color=await self.bot.get_embed_color(ctx.channel), + ) + content.set_footer(text=f"Top 100 tracks of {total_users} users.") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages[:15], DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) diff --git a/lastfm/utils/__init__.py b/lastfm/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lastfm/utils/api.py b/lastfm/utils/api.py new file mode 100644 index 0000000..0de71e9 --- /dev/null +++ b/lastfm/utils/api.py @@ -0,0 +1,159 @@ +import contextlib + +import aiohttp +import arrow + +from ..exceptions import * + + +class APIMixin: + async def api_request(self, ctx, params, supress_errors=False): + """Get json data from the lastfm api""" + url = "http://ws.audioscrobbler.com/2.0/" + params["api_key"] = self.token + params["format"] = "json" + async with self.session.get(url, params=params) as response: + with contextlib.suppress(aiohttp.ContentTypeError): + content = await response.json() + if "error" in content or response.status != 200: + if supress_errors: + return + raise LastFMError( + f"Last.fm returned an error: {content.get('message')} | Error code {content.get('error')}" + ) + return content + + async def api_post(self, params): + """Post data to the lastfm api""" + url = "http://ws.audioscrobbler.com/2.0/" + params["api_key"] = self.token + hashed = self.hashRequest(params, self.secret) + params["api_sig"] = hashed + params["format"] = "json" + async with self.session.post(url, params=params) as response: + with contextlib.suppress(aiohttp.ContentTypeError): + content = await response.json() + return response.status, content + + async def fetch(self, ctx, url, params=None, handling="json"): + if params is None: + params = {} + cookies = {'sessionid': self.login_token} + async with self.session.get(url, params=params, cookies=cookies) as response: + + if handling == "json": + return await response.json() + if handling == "text": + return await response.text() + return await response + + async def get_current_track(self, ctx, username, ref=None, supress_errors=False): + data = await self.api_request( + ctx, {"method": "user.getrecenttracks", "user": username, "limit": 1}, supress_errors + ) + if not data: + return + tracks = data["recenttracks"]["track"] + if type(tracks) == list: + if tracks: + track = tracks[0] + else: + if supress_errors: + return + raise NoScrobblesError("You haven't scrobbled anything yet.") + else: + track = tracks + + if "@attr" in track and "nowplaying" in track["@attr"]: + + name = track["name"] + artist = track["artist"]["#text"] + image = track["image"][-1]["#text"] + album = None + if "#text" in track["album"]: + album = track["album"]["#text"] + if ref: + return name, artist, album, image, ref + else: + return name, artist, album, image + + if not ref: + if supress_errors: + return + raise NotScrobblingError("You aren't currently listening to anything.") + else: + return None, None, None, None, ref + + async def get_server_top(self, ctx, username, request_type, period, limit=100): + if request_type == "artist": + data = await self.api_request( + ctx, + { + "user": username, + "method": "user.gettopartists", + "limit": limit, + "period": period, + }, + True, + ) + return data["topartists"]["artist"] if data is not None else None + if request_type == "album": + data = await self.api_request( + ctx, + { + "user": username, + "method": "user.gettopalbums", + "limit": limit, + "period": period, + }, + True, + ) + return data["topalbums"]["album"] if data is not None else None + if request_type == "track": + data = await self.api_request( + ctx, + { + "user": username, + "method": "user.gettoptracks", + "limit": limit, + "period": period, + }, + True, + ) + return data["toptracks"]["track"] if data is not None else None + + async def get_lastplayed(self, ctx, username, ref): + data = await self.api_request( + ctx, + {"method": "user.getrecenttracks", "user": username, "limit": 1}, + True, + ) + song = None + if data: + tracks = data["recenttracks"]["track"] + if type(tracks) == list: + if tracks: + track = tracks[0] + else: + return None, ref + else: + track = tracks + + nowplaying = False + if track.get("@attr") and track["@attr"].get("nowplaying"): + nowplaying = True + + if track.get("date"): + date = tracks[0]["date"]["uts"] + else: + date = arrow.utcnow().int_timestamp + + song = { + "name": track["name"], + "artist": track["artist"]["#text"], + "nowplaying": nowplaying, + "date": int(date), + "url": track["url"], + } + + return song, ref diff --git a/lastfm/utils/base.py b/lastfm/utils/base.py new file mode 100644 index 0000000..0a7acdd --- /dev/null +++ b/lastfm/utils/base.py @@ -0,0 +1,399 @@ +import hashlib +import re +import urllib +from copy import deepcopy + +import arrow +import discord +import tabulate +from bs4 import BeautifulSoup +from redbot.core.utils.chat_formatting import box + +from ..abc import * +from ..exceptions import * +from .api import APIMixin +from .converters import ConvertersMixin +from .scraping import ScrapingMixin + + +class UtilsMixin(APIMixin, ConvertersMixin, ScrapingMixin): + """Utils""" + + def remove_mentions(self, text): + """Remove mentions from string.""" + return (re.sub(r"<@\!?[0-9]+>", "", text)).strip() + + async def artist_overview(self, ctx, period, artistname, fmname): + """Overall artist view""" + albums = [] + tracks = [] + metadata = [None, None, None] + artistinfo = await self.api_request( + ctx, {"method": "artist.getInfo", "artist": artistname} + ) + url = ( + f"https://last.fm/user/{fmname}/library/music/{artistname}" + f"?date_preset={self.period_http_format(period)}" + ) + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + try: + albumsdiv, tracksdiv, _ = soup.findAll("tbody", {"data-playlisting-add-entries": ""}) + except ValueError: + if period == "overall": + return await ctx.send(f"You have never listened to **{artistname}**!") + return await ctx.send( + f"You have not listened to **{artistname}** in the past {period}s!" + ) + + for container, destination in zip([albumsdiv, tracksdiv], [albums, tracks]): + items = container.findAll("tr", {"class": "chartlist-row"}) + for item in items: + name = item.find("td", {"class": "chartlist-name"}).find("a").get("title") + playcount = ( + item.find("span", {"class": "chartlist-count-bar-value"}) + .text.replace("scrobbles", "") + .replace("scrobble", "") + .strip() + ) + destination.append((name, playcount)) + + metadata_list = soup.find("ul", {"class": "metadata-list"}) + for i, metadata_item in enumerate( + metadata_list.findAll("p", {"class": "metadata-display"}) + ): + metadata[i] = int(metadata_item.text.replace(",", "")) + + artist = { + "image_url": soup.find("span", {"class": "library-header-image"}) + .find("img") + .get("src") + .replace("avatar70s", "avatar300s"), + "formatted_name": soup.find("h2", {"class": "library-header-title"}).text.strip(), + } + + similar = [a["name"] for a in artistinfo["artist"]["similar"]["artist"]] + tags = [t["name"] for t in artistinfo["artist"]["tags"]["tag"]] + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + content.set_thumbnail(url=artist["image_url"]) + content.set_author( + name=f"{ctx.author.name} — {artist['formatted_name']} " + + (f"{self.humanized_period(period)} " if period != "overall" else "") + + "Overview", + icon_url=ctx.author.display_avatar.url, + url=f"https://last.fm/user/{fmname}/library/music/{urllib.parse.quote_plus(artistname)}?date_preset={self.period_http_format(period)}", + ) + content.set_footer(text=f"{', '.join(tags)}") + + crowns = await self.config.guild(ctx.guild).crowns() + crown_holder = crowns.get(artistname.lower(), None) + if crown_holder is None or crown_holder["user"] != ctx.author.id: + crownstate = None + else: + crownstate = "👑" + if crownstate is not None: + stats = [[crownstate, str(metadata[0]), str(metadata[1]), str(metadata[2])]] + headers = ["-", "Scrobbles", "Albums", "Tracks"] + else: + stats = [[str(metadata[0]), str(metadata[1]), str(metadata[2])]] + headers = ["Scrobbles", "Albums", "Tracks"] + content.description = box(tabulate.tabulate(stats, headers=headers), lang="prolog") + + content.add_field( + name="Top albums", + value="\n".join( + f"`#{i:2}` **{item}** ({playcount})" + for i, (item, playcount) in enumerate(albums, start=1) + ), + inline=True, + ) + content.add_field( + name="Top tracks", + value="\n".join( + f"`#{i:2}` **{item}** ({playcount})" + for i, (item, playcount) in enumerate(tracks, start=1) + ), + inline=True, + ) + if similar: + content.add_field(name="Similar artists", value=", ".join(similar), inline=False) + await ctx.send(embed=content) + + async def get_userinfo_embed(self, ctx, user, username): + data = await self.api_request(ctx, {"user": username, "method": "user.getinfo"}) + if not data: + raise LastFMError + + username = data["user"]["name"] + playcount = data["user"]["playcount"] + profile_url = data["user"]["url"] + profile_pic_url = data["user"]["image"][3]["#text"] + vc_scrobbles = await self.config.user(user).scrobbles() + timestamp = int(data["user"]["registered"]["unixtime"]) + exact_time = f"" + relative_time = f"" + + content = discord.Embed( + title=f"\N{OPTICAL DISC} {username}", color=await ctx.embed_color() + ) + content.add_field(name="Last.fm profile", value=f"[Link]({profile_url})", inline=True) + content.add_field( + name="Registered", + value=f"{exact_time}\n({relative_time})", + inline=True, + ) + content.set_thumbnail(url=profile_pic_url) + + footer = f"Total plays: {playcount}" + + if vc_scrobbles: + footer += f" | VC Plays: {vc_scrobbles}" + + content.set_footer(text=footer) + return content + + async def listening_report(self, ctx, timeframe, name): + current_day_floor = arrow.utcnow().floor("day") + week = [] + for i in range(1, 8): + dt = current_day_floor.shift(days=-i) + week.append( + { + "dt": int(dt.timestamp()), + "ts": int(dt.timestamp()), + "ts_to": int(dt.shift(days=+1, minutes=-1).timestamp()), + "day": dt.format("ddd, MMM Do"), + "scrobbles": 0, + } + ) + + params = { + "method": "user.getrecenttracks", + "user": name, + "from": week[-1]["ts"], + "to": int(current_day_floor.shift(minutes=-1).timestamp()), + "limit": 1000, + } + content = await self.api_request(ctx, params) + tracks = content["recenttracks"]["track"] + if not tracks or not isinstance(tracks, list): + await ctx.send("No data found.") + return + + # get rid of nowplaying track if user is currently scrobbling. + # for some reason even with from and to parameters it appears + if tracks[0].get("@attr") is not None: + tracks = tracks[1:] + + day_counter = 1 + for trackdata in reversed(tracks): + scrobble_ts = int(trackdata["date"]["uts"]) + if scrobble_ts > week[-day_counter]["ts_to"]: + day_counter += 1 + + week[day_counter - 1]["scrobbles"] += 1 + + scrobbles_total = sum(day["scrobbles"] for day in week) + scrobbles_average = round(scrobbles_total / len(week)) + + rows = [] + for day in week: + rows.append(f"`{day['day']}`: **{day['scrobbles']}** Scrobbles") + + content = discord.Embed(color=await self.bot.get_embed_color(ctx.channel)) + content.set_author( + name=f"{ctx.author.display_name} | Last {timeframe.title()}", + icon_url=ctx.author.display_avatar.url, + ) + content.description = "\n".join(rows) + content.add_field( + name="Total scrobbles", value=f"{scrobbles_total} Scrobbles", inline=False + ) + content.add_field( + name="Avg. daily scrobbles", value=f"{scrobbles_average} Scrobbles", inline=False + ) + await ctx.send(embed=content) + + async def create_pages(self, content, rows, maxrows=15, maxpages=10): + pages = [] + content.description = "" + thisrow = 0 + rowcount = len(rows) + for row in rows: + thisrow += 1 + if len(content.description) + len(row) < 2000 and thisrow < maxrows + 1: + content.description += f"\n{row}" + rowcount -= 1 + else: + thisrow = 1 + if len(pages) == maxpages - 1: + content.description += f"\n*+ {rowcount} more entries...*" + pages.append(content) + content = None + break + + pages.append(content) + content = deepcopy(content) + content.description = f"{row}" + rowcount -= 1 + if content is not None and not content.description == "": + pages.append(content) + + return pages + + def hashRequest(self, obj, secretKey): + """ + This hashing function is courtesy of GitHub user huberf. + It is licensed under the MIT license. + Source: https://github.com/huberf/lastfm-scrobbler/blob/master/lastpy/__init__.py#L50-L60 + """ + string = "" + items = obj.keys() + items = sorted(items) + for i in items: + string += i + string += obj[i] + string += secretKey + stringToHash = string.encode("utf8") + requestHash = hashlib.md5(stringToHash).hexdigest() + return requestHash + + def check_if_logged_in(self, conf, same_person=True): + you_or_they = "You" if same_person else "They" + if not conf["lastfm_username"]: + raise NotLoggedInError( + f"{you_or_they} need to log into a last.fm account. Please log in with `fm login`." + ) + + def check_if_logged_in_and_sk(self, conf): + if not conf["session_key"] and not conf["lastfm_username"]: + raise NotLoggedInError( + "You need to log into a last.fm account. Please log in with `fm login`." + ) + if not conf["session_key"] and conf["lastfm_username"]: + raise NeedToReauthorizeError( + "You appear to be an old user of this cog. " + "To use this command you will need to reauthorize with `fm login`." + ) + + async def maybe_send_403_msg(self, ctx, data): + if data[0] == 403 and data[1]["error"] == 9: + await self.config.user(ctx.author).session_key.clear() + await self.config.user(ctx.author).lastfm_username.clear() + message = ( + "I was unable to add your tags as it seems you have unauthorized me to do so.\n" + "You can reauthorize me using the `fm login` command, but I have logged you out for now." + ) + embed = discord.Embed( + title="Authorization Failed", + description=message, + color=await ctx.embed_color(), + ) + await ctx.send(embed=embed) + raise SilentDeAuthorizedError + + async def get_playcount_track(self, ctx, username, artist, track, period, reference=None): + if period != "overall": + return await self.get_playcount_track_scraper(ctx, username, artist, track, period) + + try: + data = await self.api_request( + ctx, + { + "method": "track.getinfo", + "user": username, + "track": track, + "artist": artist, + "autocorrect": 1, + }, + ) + except LastFMError: + data = {} + + try: + count = int(data["track"]["userplaycount"]) + except KeyError: + count = 0 + try: + artistname = data["track"]["artist"]["name"] + trackname = data["track"]["name"] + except KeyError: + artistname = None + trackname = None + + try: + image_url = data["track"]["album"]["image"][-1]["#text"] + except KeyError: + image_url = None + + if reference is None: + return count + else: + return count, reference, (artistname, trackname, image_url) + + async def get_playcount_album(self, ctx, username, artist, album, period, reference=None): + if period != "overall": + return await self.get_playcount_album_scraper(ctx, username, artist, album, period) + try: + data = await self.api_request( + ctx, + { + "method": "album.getinfo", + "user": username, + "album": album, + "artist": artist, + "autocorrect": 1, + }, + ) + except LastFMError: + data = {} + try: + count = int(data["album"]["userplaycount"]) + except (KeyError, TypeError): + count = 0 + + try: + artistname = data["album"]["artist"] + albumname = data["album"]["name"] + except KeyError: + artistname = None + albumname = None + + try: + image_url = data["album"]["image"][-1]["#text"] + except KeyError: + image_url = None + + if reference is None: + return count + else: + return count, reference, (artistname, albumname, image_url) + + async def get_playcount(self, ctx, username, artist, period, reference=None): + if period != "overall": + return await self.get_playcount_scraper(ctx, username, artist, period) + + try: + data = await self.api_request( + ctx, + { + "method": "artist.getinfo", + "user": username, + "artist": artist, + "autocorrect": 1, + }, + ) + except LastFMError: + data = {} + try: + count = int(data["artist"]["stats"]["userplaycount"]) + name = data["artist"]["name"] + except (KeyError, TypeError): + count = 0 + name = None + + if not reference: + return count + + return count, reference, name diff --git a/lastfm/utils/converters.py b/lastfm/utils/converters.py new file mode 100644 index 0000000..3c026af --- /dev/null +++ b/lastfm/utils/converters.py @@ -0,0 +1,127 @@ +class ConvertersMixin: + def format_plays(self, amount): + if amount == 1: + return "play" + return "plays" + + def get_period(self, timeframe): + if timeframe in ["7day", "7days", "weekly", "week", "1week", "7d"]: + period = "7day", "past week" + elif timeframe in ["30day", "30days", "monthly", "month", "1month", "1m"]: + period = "1month", "past month" + elif timeframe in ["90day", "90days", "3months", "3month", "3m"]: + period = "3month", "past 3 months" + elif timeframe in ["180day", "180days", "6months", "6month", "halfyear", "hy", "6m"]: + period = "6month", "past 6 months" + elif timeframe in [ + "365day", + "365days", + "1year", + "year", + "12months", + "12month", + "y", + "1y", + "12m", + ]: + period = "12month", "past year" + elif timeframe in ["at", "alltime", "overall"]: + period = "overall", "overall" + else: + period = None, None + + return period + + def humanized_period(self, period): + if period == "7day": + humanized = "weekly" + elif period == "1month": + humanized = "monthly" + elif period == "3month": + humanized = "past 3 months" + elif period == "6month": + humanized = "past 6 months" + elif period == "12month": + humanized = "yearly" + else: + humanized = "alltime" + + return humanized + + def period_http_format(self, period): + period_format_map = { + "7day": "LAST_7_DAYS", + "1month": "LAST_30_DAYS", + "3month": "LAST_90_DAYS", + "6month": "LAST_180_DAYS", + "12month": "LAST_365_DAYS", + "overall": "ALL", + } + return period_format_map.get(period) + + def parse_arguments(self, args): + parsed = {"period": None, "amount": None} + for a in args: + if parsed["amount"] is None: + try: + parsed["amount"] = int(a) + continue + except ValueError: + pass + if parsed["period"] is None: + parsed["period"], _ = self.get_period(a) + + if parsed["period"] is None: + parsed["period"] = "overall" + if parsed["amount"] is None: + parsed["amount"] = 15 + return parsed + + def parse_chart_arguments(self, args): + parsed = { + "period": None, + "amount": None, + "width": None, + "height": None, + "method": None, + "path": None, + } + for a in args: + a = a.lower() + if parsed["amount"] is None: + try: + size = a.split("x") + parsed["width"] = int(size[0]) + if len(size) > 1: + parsed["height"] = int(size[1]) + else: + parsed["height"] = int(size[0]) + continue + except ValueError: + pass + + if parsed["method"] is None: + if a in ["talb", "topalbums", "albums", "album"]: + parsed["method"] = "user.gettopalbums" + continue + elif a in ["ta", "topartists", "artists", "artist"]: + parsed["method"] = "user.gettopartists" + continue + elif a in ["re", "recent", "recents"]: + parsed["method"] = "user.getrecenttracks" + continue + elif a in ["tracks", "track"]: + parsed["method"] = "user.gettoptracks" + + if parsed["period"] is None: + parsed["period"], _ = self.get_period(a) + + if parsed["period"] is None: + parsed["period"] = "7day" + if parsed["width"] is None: + parsed["width"] = 3 + parsed["height"] = 3 + if parsed["method"] is None: + parsed["method"] = "user.gettopalbums" + parsed["amount"] = parsed["width"] * parsed["height"] + return parsed diff --git a/lastfm/utils/scraping.py b/lastfm/utils/scraping.py new file mode 100644 index 0000000..15338fe --- /dev/null +++ b/lastfm/utils/scraping.py @@ -0,0 +1,184 @@ +import asyncio +import math +import re +import urllib +from typing import Tuple + +from bs4 import BeautifulSoup + + +class ScrapingMixin: + async def artist_top(self, ctx, period, artistname, datatype, name): + """Scrape either top tracks or top albums from lastfm library page.""" + if not self.login_token: + return None, [] + url = ( + f"https://last.fm/user/{name}/library/music/{artistname}/" + f"+{datatype}?date_preset={self.period_http_format(period)}" + ) + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + data = [] + try: + chartlist = soup.find("tbody", {"data-playlisting-add-entries": ""}) + except ValueError: + return None, [] + + artist = { + "image_url": soup.find("span", {"class": "library-header-image"}) + .find("img") + .get("src") + .replace("avatar70s", "avatar300s"), + "formatted_name": soup.find("a", {"class": "library-header-crumb"}).text.strip(), + } + + items = chartlist.findAll("tr", {"class": "chartlist-row"}) + for item in items: + name = item.find("td", {"class": "chartlist-name"}).find("a").get("title") + playcount = ( + item.find("span", {"class": "chartlist-count-bar-value"}) + .text.replace("scrobbles", "") + .replace("scrobble", "") + .strip() + ) + data.append((name, int(playcount.replace(",", "")))) + + return artist, data + + async def lyrics_musixmatch(self, artistsong) -> Tuple[str, str]: + artistsong = re.sub("[^a-zA-Z0-9 \n.]", "", artistsong) + artistsong = re.sub(r"\s+", " ", artistsong).strip() + headers = { + "User-Agent": "Mozilla/5.0 (X11; Arch Linux; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0" + } + async with self.session.get( + "https://musixmatch.com/search/{}".format(artistsong).replace(" ", "%20"), + headers=headers, + ) as resp: + if resp.status == 200: + result = await resp.text() + else: + return None, None + soup = BeautifulSoup(result, "html.parser") + songurl = soup.find("a", {"class": "title"}) + if songurl is None: + return None, None + url = "https://www.musixmatch.com" + songurl["href"] + async with self.session.get(url, headers=headers) as resp: + result = await resp.text() + soup = BeautifulSoup(result, "html.parser") + lyrics = soup.text.split('"body":"') + lyrics = lyrics[0] + songname = lyrics.split("|")[0] + lyrics = lyrics.split('","language"')[0] + try: + lyrics = lyrics.split("languages")[1] + except IndexError: + return None, None + lyrics = lyrics.split("Report")[0] + lyrics = lyrics.replace("\\n", "\n") + lyrics = lyrics.replace("\\", "") + lyrics = lyrics.replace("&", "&") + lyrics = lyrics.replace("`", "'") + lyrics = lyrics.strip() + return lyrics, songname.strip() + + async def scrape_artist_image(self, artist, ctx): + url = f"https://www.last.fm/music/{urllib.parse.quote_plus(artist)}/+images" + data = await self.fetch(ctx, url, handling="text") + + soup = BeautifulSoup(data, "html.parser") + if soup is None: + return "" + image = soup.find("img", {"class": "image-list-image"}) + if image is None: + try: + image = soup.find("li", {"class": "image-list-item-wrapper"}).find("a").find("img") + except AttributeError: + return "" + return image["src"].replace("/avatar170s/", "/300x300/") if image else "" + + async def scrape_artists_for_chart(self, ctx, username, period, amount): + period_format_map = { + "7day": "LAST_7_DAYS", + "1month": "LAST_30_DAYS", + "3month": "LAST_90_DAYS", + "6month": "LAST_180_DAYS", + "12month": "LAST_365_DAYS", + "overall": "ALL", + } + tasks = [] + url = f"https://www.last.fm/user/{username}/library/artists" + for i in range(1, math.ceil(amount / 50) + 1): + params = {"date_preset": period_format_map[period], "page": i} + task = asyncio.ensure_future(self.fetch(ctx, url, params, handling="text")) + tasks.append(task) + + responses = await asyncio.gather(*tasks) + + images = [] + for data in responses: + if len(images) >= amount: + break + else: + soup = BeautifulSoup(data, "html.parser") + imagedivs = soup.findAll("td", {"class": "chartlist-image"}) + images += [ + div.find("img")["src"].replace("/avatar70s/", "/300x300/") for div in imagedivs + ] + + return images + + async def get_similar_artists(self, artistname, ctx): + similar = [] + url = f"https://last.fm/music/{artistname}" + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + for artist in soup.findAll("h3", {"class": "artist-similar-artists-sidebar-item-name"}): + similar.append(artist.find("a").text) + listeners = ( + soup.find("li", {"class": "header-metadata-tnew-item--listeners"}).find("abbr").text + ) + return similar, listeners + + async def get_playcount_scraper(self, ctx, username, artistname, period): + url = ( + f"https://last.fm/user/{username}/library/music/{artistname}" + f"?date_preset={self.period_http_format(period)}" + ) + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + divs = soup.findAll(class_="metadata-display") + if not divs: + return 0 + div = divs[0] + plays = div.get_text() + return int(plays.split(" ")[0].replace(",", "")) + + async def get_playcount_track_scraper(self, ctx, username, artistname, trackname, period): + url = ( + f"https://last.fm/user/{username}/library/music/{artistname}/_/{trackname}" + f"?date_preset={self.period_http_format(period)}" + ) + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + divs = soup.findAll(class_="metadata-display") + if not divs: + return 0 + div = divs[0] + plays = div.get_text() + return int(plays.split(" ")[0].replace(",", "")) + + async def get_playcount_album_scraper(self, ctx, username, artistname, albumname, period): + url = ( + f"https://last.fm/user/{username}/library/music/{artistname}/{albumname}" + f"?date_preset={self.period_http_format(period)}" + ) + data = await self.fetch(ctx, url, handling="text") + soup = BeautifulSoup(data, "html.parser") + divs = soup.findAll(class_="metadata-display") + if not divs: + return 0 + div = divs[0] + plays = div.get_text() + return int(plays.split(" ")[0].replace(",", "")) diff --git a/lastfm/utils/tokencheck.py b/lastfm/utils/tokencheck.py new file mode 100644 index 0000000..05ef328 --- /dev/null +++ b/lastfm/utils/tokencheck.py @@ -0,0 +1,10 @@ +async def tokencheck(ctx): + token = await ctx.bot.get_shared_api_tokens("lastfm") + return bool(token.get("appid")) + + +async def tokencheck_plus_secret(ctx): + token = await ctx.bot.get_shared_api_tokens("lastfm") + if token.get("appid") and token.get("secret"): + return True + return False diff --git a/lastfm/whoknows.py b/lastfm/whoknows.py new file mode 100644 index 0000000..9db9d02 --- /dev/null +++ b/lastfm/whoknows.py @@ -0,0 +1,279 @@ +import asyncio + +import discord +from redbot.core import commands +from redbot.core.utils.menus import DEFAULT_CONTROLS, menu + +from .abc import MixinMeta +from .exceptions import * +from .utils.tokencheck import tokencheck + + +class WhoKnowsMixin(MixinMeta): + """WhoKnows Commands""" + + @commands.command(name="whoknows", usage="", aliases=["wk"]) + @commands.check(tokencheck) + @commands.guild_only() + @commands.cooldown(2, 10, type=commands.BucketType.user) + async def command_whoknows(self, ctx, *, artistname=None): + """Check who has listened to a given artist the most.""" + listeners = [] + tasks = [] + async with ctx.typing(): + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + if not artistname: + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + trackname, artistname, albumname, image_url = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + if lastfm_username is None: + continue + member = ctx.guild.get_member(user) + if member is None: + continue + + tasks.append( + self.get_playcount(ctx, lastfm_username, artistname, "overall", member) + ) + if tasks: + data = await asyncio.gather(*tasks) + data = [i for i in data if i] + for playcount, user, name in data: + artistname = name + if playcount > 0: + listeners.append((playcount, user)) + else: + return await ctx.send( + "Nobody on this server has connected their last.fm account yet!" + ) + rows = [] + total = 0 + for i, (playcount, user) in enumerate( + sorted(listeners, key=lambda p: p[0], reverse=True), start=1 + ): + if i == 1: + rank = "\N{CROWN}" + old_kingdata = await self.config.guild(ctx.guild).crowns() + old_kingartist = old_kingdata.get(artistname.lower()) + if old_kingartist is not None: + old_king = old_kingartist["user"] + old_king = ctx.guild.get_member(old_king) + else: + old_king = None + new_king = user + play = playcount + else: + rank = f"`#{i:2}`" + rows.append( + f"{rank} **{user.name}** — **{playcount}** {self.format_plays(playcount)}" + ) + total += playcount + + if not rows: + return await ctx.send(f"Nobody on this server has listened to **{artistname}**") + + content = discord.Embed( + title=f"Who knows **{artistname}**?", + color=await self.bot.get_embed_color(ctx.channel), + ) + image_url = await self.scrape_artist_image(artistname, ctx) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Collective plays: {total}") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + if old_king is None: + await ctx.send(f"> **{new_king.name}** just earned the **{artistname}** crown.") + async with self.config.guild(ctx.guild).crowns() as crowns: + crowns[artistname.lower()] = {"user": new_king.id, "playcount": play} + if isinstance(old_king, discord.Member): + if not (old_king.id == new_king.id): + await ctx.send( + f"> **{new_king.name}** just stole the **{artistname}** crown from **{old_king.name}**." + ) + async with self.config.guild(ctx.guild).crowns() as crowns: + crowns[artistname.lower()] = {"user": new_king.id, "playcount": play} + if old_king.id == new_king.id: + async with self.config.guild(ctx.guild).crowns() as crowns: + crowns[artistname.lower()] = {"user": new_king.id, "playcount": play} + + @commands.command( + name="whoknowstrack", usage=" | ", aliases=["wkt", "whoknowst"] + ) + @commands.check(tokencheck) + @commands.guild_only() + @commands.cooldown(2, 15, type=commands.BucketType.user) + async def command_whoknowstrack(self, ctx, *, track=None): + """ + Check who has listened to a given song the most. + """ + if not track: + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + trackname, artistname, albumname, image_url = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + else: + try: + trackname, artistname = [x.strip() for x in track.split("|")] + if trackname == "" or artistname == "": + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `track | artist`") + + listeners = [] + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + member = ctx.guild.get_member(user) + if member is None or lastfm_username is None: + continue + + tasks.append( + self.get_playcount_track( + ctx, lastfm_username, artistname, trackname, "overall", member + ) + ) + + if tasks: + data = await asyncio.gather(*tasks) + data = [i for i in data if i] + for playcount, user, metadata in data: + artistname, trackname, image_url = metadata + if artistname is None or trackname is None: + return await ctx.send("Track could not be found on last.fm!") + if playcount > 0: + listeners.append((playcount, user)) + else: + return await ctx.send("Nobody on this server has connected their last.fm account yet!") + + rows = [] + total = 0 + for i, (playcount, user) in enumerate( + sorted(listeners, key=lambda p: p[0], reverse=True), start=1 + ): + rows.append( + f"`#{i:2}` **{user.name}** — **{playcount}** {self.format_plays(playcount)}" + ) + total += playcount + + if not rows: + return await ctx.send( + f"Nobody on this server has listened to **{trackname}** by **{artistname}**" + ) + if image_url is None: + image_url = await self.scrape_artist_image(artistname, ctx) + + content = discord.Embed( + title=f"Who knows **{trackname}**\n— by {artistname}", + color=await self.bot.get_embed_color(ctx.channel), + ) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Collective plays: {total}") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) + + @commands.command( + name="whoknowsalbum", aliases=["wka", "whoknowsa"], usage=" | " + ) + @commands.check(tokencheck) + @commands.guild_only() + @commands.cooldown(2, 15, type=commands.BucketType.user) + async def command_whoknowsalbum(self, ctx, *, album=None): + """ + Check who has listened to a given album the most. + """ + if not album: + conf = await self.config.user(ctx.author).all() + self.check_if_logged_in(conf) + + trackname, artistname, albumname, image_url = await self.get_current_track( + ctx, conf["lastfm_username"] + ) + if not albumname: + return await ctx.send( + "Sorry, the track you're listening to doesn't have the album info provided." + ) + else: + try: + albumname, artistname = [x.strip() for x in album.split("|")] + if not albumname or not artistname: + raise ValueError + except ValueError: + return await ctx.send("\N{WARNING SIGN} Incorrect format! use `album | artist`") + + listeners = [] + tasks = [] + userlist = await self.config.all_users() + guildusers = [x.id for x in ctx.guild.members] + userslist = [user for user in userlist if user in guildusers] + for user in userslist: + lastfm_username = userlist[user]["lastfm_username"] + member = ctx.guild.get_member(user) + if member is None or lastfm_username is None: + continue + + tasks.append( + self.get_playcount_album( + ctx, lastfm_username, artistname, albumname, "overall", member + ) + ) + + if tasks: + data = await asyncio.gather(*tasks) + data = [i for i in data if i] + for playcount, user, metadata in data: + artistname, albumname, image_url = metadata + if artistname is None or albumname is None: + return await ctx.send("Album could not be found on last.fm!") + if playcount > 0: + listeners.append((playcount, user)) + else: + return await ctx.send("Nobody on this server has connected their last.fm account yet!") + + rows = [] + total = 0 + for i, (playcount, user) in enumerate( + sorted(listeners, key=lambda p: p[0], reverse=True), start=1 + ): + rows.append( + f"`#{i:2}` **{user.name}** — **{playcount}** {self.format_plays(playcount)}" + ) + total += playcount + + if not rows: + return await ctx.send( + f"Nobody on this server has listened to **{albumname}** by **{artistname}**" + ) + + if image_url is None: + image_url = await self.scrape_artist_image(artistname, ctx) + + content = discord.Embed( + title=f"Who knows **{albumname}**\n— by {artistname}", + color=await self.bot.get_embed_color(ctx.channel), + ) + content.set_thumbnail(url=image_url) + content.set_footer(text=f"Collective plays: {total}") + + pages = await self.create_pages(content, rows) + if len(pages) > 1: + await menu(ctx, pages, DEFAULT_CONTROLS) + else: + await ctx.send(embed=pages[0]) diff --git a/lastfm/wordcloud.py b/lastfm/wordcloud.py new file mode 100644 index 0000000..a148126 --- /dev/null +++ b/lastfm/wordcloud.py @@ -0,0 +1,108 @@ +from contextlib import suppress +from io import BytesIO +from typing import Optional + +import discord +from redbot.core import commands + +from .abc import MixinMeta +from .exceptions import * +from .fmmixin import FMMixin + +command_fm = FMMixin.command_fm +command_fm_server = FMMixin.command_fm_server + +with suppress(Exception): + from wordcloud import WordCloud + + +async def wordcloud_available(ctx): + return "WordCloud" in globals().keys() + + +class WordCloudMixin(MixinMeta): + """WordCloud Commands""" + + def wordcloud_create(self): + if "WordCloud" in globals().keys(): + self.wc = WordCloud(width=1920, height=1080, mode="RGBA", background_color=None) + + @command_fm.group(name="wordcloud", aliases=["cloud", "wc"]) + @commands.check(wordcloud_available) + @commands.bot_has_permissions(attach_files=True) + async def command_wordcloud(self, ctx): + """WordCloud Commands + + Original idea: http://lastfm.dontdrinkandroot.net""" + + @command_wordcloud.command(name="artists", aliases=["artist"]) + @commands.max_concurrency(1, commands.BucketType.user) + async def command_wordcloud_artists(self, ctx, user: Optional[discord.Member] = None): + """Get a picture with the most listened to artists.""" + user = user or ctx.author + async with ctx.typing(): + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + name = conf["lastfm_username"] + data = await self.api_request( + ctx, {"user": name, "method": "user.gettopartists", "period": "overall"} + ) + artists = data["topartists"]["artist"] + if not artists: + return await ctx.send(f"{name} has not listened to any artists yet!") + data = {a["name"]: int(a["playcount"]) for a in artists} + wc = await self.bot.loop.run_in_executor(None, self.wc.generate_from_frequencies, data) + pic = BytesIO() + pic.name = f"{name}_artists.png" + wc.to_file(pic) + pic.seek(0) + await ctx.send(f"{name}'s artist cloud:", file=discord.File(pic)) + pic.close() + + @command_wordcloud.command(name="tracks", aliases=["track"]) + @commands.max_concurrency(1, commands.BucketType.user) + async def command_wordcloud_tracks(self, ctx, user: Optional[discord.Member] = None): + """Get a picture with the most listened to tracks.""" + user = user or ctx.author + async with ctx.typing(): + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + name = conf["lastfm_username"] + data = await self.api_request( + ctx, {"user": name, "method": "user.gettoptracks", "period": "overall"} + ) + tracks = data["toptracks"]["track"] + if not tracks: + return await ctx.send(f"{name} has not listened to any tracks yet!") + data = {a["name"]: int(a["playcount"]) for a in tracks} + wc = await self.bot.loop.run_in_executor(None, self.wc.generate_from_frequencies, data) + pic = BytesIO() + pic.name = f"{name}_tracks.png" + wc.to_file(pic) + pic.seek(0) + await ctx.send(f"{name}'s track cloud:", file=discord.File(pic)) + pic.close() + + @command_wordcloud.command(name="albums", aliases=["album"]) + @commands.max_concurrency(1, commands.BucketType.user) + async def command_wordcloud_albums(self, ctx, user: Optional[discord.Member] = None): + """Get a picture with the most listened to albums.""" + user = user or ctx.author + async with ctx.typing(): + conf = await self.config.user(user).all() + self.check_if_logged_in(conf, user == ctx.author) + name = conf["lastfm_username"] + data = await self.api_request( + ctx, {"user": name, "method": "user.gettopalbums", "period": "overall"} + ) + albums = data["topalbums"]["album"] + if not albums: + return await ctx.send(f"{name} has not listened to any albums yet!") + data = {a["name"]: int(a["playcount"]) for a in albums} + wc = await self.bot.loop.run_in_executor(None, self.wc.generate_from_frequencies, data) + pic = BytesIO() + pic.name = f"{name}_albums.png" + wc.to_file(pic) + pic.seek(0) + await ctx.send(f"{name}'s albums cloud:", file=discord.File(pic)) + pic.close()