from astrbot.api.event import AstrMessageEvent from astrbot.api.event.filter import event_message_type, EventMessageType from astrbot.api.star import Context, Star, register from astrbot.api import logger from astrbot.core.platform import AstrBotMessage from astrbot.core.message.components import At import astrbot.api.message_components as Comp import aiohttp import json from typing import Dict, Any, Optional @register("qqfight-bot", "hxuanyu", "消息中转插件", "1.0.0") class RelayPlugin(Star): def __init__(self, context: Context, config=None): super().__init__(context) self.config = config or {} # 读取群聊白名单和私聊黑名单,全部转为字符串集合,避免类型不一致 self.group_whitelist = set(str(x) for x in self.config.get("group_whitelist", [])) self.private_blacklist = set(str(x) for x in self.config.get("private_blacklist", [])) self.server_url = self.config.get("server", {}).get("url", "http://127.0.0.1:8080") self.server_token = self.config.get("server", {}).get("token", "") logger.info(f"[RelayPlugin] 群聊白名单: {self.group_whitelist}") logger.info(f"[RelayPlugin] 私聊黑名单: {self.private_blacklist}") logger.info(f"[RelayPlugin] 服务器地址: {self.server_url}") async def send_to_server(self, data: Dict[Any, Any]) -> Optional[Dict[str, Any]]: """ 发送JSON数据到服务器的/api/bot接口 Args: data: 要发送的JSON数据字典 Returns: 服务器响应的JSON字典,格式为 {"code": "200", "msg": "", "data": ""} 如果请求失败则返回 None """ url = f"{self.server_url.rstrip('/')}/api/bot" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.server_token}" } try: async with aiohttp.ClientSession() as session: async with session.post(url, json=data, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response: logger.info(f"[RelayPlugin] 发送请求到 {url}, 状态码: {response.status}") if response.status == 200: response_json = await response.json() logger.info(f"[RelayPlugin] 服务器响应: {response_json}") return response_json else: logger.error(f"[RelayPlugin] HTTP请求失败, 状态码: {response.status}") response_text = await response.text() logger.error(f"[RelayPlugin] 错误响应内容: {response_text}") return None except aiohttp.ClientError as e: logger.error(f"[RelayPlugin] 网络请求异常: {e}") return None except json.JSONDecodeError as e: logger.error(f"[RelayPlugin] JSON解析异常: {e}") return None except Exception as e: logger.error(f"[RelayPlugin] 未知异常: {e}") return None async def send_message_to_server(self, message_type: str, message_obj: AstrBotMessage, extra_data: Dict[str, Any] = None) -> Optional[str]: """ 向服务器发送消息的封装方法 Args: message_type: 消息类型,"group" 或 "private" message_obj: AstrBotMessage 消息对象 extra_data: 额外的数据字典 Returns: 服务器响应的 data 字段内容,如果失败则返回 None """ # 构建基础消息数据 message_data = { "type": message_type, "message": str(message_obj.message_str), "message_id": message_obj.message_id, "timestamp": getattr(message_obj, 'timestamp', None), "sender_nickname": str(message_obj.sender.nickname), } # 根据消息类型添加特定字段 if message_type == "group": message_data.update({ "group_id": str(message_obj.group_id), "sender_id": str(message_obj.sender.user_id) }) elif message_type == "private": message_data.update({ "sender_id": str(message_obj.sender.user_id) }) # 合并额外数据 if extra_data: message_data.update(extra_data) logger.info(f"[RelayPlugin] 准备发送{message_type}消息到服务器: {message_data}") # 发送请求 response = await self.send_to_server(message_data) if response and isinstance(response, dict): # 提取 data 字段 data = response.get("data", "") logger.info(f"[RelayPlugin] 服务器返回数据: {data}") return data else: logger.error(f"[RelayPlugin] 服务器请求失败或返回格式错误") return None def check_at_bot(self, message_obj: AstrBotMessage) -> bool: """ 检测群聊消息是否@了当前机器人。 依据: plugin.md 中 AstrBotMessage 及消息链定义: - message_obj.message 为 BaseMessageComponent 列表 - At(qq=...) 为 @ 消息段,message_obj.self_id 为机器人的标识 ID 当消息链中存在 At 且其 qq 等于 self_id 时,判定为@了机器人。 忽略 AtAll(@全体)。 """ self_id = message_obj.self_id segments = message_obj.message for segment in segments: if isinstance(segment, At) and str(segment.qq) == self_id: logger.info(f"[check_at_bot] 消息 {message_obj.message_id} @了机器人 {self_id}") return True logger.info(f"[check_at_bot] 消息 {message_obj.message_id} 未@机器人 {self_id}") return False @event_message_type(EventMessageType.GROUP_MESSAGE) async def on_group_message(self, event: AstrMessageEvent): group_id = str(event.message_obj.group_id) at_bot = self.check_at_bot(event.message_obj) if group_id not in self.group_whitelist: logger.info(f"[RelayPlugin] 群聊 {group_id} 不在白名单,忽略消息") return if not at_bot: logger.info(f"[RelayPlugin] 群聊 {group_id} 消息未@机器人,忽略消息") return logger.info(f"[RelayPlugin] 群聊 {group_id} 消息链: {event.message_obj.message}") # 发送群聊消息到服务器 result_data = await self.send_message_to_server("group", event.message_obj) logger.info(f"收到服务端响应:{result_data}") if result_data: message_chan = [ Comp.At(qq=event.get_sender_id()), Comp.Plain(str(result_data)) ] yield event.chain_result(message_chan) else: logger.error(f"[RelayPlugin] 处理群聊消息失败") yield event.plain_result("处理消息时发生错误,请稍后重试") @event_message_type(EventMessageType.PRIVATE_MESSAGE) async def on_private_message(self, event: AstrMessageEvent): sender = event.message_obj.sender if hasattr(sender, 'id'): sender_id = str(sender.id) else: sender_id = str(sender) if sender_id in self.private_blacklist: logger.info(f"[RelayPlugin] 私聊 {sender_id} 在黑名单,忽略消息") return logger.info(f"[RelayPlugin] 私聊 {sender_id} 消息链: {event.message_obj.message}") # 发送私聊消息到服务器 if event.message_obj.message_str: result_data = await self.send_message_to_server("private", event.message_obj) logger.info(f"收到服务端响应:{result_data}") if result_data: yield event.plain_result(str(result_data)) else: logger.error(f"[RelayPlugin] 处理私聊消息失败") yield event.plain_result("处理消息时发生错误,请稍后重试")