"""Screen-capture helpers: capture-box geometry, region grabs and chat-bottom detection."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict

import cv2
import numpy as np
from PIL import ImageGrab

from app.infrastructure.service.logging.log_service import log_event, new_trace_id
from app.infrastructure.service.wechat.config import (
    CHAT_CAPTURE_HEIGHT,
    CHAT_CAPTURE_LEFT_OFFSET,
    CHAT_CAPTURE_TOP_OFFSET,
    CHAT_CAPTURE_WIDTH,
    CONTACT_LIST_BOTTOM_OFFSET,
    CONTACT_LIST_LEFT_OFFSET,
    CONTACT_LIST_TOP_OFFSET,
    CONTACT_ROW_WIDTH,
    SESSION_NAME_HEIGHT,
    SESSION_NAME_LEFT_OFFSET,
    SESSION_NAME_TOP_OFFSET,
    SESSION_NAME_WIDTH,
    TITLE_OCR_AREA_HEIGHT,
    TITLE_OCR_AREA_LEFT_OFFSET,
    TITLE_OCR_AREA_TOP_OFFSET,
    TITLE_OCR_AREA_WIDTH,
)


@dataclass
class CaptureBox:
    """Rectangle described by its left/top/right/bottom edges."""

    left: int
    top: int
    right: int
    bottom: int

    @property
    def width(self) -> int:
        """Horizontal extent, ``right - left``."""
        return self.right - self.left

    @property
    def height(self) -> int:
        """Vertical extent, ``bottom - top``."""
        return self.bottom - self.top

    def as_tuple(self) -> tuple[int, int, int, int]:
        """Return ``(left, top, right, bottom)``, the form passed to ``ImageGrab.grab(bbox=...)``."""
        return (self.left, self.top, self.right, self.bottom)

    def as_dict(self) -> Dict[str, int]:
        """Return the four edges plus the derived width and height, keyed by name."""
        return {
            "left": self.left,
            "top": self.top,
            "right": self.right,
            "bottom": self.bottom,
            "width": self.width,
            "height": self.height,
        }


class ScreenshotService:
    """Builds capture boxes relative to a window rectangle and grabs them from the screen.

    ``window_rect`` arguments are dicts; the methods below read the keys
    ``"left"``, ``"top"``, ``"bottom"`` and (optionally) ``"height"``.
    """

    # ------------------------------------------------------------------
    # Box construction
    # ------------------------------------------------------------------
    def build_box(self, left: int, top: int, width: int, height: int) -> CaptureBox:
        """Build a box from its top-left corner and its size."""
        return CaptureBox(
            left=int(left),
            top=int(top),
            right=int(left + width),
            bottom=int(top + height),
        )

    def build_box_from_window(
        self,
        window_rect: dict,
        left_offset: int,
        top_offset: int,
        width: int,
        height: int,
    ) -> CaptureBox:
        """Build a box of the given size, offset from the window's top-left corner."""
        return self.build_box(
            left=window_rect["left"] + int(left_offset),
            top=window_rect["top"] + int(top_offset),
            width=int(width),
            height=int(height),
        )

    def build_contact_list_box(
        self,
        window_rect: dict,
        left_offset: int,
        top_offset: int,
        width: int,
        bottom_offset: int,
    ) -> CaptureBox:
        """Build a box anchored to the window's top-left whose bottom edge is
        ``bottom_offset`` pixels above the window's bottom edge."""
        left = window_rect["left"] + int(left_offset)
        top = window_rect["top"] + int(top_offset)
        right = left + int(width)
        bottom = window_rect["bottom"] - int(bottom_offset)
        return CaptureBox(left=left, top=top, right=right, bottom=bottom)

    def is_valid_box(self, box: CaptureBox) -> bool:
        """Return True when the box has strictly positive width and height."""
        return box.right > box.left and box.bottom > box.top

    # ------------------------------------------------------------------
    # Shared validate-and-grab helpers
    # ------------------------------------------------------------------
    def _ensure_valid(self, box: CaptureBox, message: str) -> None:
        """Raise ``ValueError(f"{message}: {box.as_dict()}")`` if the box is empty or inverted."""
        if not self.is_valid_box(box):
            raise ValueError(f"{message}: {box.as_dict()}")

    def _grab_box(self, box: CaptureBox, message: str):
        """Validate ``box`` and return the ``ImageGrab.grab`` result for it."""
        self._ensure_valid(box, message)
        return ImageGrab.grab(bbox=box.as_tuple())

    # ------------------------------------------------------------------
    # Generic captures
    # ------------------------------------------------------------------
    def capture_box(self, left: int, top: int, width: int, height: int):
        """Grab the screen region given by a top-left corner and a size.

        Raises:
            ValueError: if the resulting box has non-positive width or height.
        """
        box = self.build_box(left, top, width, height)
        return self._grab_box(box, "invalid capture box")

    def capture_from_window(
        self,
        window_rect: dict,
        left_offset: int,
        top_offset: int,
        width: int,
        height: int,
    ):
        """Grab a region of the given size, offset from the window's top-left corner.

        Raises:
            ValueError: if the resulting box has non-positive width or height.
        """
        box = self.build_box_from_window(window_rect, left_offset, top_offset, width, height)
        return self._grab_box(box, "invalid window capture box")

    def capture_contact_list(
        self,
        window_rect: dict,
        left_offset: int,
        top_offset: int,
        width: int,
        bottom_offset: int,
    ):
        """Grab the region produced by :meth:`build_contact_list_box`.

        Raises:
            ValueError: if the resulting box has non-positive width or height.
        """
        box = self.build_contact_list_box(window_rect, left_offset, top_offset, width, bottom_offset)
        return self._grab_box(box, "invalid contact list box")

    def capture_area_from_box(self, box: CaptureBox):
        """Grab the screen region described by an already-built box.

        Raises:
            ValueError: if the box has non-positive width or height.
        """
        return self._grab_box(box, "invalid capture box")

    # ------------------------------------------------------------------
    # Contact list
    # ------------------------------------------------------------------
    def get_contact_list_box(self, window_rect: dict) -> CaptureBox:
        """Build the contact-list box from the ``CONTACT_LIST_*`` / ``CONTACT_ROW_WIDTH`` config."""
        return self.build_contact_list_box(
            window_rect,
            left_offset=CONTACT_LIST_LEFT_OFFSET,
            top_offset=CONTACT_LIST_TOP_OFFSET,
            width=CONTACT_ROW_WIDTH,
            bottom_offset=CONTACT_LIST_BOTTOM_OFFSET,
        )

    def capture_contact_list_default(self, window_rect: dict):
        """Log and grab the configured contact-list region.

        Raises:
            ValueError: if the configured box has non-positive width or height.
        """
        trace_id = new_trace_id("capture")
        box = self.get_contact_list_box(window_rect)
        # Validate before logging so an "ok" event is never recorded for a box
        # that is about to be rejected.
        self._ensure_valid(box, "invalid contact list box")
        log_event(
            "INFO",
            "capture",
            "capture.contact_list",
            trace_id,
            "capture",
            "ok",
            "截图会话列表区域",
            extra=box.as_dict(),
        )
        # Capture the very box that was logged instead of rebuilding it.
        return self._grab_box(box, "invalid contact list box")

    # ------------------------------------------------------------------
    # Session title
    # ------------------------------------------------------------------
    def get_session_title_box(self, window_rect: dict) -> CaptureBox:
        """Build the session-title box from the ``TITLE_OCR_AREA_*`` config."""
        return self.build_box_from_window(
            window_rect,
            left_offset=TITLE_OCR_AREA_LEFT_OFFSET,
            top_offset=TITLE_OCR_AREA_TOP_OFFSET,
            width=TITLE_OCR_AREA_WIDTH,
            height=TITLE_OCR_AREA_HEIGHT,
        )

    def capture_session_title(self, window_rect: dict):
        """Log and grab the configured session-title region.

        Raises:
            ValueError: if the configured box has non-positive width or height.
        """
        trace_id = new_trace_id("capture")
        box = self.get_session_title_box(window_rect)
        # Validate before logging so an "ok" event is never recorded for a box
        # that is about to be rejected.
        self._ensure_valid(box, "invalid capture box")
        log_event(
            "INFO",
            "capture",
            "capture.session_title",
            trace_id,
            "capture",
            "ok",
            "截图会话标题区域",
            extra=box.as_dict(),
        )
        return self.capture_area_from_box(box)

    # ------------------------------------------------------------------
    # Chat area
    # ------------------------------------------------------------------
    def get_chat_capture_box(self, window_rect: dict) -> CaptureBox:
        """Build the chat-area box from the ``CHAT_CAPTURE_*`` config.

        The box height is the larger of ``max(120, CHAT_CAPTURE_HEIGHT)`` and
        the window height minus ``CHAT_CAPTURE_TOP_OFFSET``.
        """
        try:
            window_height = window_rect["height"]
        except KeyError:
            # The other builders only need left/top/bottom, so derive the
            # height from those when the caller did not supply it.
            window_height = window_rect["bottom"] - window_rect["top"]

        base_height = max(120, CHAT_CAPTURE_HEIGHT)
        capture_height = max(base_height, window_height - CHAT_CAPTURE_TOP_OFFSET)
        return self.build_box_from_window(
            window_rect,
            left_offset=CHAT_CAPTURE_LEFT_OFFSET,
            top_offset=CHAT_CAPTURE_TOP_OFFSET,
            width=CHAT_CAPTURE_WIDTH,
            height=capture_height,
        )

    def capture_chat_area(self, window_rect: dict):
        """Grab the chat region, crop it at the detected bottom (if any) and log the result.

        The log ``extra`` carries the capture box plus ``dynamic_bottom``
        (the detected crop row, or ``""`` when none was found) and the final
        image width and height.

        Raises:
            ValueError: if the configured box has non-positive width or height.
        """
        trace_id = new_trace_id("capture")
        box = self.get_chat_capture_box(window_rect)
        image = self.capture_area_from_box(box)

        chat_bottom = self._detect_chat_bottom_by_binary_merge(image)
        if chat_bottom is not None:
            image = image.crop((0, 0, image.size[0], chat_bottom))

        extra = box.as_dict()
        extra["dynamic_bottom"] = chat_bottom or ""
        extra["final_width"] = image.size[0]
        extra["final_height"] = image.size[1]
        log_event(
            "INFO",
            "capture",
            "capture.chat_area",
            trace_id,
            "capture",
            "ok",
            "截图聊天区域",
            extra=extra,
        )
        return image

    # ------------------------------------------------------------------
    # Cropping
    # ------------------------------------------------------------------
    def crop_session_name(self, row_img):
        """Crop the ``SESSION_NAME_*`` rectangle out of a row image (see :meth:`crop_from_image`)."""
        return self.crop_from_image(
            row_img,
            left=SESSION_NAME_LEFT_OFFSET,
            top=SESSION_NAME_TOP_OFFSET,
            width=SESSION_NAME_WIDTH,
            height=SESSION_NAME_HEIGHT,
        )

    def crop_from_image(self, image_obj, left: int, top: int, width: int, height: int):
        """Crop a rectangle from ``image_obj``, clamped to the image bounds.

        ``width`` and ``height`` are treated as at least 1. Returns ``None``
        when ``image_obj`` is ``None`` or when the clamped rectangle is empty.
        """
        if image_obj is None:
            return None

        img_w, img_h = image_obj.size
        crop_left = min(max(0, int(left)), img_w)
        crop_top = min(max(0, int(top)), img_h)
        crop_right = min(img_w, crop_left + max(1, int(width)))
        crop_bottom = min(img_h, crop_top + max(1, int(height)))
        if crop_right <= crop_left or crop_bottom <= crop_top:
            return None
        return image_obj.crop((crop_left, crop_top, crop_right, crop_bottom))

    # ------------------------------------------------------------------
    # Chat-bottom detection
    # ------------------------------------------------------------------
    def _build_merged_binary_array(self, image_obj):
        """Return a binary (0/255) foreground mask of ``image_obj``.

        The mask is the OR of a global inverse threshold (blurred gray value
        <= 248 becomes foreground) and an adaptive Gaussian inverse threshold,
        followed by a 5x1 morphological close and a 2x2 morphological open.
        """
        arr = np.array(image_obj.convert("RGB"))
        gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
        blurred = cv2.GaussianBlur(gray, (5, 5), 0)

        _, binary_inv = cv2.threshold(blurred, 248, 255, cv2.THRESH_BINARY_INV)
        adaptive_inv = cv2.adaptiveThreshold(
            blurred,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            13,
            1,
        )
        merged = cv2.bitwise_or(binary_inv, adaptive_inv)

        # Close with a wide, 1-pixel-high kernel, then open with a 2x2 kernel.
        close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 1))
        merged = cv2.morphologyEx(merged, cv2.MORPH_CLOSE, close_kernel, iterations=1)
        open_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
        merged = cv2.morphologyEx(merged, cv2.MORPH_OPEN, open_kernel)
        return merged

    def _detect_chat_bottom_by_binary_merge(self, image_obj) -> int | None:
        """Find the row at which to cut the chat image, using the merged binary mask.

        Scans rows from 55% of the image height downwards for runs of
        consecutive "blank" rows (foreground fraction <= 1.8%). A run
        qualifies when it is at least ``max(28, 4.5% of height)`` rows long,
        starts at or below 58% of the height and ends at least 8 rows above
        the image bottom. The last qualifying run wins; the result is 4 rows
        above its top, but never less than 120.
        (Presumably that run is the gap between the messages and whatever
        sits below them - confirm against real captures.)

        Falls back to :meth:`_detect_chat_bottom` when no run qualifies.
        Returns ``None`` when ``image_obj`` is ``None`` or empty, or when the
        result would lie within 20 rows of the image bottom.
        """
        if image_obj is None:
            return None
        src_w, src_h = image_obj.size
        if src_w <= 0 or src_h <= 0:
            # Same outcome as _detect_chat_bottom for an empty image, instead
            # of letting OpenCV raise on empty input.
            return None

        merged = self._build_merged_binary_array(image_obj)
        img_h = merged.shape[0]
        start_y = max(0, int(img_h * 0.55))
        roi = merged[start_y:, :]
        if roi.size == 0:
            return None

        row_density = (roi > 0).mean(axis=1)
        min_run = max(28, int(img_h * 0.045))
        dense_limit = 0.018

        run_start = None
        last_gap_top = None
        # The trailing 1.0 is a non-blank sentinel that closes a run reaching
        # the last row; such a run ends at img_h and so fails the
        # "at least 8 rows above the bottom" test below.
        for idx, density in enumerate(row_density.tolist() + [1.0]):
            if density <= dense_limit:
                if run_start is None:
                    run_start = idx
                continue
            if run_start is None:
                continue

            top = start_y + run_start
            bottom = start_y + idx
            if idx - run_start >= min_run and top >= img_h * 0.58 and bottom <= img_h - 8:
                last_gap_top = top
            run_start = None

        if last_gap_top is None:
            return self._detect_chat_bottom(image_obj)

        cut_y = max(120, int(last_gap_top - 4))
        if cut_y >= img_h - 20:
            return None
        return cut_y

    def _detect_chat_bottom(self, image_obj) -> int | None:
        """Fallback chat-bottom detection based on row brightness and edges.

        Works on the grayscale rows from 55% of the image height downwards and
        combines two cues:

        * the top of the last run of bright, uniform rows (mean >= 242,
          std <= 18) at least ``max(18, 3.5% of height)`` rows long;
        * the row with the highest score, where the score is
          ``1.8 * mean Canny edge value + 2.4 * |change in row mean|`` and a
          row only counts when its score is >= 12.

        When both cues exist and are within 28 rows of each other the upper
        one is used; otherwise the bright-run cue is preferred. The result is
        moved up 6 rows and floored at 120. Returns ``None`` when no cue is
        found, the image is ``None`` or empty, or the result would lie within
        20 rows of the image bottom.
        """
        if image_obj is None:
            return None

        img_rgb = np.array(image_obj.convert("RGB"))
        if img_rgb.size == 0:
            return None

        gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)
        img_h = gray.shape[0]
        start_y = max(0, int(img_h * 0.55))
        focus = gray[start_y:, :]
        if focus.size == 0:
            return None

        row_mean = focus.mean(axis=1)
        row_std = focus.std(axis=1)
        bright_mask = (row_mean >= 242) & (row_std <= 18)
        run = self._find_last_run(bright_mask, min_len=max(18, int(img_h * 0.035)))

        candidate_y = None
        if run is not None:
            candidate_y = start_y + run[0]

        edge_img = cv2.Canny(focus, 40, 120)
        edge_strength = edge_img.mean(axis=1)
        if len(row_mean) >= 2:
            transition = np.abs(np.diff(row_mean, prepend=row_mean[0]))
        else:
            transition = np.zeros_like(row_mean)

        score = edge_strength * 1.8 + transition * 2.4
        # Ignore the first rows of the focus region (at least 8, or 15%).
        score[: max(8, int(len(score) * 0.15))] = 0
        if candidate_y is not None:
            # Only accept edges at, or at most 3 rows below, the bright run's top.
            local_limit = max(0, candidate_y - start_y + 4)
            score[local_limit:] = 0

        best_idx = int(np.argmax(score)) if score.size else -1
        best_score = float(score[best_idx]) if best_idx >= 0 else 0.0

        edge_candidate = None
        if best_idx >= 0 and best_score >= 12.0:
            edge_candidate = start_y + best_idx

        if candidate_y is not None and edge_candidate is not None:
            if abs(candidate_y - edge_candidate) <= 28:
                final_y = min(candidate_y, edge_candidate)
            else:
                final_y = candidate_y
        else:
            final_y = candidate_y if candidate_y is not None else edge_candidate

        if final_y is None:
            return None

        final_y = max(120, min(img_h, int(final_y - 6)))
        if final_y >= img_h - 20:
            return None
        return final_y

    def _find_last_run(self, mask: np.ndarray, min_len: int) -> tuple[int, int] | None:
        """Return ``(start, end)`` of the last run of truthy values at least ``min_len`` long.

        ``end`` is exclusive. Returns ``None`` when no run is long enough.
        """
        run_start = None
        best = None
        # The trailing False closes a run that reaches the end of the mask.
        for idx, flag in enumerate(mask.tolist() + [False]):
            if flag:
                if run_start is None:
                    run_start = idx
                continue
            if run_start is None:
                continue
            if idx - run_start >= min_len:
                best = (run_start, idx)
            run_start = None
        return best