LuTong 3 個月之前
父節點
當前提交
dd1275ba8d
共有 5 個文件被更改,包括 290 次插入和 86 次刪除
  1. 二進制
      app.ico
  2. +1 -0
      bash/pack.bash
  3. +51 -29
      src/excel_handler.py
  4. +41 -17
      src/gui.py
  5. +197 -40
      src/scraper.py

二進制
app.ico


+ 1 - 0
bash/pack.bash

@@ -0,0 +1 @@
+pyinstaller --noconsole --onedir --add-data "templates;templates" --paths . --collect-all selenium_stealth --icon="app.ico" --name "1688_Product_Scraper" --clean src/main.py

+ 51 - 29
src/excel_handler.py

@@ -1,57 +1,79 @@
-import pandas as pd
-from openpyxl import load_workbook
+import sys
 import os
+import time
+from openpyxl import load_workbook
 
-def append_to_template(products, output_path):
+def get_resource_path(relative_path):
+    """ 获取资源绝对路径,兼容开发环境和 PyInstaller 打包环境 """
+    if hasattr(sys, '_MEIPASS'):
+        # PyInstaller 打包后的临时解压路径
+        return os.path.join(sys._MEIPASS, relative_path)
+    # 开发环境下的路径
+    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+    return os.path.join(base_dir, relative_path)
+
+def append_to_template(products, output_path, status_callback=None):
     """
     将产品数据追加写入到指定的 Excel 文件中。
-    如果文件不存在,则以 '【进价】产品信息空表.xlsx' 为模板创建。
+    增加文件占用检测:如果文件被打开,则暂停任务直到关闭
     """
-    # 1. 确定模板路径 (只找这个特定文件名)
-    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-    template_path = os.path.join(base_dir, 'templates', '【进价】产品信息空表.xlsx')
+    # 1. 使用增强的路径获取函数寻找模板
+    template_path = get_resource_path(os.path.join('templates', '【进价】产品信息空表.xlsx'))
     
     if not os.path.exists(template_path):
+        # 兜底:如果打包路径没找到,尝试当前目录
+        template_path = os.path.join('templates', '【进价】产品信息空表.xlsx')
+
+    if not os.path.exists(template_path):
         raise FileNotFoundError(f"未找到核心模板文件: {template_path}")
 
-    # 2. 如果目标输出文件已存在,则加载它;否则加载模板
+    # 2. 加载文件
     if os.path.exists(output_path):
         wb = load_workbook(output_path)
     else:
-        # 确保输出目录存在
         os.makedirs(os.path.dirname(output_path), exist_ok=True)
         wb = load_workbook(template_path)
     
     ws = wb.active
     
-    # 3. 寻找起始行 (跳过前两行标题和表头,从有数据的最后一行之后开始)
-    # 假设 A 列(编码) 是必填的,用来判断行数
-    last_row = 2
+    # 3. 寻找起始行 (品类或名称为空的行)
+    start_row = 3
     for r in range(3, ws.max_row + 2):
-        if ws.cell(row=r, column=1).value is None:
-            last_row = r - 1
+        val_cat = ws.cell(row=r, column=2).value
+        val_name = ws.cell(row=r, column=4).value
+        if (val_cat is None or str(val_cat).strip() == "") and \
+           (val_name is None or str(val_name).strip() == ""):
+            start_row = r
             break
-    
-    start_row = last_row + 1
+    else:
+        start_row = ws.max_row + 1
     
     # 4. 写入本批次数据
     for i, product in enumerate(products):
         row = start_row + i
-        # 编码按照总行数自增
         ws.cell(row=row, column=1, value=row - 2) 
         ws.cell(row=row, column=2, value=product.get('category', ''))
         ws.cell(row=row, column=3, value=product.get('brand', ''))
         ws.cell(row=row, column=4, value=product.get('name', ''))
-        ws.cell(row=row, column=5, value=product.get('image', ''))
-        ws.cell(row=row, column=6, value=product.get('color', ''))
-        ws.cell(row=row, column=7, value=product.get('spec', ''))
-        ws.cell(row=row, column=8, value=product.get('material', ''))
-        ws.cell(row=row, column=9, value=product.get('price', ''))
-        ws.cell(row=row, column=10, value=product.get('moq', ''))
-        ws.cell(row=row, column=11, value=product.get('wholesale_price', ''))
-        ws.cell(row=row, column=12, value=product.get('desc', ''))
-        ws.cell(row=row, column=13, value=product.get('link', '')) # 已清洗的干净链接
-        ws.cell(row=row, column=14, value=product.get('supplier', ''))
+        ws.cell(row=row, column=5, value=product.get('color', ''))
+        ws.cell(row=row, column=6, value=product.get('spec', ''))
+        ws.cell(row=row, column=7, value=product.get('material', ''))
+        ws.cell(row=row, column=8, value=product.get('price', ''))
+        ws.cell(row=row, column=9, value=product.get('moq', ''))
+        ws.cell(row=row, column=10, value=product.get('wholesale_price', ''))
+        ws.cell(row=row, column=11, value=product.get('link', '')) 
+        ws.cell(row=row, column=12, value=product.get('supplier', ''))
 
-    wb.save(output_path)
-    print(f"[*] 批次数据({len(products)}条)已写入: {output_path}")
+    # 5. 核心改进:占用检测保存循环
+    while True:
+        try:
+            wb.save(output_path)
+            if status_callback:
+                status_callback(False, "写入成功")
+            break
+        except PermissionError:
+            msg = "文件被占用,请关闭 Excel"
+            print(f"[!] {msg}: {output_path}")
+            if status_callback:
+                status_callback(True, msg)
+            time.sleep(3)

+ 41 - 17
src/gui.py

@@ -15,8 +15,8 @@ from src.excel_handler import append_to_template
 class ScraperThread(QThread):
     progress = pyqtSignal(int)
     log = pyqtSignal(str)
-    # finished 信号改为只传错误信息和 scraper 对象,数据已实时写入
-    finished = pyqtSignal(str, object)
+    # finished 信号增加耗时参数 (秒)
+    finished = pyqtSignal(str, object, float)
 
     def __init__(self, keyword, output_path, headless=True):
         super().__init__()
@@ -26,31 +26,42 @@ class ScraperThread(QThread):
 
     def run(self):
         scraper = None
+        start_time = time.time()
         try:
             self.log.emit(f"<b>[*] 任务启动: {self.keyword}</b>")
-            scraper = Scraper1688(headless=self.headless)
+            
+            def status_cb(is_waiting, msg):
+                if is_waiting:
+                    self.log.emit(f"<font color='red' size='5'><b>!!! {msg} !!!</b></font>")
+                else:
+                    self.log.emit(f"<font color='green'><b>[√] {msg}</b></font>")
+
+            scraper = Scraper1688(headless=self.headless, status_callback=status_cb)
             
             # 使用流式生成器抓取
-            total_target = 200
+            total_target = 20
+            # total_target = 200
             collected_count = 0
             
-            for page_results in scraper.search_products_yield(self.keyword, total_count=total_target):
-                # 实时写入 Excel
-                append_to_template(page_results, self.output_path)
+            for batch_results in scraper.search_products_yield(self.keyword, total_count=total_target):
+                # 实时写入 Excel (此时 batch_results 为 10 条或页末余数)
+                append_to_template(batch_results, self.output_path)
                 
-                collected_count += len(page_results)
-                self.log.emit(f"[+] 批次写入成功: {len(page_results)} 条,当前总计: {collected_count}")
+                collected_count += len(batch_results)
+                self.log.emit(f"[+] 数据已持久化: {len(batch_results)} 条,当前总计: {collected_count}")
                 
                 prog = int((collected_count / total_target) * 100)
-                self.progress.emit(prog)
+                self.progress.emit(min(prog, 100))
             
+            duration = time.time() - start_time
             self.log.emit(f"<b>[完成] 任务结束,共抓取 {collected_count} 条数据。</b>")
-            self.finished.emit("", scraper)
+            self.log.emit(f"<b>[耗时] 处理总时间: {duration:.2f} 秒</b>")
+            self.finished.emit("", scraper, duration)
         except Exception as e:
+            duration = time.time() - start_time
             err = traceback.format_exc()
-            print(err)
             self.log.emit(f"<font color='red'>[错误] {str(e)}</font>")
-            self.finished.emit(err, scraper)
+            self.finished.emit(err, scraper, duration)
 
 class MainWindow(QMainWindow):
     def __init__(self):
@@ -178,14 +189,16 @@ class MainWindow(QMainWindow):
             return
         
         target_dir = os.path.join(self.output_base_path, "选品", self.selected_category_1)
-        file_path = os.path.join(target_dir, f"{self.selected_category_2}.xlsx")
+        file_path = os.path.normpath(os.path.join(target_dir, f"{self.selected_category_2}.xlsx"))
+        self.current_output_file = file_path # 记录当前文件用于最后打开
         
-        # 启动抓取前清理旧文件,确保从模板重新开始
+        # 启动抓取前清理旧文件
         if os.path.exists(file_path):
             try: os.remove(file_path)
             except: pass
 
         self.search_btn.setEnabled(False)
+        self.status_label.setText("处理中……")
         self.log_output.clear()
         self.pbar.setValue(0)
         headless = not self.show_browser_cb.isChecked()
@@ -196,10 +209,21 @@ class MainWindow(QMainWindow):
         self.thread.finished.connect(self.on_finished)
         self.thread.start()
 
-    def on_finished(self, err, scraper):
+    def on_finished(self, err, scraper, duration):
         self.search_btn.setEnabled(True)
         if scraper: self.active_scraper = scraper
-        self.status_label.setText("任务完成" if not err else "异常终止")
+        
+        if not err:
+            self.status_label.setText("任务完成")
+            # 自动打开目标文件
+            if hasattr(self, 'current_output_file') and os.path.exists(self.current_output_file):
+                try:
+                    os.startfile(self.current_output_file)
+                    self.log_output.append(f"<font color='blue'>[系统] 已自动打开结果文件</font>")
+                except Exception as e:
+                    self.log_output.append(f"<font color='orange'>[警告] 无法自动打开文件: {e}</font>")
+        else:
+            self.status_label.setText("异常终止")
 
 if __name__ == "__main__":
     app = QApplication(sys.argv)

+ 197 - 40
src/scraper.py

@@ -20,8 +20,9 @@ from selenium.webdriver.common.action_chains import ActionChains
 from selenium_stealth import stealth
 
 class Scraper1688:
-    def __init__(self, headless=True):
+    def __init__(self, headless=True, status_callback=None):
         self.headless = headless
+        self.status_callback = status_callback # 用于回调 GUI 状态
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
         self._cleanup()
         options = uc.ChromeOptions()
@@ -67,79 +68,235 @@ class Scraper1688:
         return url
 
     def check_for_captcha(self):
+        """
+        核心监控:检测登录、滑块验证、访问受限等需要人工干预的状态
+        """
         def is_blocked():
             try:
-                src, url, title = self.driver.page_source.lower(), self.driver.current_url.lower(), self.driver.title.lower()
+                url = self.driver.current_url.lower()
+                src = self.driver.page_source.lower()
+                title = self.driver.title.lower()
+                
+                # 1. 检测滑块验证码
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                 is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                return is_slider or "punish" in url or "哎哟喂" in src or "验证码" in title
-            except: return False
+                
+                # 2. 检测登录页面 (如果跳转到了登录页)
+                is_login = "login.1688.com" in url or "passport.1688.com" in url
+                
+                # 3. 检测惩罚/验证提示页
+                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
+                
+                # 4. 检测是否被登出 (如果页面包含登录按钮且当前是详情/搜索页)
+                # 这部分可以根据实际情况增强,目前主要靠 URL 判定
+                
+                return is_slider or is_login or is_punish
+            except: 
+                return False
         
         if is_blocked():
-            print("\n[!] 触发拦截,请手动完成验证...")
-            while is_blocked(): time.sleep(2)
-            print("[+] 验证通过!")
+            msg = "请登录验证"
+            print(f"\n[!] {msg}...")
+            if self.status_callback:
+                self.status_callback(True, msg)
+            
+            # 持续监控,直到上述所有拦截状态消失
+            while is_blocked():
+                time.sleep(2)
+                
+            if self.status_callback:
+                self.status_callback(False, "验证通过")
+            print("\n[OK] 监测到人工干预已完成,3秒后恢复自动抓取...")
             time.sleep(3)
         return True
 
-    def search_products_yield(self, keyword, total_count=200):
+    # def search_products_yield(self, keyword, total_count=200):
+    def search_products_yield(self, keyword, total_count=20):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         
+        # 初始检查:确保在开始抓取前没被拦截(比如没登录)
+        self.driver.get("https://www.1688.com")
+        self.check_for_captcha()
+
         all_links = set()
         page = 1
         consecutive_empty_pages = 0
         
         while len(all_links) < total_count and consecutive_empty_pages < 3:
-            print(f"[*] 正在抓取第 {page} 页...")
+            print(f"[*] 正在抓取列表页: 第 {page} 页...")
             target_url = f"{base_url}&beginPage={page}&page={page}"
             self.driver.get(target_url)
             
             # 关键:首屏强制等待渲染
-            time.sleep(10)
+            time.sleep(5)
             self.check_for_captcha()
 
-            # 深度滚动
-            for i in range(1, 6):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/5});")
-                time.sleep(1.2)
+            # 深度滚动确保加载
+            for i in range(1, 4):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
+                time.sleep(1)
 
             page_results = self._extract_all_methods()
             
-            new_batch = []
+            page_batch = []
             for it in page_results:
-                it["link"] = self.clean_url(it["link"])
-                if it["link"] and it["link"] not in all_links:
-                    all_links.add(it["link"])
-                    new_batch.append(it)
+                clean_url = self.clean_url(it["link"])
+                if clean_url and clean_url not in all_links:
+                    all_links.add(clean_url)
+                    
+                    # 核心改进:进入详情页抓取精准数据
+                    print(f"  [>] 抓取详情: {clean_url}")
+                    detail_results = self.scrape_detail(clean_url)
+                    if detail_results:
+                        # detail_results 现在是一个列表 (包含多个颜色分类)
+                        page_batch.extend(detail_results)
+                    else:
+                        # 兜底
+                        it["link"] = clean_url
+                        page_batch.append({
+                            "category": "", "brand": "", "name": it["name"],
+                            "color": "", "spec": "", "material": "", "price": it["price"],
+                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
+                        })
+                    
+                    # 每满 10 条 yield 一次
+                    if len(page_batch) >= 10:
+                        yield page_batch
+                        page_batch = []
+
+                    # 详情页抓取后的随机等待
+                    time.sleep(random.uniform(2, 4))
+                    
+                    if len(all_links) >= total_count:
+                        break
             
-            if new_batch:
-                consecutive_empty_pages = 0
-                yield new_batch
-            else:
-                print(f"[-] 第 {page} 页未发现新数据,尝试刷新重试...")
-                self.driver.refresh()
-                time.sleep(8)
-                retry_results = self._extract_all_methods()
-                new_retry = []
-                for it in retry_results:
-                    it["link"] = self.clean_url(it["link"])
-                    if it["link"] and it["link"] not in all_links:
-                        all_links.add(it["link"]); new_retry.append(it)
-                
-                if new_retry:
-                    yield new_retry
-                else:
-                    consecutive_empty_pages += 1
-                    print(f"[!] 连续 {consecutive_empty_pages} 页无数据")
+            # 每页结束,将不足 10 条的余数 yield 出去
+            if page_batch:
+                yield page_batch
+                page_batch = []
 
             page += 1
             if len(all_links) < total_count:
-                print(f"[*] 累计抓取: {len(all_links)} 条,准备翻页...")
-                time.sleep(5)
+                print(f"[*] 累计已处理: {len(all_links)} 条,准备翻下一页...")
+                time.sleep(3)
 
         return list(all_links)
 
+    def scrape_detail(self, url):
+        """
+        根据 /refe/req.py 订正的详情页抓取逻辑
+        获取极其精准的商品属性和价格数据,并支持将“颜色分类”拆分为多行
+        """
+        try:
+            self.driver.get(url)
+            time.sleep(2)
+            self.check_for_captcha()
+            
+            # 执行 JS 获取 1688 详情页背后的完整数据模型
+            model = self.driver.execute_script(
+                "return (window.context && window.context.result && "
+                "window.context.result.global && window.context.result.global.globalData "
+                "&& window.context.result.global.globalData.model) || "
+                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
+            )
+            
+            if not model:
+                return None
+
+            def get_attr(name):
+                """从 featureAttributes 里取指定属性值"""
+                try:
+                    # 现代版
+                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
+                    for item in attrs:
+                        if name in item.get("name", ""): return item.get("value", "")
+                    # 老版兼容
+                    attrs = model.get("detailData", {}).get("attributes", [])
+                    for item in attrs:
+                        if name in item.get("attributeName", ""): return item.get("value", "")
+                except: pass
+                return ""
+
+            def safe_text(by, sel):
+                try:
+                    return self.driver.find_element(by, sel).text.strip()
+                except: return ""
+
+            # 价格处理逻辑
+            trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
+            price_min = trade.get("minPrice", "") or ""
+            price_max = trade.get("maxPrice", "") or ""
+            # 老版价格补丁
+            if not price_min:
+                try: price_min = model["sku"]["priceRange"][0][1]
+                except: pass
+
+            begin_amount = trade.get("beginAmount", "")
+            
+            # 批发价区间
+            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
+                     trade.get("offerPriceModel", {}).get("currentPrices", [])
+            range_text = " / ".join(
+                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges]
+            ) if ranges else ""
+
+            # 基础数据模板
+            base_data = {
+                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
+                           or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
+                "brand": get_attr("品牌"),
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
+                         or safe_text(By.CSS_SELECTOR, "h1.d-title")
+                         or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
+                "color": "", # 待填充
+                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
+                        safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
+                "material": get_attr("材质") or get_attr("面料") or \
+                            safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
+                "price": f"{price_min}-{price_max}" if price_min and price_max and price_min != price_max else f"{price_min}" if price_min else "",
+                "moq": begin_amount or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='起订量' or span='起批量']/following-sibling::td[1]//span[@class='field-value']"),
+                "wholesale_price": range_text,
+                "link": url,
+                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else "")
+                           or safe_text(By.CSS_SELECTOR, "a.company-name")
+                           or safe_text(By.CSS_SELECTOR, "div.company-name"),
+            }
+
+            # --- 核心逻辑:拆分颜色分类 ---
+            sku_props = []
+            try:
+                # 尝试多种路径获取 SKU 属性
+                sku_props = model.get("skuModel", {}).get("skuProps", []) or \
+                            model.get("detailData", {}).get("skuProps", []) or \
+                            model.get("sku", {}).get("skuProps", [])
+            except: pass
+
+            # 寻找“颜色分类”或类似的属性
+            color_prop = next((p for p in sku_props if p.get("prop") in ["颜色", "颜色分类", "花色"]), None)
+            
+            if color_prop and color_prop.get("value"):
+                variant_results = []
+                for val in color_prop["value"]:
+                    # 只有当该分类确实有名字时才记录
+                    c_name = val.get("name")
+                    if c_name:
+                        row = base_data.copy()
+                        row["color"] = c_name
+                        variant_results.append(row)
+                return variant_results
+            else:
+                # 兜底:如果没有 SKU 拆分,则尝试获取单属性颜色
+                base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
+                return [base_data]
+
+        except Exception as e:
+            print(f"[!] 详情页抓取异常 ({url}): {e}")
+            return None
+
     def _extract_all_methods(self):
         """三位一体提取法:JSON + DOM + 深度搜索"""
         results = []