Files
ai-shiliu/app/infrastructure/service/wechat/unread_session_analyzer.py

259 lines
11 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import cv2
import numpy as np
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
from app.infrastructure.service.wechat.config import CONTACT_ROW_HEIGHT
class UnreadSessionAnalyzer:
def __init__(self):
pass
def detect_red_dots(self, contact_rect: dict, screenshot) -> list[dict]:
trace_id = new_trace_id("bot")
try:
img_np = np.array(screenshot)
hsv = cv2.cvtColor(cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR), cv2.COLOR_BGR2HSV)
mask = cv2.inRange(hsv, np.array([0, 80, 80]), np.array([12, 255, 255])) + cv2.inRange(hsv, np.array([168, 80, 80]), np.array([180, 255, 255]))
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
contact_width = contact_rect['right'] - contact_rect['left']
red_dots_raw = []
for contour in contours:
area = cv2.contourArea(contour)
if 12 < area < 220:
perimeter = cv2.arcLength(contour, True)
if perimeter <= 0:
continue
circularity = 4 * np.pi * area / (perimeter * perimeter)
if circularity <= 0.45:
continue
moments = cv2.moments(contour)
if moments['m00'] == 0:
continue
cx = int(moments['m10'] / moments['m00'])
cy = int(moments['m01'] / moments['m00'])
if cx > contact_width * 0.1:
red_dots_raw.append({'x': contact_rect['left'] + cx, 'y': contact_rect['top'] + cy, 'rel_y': cy})
snapped_map = {}
for dot in red_dots_raw:
row_idx = int(round((dot['y'] - contact_rect['top']) / max(1, CONTACT_ROW_HEIGHT)))
snapped_y = int(contact_rect['top'] + row_idx * CONTACT_ROW_HEIGHT + CONTACT_ROW_HEIGHT // 2)
if row_idx not in snapped_map:
snapped_map[row_idx] = {'x': dot['x'], 'y': snapped_y, 'row_idx': row_idx}
red_dots_final = sorted(snapped_map.values(), key=lambda d: d['y'])
log_event("INFO", "bot", "bot.unread.detect", trace_id, "detect", "ok", "红点检测完成", extra={"dot_count": len(red_dots_final)})
return red_dots_final
except Exception as e:
log_event("ERROR", "bot", "bot.unread.detect", trace_id, "detect", "failed", "红点检测异常", reason="detect_error", extra={"error": str(e)})
return []
def row_has_red_dot(self, row_img, relaxed: bool = False) -> bool:
try:
row_np = np.array(row_img)
h, w = row_np.shape[:2]
if h < 30 or w < 100:
return False
margin_left = max(6, int(w * 0.012))
avatar_size = int(h * 0.72)
avatar_y = (h - avatar_size) // 2
avatar_x = margin_left
avatar_cx = avatar_x + avatar_size / 2.0
avatar_cy = avatar_y + avatar_size / 2.0
avatar_r = avatar_size * 0.50
probe_x1 = avatar_x + int(avatar_size * 0.42)
probe_y1 = max(0, avatar_y - int(avatar_size * 0.10))
probe_x2 = min(w, avatar_x + int(avatar_size * 1.00))
probe_y2 = min(h, avatar_y + int(avatar_size * 0.36))
if probe_x2 <= probe_x1 or probe_y2 <= probe_y1:
return False
probe = row_np[probe_y1:probe_y2, probe_x1:probe_x2]
if probe.size == 0:
return False
probe_hsv = cv2.cvtColor(probe, cv2.COLOR_RGB2HSV)
mask1 = cv2.inRange(probe_hsv, np.array([0, 115, 125]), np.array([12, 255, 255]))
mask2 = cv2.inRange(probe_hsv, np.array([168, 115, 125]), np.array([180, 255, 255]))
mask = cv2.bitwise_or(mask1, mask2)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
self.debug_log(f"row_red w={w} h={h} candidates=0")
return False
candidates = []
pw = probe_x2 - probe_x1
ph = probe_y2 - probe_y1
for cnt in contours:
area = cv2.contourArea(cnt)
if not (8 <= area <= 260):
continue
x, y, cw, ch = cv2.boundingRect(cnt)
peri = cv2.arcLength(cnt, True)
if peri <= 0:
continue
circ = 4 * np.pi * area / (peri * peri)
ar = max(cw, ch) / max(1, min(cw, ch))
cx = x + cw / 2.0
cy = y + ch / 2.0
gx = probe_x1 + cx
gy = probe_y1 + cy
in_upper_right = cx > pw * 0.30 and cx < pw * 0.90 and cy < ph * 0.68
near_avatar_corner = (
gx >= avatar_x + avatar_size * 0.66 and
gx <= avatar_x + avatar_size * 1.00 and
gy >= avatar_y - avatar_size * 0.06 and
gy <= avatar_y + avatar_size * 0.24
)
if not (in_upper_right and near_avatar_corner):
continue
comp_mask = np.zeros(mask.shape, dtype=np.uint8)
cv2.drawContours(comp_mask, [cnt], -1, 255, thickness=-1)
ys, xs = np.where(comp_mask > 0)
if len(xs) == 0:
continue
global_xs = xs + probe_x1
global_ys = ys + probe_y1
d2 = (global_xs - avatar_cx) ** 2 + (global_ys - avatar_cy) ** 2
outside_ratio = float(np.count_nonzero(d2 > (avatar_r * 0.92) ** 2)) / len(d2)
min_area = 10 if relaxed else 14
min_small_outside = 0.18 if relaxed else 0.25
min_small_circ = 0.72 if relaxed else 0.82
min_match_score = 7 if relaxed else 8
min_match_outside = 0.10 if relaxed else 0.15
if area < min_area:
continue
if area < 20 and outside_ratio < min_small_outside:
continue
if gy > avatar_y + avatar_size * 0.24:
continue
if area < 20 and circ < min_small_circ:
continue
if area < 20 and ar > 1.20:
continue
if area >= 120:
shape_ok = circ > 0.26 and ar < 2.6
elif area >= 28:
shape_ok = circ > 0.45 and ar < 1.9
else:
shape_ok = circ > 0.82 and ar <= 1.20 and outside_ratio >= 0.25
if not shape_ok:
continue
white_ratio = 0.0
if cw >= 7 and ch >= 7:
inner = probe[max(0, y):min(probe.shape[0], y + ch), max(0, x):min(probe.shape[1], x + cw)]
if inner.size > 0:
gray = cv2.cvtColor(inner, cv2.COLOR_RGB2GRAY)
white_ratio = np.count_nonzero(gray > 190) / gray.size
score = 3
if area >= 14:
score += 2
if circ > 0.85:
score += 2
elif circ > 0.70:
score += 1
if ar <= 1.15:
score += 2
elif ar <= 1.35:
score += 1
if outside_ratio >= 0.35:
score += 4
elif outside_ratio >= 0.25:
score += 3
elif outside_ratio >= 0.15:
score += 1
if 0.05 <= white_ratio <= 0.60:
score += 1
candidates.append({
'score': score,
'area': area,
'circ': circ,
'ar': ar,
'outside_ratio': outside_ratio,
'white_ratio': white_ratio,
'center': (gx, gy),
'bbox': (probe_x1 + x, probe_y1 + y, cw, ch),
'min_match_score': min_match_score,
'min_match_outside': min_match_outside,
})
if not candidates:
return False
best = max(candidates, key=lambda x: x['score'])
matched = best['score'] >= best['min_match_score'] and best['outside_ratio'] >= best['min_match_outside']
return matched
except Exception as e:
return False
def row_has_red_dot_weak(self, row_img) -> bool:
return self.row_has_red_dot(row_img, relaxed=True)
def get_all_sessions_with_unread(self, contact_rect: dict, screenshot, round_count: int, save_debug_image: Callable | None = None) -> tuple[list[dict], list[dict]]:
trace_id = new_trace_id("bot")
red_dots = self.detect_red_dots(contact_rect, screenshot)
red_y_list = [dot['y'] for dot in red_dots]
row_count = max(1, int((contact_rect['bottom'] - contact_rect['top']) / max(1, CONTACT_ROW_HEIGHT)))
sessions = []
for row_idx in range(row_count):
top = int(row_idx * CONTACT_ROW_HEIGHT)
bottom = int(min((row_idx + 1) * CONTACT_ROW_HEIGHT, screenshot.height))
if bottom <= top:
continue
row_img = screenshot.crop((0, top, screenshot.width, bottom))
center_y = int(contact_rect['top'] + row_idx * CONTACT_ROW_HEIGHT + CONTACT_ROW_HEIGHT // 2)
has_red_by_global = any(abs(center_y - y) <= max(7, CONTACT_ROW_HEIGHT // 4) for y in red_y_list)
has_red_by_row = self.row_has_red_dot(row_img)
has_red_by_row_weak = self.row_has_red_dot_weak(row_img) if has_red_by_global and not has_red_by_row else has_red_by_row
has_red = has_red_by_row or (has_red_by_global and has_red_by_row_weak)
row_name = f"round_{round_count:04d}_row_{row_idx:03d}.png"
if save_debug_image:
save_debug_image(row_img, f"sessions/all/{row_name}")
if has_red:
save_debug_image(row_img, f"sessions/unread/{row_name}")
sessions.append({
'row_idx': row_idx,
'has_red_dot': has_red,
'has_red_by_global': has_red_by_global,
'has_red_by_row': has_red_by_row,
'has_red_by_row_weak': has_red_by_row_weak,
'click_x': int((contact_rect['left'] + contact_rect['right']) // 2),
'click_y': center_y,
'row_img': row_img.copy(),
})
unread_sessions = [s for s in sessions if s['has_red_dot']]
global_hits = sum(1 for s in sessions if s['has_red_by_global'])
row_hits = sum(1 for s in sessions if s['has_red_by_row'])
row_weak_hits = sum(1 for s in sessions if s['has_red_by_row_weak'])
log_event("INFO", "bot", "bot.unread.scan", trace_id, "scan", "ok", "未读会话扫描完成", extra={"round": int(round_count), "rows": len(sessions), "unread": len(unread_sessions), "global_hits": global_hits, "row_hits": row_hits, "row_weak_hits": row_weak_hits})
return sessions, unread_sessions