与服务端接口打通

This commit is contained in:
hxuanyu 2025-08-13 18:47:44 +08:00
parent 4de10d02d7
commit 717a953f70
2 changed files with 170 additions and 24 deletions

View File

@ -10,5 +10,23 @@
"type": "list",
"hint": "填写需要屏蔽的用户ID多个ID用英文逗号分隔",
"default": []
},
"server": {
"description": "后端服务器相关配置",
"type": "object",
"items": {
"url": {
"description": "后端服务器地址",
"type": "string",
"hint": "填写后端服务器的地址,用于收到消息时从服务端获取响应",
"default": "http://127.0.0.1:8080"
},
"token": {
"description": "后端服务器请求token请手动从后端服务器生成token并配置",
"type": "string",
"hint": "填写后端服务器的token",
"default": ""
}
}
}
}

168
main.py
View File

@ -4,8 +4,127 @@ from astrbot.api.star import Context, Star, register
from astrbot.api import logger
from astrbot.core.platform import AstrBotMessage
from astrbot.core.message.components import At
import astrbot.api.message_components as Comp
def check_at_bot(message_obj:AstrBotMessage) -> bool:
import aiohttp
import json
from typing import Dict, Any, Optional
@register("qqfight-bot", "hxuanyu", "消息中转插件", "1.0.0")
class RelayPlugin(Star):
def __init__(self, context: Context, config=None):
super().__init__(context)
self.config = config or {}
# 读取群聊白名单和私聊黑名单,全部转为字符串集合,避免类型不一致
self.group_whitelist = set(str(x) for x in self.config.get("group_whitelist", []))
self.private_blacklist = set(str(x) for x in self.config.get("private_blacklist", []))
self.server_url = self.config.get("server", {}).get("url", "http://127.0.0.1:8080")
self.server_token = self.config.get("server", {}).get("token", "")
logger.info(f"[RelayPlugin] 群聊白名单: {self.group_whitelist}")
logger.info(f"[RelayPlugin] 私聊黑名单: {self.private_blacklist}")
logger.info(f"[RelayPlugin] 服务器地址: {self.server_url}")
async def send_to_server(self, data: Dict[Any, Any]) -> Optional[Dict[str, Any]]:
"""
发送JSON数据到服务器的/api/bot接口
Args:
data: 要发送的JSON数据字典
Returns:
服务器响应的JSON字典格式为 {"code": "200", "msg": "", "data": ""}
如果请求失败则返回 None
"""
url = f"{self.server_url.rstrip('/')}/api/bot"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.server_token}"
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url,
json=data,
headers=headers,
timeout=aiohttp.ClientTimeout(total=10)) as response:
logger.info(f"[RelayPlugin] 发送请求到 {url}, 状态码: {response.status}")
if response.status == 200:
response_json = await response.json()
logger.info(f"[RelayPlugin] 服务器响应: {response_json}")
return response_json
else:
logger.error(f"[RelayPlugin] HTTP请求失败, 状态码: {response.status}")
response_text = await response.text()
logger.error(f"[RelayPlugin] 错误响应内容: {response_text}")
return None
except aiohttp.ClientError as e:
logger.error(f"[RelayPlugin] 网络请求异常: {e}")
return None
except json.JSONDecodeError as e:
logger.error(f"[RelayPlugin] JSON解析异常: {e}")
return None
except Exception as e:
logger.error(f"[RelayPlugin] 未知异常: {e}")
return None
async def send_message_to_server(self, message_type: str, message_obj: AstrBotMessage,
extra_data: Dict[str, Any] = None) -> Optional[str]:
"""
向服务器发送消息的封装方法
Args:
message_type: 消息类型"group" "private"
message_obj: AstrBotMessage 消息对象
extra_data: 额外的数据字典
Returns:
服务器响应的 data 字段内容如果失败则返回 None
"""
# 构建基础消息数据
message_data = {
"type": message_type,
"message": str(message_obj.message_str),
"message_id": message_obj.message_id,
"timestamp": getattr(message_obj, 'timestamp', None),
"sender_nickname": str(message_obj.sender.nickname),
}
# 根据消息类型添加特定字段
if message_type == "group":
message_data.update({
"group_id": str(message_obj.group_id),
"sender_id": str(message_obj.sender.user_id)
})
elif message_type == "private":
message_data.update({
"sender_id": str(message_obj.sender.user_id)
})
# 合并额外数据
if extra_data:
message_data.update(extra_data)
logger.info(f"[RelayPlugin] 准备发送{message_type}消息到服务器: {message_data}")
# 发送请求
response = await self.send_to_server(message_data)
if response and isinstance(response, dict):
# 提取 data 字段
data = response.get("data", "")
logger.info(f"[RelayPlugin] 服务器返回数据: {data}")
return data
else:
logger.error(f"[RelayPlugin] 服务器请求失败或返回格式错误")
return None
def check_at_bot(self, message_obj: AstrBotMessage) -> bool:
"""
检测群聊消息是否@了当前机器人
依据: plugin.md AstrBotMessage 及消息链定义:
@ -23,25 +142,10 @@ def check_at_bot(message_obj:AstrBotMessage) -> bool:
logger.info(f"[check_at_bot] 消息 {message_obj.message_id} 未@机器人 {self_id}")
return False
@register("qqfight-bot", "hxuanyu", "消息中转插件", "1.0.0")
class RelayPlugin(Star):
def __init__(self, context: Context, config=None):
super().__init__(context)
self.config = config or {}
# 读取群聊白名单和私聊黑名单,全部转为字符串集合,避免类型不一致
self.group_whitelist = set(str(x) for x in self.config.get("group_whitelist", []))
self.private_blacklist = set(str(x) for x in self.config.get("private_blacklist", []))
logger.info(f"[RelayPlugin] 群聊白名单: {self.group_whitelist}")
logger.info(f"[RelayPlugin] 私聊黑名单: {self.private_blacklist}")
@event_message_type(EventMessageType.GROUP_MESSAGE)
async def on_group_message(self, event: AstrMessageEvent):
group_id = str(event.message_obj.group_id)
at_bot = check_at_bot(event.message_obj)
at_bot = self.check_at_bot(event.message_obj)
if group_id not in self.group_whitelist:
logger.info(f"[RelayPlugin] 群聊 {group_id} 不在白名单,忽略消息")
@ -49,8 +153,21 @@ class RelayPlugin(Star):
if not at_bot:
logger.info(f"[RelayPlugin] 群聊 {group_id} 消息未@机器人,忽略消息")
return
logger.info(f"[RelayPlugin] 群聊 {group_id} 消息链: {event.message_obj.message}")
# TODO: 处理函数留空
# 发送群聊消息到服务器
result_data = await self.send_message_to_server("group", event.message_obj)
logger.info(f"收到服务端响应:{result_data}")
if result_data:
message_chan = [
Comp.At(qq=event.get_sender_id()),
Comp.Plain(str(result_data))
]
yield event.chain_result(message_chan)
else:
logger.error(f"[RelayPlugin] 处理群聊消息失败")
yield event.plain_result("处理消息时发生错误,请稍后重试")
@event_message_type(EventMessageType.PRIVATE_MESSAGE)
async def on_private_message(self, event: AstrMessageEvent):
@ -59,8 +176,19 @@ class RelayPlugin(Star):
sender_id = str(sender.id)
else:
sender_id = str(sender)
if sender_id in set(str(x) for x in self.private_blacklist):
if sender_id in self.private_blacklist:
logger.info(f"[RelayPlugin] 私聊 {sender_id} 在黑名单,忽略消息")
return
logger.info(f"[RelayPlugin] 私聊 {sender_id} 消息链: {event.message_obj.message}")
# TODO: 处理函数留空
# 发送私聊消息到服务器
if event.message_obj.message_str:
result_data = await self.send_message_to_server("private", event.message_obj)
logger.info(f"收到服务端响应:{result_data}")
if result_data:
yield event.plain_result(str(result_data))
else:
logger.error(f"[RelayPlugin] 处理私聊消息失败")
yield event.plain_result("处理消息时发生错误,请稍后重试")