import json import logging import os from logging.handlers import RotatingFileHandler from pathlib import Path from app.configs.runtime_config import get_bool, get_int, get_str from app.infrastructure.service.logging.log_schema import ALLOWED_EVENTS, LogEvent, MODULES, normalize_event from app.infrastructure.service.wechat.config import LOG_ROOT_DIR _INITIALIZED = False _LOGGERS: dict[str, logging.Logger] = {} _JSON_LOGGERS: dict[str, logging.Logger] = {} class _JsonFormatter(logging.Formatter): def format(self, record): if isinstance(record.msg, dict): return json.dumps(record.msg, ensure_ascii=False) return json.dumps({"message": str(record.msg)}, ensure_ascii=False) class _TextFormatter(logging.Formatter): def format(self, record): if isinstance(record.msg, dict): data = record.msg extra = data.get("extra") or {} kv = " ".join([f"{k}={v}" for k, v in extra.items()]) suffix = f" | {kv}" if kv else "" return f"[{data.get('ts')}][{data.get('level')}][{data.get('module')}][{data.get('event')}][{data.get('trace_id')}] {data.get('message')}{suffix}" return str(record.msg) def _build_logger(name: str, file_path: Path, formatter: logging.Formatter, level: int, rotate_mb: int, backup_count: int): logger = logging.getLogger(name) logger.handlers.clear() logger.propagate = False logger.setLevel(level) file_path.parent.mkdir(parents=True, exist_ok=True) handler = RotatingFileHandler(str(file_path), maxBytes=rotate_mb * 1024 * 1024, backupCount=backup_count, encoding="utf-8") handler.setLevel(level) handler.setFormatter(formatter) logger.addHandler(handler) return logger def init_logging(): global _INITIALIZED if _INITIALIZED: return enabled = get_bool("LOG_ENABLED", True) if not enabled: logging.disable(logging.CRITICAL) _INITIALIZED = True return level_name = (get_str("LOG_LEVEL", "INFO") or "INFO").upper() level = getattr(logging, level_name, logging.INFO) rotate_mb = max(1, get_int("LOG_ROTATE_MB", 5)) backup_count = max(1, get_int("LOG_BACKUP_COUNT", 7)) root = Path(LOG_ROOT_DIR) for module in MODULES: text_logger = _build_logger(f"solo.{module}.text", root / f"{module}.log", _TextFormatter(), level, rotate_mb, backup_count) json_logger = _build_logger(f"solo.{module}.json", root / f"{module}.jsonl", _JsonFormatter(), level, rotate_mb, backup_count) _LOGGERS[module] = text_logger _JSON_LOGGERS[module] = json_logger _INITIALIZED = True def log_event(level: str, module: str, event: str, trace_id: str, stage: str, status: str, message: str, reason: str = "", extra: dict | None = None): if not _INITIALIZED: init_logging() if logging.root.manager.disable >= logging.CRITICAL: return normalized_event = normalize_event(module, event) payload_extra = dict(extra or {}) if normalized_event != str(event or "").strip().lower() and normalized_event not in ALLOWED_EVENTS: payload_extra["event_raw"] = event payload_extra["event_normalized"] = normalized_event payload = LogEvent( level=level, module=module, event=normalized_event, trace_id=trace_id, stage=stage, status=status, reason=reason, message=message, extra=payload_extra, ).to_dict() module_name = payload["module"] lvl = getattr(logging, payload["level"], logging.INFO) _LOGGERS[module_name].log(lvl, payload) _JSON_LOGGERS[module_name].log(lvl, payload) if lvl >= logging.ERROR and module_name != "error": _LOGGERS["error"].log(lvl, payload) _JSON_LOGGERS["error"].log(lvl, payload) def new_trace_id(prefix: str = "trace") -> str: import uuid return f"{prefix}_{uuid.uuid4().hex[:12]}"