Files
ai-shiliu/app/infrastructure/service/wechat/chat_snapshot_analyzer.py

1130 lines
45 KiB
Python
Raw Normal View History

from dataclasses import dataclass, asdict
from io import BytesIO
from pathlib import Path
import json
import re
import sys
import cv2
import numpy as np
from PIL import Image
# Project root directory: parents[4] of this file's resolved path.
ROOT = Path(__file__).resolve().parents[4]
# Put the root at the front of sys.path before the `app.` imports below
# (presumably so they resolve when this file is run directly — confirm).
sys.path.insert(0, str(ROOT))
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
from app.infrastructure.service.wechat.config import (
    OCR_SAVE_DIR,
    OCR_TOP_PENALTY_RATIO,
    OCR_TOP_PENALTY_BIN_FACTOR,
    OCR_TOP_PENALTY_COLOR_FACTOR,
)
from app.infrastructure.service.wechat.ocr import OCRService
# Directory of chat screenshots captured after a click
CLICKED_DIR = ROOT / OCR_SAVE_DIR / 'sessions' / 'clicked'
# OCR output root; holds the sub-directories original, merged_binary and ocr_crops
OUT_DIR = ROOT / OCR_SAVE_DIR / 'sessions' / 'clicked_ocr'
# Where the original screenshots are saved
ORIGINAL_DIR = OUT_DIR / 'original'
# Where the merged binary images are saved
MERGED_DIR = OUT_DIR / 'merged_binary'
# Where the OCR crop regions are saved
OCR_DIR = OUT_DIR / 'ocr_crops'
# Create every output directory (runs at import time)
for _dir in (OUT_DIR, ORIGINAL_DIR, MERGED_DIR, OCR_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
def get_image_out_dir(stem: str, category: str = 'ocr') -> Path:
    """Return (creating it if needed) the per-image output directory.

    Args:
        stem: Image identifier used as the sub-directory name; an empty
            value falls back to ``'unknown'``.
        category: ``'original'`` or ``'merged'`` select the matching root;
            any other value selects the OCR crops root.

    Returns:
        The existing directory ``<category root>/<stem>``.
    """
    if category == 'original':
        root = ORIGINAL_DIR
    else:
        root = MERGED_DIR if category == 'merged' else OCR_DIR
    target = root / (stem or 'unknown')
    target.mkdir(parents=True, exist_ok=True)
    return target
# Shared OCR service instance used by the helpers in this module
ocr = OCRService()
# Fixed and adaptive thresholding parameters
BINARY_THRESHOLD = 248  # fixed binarisation threshold
ADAPTIVE_BLOCK_SIZE = 13  # adaptive-threshold block size
ADAPTIVE_C = 1  # adaptive-threshold offset
LEFT_MERGE_KERNEL = (5, 1)  # morphology kernel for left-side bubbles
DEFAULT_MERGE_KERNEL = (3, 1)  # default morphology kernel
OPEN_KERNEL = (2, 2)  # kernel size for the opening step
# Bubble colour and position heuristics
PEER_BUBBLE_RGB = (238, 238, 240)  # colour of the peer's message bubble (#EEEEF0)
LEFT_BUBBLE_CX_RATIO = 0.58  # centre-x / image-width threshold for a left-side bubble
RIGHT_BUBBLE_MIN_LEFT_RATIO = 0.48  # minimum left-edge ratio of a right-side bubble
LEFT_SIDE_MAX_RIGHT_RATIO = 0.78  # maximum right-edge ratio of a left-side bubble
LEFT_COLOR_THRESHOLD = 64  # colour-difference threshold for left bubbles
RIGHT_COLOR_THRESHOLD = 108  # colour-difference threshold for right bubbles
BRIGHTNESS_CUTOFF = 248  # brightness cut-off
BG_WHITE_THRESHOLD = 245  # per-channel threshold for white background
BRIGHT_MASK_THRESHOLD = 242  # threshold for the highlight mask
# Area and size thresholds for connected-component analysis
MIN_AREA_BINARY = 700  # minimum component area in the binary image
MIN_WIDTH_BINARY = 42  # minimum width in the binary image
MIN_HEIGHT_BINARY = 24  # minimum height in the binary image
MIN_AREA_COLOR = 560  # minimum component area in the colour mask
MIN_WIDTH_COLOR = 38  # minimum width in the colour mask
MIN_HEIGHT_COLOR = 22  # minimum height in the colour mask
# Scoring parameters for the full-binary fallback box
FULL_BIN_SCORE_BOTTOM = 5.0  # weight of the bottom position
FULL_BIN_SCORE_AREA = 0.08  # weight of the area
FULL_BIN_SCORE_WIDTH = 0.15  # weight of the width
FULL_BIN_PENALTY_TALL = 8.0  # penalty for being too tall
FULL_BIN_PENALTY_WIDE = 3.0  # penalty for being too wide
# Filter rules for latest-message candidate boxes (used by pick_last_white_box_from_merged_binary)
AVATAR_RATIO_MIN = 0.75
AVATAR_RATIO_MAX = 1.35
AVATAR_MIN_SIZE = 28
AVATAR_MAX_SIZE = 72
AVATAR_EDGE_LEFT_RATIO = 0.2
AVATAR_EDGE_RIGHT_RATIO = 0.8
TIME_MARKER_MIN_H = 10
TIME_MARKER_MAX_H = 22
TIME_MARKER_MIN_W = 28
TIME_MARKER_MAX_W = 110
TIME_MARKER_RATIO_MIN = 1.8
TIME_MARKER_RATIO_MAX = 8.0
TIME_MARKER_CENTER_TOLERANCE = 0.16
# Scoring parameters for boxes found in the binary image
BIN_SCORE_BOTTOM = 5.0  # weight of the bottom position
BIN_SCORE_AREA = 0.12  # weight of the area
BIN_SCORE_OVERLAP = 220.0  # weight of the horizontal overlap with the text
BIN_PENALTY_TOP_GAP_LEFT = 10.0  # top-gap penalty for left bubbles
BIN_PENALTY_TOP_GAP_RIGHT = 4.0  # top-gap penalty for right bubbles
BIN_PENALTY_BOTTOM_GAP = 1.8  # bottom-gap penalty
BIN_PENALTY_TOO_TALL = 9.0  # penalty for being too tall
BIN_PENALTY_TOO_WIDE = 2.0  # penalty for being too wide
# Scoring parameters for boxes found in the colour mask
COLOR_SCORE_AREA = 0.35  # weight of the area
COLOR_SCORE_BOTTOM = 3.0  # weight of the bottom position
COLOR_SCORE_OVERLAP = 180.0  # weight of the horizontal overlap with the text
COLOR_PENALTY_TOP_GAP_LEFT = 7.5  # top-gap penalty for left bubbles
COLOR_PENALTY_BOTTOM_GAP = 1.6  # bottom-gap penalty
COLOR_PENALTY_TOO_TALL = 7.0  # penalty for being too tall
COLOR_PENALTY_TOO_WIDE = 1.6  # penalty for being too wide
COLOR_SCORE_TOP_GAP_RIGHT = 1.0  # top-gap bonus for right bubbles
COLOR_SCORE_HEIGHT_RIGHT = 2.0  # height bonus for right bubbles
# Scoring parameters for the latest-message text cluster
CLUSTER_SCORE_BOTTOM = 2.2  # weight of the cluster's bottom position
CLUSTER_SCORE_HEIGHT = 0.9  # weight of the cluster height
CLUSTER_SCORE_WIDTH = 0.12  # weight of the cluster width
CLUSTER_SCORE_TEXT_LEN = 1.5  # weight of the text length
CLUSTER_SCORE_MULTI_LINE_BONUS = 35.0  # extra score for multi-line messages
# Non-body text filter: lines that match one of these exactly
NON_BODY_EXACT_TEXTS = {
    '小程序',
    ':S',
}
# Non-body text filter: lines that contain one of these keywords
NON_BODY_CONTAINS_TEXTS = (
    '有事请@其他福利官',
    '我是机器人',
    '群主小助手',
)
@dataclass
class AnalyzeResult:
    """Result of analysing one chat snapshot (built by ``analyze_pil_image``)."""

    # Name of the analysed image: the supplied file name or ``<stem>.png``.
    file: str
    # Value of the PIL image's ``size`` attribute, or None.
    size: tuple[int, int] | None
    # Crop rectangle with keys left/top/right/bottom/width/height, or None.
    crop_box: dict | None
    # Newline-joined text of the latest message; None when nothing was
    # recognised or when the bubble was classified as 'right'.
    latest_text: str | None
    # True when bubble_side is 'right', False for 'left', None when unknown.
    is_self_sent: bool | None
    # 'left' or 'right', or None.
    bubble_side: str | None
    # Confidence of the side classification.
    confidence: float
    # De-duplicated text lines from the full-image OCR pass; emptied when
    # the bubble was classified as 'right'.
    valid_lines: list[str]
    # Error description; analyze_pil_image always sets this to None.
    error: str | None
def pil_to_bytes(img: Image.Image) -> bytes:
    """Encode a PIL image as PNG and return the encoded bytes."""
    with BytesIO() as stream:
        img.save(stream, format='PNG')
        return stream.getvalue()
def ocr_lines(img: Image.Image, scene: str) -> list[str]:
    """Run OCR on an image and return the recognised text lines.

    Args:
        img: Image to recognise; it is PNG-encoded before being handed to
            the shared OCR service.
        scene: Scene label forwarded to ``ocr.recognize``.

    Returns:
        The stripped, non-empty lines in the order the service returned them.
    """
    lines = []
    for entry in ocr.recognize(pil_to_bytes(img), scene=scene):
        if not entry:
            continue
        # Coerce before stripping so the filter and the returned value agree
        # even if the service yields a non-str entry.
        text = str(entry).strip()
        if text:
            lines.append(text)
    return lines
def ocr_items_direct(img: Image.Image) -> list[dict]:
    """Run the service's ``rapid_provider`` engine directly on an image.

    Returns one dict per recognised entry with the keys ``text``,
    ``confidence``, ``left``, ``top``, ``right`` and ``bottom``. An empty
    list is returned when the provider is missing, not ready, has no engine,
    or yields no result.
    """
    provider = getattr(ocr, 'rapid_provider', None)
    if provider is None:
        return []
    if not provider.ensure_ready():
        return []
    if provider.engine is None:
        return []

    bgr = cv2.cvtColor(np.array(img.convert('RGB')), cv2.COLOR_RGB2BGR)
    output = provider.engine(bgr)
    if not output or len(output) < 1:
        return []

    # Assumes output[0] is a list of (box, text[, score]) entries where box
    # is a sequence of (x, y) points — TODO confirm against the engine.
    recognised = []
    for entry in (output[0] or []):
        if not entry or len(entry) < 2:
            continue
        quad = np.array(entry[0], dtype=np.float32)
        content = str(entry[1]).strip()
        certainty = float(entry[2]) if len(entry) > 2 else 0.0
        if not content:
            continue
        xs = quad[:, 0]
        ys = quad[:, 1]
        recognised.append({
            'text': content,
            'confidence': certainty,
            'left': float(xs.min()),
            'top': float(ys.min()),
            'right': float(xs.max()),
            'bottom': float(ys.max()),
        })
    return recognised
def build_merged_binary_image(img: Image.Image, *, is_left_bubble: bool | None = None) -> Image.Image:
    """Build a binary mask combining a fixed and an adaptive inverse threshold.

    The image is converted to grayscale, blurred, thresholded both ways, the
    two masks are OR-ed, then a horizontal close followed by a small open is
    applied.

    Args:
        img: Source image.
        is_left_bubble: When exactly ``True`` the closing step uses
            ``LEFT_MERGE_KERNEL``; otherwise ``DEFAULT_MERGE_KERNEL``.

    Returns:
        The resulting mask as a PIL image.
    """
    rgb = np.array(img.convert('RGB'))
    smoothed = cv2.GaussianBlur(cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY), (5, 5), 0)

    fixed_mask = cv2.threshold(smoothed, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY_INV)[1]
    adaptive_mask = cv2.adaptiveThreshold(
        smoothed, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV, ADAPTIVE_BLOCK_SIZE, ADAPTIVE_C
    )
    combined = cv2.bitwise_or(fixed_mask, adaptive_mask)

    # Near-minimal merging: keep only thin horizontal links so that large
    # white blobs are broken up as much as possible.
    close_size = LEFT_MERGE_KERNEL if is_left_bubble is True else DEFAULT_MERGE_KERNEL
    close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, close_size)
    combined = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, close_kernel, iterations=1)
    open_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, OPEN_KERNEL)
    combined = cv2.morphologyEx(combined, cv2.MORPH_OPEN, open_kernel)
    return Image.fromarray(combined)
def pick_box_from_full_binary(img: Image.Image) -> dict | None:
    """Pick the best-scoring connected component of the merged binary image.

    Components smaller than 300 px in area, 50 px wide or 24 px high are
    ignored. The score rewards a low position, area and width, and penalises
    components that are very tall or very wide relative to the image.

    Returns:
        A dict with left/top/right/bottom/width/height, or None when no
        component qualifies.
    """
    mask = np.array(build_merged_binary_image(img))
    label_count, _, stats, _ = cv2.connectedComponentsWithStats(mask, 8)
    img_h, img_w = mask.shape[:2]
    tall_limit = img_h * 0.45
    wide_limit = img_w * 0.65

    winner = None
    winner_score = None
    for comp in stats[1:label_count]:  # row 0 is the background
        x, y, bw, bh, area = comp
        if area < 300 or bw < 50 or bh < 24:
            continue
        right = x + bw
        bottom = y + bh
        score = (
            bottom * FULL_BIN_SCORE_BOTTOM
            + min(area, 20000) * FULL_BIN_SCORE_AREA
            + min(bw, int(img_w * 0.7)) * FULL_BIN_SCORE_WIDTH
        )
        if bh > tall_limit:
            score -= (bh - tall_limit) * FULL_BIN_PENALTY_TALL
        if bw > wide_limit:
            score -= (bw - wide_limit) * FULL_BIN_PENALTY_WIDE
        # Strict comparison: on a tie the earlier component is kept.
        if winner_score is None or score > winner_score:
            winner_score = score
            winner = (x, y, right, bottom)

    if winner is None:
        return None
    left, top, right, bottom = (int(v) for v in winner)
    return {
        'left': left,
        'top': top,
        'right': right,
        'bottom': bottom,
        'width': int(right - left),
        'height': int(bottom - top),
    }
def pick_last_white_box_from_merged_binary(merged_img: Image.Image) -> dict | None:
    """Pick the lowest plausible left-side component from a merged binary mask.

    Full-width strips, tiny components, avatar-like squares near either edge,
    time-marker-like boxes near the horizontal centre and anything that is not
    on the left side are discarded. The remaining boxes are scored, mostly by
    how low they sit, and the highest score wins.

    Returns:
        A dict with left/top/right/bottom/width/height, or None when no
        component survives the filters.
    """
    mask = np.array(merged_img.convert('L'))
    img_w = mask.shape[1]
    label_count, _, stats, _ = cv2.connectedComponentsWithStats(mask, 8)

    chosen = None
    chosen_score = None
    for idx in range(1, label_count):
        x, y, w, h, area = stats[idx]
        if area <= 0 or w <= 0 or h <= 0:
            continue
        right = x + w
        bottom = y + h
        aspect = w / max(1.0, float(h))

        # Strips spanning the whole width are not message boxes.
        if x <= img_w * 0.02 and right >= img_w * 0.98 and w >= img_w * 0.9:
            continue
        if w < 32 or h < 12:
            continue

        # Rule 1: drop avatar frames (roughly square, bounded size, near an edge).
        squareish = (
            AVATAR_RATIO_MIN <= aspect <= AVATAR_RATIO_MAX
            and AVATAR_MIN_SIZE <= w <= AVATAR_MAX_SIZE
            and AVATAR_MIN_SIZE <= h <= AVATAR_MAX_SIZE
        )
        near_edge = x <= img_w * AVATAR_EDGE_LEFT_RATIO or right >= img_w * AVATAR_EDGE_RIGHT_RATIO
        if squareish and near_edge:
            continue

        # Rule 2: drop centred time markers (short, elongated, near the mid-line).
        cx = (x + right) / 2.0
        if (
            TIME_MARKER_MIN_H <= h <= TIME_MARKER_MAX_H
            and TIME_MARKER_MIN_W <= w <= TIME_MARKER_MAX_W
            and TIME_MARKER_RATIO_MIN <= aspect <= TIME_MARKER_RATIO_MAX
            and abs(cx - img_w / 2.0) <= img_w * TIME_MARKER_CENTER_TOLERANCE
        ):
            continue

        # Only left-side boxes are kept.
        if cx > img_w * LEFT_BUBBLE_CX_RATIO or x > img_w * RIGHT_BUBBLE_MIN_LEFT_RATIO:
            continue

        score = bottom * 1.0 + right * 0.08 + min(area, 28000) * 0.0012
        if right < img_w * 0.35:
            score -= 80.0
        if right > img_w * 0.82:
            score -= (right - img_w * 0.82) * 1.4
        if w > img_w * 0.45 and right > img_w * 0.75:
            score -= min(260.0, (w - img_w * 0.45) * 1.2 + (right - img_w * 0.75) * 1.0)

        # ">=" so that, on a tie, the later component wins.
        if chosen_score is None or score >= chosen_score:
            chosen_score = score
            chosen = (x, y, right, bottom)

    if chosen is None:
        return None
    left, top, right, bottom = chosen
    return {
        'left': int(left),
        'top': int(top),
        'right': int(right),
        'bottom': int(bottom),
        'width': int(right - left),
        'height': int(bottom - top),
    }
def ocr_items_with_boxes(img: Image.Image, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
    """Recognise text with the service's ``provider`` engine, keeping geometry.

    Args:
        img: Image to recognise.
        offset_x: Added to every horizontal coordinate in the output.
        offset_y: Added to every vertical coordinate in the output.

    Returns:
        One dict per accepted text with ``text``, ``score``, ``left``,
        ``right``, ``top``, ``bottom``, ``cx``, ``cy``, ``height`` and
        ``width``. Entries scoring below 0.3 or lacking a box are skipped.
        An empty list is returned when the engine is unavailable, raises, or
        returns something other than a list.
    """
    provider = getattr(ocr, 'provider', None)
    engine = getattr(provider, 'engine', None)
    if not getattr(provider, 'ready', False) or engine is None:
        return []

    try:
        raw = engine.ocr(np.array(img.convert('RGB')))
    except Exception:
        # Best effort: a failing engine is treated as "nothing recognised".
        return []
    if not isinstance(raw, list):
        return []

    found = []
    for page in raw:
        if not isinstance(page, dict):
            continue
        texts = page.get('rec_texts') or []
        scores = page.get('rec_scores') or []
        boxes = page.get('rec_boxes')
        if boxes is None:
            boxes = []

        for pos, raw_text in enumerate(texts):
            content = str(raw_text).strip()
            if not content:
                continue
            has_score = pos < len(scores) and scores[pos] is not None
            certainty = float(scores[pos]) if has_score else 0.0
            if certainty < 0.3 or pos >= len(boxes):
                continue

            geom = np.array(boxes[pos]).astype(float)
            if geom.ndim == 1:
                # Flat box: read as (left, top, right, bottom).
                if geom.size < 4:
                    continue
                left = float(geom[0])
                top = float(geom[1])
                right = float(geom[2])
                bottom = float(geom[3])
                cx = (left + right) / 2.0
                cy = (top + bottom) / 2.0
            else:
                # Point list: take the bounding extent and the mean point.
                xs = geom[:, 0]
                ys = geom[:, 1]
                left, right = float(xs.min()), float(xs.max())
                top, bottom = float(ys.min()), float(ys.max())
                cx = float(xs.mean())
                cy = float(ys.mean())

            found.append({
                'text': content,
                'score': certainty,
                'left': left + offset_x,
                'right': right + offset_x,
                'top': top + offset_y,
                'bottom': bottom + offset_y,
                'cx': cx + offset_x,
                'cy': cy + offset_y,
                'height': (bottom - top),
                'width': (right - left),
            })
    return found
def should_keep_line(text: str) -> bool:
    """Return True when *text* contains at least one non-whitespace character."""
    if not text:
        return False
    return text.strip() != ''
def cleanup_candidate_lines(lines: list[str]) -> list[str]:
    """Strip candidate lines, drop empty ones and trim a trailing symbol line.

    When at least two lines remain and the last one is at most three
    characters made up only of ``:``, ``;``, ``S``, ``s``, ``x`` or ``X``,
    that last line is removed so the result ends on the preceding body line.
    """
    kept = [text for text in (str(raw).strip() for raw in lines) if text]
    if len(kept) >= 2:
        tail = kept[-1]
        if len(tail) <= 3 and re.fullmatch(r'[:;SsxX]+', tail):
            kept.pop()
    return kept
def dedupe_lines(lines: list[str]) -> list[str]:
    """Return the stripped, non-empty lines with later duplicates removed.

    The order of first occurrence is preserved.
    """
    stripped = (str(line).strip() for line in lines)
    # dict keys keep insertion order, which gives ordered de-duplication.
    return list(dict.fromkeys(text for text in stripped if text))
def dedupe_items_by_geometry(items: list[dict]) -> list[dict]:
    """Drop OCR items that repeat the same text at nearly the same position.

    Two items are duplicates when their stripped text is equal and their
    centres differ by at most 18 horizontally and 12 vertically. Of each
    group the item with the highest ``score`` is kept (ties: smaller ``top``,
    then smaller ``left``).

    Returns:
        The surviving items ordered by ``cy`` then ``left``.
    """
    if not items:
        return []

    ranked = sorted(items, key=lambda it: (-it.get('score', 0.0), it['top'], it['left']))
    survivors: list[dict] = []
    for cand in ranked:
        cand_text = cand['text'].strip()
        clash = any(
            prev['text'].strip() == cand_text
            and abs(prev['cx'] - cand['cx']) <= 18
            and abs(prev['cy'] - cand['cy']) <= 12
            for prev in survivors
        )
        if not clash:
            survivors.append(cand)

    survivors.sort(key=lambda it: (it['cy'], it['left']))
    return survivors
def build_line_clusters(items: list[dict], y_gap: float = 18.0) -> list[list[dict]]:
    """Group OCR items into vertically contiguous clusters.

    Items with blank text are dropped and the rest are de-duplicated by
    geometry, which also orders them by ``cy`` then ``left``. An item joins
    the current cluster when its ``cy`` is within
    ``max(y_gap, 1.15 * average height)`` of the previously added item,
    where the average height is floored at 10; otherwise it starts a new
    cluster.

    Args:
        items: OCR item dicts with at least ``text``, ``cy``, ``cx``,
            ``top`` and ``left``.
        y_gap: Minimum vertical tolerance between consecutive items.

    Returns:
        The clusters from top to bottom; empty when no item survives.
    """
    ordered = dedupe_items_by_geometry([it for it in items if should_keep_line(it['text'])])
    groups: list[list[dict]] = []
    for it in ordered:
        if groups:
            last = groups[-1][-1]
            mean_h = max(10.0, (last.get('height', 0) + it.get('height', 0)) / 2.0)
            if abs(it['cy'] - last['cy']) <= max(y_gap, mean_h * 1.15):
                groups[-1].append(it)
                continue
        groups.append([it])
    return groups
def cluster_to_lines(cluster: list[dict]) -> list[str]:
    """Merge the OCR items of a cluster into one string per text row.

    The cluster is split into rows with ``build_line_clusters`` (10 px base
    tolerance). Within each row the fragments are concatenated from left to
    right, skipping a fragment identical to the one just added.

    Args:
        cluster: OCR item dicts belonging to one message cluster.

    Returns:
        The row strings, top to bottom, with duplicate rows removed.
    """
    lines = []
    for row in build_line_clusters(cluster, y_gap=10.0):
        # The row is already de-duplicated by build_line_clusters. Running the
        # geometry de-dupe again here would re-sort by (cy, left) and undo the
        # left-to-right order, so fragments could be joined out of order.
        ordered = sorted(row, key=lambda x: x['left'])
        parts = []
        for item in ordered:
            text = item['text'].strip()
            if not text:
                continue
            if parts and parts[-1] == text:
                continue
            parts.append(text)
        merged = ''.join(parts)
        if merged:
            lines.append(merged)
    return dedupe_lines(lines)
def cluster_bounds(cluster: list[dict]) -> dict:
    """Return the bounding extent of a cluster plus its mean centre x.

    The result has the keys ``top``, ``bottom``, ``left``, ``right`` and
    ``cx`` (the arithmetic mean of the items' ``cx``).
    """
    top = min(item['top'] for item in cluster)
    bottom = max(item['bottom'] for item in cluster)
    left = min(item['left'] for item in cluster)
    right = max(item['right'] for item in cluster)
    mean_cx = sum(item['cx'] for item in cluster) / len(cluster)
    return {'top': top, 'bottom': bottom, 'left': left, 'right': right, 'cx': mean_cx}
def estimate_cluster_line_height(cluster: list[dict]) -> float:
    """Estimate the height of one text line from a cluster's items.

    Returns the median of the positive ``height`` values, floored at 14.0,
    or 18.0 when no item has a positive height.
    """
    positive = []
    for item in cluster:
        value = float(item.get('height', 0) or 0)
        if value > 0:
            positive.append(value)
    if positive:
        return max(14.0, float(np.median(positive)))
    return 18.0
def crop_bubble_box(img: Image.Image, cluster_items: list[dict], stem: str | None = None) -> tuple[Image.Image | None, dict | None]:
    """Crop the message bubble that surrounds a cluster of OCR text items.

    A candidate box is first looked for among the connected components of the
    merged binary image; if none qualifies, the components of a colour mask
    are scored instead. The chosen box is then grown pixel by pixel while the
    neighbouring row/column still matches the bubble colour, topped up towards
    a height estimated from the text line height, and finally clamped or
    padded. When no component is found the text bounds are padded instead.

    Args:
        img: Full screenshot.
        cluster_items: OCR item dicts of the target cluster.
        stem: When truthy, the merged binary image is saved as
            ``bubble_bin.png`` in the 'merged' output directory for this stem.

    Returns:
        ``(cropped image, box)`` where box has the keys left/top/right/bottom/
        width/height, or ``(None, None)`` when the cluster is empty, the seed
        region is empty, or the final box is degenerate.
    """
    if not cluster_items:
        return None, None
    arr = np.array(img.convert('RGB'))
    img_h, img_w = arr.shape[:2]
    bounds = cluster_bounds(cluster_items)
    # Seed region: the text bounds padded on every side, clipped to the image.
    seed_left = max(0, int(bounds['left'] - 20))
    seed_right = min(img_w, int(bounds['right'] + 20))
    seed_top = max(0, int(bounds['top'] - 30))
    seed_bottom = min(img_h, int(bounds['bottom'] + 14))
    if seed_right <= seed_left or seed_bottom <= seed_top:
        return None, None
    seed = arr[seed_top:seed_bottom, seed_left:seed_right]
    if seed.size == 0:
        return None, None
    # Peer bubbles: anchor to colours near #EEEEF0 first; own messages may use another colour strategy
    seed_pixels = seed.reshape(-1, 3).astype(np.int16)
    brightness = seed_pixels.mean(axis=1)
    bubble_pixels = seed_pixels[brightness < BRIGHTNESS_CUTOFF]
    if bubble_pixels.size == 0:
        bubble_pixels = seed_pixels
    median_target = np.median(bubble_pixels, axis=0)
    peer_target = np.array(PEER_BUBBLE_RGB, dtype=np.int16)  # #EEEEF0
    # Left-side messages anchor to the grey #EEEEF0 bubble with dark text; the right side keeps the adaptive strategy
    is_left_bubble = bounds['cx'] <= img_w * LEFT_BUBBLE_CX_RATIO
    if is_left_bubble:
        target = peer_target
        threshold = LEFT_COLOR_THRESHOLD
    else:
        target = median_target.astype(np.int16)
        threshold = RIGHT_COLOR_THRESHOLD
    # dist: per-pixel sum of absolute channel differences to the target colour.
    diff = np.abs(arr.astype(np.int16) - target.reshape(1, 1, 3))
    dist = diff.sum(axis=2)
    bg_mask = (arr[:, :, 0] > BG_WHITE_THRESHOLD) & (arr[:, :, 1] > BG_WHITE_THRESHOLD) & (arr[:, :, 2] > BG_WHITE_THRESHOLD)
    bright_mask = arr.mean(axis=2) > BRIGHT_MASK_THRESHOLD
    color_mask = (dist <= threshold) & (~bg_mask) & (~bright_mask)
    color_mask = color_mask.astype(np.uint8) * 255
    if is_left_bubble:
        # Left grey bubbles are more at risk of joining two stacked messages, so weaken vertical
        # merging and strengthen horizontal connectivity here
        kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (11, 3))
        kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    else:
        kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 7))
        kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_CLOSE, kernel_close)
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_OPEN, kernel_open)
    text_cx = bounds['cx']
    text_cy = (bounds['top'] + bounds['bottom']) / 2.0
    text_h = max(1.0, bounds['bottom'] - bounds['top'])
    text_w = max(1.0, bounds['right'] - bounds['left'])

    def is_probable_left_avatar_candidate(stats_arr, label_idx: int, x: int, y: int, w: int, h: int) -> bool:
        """Return True when a component looks like a left-edge avatar.

        The component must be near the left edge, roughly square and of a
        size related to the text height, and some other component must sit
        to its right on the same row and be wider and of comparable height.
        Always False for right-side bubbles.
        """
        if not is_left_bubble:
            return False
        if x > img_w * 0.2:
            return False
        if w <= 0 or h <= 0:
            return False
        wh_ratio = w / max(1.0, float(h))
        if wh_ratio < 0.72 or wh_ratio > 1.35:
            return False
        if w < max(18.0, text_h * 0.55) or w > max(92.0, text_h * 2.4):
            return False
        right_edge = x + w
        row_hit = False
        n = stats_arr.shape[0] if hasattr(stats_arr, 'shape') else len(stats_arr)
        for j in range(1, n):
            if j == label_idx:
                continue
            ox, oy, ow, oh, oarea = stats_arr[j]
            if oarea <= 0 or ow <= 0 or oh <= 0:
                continue
            if ox <= right_edge:
                continue
            gap = ox - right_edge
            if gap < 2 or gap > max(120.0, text_w * 1.4):
                continue
            # Require enough vertical overlap to count as the same row.
            overlap_h = max(0.0, min(y + h, oy + oh) - max(y, oy))
            if overlap_h < min(h, oh) * 0.45:
                continue
            if ow < w * 1.15:
                continue
            if oh < h * 0.75:
                continue
            row_hit = True
            break
        return row_hit

    def pick_box_from_binary_merge() -> tuple[int, int, int, int] | None:
        """Pick the best component of the merged binary image that contains the text centre.

        Returns (left, top, right, bottom) or None when no component qualifies.
        """
        merged_out_dir = get_image_out_dir(stem or 'unknown', 'merged') if stem else None
        merged_img = build_merged_binary_image(img, is_left_bubble=is_left_bubble)
        merged = np.array(merged_img)
        if merged_out_dir is not None:
            merged_img.save(merged_out_dir / 'bubble_bin.png')
        # NOTE(review): labels2 is never used.
        num_labels2, labels2, stats2, _ = cv2.connectedComponentsWithStats(merged, 8)
        best_local = None
        for label in range(1, num_labels2):
            x, y, w, h, area = stats2[label]
            if area <= 0 or w <= 0 or h <= 0:
                continue
            if is_probable_left_avatar_candidate(stats2, label, x, y, w, h):
                continue
            right = x + w
            bottom = y + h
            # The component's bounding box must contain the text centre.
            if not (x <= text_cx <= right and y <= text_cy <= bottom):
                continue
            top_gap = max(0.0, bounds['top'] - y)
            bottom_gap = max(0.0, bottom - bounds['bottom'])
            overlap_x = max(0.0, min(bounds['right'], right) - max(bounds['left'], x))
            overlap_ratio = overlap_x / text_w if text_w > 0 else 0.0
            if overlap_ratio < 0.45:
                continue
            score = 0.0
            score += bottom * BIN_SCORE_BOTTOM
            score += area * BIN_SCORE_AREA
            score += overlap_ratio * BIN_SCORE_OVERLAP
            # NOTE(review): 0.18 and 2.0 are hard-coded here, while the colour path below uses
            # OCR_TOP_PENALTY_RATIO / OCR_TOP_PENALTY_COLOR_FACTOR and the imported
            # OCR_TOP_PENALTY_BIN_FACTOR is unused — confirm whether the config values were intended.
            if y < img_h * 0.18:
                score -= (img_h * 0.18 - y) * 2.0
            score -= top_gap * (BIN_PENALTY_TOP_GAP_LEFT if is_left_bubble else BIN_PENALTY_TOP_GAP_RIGHT)
            score -= abs(bottom_gap - 12.0) * BIN_PENALTY_BOTTOM_GAP
            max_reasonable_h = max(120.0, text_h * 3.4)
            max_reasonable_w = max(360.0, text_w * 1.9)
            if h > max_reasonable_h:
                score -= (h - max_reasonable_h) * BIN_PENALTY_TOO_TALL
            if w > max_reasonable_w:
                score -= (w - max_reasonable_w) * BIN_PENALTY_TOO_WIDE
            if is_left_bubble:
                # Bonus for regions whose median colour is close to the peer bubble colour.
                region = arr[y:bottom, x:right].reshape(-1, 3).astype(np.int16)
                if region.size > 0:
                    region_mean = np.median(region, axis=0)
                    color_score = float(320 - np.abs(region_mean - peer_target).sum())
                    score += max(0.0, color_score)
            if best_local is None or score > best_local[0]:
                best_local = (score, x, y, right, bottom)
        if best_local is None:
            return None
        return best_local[1], best_local[2], best_local[3], best_local[4]

    best = None
    binary_box = pick_box_from_binary_merge()
    if binary_box is not None:
        left, top, right, bottom = binary_box
        best = (float(bottom), left, top, right, bottom)
    else:
        # Fallback: score the connected components of the colour mask.
        # NOTE(review): labels is never used.
        num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(color_mask, 8)
        for label in range(1, num_labels):
            x, y, w, h, area = stats[label]
            if area <= 0 or w <= 0 or h <= 0:
                continue
            if is_probable_left_avatar_candidate(stats, label, x, y, w, h):
                continue
            right = x + w
            bottom = y + h
            if not (x <= text_cx <= right and y <= text_cy <= bottom):
                continue
            top_gap = max(0.0, bounds['top'] - y)
            bottom_gap = max(0.0, bottom - bounds['bottom'])
            overlap_x = max(0.0, min(bounds['right'], right) - max(bounds['left'], x))
            overlap_ratio = overlap_x / text_w if text_w > 0 else 0.0
            score = 0.0
            score += area * COLOR_SCORE_AREA
            score += bottom * COLOR_SCORE_BOTTOM
            score += overlap_ratio * COLOR_SCORE_OVERLAP
            if y < img_h * OCR_TOP_PENALTY_RATIO:
                score -= (img_h * OCR_TOP_PENALTY_RATIO - y) * OCR_TOP_PENALTY_COLOR_FACTOR
            if is_left_bubble:
                region = arr[y:bottom, x:right].reshape(-1, 3).astype(np.int16)
                region_mean = np.median(region, axis=0)
                color_score = float(320 - np.abs(region_mean - peer_target).sum())
                score += max(0.0, color_score)
                # The target is the last grey box, not the whole grey column
                score -= top_gap * COLOR_PENALTY_TOP_GAP_LEFT
                score -= abs(bottom_gap - 12.0) * COLOR_PENALTY_BOTTOM_GAP
                max_reasonable_h = max(110.0, text_h * 2.8)
                if h > max_reasonable_h:
                    score -= (h - max_reasonable_h) * COLOR_PENALTY_TOO_TALL
                max_reasonable_w = max(320.0, text_w * 1.6)
                if w > max_reasonable_w:
                    score -= (w - max_reasonable_w) * COLOR_PENALTY_TOO_WIDE
            else:
                score += top_gap * COLOR_SCORE_TOP_GAP_RIGHT
                score += h * COLOR_SCORE_HEIGHT_RIGHT
            if best is None or score > best[0]:
                best = (score, x, y, right, bottom)
    if best is not None:
        _, left, top, right, bottom = best
        if is_left_bubble:
            # If the chosen box still looks like a left-edge avatar (roughly square, small),
            # discard it and start from the padded text bounds instead.
            cand_w = max(1, int(right - left))
            cand_h = max(1, int(bottom - top))
            cand_ratio = cand_w / float(cand_h)
            if left <= int(img_w * 0.2) and 0.70 <= cand_ratio <= 1.35 and cand_w <= max(96, int(text_h * 2.6)):
                left = max(0, int(bounds['left'] - 6))
                right = min(img_w, int(bounds['right'] + 12))
                top = max(0, int(bounds['top'] - 4))
                bottom = min(img_h, int(bounds['bottom'] + 10))

        # Directional growth from the found block: left bubbles mainly grow left / up / down
        def col_match_ratio(x: int, y1: int, y2: int) -> float:
            """Fraction of pixels in column x, rows [y1, y2), within the colour threshold."""
            seg = dist[max(0, y1):min(img_h, y2), max(0, x):min(img_w, x + 1)]
            if seg.size == 0:
                return 0.0
            return float((seg <= threshold).mean())

        def row_match_ratio(y: int, x1: int, x2: int) -> float:
            """Fraction of pixels in row y, columns [x1, x2), within the colour threshold."""
            seg = dist[max(0, y):min(img_h, y + 1), max(0, x1):min(img_w, x2)]
            if seg.size == 0:
                return 0.0
            return float((seg <= threshold).mean())

        # NOTE(review): recomputed with a literal 0.58, which currently equals LEFT_BUBBLE_CX_RATIO.
        is_left_bubble = bounds['cx'] <= img_w * 0.58
        line_h = estimate_cluster_line_height(cluster_items)
        current_h = max(1.0, float(bottom - top))
        # Do not hard-code a target height; infer a more reasonable bubble height from the
        # recognised text line height. Empirically a 4-line message block needs roughly 5-6 line
        # heights of room; when the current box is too small, spend the increment on growing upwards first.
        desired_h = max(current_h, line_h * 5.6)
        need_more_h = max(0.0, desired_h - current_h)
        # Left messages are tightened "red-box style": hug the body text and avoid oversized boxes
        expand_left_limit = 12 if is_left_bubble else 30
        expand_right_limit = 22 if is_left_bubble else 90
        expand_up_limit = max(6, int(min(18, line_h * 0.55 + need_more_h * 0.08))) if is_left_bubble else max(40, int(min(120, line_h * 3.8 + need_more_h * 0.9)))
        expand_down_limit = max(10, int(min(28, line_h * 0.85 + need_more_h * 0.10))) if is_left_bubble else max(20, int(min(60, line_h * 1.6 + need_more_h * 0.25)))
        # Grow one pixel at a time while the next column/row still matches the bubble colour.
        for _ in range(expand_left_limit):
            if left <= 1:
                break
            ratio = col_match_ratio(left - 1, top, bottom)
            if ratio < 0.18:
                break
            left -= 1
        for _ in range(expand_right_limit):
            if right >= img_w - 1:
                break
            ratio = col_match_ratio(right, top, bottom)
            if ratio < 0.18:
                break
            right += 1
        up_steps = 0
        # Upward growth never goes above this row.
        top_guard = max(0, int(bounds['top'] - (line_h * 0.9 if is_left_bubble else line_h * 2.2)))
        for _ in range(expand_up_limit):
            if top <= 1 or top <= top_guard:
                break
            ratio = row_match_ratio(top - 1, left, right)
            # Left messages are much stricter going up, to avoid pulling in the nickname / the area beside the avatar
            if is_left_bubble:
                if ratio < 0.18:
                    break
            else:
                if ratio < 0.10 and up_steps > int(line_h * 0.8):
                    break
            top -= 1
            up_steps += 1
        down_steps = 0
        for _ in range(expand_down_limit):
            if bottom >= img_h - 1:
                break
            ratio = row_match_ratio(bottom, left, right)
            if ratio < 0.16 and down_steps > int(line_h * 0.5):
                break
            bottom += 1
            down_steps += 1
        # If the box is still clearly shorter than the height estimated from the line height,
        # left messages get only a very limited top-up so the box does not become a large enclosure
        current_h2 = max(1, bottom - top)
        missing_h = max(0, int(desired_h - current_h2))
        if missing_h > 0:
            if is_left_bubble:
                extra_up = min(max(0, int(missing_h * 0.08)), max(0, top - top_guard))
                extra_down = min(max(0, int(missing_h * 0.35)), max(0, img_h - bottom))
            else:
                extra_up = min(max(0, int(missing_h * 0.75)), max(0, top))
                extra_down = min(max(0, int(missing_h * 0.25)), max(0, img_h - bottom))
            top -= extra_up
            bottom += extra_down
        # Left messages get one final clamp to approximate the size of the manually marked red box
        if is_left_bubble:
            target_left = int(bounds['left'] - 6)
            target_top = int(bounds['top'] - max(3, line_h * 0.28))
            target_right = int(bounds['right'] + 10)
            target_bottom = int(bounds['bottom'] + max(8, line_h * 0.45))
            # Each edge ends up between the target and a small outward margin
            # (2 px left/top, 4 px right/bottom), clipped to the image.
            left = max(0, min(int(left), target_left))
            left = max(left, max(0, target_left - 2))
            top = max(0, min(int(top), target_top))
            top = max(top, max(0, target_top - 2))
            right = min(img_w, max(int(right), target_right))
            right = min(right, target_right + 4)
            bottom = min(img_h, max(int(bottom), target_bottom))
            bottom = min(bottom, target_bottom + 4)
        else:
            left = max(0, int(left - 4))
            top = max(0, int(top - 4))
            right = min(img_w, int(right + 4))
            bottom = min(img_h, int(bottom + 4))
    else:
        # Only when no block was found fall back to slightly padding the text box;
        # left messages use a tight box around the body text
        if bounds['cx'] <= img_w * 0.58:
            left = max(0, int(bounds['left'] - 6))
            right = min(img_w, int(bounds['right'] + 12))
            top = max(0, int(bounds['top'] - 4))
            bottom = min(img_h, int(bounds['bottom'] + 10))
        else:
            left = max(0, int(bounds['left'] - 24))
            right = min(img_w, int(bounds['right'] + 26))
            top = max(0, int(bounds['top'] - 42))
            bottom = min(img_h, int(bounds['bottom'] + 22))
    if right <= left or bottom <= top:
        return None, None
    box = {'left': left, 'top': top, 'right': right, 'bottom': bottom, 'width': right - left, 'height': bottom - top}
    return img.crop((left, top, right, bottom)), box
def ocr_bubble_text(bubble_img: Image.Image, stem: str) -> list[str]:
    """Recognise the text inside a cropped bubble image.

    The crop is upscaled 5x and passed to ``ocr_lines``. For small crops (at
    most 90x60) where that pass found at most two characters, a second pass
    runs on a thickened Otsu-binarised copy (which is also saved as
    ``bubble_crop_bold_binary_5x.png`` in the 'ocr' output directory). Its
    joined result is preferred when it is longer than the first pass.

    Args:
        bubble_img: Cropped bubble image.
        stem: Image identifier, used in the OCR scene name and output path.

    Returns:
        The recognised lines, stripped and de-duplicated in order; empty when
        nothing was recognised.
    """
    scale = 5

    def normalize(lines: list[str]) -> list[str]:
        """Strip lines, drop blank ones and remove later duplicates."""
        merged = []
        for line in lines:
            text = str(line).strip()
            if not should_keep_line(text):
                continue
            if text not in merged:
                merged.append(text)
        return merged

    def build_bold_binary_image(source_img: Image.Image) -> Image.Image:
        """Otsu-binarise, upscale with nearest-neighbour and thicken dark strokes."""
        gray = np.array(source_img.convert('L'))
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        binary_img = Image.fromarray(binary).convert('L')
        scaled_binary = binary_img.resize((source_img.width * scale, source_img.height * scale), Image.Resampling.NEAREST)
        arr = np.array(scaled_binary)
        kernel = np.ones((2, 2), np.uint8)
        # Dilate the inverted image so that dark strokes grow, then invert back.
        dilated = cv2.dilate(255 - arr, kernel, iterations=1)
        return Image.fromarray(255 - dilated).convert('RGB')

    def recognize_joined_short_text(source_img: Image.Image) -> str | None:
        """Join separately detected fragments, left to right, into one short word.

        Returns None unless at least two fragments were found and the joined
        text is at most 8 characters of CJK, ASCII letters or digits.
        """
        items = ocr_items_direct(source_img)
        if len(items) < 2:
            return None
        items.sort(key=lambda item: (item['left'], item['top']))
        text = ''.join(item['text'] for item in items if should_keep_line(item['text']))
        if not text:
            return None
        if len(text) <= 8 and re.fullmatch(r'[一-鿿A-Za-z0-9]+', text):
            return text
        return None

    scaled_img = bubble_img.resize((bubble_img.width * scale, bubble_img.height * scale), Image.Resampling.LANCZOS)
    raw_norm = normalize(ocr_lines(scaled_img, f'clicked_{stem}_bubble_crop'))

    should_try_short_text = bubble_img.width <= 90 and bubble_img.height <= 60 and (not raw_norm or len(''.join(raw_norm)) <= 2)
    if should_try_short_text:
        bold_binary_img = build_bold_binary_image(bubble_img)
        bold_binary_img.save(get_image_out_dir(stem, 'ocr') / 'bubble_crop_bold_binary_5x.png')
        joined_text = recognize_joined_short_text(bold_binary_img)
        if joined_text and (not raw_norm or len(joined_text) > len(''.join(raw_norm))):
            return [joined_text]
    return raw_norm
def pick_latest_cluster(items: list[dict], image_w: int, image_h: int) -> tuple[list[str], list[str], list[dict]]:
    """Choose the text cluster most likely to be the latest left-side message.

    Items are clustered vertically; clusters that yield no text or are not on
    the left side are skipped. The score favours clusters that sit low, are
    tall, wide, long and multi-line. Very short single lines and narrow
    clusters are penalised unless they are near the bottom and contain CJK
    characters.

    Args:
        items: OCR item dicts for the whole image.
        image_w: Image width in pixels.
        image_h: Image height in pixels.

    Returns:
        ``(lines of the winning cluster, capped at 4; de-duplicated text of
        all items; items of the winning cluster)``, or three empty lists when
        there is no candidate.
    """
    if not items:
        return [], [], []

    winner_score = None
    winner_lines = None
    winner_cluster = None
    for cluster in build_line_clusters(items, y_gap=max(18.0, image_h * 0.018)):
        box = cluster_bounds(cluster)
        lines = cluster_to_lines(cluster)
        if not lines:
            continue
        on_left = (
            box['cx'] <= image_w * LEFT_BUBBLE_CX_RATIO
            and box['left'] <= image_w * RIGHT_BUBBLE_MIN_LEFT_RATIO
        )
        if not on_left:
            continue

        height = box['bottom'] - box['top']
        width = box['right'] - box['left']
        text_len = sum(len(line) for line in lines)
        score = (
            box['bottom'] * CLUSTER_SCORE_BOTTOM
            + min(120.0, height * CLUSTER_SCORE_HEIGHT)
            + min(80.0, width * CLUSTER_SCORE_WIDTH)
            + min(60.0, text_len * CLUSTER_SCORE_TEXT_LEN)
        )
        if len(lines) >= 2:
            score += CLUSTER_SCORE_MULTI_LINE_BONUS

        near_bottom = box['bottom'] >= image_h * 0.62
        has_cjk = any(re.search(r'[一-鿿]', line or '') for line in lines)
        trusted = near_bottom and has_cjk
        shortest = min((len(line.strip()) for line in lines if line and line.strip()), default=0)
        if len(lines) == 1 and shortest <= 4 and not trusted:
            score -= 120.0
        if width <= max(52.0, height * 1.25) and not trusted:
            score -= 140.0

        # Strict comparison: on a tie the earlier (upper) cluster is kept.
        if winner_score is None or score > winner_score:
            winner_score = score
            winner_lines = lines
            winner_cluster = cluster

    if winner_cluster is None:
        return [], [], []
    # To keep the crop on the last body block only, the previous cluster is
    # deliberately not merged in.
    all_texts = dedupe_lines([item['text'] for item in items])
    return list(winner_lines)[:4], all_texts, list(winner_cluster)
def extract_latest_text(img: Image.Image, stem: str, preferred_ocr_img: Image.Image | None = None, preferred_crop_box: dict | None = None) -> tuple[str | None, list[str], list[dict], dict | None]:
    """Extract the text of the latest message from a chat snapshot.

    The full image is OCR-ed and the latest cluster picked. A bubble is then
    cropped around it and re-recognised. When ``preferred_ocr_img`` is given
    it replaces the cropped bubble (and ``preferred_crop_box`` replaces the
    bubble box). The full merged binary image and the OCR source crop are
    saved to the output directories for this stem.

    Returns:
        ``(latest text or None, all de-duplicated OCR lines, items of the
        chosen cluster, crop box or None)``. The text comes from the bubble
        OCR when that yields lines, otherwise from the cluster lines.
    """
    height, width = np.array(img).shape[:2]
    merged_dir = get_image_out_dir(stem, 'merged')
    ocr_dir = get_image_out_dir(stem, 'ocr')
    build_merged_binary_image(img).save(merged_dir / 'full_bin.png')

    detected = dedupe_items_by_geometry(ocr_items_with_boxes(img, offset_x=0, offset_y=0))
    usable: list[dict] = [it for it in detected if should_keep_line(it['text'].strip())]

    cluster_lines, valid_lines, cluster_items = pick_latest_cluster(usable, width, height)
    bubble_img, bubble_box = crop_bubble_box(img, cluster_items, stem)

    if preferred_ocr_img is not None:
        ocr_source, crop_box = preferred_ocr_img, preferred_crop_box
    else:
        ocr_source, crop_box = bubble_img, bubble_box

    if ocr_source is not None:
        ocr_source.save(ocr_dir / 'bubble_crop.png')
        bubble_lines = cleanup_candidate_lines(ocr_bubble_text(ocr_source, stem))
        if bubble_lines:
            return '\n'.join(bubble_lines), valid_lines, cluster_items, crop_box

    # Fall back to the lines of the cluster found in the full-image pass.
    fallback_lines = cleanup_candidate_lines(list(cluster_lines))
    latest = '\n'.join(fallback_lines) if fallback_lines else None
    return latest, valid_lines, cluster_items, crop_box
def detect_latest_bubble_side(img: Image.Image, cluster_items: list[dict], crop_box: dict | None = None) -> tuple[str | None, float]:
    """Decide whether the latest bubble is on the left or the right.

    Three sources are tried in order: the crop box position (confidence 0.9),
    the geometry of the cluster items (0.84 for wide clusters, else 0.78),
    and finally green blobs in the lower part of the image (at most 0.72).
    When nothing matches, ``('left', 0.35)`` is returned.

    Returns:
        ``(side, confidence)`` with side being ``'left'`` or ``'right'``.
    """
    pixels = np.array(img.convert('RGB'))
    img_h, img_w = pixels.shape[:2]
    right_cx_limit = img_w * LEFT_BUBBLE_CX_RATIO
    right_edge_limit = img_w * RIGHT_BUBBLE_MIN_LEFT_RATIO

    if crop_box:
        box_left = float(crop_box.get('left', 0))
        box_right = float(crop_box.get('right', box_left))
        mid_x = (box_left + box_right) / 2.0
        if mid_x >= right_cx_limit or box_left >= right_edge_limit:
            return 'right', 0.9
        return 'left', 0.9

    if cluster_items:
        mean_cx = sum(item['cx'] for item in cluster_items) / len(cluster_items)
        leftmost = min(item['left'] for item in cluster_items)
        rightmost = max(item['right'] for item in cluster_items)
        certainty = 0.84 if (rightmost - leftmost) > img_w * 0.18 else 0.78
        if mean_cx >= right_cx_limit or leftmost >= right_edge_limit:
            return 'right', certainty
        if mean_cx <= img_w * 0.52 or rightmost <= img_w * LEFT_SIDE_MAX_RIGHT_RATIO:
            return 'left', certainty

    # Last resort: look for green blobs in the lower part of the image.
    y_offset = int(img_h * 0.38)
    hsv = cv2.cvtColor(pixels[y_offset:, :, :], cv2.COLOR_RGB2HSV)
    green = cv2.inRange(hsv, np.array([35, 25, 80]), np.array([95, 255, 255]))
    green = cv2.morphologyEx(green, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)))
    contours, _ = cv2.findContours(green, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    blobs = []
    for contour in contours:
        bx, by, bw, bh = cv2.boundingRect(contour)
        blob_area = bw * bh
        if blob_area < 2500 or bh < 28 or bw < 80:
            continue
        if bx + bw < img_w * 0.66:
            continue
        # Skip wide blobs that start on the left half.
        if bx < img_w * 0.45 and bw > img_w * 0.45:
            continue
        blobs.append((by + y_offset, bx, bw, bh, blob_area))

    if not blobs:
        return 'left', 0.35

    # The lowest blob wins (ties: the one further right).
    blobs.sort(key=lambda blob: (blob[0] + blob[3], blob[1]))
    _, bx, bw, _, blob_area = blobs[-1]
    side = 'right' if bx + bw / 2.0 >= right_cx_limit else 'left'
    return side, min(0.72, 0.50 + min(0.22, blob_area / 40000.0))
def analyze_pil_image(img: Image.Image, stem: str, file_name: str | None = None) -> AnalyzeResult:
    """Analyse a chat snapshot: latest message text and which side sent it.

    Existing PNGs in the stem's OCR output directory are deleted first. The
    raw image, the crop of the last white box (when one is found) and a
    ``result.json`` are written to the output directories, and a log event
    is emitted. When the bubble is classified as ``'right'`` the text and
    line list are cleared.

    Args:
        img: Snapshot to analyse.
        stem: Identifier used for output sub-directories; empty falls back
            to ``'unknown'``.
        file_name: Name stored in the result; defaults to ``<stem>.png``.

    Returns:
        The populated ``AnalyzeResult``.
    """
    trace_id = new_trace_id("bot")
    safe_stem = stem or 'unknown'

    ocr_out_dir = get_image_out_dir(safe_stem, 'ocr')
    for stale_png in ocr_out_dir.glob('*.png'):
        stale_png.unlink(missing_ok=True)

    img = img.convert('RGB')
    original_out_dir = get_image_out_dir(safe_stem, 'original')
    img.save(original_out_dir / 'full_raw.png')

    # Prefer the last white box of the merged binary image as the OCR source.
    preferred_ocr_img = None
    preferred_crop_box = None
    last_white_box = pick_last_white_box_from_merged_binary(build_merged_binary_image(img))
    if last_white_box is not None:
        preferred_crop_box = dict(last_white_box)
        corners = tuple(int(last_white_box[key]) for key in ('left', 'top', 'right', 'bottom'))
        preferred_ocr_img = img.crop(corners)
        preferred_ocr_img.save(original_out_dir / 'last_white_box_raw.png')

    latest_text, valid_lines, cluster_items, crop_box = extract_latest_text(
        img,
        safe_stem,
        preferred_ocr_img=preferred_ocr_img,
        preferred_crop_box=preferred_crop_box,
    )
    bubble_side, confidence = detect_latest_bubble_side(img, cluster_items, crop_box=crop_box)

    if bubble_side == 'right':
        latest_text = None
        valid_lines = []
    if bubble_side is None:
        is_self_sent = None
    else:
        is_self_sent = bubble_side == 'right'

    result = AnalyzeResult(
        file=file_name or f'{safe_stem}.png',
        size=img.size,
        crop_box=crop_box,
        latest_text=latest_text,
        is_self_sent=is_self_sent,
        bubble_side=bubble_side,
        confidence=confidence,
        valid_lines=valid_lines,
        error=None,
    )
    payload = json.dumps(asdict(result), ensure_ascii=False, indent=2)
    (ocr_out_dir / 'result.json').write_text(payload, encoding='utf-8')
    log_event("INFO", "bot", "bot.chat_snapshot", trace_id, "analyze", "ok", "聊天快照分析完成", extra={"file": result.file, "has_text": bool(result.latest_text), "bubble_side": result.bubble_side or "", "confidence": result.confidence})
    return result