Enable Reddit API usage in Core cog and refactor image retrieval logic for improved error handling and efficiency. Remove StickBugged cog and associated files to streamline the project.
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run

This commit is contained in:
Valerie 2025-05-23 05:57:18 -04:00
parent 3240932176
commit e9e8c5b304
5 changed files with 28 additions and 233 deletions

View file

@ -45,7 +45,7 @@ class Core(commands.Cog):
} }
) )
self.config = Config.get_conf(self, identifier=512227974893010954, force_registration=True) self.config = Config.get_conf(self, identifier=512227974893010954, force_registration=True)
self.config.register_global(use_reddit_api=False) self.config.register_global(use_reddit_api=True)
def cog_unload(self): def cog_unload(self):
self.bot.loop.create_task(self.session.close()) self.bot.loop.create_task(self.session.close())
@ -61,45 +61,35 @@ class Core(commands.Cog):
while tries < 5: while tries < 5:
sub = choice(subs) sub = choice(subs)
try: try:
if await self.config.use_reddit_api(): async with self.session.get(REDDIT_BASEURL.format(sub=sub)) as reddit:
async with self.session.get(REDDIT_BASEURL.format(sub=sub)) as reddit: if reddit.status != 200:
if reddit.status != 200: tries += 1
return None, None continue
try: try:
data = await reddit.json(content_type=None) data = await reddit.json(content_type=None)
content = data[0]["data"]["children"][0]["data"] content = data[0]["data"]["children"][0]["data"]
url = content["url"] url = content["url"]
subr = content["subreddit"] subr = content["subreddit"]
except (KeyError, ValueError, json.decoder.JSONDecodeError): except (KeyError, ValueError, json.decoder.JSONDecodeError):
tries += 1 tries += 1
continue continue
if url.startswith(IMGUR_LINKS):
url = url + ".png" if url.startswith(IMGUR_LINKS):
elif url.endswith(".mp4"): url = url + ".png"
url = url[:-3] + "gif" elif url.endswith(".mp4"):
elif url.endswith(".gifv"): url = url[:-3] + "gif"
url = url[:-1] elif url.endswith(".gifv"):
elif not url.endswith(GOOD_EXTENSIONS) and not url.startswith( url = url[:-1]
"https://gfycat.com" elif not url.endswith(GOOD_EXTENSIONS) and not url.startswith(
) or "redgifs" in url: "https://gfycat.com"
tries += 1 ) or "redgifs" in url:
continue tries += 1
return url, subr continue
else: return url, subr
async with self.session.get(
MARTINE_API_BASE_URL, params={"name": sub}
) as resp:
if resp.status != 200:
tries += 1
continue
try:
data = await resp.json()
return data["data"]["image_url"], data["data"]["subreddit"]["name"]
except (KeyError, json.JSONDecodeError):
tries += 1
continue
except aiohttp.client_exceptions.ClientConnectionError: except aiohttp.client_exceptions.ClientConnectionError:
tries += 1 tries += 1
await asyncio.sleep(1)
continue continue
return None, None return None, None

View file

@ -1,5 +0,0 @@
from .stickbugged import StickBugged
async def setup(bot):
await bot.add_cog(StickBugged(bot))

View file

@ -1,77 +0,0 @@
# Taken and modified from Trustys NotSoBot cog.
import re
from discord.ext.commands.converter import Converter
from discord.ext.commands.errors import BadArgument
IMAGE_LINKS = re.compile(r"(https?:\/\/[^\"\'\s]*\.(?:png|jpg|jpeg|gif|png|svg)(\?size=[0-9]*)?)")
EMOJI_REGEX = re.compile(r"(<(a)?:[a-zA-Z0-9\_]+:([0-9]+)>)")
MENTION_REGEX = re.compile(r"<@!?([0-9]+)>")
ID_REGEX = re.compile(r"[0-9]{17,}")
class ImageFinder(Converter):
"""This is a class to convert notsobots image searching capabilities into a more general
converter class."""
async def convert(self, ctx, argument):
attachments = ctx.message.attachments
mentions = MENTION_REGEX.finditer(argument)
matches = IMAGE_LINKS.finditer(argument)
emojis = EMOJI_REGEX.finditer(argument)
ids = ID_REGEX.finditer(argument)
urls = []
if matches:
for match in matches:
urls.append(match.group(1))
if emojis:
for emoji in emojis:
ext = "gif" if emoji.group(2) else "png"
url = "https://cdn.discordapp.com/emojis/{id}.{ext}?v=1".format(
id=emoji.group(3), ext=ext
)
urls.append(url)
if mentions:
for mention in mentions:
user = ctx.guild.get_member(int(mention.group(1)))
if user is not None:
url = IMAGE_LINKS.search(
str(user.display_avatar.replace(size=512, static_format="png").url)
)
urls.append(url.group(1))
if not urls and ids:
for possible_id in ids:
user = ctx.guild.get_member(int(possible_id.group(0)))
if user:
url = IMAGE_LINKS.search(
str(user.display_avatar.replace(size=512, static_format="png").url)
)
urls.append(url.group(1))
if attachments:
for attachment in attachments:
urls.append(attachment.url)
if not urls and ctx.guild:
user = ctx.guild.get_member_named(argument)
if user:
url = IMAGE_LINKS.search(
str(user.display_avatar.replace(size=512, static_format="png").url)
)
urls.append(url)
if not urls:
raise BadArgument("No images found.")
return urls[0]
async def search_for_images(self, ctx):
urls = []
async for message in ctx.channel.history(limit=10):
if message.attachments:
for attachment in message.attachments:
urls.append(attachment.url)
match = IMAGE_LINKS.match(message.content)
if match:
urls.append(match.group(1))
if not urls:
raise BadArgument("No Images found in recent history.")
return urls[0]

View file

@ -1,18 +0,0 @@
{
"author": [
"flare(flare#0001)"
],
"install_msg": "Get stick bugged lol. This cog may be usage intensive and not recommender for smaller hosts. This cog required ffmpeg to be installed, it will error otherwise.",
"name": "StickBugged",
"disabled": false,
"short": "Get stickbugged.",
"description": "Get stickbugged.",
"tags": [
"stickbugged"
],
"requirements": [
"get-stick-bugged-lol"
],
"min_bot_version": "3.5.0",
"hidden": false
}

View file

@ -1,95 +0,0 @@
import asyncio
import functools
import logging
import os
from io import BytesIO
from typing import Optional
import aiohttp
import discord
from gsbl.stick_bug import StickBug
from PIL import Image
from redbot.core import commands
from redbot.core.data_manager import cog_data_path
from .converters import ImageFinder
log = logging.getLogger("red.flare.stick")
class StickBugged(commands.Cog):
__version__ = "0.0.1"
__author__ = "flare#0001"
def format_help_for_context(self, ctx):
"""Thanks Sinbad."""
pre_processed = super().format_help_for_context(ctx)
return f"{pre_processed}\nCog Version: {self.__version__}\nAuthor: {self.__author__}"
def __init__(self, bot) -> None:
self.bot = bot
self._stickbug = StickBug()
def blocking(self, io, id):
io = Image.open(io)
self._stickbug.image = io
self._stickbug.video_resolution = max(min(1280, io.width), 128), max(
min(720, io.height), 72
)
self._stickbug.lsd_scale = 0.35
video = self._stickbug.video
video.write_videofile(
str(cog_data_path(self)) + f"/{id}stick.mp4",
threads=1,
preset="superfast",
verbose=False,
logger=None,
temp_audiofile=str(cog_data_path(self) / f"{id}stick.mp3"),
)
video.close()
return
@commands.max_concurrency(1, commands.BucketType.default)
@commands.command(aliases=["stickbug", "stickbugged"])
async def stick(self, ctx, images: Optional[ImageFinder]):
"""get stick bugged lol"""
if images is None:
images = await ImageFinder().search_for_images(ctx)
if not images:
return await ctx.send_help()
image = images
async with ctx.typing():
io = BytesIO()
if isinstance(image, discord.Asset):
await image.save(io, seek_begin=True)
else:
async with aiohttp.ClientSession() as session:
async with session.get(str(image)) as resp:
if resp.status != 200:
return await ctx.send("The picture returned an unknown status code.")
io.write(await resp.read())
io.seek(0)
await asyncio.sleep(0.2)
fake_task = functools.partial(self.blocking, io=io, id=ctx.message.id)
task = self.bot.loop.run_in_executor(None, fake_task)
try:
video_file = await asyncio.wait_for(task, timeout=300)
except asyncio.TimeoutError as e:
log.error("Timeout creating stickbug video", exc_info=e)
return await ctx.send("Timeout creating stickbug video.")
except Exception:
log.exception("Error sending stick bugged video")
return await ctx.send(
"An error occured during the creation of the stick bugged video"
)
fp = cog_data_path(self) / f"{ctx.message.id}stick.mp4"
file = discord.File(str(fp), filename="stick.mp4")
try:
await ctx.send(files=[file])
except Exception as e:
log.error("Error sending stick bugged video", exc_info=e)
try:
os.remove(fp)
except Exception as e:
log.error("Error deleting stick bugged video", exc_info=e)