Ruby-Cogs/lastfm/charts.py
2025-03-26 10:59:44 -04:00

421 lines
16 KiB
Python

import asyncio
from io import BytesIO
import discord
from PIL import Image, ImageDraw, ImageFile, ImageFont
from redbot.core import commands
from redbot.core.utils import AsyncIter
from redbot.core.utils.chat_formatting import escape
from .abc import MixinMeta
from .exceptions import *
from .fmmixin import FMMixin
NO_IMAGE_PLACEHOLDER = (
"https://lastfm.freetls.fastly.net/i/u/300x300/2a96cbd8b46e442fc41c2b86b821562f.png"
)
ImageFile.LOAD_TRUNCATED_IMAGES = True
command_fm = FMMixin.command_fm
command_fm_server = FMMixin.command_fm_server
class ChartMixin(MixinMeta):
"""Chart Commands"""
async def get_img(self, url):
async with self.session.get(url or NO_IMAGE_PLACEHOLDER) as resp:
if resp.status == 200:
img = await resp.read()
return img
async with self.session.get(NO_IMAGE_PLACEHOLDER) as resp:
img = await resp.read()
return img
@command_fm.command(
name="chart", usage="[album | artist | recent | track] [timeframe] [width]x[height]"
)
@commands.max_concurrency(1, commands.BucketType.user)
async def command_chart(self, ctx, *args):
"""
Visual chart of your top albums, tracks or artists.
Defaults to top albums, weekly, 3x3.
"""
conf = await self.config.user(ctx.author).all()
self.check_if_logged_in(conf)
arguments = self.parse_chart_arguments(args)
if arguments["width"] + arguments["height"] > 31: # TODO: Figure out a reasonable value.
return await ctx.send(
"Size is too big! Chart `width` + `height` total must not exceed `31`"
)
msg = await ctx.send("Gathering images and data, this may take some time.")
data = await self.api_request(
ctx,
{
"user": conf["lastfm_username"],
"method": arguments["method"],
"period": arguments["period"],
"limit": arguments["amount"],
},
)
chart = []
chart_type = "ERROR"
async with ctx.typing():
if arguments["method"] == "user.gettopalbums":
chart_type = "top album"
albums = data["topalbums"]["album"]
async for album in AsyncIter(albums[: arguments["width"] * arguments["height"]]):
name = album["name"]
artist = album["artist"]["name"]
plays = album["playcount"]
if album["image"][3]["#text"] in self.chart_data:
chart_img = self.chart_data[album["image"][3]["#text"]]
else:
chart_img = await self.get_img(album["image"][3]["#text"])
self.chart_data[album["image"][3]["#text"]] = chart_img
chart.append(
(
f"{plays} {self.format_plays(plays)}\n{name} - {artist}",
chart_img,
)
)
img = await self.bot.loop.run_in_executor(
None,
charts,
chart,
arguments["width"],
arguments["height"],
self.data_loc,
)
elif arguments["method"] == "user.gettopartists":
chart_type = "top artist"
artists = data["topartists"]["artist"]
if self.login_token:
scraped_images = await self.scrape_artists_for_chart(
ctx, conf["lastfm_username"], arguments["period"], arguments["amount"]
)
else:
scraped_images = [NO_IMAGE_PLACEHOLDER] * arguments["amount"]
iterator = AsyncIter(artists[: arguments["width"] * arguments["height"]])
async for i, artist in iterator.enumerate():
name = artist["name"]
plays = artist["playcount"]
if i < len(scraped_images) and scraped_images[i] in self.chart_data:
chart_img = self.chart_data[scraped_images[i]]
else:
chart_img = await self.get_img(scraped_images[i])
self.chart_data[scraped_images[i]] = chart_img
chart.append(
(
f"{plays} {self.format_plays(plays)}\n{name}",
chart_img,
)
)
img = await self.bot.loop.run_in_executor(
None,
charts,
chart,
arguments["width"],
arguments["height"],
self.data_loc,
)
elif arguments["method"] == "user.getrecenttracks":
chart_type = "recent tracks"
tracks = data["recenttracks"]["track"]
if isinstance(tracks, dict):
tracks = [tracks]
async for track in AsyncIter(tracks[: arguments["width"] * arguments["height"]]):
name = track["name"]
artist = track["artist"]["#text"]
if track["image"][3]["#text"] in self.chart_data:
chart_img = self.chart_data[track["image"][3]["#text"]]
else:
chart_img = await self.get_img(track["image"][3]["#text"])
self.chart_data[track["image"][3]["#text"]] = chart_img
chart.append(
(
f"{name} - {artist}",
chart_img,
)
)
img = await self.bot.loop.run_in_executor(
None,
track_chart,
chart,
arguments["width"],
arguments["height"],
self.data_loc,
)
elif arguments["method"] == "user.gettoptracks":
chart_type = "top tracks"
tracks = data["toptracks"]["track"]
scraped_images = await self.scrape_artists_for_chart(
ctx, conf["lastfm_username"], arguments["period"], arguments["amount"]
)
if isinstance(tracks, dict):
tracks = [tracks]
async for track in AsyncIter(tracks[: arguments["width"] * arguments["height"]]):
name = track["name"]
artist = track["artist"]["name"]
plays = track["playcount"]
if name in self.chart_data:
chart_img = self.chart_data[name]
else:
chart_img = await self.get_img(await self.scrape_artist_image(artist, ctx))
self.chart_data[name] = chart_img
chart.append(
(
f"{plays} {self.format_plays(plays)}\n{name} - {artist}",
chart_img,
)
)
img = await self.bot.loop.run_in_executor(
None,
charts,
chart,
arguments["width"],
arguments["height"],
self.data_loc,
)
await msg.delete()
u = conf["lastfm_username"]
try:
await ctx.send(
f"`{u} - {self.humanized_period(arguments['period'])} - {arguments['width']}x{arguments['height']} {chart_type} chart`",
file=img,
)
except discord.HTTPException:
await ctx.send("File is to big to send, try lowering the size.")
@command_fm_server.command(
name="chart", usage="[album | artist | tracks] [timeframe] [width]x[height]"
)
@commands.max_concurrency(1, commands.BucketType.user)
async def server_chart(self, ctx, *args):
"""Visual chart of the servers albums, artists or tracks."""
arguments = self.parse_chart_arguments(args)
if arguments["width"] + arguments["height"] > 31: # TODO: Figure out a reasonable value.
return await ctx.send(
"Size is too big! Chart `width` + `height` total must not exceed `31`"
)
if arguments["method"] not in [
"user.gettopalbums",
"user.gettopartists",
"user.gettoptracks",
]:
return await ctx.send("Only albums, artists and tracks are supported.")
chart_total = arguments["width"] * arguments["height"]
msg = await ctx.send("Gathering images and data, this may take some time.")
tasks = []
userlist = await self.config.all_users()
guildusers = [x.id for x in ctx.guild.members]
userslist = [user for user in userlist if user in guildusers]
datatype = {
"user.gettopalbums": "album",
"user.gettopartists": "artist",
"user.gettoptracks": "track",
}
for user in userslist:
lastfm_username = userlist[user]["lastfm_username"]
if lastfm_username is None:
continue
member = ctx.guild.get_member(user)
if member is None:
continue
tasks.append(
self.get_server_top(
ctx,
lastfm_username,
datatype.get(arguments["method"]),
arguments["period"],
arguments["amount"],
)
)
chart = []
chart_type = "ERROR"
if not tasks:
return await ctx.send("No users have set their last.fm username yet.")
content_map = {}
async with ctx.typing():
data = await asyncio.gather(*tasks)
if arguments["method"] == "user.gettopalbums":
chart_type = "top album"
for user_data in data:
if user_data is None:
continue
for album in user_data:
album_name = album["name"]
artist = album["artist"]["name"]
name = f"{album_name}{artist}"
plays = int(album["playcount"])
if name in content_map:
content_map[name]["plays"] += plays
else:
content_map[name] = {
"plays": plays,
"link": album["image"][3]["#text"],
}
elif arguments["method"] == "user.gettopartists":
chart_type = "top artist"
for user_data in data:
if user_data is None:
continue
for artist in user_data:
name = artist["name"]
plays = int(artist["playcount"])
if name in content_map:
content_map[name]["plays"] += plays
else:
content_map[name] = {"plays": plays}
elif arguments["method"] == "user.gettoptracks":
chart_type = "top tracks"
for user in data:
if user is None:
continue
for user_data in user:
name = f'{escape(user_data["artist"]["name"])} — *{escape(user_data["name"])}*'
plays = int(user_data["playcount"])
if name in content_map:
content_map[name]["plays"] += plays
else:
content_map[name] = {
"plays": plays,
"link": user_data["artist"]["name"],
}
cached_images = {}
for i, (name, content_data) in enumerate(
sorted(content_map.items(), key=lambda x: x[1]["plays"], reverse=True), start=1
):
if arguments["method"] == "user.gettopartists":
image = await self.get_img(await self.scrape_artist_image(name, ctx))
elif arguments["method"] == "user.gettoptracks":
if content_data["link"] in cached_images:
image = cached_images[content_data["link"]]
else:
image = await self.get_img(
await self.scrape_artist_image(content_data["link"], ctx)
)
cached_images[content_data["link"]] = image
else:
image = await self.get_img(content_data["link"])
chart.append(
(
f"{content_data['plays']} {self.format_plays(content_data['plays'])}\n{name}",
image,
)
)
if i >= chart_total:
break
img = await self.bot.loop.run_in_executor(
None,
charts,
chart,
arguments["width"],
arguments["height"],
self.data_loc,
)
await msg.delete()
try:
await ctx.send(
f"`{ctx.guild} - {self.humanized_period(arguments['period'])} - {arguments['width']}x{arguments['height']} {chart_type} chart`",
file=img,
)
except discord.HTTPException:
await ctx.send("File is to big to send, try lowering the size.")
def charts(data, w, h, loc):
fnt_file = f"{loc}/fonts/Arial Unicode.ttf"
fnt = ImageFont.truetype(fnt_file, 18, encoding="utf-8")
imgs = []
for item in data:
img = BytesIO(item[1])
image = Image.open(img).convert("RGBA")
draw = ImageDraw.Draw(image)
texts = item[0].split("\n")
if len(texts[1]) > 30:
height = 223
text = f"{texts[0]}\n{texts[1][:30]}\n{texts[1][30:]}"
else:
height = 247
text = item[0]
draw.text(
(5, height),
text,
fill=(255, 255, 255, 255),
font=fnt,
stroke_width=1,
stroke_fill=(0, 0, 0),
)
_file = BytesIO()
image.save(_file, "png")
_file.name = f"{item[0]}.png"
_file.seek(0)
imgs.append(_file)
return create_graph(imgs, w, h)
def track_chart(data, w, h, loc):
fnt_file = f"{loc}/fonts/Arial Unicode.ttf"
fnt = ImageFont.truetype(fnt_file, 18, encoding="utf-8")
imgs = []
for item in data:
img = BytesIO(item[1])
image = Image.open(img).convert("RGBA")
draw = ImageDraw.Draw(image)
if len(item[0]) > 30:
height = 243
text = f"{item[0][:30]}\n{item[0][30:]}"
else:
height = 267
text = item[0]
draw.text(
(5, height),
text,
fill=(255, 255, 255, 255),
font=fnt,
stroke_width=1,
stroke_fill=(0, 0, 0),
)
_file = BytesIO()
image.save(_file, "png")
_file.name = f"{item[0]}.png"
_file.seek(0)
imgs.append(_file)
return create_graph(imgs, w, h)
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i : i + n]
def create_graph(data, w, h):
dimensions = (300 * w, 300 * h)
final = Image.new("RGBA", dimensions)
images = chunks(data, w)
y = 0
for chunked in images:
x = 0
for img in chunked:
new = Image.open(img)
w, h = new.size
final.paste(new, (x, y, x + w, y + h))
x += 300
y += 300
w, h = final.size
if w > 2100 and h > 2100:
final = final.resize(
(2100, 2100), resample=Image.ANTIALIAS
) # Resize cause a 6x6k image is blocking when being sent
file = BytesIO()
final.save(file, "webp")
file.name = f"chart.webp"
file.seek(0)
image = discord.File(file)
return image