1130 lines
45 KiB
Python
1130 lines
45 KiB
Python
|
|
from dataclasses import dataclass, asdict
|
|||
|
|
from io import BytesIO
|
|||
|
|
from pathlib import Path
|
|||
|
|
import json
|
|||
|
|
import re
|
|||
|
|
import sys
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
from PIL import Image
|
|||
|
|
|
|||
|
|
# Project root directory: `parents[4]` of this file's resolved path.
ROOT = Path(__file__).resolve().parents[4]
# Put the project root first on sys.path so the `app.*` imports below can resolve.
sys.path.insert(0, str(ROOT))
|
|||
|
|
|
|||
|
|
from app.infrastructure.service.logging.log_service import log_event, new_trace_id
|
|||
|
|
from app.infrastructure.service.wechat.config import (
|
|||
|
|
OCR_SAVE_DIR,
|
|||
|
|
OCR_TOP_PENALTY_RATIO,
|
|||
|
|
OCR_TOP_PENALTY_BIN_FACTOR,
|
|||
|
|
OCR_TOP_PENALTY_COLOR_FACTOR,
|
|||
|
|
)
|
|||
|
|
from app.infrastructure.service.wechat.ocr import OCRService
|
|||
|
|
|
|||
|
|
# Directory holding the chat screenshots captured after a click.
CLICKED_DIR = ROOT / OCR_SAVE_DIR / 'sessions' / 'clicked'
# OCR output directory; contains the sub-directories original, merged_binary and ocr_crops.
OUT_DIR = ROOT / OCR_SAVE_DIR / 'sessions' / 'clicked_ocr'
# Directory where the original screenshots are saved.
ORIGINAL_DIR = OUT_DIR / 'original'
# Directory where the merged binarized images are saved.
MERGED_DIR = OUT_DIR / 'merged_binary'
# Directory where the OCR crop regions are saved.
OCR_DIR = OUT_DIR / 'ocr_crops'
# Create every output directory up front (this runs at import time).
for _dir in (OUT_DIR, ORIGINAL_DIR, MERGED_DIR, OCR_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Resolve (and create) the per-image output directory for a given category.
def get_image_out_dir(stem: str, category: str = 'ocr') -> Path:
    """Return the output directory for ``stem`` under the folder chosen by ``category``.

    ``'original'`` selects ORIGINAL_DIR, ``'merged'`` selects MERGED_DIR and
    any other value selects OCR_DIR.  A falsy ``stem`` is replaced by
    ``'unknown'``.  The directory is created if it does not exist yet.
    """
    root = (
        ORIGINAL_DIR if category == 'original'
        else MERGED_DIR if category == 'merged'
        else OCR_DIR
    )
    target = root / (stem or 'unknown')
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|||
|
|
|
|||
|
|
# Logger.
# NOTE(review): no logger object is defined under this heading in the visible code.
# OCR service singleton used by the OCR helpers in this module.
ocr = OCRService()
|
|||
|
|
|
|||
|
|
# Binarization and adaptive-threshold parameters.
BINARY_THRESHOLD = 248  # fixed binarization threshold
ADAPTIVE_BLOCK_SIZE = 13  # adaptive-threshold block size
ADAPTIVE_C = 1  # adaptive-threshold offset
LEFT_MERGE_KERNEL = (5, 1)  # morphology kernel for left-side bubbles
DEFAULT_MERGE_KERNEL = (3, 1)  # default morphology kernel
OPEN_KERNEL = (2, 2)  # kernel size for the opening operation

# Bubble colour and position heuristics.
PEER_BUBBLE_RGB = (238, 238, 240)  # colour of the peer's message bubble (#EEEEF0)
LEFT_BUBBLE_CX_RATIO = 0.58  # centre-x ratio threshold for a left-side bubble
RIGHT_BUBBLE_MIN_LEFT_RATIO = 0.48  # minimum left-edge ratio for a right-side bubble
LEFT_SIDE_MAX_RIGHT_RATIO = 0.78  # maximum right-edge ratio for a left-side bubble
LEFT_COLOR_THRESHOLD = 64  # colour-difference threshold for left-side bubbles
RIGHT_COLOR_THRESHOLD = 108  # colour-difference threshold for right-side bubbles
BRIGHTNESS_CUTOFF = 248  # brightness cut-off
BG_WHITE_THRESHOLD = 245  # white-background threshold
BRIGHT_MASK_THRESHOLD = 242  # highlight-mask threshold

# Area and size thresholds for connected-component analysis.
MIN_AREA_BINARY = 700  # minimum component area in the binary image
MIN_WIDTH_BINARY = 42  # minimum width in the binary image
MIN_HEIGHT_BINARY = 24  # minimum height in the binary image
MIN_AREA_COLOR = 560  # minimum component area in the colour image
MIN_WIDTH_COLOR = 38  # minimum width in the colour image
MIN_HEIGHT_COLOR = 22  # minimum height in the colour image

# Scoring parameters for the full-binary fallback box.
FULL_BIN_SCORE_BOTTOM = 5.0  # weight of the bottom position
FULL_BIN_SCORE_AREA = 0.08  # weight of the area
FULL_BIN_SCORE_WIDTH = 0.15  # weight of the width
FULL_BIN_PENALTY_TALL = 8.0  # penalty factor for being too tall
FULL_BIN_PENALTY_WIDE = 3.0  # penalty factor for being too wide

# Filtering rules for latest-message candidate boxes
# (used by pick_last_white_box_from_merged_binary).
AVATAR_RATIO_MIN = 0.75
AVATAR_RATIO_MAX = 1.35
AVATAR_MIN_SIZE = 28
AVATAR_MAX_SIZE = 72
AVATAR_EDGE_LEFT_RATIO = 0.2
AVATAR_EDGE_RIGHT_RATIO = 0.8
TIME_MARKER_MIN_H = 10
TIME_MARKER_MAX_H = 22
TIME_MARKER_MIN_W = 28
TIME_MARKER_MAX_W = 110
TIME_MARKER_RATIO_MIN = 1.8
TIME_MARKER_RATIO_MAX = 8.0
TIME_MARKER_CENTER_TOLERANCE = 0.16

# Scoring parameters for binary bubble boxes.
BIN_SCORE_BOTTOM = 5.0  # weight of the bottom position
BIN_SCORE_AREA = 0.12  # weight of the area
BIN_SCORE_OVERLAP = 220.0  # weight of the overlap ratio
BIN_PENALTY_TOP_GAP_LEFT = 10.0  # top-gap penalty for left-side bubbles
BIN_PENALTY_TOP_GAP_RIGHT = 4.0  # top-gap penalty for right-side bubbles
BIN_PENALTY_BOTTOM_GAP = 1.8  # bottom-gap penalty
BIN_PENALTY_TOO_TALL = 9.0  # penalty for being too tall
BIN_PENALTY_TOO_WIDE = 2.0  # penalty for being too wide

# Scoring parameters for colour bubble boxes.
COLOR_SCORE_AREA = 0.35  # weight of the area
COLOR_SCORE_BOTTOM = 3.0  # weight of the bottom position
COLOR_SCORE_OVERLAP = 180.0  # weight of the overlap ratio
COLOR_PENALTY_TOP_GAP_LEFT = 7.5  # top-gap penalty for left-side bubbles
COLOR_PENALTY_BOTTOM_GAP = 1.6  # bottom-gap penalty
COLOR_PENALTY_TOO_TALL = 7.0  # penalty for being too tall
COLOR_PENALTY_TOO_WIDE = 1.6  # penalty for being too wide
COLOR_SCORE_TOP_GAP_RIGHT = 1.0  # top-gap bonus for right-side bubbles
COLOR_SCORE_HEIGHT_RIGHT = 2.0  # height bonus for right-side bubbles

# Scoring parameters for the latest-message cluster.
CLUSTER_SCORE_BOTTOM = 2.2  # weight of the cluster's bottom position
CLUSTER_SCORE_HEIGHT = 0.9  # weight of the cluster's height
CLUSTER_SCORE_WIDTH = 0.12  # weight of the cluster's width
CLUSTER_SCORE_TEXT_LEN = 1.5  # weight of the text length
CLUSTER_SCORE_MULTI_LINE_BONUS = 35.0  # extra bonus for multi-line messages

# Non-body text filter: texts matched exactly.
NON_BODY_EXACT_TEXTS = {
    '小程序',
    ':S',
}
# Non-body text filter: texts containing any of these keywords.
NON_BODY_CONTAINS_TEXTS = (
    '有事请@其他福利官',
    '我是机器人',
    '群主小助手',
)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Data class holding the analysis result for one chat snapshot.
@dataclass
class AnalyzeResult:
    """Result record produced when analysing a single chat screenshot.

    The per-field notes below are inferred from the field names and types
    only; the code that fills them in is outside this excerpt — TODO confirm.
    """

    file: str  # presumably the analysed screenshot's name or path
    size: tuple[int, int] | None  # presumably the image size; None when unknown
    crop_box: dict | None  # presumably the bubble crop box (left/top/right/bottom/...)
    latest_text: str | None  # presumably the text of the latest message
    is_self_sent: bool | None  # presumably True when the latest message is our own
    bubble_side: str | None  # presumably which side the bubble is on
    confidence: float  # presumably a confidence score for the result
    valid_lines: list[str]  # presumably the OCR lines kept after filtering
    error: str | None  # presumably an error description; None on success
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Serialize a PIL image into PNG-encoded bytes.
def pil_to_bytes(img: Image.Image) -> bytes:
    """Encode ``img`` as PNG into an in-memory buffer and return the raw bytes."""
    with BytesIO() as stream:
        img.save(stream, format='PNG')
        return stream.getvalue()
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run OCR on an image and return the recognized text lines.
def ocr_lines(img: Image.Image, scene: str) -> list[str]:
    """Recognize text in ``img`` and return the non-blank lines, stripped.

    The image is PNG-encoded and passed to ``ocr.recognize`` together with
    ``scene``.  Each returned item is converted with ``str()`` before being
    stripped, so a truthy non-string item no longer raises AttributeError
    (the original filtered on ``str(x).strip()`` but returned ``x.strip()``).
    Falsy items and items that are blank after stripping are dropped.
    """
    recognized = ocr.recognize(pil_to_bytes(img), scene=scene)
    return [text for raw in recognized if raw and (text := str(raw).strip())]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def ocr_items_direct(img: Image.Image) -> list[dict]:
    """Call the engine behind ``ocr.rapid_provider`` directly and return boxed items.

    Returns an empty list when the provider is missing, not ready, has no
    engine, or yields no result.  Each item is a dict with ``text``,
    ``confidence`` (0.0 when the engine gives none) and the axis-aligned
    extent ``left``/``top``/``right``/``bottom`` of the detected box.
    """
    rapid = getattr(ocr, 'rapid_provider', None)
    if rapid is None:
        return []
    if not rapid.ensure_ready() or rapid.engine is None:
        return []

    # The engine is fed a BGR array, hence the RGB -> BGR conversion.
    bgr = cv2.cvtColor(np.array(img.convert('RGB')), cv2.COLOR_RGB2BGR)
    output = rapid.engine(bgr)
    if not output or len(output) < 1:
        return []

    found = []
    for entry in output[0] or []:
        if not entry or len(entry) < 2:
            continue
        quad = np.array(entry[0], dtype=np.float32)
        text = str(entry[1]).strip()
        confidence = float(entry[2]) if len(entry) > 2 else 0.0
        if not text:
            continue
        xs = quad[:, 0]
        ys = quad[:, 1]
        found.append({
            'text': text,
            'confidence': confidence,
            'left': float(xs.min()),
            'top': float(ys.min()),
            'right': float(xs.max()),
            'bottom': float(ys.max()),
        })
    return found
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Build the merged binary image: fixed threshold OR adaptive threshold, then clean-up.
def build_merged_binary_image(img: Image.Image, *, is_left_bubble: bool | None = None) -> Image.Image:
    """Return a binary mask image combining a fixed and an adaptive inverse threshold.

    The image is converted to grayscale and blurred, thresholded both ways,
    and the two masks are OR-ed together.  A morphological close followed by
    an open is then applied.  Only the closing kernel depends on
    ``is_left_bubble``: LEFT_MERGE_KERNEL when it is exactly ``True``,
    DEFAULT_MERGE_KERNEL otherwise.
    """
    rgb = np.array(img.convert('RGB'))
    smooth = cv2.GaussianBlur(cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY), (5, 5), 0)

    fixed_mask = cv2.threshold(smooth, BINARY_THRESHOLD, 255, cv2.THRESH_BINARY_INV)[1]
    adaptive_mask = cv2.adaptiveThreshold(
        smooth,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        ADAPTIVE_BLOCK_SIZE,
        ADAPTIVE_C,
    )
    mask = cv2.bitwise_or(fixed_mask, adaptive_mask)

    # Near-minimal merging: keep only thin horizontal links so that large
    # white blobs get broken apart as much as possible.
    close_size = LEFT_MERGE_KERNEL if is_left_bubble is True else DEFAULT_MERGE_KERNEL
    close_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, close_size)
    open_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, OPEN_KERNEL)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, close_kernel, iterations=1)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, open_kernel)
    return Image.fromarray(mask)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Pick the best-scoring text region box from the full merged binary image.
def pick_box_from_full_binary(img: Image.Image) -> dict | None:
    """Return the highest-scoring connected component of the merged binary image.

    Components with area < 300, width < 50 or height < 24 are ignored.  The
    score rewards a low position, area (capped at 20000) and width (capped at
    70% of the image width), and penalises components taller than 45% or
    wider than 65% of the image.  On equal scores the earlier label wins.

    Returns a dict with ``left``/``top``/``right``/``bottom``/``width``/``height``
    as ints, or ``None`` when no component qualifies.
    """
    mask = np.array(build_merged_binary_image(img))
    label_count, _, stats, _ = cv2.connectedComponentsWithStats(mask, 8)
    img_h, img_w = mask.shape[:2]

    winner = None
    for idx in range(1, label_count):
        x0, y0, comp_w, comp_h, comp_area = stats[idx]
        if comp_area < 300 or comp_w < 50 or comp_h < 24:
            continue

        x1 = x0 + comp_w
        y1 = y0 + comp_h
        rating = (
            y1 * FULL_BIN_SCORE_BOTTOM
            + min(comp_area, 20000) * FULL_BIN_SCORE_AREA
            + min(comp_w, int(img_w * 0.7)) * FULL_BIN_SCORE_WIDTH
        )
        if comp_h > img_h * 0.45:
            rating -= (comp_h - img_h * 0.45) * FULL_BIN_PENALTY_TALL
        if comp_w > img_w * 0.65:
            rating -= (comp_w - img_w * 0.65) * FULL_BIN_PENALTY_WIDE

        if winner is None or rating > winner[0]:
            winner = (rating, x0, y0, x1, y1)

    if winner is None:
        return None

    left, top, right, bottom = winner[1:]
    return {
        'left': int(left),
        'top': int(top),
        'right': int(right),
        'bottom': int(bottom),
        'width': int(right - left),
        'height': int(bottom - top),
    }
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Pick the lowest (latest) white text box from the merged binary image.
def pick_last_white_box_from_merged_binary(merged_img: Image.Image) -> dict | None:
    """Return the best left-side candidate box among the mask's connected components.

    Components are discarded when they are full-width strips, smaller than
    32x12, avatar-like squares near the left/right edge, time-marker-like
    strips near the horizontal centre, or not on the left side.  Survivors
    are scored mainly by how low they sit, with penalties for boxes that
    stay far left or reach far right.  On equal scores the later label wins.

    Returns a dict with ``left``/``top``/``right``/``bottom``/``width``/``height``
    as ints, or ``None`` when nothing survives the filters.
    """
    mask = np.array(merged_img.convert('L'))
    _img_h, img_w = mask.shape[:2]
    label_count, _, stats, _ = cv2.connectedComponentsWithStats(mask, 8)

    winner = None
    for idx in range(1, label_count):
        x, y, w, h, area = stats[idx]
        if area <= 0 or w <= 0 or h <= 0:
            continue
        right = x + w
        bottom = y + h
        ratio = w / max(1.0, float(h))

        # Strips spanning (almost) the whole width are not message boxes.
        if x <= img_w * 0.02 and right >= img_w * 0.98 and w >= img_w * 0.9:
            continue
        if w < 32 or h < 12:
            continue

        # Rule 1: drop edge avatars (roughly square, fixed size range, at the left/right edge).
        avatar_shaped = (
            AVATAR_RATIO_MIN <= ratio <= AVATAR_RATIO_MAX
            and AVATAR_MIN_SIZE <= w <= AVATAR_MAX_SIZE
            and AVATAR_MIN_SIZE <= h <= AVATAR_MAX_SIZE
        )
        near_edge = x <= img_w * AVATAR_EDGE_LEFT_RATIO or right >= img_w * AVATAR_EDGE_RIGHT_RATIO
        if avatar_shaped and near_edge:
            continue

        # Rule 2: drop centred time markers (short, elongated, near the screen's vertical axis).
        cx = (x + right) / 2.0
        time_marker_shaped = (
            TIME_MARKER_MIN_H <= h <= TIME_MARKER_MAX_H
            and TIME_MARKER_MIN_W <= w <= TIME_MARKER_MAX_W
            and TIME_MARKER_RATIO_MIN <= ratio <= TIME_MARKER_RATIO_MAX
            and abs(cx - img_w / 2.0) <= img_w * TIME_MARKER_CENTER_TOLERANCE
        )
        if time_marker_shaped:
            continue

        # Only left-side boxes are kept.
        if cx > img_w * LEFT_BUBBLE_CX_RATIO or x > img_w * RIGHT_BUBBLE_MIN_LEFT_RATIO:
            continue

        rating = bottom * 1.0 + right * 0.08 + min(area, 28000) * 0.0012
        if right < img_w * 0.35:
            rating -= 80.0
        if right > img_w * 0.82:
            rating -= (right - img_w * 0.82) * 1.4
        if w > img_w * 0.45 and right > img_w * 0.75:
            rating -= min(260.0, (w - img_w * 0.45) * 1.2 + (right - img_w * 0.75) * 1.0)

        # `>=` makes the later candidate win ties, as a stable sort + last element would.
        if winner is None or rating >= winner[0]:
            winner = (rating, x, y, right, bottom)

    if winner is None:
        return None

    left, top, right, bottom = winner[1:]
    return {
        'left': int(left),
        'top': int(top),
        'right': int(right),
        'bottom': int(bottom),
        'width': int(right - left),
        'height': int(bottom - top),
    }
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Recognize text together with its position using the OCR engine.
def ocr_items_with_boxes(img: Image.Image, offset_x: int = 0, offset_y: int = 0) -> list[dict]:
    """Run ``ocr.provider.engine.ocr`` on ``img`` and return text items with geometry.

    Returns an empty list when the provider is not ready, has no engine, the
    engine call raises, or the result is not a list.  Only dict blocks are
    read (``rec_texts``, ``rec_scores``, ``rec_boxes``).  Entries with blank
    text, a score below 0.3, or no matching box are skipped.

    ``offset_x`` / ``offset_y`` are added to the edge and centre coordinates;
    ``width`` and ``height`` are left unshifted.
    """
    provider = getattr(ocr, 'provider', None)
    engine = getattr(provider, 'engine', None)
    if not getattr(provider, 'ready', False) or engine is None:
        return []

    try:
        raw = engine.ocr(np.array(img.convert('RGB')))
    except Exception:
        # Best effort: any engine failure is treated as "nothing recognized".
        return []
    if not isinstance(raw, list):
        return []

    collected = []
    for block in raw:
        if not isinstance(block, dict):
            continue
        texts = block.get('rec_texts') or []
        scores = block.get('rec_scores') or []
        boxes = block.get('rec_boxes')
        if boxes is None:
            boxes = []

        for pos, raw_text in enumerate(texts):
            text = str(raw_text).strip()
            if not text:
                continue

            has_score = pos < len(scores) and scores[pos] is not None
            score = float(scores[pos]) if has_score else 0.0
            if score < 0.3 or pos >= len(boxes):
                continue

            quad = np.array(boxes[pos]).astype(float)
            if quad.ndim == 1:
                # Flat box: read as [left, top, right, bottom].
                if quad.size < 4:
                    continue
                left = float(quad[0])
                top = float(quad[1])
                right = float(quad[2])
                bottom = float(quad[3])
                cx = (left + right) / 2.0
                cy = (top + bottom) / 2.0
            else:
                # Point list: take the axis-aligned extent and the mean point.
                xs = quad[:, 0]
                ys = quad[:, 1]
                left, right = float(xs.min()), float(xs.max())
                top, bottom = float(ys.min()), float(ys.max())
                cx = float(xs.mean())
                cy = float(ys.mean())

            collected.append({
                'text': text,
                'score': score,
                'left': left + offset_x,
                'right': right + offset_x,
                'top': top + offset_y,
                'bottom': bottom + offset_y,
                'cx': cx + offset_x,
                'cy': cy + offset_y,
                'height': (bottom - top),
                'width': (right - left),
            })
    return collected
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Decide whether an OCR text line should be kept.
def should_keep_line(text: str) -> bool:
    """Return True when ``text`` is non-empty after stripping whitespace.

    A falsy ``text`` (``None`` or ``''``) is treated as empty.
    """
    stripped = (text or '').strip()
    return len(stripped) > 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Clean candidate text lines: drop blanks and a trailing symbol-only line.
def cleanup_candidate_lines(lines: list[str]) -> list[str]:
    """Return ``lines`` stripped, without blank entries and without a trailing noise line.

    Every entry is converted with ``str()`` and stripped; blank results are
    dropped.  If at least two lines remain and the last one is at most three
    characters long and made only of colon/semicolon/``S``/``s``/``x``/``X``
    characters, that last line is removed so the previous body line becomes
    the final one.  No prefix stripping is implemented here.
    """
    kept = [text for text in (str(raw).strip() for raw in lines) if text]

    # A very short symbol-only last line is treated as noise; fall back to the line above it.
    if len(kept) >= 2:
        tail = kept[-1]
        if len(tail) <= 3 and re.fullmatch(r'[::;;SsxX]+', tail):
            kept = kept[:-1]

    return kept
|
|||
|
|
|
|||
|
|
|
|||
|
|
# De-duplicate a list of text lines, dropping repeated content.
def dedupe_lines(lines: list[str]) -> list[str]:
    """Return the stripped, non-blank lines with duplicates removed.

    Each entry is converted with ``str()`` and stripped.  The first
    occurrence of each text is kept, in its original order.
    """
    stripped = (str(entry).strip() for entry in lines)
    # dict keys keep insertion order, so this keeps the first occurrence of each text.
    return list(dict.fromkeys(text for text in stripped if text))
|
|||
|
|
|
|||
|
|
|
|||
|
|
# De-duplicate OCR items that have the same text at (almost) the same position.
def dedupe_items_by_geometry(items: list[dict]) -> list[dict]:
    """Drop items that repeat an already-kept item's text at nearly the same centre.

    Items are visited by descending ``score`` (missing counts as 0.0), then
    ``top``, then ``left``, so the highest-scoring copy is the one kept.  An
    item is a duplicate when a kept item has the same stripped text and its
    centre is within 18 px horizontally and 12 px vertically.  The result is
    ordered by ``cy``, then ``left``.
    """
    if not items:
        return []

    def _priority(entry: dict) -> tuple:
        return (-entry.get('score', 0.0), entry['top'], entry['left'])

    survivors: list[dict] = []
    for candidate in sorted(items, key=_priority):
        label = candidate['text'].strip()
        repeated = any(
            earlier['text'].strip() == label
            and abs(earlier['cx'] - candidate['cx']) <= 18
            and abs(earlier['cy'] - candidate['cy']) <= 12
            for earlier in survivors
        )
        if not repeated:
            survivors.append(candidate)

    survivors.sort(key=lambda entry: (entry['cy'], entry['left']))
    return survivors
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Group OCR items into clusters of vertically adjacent items.
def build_line_clusters(items: list[dict], y_gap: float = 18.0) -> list[list[dict]]:
    """Split OCR items into clusters by the vertical distance between neighbours.

    Items failing ``should_keep_line`` are dropped and the rest are passed
    through ``dedupe_items_by_geometry``.  Walking the result in order, an
    item joins the current cluster when its ``cy`` is within
    ``max(y_gap, 1.15 * mean height)`` of the previous item's ``cy``, where
    the mean height is that of the two items (floored at 10).  Otherwise it
    starts a new cluster.
    """
    usable = dedupe_items_by_geometry([it for it in items if should_keep_line(it['text'])])
    if not usable:
        return []

    groups: list[list[dict]] = [[usable[0]]]
    for entry in usable[1:]:
        anchor = groups[-1][-1]
        mean_h = max(10.0, (anchor.get('height', 0) + entry.get('height', 0)) / 2.0)
        allowed_gap = max(y_gap, mean_h * 1.15)
        if abs(entry['cy'] - anchor['cy']) <= allowed_gap:
            groups[-1].append(entry)
        else:
            groups.append([entry])
    return groups
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Merge the OCR items of each row into one text string per row.
def cluster_to_lines(cluster: list[dict]) -> list[str]:
    """Turn a cluster of OCR items into de-duplicated text lines.

    The cluster is split into rows with ``build_line_clusters`` (``y_gap`` of
    10).  Within a row, item texts are concatenated without a separator,
    skipping blank texts and a text equal to the one just appended.  Empty
    rows are dropped and the result goes through ``dedupe_lines``.
    """
    merged_lines = []
    for row in build_line_clusters(cluster, y_gap=10.0):
        # NOTE(review): dedupe_items_by_geometry re-sorts its output by (cy, left),
        # so the left-to-right sort applied here is not the final order — confirm intent.
        ordered = dedupe_items_by_geometry(sorted(row, key=lambda it: it['left']))
        pieces = []
        for it in ordered:
            piece = it['text'].strip()
            if piece and (not pieces or pieces[-1] != piece):
                pieces.append(piece)
        joined = ''.join(pieces)
        if joined:
            merged_lines.append(joined)
    return dedupe_lines(merged_lines)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Compute a text cluster's bounding box and mean centre x.
def cluster_bounds(cluster: list[dict]) -> dict:
    """Return the extent of ``cluster`` plus the mean of its items' ``cx``.

    The result has ``top``/``left`` (minima), ``bottom``/``right`` (maxima)
    and ``cx`` (arithmetic mean).  An empty cluster raises ``ValueError``.
    """
    tops = [entry['top'] for entry in cluster]
    bottoms = [entry['bottom'] for entry in cluster]
    lefts = [entry['left'] for entry in cluster]
    rights = [entry['right'] for entry in cluster]
    upper = min(tops)
    lower = max(bottoms)
    west = min(lefts)
    east = max(rights)
    mean_cx = sum(entry['cx'] for entry in cluster) / len(cluster)
    return {'top': upper, 'bottom': lower, 'left': west, 'right': east, 'cx': mean_cx}
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Estimate the height of a single text line from the items of a cluster.
def estimate_cluster_line_height(cluster: list[dict]) -> float:
    """Return the median positive item height, but never less than 14.0.

    Missing, ``None`` or non-positive heights are ignored.  When no usable
    height remains, 18.0 is returned.
    """
    usable = [
        value
        for entry in cluster
        if (value := float(entry.get('height', 0) or 0)) > 0
    ]
    if usable:
        return max(14.0, float(np.median(usable)))
    return 18.0
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Crop the chat-bubble region around a text cluster, locating the bubble
# with colour and morphology heuristics.
def crop_bubble_box(img: Image.Image, cluster_items: list[dict], stem: str | None = None) -> tuple[Image.Image | None, dict | None]:
    """Locate the bubble that contains ``cluster_items`` and crop it out of ``img``.

    Steps, as implemented below:
      1. Compute the cluster's text bounds and sample a padded "seed" region.
      2. Build a colour mask of pixels close to the bubble's target colour.
      3. Pick a box from the merged binary image; only if that yields nothing,
         pick one from the connected components of the colour mask.
      4. Expand / clamp the chosen box, or fall back to a padded text box
         when no box was found at all.

    Args:
        img: source screenshot.
        cluster_items: OCR items of one message; read through
            ``cluster_bounds`` and ``estimate_cluster_line_height``.
        stem: when truthy, the merged binary image is also saved as
            ``bubble_bin.png`` in the 'merged' output directory for this stem.

    Returns:
        ``(cropped_image, box)`` where ``box`` has ``left``/``top``/``right``/
        ``bottom``/``width``/``height``, or ``(None, None)`` when there are no
        items, the seed region is empty, or the final box is degenerate.
    """
    if not cluster_items:
        return None, None

    arr = np.array(img.convert('RGB'))
    img_h, img_w = arr.shape[:2]
    bounds = cluster_bounds(cluster_items)

    # Seed region: the text bounds padded on every side, clipped to the image.
    seed_left = max(0, int(bounds['left'] - 20))
    seed_right = min(img_w, int(bounds['right'] + 20))
    seed_top = max(0, int(bounds['top'] - 30))
    seed_bottom = min(img_h, int(bounds['bottom'] + 14))
    if seed_right <= seed_left or seed_bottom <= seed_top:
        return None, None

    seed = arr[seed_top:seed_bottom, seed_left:seed_right]
    if seed.size == 0:
        return None, None

    # Peer message bubbles: anchor to colours near #EEEEF0 first; our own
    # messages may use a different colour strategy.
    seed_pixels = seed.reshape(-1, 3).astype(np.int16)
    brightness = seed_pixels.mean(axis=1)
    # Ignore near-white pixels when estimating the bubble colour.
    bubble_pixels = seed_pixels[brightness < BRIGHTNESS_CUTOFF]
    if bubble_pixels.size == 0:
        bubble_pixels = seed_pixels
    median_target = np.median(bubble_pixels, axis=0)
    peer_target = np.array(PEER_BUBBLE_RGB, dtype=np.int16)  # #EEEEF0
    # Left-side messages anchor to the grey (#EEEEF0) bubble with dark text;
    # right-side messages keep the adaptive (median colour) strategy.
    is_left_bubble = bounds['cx'] <= img_w * LEFT_BUBBLE_CX_RATIO
    if is_left_bubble:
        target = peer_target
        threshold = LEFT_COLOR_THRESHOLD
    else:
        target = median_target.astype(np.int16)
        threshold = RIGHT_COLOR_THRESHOLD

    # Per-pixel L1 distance (summed over the 3 channels) to the target colour.
    diff = np.abs(arr.astype(np.int16) - target.reshape(1, 1, 3))
    dist = diff.sum(axis=2)

    # Exclude near-white background and very bright pixels from the colour mask.
    bg_mask = (arr[:, :, 0] > BG_WHITE_THRESHOLD) & (arr[:, :, 1] > BG_WHITE_THRESHOLD) & (arr[:, :, 2] > BG_WHITE_THRESHOLD)
    bright_mask = arr.mean(axis=2) > BRIGHT_MASK_THRESHOLD
    color_mask = (dist <= threshold) & (~bg_mask) & (~bright_mask)
    color_mask = color_mask.astype(np.uint8) * 255

    if is_left_bubble:
        # Left grey bubbles are more at risk of two stacked messages merging,
        # so weaken vertical bridging and strengthen horizontal connectivity.
        kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (11, 3))
        kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    else:
        kernel_close = cv2.getStructuringElement(cv2.MORPH_RECT, (7, 7))
        kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_CLOSE, kernel_close)
    color_mask = cv2.morphologyEx(color_mask, cv2.MORPH_OPEN, kernel_open)

    # Text centre and size (size floored at 1.0 to avoid division by zero).
    text_cx = bounds['cx']
    text_cy = (bounds['top'] + bounds['bottom']) / 2.0
    text_h = max(1.0, bounds['bottom'] - bounds['top'])
    text_w = max(1.0, bounds['right'] - bounds['left'])

    def is_probable_left_avatar_candidate(stats_arr, label_idx: int, x: int, y: int, w: int, h: int) -> bool:
        """Return True when component ``label_idx`` looks like a left-side avatar.

        The component must sit within the left 20% of the image, be roughly
        square and plausibly sized relative to the text height, and have
        another sufficiently larger component to its right on the same row.
        Always False for right-side bubbles.
        """
        if not is_left_bubble:
            return False
        if x > img_w * 0.2:
            return False
        if w <= 0 or h <= 0:
            return False
        wh_ratio = w / max(1.0, float(h))
        if wh_ratio < 0.72 or wh_ratio > 1.35:
            return False
        if w < max(18.0, text_h * 0.55) or w > max(92.0, text_h * 2.4):
            return False

        right_edge = x + w
        row_hit = False
        n = stats_arr.shape[0] if hasattr(stats_arr, 'shape') else len(stats_arr)
        # Look for a neighbouring component to the right (label 0 is skipped).
        for j in range(1, n):
            if j == label_idx:
                continue
            ox, oy, ow, oh, oarea = stats_arr[j]
            if oarea <= 0 or ow <= 0 or oh <= 0:
                continue
            if ox <= right_edge:
                continue
            gap = ox - right_edge
            if gap < 2 or gap > max(120.0, text_w * 1.4):
                continue
            # Require enough vertical overlap to count as the same row.
            overlap_h = max(0.0, min(y + h, oy + oh) - max(y, oy))
            if overlap_h < min(h, oh) * 0.45:
                continue
            if ow < w * 1.15:
                continue
            if oh < h * 0.75:
                continue
            row_hit = True
            break
        return row_hit

    def pick_box_from_binary_merge() -> tuple[int, int, int, int] | None:
        """Pick the best component of the merged binary image that contains the text centre.

        Returns ``(left, top, right, bottom)`` or ``None``.  When ``stem`` is
        truthy the merged binary image is also written to disk.
        """
        merged_out_dir = get_image_out_dir(stem or 'unknown', 'merged') if stem else None
        merged_img = build_merged_binary_image(img, is_left_bubble=is_left_bubble)
        merged = np.array(merged_img)

        if merged_out_dir is not None:
            merged_img.save(merged_out_dir / 'bubble_bin.png')

        num_labels2, labels2, stats2, _ = cv2.connectedComponentsWithStats(merged, 8)
        best_local = None
        for label in range(1, num_labels2):
            x, y, w, h, area = stats2[label]
            if area <= 0 or w <= 0 or h <= 0:
                continue
            if is_probable_left_avatar_candidate(stats2, label, x, y, w, h):
                continue
            right = x + w
            bottom = y + h
            # The component's bounding box must contain the text centre.
            if not (x <= text_cx <= right and y <= text_cy <= bottom):
                continue

            top_gap = max(0.0, bounds['top'] - y)
            bottom_gap = max(0.0, bottom - bounds['bottom'])
            overlap_x = max(0.0, min(bounds['right'], right) - max(bounds['left'], x))
            overlap_ratio = overlap_x / text_w if text_w > 0 else 0.0
            if overlap_ratio < 0.45:
                continue

            score = 0.0
            score += bottom * BIN_SCORE_BOTTOM
            score += area * BIN_SCORE_AREA
            score += overlap_ratio * BIN_SCORE_OVERLAP
            # Penalise components that start in the top 18% of the image.
            if y < img_h * 0.18:
                score -= (img_h * 0.18 - y) * 2.0
            score -= top_gap * (BIN_PENALTY_TOP_GAP_LEFT if is_left_bubble else BIN_PENALTY_TOP_GAP_RIGHT)
            # A bottom gap of 12 px is treated as ideal; deviations are penalised.
            score -= abs(bottom_gap - 12.0) * BIN_PENALTY_BOTTOM_GAP

            max_reasonable_h = max(120.0, text_h * 3.4)
            max_reasonable_w = max(360.0, text_w * 1.9)
            if h > max_reasonable_h:
                score -= (h - max_reasonable_h) * BIN_PENALTY_TOO_TALL
            if w > max_reasonable_w:
                score -= (w - max_reasonable_w) * BIN_PENALTY_TOO_WIDE

            if is_left_bubble:
                # Bonus for regions whose median colour is close to the peer bubble colour.
                region = arr[y:bottom, x:right].reshape(-1, 3).astype(np.int16)
                if region.size > 0:
                    region_mean = np.median(region, axis=0)
                    color_score = float(320 - np.abs(region_mean - peer_target).sum())
                    score += max(0.0, color_score)

            if best_local is None or score > best_local[0]:
                best_local = (score, x, y, right, bottom)
        if best_local is None:
            return None
        return best_local[1], best_local[2], best_local[3], best_local[4]

    best = None
    binary_box = pick_box_from_binary_merge()
    if binary_box is not None:
        # The binary-merge box takes precedence; the colour mask is not consulted.
        left, top, right, bottom = binary_box
        best = (float(bottom), left, top, right, bottom)
    else:
        num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(color_mask, 8)
        for label in range(1, num_labels):
            x, y, w, h, area = stats[label]
            if area <= 0 or w <= 0 or h <= 0:
                continue
            if is_probable_left_avatar_candidate(stats, label, x, y, w, h):
                continue
            right = x + w
            bottom = y + h
            # The component's bounding box must contain the text centre.
            if not (x <= text_cx <= right and y <= text_cy <= bottom):
                continue

            top_gap = max(0.0, bounds['top'] - y)
            bottom_gap = max(0.0, bottom - bounds['bottom'])
            overlap_x = max(0.0, min(bounds['right'], right) - max(bounds['left'], x))
            overlap_ratio = overlap_x / text_w if text_w > 0 else 0.0

            score = 0.0
            score += area * COLOR_SCORE_AREA
            score += bottom * COLOR_SCORE_BOTTOM
            score += overlap_ratio * COLOR_SCORE_OVERLAP
            if y < img_h * OCR_TOP_PENALTY_RATIO:
                score -= (img_h * OCR_TOP_PENALTY_RATIO - y) * OCR_TOP_PENALTY_COLOR_FACTOR

            if is_left_bubble:
                region = arr[y:bottom, x:right].reshape(-1, 3).astype(np.int16)
                region_mean = np.median(region, axis=0)
                color_score = float(320 - np.abs(region_mean - peer_target).sum())
                score += max(0.0, color_score)
                # The target is the last grey box, not the whole grey column.
                score -= top_gap * COLOR_PENALTY_TOP_GAP_LEFT
                score -= abs(bottom_gap - 12.0) * COLOR_PENALTY_BOTTOM_GAP
                max_reasonable_h = max(110.0, text_h * 2.8)
                if h > max_reasonable_h:
                    score -= (h - max_reasonable_h) * COLOR_PENALTY_TOO_TALL
                max_reasonable_w = max(320.0, text_w * 1.6)
                if w > max_reasonable_w:
                    score -= (w - max_reasonable_w) * COLOR_PENALTY_TOO_WIDE
            else:
                score += top_gap * COLOR_SCORE_TOP_GAP_RIGHT
                score += h * COLOR_SCORE_HEIGHT_RIGHT

            if best is None or score > best[0]:
                best = (score, x, y, right, bottom)

    if best is not None:
        _, left, top, right, bottom = best

        if is_left_bubble:
            # If the chosen box still looks like an avatar (near the left edge,
            # roughly square, small), discard it and use a padded text box.
            cand_w = max(1, int(right - left))
            cand_h = max(1, int(bottom - top))
            cand_ratio = cand_w / float(cand_h)
            if left <= int(img_w * 0.2) and 0.70 <= cand_ratio <= 1.35 and cand_w <= max(96, int(text_h * 2.6)):
                left = max(0, int(bounds['left'] - 6))
                right = min(img_w, int(bounds['right'] + 12))
                top = max(0, int(bounds['top'] - 4))
                bottom = min(img_h, int(bounds['bottom'] + 10))

        # Directional expansion from the found box: left-side bubbles mainly
        # expand to the left / up / down.
        def col_match_ratio(x: int, y1: int, y2: int) -> float:
            """Fraction of pixels in column ``x`` (rows y1..y2) within the colour threshold."""
            seg = dist[max(0, y1):min(img_h, y2), max(0, x):min(img_w, x + 1)]
            if seg.size == 0:
                return 0.0
            return float((seg <= threshold).mean())

        def row_match_ratio(y: int, x1: int, x2: int) -> float:
            """Fraction of pixels in row ``y`` (columns x1..x2) within the colour threshold."""
            seg = dist[max(0, y):min(img_h, y + 1), max(0, x1):min(img_w, x2)]
            if seg.size == 0:
                return 0.0
            return float((seg <= threshold).mean())

        # Recomputed with the literal 0.58, which equals LEFT_BUBBLE_CX_RATIO as currently defined.
        is_left_bubble = bounds['cx'] <= img_w * 0.58
        line_h = estimate_cluster_line_height(cluster_items)
        current_h = max(1.0, float(bottom - top))
        # Do not hard-code the target height; infer a more reasonable bubble
        # height from the recognized text line height.
        # Empirically a 4-line message block needs room for about 5-6 text line heights;
        # when the current box is too small, spend the increase on upward expansion first.
        desired_h = max(current_h, line_h * 5.6)
        need_more_h = max(0.0, desired_h - current_h)

        # Left-side messages are tightened ("red box style"): hug the body
        # text and avoid an oversized box.
        expand_left_limit = 12 if is_left_bubble else 30
        expand_right_limit = 22 if is_left_bubble else 90
        expand_up_limit = max(6, int(min(18, line_h * 0.55 + need_more_h * 0.08))) if is_left_bubble else max(40, int(min(120, line_h * 3.8 + need_more_h * 0.9)))
        expand_down_limit = max(10, int(min(28, line_h * 0.85 + need_more_h * 0.10))) if is_left_bubble else max(20, int(min(60, line_h * 1.6 + need_more_h * 0.25)))

        # Grow left one column at a time while enough pixels match the bubble colour.
        for _ in range(expand_left_limit):
            if left <= 1:
                break
            ratio = col_match_ratio(left - 1, top, bottom)
            if ratio < 0.18:
                break
            left -= 1

        # Grow right in the same way.
        for _ in range(expand_right_limit):
            if right >= img_w - 1:
                break
            ratio = col_match_ratio(right, top, bottom)
            if ratio < 0.18:
                break
            right += 1

        up_steps = 0
        # Never expand upwards past this row (tighter for left-side bubbles).
        top_guard = max(0, int(bounds['top'] - (line_h * 0.9 if is_left_bubble else line_h * 2.2)))
        for _ in range(expand_up_limit):
            if top <= 1 or top <= top_guard:
                break
            ratio = row_match_ratio(top - 1, left, right)
            # Left-side messages are much stricter going up, so the nickname /
            # area beside the avatar is not pulled in.
            if is_left_bubble:
                if ratio < 0.18:
                    break
            else:
                if ratio < 0.10 and up_steps > int(line_h * 0.8):
                    break
            top -= 1
            up_steps += 1

        down_steps = 0
        for _ in range(expand_down_limit):
            if bottom >= img_h - 1:
                break
            ratio = row_match_ratio(bottom, left, right)
            if ratio < 0.16 and down_steps > int(line_h * 0.5):
                break
            bottom += 1
            down_steps += 1

        # If the box is still clearly shorter than the height estimated from the
        # line height, compensate; left-side messages get only a very limited
        # amount so the box does not turn into a large enclosing frame.
        current_h2 = max(1, bottom - top)
        missing_h = max(0, int(desired_h - current_h2))
        if missing_h > 0:
            if is_left_bubble:
                extra_up = min(max(0, int(missing_h * 0.08)), max(0, top - top_guard))
                extra_down = min(max(0, int(missing_h * 0.35)), max(0, img_h - bottom))
            else:
                extra_up = min(max(0, int(missing_h * 0.75)), max(0, top))
                extra_down = min(max(0, int(missing_h * 0.25)), max(0, img_h - bottom))
            top -= extra_up
            bottom += extra_down

        # Left-side messages get one final clamp so the box approximates the
        # size of the manually annotated red box.
        if is_left_bubble:
            target_left = int(bounds['left'] - 6)
            target_top = int(bounds['top'] - max(3, line_h * 0.28))
            target_right = int(bounds['right'] + 10)
            target_bottom = int(bounds['bottom'] + max(8, line_h * 0.45))
            # Each edge is first pushed out to at least the target, then
            # limited to a few pixels beyond it.
            left = max(0, min(int(left), target_left))
            left = max(left, max(0, target_left - 2))
            top = max(0, min(int(top), target_top))
            top = max(top, max(0, target_top - 2))
            right = min(img_w, max(int(right), target_right))
            right = min(right, target_right + 4)
            bottom = min(img_h, max(int(bottom), target_bottom))
            bottom = min(bottom, target_bottom + 4)
        else:
            # Right-side messages: pad by 4 px on every side, clipped to the image.
            left = max(0, int(left - 4))
            top = max(0, int(top - 4))
            right = min(img_w, int(right + 4))
            bottom = min(img_h, int(bottom + 4))
    else:
        # Only when no box was found do we fall back to a slightly padded text
        # box; left-side messages hug the body text.
        if bounds['cx'] <= img_w * 0.58:
            left = max(0, int(bounds['left'] - 6))
            right = min(img_w, int(bounds['right'] + 12))
            top = max(0, int(bounds['top'] - 4))
            bottom = min(img_h, int(bounds['bottom'] + 10))
        else:
            left = max(0, int(bounds['left'] - 24))
            right = min(img_w, int(bounds['right'] + 26))
            top = max(0, int(bounds['top'] - 42))
            bottom = min(img_h, int(bounds['bottom'] + 22))

    if right <= left or bottom <= top:
        return None, None
    box = {'left': left, 'top': top, 'right': right, 'bottom': bottom, 'width': right - left, 'height': bottom - top}
    return img.crop((left, top, right, bottom)), box
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 对气泡图片进行OCR识别,返回识别到的文本行列表
|
|||
|
|
def ocr_bubble_text(bubble_img: Image.Image, stem: str) -> list[str]:
    """Run OCR on a cropped chat bubble and return its de-duplicated text lines.

    The crop is upscaled 5x with LANCZOS resampling and passed to ``ocr_lines``.
    When the crop is small (at most 90x60 px) and that pass produced two
    characters or fewer, a second pass runs on an Otsu-binarised, dilated copy
    of the crop; its fragments are joined into a single short string, which
    wins only if it is longer than what the first pass found.

    Args:
        bubble_img: Cropped bubble image.
        stem: Snapshot identifier, used in the OCR tag and to select the debug
            output directory.

    Returns:
        Kept lines in first-seen order; an empty list when nothing usable was
        recognised.
    """
    scale = 5
    scaled_img = bubble_img.resize(
        (bubble_img.width * scale, bubble_img.height * scale),
        Image.Resampling.LANCZOS,
    )
    # NOTE(review): ocr_lines is assumed to return an iterable of text lines.
    raw_lines = ocr_lines(scaled_img, f'clicked_{stem}_bubble_crop')

    def normalize(lines: list[str]) -> list[str]:
        """Strip each line, drop rejected ones and keep the first occurrence only."""
        merged = []
        for line in lines:
            text = str(line).strip()
            if should_keep_line(text) and text not in merged:
                merged.append(text)
        return merged

    def build_bold_binary_image(source_img: Image.Image) -> Image.Image:
        """Return an Otsu-thresholded, 5x upscaled copy with thickened dark strokes."""
        gray = np.array(source_img.convert('L'))
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        binary_img = Image.fromarray(binary).convert('L')
        # NEAREST keeps the upscaled image strictly two-valued.
        scaled_binary = binary_img.resize(
            (source_img.width * scale, source_img.height * scale),
            Image.Resampling.NEAREST,
        )
        arr = np.array(scaled_binary)
        kernel = np.ones((2, 2), np.uint8)
        # Dilating the inverted image grows the dark pixels of the original
        # (presumably the glyph strokes -- assumes dark text on a light bubble).
        dilated = cv2.dilate(255 - arr, kernel, iterations=1)
        return Image.fromarray(255 - dilated).convert('RGB')

    def recognize_joined_short_text(source_img: Image.Image) -> str | None:
        """Join OCR fragments left-to-right into one short alphanumeric/CJK token."""
        items = ocr_items_direct(source_img)
        if len(items) < 2:
            return None
        items.sort(key=lambda item: (item['left'], item['top']))
        text = ''.join(item['text'] for item in items if should_keep_line(item['text']))
        if not text:
            return None
        # Accept only short strings made purely of CJK ideographs, letters or digits.
        if len(text) <= 8 and re.fullmatch(r'[一-鿿A-Za-z0-9]+', text):
            return text
        return None

    raw_norm = normalize(raw_lines)
    raw_char_count = len(''.join(raw_norm))

    # An empty first pass has a character count of 0, so one comparison covers
    # both the "nothing recognised" and the "almost nothing recognised" case.
    should_try_short_text = (
        bubble_img.width <= 90
        and bubble_img.height <= 60
        and raw_char_count <= 2
    )
    if should_try_short_text:
        bold_binary_img = build_bold_binary_image(bubble_img)
        bold_binary_img.save(get_image_out_dir(stem, 'ocr') / 'bubble_crop_bold_binary_5x.png')
        joined_text = recognize_joined_short_text(bold_binary_img)
        if joined_text and len(joined_text) > raw_char_count:
            return [joined_text]

    return raw_norm
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 从OCR识别项中选择最新的消息簇,返回文本和原始簇
|
|||
|
|
def pick_latest_cluster(items: list[dict], image_w: int, image_h: int) -> tuple[list[str], list[str], list[dict]]:
    """Pick the highest-scoring left-side text cluster from OCR items.

    Items are grouped with ``build_line_clusters``; each cluster that yields
    lines and passes the left-side position test is scored from its bottom
    edge, height, width and text length. Very short single-line clusters and
    narrow clusters are penalised unless they sit in the lower part of the
    image and contain CJK characters.

    Args:
        items: OCR items; each is read via ``item['text']`` here and handed to
            the clustering helpers.
        image_w: Width of the source image in pixels.
        image_h: Height of the source image in pixels.

    Returns:
        ``(best_lines, all_lines, best_cluster)``: at most the first four lines
        of the winning cluster, ``dedupe_lines`` applied to every item's text,
        and the winning cluster's items. Three empty lists when there is no
        input, no cluster, or no cluster qualifies.
    """
    if not items:
        return [], [], []

    clusters = build_line_clusters(items, y_gap=max(18.0, image_h * 0.018))
    if not clusters:
        return [], [], []

    ranked = []
    for cluster in clusters:
        box = cluster_bounds(cluster)
        texts = cluster_to_lines(cluster)
        if not texts:
            continue

        box_h = box['bottom'] - box['top']
        box_w = box['right'] - box['left']
        char_total = sum(len(t) for t in texts)

        rank = 0.0
        rank += box['bottom'] * CLUSTER_SCORE_BOTTOM
        rank += min(120.0, box_h * CLUSTER_SCORE_HEIGHT)
        rank += min(80.0, box_w * CLUSTER_SCORE_WIDTH)
        rank += min(60.0, char_total * CLUSTER_SCORE_TEXT_LEN)
        if len(texts) >= 2:
            rank += CLUSTER_SCORE_MULTI_LINE_BONUS

        # Only clusters positioned on the left side are candidates.
        on_left = (
            box['cx'] <= image_w * LEFT_BUBBLE_CX_RATIO
            and box['left'] <= image_w * RIGHT_BUBBLE_MIN_LEFT_RATIO
        )
        if not on_left:
            continue

        shortest = min((len(t.strip()) for t in texts if t and t.strip()), default=0)
        tiny_single_line = len(texts) == 1 and shortest <= 4
        narrow_block = box_w <= max(52.0, box_h * 1.25)

        if tiny_single_line or narrow_block:
            # Both penalties are waived together for a CJK cluster low in the image.
            near_bottom = box['bottom'] >= image_h * 0.62
            has_cjk = any(re.search(r'[一-鿿]', t or '') for t in texts)
            if not (near_bottom and has_cjk):
                if tiny_single_line:
                    rank -= 120.0
                if narrow_block:
                    rank -= 140.0

        ranked.append((rank, texts, cluster))

    if not ranked:
        return [], [], []

    # Stable descending sort: on equal scores the earlier cluster stays first.
    ranked = sorted(ranked, key=lambda entry: entry[0], reverse=True)
    best_lines, best_cluster = ranked[0][1], ranked[0][2]

    # The preceding cluster is deliberately not merged upward, so the crop
    # stays on the last message block only.
    all_lines = dedupe_lines([entry['text'] for entry in items])
    return list(best_lines)[:4], all_lines, list(best_cluster)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 从聊天快照中提取最新消息文本
|
|||
|
|
def extract_latest_text(img: Image.Image, stem: str, preferred_ocr_img: Image.Image | None = None, preferred_crop_box: dict | None = None) -> tuple[str | None, list[str], list[dict], dict | None]:
    """Extract the text of the latest message from a chat snapshot.

    Runs box-level OCR on the full image, selects the latest cluster with
    ``pick_latest_cluster``, crops its bubble, and re-runs OCR on the crop.
    When ``preferred_ocr_img`` is given it is used for that second pass instead
    of the detected bubble, and ``preferred_crop_box`` is reported as the crop
    box. If the bubble pass yields nothing, the cluster's own lines are used.

    Side effects: writes ``full_bin.png`` to the snapshot's merged-binary
    directory and, when an OCR source image exists, ``bubble_crop.png`` to its
    OCR directory.

    Args:
        img: Full chat snapshot.
        stem: Snapshot identifier used to select the output directories.
        preferred_ocr_img: Optional pre-cropped image to OCR in place of the
            detected bubble.
        preferred_crop_box: Box reported together with ``preferred_ocr_img``.

    Returns:
        ``(latest_text, valid_lines, cluster_items, crop_box)``;
        ``latest_text`` is ``None`` when no text survives cleanup.
    """
    # Image.size is (width, height); no need to copy the pixels into an array
    # just to read the dimensions.
    w, h = img.size
    merged_out_dir = get_image_out_dir(stem, 'merged')
    ocr_out_dir = get_image_out_dir(stem, 'ocr')

    merged_full = build_merged_binary_image(img)
    merged_full.save(merged_out_dir / 'full_bin.png')

    raw_items = ocr_items_with_boxes(img, offset_x=0, offset_y=0)
    merged_items = dedupe_items_by_geometry(raw_items)
    all_items: list[dict] = [
        item for item in merged_items if should_keep_line(item['text'].strip())
    ]

    cluster_lines, valid_lines, cluster_items = pick_latest_cluster(all_items, w, h)
    bubble_img, bubble_box = crop_bubble_box(img, cluster_items, stem)

    # A caller-supplied crop takes precedence over the detected bubble, and
    # its box is reported even if preferred_crop_box itself is None.
    use_preferred = preferred_ocr_img is not None
    ocr_source_img = preferred_ocr_img if use_preferred else bubble_img
    crop_box = preferred_crop_box if use_preferred else bubble_box

    if ocr_source_img is not None:
        ocr_source_img.save(ocr_out_dir / 'bubble_crop.png')
        bubble_lines = cleanup_candidate_lines(ocr_bubble_text(ocr_source_img, stem))
        if bubble_lines:
            return '\n'.join(bubble_lines), valid_lines, cluster_items, crop_box

    # Fallback: use the lines of the chosen cluster from the full-image pass.
    latest_block_lines = cleanup_candidate_lines(list(cluster_lines))
    latest_block = '\n'.join(latest_block_lines) if latest_block_lines else None
    return latest_block, valid_lines, cluster_items, crop_box
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 检测最新消息气泡位于屏幕左侧还是右侧(判断是自己还是对方的消息)
|
|||
|
|
def detect_latest_bubble_side(img: Image.Image, cluster_items: list[dict], crop_box: dict | None = None) -> tuple[str | None, float]:
    """Decide whether the latest message bubble is on the left or the right.

    Three strategies are tried in order:

    1. ``crop_box`` (when truthy): classify by the box's centre and left edge.
    2. ``cluster_items``: classify by the items' average centre and extent.
    3. Colour fallback: look for large regions inside the HSV range used for
       ``green_mask`` in the lower part of the image and classify the
       bottom-most one. (Presumably these are the green outgoing bubbles --
       TODO confirm.)

    Args:
        img: Full chat snapshot.
        cluster_items: OCR items of the chosen cluster; ``cx``, ``left`` and
            ``right`` are read from each.
        crop_box: Optional box with ``left`` / ``right`` keys.

    Returns:
        ``(side, confidence)`` where ``side`` is ``'left'`` or ``'right'``.
        When nothing matches the result is ``('left', 0.35)``.
    """
    # Dimensions come straight from the image; the pixel array is only built
    # if the colour fallback is actually reached.
    w, h = img.size

    if crop_box:
        left = float(crop_box.get('left', 0))
        right = float(crop_box.get('right', left))
        center_x = (left + right) / 2.0
        if center_x >= w * LEFT_BUBBLE_CX_RATIO or left >= w * RIGHT_BUBBLE_MIN_LEFT_RATIO:
            return 'right', 0.9
        return 'left', 0.9

    if cluster_items:
        avg_cx = sum(x['cx'] for x in cluster_items) / len(cluster_items)
        min_left = min(x['left'] for x in cluster_items)
        max_right = max(x['right'] for x in cluster_items)
        width = max_right - min_left
        # Wider clusters get the higher confidence.
        cluster_confidence = 0.84 if width > w * 0.18 else 0.78
        if avg_cx >= w * LEFT_BUBBLE_CX_RATIO or min_left >= w * RIGHT_BUBBLE_MIN_LEFT_RATIO:
            return 'right', cluster_confidence
        if avg_cx <= w * 0.52 or max_right <= w * LEFT_SIDE_MAX_RIGHT_RATIO:
            return 'left', cluster_confidence

    # Colour fallback: only the lower ~62% of the image is inspected.
    arr = np.array(img.convert('RGB'))
    y_offset = int(h * 0.38)
    focus = arr[y_offset:, :, :]
    hsv = cv2.cvtColor(focus, cv2.COLOR_RGB2HSV)
    green_mask = cv2.inRange(hsv, np.array([35, 25, 80]), np.array([95, 255, 255]))
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    green_mask = cv2.morphologyEx(green_mask, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(green_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    candidates = []
    for cnt in contours:
        x, y, ww, hh = cv2.boundingRect(cnt)
        area = ww * hh
        right = x + ww
        if area < 2500:
            continue
        if hh < 28 or ww < 80:
            continue
        # Region must reach into the right third of the image.
        if right < w * 0.66:
            continue
        # Reject regions that start on the left half and span nearly half the width.
        if x < w * 0.45 and ww > w * 0.45:
            continue
        candidates.append((y + y_offset, x, ww, hh, area))

    if candidates:
        # Ascending by bottom edge, then by x; the last entry is the lowest region.
        candidates.sort(key=lambda item: (item[0] + item[3], item[1]))
        y, x, ww, hh, area = candidates[-1]
        center_x = x + ww / 2.0
        side = 'right' if center_x >= w * LEFT_BUBBLE_CX_RATIO else 'left'
        confidence = min(0.72, 0.50 + min(0.22, area / 40000.0))
        return side, confidence

    return 'left', 0.35
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 分析PIL图片的主函数,提取最新消息、判断发送方并返回分析结果
|
|||
|
|
def analyze_pil_image(img: Image.Image, stem: str, file_name: str | None = None) -> AnalyzeResult:
    """Analyse one chat snapshot and return the latest-message result.

    Steps: delete the PNG files already in the snapshot's OCR directory, save
    the RGB original, look for the last white box in the merged binary image
    and use it as the preferred OCR crop, extract the latest text, and detect
    which side the bubble is on. A right-side bubble is treated as self-sent
    and its text and lines are discarded. The result is written to
    ``result.json`` in the OCR directory and a log event is emitted.

    Args:
        img: Snapshot image; converted to RGB before processing.
        stem: Snapshot identifier; ``'unknown'`` is used when it is empty.
        file_name: Name recorded in the result; defaults to ``'<stem>.png'``.

    Returns:
        The populated ``AnalyzeResult``.
    """
    trace_id = new_trace_id("bot")
    safe_stem = stem if stem else 'unknown'

    ocr_out_dir = get_image_out_dir(safe_stem, 'ocr')
    for stale_png in ocr_out_dir.glob('*.png'):
        stale_png.unlink(missing_ok=True)

    img = img.convert('RGB')
    original_out_dir = get_image_out_dir(safe_stem, 'original')
    img.save(original_out_dir / 'full_raw.png')

    merged_full = build_merged_binary_image(img)
    last_white_box = pick_last_white_box_from_merged_binary(merged_full)

    preferred_ocr_img = None
    preferred_crop_box = None
    if last_white_box is not None:
        preferred_crop_box = dict(last_white_box)
        region = tuple(
            int(last_white_box[edge]) for edge in ('left', 'top', 'right', 'bottom')
        )
        preferred_ocr_img = img.crop(region)
        preferred_ocr_img.save(original_out_dir / 'last_white_box_raw.png')

    latest_text, valid_lines, cluster_items, crop_box = extract_latest_text(
        img,
        safe_stem,
        preferred_ocr_img=preferred_ocr_img,
        preferred_crop_box=preferred_crop_box,
    )
    bubble_side, confidence = detect_latest_bubble_side(img, cluster_items, crop_box=crop_box)

    on_right = bubble_side == 'right'
    if on_right:
        # Right-side bubbles count as self-sent, so their text is not reported.
        latest_text = None
        valid_lines = []
    is_self_sent = on_right if bubble_side is not None else None

    result = AnalyzeResult(
        file=file_name or f'{safe_stem}.png',
        size=img.size,
        crop_box=crop_box,
        latest_text=latest_text,
        is_self_sent=is_self_sent,
        bubble_side=bubble_side,
        confidence=confidence,
        valid_lines=valid_lines,
        error=None,
    )

    result_json = json.dumps(asdict(result), ensure_ascii=False, indent=2)
    (ocr_out_dir / 'result.json').write_text(result_json, encoding='utf-8')

    log_extra = {
        "file": result.file,
        "has_text": bool(result.latest_text),
        "bubble_side": result.bubble_side or "",
        "confidence": result.confidence,
    }
    log_event("INFO", "bot", "bot.chat_snapshot", trace_id, "analyze", "ok", "聊天快照分析完成", extra=log_extra)
    return result
|
|||
|
|
|
|||
|
|
|