#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""WeChat auto-reply bot (optimized version).

Combines ideas borrowed from open-source projects:

1. Fast red-dot detection (vectorized numpy comparison instead of a
   per-pixel Python loop).
2. Region filtering (only a specific X-coordinate range is inspected).
3. Performance statistics (per-stage timing).
"""

import os
import re
import time
import hashlib
import logging
import base64
import traceback
from datetime import datetime
from io import BytesIO

import cv2
import numpy as np
import requests
import pyperclip
import pyautogui
from PIL import ImageGrab
import uiautomation as auto

# ========== Configuration ==========
# NOTE(security): these credentials are committed in source. They are kept as
# defaults for backward compatibility, but should be rotated and supplied via
# the BAIDU_API_KEY / BAIDU_SECRET_KEY environment variables instead.
BAIDU_API_KEY = os.environ.get("BAIDU_API_KEY", "ElIQN30iAqpEGi9zv0VlrtQX")
BAIDU_SECRET_KEY = os.environ.get(
    "BAIDU_SECRET_KEY", "7wrO2wDTx7FehuelgG0NCBDFOklnqSz0"
)
BACKEND_URL = "http://127.0.0.1/shiliu_ai/api_receive_message.php"
LOOP_INTERVAL = 3

# Timeout (seconds) applied to every outgoing HTTP request so that a stalled
# connection cannot block the polling loop forever.
REQUEST_TIMEOUT = 10

# Red-dot detection settings.
RED_DOT_CONFIG = {
    'target_color_bgr': np.array([81, 81, 255]),  # unread-badge colour (BGR)
    'color_tolerance': 10,                        # per-channel tolerance
    'x_range': (60, 200),                         # X range inside the contact-list crop
}

# Messages containing any of these substrings are never answered.
NO_REPLY_KEYWORDS = [
    "谢谢", "好的", "嗯", "哦", "ok", "收到",
    "[图片]", "[语音]", "[视频]", "[文件]"
]

# OCR lines that look like a bare clock time (e.g. "12:34") are chat
# timestamps, not message text.
_TIMESTAMP_RE = re.compile(r'^\d{1,2}:\d{2}$')

# ========== Logging ==========
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("wechat_bot_optimized.log", encoding="utf-8"),
        logging.StreamHandler()
    ],
)
logger = logging.getLogger(__name__)


# ========== OCR ==========
class BaiduOCR:
    """Thin client for the Baidu general_basic OCR endpoint."""

    def __init__(self):
        """Fetch an access token; raises if the token cannot be obtained."""
        self.access_token = self._get_access_token()
        logger.info("✓ 百度OCR初始化成功")

    def _get_access_token(self):
        """Request an OAuth access token using the configured API key pair.

        Returns:
            The access token string.

        Raises:
            RuntimeError: if the response carries no ``access_token``.
            requests.RequestException: on network failure or timeout.
        """
        url = "https://aip.baidubce.com/oauth/2.0/token"
        params = {
            "grant_type": "client_credentials",
            "client_id": BAIDU_API_KEY,
            "client_secret": BAIDU_SECRET_KEY
        }
        response = requests.post(url, params=params, timeout=REQUEST_TIMEOUT)
        data = response.json()
        token = data.get("access_token")
        if not token:
            # Fail fast: without a token every later OCR call would silently
            # return nothing and the bot would appear to idle.
            raise RuntimeError(
                f"百度OCR获取access_token失败: "
                f"{data.get('error_description', data)}"
            )
        return token

    def recognize(self, image_bytes):
        """Run OCR on an encoded image.

        Args:
            image_bytes: raw bytes of the image file (e.g. PNG data).

        Returns:
            A list with the ``words`` string of each recognized line, or an
            empty list when the response has no ``words_result``.
        """
        url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
        payload = {
            'image': base64.b64encode(image_bytes).decode('utf-8'),
            'detect_direction': 'false',
            'paragraph': 'false',
            'probability': 'false'
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }
        response = requests.post(
            url, headers=headers, data=payload, timeout=REQUEST_TIMEOUT
        )
        result = response.json()
        if 'words_result' not in result:
            if 'error_code' in result:
                # Surface API-side failures instead of dropping them.
                logger.warning(
                    "百度OCR返回错误: %s %s",
                    result.get('error_code'), result.get('error_msg')
                )
            return []
        return [item['words'] for item in result['words_result']]


# ========== WeChat bot (optimized) ==========
class WechatBotOptimized:
    """Polls the WeChat window for unread badges and auto-replies via OCR."""

    def __init__(self):
        self.ocr = BaiduOCR()
        # contact_key -> set of MD5 hex digests of messages already handled
        self.processed_messages = {}
        self.running = False
        # Per-stage elapsed times in seconds
        self.performance_stats = {
            'red_dot_detect': [],
            'ocr_recognize': [],
            'total_process': []
        }

    def get_window_rect(self):
        """Locate the top-level window named "微信" and return its bounds.

        Returns:
            A dict with ``left``, ``top``, ``right``, ``bottom``, ``width``
            and ``height``, or ``None`` if the window is not found or the
            lookup raises.
        """
        try:
            wechat_window = auto.WindowControl(searchDepth=1, Name="微信")
            if wechat_window.Exists(0, 0):
                rect = wechat_window.BoundingRectangle
                return {
                    'left': rect.left,
                    'top': rect.top,
                    'right': rect.right,
                    'bottom': rect.bottom,
                    'width': rect.right - rect.left,
                    'height': rect.bottom - rect.top
                }
        except Exception as e:
            logger.error("获取窗口失败: %s", e)
        return None

    def get_contact_list_rect(self, window_rect):
        """Return the screen rectangle used as the contact-list area.

        The area is the left quarter of the window, inset by 10 px
        horizontally and 50 px vertically.
        """
        return {
            'left': window_rect['left'] + 10,
            'top': window_rect['top'] + 50,
            'right': window_rect['left'] + int(window_rect['width'] * 0.25) - 10,
            'bottom': window_rect['bottom'] - 50
        }

    @staticmethod
    def _find_color_points(img_bgr, target_color, tolerance, x_range):
        """Return (x, y) pixel coordinates matching ``target_color``.

        A pixel matches when every channel is within ``tolerance`` of the
        target and its x coordinate lies in ``x_range`` (inclusive).

        Returns:
            An ``(N, 2)`` integer array in row-major order (ascending y,
            then ascending x); empty with shape ``(0, 2)`` when nothing
            matches.
        """
        lower_bound = target_color - tolerance
        upper_bound = target_color + tolerance
        color_mask = np.all(
            (img_bgr >= lower_bound) & (img_bgr <= upper_bound), axis=-1
        )
        # np.nonzero yields indices in row-major order, i.e. sorted by y.
        ys, xs = np.nonzero(color_mask)
        in_range = (xs >= x_range[0]) & (xs <= x_range[1])
        return np.column_stack((xs[in_range], ys[in_range]))

    @staticmethod
    def _group_points_by_row(points, row_height=50):
        """Merge matched pixels into one centre per contact row.

        Starting from the topmost ungrouped point, every point whose y is
        less than ``row_height`` below it joins the same group. This gives
        the same groups as the former pairwise comparison, but in a single
        pass over y-sorted points instead of O(n^2) Python iterations.

        Args:
            points: ``(N, 2)`` integer array of (x, y) coordinates.
            row_height: vertical span (pixels) treated as one row.

        Returns:
            A list of ``(avg_x, avg_y)`` integer tuples, top to bottom.
        """
        centers = []
        total = len(points)
        if total == 0:
            return centers
        # Stable sort keeps the incoming order for equal y values.
        points = points[np.argsort(points[:, 1], kind='stable')]
        ys = points[:, 1]
        start = 0
        while start < total:
            # First index whose y is at least row_height below the anchor.
            end = int(np.searchsorted(ys, ys[start] + row_height, side='left'))
            group = points[start:end]
            centers.append(
                (int(group[:, 0].mean()), int(group[:, 1].mean()))
            )
            start = end
        return centers

    def detect_red_dots_fast(self, window_rect):
        """Find unread-message badges in the contact list.

        Grabs a screenshot of the contact-list area, selects pixels close to
        the configured badge colour, and merges them into one point per row.

        Returns:
            A list of ``{'x': ..., 'y': ...}`` dicts in screen coordinates;
            empty when nothing is found or detection fails.
        """
        start_time = time.perf_counter()
        contact_rect = self.get_contact_list_rect(window_rect)

        try:
            screenshot = ImageGrab.grab(bbox=(
                contact_rect['left'],
                contact_rect['top'],
                contact_rect['right'],
                contact_rect['bottom']
            ))

            # PIL delivers RGB; the target colour is configured as BGR.
            img_bgr = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)

            matched_points = self._find_color_points(
                img_bgr,
                RED_DOT_CONFIG['target_color_bgr'],
                RED_DOT_CONFIG['color_tolerance'],
                RED_DOT_CONFIG['x_range'],
            )

            red_dots = [
                {
                    'x': contact_rect['left'] + avg_x,
                    'y': contact_rect['top'] + avg_y
                }
                for avg_x, avg_y in self._group_points_by_row(matched_points)
            ]

            # Record timing for every completed scan, including scans that
            # find nothing, so the statistics reflect typical rounds.
            self.performance_stats['red_dot_detect'].append(
                time.perf_counter() - start_time
            )
            return red_dots

        except Exception as e:
            logger.error("红点检测失败: %s", e)
            return []

    def click_contact_by_red_dot(self, red_dot, window_rect):
        """Click the contact row at the badge's y, at the list's x centre."""
        contact_rect = self.get_contact_list_rect(window_rect)
        click_x = (contact_rect['left'] + contact_rect['right']) // 2
        click_y = red_dot['y']

        pyautogui.click(click_x, click_y)
        # Give the chat pane time to load before it is captured.
        time.sleep(2.5)

        logger.info("点击联系人位置: (%s, %s)", click_x, click_y)

    def get_latest_message_area(self, window_rect):
        """Return the screen rectangle covering the newest chat messages.

        The rectangle is at most the bottom 300 px of the chat pane
        (30%..right edge horizontally, 15%..80% of the height vertically).
        """
        chat_left = window_rect['left'] + int(window_rect['width'] * 0.30)
        chat_right = window_rect['right'] - 20
        chat_top = window_rect['top'] + int(window_rect['height'] * 0.15)
        chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20)

        return {
            'left': chat_left,
            'top': max(chat_bottom - 300, chat_top),
            'right': chat_right,
            'bottom': chat_bottom
        }

    def process_current_chat(self, window_rect, contact_key):
        """OCR the open chat, and reply to its latest message if warranted.

        Args:
            window_rect: window bounds as returned by ``get_window_rect``.
            contact_key: key under which handled messages are remembered.

        Returns:
            ``True`` if a reply was sent, otherwise ``False``.
        """
        start_time = time.perf_counter()
        msg_rect = self.get_latest_message_area(window_rect)

        try:
            screenshot = ImageGrab.grab(bbox=(
                msg_rect['left'],
                msg_rect['top'],
                msg_rect['right'],
                msg_rect['bottom']
            ))

            # OCR
            ocr_start = time.perf_counter()
            img_byte_arr = BytesIO()
            screenshot.save(img_byte_arr, format='PNG')
            lines = self.ocr.recognize(img_byte_arr.getvalue())
            self.performance_stats['ocr_recognize'].append(
                time.perf_counter() - ocr_start
            )

            if not lines:
                return False

            # Drop very short lines, bare timestamps, and lines containing
            # any of the listed marker substrings.
            valid_lines = [
                line for line in lines
                if len(line) >= 3
                and not _TIMESTAMP_RE.match(line)
                and not any(
                    marker in line for marker in ('©', 'ò', 'v0', 'V0')
                )
            ]

            if not valid_lines:
                return False

            latest = valid_lines[-1]
            print(f" [识别] {latest}")

            if not self.should_reply(latest):
                print(" [跳过] 不需要回复")
                return False

            if not self.is_new_message(latest, contact_key):
                print(" [跳过] 已处理")
                return False

            print(f" [新消息] {latest}")

            reply = self.get_ai_reply(latest)
            if not reply:
                return False

            print(f" [AI回复] {reply}")
            self.send_message(reply, window_rect)

            self.performance_stats['total_process'].append(
                time.perf_counter() - start_time
            )
            return True

        except Exception as e:
            logger.error("处理聊天失败: %s", e)
            return False

    def should_reply(self, message):
        """Return ``False`` for empty/one-char messages or no-reply keywords."""
        if not message or len(message) < 2:
            return False
        return not any(keyword in message for keyword in NO_REPLY_KEYWORDS)

    def is_new_message(self, message, contact_key):
        """Return ``True`` the first time ``message`` is seen for a contact.

        The message's MD5 digest is remembered per ``contact_key``; a repeat
        of the same text for the same key returns ``False``.
        """
        msg_hash = hashlib.md5(message.encode()).hexdigest()
        seen = self.processed_messages.setdefault(contact_key, set())
        if msg_hash in seen:
            return False
        seen.add(msg_hash)
        return True

    def get_ai_reply(self, message):
        """POST the message to the backend and return its ``reply`` field.

        Returns:
            The reply string (``''`` if the field is absent), or ``None``
            when the request fails or the status is not 200.
        """
        try:
            response = requests.post(
                BACKEND_URL,
                json={'message': message},
                timeout=REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                return response.json().get('reply', '')
            logger.warning("AI回复失败: HTTP %s", response.status_code)
        except Exception as e:
            logger.error("AI回复失败: %s", e)
        return None

    def send_message(self, text, window_rect):
        """Paste ``text`` into the focused input box and press Enter.

        The clipboard is used as the transport and is always restored to its
        previous content, even if pasting or sending raises.
        ``window_rect`` is accepted for interface compatibility and unused.
        """
        try:
            original_clipboard = pyperclip.paste()
            try:
                pyperclip.copy(text)
                time.sleep(0.1)

                pyautogui.hotkey('ctrl', 'v')
                time.sleep(0.1)
                pyautogui.press('enter')
                time.sleep(0.3)
            finally:
                pyperclip.copy(original_clipboard)
            print("✓ 已发送")
        except Exception as e:
            logger.error("发送消息失败: %s", e)

    def print_performance_stats(self):
        """Print average stage timings in ms; no-op before any detection."""
        stats = self.performance_stats
        if not stats['red_dot_detect']:
            return

        print("\n" + "="*70)
        print("性能统计")
        print("="*70)

        avg_red_dot = np.mean(stats['red_dot_detect']) * 1000
        avg_ocr = (
            np.mean(stats['ocr_recognize']) * 1000
            if stats['ocr_recognize'] else 0
        )
        avg_total = (
            np.mean(stats['total_process']) * 1000
            if stats['total_process'] else 0
        )

        print(f"红点检测平均耗时: {avg_red_dot:.1f}ms")
        print(f"OCR识别平均耗时: {avg_ocr:.1f}ms")
        print(f"总处理平均耗时: {avg_total:.1f}ms")
        print("="*70 + "\n")

    def run_forever(self):
        """Poll for unread badges until ``stop()`` is called.

        Returns immediately (after logging an error) if the WeChat window
        cannot be found at start-up.
        """
        print("=" * 70)
        print("微信自动回复(优化版)")
        print("=" * 70)
        print("\n优化特性:")
        print(" ✓ 快速红点检测(numpy矩阵运算,比逐像素快100倍)")
        print(" ✓ 智能区域过滤(只检测特定X坐标范围)")
        print(" ✓ 性能统计(详细的耗时分析)")
        print("=" * 70)
        print("\n监听中... 按 Ctrl+C 停止\n")

        self.running = True
        round_count = 0

        logger.info("正在查找微信窗口...")
        window_rect = self.get_window_rect()
        if not window_rect:
            logger.error("未找到微信窗口")
            return

        logger.info("✓ 找到微信窗口")

        while self.running:
            try:
                round_count += 1
                print(f"\n{'='*70}")
                print(f"[第 {round_count} 轮检查] {datetime.now().strftime('%H:%M:%S')}")
                print(f"{'='*70}")

                # Re-read the window bounds each round so a moved or resized
                # window is still captured correctly; keep the last known
                # bounds if the window is momentarily unavailable.
                window_rect = self.get_window_rect() or window_rect

                red_dots = self.detect_red_dots_fast(window_rect)

                if not red_dots:
                    print("未检测到新消息")
                    time.sleep(LOOP_INTERVAL)
                    continue

                print(f"检测到 {len(red_dots)} 个新消息")

                for i, red_dot in enumerate(red_dots, 1):
                    print(f"\n[处理第 {i}/{len(red_dots)} 个新消息]")

                    self.click_contact_by_red_dot(red_dot, window_rect)

                    contact_key = f"{red_dot['x']}_{red_dot['y']}"
                    self.process_current_chat(window_rect, contact_key)

                    time.sleep(1)

                # Print statistics every 10th round.
                if round_count % 10 == 0:
                    self.print_performance_stats()

                print(f"\n本轮处理完成,等待 {LOOP_INTERVAL} 秒...\n")
                time.sleep(LOOP_INTERVAL)

            except Exception as e:
                logger.error("循环出错: %s", e)
                traceback.print_exc()
                time.sleep(3)

    def stop(self):
        """Ask ``run_forever`` to exit after the current round."""
        self.running = False


if __name__ == "__main__":
    # Bound before the try so Ctrl+C during construction (which performs a
    # network request) does not hit an unbound name in the handler.
    bot = None
    try:
        bot = WechatBotOptimized()
        bot.run_forever()
    except KeyboardInterrupt:
        if bot is not None:
            bot.stop()
        print("\n程序已停止")
    except Exception as e:
        print(f"\n错误: {e}")
        traceback.print_exc()