Files
ai-shiliu/app/infrastructure/wechat_multi_chat_bot copy.py.bak
figmar 81115dc23d 初始提交:识流 AI 助手项目
微信自动回复机器人,基于截图+OCR识别消息,支持关键词规则和 AI(OpenAI/DeepSeek/Dify)自动回复。
技术栈:PySide6 + Flask + Vue3 + RapidOCR + SQLite

注:OCR大模型文件(.onnx / .pdiparams)不纳入版本控制,需单独下载。

🤖 Generated with [Qoder][https://qoder.com]
2026-05-30 15:09:40 +08:00

505 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ctypes # Windows窗口句柄相关API
import hashlib # 文本去重哈希
import logging # 日志模块
import os # 文件目录操作
import re # 正则过滤文本
import time # 休眠/节流
from datetime import datetime # 时间戳命名与日志辅助
from io import BytesIO # 图片转字节流给OCR
import cv2 # 图像处理
import numpy as np # 数组/图像矩阵
import pyautogui # 鼠标键盘自动化
import pyperclip # 剪贴板粘贴发送文本
import requests # 调用后端接口
import uiautomation as auto # Windows UI 自动化
from PIL import ImageGrab, Image # 截图与图像对象
from app.infrastructure.service.wechat.config import BAIDU_API_KEY, BAIDU_SECRET_KEY, BACKEND_URL, LOOP_INTERVAL, NO_REPLY_KEYWORDS, BLOCKED_SESSION_KEYWORDS, UI_NOISE_KEYWORDS, BOT_DEBUG_LOG, WECHAT_WINDOW_TARGET_WIDTH, WECHAT_WINDOW_TARGET_HEIGHT, WECHAT_WINDOW_TARGET_LEFT, WECHAT_WINDOW_TARGET_TOP, OCR_SAVE_IMAGES, OCR_SAVE_DIR # 机器人配置
from app.infrastructure.service.wechat.ocr import BaiduOCR # 百度OCR封装
logging.basicConfig( # 配置全局日志
level=logging.INFO, # 默认输出INFO及以上
format="%(asctime)s - %(levelname)s - %(message)s", # 日志格式
handlers=[
logging.FileHandler("wechat_multi_chat_bot.log", encoding="utf-8"), # 写文件
logging.StreamHandler() # 打终端
],
)
logger = logging.getLogger(__name__) # 当前模块日志器
def debug_log(message):
if BOT_DEBUG_LOG: # 仅在调试开关开启时输出
logger.info(f"[DEBUG] {message}")
class WechatMultiChatBot:
"""微信自动回复机器人UI自动化 + OCR + 规则/AI回复"""
def __init__(self):
"""初始化OCR、微信窗口句柄和运行期状态。"""
logger.info("[BOT] 初始化开始") # 生命周期日志
self.ocr = BaiduOCR(BAIDU_API_KEY, BAIDU_SECRET_KEY) # OCR客户端
self.wechat_window = None # 微信窗口对象
self.find_wechat_window() # 启动即查找窗口
self.seen_messages = {} # 去重缓存contact_key -> hash set
self.processed_contacts = set() # 预留:已处理联系人集合
self.running = False # 主循环运行状态
logger.info("[BOT] 初始化完成")
def find_wechat_window(self):
"""查找微信主窗口;成功后执行窗口标准化。"""
logger.info("[WIN] 正在查找微信窗口")
self.wechat_window = auto.WindowControl(searchDepth=1, Name='微信') # 按窗口名查找
if self.wechat_window.Exists(0, 0): # 立即探测是否存在
logger.info("[WIN] 找到微信窗口")
self.normalize_wechat_window() # 找到后标准化尺寸
return
logger.error("[WIN] 未找到微信窗口")
raise Exception("未找到微信窗口")
def normalize_wechat_window(self):
"""将微信窗口调整到预设位置和尺寸,保证识别区域稳定。"""
logger.info("[WIN] 开始标准化窗口大小和位置")
try:
self.wechat_window.SetActive() # 激活窗口
time.sleep(0.25) # 等待激活稳定
hwnd = getattr(self.wechat_window, "NativeWindowHandle", 0) # 尝试取窗口句柄
if hwnd:
user32 = ctypes.windll.user32 # Win32接口
SWP_NOZORDER = 0x0004 # 不改变Z序
SWP_NOACTIVATE = 0x0010 # 不抢前台焦点
user32.SetWindowPos(
int(hwnd), # 目标窗口句柄
0, # hWndInsertAfter
int(WECHAT_WINDOW_TARGET_LEFT), # 目标左上x
int(WECHAT_WINDOW_TARGET_TOP), # 目标左上y
int(WECHAT_WINDOW_TARGET_WIDTH), # 目标宽度
int(WECHAT_WINDOW_TARGET_HEIGHT), # 目标高度
SWP_NOZORDER | SWP_NOACTIVATE, # 标志位
)
logger.info(f"[WIN] 句柄模式窗口标准化成功 hwnd={hwnd}")
else:
if hasattr(self.wechat_window, "MoveTo") and hasattr(self.wechat_window, "Resize"): # 兼容旧接口
self.wechat_window.MoveTo(WECHAT_WINDOW_TARGET_LEFT, WECHAT_WINDOW_TARGET_TOP) # 移动窗口
self.wechat_window.Resize(WECHAT_WINDOW_TARGET_WIDTH, WECHAT_WINDOW_TARGET_HEIGHT) # 调整尺寸
logger.info("[WIN] MoveTo/Resize 模式窗口标准化成功")
else:
raise Exception("未获取到窗口句柄且不支持 MoveTo/Resize")
time.sleep(0.35) # 等待窗口重排完成
logger.info(f"[WIN] 标准化参数 left={WECHAT_WINDOW_TARGET_LEFT} top={WECHAT_WINDOW_TARGET_TOP} width={WECHAT_WINDOW_TARGET_WIDTH} height={WECHAT_WINDOW_TARGET_HEIGHT}")
debug_log(f"window_normalized=({WECHAT_WINDOW_TARGET_LEFT},{WECHAT_WINDOW_TARGET_TOP},{WECHAT_WINDOW_TARGET_WIDTH},{WECHAT_WINDOW_TARGET_HEIGHT})")
except Exception as e:
logger.warning(f"[WIN] 窗口标准化失败,继续按当前窗口运行: {e}")
def get_window_rect(self):
try:
rect = self.wechat_window.BoundingRectangle # UIA矩形
result = {'left': rect.left, 'top': rect.top, 'right': rect.right, 'bottom': rect.bottom, 'width': rect.right - rect.left, 'height': rect.bottom - rect.top} # 转字典
debug_log(f"window_rect={result}")
return result
except Exception as e:
logger.warning(f"[WIN] 获取窗口位置失败: {e}")
return None
def get_contact_list_rect(self, window_rect):
rect = {'left': window_rect['left'] + 10, 'top': window_rect['top'] + 50, 'right': window_rect['left'] + int(window_rect['width'] * 0.25) - 10, 'bottom': window_rect['bottom'] - 50} # 左侧会话列表区域
debug_log(f"contact_rect={rect}")
return rect
def detect_red_dots(self, window_rect):
logger.info("[DETECT] 开始检测红点")
contact_rect = self.get_contact_list_rect(window_rect) # 获取列表区域
try:
screenshot = ImageGrab.grab(bbox=(contact_rect['left'], contact_rect['top'], contact_rect['right'], contact_rect['bottom'])) # 列表截图
img_np = np.array(screenshot) # PIL -> numpy
img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) # RGB -> BGR
hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV) # BGR -> HSV
mask = cv2.inRange(hsv, np.array([0, 80, 80]), np.array([12, 255, 255])) + cv2.inRange(hsv, np.array([168, 80, 80]), np.array([180, 255, 255])) # 红色双区间
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # 查找连通域
contact_width = contact_rect['right'] - contact_rect['left'] # 列表宽度
red_dots_raw = [] # 原始红点
for contour in contours:
area = cv2.contourArea(contour) # 面积过滤
if 12 < area < 220:
perimeter = cv2.arcLength(contour, True) # 周长
if perimeter > 0:
circularity = 4 * np.pi * area / (perimeter * perimeter) # 圆度
if circularity > 0.45:
M = cv2.moments(contour) # 矩中心
if M["m00"] != 0:
cx = int(M["m10"] / M["m00"]) # 中心x
cy = int(M["m01"] / M["m00"]) # 中心y
if cx > contact_width * 0.1: # 过滤左边缘噪声
red_dots_raw.append({'x': contact_rect['left'] + cx, 'y': contact_rect['top'] + cy, 'rel_y': cy}) # 记录绝对坐标
red_dots_grouped = [] # 按行合并后的红点
debug_log(f"red_dots_raw={len(red_dots_raw)}")
used = set() # 已分组索引
for i, dot in enumerate(red_dots_raw):
if i in used:
continue
group = [dot] # 当前分组
for j, other in enumerate(red_dots_raw):
if j != i and j not in used and abs(dot['rel_y'] - other['rel_y']) < 50: # 同一行
group.append(other)
used.add(j)
red_dots_grouped.append({'x': sum(d['x'] for d in group) // len(group), 'y': sum(d['y'] for d in group) // len(group)}) # 行中心
used.add(i)
logger.info(f"[DETECT] 红点检测完成 grouped={len(red_dots_grouped)}")
debug_log(f"red_dots_grouped={len(red_dots_grouped)}")
return red_dots_grouped
except Exception as e:
logger.error(f"[DETECT] 检测红点失败: {e}")
return []
def click_contact_by_red_dot(self, red_dot, window_rect):
contact_rect = self.get_contact_list_rect(window_rect) # 计算左侧联系人列表区域
click_x = (contact_rect['left'] + contact_rect['right']) // 2 # 点击联系人列表中线,避免点到红点本身
click_y = red_dot['y'] # 使用红点纵坐标对应会话行
logger.info(f"[ACTION] 点击联系人 x={click_x} y={click_y}") # 记录点击行为
pyautogui.click(click_x, click_y) # 执行点击
time.sleep(1.2) # 等待会话内容加载
session_title = self.get_current_session_title() # 读取当前会话标题
logger.info(f"[ACTION] 当前会话标题 title={session_title or '未知'}") # 输出标题用于排查
time.sleep(1.0) # 再等待短时间保证后续OCR稳定
def get_session_title_by_ocr(self):
try:
window_rect = self.get_window_rect() # 获取微信窗口坐标
if not window_rect:
return "" # 窗口坐标不可用
title_areas = [ # 多个标题区域兜底,适配不同微信版本/缩放
(
window_rect['left'] + int(window_rect['width'] * 0.32),
window_rect['top'] + 6,
window_rect['right'] - int(window_rect['width'] * 0.30),
window_rect['top'] + max(40, int(window_rect['height'] * 0.085)),
),
(
window_rect['left'] + int(window_rect['width'] * 0.28),
window_rect['top'] + 4,
window_rect['right'] - int(window_rect['width'] * 0.24),
window_rect['top'] + max(46, int(window_rect['height'] * 0.095)),
),
]
for idx, (left, top, right, bottom) in enumerate(title_areas, 1): # 逐区域尝试
screenshot = ImageGrab.grab(bbox=(left, top, right, bottom)) # 截标题图
self.save_ocr_debug_image(screenshot, f"title_ocr_{idx}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.png") # 落盘调试
img_bytes = BytesIO() # 内存字节流
screenshot.save(img_bytes, format='PNG') # 转png字节
lines = self.ocr.recognize(img_bytes.getvalue()) # OCR识别
valid = [x.strip() for x in lines if x and x.strip() and len(x.strip()) >= 2] # 清洗文本
valid = [x for x in valid if x not in UI_NOISE_KEYWORDS] # 去噪
if valid:
title = valid[0] # 取首个有效标题
debug_log(f"session_title_ocr={title} area={idx} lines={valid[:5]}")
return title
return "" # 全部区域失败
except Exception as e:
debug_log(f"session_title_ocr_error={e}")
return "" # OCR异常返回空
def get_current_session_title(self):
try:
title_ctrl = self.wechat_window.TextControl(foundIndex=1) # 优先UIA读标题
if hasattr(title_ctrl, "Exists") and not title_ctrl.Exists(0.3, 0.05): # 快速存在性检查
debug_log("session_title_not_found_fast")
title = self.get_session_title_by_ocr() # UIA失败转OCR
return title
title = (getattr(title_ctrl, "Name", "") or "").strip() # 读取控件Name
if not title or title in UI_NOISE_KEYWORDS: # 噪声/空值
title = self.get_session_title_by_ocr() # OCR兜底
debug_log(f"session_title={title}")
return title
except Exception as e:
debug_log(f"session_title_error={e}")
return self.get_session_title_by_ocr() # 异常也兜底
def is_blocked_session(self):
title = self.get_current_session_title() # 取会话标题
if not title:
return False # 标题未知默认不拦截
for keyword in BLOCKED_SESSION_KEYWORDS: # 命中过滤词则跳过
if keyword in title:
logger.info(f"[SESSION] 命中过滤会话 title={title} keyword={keyword}")
return True
return False
def get_latest_message_areas(self, window_rect):
"""计算多段消息识别区域(靠近底部,适配不同窗口尺寸)。"""
chat_left = window_rect['left'] + int(window_rect['width'] * 0.30) # 聊天区左边界
chat_right = window_rect['right'] - 20 # 聊天区右边界
chat_top = window_rect['top'] + int(window_rect['height'] * 0.16) # 聊天区上边界
chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20) # 聊天区下边界(排除输入区)
chat_height = max(420, int((chat_bottom - chat_top) * 0.90)) # 有效聊天高度
area1_h = max(220, int(chat_height * 0.46)) # 近底部区域
area2_h = max(320, int(chat_height * 0.72)) # 中大区域
area3_h = max(420, int(chat_height * 0.98)) # 大区域兜底
areas = [
{'left': chat_left, 'top': max(chat_top, chat_bottom - area1_h), 'right': chat_right, 'bottom': chat_bottom}, # 区域1
{'left': chat_left, 'top': max(chat_top, chat_bottom - area2_h), 'right': chat_right, 'bottom': chat_bottom - max(35, int(area1_h * 0.12))}, # 区域2
{'left': chat_left, 'top': max(chat_top, chat_bottom - area3_h), 'right': chat_right, 'bottom': chat_bottom - max(80, int(area2_h * 0.20))}, # 区域3
]
debug_log(f"message_areas={areas}")
return areas
def should_reply(self, text):
"""基础过滤:过短、无意义、无需回复文本直接跳过。"""
if not text or len(text) < 2: # 文本太短不回复
return False
text_lower = text.lower() # 统一小写比较
for keyword in NO_REPLY_KEYWORDS: # 命中免回复词
if keyword.lower() in text_lower:
return False
return not text.isdigit() # 纯数字通常无语义
def is_new_message(self, text, contact_key):
"""消息去重:同一会话内已处理过的文本不重复回复。"""
if contact_key not in self.seen_messages:
self.seen_messages[contact_key] = set() # 首次初始化会话集合
h = hashlib.md5(text.encode("utf-8")).hexdigest() # 文本哈希
if h in self.seen_messages[contact_key]:
logger.info(f"[FILTER] 重复消息 contact_key={contact_key}")
return False
self.seen_messages[contact_key].add(h) # 记录已处理
if len(self.seen_messages[contact_key]) > 50: # 限制缓存规模
self.seen_messages[contact_key] = set(list(self.seen_messages[contact_key])[-50:])
return True
def extract_text_by_uia(self):
logger.info("[UIA] 尝试控件树文本提取")
lines = [] # 候选文本
try:
all_text_controls = self.wechat_window.GetChildren() # 取一级子控件
for ctrl in all_text_controls:
name = (getattr(ctrl, 'Name', '') or '').strip() # 控件文本
if not name:
continue
if len(name) < 2:
continue
if re.fullmatch(r"[0-9:\-\s]+", name): # 过滤时间/数字
continue
if name in UI_NOISE_KEYWORDS: # 过滤UI噪声
continue
lines.append(name) # 保留候选
except Exception as e:
logger.warning(f"[UIA] 控件树提取失败: {e}")
dedup = [] # 去重结果
seen = set() # 去重集合
for x in lines:
if x not in seen:
dedup.append(x)
seen.add(x)
logger.info(f"[UIA] 控件树提取文本数={len(dedup)}")
debug_log(f"uia_lines={dedup[-8:] if dedup else []}")
return dedup
def save_ocr_debug_image(self, image_obj, filename):
"""保存OCR调试截图便于核对截取区域和图像质量。"""
if not OCR_SAVE_IMAGES: # 开关关闭则不落盘
return
try:
os.makedirs(OCR_SAVE_DIR, exist_ok=True) # 创建目录
file_path = os.path.join(OCR_SAVE_DIR, filename) # 文件路径
image_obj.save(file_path) # 保存图片
debug_log(f"saved_ocr_image={file_path}")
except Exception as e:
logger.warning(f"[OCR] 保存调试截图失败: {e}")
def get_ai_reply(self, message):
"""调用后端接口获取最终回复规则优先AI兜底"""
logger.info(f"[API] 请求后端生成回复 content={message}")
try:
resp = requests.post(BACKEND_URL, json={"content": message, "wx_user_id": "multi_chat_bot", "wx_nickname": "用户"}, timeout=10) # 请求后端
logger.info(f"[API] 后端响应 status={resp.status_code}")
if resp.status_code == 200:
data = resp.json() # 解析json
if data.get("success") and data.get("should_reply"):
reply = (data.get("reply_text") or "").strip() # 取回复文本
logger.info(f"[API] 生成回复成功 reply={reply}")
return reply
logger.info("[API] 后端未返回可发送回复")
except Exception as e:
logger.error(f"[API] 调用接口失败: {e}")
return None
def send_message(self, text, window_rect):
logger.info(f"[SEND] 开始发送 reply={text}")
try:
self.wechat_window.SetActive() # 激活微信窗口
time.sleep(0.3)
pyautogui.click(window_rect['left'] + window_rect['width'] // 2, window_rect['bottom'] - 100) # 点击输入框
time.sleep(0.3)
pyautogui.hotkey('ctrl', 'a') # 全选旧文本
pyautogui.press('delete') # 清空输入框
pyperclip.copy(text) # 文本放入剪贴板
pyautogui.hotkey('ctrl', 'v') # 粘贴内容
pyautogui.press('enter') # 回车发送
logger.info("[SEND] 发送成功")
except Exception as e:
logger.error(f"[SEND] 发送失败: {e}")
def process_current_chat(self, window_rect, contact_key):
logger.info(f"[CHAT] 开始处理会话 contact_key={contact_key}")
if self.is_blocked_session(): # 系统会话直接跳过
logger.info("[CHAT] 当前会话在过滤名单,跳过")
return False
try:
all_lines = [] # OCR/UIA汇总文本
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") # 调试图时间戳
for idx, msg_rect in enumerate(self.get_latest_message_areas(window_rect), 1): # 分区识别
screenshot = ImageGrab.grab(bbox=(msg_rect['left'], msg_rect['top'], msg_rect['right'], msg_rect['bottom'])) # 区域截图
img_np = np.array(screenshot) # 转numpy
raw_pil = Image.fromarray(img_np) # 原图
self.save_ocr_debug_image(raw_pil, f"{timestamp}_{contact_key}_area{idx}_raw.png") # 保存原图
raw_bytes = BytesIO() # 原图字节流
raw_pil.save(raw_bytes, format='PNG')
lines_raw = self.ocr.recognize(raw_bytes.getvalue()) # 原图OCR
gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY) # 灰度化
enhanced = cv2.convertScaleAbs(gray, alpha=1.35, beta=8) # 提升对比度
_, binary = cv2.threshold(enhanced, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # 二值化
pil_bin = Image.fromarray(binary) # 二值图
self.save_ocr_debug_image(pil_bin, f"{timestamp}_{contact_key}_area{idx}_bin.png") # 保存二值图
bin_bytes = BytesIO() # 二值图字节流
pil_bin.save(bin_bytes, format='PNG')
lines_bin = self.ocr.recognize(bin_bytes.getvalue()) # 二值图OCR
lines = lines_raw + lines_bin # 合并两路识别
logger.info(f"[OCR] area={idx} raw={len(lines_raw)} bin={len(lines_bin)} total={len(lines)}")
debug_log(f"ocr_lines_count={len(lines)} contact_key={contact_key} area={idx}")
all_lines.extend(lines) # 汇总
if not all_lines: # 分区都失败时走整块兜底
logger.info("[CHAT] 分区OCR无结果尝试整块聊天区兜底")
chat_left = window_rect['left'] + int(window_rect['width'] * 0.30)
chat_right = window_rect['right'] - 20
chat_top = window_rect['top'] + int(window_rect['height'] * 0.16)
chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20)
full_shot = ImageGrab.grab(bbox=(chat_left, chat_top, chat_right, chat_bottom)) # 整块截图
full_np = np.array(full_shot)
full_gray = cv2.cvtColor(full_np, cv2.COLOR_RGB2GRAY)
full_enh = cv2.convertScaleAbs(full_gray, alpha=1.25, beta=6)
_, full_bin = cv2.threshold(full_enh, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
full_pil = Image.fromarray(full_bin)
self.save_ocr_debug_image(full_pil, f"{timestamp}_{contact_key}_fallback_full_bin.png") # 保存兜底图
full_bytes = BytesIO()
full_pil.save(full_bytes, format='PNG')
full_lines = self.ocr.recognize(full_bytes.getvalue()) # 兜底OCR
logger.info(f"[OCR] fallback_full_chat lines={len(full_lines)}")
all_lines.extend(full_lines)
if not all_lines: # OCR全失败时走UIA
logger.info("[CHAT] OCR 无结果,尝试 UIA 文本兜底")
all_lines = self.extract_text_by_uia()
if not all_lines: # UIA也失败
logger.info("[CHAT] OCR+UIA 均无结果,结束")
return False
valid_lines = [line.strip() for line in all_lines if len(line.strip()) >= 2] # 过滤无效行
if not valid_lines:
logger.info("[CHAT] 过滤后无有效文本,结束")
return False
latest = None # 最终待处理消息
prioritized = [line for line in reversed(valid_lines) if ("报名" in line or "课程" in line or "价格" in line)] # 规则关键词优先
if prioritized:
latest = prioritized[0]
logger.info(f"[CHAT] 规则关键词优先命中 latest={latest}")
else:
for line in reversed(valid_lines): # 从最新往前找可回复文本
if self.should_reply(line):
latest = line
break
if not latest:
logger.info("[CHAT] 没有命中可回复文本,结束")
debug_log("skip_all_should_reply=false")
return False
logger.info(f"[CHAT] 识别目标消息 latest={latest}")
debug_log(f"latest_text={latest}")
if not self.is_new_message(latest, contact_key): # 重复消息不再回
debug_log("skip_duplicate_message")
return False
reply = self.get_ai_reply(latest) # 请求后端生成回复
if reply:
debug_log(f"ai_reply={reply}")
self.send_message(reply, window_rect) # 发送回复
logger.info("[CHAT] 本次会话处理完成(已发送)")
return True
debug_log("no_ai_reply")
logger.info("[CHAT] 本次会话处理完成(无可发送回复)")
return False
except Exception as e:
logger.error(f"[CHAT] 处理聊天失败: {e}")
return False
def run_forever(self):
"""主循环:红点检测 -> 点击会话 -> 处理消息 -> 间隔轮询。"""
logger.info("[LOOP] 监听循环启动")
self.running = True # 标记运行中
round_count = 0 # 轮次计数
while self.running:
try:
round_count += 1 # 新一轮
logger.info(f"[LOOP] 开始第 {round_count}")
debug_log(f"round={round_count}")
window_rect = self.get_window_rect() # 获取窗口坐标
if not window_rect:
logger.info("[LOOP] 未取到窗口位置,等待下一轮")
time.sleep(LOOP_INTERVAL)
continue
red_dots = self.detect_red_dots(window_rect) # 红点检测
logger.info(f"[LOOP] 本轮红点数={len(red_dots)}")
debug_log(f"red_dots_detected={len(red_dots)}")
if not red_dots:
logger.info("[LOOP] 未检测到红点,走当前会话兜底")
debug_log("fallback_current_chat")
self.process_current_chat(window_rect, "current_chat_fallback") # 无红点也扫当前会话
time.sleep(LOOP_INTERVAL)
continue
for idx, red_dot in enumerate(red_dots, 1): # 逐个红点处理
logger.info(f"[LOOP] 处理红点 {idx}/{len(red_dots)}")
self.click_contact_by_red_dot(red_dot, window_rect) # 点击会话
contact_key = f"{red_dot['x']}_{red_dot['y']}" # 联系人key
self.process_current_chat(window_rect, contact_key) # 处理会话
time.sleep(1) # 短暂节流
logger.info(f"[LOOP] 第 {round_count} 轮结束,休眠 {LOOP_INTERVAL}")
time.sleep(LOOP_INTERVAL)
except Exception as e:
logger.error(f"[LOOP] 循环出错: {e}")
time.sleep(3) # 异常后稍等再继续
def stop(self):
logger.info("[LOOP] 收到停止信号")
self.running = False # 置位退出
if __name__ == "__main__":
try:
bot = WechatMultiChatBot() # 构造机器人
bot.run_forever() # 进入主循环
except KeyboardInterrupt:
bot.stop() # Ctrl+C优雅停止
except Exception as e:
print(f"错误: {e}") # 启动异常输出