Browse Source

功能基

LuTong 1 month ago
commit
cb1f0ef643
9 changed files with 566 additions and 0 deletions
  1. 29 0
      README.md
  2. 17 0
      main.py
  3. 5 0
      requirements.txt
  4. 0 0
      src/__init__.py
  5. 64 0
      src/config_loader.py
  6. 155 0
      src/excel_processor.py
  7. 113 0
      src/image_text_remover.py
  8. 183 0
      src/main_window.py
  9. BIN
      template/raw_template.xlsx

+ 29 - 0
README.md

@@ -0,0 +1,29 @@
+# 选品部图片去文字
+
+PyQt5 桌面客户端:选择 Excel 文件,对「商品图片」列(E 列,从第 3 行起)中的图片做去文字处理,并保存到指定目录。
+
+## 功能
+
+- **选择文件**:仅支持 `.xlsx`、`.xls`;建议使用 `.xlsx` 以完整支持图片处理。
+- **文件路径**:选择后显示所选文件全路径。
+- **保存路径**:选择目录后在该行右侧显示;新文件保存为该目录下 `原文件名_0000.xlsx`,重复写入时序号递增(0001、0002…)。
+- **转化**:点击后下方出现进度条,与处理过程同步;处理完成后弹出保存路径。
+
+处理逻辑:
+
+1. 读取上传的 Excel(格式参考 `template/raw_template.xlsx`),定位 E 列从第 3 行开始的所有图片。
+2. 对每张图片做去文字(图中商品身上的文字尽量保留);当前使用 OpenCV 实现,大模型 Key 可从 `D:\AllePro\upaibm_system` 的配置读取,用于后续扩展 AI 去文字。
+3. 复制整份文档,用处理后的图片按原顺序替换原图(同一媒体文件在多处使用时会被统一替换)。
+4. 将新文档写入「保存路径」,文件名:`原文件名_四位数字.xlsx`,从 0000 起递增。
+
+## 环境与运行
+
+```bash
+# 建议使用 Python 3.8+
+pip install -r requirements.txt
+python main.py
+```
+
+## 配置说明
+
+- 大模型 API Key 从项目 `D:\AllePro\upaibm_system` 读取:优先使用 `claude-config.json` 中第一个带 `api_key` 的 Provider,若无则尝试 `common/app/schemas/vllm_model.py` 中的 `API_BASE`、`API_KEY`。当前去文字仍以 OpenCV 为主,该配置预留用于后续接入 AI 图像编辑接口。

+ 17 - 0
main.py

@@ -0,0 +1,17 @@
+# -*- coding: utf-8 -*-
+"""选品部图片去文字 - 桌面客户端入口。"""
+import sys
+from PyQt5.QtWidgets import QApplication
+from src.main_window import MainWindow
+
+
+def main():
+    app = QApplication(sys.argv)
+    app.setApplicationName("选品部图片去文字")
+    win = MainWindow()
+    win.show()
+    sys.exit(app.exec_())
+
+
+if __name__ == "__main__":
+    main()

+ 5 - 0
requirements.txt

@@ -0,0 +1,5 @@
+PyQt5>=5.15.0
+openpyxl>=3.1.0
+Pillow>=10.0.0
+opencv-python-headless>=4.8.0
+numpy>=1.24.0

+ 0 - 0
src/__init__.py


+ 64 - 0
src/config_loader.py

@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+"""从 upaibm_system 项目读取大模型配置(API Key、base_url)。"""
+import json
+import os
+from pathlib import Path
+from typing import Optional, Tuple
+
+# 默认 upaibm_system 项目路径
+UPAIBM_SYSTEM_ROOT = Path(r"D:\AllePro\upaibm_system")
+CLAUDE_CONFIG_PATH = UPAIBM_SYSTEM_ROOT / "claude-config.json"
+VLLM_MODEL_PATH = UPAIBM_SYSTEM_ROOT / "common" / "app" / "schemas" / "vllm_model.py"
+
+
+def load_claude_config() -> Optional[dict]:
+    """读取 claude-config.json,返回第一个带 api_key 的 Provider。"""
+    if not CLAUDE_CONFIG_PATH.exists():
+        return None
+    try:
+        with open(CLAUDE_CONFIG_PATH, "r", encoding="utf-8") as f:
+            data = json.load(f)
+        providers = data.get("Providers") or []
+        for p in providers:
+            key = (p.get("api_key") or "").strip()
+            if key:
+                return {
+                    "api_base_url": (p.get("api_base_url") or "").strip(),
+                    "api_key": key,
+                    "name": p.get("name", ""),
+                }
+    except Exception:
+        pass
+    return None
+
+
+def load_vllm_config() -> Optional[dict]:
+    """从 vllm_model.py 读取 API_BASE 与 API_KEY(若存在)。"""
+    if not VLLM_MODEL_PATH.exists():
+        return None
+    try:
+        text = VLLM_MODEL_PATH.read_text(encoding="utf-8")
+        api_base = None
+        api_key = None
+        for line in text.splitlines():
+            line = line.strip()
+            if line.startswith("API_BASE ="):
+                api_base = line.split("=", 1)[1].strip().strip('"').strip("'")
+            elif line.startswith("API_KEY ="):
+                api_key = line.split("=", 1)[1].strip().strip('"').strip("'")
+        if api_base and api_key:
+            return {"api_base_url": api_base, "api_key": api_key, "name": "vllm"}
+    except Exception:
+        pass
+    return None
+
+
+def get_ai_config() -> Tuple[Optional[str], Optional[str]]:
+    """
+    获取用于调用大模型的 (api_base_url, api_key)。
+    优先使用 claude-config.json,其次 vllm_model.py。
+    """
+    cfg = load_claude_config() or load_vllm_config()
+    if not cfg:
+        return None, None
+    return cfg.get("api_base_url") or None, cfg.get("api_key") or None

+ 155 - 0
src/excel_processor.py

@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+"""Excel 处理:读取 E 列(商品图片)从第 3 行起的图片,去文字后写回并保存。"""
+import io
+import zipfile
+import shutil
+import tempfile
+import logging
+from pathlib import Path
+from typing import Callable, Optional, Set, Tuple
+
+import openpyxl
+
+from .image_text_remover import remove_text_from_image_bytes
+
+logger = logging.getLogger(__name__)
+
+COL_E_INDEX = 4
+START_ROW_INDEX = 2
+
+
+def _anchor_in_e_from_row3(anchor) -> bool:
+    """判断图片是否在 E 列且从第 3 行开始(0-based row>=2)。"""
+    if anchor is None:
+        return False
+    from_obj = getattr(anchor, "_from", None) or getattr(anchor, "from_obj", None)
+    if from_obj is None:
+        return False
+    col = getattr(from_obj, "col", -1)
+    row = getattr(from_obj, "row", -1)
+    return col == COL_E_INDEX and row >= START_ROW_INDEX
+
+
+def _get_e_column_media_paths(ws) -> Set[str]:
+    """获取工作表中 E 列从第 3 行开始的所有图片在 xlsx 内的媒体路径(如 /xl/media/image1.jpeg)。"""
+    paths = set()
+    for img in getattr(ws, "_images", []):
+        if not _anchor_in_e_from_row3(img.anchor):
+            continue
+        path = getattr(img, "path", None) or getattr(img, "ref", None)
+        if path:
+            # 统一为 zip 内路径格式(无前导 / 或带 xl/)
+            path = path.lstrip("/").replace("\\", "/")
+            if not path.startswith("xl/"):
+                path = "xl/media/" + path.split("/")[-1] if "/" in path else "xl/media/" + path
+            paths.add(path)
+    return paths
+
+
+def _normalize_zip_path(name: str) -> str:
+    """xl/media/image1.jpeg 形式。"""
+    name = name.replace("\\", "/").lstrip("/")
+    if name.startswith("xl/media/"):
+        return name
+    if name.startswith("xl/"):
+        return name
+    return "xl/media/" + name.split("/")[-1] if "/" in name else "xl/media/" + name
+
+
+def process_excel(
+    input_path: str,
+    output_dir: str,
+    api_base_url: Optional[str] = None,
+    api_key: Optional[str] = None,
+    progress_callback: Optional[Callable[[int, int, str], None]] = None,
+) -> str:
+    """
+    处理上传的 Excel:E 列从第 3 行起的图片去文字,复制工作簿并保存到 output_dir。
+    文件名:原文件名_0000.xlsx,重复写入时序号递增。
+    返回最终保存的完整路径。
+    """
+    input_path = Path(input_path)
+    output_dir = Path(output_dir)
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 确定输出文件名:原文件名_0000.xlsx 且序号递增
+    stem = input_path.stem
+    suffix = input_path.suffix.lower()
+    if suffix != ".xlsx":
+        suffix = ".xlsx"
+    existing = list(output_dir.glob(f"{stem}_*.xlsx"))
+    max_num = -1
+    for p in existing:
+        try:
+            num_part = p.stem.rsplit("_", 1)[-1]
+            if len(num_part) == 4 and num_part.isdigit():
+                max_num = max(max_num, int(num_part))
+        except Exception:
+            pass
+    next_num = max_num + 1
+    out_name = f"{stem}_{next_num:04d}{suffix}"
+    out_path = output_dir / out_name
+
+    def report(current: int, total: int, message: str = ""):
+        if progress_callback:
+            progress_callback(current, total, message)
+
+    # 用 openpyxl 只读方式获取需要处理的媒体路径(不修改原文件)
+    wb = openpyxl.load_workbook(input_path, keep_vba=False, read_only=False)
+    ws = wb.active
+    media_paths = _get_e_column_media_paths(ws)
+    wb.close()
+
+    total = len(media_paths)
+    report(0, max(total, 1), "正在读取 Excel 中的图片…")
+
+    # 从原始 xlsx 的 zip 中读取需要处理的图片并去文字
+    processed: dict = {}
+    with zipfile.ZipFile(input_path, "r") as zf:
+        names = zf.namelist()
+        norm_names = {n.replace("\\", "/"): n for n in names}
+        for idx, rel_path in enumerate(sorted(media_paths)):
+            rn = rel_path.replace("\\", "/")
+            n = norm_names.get(rn) or (rn if rn in names else None)
+            if not n:
+                for k, v in norm_names.items():
+                    if k == rn or k.endswith("/" + rn.split("/")[-1]):
+                        n = v
+                        break
+            if not n:
+                report(idx + 1, total, f"跳过未找到: {rel_path}")
+                continue
+            try:
+                data = zf.read(n)
+                ext = Path(n).suffix or ".png"
+                new_data = remove_text_from_image_bytes(
+                    data,
+                    api_base_url=api_base_url,
+                    api_key=api_key,
+                    image_ext=ext,
+                )
+                processed[n] = new_data
+            except Exception as e:
+                logger.exception("处理图片 %s 失败: %s", n, e)
+                processed[n] = zf.read(n)
+            report(idx + 1, total, f"去文字处理 {idx + 1}/{total}")
+
+    # 复制整个 xlsx(zip),替换处理过的媒体文件
+    report(total, total, "正在写入新文档…")
+    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
+        tmp_path = tmp.name
+    try:
+        with zipfile.ZipFile(input_path, "r") as z_in:
+            with zipfile.ZipFile(tmp_path, "w", zipfile.ZIP_DEFLATED) as z_out:
+                for name in z_in.namelist():
+                    if name in processed:
+                        z_out.writestr(name, processed[name])
+                    else:
+                        z_out.writestr(name, z_in.read(name))
+        shutil.move(tmp_path, out_path)
+    except Exception:
+        Path(tmp_path).unlink(missing_ok=True)
+        raise
+
+    report(total, total, "完成")
+    return str(out_path)

+ 113 - 0
src/image_text_remover.py

@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+"""图片去文字:去除图中叠加文字,尽量保留商品身上的文字(基于 OpenCV,可选 AI)。"""
+import io
+import base64
+import logging
+from typing import Optional, Tuple
+
+import cv2
+import numpy as np
+from PIL import Image
+
+logger = logging.getLogger(__name__)
+
+# E 列从第 3 行开始(0-based 行为 2)
+COL_E_INDEX = 4
+START_ROW_INDEX = 2
+
+
+def _decode_image_bytes(data: bytes) -> np.ndarray:
+    """将图片字节转为 BGR 数组。"""
+    nparr = np.frombuffer(data, np.uint8)
+    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
+    if img is None:
+        pil_img = Image.open(io.BytesIO(data))
+        img = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
+    return img
+
+
+def _encode_image_bgr(img: np.ndarray, ext: str = ".png") -> bytes:
+    """将 BGR 数组编码为字节。"""
+    ext = ext.lower()
+    if ext in (".jpg", ".jpeg"):
+        _, buf = cv2.imencode(".jpg", img)
+    else:
+        _, buf = cv2.imencode(".png", img)
+    return buf.tobytes()
+
+
+def remove_text_opencv(img_bgr: np.ndarray) -> np.ndarray:
+    """
+    使用 OpenCV 检测并修复(涂掉)可能的叠加文字区域。
+    通过高对比度区域 + 形态学得到 mask,再 inpaint。
+    商品身上的小字可能也会被弱化,但叠加的大字会优先被去掉。
+    """
+    h, w = img_bgr.shape[:2]
+    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
+
+    # 二值化得到可能文字区域(黑字/白字都考虑)
+    _, th1 = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
+    th2 = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2)
+    # 取边缘
+    edges = cv2.Canny(gray, 50, 150)
+    # 组合:文字往往是高对比度小块
+    mask = np.zeros_like(gray, dtype=np.uint8)
+    mask[(th1 != th2) | (edges > 0)] = 255
+    # 形态学:膨胀成块,避免只涂细线
+    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
+    mask = cv2.dilate(mask, kernel)
+    kernel2 = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
+    mask = cv2.dilate(mask, kernel2)
+    # 去掉过大的连通域(可能是商品主体)
+    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+    area_thresh = (h * w) * 0.05
+    mask_final = np.zeros_like(gray, dtype=np.uint8)
+    for c in contours:
+        if cv2.contourArea(c) < area_thresh:
+            cv2.drawContours(mask_final, [c], -1, 255, -1)
+    if mask_final.max() == 0:
+        return img_bgr
+    inpaint_radius = max(3, min(h, w) // 100)
+    result = cv2.inpaint(img_bgr, mask_final, inpaint_radius, cv2.INPAINT_TELEA)
+    return result
+
+
+def remove_text_from_image_bytes(
+    data: bytes,
+    api_base_url: Optional[str] = None,
+    api_key: Optional[str] = None,
+    image_ext: str = ".png",
+) -> bytes:
+    """
+    对图片字节做去文字处理。
+    若提供了 api_base_url 与 api_key,可在此处调用支持图像编辑的大模型(当前未实现具体 API,仍用 OpenCV)。
+    否则使用 OpenCV 进行去文字。
+    """
+    img = _decode_image_bytes(data)
+    if img is None:
+        return data
+    # 若后续接入可返回编辑后图片的 AI 接口,可在此调用
+    if api_base_url and api_key:
+        try:
+            out = _try_ai_remove_text(img, data, api_base_url, api_key, image_ext)
+            if out is not None:
+                return out
+        except Exception as e:
+            logger.warning("AI 去文字失败,回退 OpenCV: %s", e)
+    out_bgr = remove_text_opencv(img)
+    return _encode_image_bgr(out_bgr, image_ext)
+
+
+def _try_ai_remove_text(
+    img_bgr: np.ndarray,
+    original_bytes: bytes,
+    api_base_url: str,
+    api_key: str,
+    image_ext: str,
+) -> Optional[bytes]:
+    """
+    尝试用兼容 OpenAI 的 vision/chat 接口做“去叠加文字”的说明;
+    若接口支持图像编辑并返回图片,可在此实现。目前返回 None 表示不使用 AI 结果。
+    """
+    # 多数 chat/vision 接口只返回文本,不返回编辑后的图片,此处保留扩展点
+    return None

+ 183 - 0
src/main_window.py

@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+"""PyQt5 主窗口:选择文件、保存路径、转化、进度条。"""
+import os
+from pathlib import Path
+
+from PyQt5.QtWidgets import (
+    QWidget,
+    QVBoxLayout,
+    QHBoxLayout,
+    QPushButton,
+    QLabel,
+    QFileDialog,
+    QProgressBar,
+    QMessageBox,
+    QApplication,
+    QMainWindow,
+    QFrame,
+)
+from PyQt5.QtCore import Qt, QThread, pyqtSignal
+from PyQt5.QtGui import QFont
+
+from .config_loader import get_ai_config
+from .excel_processor import process_excel
+
+
+class ConvertWorker(QThread):
+    """在后台线程执行 Excel 转化,避免阻塞界面。"""
+    progress = pyqtSignal(int, int, str)  # current, total, message
+    finished = pyqtSignal(str)            # 保存路径
+    error = pyqtSignal(str)
+
+    def __init__(self, input_path: str, output_dir: str, api_base_url=None, api_key=None):
+        super().__init__()
+        self.input_path = input_path
+        self.output_dir = output_dir
+        self.api_base_url = api_base_url
+        self.api_key = api_key
+
+    def run(self):
+        try:
+            def report(current: int, total: int, message: str = ""):
+                self.progress.emit(current, max(total, 1), message or "")
+
+            out_path = process_excel(
+                self.input_path,
+                self.output_dir,
+                api_base_url=self.api_base_url,
+                api_key=self.api_key,
+                progress_callback=report,
+            )
+            self.finished.emit(out_path)
+        except Exception as e:
+            self.error.emit(str(e))
+
+
+class MainWindow(QMainWindow):
+    def __init__(self):
+        super().__init__()
+        self.setWindowTitle("选品部图片去文字")
+        self.setMinimumWidth(560)
+        self.setMinimumHeight(260)
+
+        self._file_path = ""
+        self._save_dir = ""
+        self._worker = None
+
+        central = QWidget()
+        self.setCentralWidget(central)
+        layout = QVBoxLayout(central)
+        layout.setSpacing(12)
+
+        # 选择文件
+        btn_file = QPushButton("选择文件")
+        btn_file.setMinimumHeight(36)
+        btn_file.clicked.connect(self._on_select_file)
+        layout.addWidget(btn_file)
+
+        # 文件路径
+        row_path = QHBoxLayout()
+        lbl_path = QLabel("文件路径:")
+        lbl_path.setMinimumWidth(80)
+        self._lbl_file_path = QLabel("")
+        self._lbl_file_path.setWordWrap(True)
+        self._lbl_file_path.setStyleSheet("color: #333;")
+        row_path.addWidget(lbl_path)
+        row_path.addWidget(self._lbl_file_path, 1)
+        layout.addLayout(row_path)
+
+        # 保存路径
+        row_save = QHBoxLayout()
+        btn_save = QPushButton("保存路径")
+        btn_save.setMinimumHeight(32)
+        btn_save.clicked.connect(self._on_select_save_dir)
+        self._lbl_save_path = QLabel("")
+        self._lbl_save_path.setWordWrap(True)
+        self._lbl_save_path.setStyleSheet("color: #333;")
+        row_save.addWidget(btn_save)
+        row_save.addWidget(self._lbl_save_path, 1)
+        layout.addLayout(row_save)
+
+        # 转化按钮
+        btn_convert = QPushButton("转化")
+        btn_convert.setMinimumHeight(40)
+        btn_convert.clicked.connect(self._on_convert)
+        layout.addWidget(btn_convert)
+
+        # 进度条(初始隐藏)
+        self._progress = QProgressBar()
+        self._progress.setMinimum(0)
+        self._progress.setMaximum(100)
+        self._progress.setValue(0)
+        self._progress.setTextVisible(True)
+        self._progress.setVisible(False)
+        layout.addWidget(self._progress)
+
+        self._lbl_file_path.setText("")
+        self._lbl_save_path.setText("")
+
+    def _on_select_file(self):
+        path, _ = QFileDialog.getOpenFileName(
+            self,
+            "选择 Excel 文件",
+            "",
+            "Excel 文件 (*.xlsx *.xls);;所有文件 (*.*)",
+        )
+        if path:
+            self._file_path = path
+            self._lbl_file_path.setText(path)
+
+    def _on_select_save_dir(self):
+        path = QFileDialog.getExistingDirectory(self, "选择保存目录", "")
+        if path:
+            self._save_dir = path
+            self._lbl_save_path.setText(path)
+
+    def _on_convert(self):
+        if not self._file_path or not os.path.isfile(self._file_path):
+            QMessageBox.warning(self, "提示", "请先选择有效的 Excel 文件。")
+            return
+        if not self._save_dir or not os.path.isdir(self._save_dir):
+            QMessageBox.warning(self, "提示", "请先选择保存路径(目录)。")
+            return
+
+        ext = Path(self._file_path).suffix.lower()
+        if ext == ".xls":
+            QMessageBox.information(
+                self,
+                "提示",
+                "当前仅对 .xlsx 格式进行图片去文字处理;.xls 将尝试按 xlsx 方式处理,若失败请先另存为 xlsx。",
+            )
+
+        self._progress.setVisible(True)
+        self._progress.setValue(0)
+        self._progress.setFormat("%p%")
+
+        api_base, api_key = get_ai_config()
+        self._worker = ConvertWorker(
+            self._file_path,
+            self._save_dir,
+            api_base_url=api_base,
+            api_key=api_key,
+        )
+        self._worker.progress.connect(self._on_progress)
+        self._worker.finished.connect(self._on_finished)
+        self._worker.error.connect(self._on_error)
+        self._worker.start()
+
+    def _on_progress(self, current: int, total: int, message: str):
+        if total > 0:
+            self._progress.setMaximum(total)
+            self._progress.setValue(current)
+            if message:
+                self._progress.setFormat(f"%p% - {message}")
+
+    def _on_finished(self, out_path: str):
+        self._progress.setVisible(True)
+        self._progress.setValue(self._progress.maximum())
+        self._progress.setFormat("100% - 完成")
+        QMessageBox.information(self, "完成", f"已保存到:\n{out_path}")
+
+    def _on_error(self, err: str):
+        self._progress.setVisible(False)
+        QMessageBox.critical(self, "错误", f"转化失败:\n{err}")

BIN
template/raw_template.xlsx