#!/usr/bin/env python # -*- coding: utf-8 -*- import ctypes # Windows窗口句柄相关API import hashlib # 文本去重哈希 import logging # 日志模块 import os # 文件目录操作 import re # 正则过滤文本 import time # 休眠/节流 from datetime import datetime # 时间戳命名与日志辅助 from io import BytesIO # 图片转字节流给OCR import cv2 # 图像处理 import numpy as np # 数组/图像矩阵 import pyautogui # 鼠标键盘自动化 import pyperclip # 剪贴板粘贴发送文本 import requests # 调用后端接口 import uiautomation as auto # Windows UI 自动化 from PIL import ImageGrab, Image # 截图与图像对象 from app.infrastructure.service.wechat.config import BAIDU_API_KEY, BAIDU_SECRET_KEY, BACKEND_URL, LOOP_INTERVAL, NO_REPLY_KEYWORDS, BLOCKED_SESSION_KEYWORDS, UI_NOISE_KEYWORDS, BOT_DEBUG_LOG, WECHAT_WINDOW_TARGET_WIDTH, WECHAT_WINDOW_TARGET_HEIGHT, WECHAT_WINDOW_TARGET_LEFT, WECHAT_WINDOW_TARGET_TOP, OCR_SAVE_IMAGES, OCR_SAVE_DIR # 机器人配置 from app.infrastructure.service.wechat.ocr import BaiduOCR # 百度OCR封装 logging.basicConfig( # 配置全局日志 level=logging.INFO, # 默认输出INFO及以上 format="%(asctime)s - %(levelname)s - %(message)s", # 日志格式 handlers=[ logging.FileHandler("wechat_multi_chat_bot.log", encoding="utf-8"), # 写文件 logging.StreamHandler() # 打终端 ], ) logger = logging.getLogger(__name__) # 当前模块日志器 def debug_log(message): if BOT_DEBUG_LOG: # 仅在调试开关开启时输出 logger.info(f"[DEBUG] {message}") class WechatMultiChatBot: """微信自动回复机器人(UI自动化 + OCR + 规则/AI回复)。""" def __init__(self): """初始化OCR、微信窗口句柄和运行期状态。""" logger.info("[BOT] 初始化开始") # 生命周期日志 self.ocr = BaiduOCR(BAIDU_API_KEY, BAIDU_SECRET_KEY) # OCR客户端 self.wechat_window = None # 微信窗口对象 self.find_wechat_window() # 启动即查找窗口 self.seen_messages = {} # 去重缓存:contact_key -> hash set self.processed_contacts = set() # 预留:已处理联系人集合 self.running = False # 主循环运行状态 logger.info("[BOT] 初始化完成") def find_wechat_window(self): """查找微信主窗口;成功后执行窗口标准化。""" logger.info("[WIN] 正在查找微信窗口") self.wechat_window = auto.WindowControl(searchDepth=1, Name='微信') # 按窗口名查找 if self.wechat_window.Exists(0, 0): # 立即探测是否存在 logger.info("[WIN] 找到微信窗口") self.normalize_wechat_window() # 找到后标准化尺寸 return logger.error("[WIN] 未找到微信窗口") raise Exception("未找到微信窗口") def normalize_wechat_window(self): """将微信窗口调整到预设位置和尺寸,保证识别区域稳定。""" logger.info("[WIN] 开始标准化窗口大小和位置") try: self.wechat_window.SetActive() # 激活窗口 time.sleep(0.25) # 等待激活稳定 hwnd = getattr(self.wechat_window, "NativeWindowHandle", 0) # 尝试取窗口句柄 if hwnd: user32 = ctypes.windll.user32 # Win32接口 SWP_NOZORDER = 0x0004 # 不改变Z序 SWP_NOACTIVATE = 0x0010 # 不抢前台焦点 user32.SetWindowPos( int(hwnd), # 目标窗口句柄 0, # hWndInsertAfter int(WECHAT_WINDOW_TARGET_LEFT), # 目标左上x int(WECHAT_WINDOW_TARGET_TOP), # 目标左上y int(WECHAT_WINDOW_TARGET_WIDTH), # 目标宽度 int(WECHAT_WINDOW_TARGET_HEIGHT), # 目标高度 SWP_NOZORDER | SWP_NOACTIVATE, # 标志位 ) logger.info(f"[WIN] 句柄模式窗口标准化成功 hwnd={hwnd}") else: if hasattr(self.wechat_window, "MoveTo") and hasattr(self.wechat_window, "Resize"): # 兼容旧接口 self.wechat_window.MoveTo(WECHAT_WINDOW_TARGET_LEFT, WECHAT_WINDOW_TARGET_TOP) # 移动窗口 self.wechat_window.Resize(WECHAT_WINDOW_TARGET_WIDTH, WECHAT_WINDOW_TARGET_HEIGHT) # 调整尺寸 logger.info("[WIN] MoveTo/Resize 模式窗口标准化成功") else: raise Exception("未获取到窗口句柄且不支持 MoveTo/Resize") time.sleep(0.35) # 等待窗口重排完成 logger.info(f"[WIN] 标准化参数 left={WECHAT_WINDOW_TARGET_LEFT} top={WECHAT_WINDOW_TARGET_TOP} width={WECHAT_WINDOW_TARGET_WIDTH} height={WECHAT_WINDOW_TARGET_HEIGHT}") debug_log(f"window_normalized=({WECHAT_WINDOW_TARGET_LEFT},{WECHAT_WINDOW_TARGET_TOP},{WECHAT_WINDOW_TARGET_WIDTH},{WECHAT_WINDOW_TARGET_HEIGHT})") except Exception as e: logger.warning(f"[WIN] 窗口标准化失败,继续按当前窗口运行: {e}") def get_window_rect(self): try: rect = self.wechat_window.BoundingRectangle # UIA矩形 result = {'left': rect.left, 'top': rect.top, 'right': rect.right, 'bottom': rect.bottom, 'width': rect.right - rect.left, 'height': rect.bottom - rect.top} # 转字典 debug_log(f"window_rect={result}") return result except Exception as e: logger.warning(f"[WIN] 获取窗口位置失败: {e}") return None def get_contact_list_rect(self, window_rect): rect = {'left': window_rect['left'] + 10, 'top': window_rect['top'] + 50, 'right': window_rect['left'] + int(window_rect['width'] * 0.25) - 10, 'bottom': window_rect['bottom'] - 50} # 左侧会话列表区域 debug_log(f"contact_rect={rect}") return rect def detect_red_dots(self, window_rect): logger.info("[DETECT] 开始检测红点") contact_rect = self.get_contact_list_rect(window_rect) # 获取列表区域 try: screenshot = ImageGrab.grab(bbox=(contact_rect['left'], contact_rect['top'], contact_rect['right'], contact_rect['bottom'])) # 列表截图 img_np = np.array(screenshot) # PIL -> numpy img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) # RGB -> BGR hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV) # BGR -> HSV mask = cv2.inRange(hsv, np.array([0, 80, 80]), np.array([12, 255, 255])) + cv2.inRange(hsv, np.array([168, 80, 80]), np.array([180, 255, 255])) # 红色双区间 contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # 查找连通域 contact_width = contact_rect['right'] - contact_rect['left'] # 列表宽度 red_dots_raw = [] # 原始红点 for contour in contours: area = cv2.contourArea(contour) # 面积过滤 if 12 < area < 220: perimeter = cv2.arcLength(contour, True) # 周长 if perimeter > 0: circularity = 4 * np.pi * area / (perimeter * perimeter) # 圆度 if circularity > 0.45: M = cv2.moments(contour) # 矩中心 if M["m00"] != 0: cx = int(M["m10"] / M["m00"]) # 中心x cy = int(M["m01"] / M["m00"]) # 中心y if cx > contact_width * 0.1: # 过滤左边缘噪声 red_dots_raw.append({'x': contact_rect['left'] + cx, 'y': contact_rect['top'] + cy, 'rel_y': cy}) # 记录绝对坐标 red_dots_grouped = [] # 按行合并后的红点 debug_log(f"red_dots_raw={len(red_dots_raw)}") used = set() # 已分组索引 for i, dot in enumerate(red_dots_raw): if i in used: continue group = [dot] # 当前分组 for j, other in enumerate(red_dots_raw): if j != i and j not in used and abs(dot['rel_y'] - other['rel_y']) < 50: # 同一行 group.append(other) used.add(j) red_dots_grouped.append({'x': sum(d['x'] for d in group) // len(group), 'y': sum(d['y'] for d in group) // len(group)}) # 行中心 used.add(i) logger.info(f"[DETECT] 红点检测完成 grouped={len(red_dots_grouped)}") debug_log(f"red_dots_grouped={len(red_dots_grouped)}") return red_dots_grouped except Exception as e: logger.error(f"[DETECT] 检测红点失败: {e}") return [] def click_contact_by_red_dot(self, red_dot, window_rect): contact_rect = self.get_contact_list_rect(window_rect) # 计算左侧联系人列表区域 click_x = (contact_rect['left'] + contact_rect['right']) // 2 # 点击联系人列表中线,避免点到红点本身 click_y = red_dot['y'] # 使用红点纵坐标对应会话行 logger.info(f"[ACTION] 点击联系人 x={click_x} y={click_y}") # 记录点击行为 pyautogui.click(click_x, click_y) # 执行点击 time.sleep(1.2) # 等待会话内容加载 session_title = self.get_current_session_title() # 读取当前会话标题 logger.info(f"[ACTION] 当前会话标题 title={session_title or '未知'}") # 输出标题用于排查 time.sleep(1.0) # 再等待短时间,保证后续OCR稳定 def get_session_title_by_ocr(self): try: window_rect = self.get_window_rect() # 获取微信窗口坐标 if not window_rect: return "" # 窗口坐标不可用 title_areas = [ # 多个标题区域兜底,适配不同微信版本/缩放 ( window_rect['left'] + int(window_rect['width'] * 0.32), window_rect['top'] + 6, window_rect['right'] - int(window_rect['width'] * 0.30), window_rect['top'] + max(40, int(window_rect['height'] * 0.085)), ), ( window_rect['left'] + int(window_rect['width'] * 0.28), window_rect['top'] + 4, window_rect['right'] - int(window_rect['width'] * 0.24), window_rect['top'] + max(46, int(window_rect['height'] * 0.095)), ), ] for idx, (left, top, right, bottom) in enumerate(title_areas, 1): # 逐区域尝试 screenshot = ImageGrab.grab(bbox=(left, top, right, bottom)) # 截标题图 self.save_ocr_debug_image(screenshot, f"title_ocr_{idx}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.png") # 落盘调试 img_bytes = BytesIO() # 内存字节流 screenshot.save(img_bytes, format='PNG') # 转png字节 lines = self.ocr.recognize(img_bytes.getvalue()) # OCR识别 valid = [x.strip() for x in lines if x and x.strip() and len(x.strip()) >= 2] # 清洗文本 valid = [x for x in valid if x not in UI_NOISE_KEYWORDS] # 去噪 if valid: title = valid[0] # 取首个有效标题 debug_log(f"session_title_ocr={title} area={idx} lines={valid[:5]}") return title return "" # 全部区域失败 except Exception as e: debug_log(f"session_title_ocr_error={e}") return "" # OCR异常返回空 def get_current_session_title(self): try: title_ctrl = self.wechat_window.TextControl(foundIndex=1) # 优先UIA读标题 if hasattr(title_ctrl, "Exists") and not title_ctrl.Exists(0.3, 0.05): # 快速存在性检查 debug_log("session_title_not_found_fast") title = self.get_session_title_by_ocr() # UIA失败转OCR return title title = (getattr(title_ctrl, "Name", "") or "").strip() # 读取控件Name if not title or title in UI_NOISE_KEYWORDS: # 噪声/空值 title = self.get_session_title_by_ocr() # OCR兜底 debug_log(f"session_title={title}") return title except Exception as e: debug_log(f"session_title_error={e}") return self.get_session_title_by_ocr() # 异常也兜底 def is_blocked_session(self): title = self.get_current_session_title() # 取会话标题 if not title: return False # 标题未知默认不拦截 for keyword in BLOCKED_SESSION_KEYWORDS: # 命中过滤词则跳过 if keyword in title: logger.info(f"[SESSION] 命中过滤会话 title={title} keyword={keyword}") return True return False def get_latest_message_areas(self, window_rect): """计算多段消息识别区域(靠近底部,适配不同窗口尺寸)。""" chat_left = window_rect['left'] + int(window_rect['width'] * 0.30) # 聊天区左边界 chat_right = window_rect['right'] - 20 # 聊天区右边界 chat_top = window_rect['top'] + int(window_rect['height'] * 0.16) # 聊天区上边界 chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20) # 聊天区下边界(排除输入区) chat_height = max(420, int((chat_bottom - chat_top) * 0.90)) # 有效聊天高度 area1_h = max(220, int(chat_height * 0.46)) # 近底部区域 area2_h = max(320, int(chat_height * 0.72)) # 中大区域 area3_h = max(420, int(chat_height * 0.98)) # 大区域兜底 areas = [ {'left': chat_left, 'top': max(chat_top, chat_bottom - area1_h), 'right': chat_right, 'bottom': chat_bottom}, # 区域1 {'left': chat_left, 'top': max(chat_top, chat_bottom - area2_h), 'right': chat_right, 'bottom': chat_bottom - max(35, int(area1_h * 0.12))}, # 区域2 {'left': chat_left, 'top': max(chat_top, chat_bottom - area3_h), 'right': chat_right, 'bottom': chat_bottom - max(80, int(area2_h * 0.20))}, # 区域3 ] debug_log(f"message_areas={areas}") return areas def should_reply(self, text): """基础过滤:过短、无意义、无需回复文本直接跳过。""" if not text or len(text) < 2: # 文本太短不回复 return False text_lower = text.lower() # 统一小写比较 for keyword in NO_REPLY_KEYWORDS: # 命中免回复词 if keyword.lower() in text_lower: return False return not text.isdigit() # 纯数字通常无语义 def is_new_message(self, text, contact_key): """消息去重:同一会话内已处理过的文本不重复回复。""" if contact_key not in self.seen_messages: self.seen_messages[contact_key] = set() # 首次初始化会话集合 h = hashlib.md5(text.encode("utf-8")).hexdigest() # 文本哈希 if h in self.seen_messages[contact_key]: logger.info(f"[FILTER] 重复消息 contact_key={contact_key}") return False self.seen_messages[contact_key].add(h) # 记录已处理 if len(self.seen_messages[contact_key]) > 50: # 限制缓存规模 self.seen_messages[contact_key] = set(list(self.seen_messages[contact_key])[-50:]) return True def extract_text_by_uia(self): logger.info("[UIA] 尝试控件树文本提取") lines = [] # 候选文本 try: all_text_controls = self.wechat_window.GetChildren() # 取一级子控件 for ctrl in all_text_controls: name = (getattr(ctrl, 'Name', '') or '').strip() # 控件文本 if not name: continue if len(name) < 2: continue if re.fullmatch(r"[0-9:\-\s]+", name): # 过滤时间/数字 continue if name in UI_NOISE_KEYWORDS: # 过滤UI噪声 continue lines.append(name) # 保留候选 except Exception as e: logger.warning(f"[UIA] 控件树提取失败: {e}") dedup = [] # 去重结果 seen = set() # 去重集合 for x in lines: if x not in seen: dedup.append(x) seen.add(x) logger.info(f"[UIA] 控件树提取文本数={len(dedup)}") debug_log(f"uia_lines={dedup[-8:] if dedup else []}") return dedup def save_ocr_debug_image(self, image_obj, filename): """保存OCR调试截图,便于核对截取区域和图像质量。""" if not OCR_SAVE_IMAGES: # 开关关闭则不落盘 return try: os.makedirs(OCR_SAVE_DIR, exist_ok=True) # 创建目录 file_path = os.path.join(OCR_SAVE_DIR, filename) # 文件路径 image_obj.save(file_path) # 保存图片 debug_log(f"saved_ocr_image={file_path}") except Exception as e: logger.warning(f"[OCR] 保存调试截图失败: {e}") def get_ai_reply(self, message): """调用后端接口获取最终回复(规则优先,AI兜底)。""" logger.info(f"[API] 请求后端生成回复 content={message}") try: resp = requests.post(BACKEND_URL, json={"content": message, "wx_user_id": "multi_chat_bot", "wx_nickname": "用户"}, timeout=10) # 请求后端 logger.info(f"[API] 后端响应 status={resp.status_code}") if resp.status_code == 200: data = resp.json() # 解析json if data.get("success") and data.get("should_reply"): reply = (data.get("reply_text") or "").strip() # 取回复文本 logger.info(f"[API] 生成回复成功 reply={reply}") return reply logger.info("[API] 后端未返回可发送回复") except Exception as e: logger.error(f"[API] 调用接口失败: {e}") return None def send_message(self, text, window_rect): logger.info(f"[SEND] 开始发送 reply={text}") try: self.wechat_window.SetActive() # 激活微信窗口 time.sleep(0.3) pyautogui.click(window_rect['left'] + window_rect['width'] // 2, window_rect['bottom'] - 100) # 点击输入框 time.sleep(0.3) pyautogui.hotkey('ctrl', 'a') # 全选旧文本 pyautogui.press('delete') # 清空输入框 pyperclip.copy(text) # 文本放入剪贴板 pyautogui.hotkey('ctrl', 'v') # 粘贴内容 pyautogui.press('enter') # 回车发送 logger.info("[SEND] 发送成功") except Exception as e: logger.error(f"[SEND] 发送失败: {e}") def process_current_chat(self, window_rect, contact_key): logger.info(f"[CHAT] 开始处理会话 contact_key={contact_key}") if self.is_blocked_session(): # 系统会话直接跳过 logger.info("[CHAT] 当前会话在过滤名单,跳过") return False try: all_lines = [] # OCR/UIA汇总文本 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") # 调试图时间戳 for idx, msg_rect in enumerate(self.get_latest_message_areas(window_rect), 1): # 分区识别 screenshot = ImageGrab.grab(bbox=(msg_rect['left'], msg_rect['top'], msg_rect['right'], msg_rect['bottom'])) # 区域截图 img_np = np.array(screenshot) # 转numpy raw_pil = Image.fromarray(img_np) # 原图 self.save_ocr_debug_image(raw_pil, f"{timestamp}_{contact_key}_area{idx}_raw.png") # 保存原图 raw_bytes = BytesIO() # 原图字节流 raw_pil.save(raw_bytes, format='PNG') lines_raw = self.ocr.recognize(raw_bytes.getvalue()) # 原图OCR gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) # 灰度化 enhanced = cv2.convertScaleAbs(gray, alpha=1.35, beta=8) # 提升对比度 _, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # 二值化 pil_bin = Image.fromarray(binary) # 二值图 self.save_ocr_debug_image(pil_bin, f"{timestamp}_{contact_key}_area{idx}_bin.png") # 保存二值图 bin_bytes = BytesIO() # 二值图字节流 pil_bin.save(bin_bytes, format='PNG') lines_bin = self.ocr.recognize(bin_bytes.getvalue()) # 二值图OCR lines = lines_raw + lines_bin # 合并两路识别 logger.info(f"[OCR] area={idx} raw={len(lines_raw)} bin={len(lines_bin)} total={len(lines)}") debug_log(f"ocr_lines_count={len(lines)} contact_key={contact_key} area={idx}") all_lines.extend(lines) # 汇总 if not all_lines: # 分区都失败时走整块兜底 logger.info("[CHAT] 分区OCR无结果,尝试整块聊天区兜底") chat_left = window_rect['left'] + int(window_rect['width'] * 0.30) chat_right = window_rect['right'] - 20 chat_top = window_rect['top'] + int(window_rect['height'] * 0.16) chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20) full_shot = ImageGrab.grab(bbox=(chat_left, chat_top, chat_right, chat_bottom)) # 整块截图 full_np = np.array(full_shot) full_gray = cv2.cvtColor(full_np, cv2.COLOR_RGB2GRAY) full_enh = cv2.convertScaleAbs(full_gray, alpha=1.25, beta=6) _, full_bin = cv2.threshold(full_enh, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) full_pil = Image.fromarray(full_bin) self.save_ocr_debug_image(full_pil, f"{timestamp}_{contact_key}_fallback_full_bin.png") # 保存兜底图 full_bytes = BytesIO() full_pil.save(full_bytes, format='PNG') full_lines = self.ocr.recognize(full_bytes.getvalue()) # 兜底OCR logger.info(f"[OCR] fallback_full_chat lines={len(full_lines)}") all_lines.extend(full_lines) if not all_lines: # OCR全失败时走UIA logger.info("[CHAT] OCR 无结果,尝试 UIA 文本兜底") all_lines = self.extract_text_by_uia() if not all_lines: # UIA也失败 logger.info("[CHAT] OCR+UIA 均无结果,结束") return False valid_lines = [line.strip() for line in all_lines if len(line.strip()) >= 2] # 过滤无效行 if not valid_lines: logger.info("[CHAT] 过滤后无有效文本,结束") return False latest = None # 最终待处理消息 prioritized = [line for line in reversed(valid_lines) if ("报名" in line or "课程" in line or "价格" in line)] # 规则关键词优先 if prioritized: latest = prioritized[0] logger.info(f"[CHAT] 规则关键词优先命中 latest={latest}") else: for line in reversed(valid_lines): # 从最新往前找可回复文本 if self.should_reply(line): latest = line break if not latest: logger.info("[CHAT] 没有命中可回复文本,结束") debug_log("skip_all_should_reply=false") return False logger.info(f"[CHAT] 识别目标消息 latest={latest}") debug_log(f"latest_text={latest}") if not self.is_new_message(latest, contact_key): # 重复消息不再回 debug_log("skip_duplicate_message") return False reply = self.get_ai_reply(latest) # 请求后端生成回复 if reply: debug_log(f"ai_reply={reply}") self.send_message(reply, window_rect) # 发送回复 logger.info("[CHAT] 本次会话处理完成(已发送)") return True debug_log("no_ai_reply") logger.info("[CHAT] 本次会话处理完成(无可发送回复)") return False except Exception as e: logger.error(f"[CHAT] 处理聊天失败: {e}") return False def run_forever(self): """主循环:红点检测 -> 点击会话 -> 处理消息 -> 间隔轮询。""" logger.info("[LOOP] 监听循环启动") self.running = True # 标记运行中 round_count = 0 # 轮次计数 while self.running: try: round_count += 1 # 新一轮 logger.info(f"[LOOP] 开始第 {round_count} 轮") debug_log(f"round={round_count}") window_rect = self.get_window_rect() # 获取窗口坐标 if not window_rect: logger.info("[LOOP] 未取到窗口位置,等待下一轮") time.sleep(LOOP_INTERVAL) continue red_dots = self.detect_red_dots(window_rect) # 红点检测 logger.info(f"[LOOP] 本轮红点数={len(red_dots)}") debug_log(f"red_dots_detected={len(red_dots)}") if not red_dots: logger.info("[LOOP] 未检测到红点,走当前会话兜底") debug_log("fallback_current_chat") self.process_current_chat(window_rect, "current_chat_fallback") # 无红点也扫当前会话 time.sleep(LOOP_INTERVAL) continue for idx, red_dot in enumerate(red_dots, 1): # 逐个红点处理 logger.info(f"[LOOP] 处理红点 {idx}/{len(red_dots)}") self.click_contact_by_red_dot(red_dot, window_rect) # 点击会话 contact_key = f"{red_dot['x']}_{red_dot['y']}" # 联系人key self.process_current_chat(window_rect, contact_key) # 处理会话 time.sleep(1) # 短暂节流 logger.info(f"[LOOP] 第 {round_count} 轮结束,休眠 {LOOP_INTERVAL} 秒") time.sleep(LOOP_INTERVAL) except Exception as e: logger.error(f"[LOOP] 循环出错: {e}") time.sleep(3) # 异常后稍等再继续 def stop(self): logger.info("[LOOP] 收到停止信号") self.running = False # 置位退出 if __name__ == "__main__": try: bot = WechatMultiChatBot() # 构造机器人 bot.run_forever() # 进入主循环 except KeyboardInterrupt: bot.stop() # Ctrl+C优雅停止 except Exception as e: print(f"错误: {e}") # 启动异常输出