Files
figmar 81115dc23d 初始提交:识流 AI 助手项目
微信自动回复机器人,基于截图+OCR识别消息,支持关键词规则和 AI(OpenAI/DeepSeek/Dify)自动回复。
技术栈:PySide6 + Flask + Vue3 + RapidOCR + SQLite

注:OCR大模型文件(.onnx / .pdiparams)不纳入版本控制,需单独下载。

🤖 Generated with [Qoder](https://qoder.com)
2026-05-30 15:09:40 +08:00

315 lines
12 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Dict
import cv2
import numpy as np
from PIL import ImageGrab
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
from app.infrastructure.service.wechat.config import (
CHAT_CAPTURE_HEIGHT,
CHAT_CAPTURE_LEFT_OFFSET,
CHAT_CAPTURE_TOP_OFFSET,
CHAT_CAPTURE_WIDTH,
CONTACT_LIST_BOTTOM_OFFSET,
CONTACT_LIST_LEFT_OFFSET,
CONTACT_LIST_TOP_OFFSET,
CONTACT_ROW_WIDTH,
SESSION_NAME_HEIGHT,
SESSION_NAME_LEFT_OFFSET,
SESSION_NAME_TOP_OFFSET,
SESSION_NAME_WIDTH,
TITLE_OCR_AREA_HEIGHT,
TITLE_OCR_AREA_LEFT_OFFSET,
TITLE_OCR_AREA_TOP_OFFSET,
TITLE_OCR_AREA_WIDTH,
)
@dataclass
class CaptureBox:
    """Axis-aligned rectangle described by its four edge coordinates.

    ``width`` and ``height`` are derived from the edges on every access, so
    they may be zero or negative when the edges are inverted; no validation
    happens here.
    """

    # Edge coordinates; the constructor accepts them in this order.
    left: int
    top: int
    right: int
    bottom: int

    @property
    def width(self) -> int:
        """Return ``right - left``."""
        horizontal_extent = self.right - self.left
        return horizontal_extent

    @property
    def height(self) -> int:
        """Return ``bottom - top``."""
        vertical_extent = self.bottom - self.top
        return vertical_extent

    def as_tuple(self):
        """Return the edges as a ``(left, top, right, bottom)`` tuple."""
        return self.left, self.top, self.right, self.bottom

    def as_dict(self) -> Dict[str, int]:
        """Return the edges plus the derived width and height as a dict.

        Keys appear in the order left, top, right, bottom, width, height.
        """
        field_names = ("left", "top", "right", "bottom", "width", "height")
        return {name: getattr(self, name) for name in field_names}
class ScreenshotService:
    """Build capture boxes relative to a window, grab them, and trim chat images.

    The ``window_rect`` dictionaries accepted by these methods are read through
    the keys ``"left"``, ``"top"``, ``"bottom"`` and (for the chat area only)
    ``"height"``. Screen grabs go through ``PIL.ImageGrab.grab``; the images
    handled by the detection helpers must offer ``size``, ``convert`` and
    ``crop`` the way PIL images do.
    """

    # ------------------------------------------------------------------
    # Box construction
    # ------------------------------------------------------------------
    def build_box(self, left: int, top: int, width: int, height: int) -> CaptureBox:
        """Return a box whose top-left corner is (left, top) with the given size."""
        return CaptureBox(
            left=int(left),
            top=int(top),
            right=int(left + width),
            bottom=int(top + height),
        )

    def build_box_from_window(self, window_rect: dict, left_offset: int, top_offset: int, width: int, height: int) -> CaptureBox:
        """Return a fixed-size box positioned by offsets from the window's top-left corner."""
        return self.build_box(
            left=window_rect["left"] + int(left_offset),
            top=window_rect["top"] + int(top_offset),
            width=int(width),
            height=int(height),
        )

    def build_contact_list_box(self, window_rect: dict, left_offset: int, top_offset: int, width: int, bottom_offset: int) -> CaptureBox:
        """Return a fixed-width box that stretches down to ``bottom_offset`` above the window bottom."""
        left = window_rect["left"] + int(left_offset)
        top = window_rect["top"] + int(top_offset)
        right = left + int(width)
        bottom = window_rect["bottom"] - int(bottom_offset)
        return CaptureBox(left=left, top=top, right=right, bottom=bottom)

    def is_valid_box(self, box: CaptureBox) -> bool:
        """Return True when the box has strictly positive width and height."""
        return box.right > box.left and box.bottom > box.top

    # ------------------------------------------------------------------
    # Raw captures
    # ------------------------------------------------------------------
    def _grab_validated(self, box: CaptureBox, error_label: str):
        """Grab ``box`` from the screen, raising ValueError if it is degenerate.

        ``error_label`` is the message prefix, so each public entry point keeps
        its own wording.
        """
        if not self.is_valid_box(box):
            raise ValueError(f"{error_label}: {box.as_dict()}")
        # NOTE(review): Pillow documents an ``all_screens`` flag for capturing
        # monitors other than the primary one on Windows; it is not passed
        # here. Confirm whether windows on secondary monitors must be supported.
        return ImageGrab.grab(bbox=box.as_tuple())

    def capture_box(self, left: int, top: int, width: int, height: int):
        """Grab the screen rectangle given by a top-left corner and a size.

        Raises:
            ValueError: if the resulting box has no positive area.
        """
        return self._grab_validated(self.build_box(left, top, width, height), "invalid capture box")

    def capture_from_window(self, window_rect: dict, left_offset: int, top_offset: int, width: int, height: int):
        """Grab a fixed-size rectangle positioned relative to the window.

        Raises:
            ValueError: if the resulting box has no positive area.
        """
        box = self.build_box_from_window(window_rect, left_offset, top_offset, width, height)
        return self._grab_validated(box, "invalid window capture box")

    def capture_contact_list(self, window_rect: dict, left_offset: int, top_offset: int, width: int, bottom_offset: int):
        """Grab the rectangle produced by ``build_contact_list_box``.

        Raises:
            ValueError: if the resulting box has no positive area.
        """
        box = self.build_contact_list_box(window_rect, left_offset, top_offset, width, bottom_offset)
        return self._grab_validated(box, "invalid contact list box")

    def capture_area_from_box(self, box: CaptureBox):
        """Grab an already-built box.

        Raises:
            ValueError: if the box has no positive area.
        """
        return self._grab_validated(box, "invalid capture box")

    # ------------------------------------------------------------------
    # Captures driven by the configured constants
    # ------------------------------------------------------------------
    def get_contact_list_box(self, window_rect: dict) -> CaptureBox:
        """Return the contact-list box built from the CONTACT_* configuration constants."""
        return self.build_contact_list_box(
            window_rect,
            left_offset=CONTACT_LIST_LEFT_OFFSET,
            top_offset=CONTACT_LIST_TOP_OFFSET,
            width=CONTACT_ROW_WIDTH,
            bottom_offset=CONTACT_LIST_BOTTOM_OFFSET,
        )

    def capture_contact_list_default(self, window_rect: dict):
        """Log the configured contact-list box, then grab it.

        Raises:
            ValueError: if the configured box has no positive area (the log
                entry has already been emitted at that point).
        """
        trace_id = new_trace_id("capture")
        box = self.get_contact_list_box(window_rect)
        log_event("INFO", "capture", "capture.contact_list", trace_id, "capture", "ok", "截图会话列表区域", extra=box.as_dict())
        return self.capture_contact_list(
            window_rect,
            left_offset=CONTACT_LIST_LEFT_OFFSET,
            top_offset=CONTACT_LIST_TOP_OFFSET,
            width=CONTACT_ROW_WIDTH,
            bottom_offset=CONTACT_LIST_BOTTOM_OFFSET,
        )

    def get_session_title_box(self, window_rect: dict) -> CaptureBox:
        """Return the title box built from the TITLE_OCR_AREA_* configuration constants."""
        return self.build_box_from_window(
            window_rect,
            left_offset=TITLE_OCR_AREA_LEFT_OFFSET,
            top_offset=TITLE_OCR_AREA_TOP_OFFSET,
            width=TITLE_OCR_AREA_WIDTH,
            height=TITLE_OCR_AREA_HEIGHT,
        )

    def capture_session_title(self, window_rect: dict):
        """Log the configured title box, then grab it.

        Raises:
            ValueError: if the configured box has no positive area.
        """
        trace_id = new_trace_id("capture")
        box = self.get_session_title_box(window_rect)
        log_event("INFO", "capture", "capture.session_title", trace_id, "capture", "ok", "截图会话标题区域", extra=box.as_dict())
        return self.capture_area_from_box(box)

    def get_chat_capture_box(self, window_rect: dict) -> CaptureBox:
        """Return the chat box built from the CHAT_CAPTURE_* configuration constants.

        The height is the larger of ``max(120, CHAT_CAPTURE_HEIGHT)`` and the
        window height minus the top offset. This method reads
        ``window_rect["height"]`` in addition to ``"left"`` and ``"top"``.
        """
        base_height = max(120, CHAT_CAPTURE_HEIGHT)
        max_height = max(base_height, window_rect["height"] - CHAT_CAPTURE_TOP_OFFSET)
        return self.build_box_from_window(
            window_rect,
            left_offset=CHAT_CAPTURE_LEFT_OFFSET,
            top_offset=CHAT_CAPTURE_TOP_OFFSET,
            width=CHAT_CAPTURE_WIDTH,
            height=max_height,
        )

    def capture_chat_area(self, window_rect: dict):
        """Grab the chat box and crop it at the detected bottom row, if any.

        The log entry carries the box plus ``dynamic_bottom`` (the detected
        row, or ``""`` when nothing was detected) and the final image size.

        Raises:
            ValueError: if the configured box has no positive area.
        """
        trace_id = new_trace_id("capture")
        box = self.get_chat_capture_box(window_rect)
        image = self.capture_area_from_box(box)
        chat_bottom = self._detect_chat_bottom_by_binary_merge(image)
        if chat_bottom is not None:
            image = image.crop((0, 0, image.size[0], chat_bottom))
        extra = box.as_dict()
        extra["dynamic_bottom"] = chat_bottom or ""
        extra["final_width"] = image.size[0]
        extra["final_height"] = image.size[1]
        log_event("INFO", "capture", "capture.chat_area", trace_id, "capture", "ok", "截图聊天区域", extra=extra)
        return image

    # ------------------------------------------------------------------
    # Cropping of already-captured images
    # ------------------------------------------------------------------
    def crop_session_name(self, row_img):
        """Crop the SESSION_NAME_* region out of ``row_img`` (None if nothing remains)."""
        return self.crop_from_image(
            row_img,
            left=SESSION_NAME_LEFT_OFFSET,
            top=SESSION_NAME_TOP_OFFSET,
            width=SESSION_NAME_WIDTH,
            height=SESSION_NAME_HEIGHT,
        )

    def crop_from_image(self, image_obj, left: int, top: int, width: int, height: int):
        """Crop a rectangle from ``image_obj``, clamped to the image bounds.

        ``width`` and ``height`` are raised to at least 1 before clamping.

        Returns:
            The cropped image, or None when ``image_obj`` is None or the
            clamped rectangle is empty.
        """
        if image_obj is None:
            return None
        img_w, img_h = image_obj.size
        crop_left = min(max(0, int(left)), img_w)
        crop_top = min(max(0, int(top)), img_h)
        crop_right = min(img_w, crop_left + max(1, int(width)))
        crop_bottom = min(img_h, crop_top + max(1, int(height)))
        if crop_right <= crop_left or crop_bottom <= crop_top:
            return None
        return image_obj.crop((crop_left, crop_top, crop_right, crop_bottom))

    # ------------------------------------------------------------------
    # Chat-bottom detection
    # ------------------------------------------------------------------
    def _build_merged_binary_array(self, image_obj):
        """Return a binary mask combining a global and an adaptive inverse threshold.

        The image is converted to grayscale and blurred with a 5x5 Gaussian.
        Both thresholds use THRESH_BINARY_INV; their masks are OR-ed, closed
        with a 5-wide, 1-tall rectangle and opened with a 2x2 rectangle.
        """
        arr = np.array(image_obj.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)
        # Global pass: every pixel that is not above 248 becomes foreground.
        _, binary_inv = cv2.threshold(blurred, 248, 255, cv2.THRESH_BINARY_INV)
        # Local pass: Gaussian-weighted 13x13 neighbourhood with constant 1.
        adaptive_inv = cv2.adaptiveThreshold(
            blurred,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            13,
            1,
        )
        merged = cv2.bitwise_or(binary_inv, adaptive_inv)
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 1))
        merged = cv2.morphologyEx(merged, cv2.MORPH_CLOSE, kernel, iterations=1)
        merged = cv2.morphologyEx(merged, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)))
        return merged

    def _detect_chat_bottom_by_binary_merge(self, image_obj) -> int | None:
        """Locate the crop row from the last tall band of nearly empty mask rows.

        Only the lower 45% of the image is scanned. A row counts as blank when
        at most 1.8% of its mask pixels are set. Falls back to
        ``_detect_chat_bottom`` when no qualifying band exists.

        Returns:
            The row to crop at, or None when the image is missing or empty, or
            when the result would fall within 20 rows of the image bottom.
        """
        # A zero-sized image would make cv2.cvtColor raise; treat it like a
        # missing image, consistent with _detect_chat_bottom.
        if image_obj is None or 0 in image_obj.size:
            return None
        merged = self._build_merged_binary_array(image_obj)
        img_h = merged.shape[0]
        start_y = max(0, int(img_h * 0.55))
        roi = merged[start_y:, :]
        if roi.size == 0:
            return None
        row_density = (roi > 0).mean(axis=1)
        min_run = max(28, int(img_h * 0.045))
        dense_limit = 0.018
        run_start = None
        candidates = []
        # The trailing 1.0 is a sentinel "dense" row that closes a blank run
        # reaching the end of the image.
        for idx, density in enumerate(row_density.tolist() + [1.0]):
            if density <= dense_limit:
                if run_start is None:
                    run_start = idx
                continue
            if run_start is None:
                continue
            if idx - run_start >= min_run:
                top = start_y + run_start
                bottom = start_y + idx
                # Keep only bands that start at or below 58% of the height and
                # end at least 8 rows above the image bottom.
                if top >= img_h * 0.58 and bottom <= img_h - 8:
                    candidates.append((top, bottom))
            run_start = None
        if not candidates:
            return self._detect_chat_bottom(image_obj)
        top, _ = candidates[-1]
        # Crop slightly above the band, but never above row 120.
        bottom = max(120, int(top - 4))
        if bottom >= img_h - 20:
            return None
        return bottom

    def _detect_chat_bottom(self, image_obj) -> int | None:
        """Fallback detector combining a bright uniform band with an edge score.

        Works on the lower 45% of the grayscale image. A "bright" row has
        mean >= 242 and standard deviation <= 18; the top of the last
        sufficiently long bright band is one candidate. The other candidate is
        the row with the highest score (Canny edge mean * 1.8 + row-mean jump
        * 2.4), accepted only when that score is at least 12.

        Returns:
            The row to crop at, or None when there is no candidate, the image
            is missing or empty, or the result would fall within 20 rows of
            the image bottom.
        """
        if image_obj is None:
            return None
        img_rgb = np.array(image_obj.convert("RGB"))
        if img_rgb.size == 0:
            return None
        gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
        img_h = gray.shape[0]
        start_y = max(0, int(img_h * 0.55))
        focus = gray[start_y:, :]
        if focus.size == 0:
            return None
        row_mean = focus.mean(axis=1)
        row_std = focus.std(axis=1)
        bright_mask = (row_mean >= 242) & (row_std <= 18)
        run = self._find_last_run(bright_mask, min_len=max(18, int(img_h * 0.035)))
        candidate_y = None
        if run is not None:
            run_top, _ = run
            candidate_y = start_y + run_top
        edge_img = cv2.Canny(focus, 40, 120)
        edge_strength = edge_img.mean(axis=1)
        if len(row_mean) >= 2:
            # prepend keeps the result the same length as row_mean.
            transition = np.abs(np.diff(row_mean, prepend=row_mean[0]))
        else:
            transition = np.zeros_like(row_mean)
        score = edge_strength * 1.8 + transition * 2.4
        # Ignore the first rows of the focus region (at least 8, or 15%).
        score[: max(8, int(len(score) * 0.15))] = 0
        if candidate_y is not None:
            # Ignore rows more than 4 below the top of the bright band.
            local_limit = max(0, candidate_y - start_y + 4)
            score[local_limit:] = 0
        best_idx = int(np.argmax(score)) if score.size else -1
        best_score = float(score[best_idx]) if best_idx >= 0 else 0.0
        edge_candidate = None
        if best_idx >= 0 and best_score >= 12.0:
            edge_candidate = start_y + best_idx
        final_y = None
        if candidate_y is not None and edge_candidate is not None:
            # When both candidates agree within 28 rows take the upper one;
            # otherwise trust the bright band.
            if abs(candidate_y - edge_candidate) <= 28:
                final_y = min(candidate_y, edge_candidate)
            else:
                final_y = candidate_y
        else:
            final_y = candidate_y if candidate_y is not None else edge_candidate
        if final_y is None:
            return None
        final_y = max(120, min(img_h, int(final_y - 6)))
        if final_y >= img_h - 20:
            return None
        return final_y

    def _find_last_run(self, mask: np.ndarray, min_len: int) -> tuple[int, int] | None:
        """Return ``(start, end)`` of the last run of truthy values at least ``min_len`` long.

        ``end`` is exclusive. Returns None when no run is long enough.
        """
        run_start = None
        best = None
        # The trailing False is a sentinel that closes a run reaching the end.
        for idx, flag in enumerate(mask.tolist() + [False]):
            if flag:
                if run_start is None:
                    run_start = idx
                continue
            if run_start is None:
                continue
            if idx - run_start >= min_len:
                best = (run_start, idx)
            run_start = None
        return best