Refactor Translator Cog to improve configuration handling and enhance user experience. Introduce new options for embed display preferences and streamline error handling for translations. Update command descriptions for clarity and usability.
Some checks are pending
Run pre-commit / Run pre-commit (push) Waiting to run

This commit is contained in:
Valerie 2025-05-24 01:34:19 -04:00
parent d188c97267
commit 07686d47c2
3 changed files with 212 additions and 0 deletions

4
repoupdates/__init__.py Normal file
View file

@ -0,0 +1,4 @@
from .repoupdates import RepoUpdates
async def setup(bot):
await bot.add_cog(RepoUpdates(bot))

22
repoupdates/info.json Normal file
View file

@ -0,0 +1,22 @@
{
"author": [
"Valerie"
],
"install_msg": "Thanks for installing the RepoUpdates cog! Use `[p]help repoupdate` to see available commands.",
"name": "RepoUpdates",
"disabled": false,
"short": "Monitor cog repositories for updates",
"description": "Monitors GitHub repositories for updates and posts notifications in a specified channel when changes are detected.",
"tags": [
"utility",
"notifications",
"github",
"updates"
],
"required_cogs": {},
"requirements": [
"aiohttp"
],
"type": "COG",
"end_user_data_statement": "This cog does not store any user data."
}

186
repoupdates/repoupdates.py Normal file
View file

@ -0,0 +1,186 @@
import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional
import aiohttp
import discord
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, humanize_list
class RepoUpdates(commands.Cog):
"""Monitor cog repositories for updates and post notifications."""
def __init__(self, bot: Red):
self.bot = bot
self.config = Config.get_conf(self, identifier=8927348923)
self.session = aiohttp.ClientSession()
self.bg_task = None
default_global = {
"check_interval": 300, # 5 minutes in seconds
"channel_id": None,
"repos": {} # Dict[str, Dict[str, str]] - repo_url: {last_commit: str}
}
self.config.register_global(**default_global)
def cog_unload(self):
if self.bg_task:
self.bg_task.cancel()
asyncio.create_task(self.session.close())
async def initialize(self):
"""Start the background task."""
self.bg_task = self.bot.loop.create_task(self.check_updates_loop())
@commands.group()
@commands.admin_or_permissions(administrator=True)
async def repoupdate(self, ctx: commands.Context):
"""Commands for repository update notifications."""
pass
@repoupdate.command()
async def channel(self, ctx: commands.Context, channel: discord.TextChannel):
"""Set the channel for repository update notifications."""
await self.config.channel_id.set(channel.id)
await ctx.send(f"Update notifications will be sent to {channel.mention}")
@repoupdate.command()
async def interval(self, ctx: commands.Context, seconds: int):
"""Set how often to check for updates (in seconds)."""
if seconds < 60:
await ctx.send("Interval must be at least 60 seconds.")
return
await self.config.check_interval.set(seconds)
await ctx.send(f"Update check interval set to {seconds} seconds.")
@repoupdate.command()
async def addrepo(self, ctx: commands.Context, name: str, repo_url: str):
"""Add a repository to monitor."""
async with self.config.repos() as repos:
if name in repos:
await ctx.send("A repository with that name already exists.")
return
# Validate the repo URL and get the latest commit
try:
api_url = repo_url.replace("github.com", "api.github.com/repos")
if api_url.endswith("/"):
api_url = api_url[:-1]
api_url += "/commits"
async with self.session.get(api_url) as resp:
if resp.status != 200:
await ctx.send("Failed to fetch repository information. Please check the URL.")
return
commits = await resp.json()
if not commits:
await ctx.send("No commits found in the repository.")
return
latest_commit = commits[0]["sha"]
repos[name] = {
"url": repo_url,
"last_commit": latest_commit
}
await ctx.send(f"Added repository: {name}")
except Exception as e:
await ctx.send(f"Error adding repository: {str(e)}")
@repoupdate.command()
async def removerepo(self, ctx: commands.Context, name: str):
"""Remove a repository from monitoring."""
async with self.config.repos() as repos:
if name not in repos:
await ctx.send("Repository not found.")
return
del repos[name]
await ctx.send(f"Removed repository: {name}")
@repoupdate.command()
async def listrepos(self, ctx: commands.Context):
"""List all monitored repositories."""
repos = await self.config.repos()
if not repos:
await ctx.send("No repositories are being monitored.")
return
msg = "Monitored Repositories:\n"
for name, data in repos.items():
msg += f"\n{name}: {data['url']}"
await ctx.send(box(msg))
async def check_updates_loop(self):
"""Background loop to check for repository updates."""
await self.bot.wait_until_ready()
while True:
try:
await self.check_updates()
except Exception as e:
print(f"Error checking updates: {str(e)}")
interval = await self.config.check_interval()
await asyncio.sleep(interval)
async def check_updates(self):
"""Check all repositories for updates."""
channel_id = await self.config.channel_id()
if not channel_id:
return
channel = self.bot.get_channel(channel_id)
if not channel:
return
async with self.config.repos() as repos:
for name, data in repos.items():
try:
api_url = data["url"].replace("github.com", "api.github.com/repos")
if api_url.endswith("/"):
api_url = api_url[:-1]
api_url += "/commits"
async with self.session.get(api_url) as resp:
if resp.status != 200:
continue
commits = await resp.json()
if not commits:
continue
latest_commit = commits[0]["sha"]
if latest_commit != data["last_commit"]:
# Get the changes
changes = []
for commit in commits:
if commit["sha"] == data["last_commit"]:
break
changes.append(f"{commit['commit']['message']}")
# Create and send the update embed
embed = discord.Embed(
title=f"Repository Update: {name}",
url=data["url"],
color=discord.Color.green(),
timestamp=datetime.now()
)
if changes:
embed.add_field(
name="Changes",
value="\n".join(changes[:10]), # Show up to 10 changes
inline=False
)
if len(changes) > 10:
embed.set_footer(text=f"And {len(changes) - 10} more changes...")
await channel.send(embed=embed)
data["last_commit"] = latest_commit
except Exception as e:
print(f"Error checking repository {name}: {str(e)}")
continue