diff --git a/frontends/chatapp_common.py b/frontends/chatapp_common.py
index befaf1c8..fc2dfecd 100644
--- a/frontends/chatapp_common.py
+++ b/frontends/chatapp_common.py
@@ -35,6 +35,23 @@ def build_help_text(commands=HELP_COMMANDS):
return "📖 命令列表:\n" + "\n".join(f"{cmd} - {desc}" for cmd, desc in commands)
+def _extract_progress(text):
+ """从 agent 的流式文本中提取一句可读的进度快照。
+ 优先取最新 ...,否则取最后一行有意义的文本。"""
+ if not text:
+ return ""
+ ms = re.findall(r"(.*?)", text, re.DOTALL)
+ if ms:
+ s = ms[-1].strip()
+ if s:
+ return s[:300]
+ for line in reversed(text.splitlines()):
+ ln = line.strip()
+ if ln and not ln.startswith("```") and not ln.startswith("<"):
+ return ln[:300]
+ return ""
+
+
HELP_TEXT = build_help_text()
FILE_HINT = "If you need to show files to user, use [FILE:filepath] in your response."
TAG_PATS = [r"<" + t + r">.*?" + t + r">" for t in ("thinking", "summary", "tool_use", "file_content")]
@@ -259,6 +276,10 @@ class AgentChatMixin:
source = "chat"
split_limit = 1500
ping_interval = 20
+ # 流式逐 turn 心跳:开启后每个 turn 跑完即把该 turn 详情作为日志推送,
+ # 不再发"还在处理中"占位,也不在收尾时重复汇总全部 turn。默认关闭,
+ # 仅在能承受多条出站消息的前端(如 QQ)启用。
+ stream_turns = False
def __init__(self, agent, user_tasks):
self.agent, self.user_tasks = agent, user_tasks
@@ -269,6 +290,33 @@ async def send_text(self, chat_id, content, **ctx):
async def send_done(self, chat_id, raw_text, **ctx):
await self.send_text(chat_id, build_done_text(raw_text), **ctx)
+ @staticmethod
+ def format_turn_log(turn_no, text):
+ """把单个 turn 的累积文本整理成可读日志。
+ 保留 (监控用),剔除 (冗长),并去掉重复的
+ "LLM Running (Turn N) ..." 头,避免与外层 Turn 标记重复。"""
+ body = re.sub(r".*?", "", text or "", flags=re.DOTALL)
+ body = re.sub(r"^\s*\*{0,2}(?:LLM Running )?\(?Turn \d+\)? ?\.\.\.\*{0,2}\s*", "", body)
+ body = re.sub(r"\n{3,}", "\n\n", body).strip()
+ return f"📍 Turn {turn_no}\n{body or '(无文本输出)'}"
+
+ async def send_turn(self, chat_id, turn_no, text, **ctx):
+ """逐 turn 心跳推送。容错:单条心跳发送失败不应中断整个任务。"""
+ try:
+ await self.send_text(chat_id, self.format_turn_log(turn_no, text), **ctx)
+ except Exception as e:
+ print(f"[{self.label}] send_turn {turn_no} failed: {e}")
+
+ async def send_done_files(self, chat_id, raw_text, **ctx):
+ """流式模式收尾:只补发生成的文件,不再重复汇总全部 turn 文本。
+ 基类无富媒体能力,默认空操作;有富媒体的前端(如 QQ)覆盖此方法。"""
+ return
+
+ @staticmethod
+ def format_done_message(turn_count):
+ """对话完全结束后的结束语提示。"""
+ return f"✅ 本次任务已全部结束,共 {turn_count} 个 turn。可以下达新指令了。"
+
async def handle_command(self, chat_id, cmd, **ctx):
parts = (cmd or "").split()
op = (parts[0] if parts else "").lower()
@@ -317,23 +365,42 @@ async def handle_command(self, chat_id, cmd, **ctx):
return await self.send_text(chat_id, HELP_TEXT, **ctx)
async def run_agent(self, chat_id, text, **ctx):
+ if self.stream_turns:
+ return await self._run_agent_streaming(chat_id, text, **ctx)
+ return await self._run_agent_classic(chat_id, text, **ctx)
+
+ async def _run_agent_classic(self, chat_id, text, **ctx):
state = {"running": True}
self.user_tasks[chat_id] = state
try:
- await self.send_text(chat_id, "思考中...", **ctx)
+ await self.send_text(chat_id, "🤔 思考中...", **ctx)
dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source)
last_ping = time.time()
+ latest = "" # 最近一次的进度原文(当前 turn 文本)
+ sent_progress = "" # 上次实际发出的进度快照,去重用
while state["running"]:
try:
item = await asyncio.to_thread(dq.get, True, 3)
except Q.Empty:
+ # 节流:每 ping_interval 发一次进度快照(沿用原有消息预算,
+ # 不额外增加被动回复条数,只是把"处理中"换成真实进度)
if self.agent.is_running and time.time() - last_ping > self.ping_interval:
- await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx)
+ snap = _extract_progress(latest)
+ if snap and snap != sent_progress:
+ await self.send_text(chat_id, f"⚙️ {snap}", **ctx)
+ sent_progress = snap
+ else:
+ await self.send_text(chat_id, "⏳ 还在处理中,请稍等...", **ctx)
last_ping = time.time()
continue
if "done" in item:
- await self.send_done(chat_id, item.get("done", ""), **ctx)
+ # 被 /stop 打断时不再补发完整结果,交给下方"已停止"提示
+ if state["running"]:
+ await self.send_done(chat_id, item.get("done", ""), **ctx)
break
+ if "next" in item:
+ outs = item.get("outputs") or []
+ latest = outs[-1] if outs else (item.get("next", "") or "")
if not state["running"]:
await self.send_text(chat_id, "⏹️ 已停止", **ctx)
except Exception as e:
@@ -344,6 +411,58 @@ async def run_agent(self, chat_id, text, **ctx):
finally:
self.user_tasks.pop(chat_id, None)
+ async def _run_agent_streaming(self, chat_id, text, **ctx):
+ """逐 turn 实时心跳模式:每个 turn 跑完即推送该 turn 详情,作为实时日志。
+ 不发"还在处理中"占位;收尾时不再重复汇总全部 turn,只补发生成的文件。
+
+ turn 完成判定:put_task 产出的 'next' 项里 turn 号递增,意味着上一个
+ turn 已经结束。此时 outputs[-2] 是刚结束 turn 的完整文本(agentmain 用
+ turn_resps[-2:] 携带最近两个 turn)。done 项携带全部 turn 文本,用于补发
+ 尚未推送的尾部 turn。"""
+ state = {"running": True}
+ self.user_tasks[chat_id] = state
+ sent_turn = 0 # 已推送的最大 turn 号
+ try:
+ await self.send_text(chat_id, "🤔 开始处理,将逐 turn 推送日志...", **ctx)
+ dq = self.agent.put_task(f"{FILE_HINT}\n\n{text}", source=self.source)
+ while state["running"]:
+ try:
+ item = await asyncio.to_thread(dq.get, True, 3)
+ except Q.Empty:
+ continue
+ if "next" in item:
+ cur = item.get("turn", 0)
+ outs = item.get("outputs") or []
+ # turn 号已推进且恰好领先 1 → 上一个 turn (cur-1) 刚结束,
+ # 其完整文本就在 outs[-2](agentmain 用 turn_resps[-2:] 携带)。
+ # 只在严格连续(cur-1 == sent_turn+1)时推送,避免重复;
+ # 万一出现跳跃,留给 done 阶段统一补发,保证不漏不重。
+ if cur - 1 == sent_turn + 1 and len(outs) >= 2:
+ await self.send_turn(chat_id, cur - 1, outs[-2], **ctx)
+ sent_turn = cur - 1
+ continue
+ if "done" in item:
+ if state["running"]:
+ all_outs = item.get("outputs") or []
+ # 补发所有尚未推送的 turn(含最后一个 turn)
+ for t in range(sent_turn + 1, len(all_outs) + 1):
+ await self.send_turn(chat_id, t, all_outs[t - 1], **ctx)
+ sent_turn = len(all_outs)
+ # 只补发生成的文件,不重复汇总全部 turn 文本
+ await self.send_done_files(chat_id, item.get("done", ""), **ctx)
+ # 对话完全结束后发结束语提示
+ await self.send_text(chat_id, self.format_done_message(sent_turn), **ctx)
+ break
+ if not state["running"]:
+ await self.send_text(chat_id, "⏹️ 已停止", **ctx)
+ except Exception as e:
+ import traceback
+ print(f"[{self.label}] run_agent(stream) error: {e}")
+ traceback.print_exc()
+ await self.send_text(chat_id, f"❌ 错误: {e}", **ctx)
+ finally:
+ self.user_tasks.pop(chat_id, None)
+
from agentmain import GeneraticAgent as _GA
from continue_cmd import handle_frontend_command as _handle_continue_frontend, install as _install_continue, reset_conversation as _reset_conversation
diff --git a/frontends/puburl.py b/frontends/puburl.py
new file mode 100644
index 00000000..6056c3f9
--- /dev/null
+++ b/frontends/puburl.py
@@ -0,0 +1,244 @@
+"""
+puburl.py — 把本地文件临时暴露成公网 HTTPS URL,供 QQ 富媒体出站使用。
+
+QQ 出站富媒体(图片/视频/语音/文件)不接受字节直传,必须给腾讯一个公网 URL
+让它反向拉取。本模块在【任意机器】上自包含地解决这个依赖:
+
+ 本地文件 -> 复制到内部 serve 目录 -> 内置 HTTP 文件服务(127.0.0.1:随机端口)
+ -> cloudflared quick tunnel 出站建连 -> 公网 https://xxx.trycloudflare.com//
+
+设计目标(换任何电脑部署 GA、重启即复现):
+ 1. 隧道 URL 每次重启都变 —— 运行期从 cloudflared 输出实时抓取,绝不硬编码。
+ 2. cloudflared 缺失 —— 按当前 OS/架构自动从官方 GitHub 下载到 .portable/tools/。
+ 3. 纯标准库实现 HTTP 服务,无额外依赖。
+
+安全:文件放在以 uuid4 token 命名的子目录下,URL 不可枚举;隧道地址本身随机;
+仅在隧道存活期间可达,并自动清理超过 TTL 的旧文件。
+"""
+
+import atexit
+import functools
+import http.server
+import os
+import platform
+import re
+import shutil
+import socket
+import stat
+import subprocess
+import threading
+import time
+import urllib.request
+import uuid
+
+ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+TOOLS_DIR = os.path.join(ROOT, ".portable", "tools")
+SERVE_DIR = os.path.join(ROOT, "temp", "_pubserve")
+FILE_TTL = 3600 # 已发布文件保留秒数,超过则清理
+
+_TUNNEL_RE = re.compile(r"https://[-a-z0-9]+\.trycloudflare\.com", re.I)
+
+
+def _log(msg):
+ print(f"[puburl] {msg}", flush=True)
+
+
+class _QuietHandler(http.server.SimpleHTTPRequestHandler):
+ """静默版文件 handler(不污染 qqapp.log)。"""
+
+ def log_message(self, *args):
+ pass
+
+
+def _cf_asset():
+ """返回 (github资产名, 本地二进制文件名),按当前系统/架构。"""
+ sysname = platform.system().lower()
+ machine = platform.machine().lower()
+ if machine in ("aarch64", "arm64"):
+ arch = "arm64"
+ elif machine in ("x86_64", "amd64", "x64"):
+ arch = "amd64"
+ elif "386" in machine or "i686" in machine or "i386" in machine:
+ arch = "386"
+ elif machine.startswith("arm"):
+ arch = "arm"
+ else:
+ arch = "amd64"
+ if sysname == "windows":
+ return f"cloudflared-windows-{arch}.exe", "cloudflared.exe"
+ if sysname == "linux":
+ return f"cloudflared-linux-{arch}", "cloudflared"
+ if sysname == "darwin":
+ # macOS 官方只发布 .tgz
+ return f"cloudflared-darwin-{arch}.tgz", "cloudflared"
+ return f"cloudflared-linux-{arch}", "cloudflared"
+
+
+class PublicFileServer:
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._httpd = None
+ self._http_port = None
+ self._cf_proc = None
+ self._tunnel_url = None
+
+ # ---- cloudflared 二进制 ----
+ def _ensure_cloudflared(self):
+ asset, binname = _cf_asset()
+ os.makedirs(TOOLS_DIR, exist_ok=True)
+ binpath = os.path.join(TOOLS_DIR, binname)
+ if os.path.exists(binpath) and os.path.getsize(binpath) > 0:
+ return binpath
+ url = f"https://github.com/cloudflare/cloudflared/releases/latest/download/{asset}"
+ _log(f"cloudflared 未找到,开始下载: {asset}")
+ tmp = binpath + ".part"
+ req = urllib.request.Request(url, headers={"User-Agent": "GA-puburl"})
+ with urllib.request.urlopen(req, timeout=120) as resp, open(tmp, "wb") as f:
+ shutil.copyfileobj(resp, f)
+ if asset.endswith(".tgz"):
+ import tarfile
+ with tarfile.open(tmp) as tar:
+ member = next((m for m in tar.getmembers() if m.name.endswith("cloudflared")), None)
+ if not member:
+ raise RuntimeError("tgz 内未找到 cloudflared")
+ with tar.extractfile(member) as src, open(binpath, "wb") as dst:
+ shutil.copyfileobj(src, dst)
+ os.remove(tmp)
+ else:
+ os.replace(tmp, binpath)
+ if platform.system().lower() != "windows":
+ os.chmod(binpath, os.stat(binpath).st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
+ _log(f"cloudflared 已就绪: {binpath} ({os.path.getsize(binpath)} bytes)")
+ return binpath
+
+ # ---- 本地 HTTP 文件服务 ----
+ def _start_http(self):
+ os.makedirs(SERVE_DIR, exist_ok=True)
+ handler = functools.partial(_QuietHandler, directory=SERVE_DIR)
+ httpd = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler)
+ self._http_port = httpd.server_address[1]
+ self._httpd = httpd
+ threading.Thread(target=httpd.serve_forever, daemon=True).start()
+ _log(f"本地文件服务启动于 127.0.0.1:{self._http_port}")
+
+ # ---- cloudflared 隧道 ----
+ def _start_tunnel(self, binpath):
+ cmd = [
+ binpath, "tunnel",
+ "--no-autoupdate",
+ "--url", f"http://127.0.0.1:{self._http_port}",
+ ]
+ self._cf_proc = subprocess.Popen(
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ text=True, encoding="utf-8", errors="replace", bufsize=1,
+ )
+ found = threading.Event()
+
+ def _reader():
+ for line in self._cf_proc.stdout:
+ if self._tunnel_url is None:
+ m = _TUNNEL_RE.search(line)
+ if m:
+ self._tunnel_url = m.group(0)
+ _log(f"隧道已建立: {self._tunnel_url}")
+ found.set()
+ threading.Thread(target=_reader, daemon=True).start()
+ if not found.wait(timeout=45):
+ raise RuntimeError("等待 cloudflared 隧道 URL 超时")
+ self._warmup(self._tunnel_url)
+
+ def _warmup(self, base_url):
+ """隧道刚建立时边缘节点可能尚未就绪,首次请求会 SSL EOF。
+ 这里自探到拿回任意 HTTP 响应为止,避免腾讯首次反向拉取失败。"""
+ import ssl
+ ctx = ssl.create_default_context()
+ for i in range(10):
+ try:
+ req = urllib.request.Request(base_url, headers={"User-Agent": "ga-warmup"})
+ urllib.request.urlopen(req, timeout=15, context=ctx)
+ _log(f"隧道边缘就绪 (warmup#{i})")
+ return True
+ except urllib.error.HTTPError:
+ # 有 HTTP 响应(如 404)即说明边缘已就绪
+ _log(f"隧道边缘就绪 (warmup#{i}, http)")
+ return True
+ except Exception:
+ time.sleep(3)
+ _log("隧道预热未确认就绪,继续(腾讯侧可能首拉失败)")
+ return False
+
+ def ensure_started(self):
+ with self._lock:
+ if self._tunnel_url and self._cf_proc and self._cf_proc.poll() is None:
+ return self._tunnel_url
+ # 隧道挂了则重置重建
+ if self._cf_proc and self._cf_proc.poll() is not None:
+ _log("检测到 cloudflared 已退出,重建隧道")
+ self._tunnel_url = None
+ if self._httpd is None:
+ self._start_http()
+ binpath = self._ensure_cloudflared()
+ self._start_tunnel(binpath)
+ return self._tunnel_url
+
+ # ---- 清理过期文件 ----
+ def _cleanup(self):
+ now = time.time()
+ try:
+ for name in os.listdir(SERVE_DIR):
+ p = os.path.join(SERVE_DIR, name)
+ try:
+ if now - os.path.getmtime(p) > FILE_TTL:
+ shutil.rmtree(p, ignore_errors=True) if os.path.isdir(p) else os.remove(p)
+ except OSError:
+ pass
+ except FileNotFoundError:
+ pass
+
+ # ---- 对外接口 ----
+ def publish(self, local_path):
+ """把本地文件复制到 serve 目录并返回公网 URL;失败返回 None。"""
+ if not os.path.isfile(local_path):
+ return None
+ url = self.ensure_started()
+ if not url:
+ return None
+ self._cleanup()
+ token = uuid.uuid4().hex
+ dest_dir = os.path.join(SERVE_DIR, token)
+ os.makedirs(dest_dir, exist_ok=True)
+ fname = os.path.basename(local_path)
+ shutil.copy2(local_path, os.path.join(dest_dir, fname))
+ return f"{url}/{token}/{urllib.request.quote(fname)}"
+
+ def shutdown(self):
+ if self._cf_proc and self._cf_proc.poll() is None:
+ self._cf_proc.terminate()
+ if self._httpd:
+ self._httpd.shutdown()
+
+
+_INSTANCE = PublicFileServer()
+atexit.register(_INSTANCE.shutdown)
+
+
+def publish(local_path):
+ return _INSTANCE.publish(local_path)
+
+
+def ensure_started():
+ return _INSTANCE.ensure_started()
+
+
+if __name__ == "__main__":
+ # 自测:发布一个临时文件并打印 URL
+ import sys
+ test_file = sys.argv[1] if len(sys.argv) > 1 else __file__
+ print("publishing:", test_file)
+ print("URL:", publish(test_file))
+ print("Ctrl-C to stop");
+ try:
+ while True:
+ time.sleep(5)
+ except KeyboardInterrupt:
+ _INSTANCE.shutdown()
diff --git a/frontends/qqapp.py b/frontends/qqapp.py
index 2ad088f8..f5a03eb0 100644
--- a/frontends/qqapp.py
+++ b/frontends/qqapp.py
@@ -1,10 +1,14 @@
-import asyncio, os, sys, threading, time
+import asyncio, os, re, sys, threading, time, uuid
from collections import deque
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+_TEMP_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'temp')
+_INBOX_DIR = os.path.join(_TEMP_DIR, 'qq_inbox') # 入站附件独立存放,便于清理
+_INBOX_TTL = 86400 # 入站附件保留秒数,超过自动清理
from agentmain import GeneraticAgent
-from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text
+from chatapp_common import AgentChatMixin, ensure_single_instance, public_access, redirect_log, require_runtime, split_text, extract_files, strip_files, clean_reply
from llmcore import mykeys
+import puburl
try:
import botpy
@@ -18,6 +22,9 @@
APP_SECRET = str(mykeys.get("qq_app_secret", "") or "").strip()
ALLOWED = {str(x).strip() for x in mykeys.get("qq_allowed_users", []) if str(x).strip()}
PROCESSED_IDS, USER_TASKS = deque(maxlen=1000), {}
+# 缓冲区:用户发来的附件先暂存,等用户发文字指令再合并触发模型
+# {chat_id: [(kind_label, 'temp/qq_inbox/xxx'), ...]}
+PENDING = {}
SEQ_LOCK, MSG_SEQ = threading.Lock(), 1
@@ -28,6 +35,127 @@ def _next_msg_seq():
return MSG_SEQ
+# QQ 出站富媒体类型:1=图片 2=视频 3=语音 4=文件(按后缀判定)
+_EXT_FILE_TYPE = {
+ ".jpg": 1, ".jpeg": 1, ".png": 1, ".gif": 1, ".bmp": 1, ".webp": 1,
+ ".mp4": 2, ".mov": 2, ".avi": 2, ".mkv": 2,
+ ".silk": 3, ".amr": 3, ".mp3": 3, ".wav": 3, ".m4a": 3,
+}
+
+
+def _qq_file_type(path):
+ return _EXT_FILE_TYPE.get(os.path.splitext(path)[1].lower(), 4)
+
+
+# QQ 附件 content_type: 1=图片 2=视频 3=语音 4=文件;不同消息类型字段可能不全,按后缀/url 兜底
+def _guess_ext(att, kind):
+ fn = getattr(att, "filename", "") or ""
+ ext = os.path.splitext(fn)[1]
+ if ext:
+ return ext
+ url = (getattr(att, "url", "") or "").split("?")[0]
+ ext = os.path.splitext(url)[1]
+ if ext:
+ return ext
+ ct = getattr(att, "content_type", None)
+ name = str(ct).lower() if ct is not None else ""
+ for key, e in (("image", ".jpg"), ("voice", ".silk"), ("audio", ".silk"), ("video", ".mp4")):
+ if key in name:
+ return e
+ return {1: ".jpg", 2: ".mp4", 3: ".silk", 4: ".dat"}.get(ct, ".dat")
+
+
+def _kind_label(att):
+ ct = getattr(att, "content_type", None)
+ name = str(ct).lower() if ct is not None else ""
+ if ct == 1 or "image" in name:
+ return "图片"
+ if ct == 2 or "video" in name:
+ return "视频"
+ if ct == 3 or "voice" in name or "audio" in name:
+ return "语音"
+ return "文件"
+
+
+def _is_inbound(path):
+ """判断路径是否落在入站目录 temp/qq_inbox/ 内(兼容相对/绝对路径)。"""
+ try:
+ ap = os.path.abspath(path)
+ base = os.path.abspath(_INBOX_DIR)
+ return os.path.commonpath([ap, base]) == base
+ except (ValueError, OSError):
+ return False
+
+
+def _cleanup_inbox():
+ """清理 temp/qq_inbox/ 下超过 TTL 的旧附件。"""
+ now = time.time()
+ try:
+ for name in os.listdir(_INBOX_DIR):
+ p = os.path.join(_INBOX_DIR, name)
+ try:
+ if now - os.path.getmtime(p) > _INBOX_TTL:
+ os.remove(p)
+ except OSError:
+ pass
+ except FileNotFoundError:
+ pass
+
+
+async def _download_attachments(data):
+ """下载消息附件到 temp/qq_inbox/,返回 [(kind_label, 'temp/qq_inbox/xxx'), ...]。失败的跳过。"""
+ atts = getattr(data, "attachments", None) or []
+ if not atts:
+ return []
+ import aiohttp
+ os.makedirs(_INBOX_DIR, exist_ok=True)
+ _cleanup_inbox()
+ saved = []
+ async with aiohttp.ClientSession() as sess:
+ for att in atts:
+ url = getattr(att, "url", "") or ""
+ if not url:
+ continue
+ if url.startswith("//"):
+ url = "https:" + url
+ elif url.startswith("http://"):
+ url = "https://" + url[len("http://"):]
+ kind = _kind_label(att)
+ ext = _guess_ext(att, kind)
+ fname = f"qq_{uuid.uuid4().hex[:12]}{ext}"
+ fpath = os.path.join(_INBOX_DIR, fname)
+ try:
+ async with sess.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp:
+ resp.raise_for_status()
+ body = await resp.read()
+ with open(fpath, "wb") as f:
+ f.write(body)
+ saved.append((kind, f"temp/qq_inbox/{fname}"))
+ print(f"[QQ] downloaded {kind} -> temp/qq_inbox/{fname} ({len(body)} bytes)")
+ except Exception as e:
+ print(f"[QQ] download failed {url}: {e}")
+ return saved
+
+
+def _build_prompt(text, attachments):
+ """把文本与附件路径合并成给 agent 的 prompt。"""
+ if not attachments:
+ return text
+ lines = []
+ for kind, path in attachments:
+ if kind == "语音":
+ lines.append(f"[TIPS] 收到{kind}文件 {path}(QQ 语音通常为 silk 编码,需先转码再处理)")
+ elif kind == "图片":
+ lines.append(f"[TIPS] 收到{kind} {path}(可用 vision 查看)")
+ else:
+ lines.append(f"[TIPS] 收到{kind} {path}")
+ head = "\n".join(lines)
+ if text:
+ return f"{head}\n{text}"
+ return f"{head}\n请查看后等待下一步指令。"
+
+
+
def _build_intents():
try:
return botpy.Intents(public_messages=True, direct_message=True)
@@ -64,6 +192,7 @@ async def on_direct_message_create(self, message):
class QQApp(AgentChatMixin):
label, source, split_limit = "QQ", "qq", 1500
+ stream_turns = True # 逐 turn 实时心跳:每个 turn 跑完即推送该 turn 日志
def __init__(self):
super().__init__(agent, USER_TASKS)
@@ -81,6 +210,58 @@ async def send_text(self, chat_id, content, *, msg_id=None, is_group=False):
except Exception:
await api(**{key: chat_id, "msg_type": 0, "content": part, "msg_id": msg_id, "msg_seq": seq})
+ async def send_done(self, chat_id, raw_text, *, msg_id=None, is_group=False):
+ # 先发清理后的文本(去掉 [FILE:] 标记),再把文件作为富媒体发出
+ files = [p for p in extract_files(raw_text)
+ if os.path.exists(p) and not _is_inbound(p)]
+ body = strip_files(clean_reply(raw_text))
+ if body and body != "...":
+ await self.send_text(chat_id, body, msg_id=msg_id, is_group=is_group)
+ for path in files:
+ await self._send_file(chat_id, path, msg_id=msg_id, is_group=is_group)
+
+ async def send_done_files(self, chat_id, raw_text, *, msg_id=None, is_group=False):
+ # 流式逐 turn 模式收尾:turn 日志已实时推送完毕,这里只补发生成的文件,
+ # 不再重复汇总全部 turn 文本。
+ files = [p for p in extract_files(raw_text)
+ if os.path.exists(p) and not _is_inbound(p)]
+ for path in files:
+ await self._send_file(chat_id, path, msg_id=msg_id, is_group=is_group)
+
+ async def _send_file(self, chat_id, path, *, msg_id=None, is_group=False):
+ """QQ 富媒体出站:本地文件 -> 公网URL -> 腾讯反向拉取。
+
+ 腾讯反向拉取对文件体积有上限(实测 1MB 文档可发,2.9MB pdf / 11.5MB docx
+ 会被拒,报 "download file error")。原生富媒体发送失败时,降级为把公网下载
+ 链接作为文本发出——该链接经 cloudflared 隧道直连本地文件服务,不受腾讯体积
+ 限制,用户可在 TTL(默认 1 小时)内手动下载。"""
+ name = os.path.basename(path)
+ file_type = _qq_file_type(path)
+ size = os.path.getsize(path) if os.path.exists(path) else 0
+ url = None
+ try:
+ url = await asyncio.to_thread(puburl.publish, path)
+ if not url:
+ raise RuntimeError("无法生成公网URL(隧道未就绪)")
+ upload = self.client.api.post_group_file if is_group else self.client.api.post_c2c_file
+ send = self.client.api.post_group_message if is_group else self.client.api.post_c2c_message
+ key = "group_openid" if is_group else "openid"
+ media = await upload(**{key: chat_id, "file_type": file_type, "url": url})
+ await send(**{key: chat_id, "msg_type": 7, "media": media,
+ "msg_id": msg_id, "msg_seq": _next_msg_seq()})
+ print(f"[QQ] send_file ok ({name}, type={file_type}, {size}B) via {url}")
+ except Exception as e:
+ print(f"[QQ] send_file failed ({name}, {size}B): {e}")
+ if url:
+ # 降级:原生富媒体被腾讯拒收(多见于大文件),改发公网下载链接
+ mb = size / 1024 / 1024
+ tip = (f"📎 文件「{name}」({mb:.1f}MB)超出 QQ 直传体积限制,"
+ f"已转为下载链接(1 小时内有效):\n{url}")
+ await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group)
+ print(f"[QQ] send_file degraded to link ({name}) via {url}")
+ else:
+ await self.send_text(chat_id, f"⚠️ 文件「{name}」发送失败:{e}", msg_id=msg_id, is_group=is_group)
+
async def on_message(self, data, is_group=False):
try:
msg_id = getattr(data, "id", None)
@@ -88,7 +269,8 @@ async def on_message(self, data, is_group=False):
return
PROCESSED_IDS.append(msg_id)
content = (getattr(data, "content", "") or "").strip()
- if not content:
+ attachments = await _download_attachments(data)
+ if not content and not attachments:
return
author = getattr(data, "author", None)
user_id = str(getattr(author, "member_openid" if is_group else "user_openid", "") or getattr(author, "id", "") or "unknown")
@@ -96,10 +278,46 @@ async def on_message(self, data, is_group=False):
if not public_access(ALLOWED) and user_id not in ALLOWED:
print(f"[QQ] unauthorized user: {user_id}")
return
- print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content}")
+ print(f"[QQ] message from {user_id} ({'group' if is_group else 'c2c'}): {content!r} +{len(attachments)} attach")
if content.startswith("/"):
+ if content.strip().lower() == "/clearfiles":
+ pend = PENDING.pop(chat_id, [])
+ if pend:
+ kinds = "、".join(sorted({k for k, _ in pend}))
+ tip = f"🗑️ 已撤销缓存的 {len(pend)} 个附件({kinds})。"
+ else:
+ tip = "📭 当前没有缓存的附件。"
+ return await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group)
return await self.handle_command(chat_id, content, msg_id=msg_id, is_group=is_group)
- asyncio.create_task(self.run_agent(chat_id, content, msg_id=msg_id, is_group=is_group))
+ # 1) 先把本条消息的附件存入缓冲(不立即触发模型)
+ if attachments:
+ PENDING.setdefault(chat_id, []).extend(attachments)
+ # 2) 并发保护:该会话已有任务在跑,拒绝新指令,提示可 /stop 中断
+ if chat_id in USER_TASKS:
+ if attachments:
+ pend = PENDING.get(chat_id, [])
+ kinds = "、".join(sorted({k for k, _ in pend}))
+ tip = (f"📥 已收到本条 {len(attachments)} 个附件并缓存,当前共缓存 {len(pend)} 个({kinds}),无需重发。"
+ f"\n⏳ 但当前正在处理上一条指令,附件会在你下达新指令时一并带上。"
+ f"发送 /stop 可中断当前任务后立即下达,发送 /clearfiles 可撤销已缓存的附件。")
+ else:
+ tip = "⏳ 正在处理上一条指令。发送 /stop 可中断后再下达新指令。"
+ return await self.send_text(chat_id, tip, msg_id=msg_id, is_group=is_group)
+ # 3) 只有附件、没有文字指令 → 回执并等待文字
+ if not content:
+ pend = PENDING.get(chat_id, [])
+ if pend:
+ kinds = "、".join(sorted({k for k, _ in pend}))
+ return await self.send_text(
+ chat_id,
+ f"📥 已收到 {len(pend)} 个附件({kinds}),已缓存。请发送文字说明要做什么,我再开始处理。"
+ f"\n发送 /clearfiles 可撤销已缓存的附件。",
+ msg_id=msg_id, is_group=is_group)
+ return
+ # 4) 有文字指令 → 合并缓冲的附件一起触发,清空缓冲
+ buffered = PENDING.pop(chat_id, [])
+ prompt = _build_prompt(content, buffered)
+ asyncio.create_task(self.run_agent(chat_id, prompt, msg_id=msg_id, is_group=is_group))
except Exception:
import traceback
print("[QQ] handle_message error")