Files
figmar 81115dc23d 初始提交:识流 AI 助手项目
微信自动回复机器人,基于截图+OCR识别消息,支持关键词规则和 AI(OpenAI/DeepSeek/Dify)自动回复。
技术栈:PySide6 + Flask + Vue3 + RapidOCR + SQLite

注:OCR大模型文件(.onnx / .pdiparams)不纳入版本控制,需单独下载。

🤖 Generated with [Qoder][https://qoder.com]
2026-05-30 15:09:40 +08:00

138 lines
5.8 KiB
Python

import json
import queue
from flask import Response, jsonify, request, stream_with_context
from app.application.bot_controller import bot_controller
from app.infrastructure.service.backend.db import set_setting
from app.infrastructure.service.logging.log_query_service import clear_logs, query_event_json, query_events, query_summary, query_trace
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
def _sync_runtime_settings(status):
current = (status.get("status") or "stopped").strip().lower()
running = current == "running"
transitional = current in {"starting", "stopping"}
errored = current == "error"
if running:
set_setting("listener_enabled", "1")
set_setting("listener_runtime_status", "running")
elif transitional:
set_setting("listener_runtime_status", current)
elif errored:
set_setting("listener_enabled", "0")
set_setting("listener_runtime_status", "error")
else:
set_setting("listener_enabled", "0")
set_setting("listener_runtime_status", "stopped")
def register_bot_routes(app):
def _stream_payload(status):
return f"event: status\ndata: {json.dumps({'success': True, **status}, ensure_ascii=False)}\n\n"
@app.route("/api/bot/status", methods=["GET"])
def api_bot_status():
trace_id = new_trace_id("api")
status = bot_controller.status()
_sync_runtime_settings(status)
log_event("INFO", "api", "api.bot.status", trace_id, "status", "ok", "查询监听状态成功", extra={"status": status.get("status")})
return jsonify({"success": True, **status})
@app.route("/api/bot/status/stream", methods=["GET"])
def api_bot_status_stream():
status_queue = queue.Queue(maxsize=8)
def on_status(status):
_sync_runtime_settings(status)
try:
status_queue.put_nowait(status)
except queue.Full:
try:
status_queue.get_nowait()
except queue.Empty:
pass
try:
status_queue.put_nowait(status)
except queue.Full:
pass
listener_id = bot_controller.add_status_listener(on_status, emit_initial=True)
@stream_with_context
def generate():
try:
yield "retry: 2000\n\n"
while True:
try:
status = status_queue.get(timeout=15)
yield _stream_payload(status)
except queue.Empty:
yield "event: ping\ndata: {}\n\n"
finally:
bot_controller.remove_status_listener(listener_id)
return Response(generate(), mimetype="text/event-stream", headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
})
@app.route("/api/logs/v2/events", methods=["GET"])
def api_logs_v2_events():
trace_id = request.args.get("trace_id") or ""
module = request.args.get("module") or ""
level = request.args.get("level") or ""
event = request.args.get("event") or ""
start_ts = request.args.get("start_ts") or ""
end_ts = request.args.get("end_ts") or ""
keyword = request.args.get("keyword") or ""
page = request.args.get("page", 1)
size = request.args.get("size", 50)
payload = query_events(module=module or None, level=level or None, event=event or None, trace_id=trace_id or None, start_ts=start_ts or None, end_ts=end_ts or None, keyword=keyword or None, page=page, size=size)
return jsonify({"success": True, **payload})
@app.route("/api/logs/v2/trace/<trace_id>", methods=["GET"])
def api_logs_v2_trace(trace_id):
items = query_trace(trace_id)
return jsonify({"success": True, "trace_id": trace_id, "items": items})
@app.route("/api/logs/v2/summary", methods=["GET"])
def api_logs_v2_summary():
limit = request.args.get("limit", 300)
payload = query_summary(limit=limit)
return jsonify({"success": True, **payload})
@app.route("/api/logs/v2/event/<event_id>", methods=["GET"])
def api_logs_v2_event(event_id):
item = query_event_json(event_id)
if not item:
return jsonify({"success": False, "error": "event_not_found"}), 404
return jsonify({"success": True, "item": item})
@app.route("/api/logs/v2/clear", methods=["POST"])
def api_logs_v2_clear():
body = request.get_json(silent=True) or {}
module = (body.get("module") or request.values.get("module") or request.args.get("module") or "").strip()
payload = clear_logs(module=module or None)
return jsonify({"success": True, **payload})
@app.route("/api/bot/start", methods=["POST"])
def api_bot_start():
trace_id = new_trace_id("api")
backend_url = (request.values.get("backend_url") or "").strip()
if not backend_url:
backend_url = request.host_url.rstrip("/") + "/api/messages/receive"
status = bot_controller.start(backend_url)
_sync_runtime_settings(status)
log_event("INFO", "api", "api.bot.start", trace_id, "start", "ok", "启动监听请求已处理", extra={"status": status.get("status"), "backend_url": backend_url})
return jsonify({"success": True, **status})
@app.route("/api/bot/stop", methods=["POST"])
def api_bot_stop():
trace_id = new_trace_id("api")
status = bot_controller.stop()
_sync_runtime_settings(status)
log_event("INFO", "api", "api.bot.stop", trace_id, "stop", "ok", "停止监听请求已处理", extra={"status": status.get("status")})
return jsonify({"success": True, **status})