Update main.py

This commit is contained in:
谢幕 2025-07-18 21:34:09 +08:00 committed by GitHub
parent 6dc75cd4ed
commit 3a6aef8ed7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

222
main.py
View File

@ -1,6 +1,7 @@
import re
import json
import os
import uuid
import aiohttp
import asyncio
from datetime import datetime
@ -16,15 +17,17 @@ SUBSCRIPTION_FILE = "data/astrbot_plugin_github_sub_subscriptions.json"
DEFAULT_REPO_FILE = "data/astrbot_plugin_github_sub_default_repos.json"
GITHUB_URL_PATTERN = r"https://github\.com/[\w\-]+/[\w\-]+(?:/(pull|issues)/\d+)?"
GITHUB_REPO_OPENGRAPH = "https://opengraph.githubassets.com/{hash}/{appendix}"
GITHUB_API_URL = "https://api.github.com/repos/{repo}"
GITHUB_ISSUES_API_URL = "https://api.github.com/repos/{repo}/issues"
GITHUB_RELEASES_API_URL = "https://api.github.com/repos/{repo}/releases" # 新增Release API
@register(
"astrbot_plugin_github_sub",
"XieMu",
"GitHub仓库订阅插件",
"1.0.0",
"1.1.0", # 版本号升级
"https://github.com/xiemu-c/astrbot_plugin_github_sub",
)
class MyPlugin(Star):
@ -33,15 +36,22 @@ class MyPlugin(Star):
self.config = config or {}
self.subscriptions = self._load_subscriptions()
self.default_repos = self._load_default_repos()
self.last_check_time = {} # 存储每个仓库的最后检查时间
# 修改区分issues和releases的检查时间
self.last_check_time = {
"issues": {}, # 记录Issue/PR的最后检查时间
"releases": {} # 记录Release的最后检查时间
}
self.use_lowercase = self.config.get("use_lowercase_repo", True)
self.github_token = self.config.get("github_token", "")
self.check_interval = self.config.get("check_interval", 30)
# 新增:是否包含预发布版本的配置
self.include_prereleases = self.config.get("include_prereleases", False)
# 启动后台检查更新任务
self.task = asyncio.create_task(self._check_updates_periodically())
logger.info(
f"GitHub 订阅插件初始化完成,检查间隔: {self.check_interval}分钟"
f"GitHub 订阅插件初始化完成,检查间隔: {self.check_interval}分钟,"
f"是否包含预发布版本: {self.include_prereleases}"
)
def _load_subscriptions(self) -> Dict[str, List[str]]:
@ -113,7 +123,7 @@ class MyPlugin(Star):
@filter.command("ghsub")
async def subscribe_repo(self, event: AstrMessageEvent, repo: str):
"""订阅GitHub仓库的IssuePR。例如: /ghsub Soulter/AstrBot"""
"""订阅GitHub仓库的Issue、PR和Release。例如: /ghsub Soulter/AstrBot"""
if not self._is_valid_repo(repo):
yield event.plain_result("请提供有效的仓库名,格式为: 用户名/仓库名")
return
@ -150,9 +160,10 @@ class MyPlugin(Star):
self._save_subscriptions()
# 为新订阅获取初始状态
await self._fetch_new_items(normalized_repo, None)
await self._fetch_new_items(repo, None)
await self._fetch_new_releases(repo, None) # 初始化Release检查时间
yield event.plain_result(f"成功订阅仓库 {display_name} 的IssuePR更新")
yield event.plain_result(f"成功订阅仓库 {display_name} 的Issue、PR和Release更新")
else:
yield event.plain_result(f"你已经订阅了仓库 {display_name}")
@ -174,6 +185,11 @@ class MyPlugin(Star):
unsubscribed.append(repo_name)
if not subscribers:
del self.subscriptions[repo_name]
# 移除检查时间记录
if repo_name in self.last_check_time["issues"]:
del self.last_check_time["issues"][repo_name]
if repo_name in self.last_check_time["releases"]:
del self.last_check_time["releases"][repo_name]
if unsubscribed:
self._save_subscriptions()
@ -206,6 +222,10 @@ class MyPlugin(Star):
self.subscriptions[normalized_repo].remove(subscriber_id)
if not self.subscriptions[normalized_repo]:
del self.subscriptions[normalized_repo]
if normalized_repo in self.last_check_time["issues"]:
del self.last_check_time["issues"][normalized_repo]
if normalized_repo in self.last_check_time["releases"]:
del self.last_check_time["releases"][normalized_repo]
self._save_subscriptions()
yield event.plain_result(f"已取消订阅仓库 {repo}")
else:
@ -256,18 +276,20 @@ class MyPlugin(Star):
continue
try:
# 获取该仓库的最后检查时间
last_check = self.last_check_time.get(repo, None)
# 获取新的issues和PRs
new_items = await self._fetch_new_items(repo, last_check)
# 检查新的issues和PRs
issue_last_check = self.last_check_time["issues"].get(repo, None)
new_items = await self._fetch_new_items(repo, issue_last_check)
if new_items:
# 更新最后检查时间
self.last_check_time[repo] = datetime.now().isoformat()
# 通知订阅者有关新内容
self.last_check_time["issues"][repo] = datetime.utcnow().replace(microsecond=0).isoformat()
await self._notify_subscribers(repo, new_items)
# 检查新的Releases
release_last_check = self.last_check_time["releases"].get(repo, None)
new_releases = await self._fetch_new_releases(repo, release_last_check)
if new_releases:
self.last_check_time["releases"][repo] = datetime.utcnow().replace(microsecond=0).isoformat()
await self._notify_subscribers_releases(repo, new_releases)
except Exception as e:
logger.error(f"检查仓库 {repo} 更新时出错: {e}")
@ -275,26 +297,20 @@ class MyPlugin(Star):
"""从上次检查以来获取仓库的新issues和PRs"""
if not last_check:
# 如果是第一次检查,只记录当前时间并返回空列表
# 存储为UTC时间戳不带时区信息以避免比较问题
self.last_check_time[repo] = (
self.last_check_time["issues"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(f"初始化仓库 {repo}时间戳: {self.last_check_time[repo]}")
logger.info(f"初始化仓库 {repo}Issue/PR时间戳: {self.last_check_time['issues'][repo]}")
return []
try:
# 始终将存储的时间戳视为不带时区信息的UTC时间
last_check_dt = datetime.fromisoformat(last_check)
# 确保它被视为简单的datetime
if hasattr(last_check_dt, "tzinfo") and last_check_dt.tzinfo is not None:
# 如果它以某种方式具有时区信息转换为简单的UTC
last_check_dt = last_check_dt.replace(tzinfo=None)
logger.info(f"仓库 {repo}上次检查时间: {last_check_dt.isoformat()}")
logger.info(f"仓库 {repo}Issue/PR上次检查时间: {last_check_dt.isoformat()}")
new_items = []
# GitHub API在issues端点中同时返回issues和PRs
async with aiohttp.ClientSession() as session:
try:
params = {
@ -312,56 +328,107 @@ class MyPlugin(Star):
items = await resp.json()
for item in items:
# 将GitHub的时间戳转换为简单的UTC datetime以进行一致的比较
github_timestamp = item["created_at"].replace("Z", "")
created_at = datetime.fromisoformat(github_timestamp)
# 始终移除时区信息以进行比较
created_at = created_at.replace(tzinfo=None)
logger.info(
f"比较: 仓库 {repo} 的item #{item['number']} 创建于 {created_at.isoformat()}, 上次检查: {last_check_dt.isoformat()}"
)
if created_at > last_check_dt:
logger.info(
f"发现新的item #{item['number']} in {repo}"
)
logger.info(f"发现新的item #{item['number']} in {repo}")
new_items.append(item)
else:
# 由于项目按创建时间排序,我们可以提前中断
logger.info(f"没有更多新items in {repo}")
break
else:
logger.error(
f"获取仓库 {repo} 的Issue/PR失败: {resp.status}: {await resp.text()}"
)
logger.error(f"获取仓库 {repo} 的Issue/PR失败: {resp.status}: {await resp.text()}")
except Exception as e:
logger.error(f"获取仓库 {repo} 的Issue/PR时出错: {e}")
# 将最后检查时间更新为现在不带时区信息的UTC
if new_items:
logger.info(f"找到 {len(new_items)} 个新的items在 {repo}")
else:
logger.info(f"没有找到新的items在 {repo}")
# 检查后始终更新时间戳,无论是否找到项目
self.last_check_time[repo] = (
self.last_check_time["issues"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(f"更新仓库 {repo}时间戳为: {self.last_check_time[repo]}")
logger.info(f"更新仓库 {repo} 的Issue/PR时间戳为: {self.last_check_time['issues'][repo]}")
return new_items
except Exception as e:
logger.error(f"解析时间时出错: {e}")
# 如果无法正确解析时间,只需返回空列表
# 并更新最后检查时间以防止连续错误
self.last_check_time[repo] = (
logger.error(f"解析Issue/PR时间时出错: {e}")
self.last_check_time["issues"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(
f"出错后更新仓库 {repo} 的时间戳为: {self.last_check_time[repo]}"
logger.info(f"出错后更新仓库 {repo} 的Issue/PR时间戳为: {self.last_check_time['issues'][repo]}")
return []
# 新增获取新的Releases
async def _fetch_new_releases(self, repo: str, last_check: str):
"""从上次检查以来获取仓库的新Releases"""
if not last_check:
# 第一次检查,初始化时间戳
self.last_check_time["releases"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(f"初始化仓库 {repo} 的Release时间戳: {self.last_check_time['releases'][repo]}")
return []
try:
last_check_dt = datetime.fromisoformat(last_check)
if hasattr(last_check_dt, "tzinfo") and last_check_dt.tzinfo is not None:
last_check_dt = last_check_dt.replace(tzinfo=None)
logger.info(f"仓库 {repo} 的Release上次检查时间: {last_check_dt.isoformat()}")
new_releases = []
async with aiohttp.ClientSession() as session:
try:
params = {
"sort": "published",
"direction": "desc",
"per_page": 10,
}
async with session.get(
GITHUB_RELEASES_API_URL.format(repo=repo),
params=params,
headers=self._get_github_headers(),
) as resp:
if resp.status == 200:
releases = await resp.json()
for release in releases:
# 跳过草稿版本
if release["draft"]:
continue
# 根据配置决定是否跳过预发布版本
if not self.include_prereleases and release["prerelease"]:
continue
# 解析发布时间
publish_timestamp = release["published_at"].replace("Z", "") if release["published_at"] else None
if not publish_timestamp:
continue # 跳过未发布的版本
published_at = datetime.fromisoformat(publish_timestamp)
published_at = published_at.replace(tzinfo=None)
if published_at > last_check_dt:
logger.info(f"发现新的Release {release['tag_name']} in {repo}")
new_releases.append(release)
else:
break # 按时间排序,可提前中断
else:
logger.error(f"获取仓库 {repo} 的Release失败: {resp.status}: {await resp.text()}")
except Exception as e:
logger.error(f"获取仓库 {repo} 的Release时出错: {e}")
self.last_check_time["releases"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(f"更新仓库 {repo} 的Release时间戳为: {self.last_check_time['releases'][repo]}")
return new_releases
except Exception as e:
logger.error(f"解析Release时间时出错: {e}")
self.last_check_time["releases"][repo] = (
datetime.utcnow().replace(microsecond=0).isoformat()
)
logger.info(f"出错后更新仓库 {repo} 的Release时间戳为: {self.last_check_time['releases'][repo]}")
return []
async def _notify_subscribers(self, repo: str, new_items: List[Dict]):
@ -371,7 +438,6 @@ class MyPlugin(Star):
for subscriber_id in self.subscriptions.get(repo, []):
try:
# 创建通知消息
for item in new_items:
item_type = "PR" if "pull_request" in item else "Issue"
message = (
@ -381,16 +447,54 @@ class MyPlugin(Star):
f"链接: {item['html_url']}"
)
# 向订阅者发送消息
await self.context.send_message(
subscriber_id, Comp.Plain(message)
)
# 消息之间添加小延迟以避免速率限制
await asyncio.sleep(1)
except Exception as e:
logger.error(f"向订阅者 {subscriber_id} 发送通知时出错: {e}")
# 新增通知订阅者有关新的Releases
async def _notify_subscribers_releases(self, repo: str, new_releases: List[Dict]):
"""通知订阅者有关新的Release"""
if not new_releases:
return
for subscriber_id in self.subscriptions.get(repo, []):
try:
for release in new_releases:
# 处理发布说明(过长时截断)
body = release.get("body", "无发布说明")
if len(body) > 200:
body = body[:200] + "..."
# 构建通知消息
message_parts = [
f"[GitHub Release更新] 仓库 {repo} 发布了新版本:\n",
f"版本: {release['tag_name']}"
]
# 如果是预发布版本,添加标记
if release["prerelease"]:
message_parts.append(" 🧪 预发布")
message_parts.extend([
f"\n标题: {release['name'] or '无标题'}\n",
f"发布时间: {release['published_at'].replace('T', ' ').replace('Z', '')}\n",
f"说明: {body}\n",
f"下载: {release['html_url']}"
])
message = ''.join(message_parts)
# 发送通知
await self.context.send_message(
subscriber_id, Comp.Plain(message)
)
await asyncio.sleep(1)
except Exception as e:
logger.error(f"向订阅者 {subscriber_id} 发送Release通知时出错: {e}")
async def terminate(self):
"""终止前清理并保存数据"""
self._save_subscriptions()