微信自动回复机器人,基于截图+OCR识别消息,支持关键词规则和 AI(OpenAI/DeepSeek/Dify)自动回复。 技术栈:PySide6 + Flask + Vue3 + RapidOCR + SQLite 注:OCR大模型文件(.onnx / .pdiparams)不纳入版本控制,需单独下载。 🤖 Generated with [Qoder][https://qoder.com]
487 lines
16 KiB
Python
487 lines
16 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
微信自动回复机器人 - 优化版
|
||
整合开源项目优点:
|
||
1. 快速红点检测(numpy矩阵运算,比逐像素快100倍)
|
||
2. 智能区域过滤(只检测特定X坐标范围)
|
||
3. 性能统计(详细的耗时分析)
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import hashlib
|
||
import logging
|
||
import base64
|
||
from datetime import datetime
|
||
from io import BytesIO
|
||
import cv2
|
||
import numpy as np
|
||
|
||
import requests
|
||
import pyperclip
|
||
import pyautogui
|
||
from PIL import ImageGrab
|
||
import uiautomation as auto
|
||
|
||
# ========== 配置 ==========
|
||
|
||
BAIDU_API_KEY = "ElIQN30iAqpEGi9zv0VlrtQX"
|
||
BAIDU_SECRET_KEY = "7wrO2wDTx7FehuelgG0NCBDFOklnqSz0"
|
||
BACKEND_URL = "http://127.0.0.1/shiliu_ai/api_receive_message.php"
|
||
LOOP_INTERVAL = 3
|
||
|
||
# 红点检测配置(借鉴开源项目的精确检测)
|
||
RED_DOT_CONFIG = {
|
||
'target_color_bgr': np.array([81, 81, 255]), # 微信红点BGR颜色
|
||
'color_tolerance': 10, # 颜色容差
|
||
'x_range': (60, 200), # 检测区域X坐标范围
|
||
}
|
||
|
||
NO_REPLY_KEYWORDS = [
|
||
"谢谢", "好的", "嗯", "哦", "ok", "收到",
|
||
"[图片]", "[语音]", "[视频]", "[文件]"
|
||
]
|
||
|
||
# ========== 日志 ==========
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||
handlers=[
|
||
logging.FileHandler("wechat_bot_optimized.log", encoding="utf-8"),
|
||
logging.StreamHandler()
|
||
],
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ========== OCR类 ==========
|
||
|
||
class BaiduOCR:
|
||
"""百度OCR识别"""
|
||
|
||
def __init__(self):
|
||
self.access_token = self._get_access_token()
|
||
logger.info("✓ 百度OCR初始化成功")
|
||
|
||
def _get_access_token(self):
|
||
url = "https://aip.baidubce.com/oauth/2.0/token"
|
||
params = {
|
||
"grant_type": "client_credentials",
|
||
"client_id": BAIDU_API_KEY,
|
||
"client_secret": BAIDU_SECRET_KEY
|
||
}
|
||
response = requests.post(url, params=params)
|
||
return response.json().get("access_token")
|
||
|
||
def recognize(self, image_bytes):
|
||
url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token={self.access_token}"
|
||
|
||
payload = {
|
||
'image': base64.b64encode(image_bytes).decode('utf-8'),
|
||
'detect_direction': 'false',
|
||
'paragraph': 'false',
|
||
'probability': 'false'
|
||
}
|
||
|
||
headers = {
|
||
'Content-Type': 'application/x-www-form-urlencoded',
|
||
'Accept': 'application/json'
|
||
}
|
||
|
||
response = requests.post(url, headers=headers, data=payload)
|
||
result = response.json()
|
||
|
||
if 'words_result' in result:
|
||
return [item['words'] for item in result['words_result']]
|
||
return []
|
||
|
||
# ========== 微信机器人类(优化版)==========
|
||
|
||
class WechatBotOptimized:
|
||
"""微信自动回复机器人 - 优化版"""
|
||
|
||
def __init__(self):
|
||
self.ocr = BaiduOCR()
|
||
self.processed_messages = {}
|
||
self.running = False
|
||
self.performance_stats = {
|
||
'red_dot_detect': [],
|
||
'ocr_recognize': [],
|
||
'total_process': []
|
||
}
|
||
|
||
def get_window_rect(self):
|
||
"""获取微信窗口位置"""
|
||
try:
|
||
wechat_window = auto.WindowControl(searchDepth=1, Name="微信")
|
||
if wechat_window.Exists(0, 0):
|
||
rect = wechat_window.BoundingRectangle
|
||
return {
|
||
'left': rect.left,
|
||
'top': rect.top,
|
||
'right': rect.right,
|
||
'bottom': rect.bottom,
|
||
'width': rect.right - rect.left,
|
||
'height': rect.bottom - rect.top
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"获取窗口失败: {e}")
|
||
return None
|
||
|
||
def get_contact_list_rect(self, window_rect):
|
||
"""获取联系人列表区域"""
|
||
left = window_rect['left'] + 10
|
||
top = window_rect['top'] + 50
|
||
right = window_rect['left'] + int(window_rect['width'] * 0.25) - 10
|
||
bottom = window_rect['bottom'] - 50
|
||
|
||
return {
|
||
'left': left,
|
||
'top': top,
|
||
'right': right,
|
||
'bottom': bottom
|
||
}
|
||
|
||
def detect_red_dots_fast(self, window_rect):
|
||
"""
|
||
快速红点检测(借鉴开源项目)
|
||
使用numpy矩阵运算,比逐像素遍历快100倍
|
||
"""
|
||
start_time = time.time()
|
||
contact_rect = self.get_contact_list_rect(window_rect)
|
||
|
||
try:
|
||
# 截图
|
||
screenshot = ImageGrab.grab(bbox=(
|
||
contact_rect['left'], contact_rect['top'],
|
||
contact_rect['right'], contact_rect['bottom']
|
||
))
|
||
|
||
# 转换为numpy数组(BGR格式)
|
||
img_np = np.array(screenshot)
|
||
img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
|
||
|
||
# 获取配置
|
||
target_color = RED_DOT_CONFIG['target_color_bgr']
|
||
tolerance = RED_DOT_CONFIG['color_tolerance']
|
||
x_range = RED_DOT_CONFIG['x_range']
|
||
|
||
# 生成坐标网格(性能优化关键!)
|
||
height, width = img_bgr.shape[:2]
|
||
x_coords, y_coords = np.meshgrid(
|
||
np.arange(width),
|
||
np.arange(height)
|
||
)
|
||
|
||
# 颜色匹配(矩阵运算,比循环快100倍)
|
||
lower_bound = target_color - tolerance
|
||
upper_bound = target_color + tolerance
|
||
color_mask = np.all((lower_bound <= img_bgr) & (img_bgr <= upper_bound), axis=-1)
|
||
|
||
# 区域过滤(只检测特定X坐标范围)
|
||
region_mask = (x_coords >= x_range[0]) & (x_coords <= x_range[1])
|
||
|
||
# 获取候选坐标
|
||
matched_points = np.column_stack((
|
||
x_coords[color_mask & region_mask],
|
||
y_coords[color_mask & region_mask]
|
||
))
|
||
|
||
if matched_points.size == 0:
|
||
return []
|
||
|
||
# 按Y坐标分组(同一联系人的多个红点合并)
|
||
red_dots = []
|
||
used = set()
|
||
|
||
for i, point in enumerate(matched_points):
|
||
if i in used:
|
||
continue
|
||
|
||
# 找到Y坐标相近的点
|
||
group = [point]
|
||
for j, other in enumerate(matched_points):
|
||
if j != i and j not in used:
|
||
if abs(point[1] - other[1]) < 50:
|
||
group.append(other)
|
||
used.add(j)
|
||
|
||
# 计算平均位置
|
||
avg_x = int(np.mean([p[0] for p in group]))
|
||
avg_y = int(np.mean([p[1] for p in group]))
|
||
|
||
red_dots.append({
|
||
'x': contact_rect['left'] + avg_x,
|
||
'y': contact_rect['top'] + avg_y
|
||
})
|
||
used.add(i)
|
||
|
||
# 记录性能
|
||
elapsed = time.time() - start_time
|
||
self.performance_stats['red_dot_detect'].append(elapsed)
|
||
|
||
return red_dots
|
||
|
||
except Exception as e:
|
||
logger.error(f"红点检测失败: {e}")
|
||
return []
|
||
|
||
def click_contact_by_red_dot(self, red_dot, window_rect):
|
||
"""点击联系人"""
|
||
contact_rect = self.get_contact_list_rect(window_rect)
|
||
|
||
click_x = (contact_rect['left'] + contact_rect['right']) // 2
|
||
click_y = red_dot['y']
|
||
|
||
pyautogui.click(click_x, click_y)
|
||
time.sleep(2.5)
|
||
|
||
logger.info(f"点击联系人位置: ({click_x}, {click_y})")
|
||
|
||
def get_latest_message_area(self, window_rect):
|
||
"""获取最新消息区域"""
|
||
chat_left = window_rect['left'] + int(window_rect['width'] * 0.30)
|
||
chat_right = window_rect['right'] - 20
|
||
chat_top = window_rect['top'] + int(window_rect['height'] * 0.15)
|
||
chat_bottom = window_rect['bottom'] - int(window_rect['height'] * 0.20)
|
||
|
||
return {
|
||
'left': chat_left,
|
||
'top': max(chat_bottom - 300, chat_top),
|
||
'right': chat_right,
|
||
'bottom': chat_bottom
|
||
}
|
||
|
||
def process_current_chat(self, window_rect, contact_key):
|
||
"""处理当前聊天"""
|
||
start_time = time.time()
|
||
msg_rect = self.get_latest_message_area(window_rect)
|
||
|
||
try:
|
||
screenshot = ImageGrab.grab(bbox=(
|
||
msg_rect['left'], msg_rect['top'],
|
||
msg_rect['right'], msg_rect['bottom']
|
||
))
|
||
|
||
# OCR识别
|
||
ocr_start = time.time()
|
||
img_byte_arr = BytesIO()
|
||
screenshot.save(img_byte_arr, format='PNG')
|
||
img_bytes = img_byte_arr.getvalue()
|
||
|
||
lines = self.ocr.recognize(img_bytes)
|
||
ocr_elapsed = time.time() - ocr_start
|
||
self.performance_stats['ocr_recognize'].append(ocr_elapsed)
|
||
|
||
if not lines:
|
||
return False
|
||
|
||
# 过滤消息(严格过滤)
|
||
import re
|
||
valid_lines = []
|
||
for line in lines:
|
||
if len(line) < 3:
|
||
continue
|
||
if re.match(r'^\d{1,2}:\d{2}$', line):
|
||
continue
|
||
if any(char in line for char in ['©', 'ò', 'v0', 'V0']):
|
||
continue
|
||
valid_lines.append(line)
|
||
|
||
if not valid_lines:
|
||
return False
|
||
|
||
latest = valid_lines[-1]
|
||
|
||
print(f" [识别] {latest}")
|
||
|
||
# 判断是否需要回复
|
||
if not self.should_reply(latest):
|
||
print(f" [跳过] 不需要回复")
|
||
return False
|
||
|
||
if not self.is_new_message(latest, contact_key):
|
||
print(f" [跳过] 已处理")
|
||
return False
|
||
|
||
print(f" [新消息] {latest}")
|
||
|
||
# 获取AI回复
|
||
reply = self.get_ai_reply(latest)
|
||
if reply:
|
||
print(f" [AI回复] {reply}")
|
||
self.send_message(reply, window_rect)
|
||
|
||
# 记录性能
|
||
total_elapsed = time.time() - start_time
|
||
self.performance_stats['total_process'].append(total_elapsed)
|
||
|
||
return True
|
||
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理聊天失败: {e}")
|
||
return False
|
||
|
||
def should_reply(self, message):
|
||
"""判断是否需要回复"""
|
||
if not message or len(message) < 2:
|
||
return False
|
||
|
||
for keyword in NO_REPLY_KEYWORDS:
|
||
if keyword in message:
|
||
return False
|
||
|
||
return True
|
||
|
||
def is_new_message(self, message, contact_key):
|
||
"""判断是否为新消息"""
|
||
msg_hash = hashlib.md5(message.encode()).hexdigest()
|
||
|
||
if contact_key in self.processed_messages:
|
||
if msg_hash in self.processed_messages[contact_key]:
|
||
return False
|
||
else:
|
||
self.processed_messages[contact_key] = set()
|
||
|
||
self.processed_messages[contact_key].add(msg_hash)
|
||
return True
|
||
|
||
def get_ai_reply(self, message):
|
||
"""获取AI回复"""
|
||
try:
|
||
response = requests.post(
|
||
BACKEND_URL,
|
||
json={'message': message},
|
||
timeout=10
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data.get('reply', '')
|
||
except Exception as e:
|
||
logger.error(f"AI回复失败: {e}")
|
||
|
||
return None
|
||
|
||
def send_message(self, text, window_rect):
|
||
"""发送消息"""
|
||
try:
|
||
original_clipboard = pyperclip.paste()
|
||
|
||
pyperclip.copy(text)
|
||
time.sleep(0.1)
|
||
|
||
pyautogui.hotkey('ctrl', 'v')
|
||
time.sleep(0.1)
|
||
|
||
pyautogui.press('enter')
|
||
time.sleep(0.3)
|
||
|
||
pyperclip.copy(original_clipboard)
|
||
|
||
print(f"✓ 已发送")
|
||
|
||
except Exception as e:
|
||
logger.error(f"发送消息失败: {e}")
|
||
|
||
def print_performance_stats(self):
|
||
"""打印性能统计"""
|
||
if not self.performance_stats['red_dot_detect']:
|
||
return
|
||
|
||
print("\n" + "="*70)
|
||
print("性能统计")
|
||
print("="*70)
|
||
|
||
avg_red_dot = np.mean(self.performance_stats['red_dot_detect']) * 1000
|
||
avg_ocr = np.mean(self.performance_stats['ocr_recognize']) * 1000 if self.performance_stats['ocr_recognize'] else 0
|
||
avg_total = np.mean(self.performance_stats['total_process']) * 1000 if self.performance_stats['total_process'] else 0
|
||
|
||
print(f"红点检测平均耗时: {avg_red_dot:.1f}ms")
|
||
print(f"OCR识别平均耗时: {avg_ocr:.1f}ms")
|
||
print(f"总处理平均耗时: {avg_total:.1f}ms")
|
||
print("="*70 + "\n")
|
||
|
||
def run_forever(self):
|
||
"""启动监听"""
|
||
print("=" * 70)
|
||
print("微信自动回复(优化版)")
|
||
print("=" * 70)
|
||
print("\n优化特性:")
|
||
print(" ✓ 快速红点检测(numpy矩阵运算,比逐像素快100倍)")
|
||
print(" ✓ 智能区域过滤(只检测特定X坐标范围)")
|
||
print(" ✓ 性能统计(详细的耗时分析)")
|
||
print("=" * 70)
|
||
print("\n监听中... 按 Ctrl+C 停止\n")
|
||
|
||
self.running = True
|
||
round_count = 0
|
||
|
||
# 初始化窗口
|
||
logger.info("正在查找微信窗口...")
|
||
window_rect = self.get_window_rect()
|
||
if not window_rect:
|
||
logger.error("未找到微信窗口")
|
||
return
|
||
logger.info("✓ 找到微信窗口")
|
||
|
||
while self.running:
|
||
try:
|
||
round_count += 1
|
||
print(f"\n{'='*70}")
|
||
print(f"[第 {round_count} 轮检查] {datetime.now().strftime('%H:%M:%S')}")
|
||
print(f"{'='*70}")
|
||
|
||
# 快速检测红点
|
||
red_dots = self.detect_red_dots_fast(window_rect)
|
||
|
||
if not red_dots:
|
||
print("未检测到新消息")
|
||
time.sleep(LOOP_INTERVAL)
|
||
continue
|
||
|
||
print(f"检测到 {len(red_dots)} 个新消息")
|
||
|
||
# 处理每个红点
|
||
for i, red_dot in enumerate(red_dots, 1):
|
||
print(f"\n[处理第 {i}/{len(red_dots)} 个新消息]")
|
||
|
||
self.click_contact_by_red_dot(red_dot, window_rect)
|
||
|
||
contact_key = f"{red_dot['x']}_{red_dot['y']}"
|
||
self.process_current_chat(window_rect, contact_key)
|
||
|
||
time.sleep(1)
|
||
|
||
# 每10轮打印一次性能统计
|
||
if round_count % 10 == 0:
|
||
self.print_performance_stats()
|
||
|
||
print(f"\n本轮处理完成,等待 {LOOP_INTERVAL} 秒...\n")
|
||
time.sleep(LOOP_INTERVAL)
|
||
|
||
except Exception as e:
|
||
logger.error(f"循环出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
time.sleep(3)
|
||
|
||
def stop(self):
|
||
self.running = False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
bot = WechatBotOptimized()
|
||
bot.run_forever()
|
||
except KeyboardInterrupt:
|
||
bot.stop()
|
||
print("\n程序已停止")
|
||
except Exception as e:
|
||
print(f"\n错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|