259 lines
11 KiB
Python
259 lines
11 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import cv2
|
||
|
|
import numpy as np
|
||
|
|
|
||
|
|
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
|
||
|
|
from app.infrastructure.service.wechat.config import CONTACT_ROW_HEIGHT
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
class UnreadSessionAnalyzer:
|
||
|
|
def __init__(self):
|
||
|
|
pass
|
||
|
|
|
||
|
|
def detect_red_dots(self, contact_rect: dict, screenshot) -> list[dict]:
|
||
|
|
trace_id = new_trace_id("bot")
|
||
|
|
try:
|
||
|
|
img_np = np.array(screenshot)
|
||
|
|
hsv = cv2.cvtColor(cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR), cv2.COLOR_BGR2HSV)
|
||
|
|
mask = cv2.inRange(hsv, np.array([0, 80, 80]), np.array([12, 255, 255])) + cv2.inRange(hsv, np.array([168, 80, 80]), np.array([180, 255, 255]))
|
||
|
|
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
|
|
|
||
|
|
contact_width = contact_rect['right'] - contact_rect['left']
|
||
|
|
red_dots_raw = []
|
||
|
|
for contour in contours:
|
||
|
|
area = cv2.contourArea(contour)
|
||
|
|
if 12 < area < 220:
|
||
|
|
perimeter = cv2.arcLength(contour, True)
|
||
|
|
if perimeter <= 0:
|
||
|
|
continue
|
||
|
|
circularity = 4 * np.pi * area / (perimeter * perimeter)
|
||
|
|
if circularity <= 0.45:
|
||
|
|
continue
|
||
|
|
moments = cv2.moments(contour)
|
||
|
|
if moments['m00'] == 0:
|
||
|
|
continue
|
||
|
|
cx = int(moments['m10'] / moments['m00'])
|
||
|
|
cy = int(moments['m01'] / moments['m00'])
|
||
|
|
if cx > contact_width * 0.1:
|
||
|
|
red_dots_raw.append({'x': contact_rect['left'] + cx, 'y': contact_rect['top'] + cy, 'rel_y': cy})
|
||
|
|
|
||
|
|
snapped_map = {}
|
||
|
|
for dot in red_dots_raw:
|
||
|
|
row_idx = int(round((dot['y'] - contact_rect['top']) / max(1, CONTACT_ROW_HEIGHT)))
|
||
|
|
snapped_y = int(contact_rect['top'] + row_idx * CONTACT_ROW_HEIGHT + CONTACT_ROW_HEIGHT // 2)
|
||
|
|
if row_idx not in snapped_map:
|
||
|
|
snapped_map[row_idx] = {'x': dot['x'], 'y': snapped_y, 'row_idx': row_idx}
|
||
|
|
|
||
|
|
red_dots_final = sorted(snapped_map.values(), key=lambda d: d['y'])
|
||
|
|
log_event("INFO", "bot", "bot.unread.detect", trace_id, "detect", "ok", "红点检测完成", extra={"dot_count": len(red_dots_final)})
|
||
|
|
return red_dots_final
|
||
|
|
except Exception as e:
|
||
|
|
log_event("ERROR", "bot", "bot.unread.detect", trace_id, "detect", "failed", "红点检测异常", reason="detect_error", extra={"error": str(e)})
|
||
|
|
return []
|
||
|
|
|
||
|
|
def row_has_red_dot(self, row_img, relaxed: bool = False) -> bool:
|
||
|
|
try:
|
||
|
|
row_np = np.array(row_img)
|
||
|
|
h, w = row_np.shape[:2]
|
||
|
|
if h < 30 or w < 100:
|
||
|
|
return False
|
||
|
|
|
||
|
|
margin_left = max(6, int(w * 0.012))
|
||
|
|
avatar_size = int(h * 0.72)
|
||
|
|
avatar_y = (h - avatar_size) // 2
|
||
|
|
avatar_x = margin_left
|
||
|
|
|
||
|
|
avatar_cx = avatar_x + avatar_size / 2.0
|
||
|
|
avatar_cy = avatar_y + avatar_size / 2.0
|
||
|
|
avatar_r = avatar_size * 0.50
|
||
|
|
|
||
|
|
probe_x1 = avatar_x + int(avatar_size * 0.42)
|
||
|
|
probe_y1 = max(0, avatar_y - int(avatar_size * 0.10))
|
||
|
|
probe_x2 = min(w, avatar_x + int(avatar_size * 1.00))
|
||
|
|
probe_y2 = min(h, avatar_y + int(avatar_size * 0.36))
|
||
|
|
|
||
|
|
if probe_x2 <= probe_x1 or probe_y2 <= probe_y1:
|
||
|
|
return False
|
||
|
|
|
||
|
|
probe = row_np[probe_y1:probe_y2, probe_x1:probe_x2]
|
||
|
|
if probe.size == 0:
|
||
|
|
return False
|
||
|
|
|
||
|
|
probe_hsv = cv2.cvtColor(probe, cv2.COLOR_RGB2HSV)
|
||
|
|
mask1 = cv2.inRange(probe_hsv, np.array([0, 115, 125]), np.array([12, 255, 255]))
|
||
|
|
mask2 = cv2.inRange(probe_hsv, np.array([168, 115, 125]), np.array([180, 255, 255]))
|
||
|
|
mask = cv2.bitwise_or(mask1, mask2)
|
||
|
|
|
||
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
|
||
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
|
||
|
|
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
|
||
|
|
|
||
|
|
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
|
|
if not contours:
|
||
|
|
self.debug_log(f"row_red w={w} h={h} candidates=0")
|
||
|
|
return False
|
||
|
|
|
||
|
|
candidates = []
|
||
|
|
pw = probe_x2 - probe_x1
|
||
|
|
ph = probe_y2 - probe_y1
|
||
|
|
|
||
|
|
for cnt in contours:
|
||
|
|
area = cv2.contourArea(cnt)
|
||
|
|
if not (8 <= area <= 260):
|
||
|
|
continue
|
||
|
|
|
||
|
|
x, y, cw, ch = cv2.boundingRect(cnt)
|
||
|
|
peri = cv2.arcLength(cnt, True)
|
||
|
|
if peri <= 0:
|
||
|
|
continue
|
||
|
|
|
||
|
|
circ = 4 * np.pi * area / (peri * peri)
|
||
|
|
ar = max(cw, ch) / max(1, min(cw, ch))
|
||
|
|
|
||
|
|
cx = x + cw / 2.0
|
||
|
|
cy = y + ch / 2.0
|
||
|
|
gx = probe_x1 + cx
|
||
|
|
gy = probe_y1 + cy
|
||
|
|
|
||
|
|
in_upper_right = cx > pw * 0.30 and cx < pw * 0.90 and cy < ph * 0.68
|
||
|
|
near_avatar_corner = (
|
||
|
|
gx >= avatar_x + avatar_size * 0.66 and
|
||
|
|
gx <= avatar_x + avatar_size * 1.00 and
|
||
|
|
gy >= avatar_y - avatar_size * 0.06 and
|
||
|
|
gy <= avatar_y + avatar_size * 0.24
|
||
|
|
)
|
||
|
|
if not (in_upper_right and near_avatar_corner):
|
||
|
|
continue
|
||
|
|
|
||
|
|
comp_mask = np.zeros(mask.shape, dtype=np.uint8)
|
||
|
|
cv2.drawContours(comp_mask, [cnt], -1, 255, thickness=-1)
|
||
|
|
ys, xs = np.where(comp_mask > 0)
|
||
|
|
if len(xs) == 0:
|
||
|
|
continue
|
||
|
|
|
||
|
|
global_xs = xs + probe_x1
|
||
|
|
global_ys = ys + probe_y1
|
||
|
|
d2 = (global_xs - avatar_cx) ** 2 + (global_ys - avatar_cy) ** 2
|
||
|
|
outside_ratio = float(np.count_nonzero(d2 > (avatar_r * 0.92) ** 2)) / len(d2)
|
||
|
|
|
||
|
|
min_area = 10 if relaxed else 14
|
||
|
|
min_small_outside = 0.18 if relaxed else 0.25
|
||
|
|
min_small_circ = 0.72 if relaxed else 0.82
|
||
|
|
min_match_score = 7 if relaxed else 8
|
||
|
|
min_match_outside = 0.10 if relaxed else 0.15
|
||
|
|
|
||
|
|
if area < min_area:
|
||
|
|
continue
|
||
|
|
if area < 20 and outside_ratio < min_small_outside:
|
||
|
|
continue
|
||
|
|
if gy > avatar_y + avatar_size * 0.24:
|
||
|
|
continue
|
||
|
|
if area < 20 and circ < min_small_circ:
|
||
|
|
continue
|
||
|
|
if area < 20 and ar > 1.20:
|
||
|
|
continue
|
||
|
|
|
||
|
|
if area >= 120:
|
||
|
|
shape_ok = circ > 0.26 and ar < 2.6
|
||
|
|
elif area >= 28:
|
||
|
|
shape_ok = circ > 0.45 and ar < 1.9
|
||
|
|
else:
|
||
|
|
shape_ok = circ > 0.82 and ar <= 1.20 and outside_ratio >= 0.25
|
||
|
|
if not shape_ok:
|
||
|
|
continue
|
||
|
|
|
||
|
|
white_ratio = 0.0
|
||
|
|
if cw >= 7 and ch >= 7:
|
||
|
|
inner = probe[max(0, y):min(probe.shape[0], y + ch), max(0, x):min(probe.shape[1], x + cw)]
|
||
|
|
if inner.size > 0:
|
||
|
|
gray = cv2.cvtColor(inner, cv2.COLOR_RGB2GRAY)
|
||
|
|
white_ratio = np.count_nonzero(gray > 190) / gray.size
|
||
|
|
|
||
|
|
score = 3
|
||
|
|
if area >= 14:
|
||
|
|
score += 2
|
||
|
|
if circ > 0.85:
|
||
|
|
score += 2
|
||
|
|
elif circ > 0.70:
|
||
|
|
score += 1
|
||
|
|
if ar <= 1.15:
|
||
|
|
score += 2
|
||
|
|
elif ar <= 1.35:
|
||
|
|
score += 1
|
||
|
|
if outside_ratio >= 0.35:
|
||
|
|
score += 4
|
||
|
|
elif outside_ratio >= 0.25:
|
||
|
|
score += 3
|
||
|
|
elif outside_ratio >= 0.15:
|
||
|
|
score += 1
|
||
|
|
if 0.05 <= white_ratio <= 0.60:
|
||
|
|
score += 1
|
||
|
|
|
||
|
|
candidates.append({
|
||
|
|
'score': score,
|
||
|
|
'area': area,
|
||
|
|
'circ': circ,
|
||
|
|
'ar': ar,
|
||
|
|
'outside_ratio': outside_ratio,
|
||
|
|
'white_ratio': white_ratio,
|
||
|
|
'center': (gx, gy),
|
||
|
|
'bbox': (probe_x1 + x, probe_y1 + y, cw, ch),
|
||
|
|
'min_match_score': min_match_score,
|
||
|
|
'min_match_outside': min_match_outside,
|
||
|
|
})
|
||
|
|
|
||
|
|
if not candidates:
|
||
|
|
return False
|
||
|
|
|
||
|
|
best = max(candidates, key=lambda x: x['score'])
|
||
|
|
matched = best['score'] >= best['min_match_score'] and best['outside_ratio'] >= best['min_match_outside']
|
||
|
|
return matched
|
||
|
|
except Exception as e:
|
||
|
|
return False
|
||
|
|
|
||
|
|
def row_has_red_dot_weak(self, row_img) -> bool:
|
||
|
|
return self.row_has_red_dot(row_img, relaxed=True)
|
||
|
|
|
||
|
|
def get_all_sessions_with_unread(self, contact_rect: dict, screenshot, round_count: int, save_debug_image: Callable | None = None) -> tuple[list[dict], list[dict]]:
|
||
|
|
trace_id = new_trace_id("bot")
|
||
|
|
red_dots = self.detect_red_dots(contact_rect, screenshot)
|
||
|
|
red_y_list = [dot['y'] for dot in red_dots]
|
||
|
|
row_count = max(1, int((contact_rect['bottom'] - contact_rect['top']) / max(1, CONTACT_ROW_HEIGHT)))
|
||
|
|
sessions = []
|
||
|
|
|
||
|
|
for row_idx in range(row_count):
|
||
|
|
top = int(row_idx * CONTACT_ROW_HEIGHT)
|
||
|
|
bottom = int(min((row_idx + 1) * CONTACT_ROW_HEIGHT, screenshot.height))
|
||
|
|
if bottom <= top:
|
||
|
|
continue
|
||
|
|
row_img = screenshot.crop((0, top, screenshot.width, bottom))
|
||
|
|
center_y = int(contact_rect['top'] + row_idx * CONTACT_ROW_HEIGHT + CONTACT_ROW_HEIGHT // 2)
|
||
|
|
has_red_by_global = any(abs(center_y - y) <= max(7, CONTACT_ROW_HEIGHT // 4) for y in red_y_list)
|
||
|
|
has_red_by_row = self.row_has_red_dot(row_img)
|
||
|
|
has_red_by_row_weak = self.row_has_red_dot_weak(row_img) if has_red_by_global and not has_red_by_row else has_red_by_row
|
||
|
|
has_red = has_red_by_row or (has_red_by_global and has_red_by_row_weak)
|
||
|
|
row_name = f"round_{round_count:04d}_row_{row_idx:03d}.png"
|
||
|
|
if save_debug_image:
|
||
|
|
save_debug_image(row_img, f"sessions/all/{row_name}")
|
||
|
|
if has_red:
|
||
|
|
save_debug_image(row_img, f"sessions/unread/{row_name}")
|
||
|
|
sessions.append({
|
||
|
|
'row_idx': row_idx,
|
||
|
|
'has_red_dot': has_red,
|
||
|
|
'has_red_by_global': has_red_by_global,
|
||
|
|
'has_red_by_row': has_red_by_row,
|
||
|
|
'has_red_by_row_weak': has_red_by_row_weak,
|
||
|
|
'click_x': int((contact_rect['left'] + contact_rect['right']) // 2),
|
||
|
|
'click_y': center_y,
|
||
|
|
'row_img': row_img.copy(),
|
||
|
|
})
|
||
|
|
|
||
|
|
unread_sessions = [s for s in sessions if s['has_red_dot']]
|
||
|
|
global_hits = sum(1 for s in sessions if s['has_red_by_global'])
|
||
|
|
row_hits = sum(1 for s in sessions if s['has_red_by_row'])
|
||
|
|
row_weak_hits = sum(1 for s in sessions if s['has_red_by_row_weak'])
|
||
|
|
log_event("INFO", "bot", "bot.unread.scan", trace_id, "scan", "ok", "未读会话扫描完成", extra={"round": int(round_count), "rows": len(sessions), "unread": len(unread_sessions), "global_hits": global_hits, "row_hits": row_hits, "row_weak_hits": row_weak_hits})
|
||
|
|
return sessions, unread_sessions
|