LuTong 3 місяців тому
батько
коміт
d1089c4fb9
3 змінених файлів з 142 додано та 91 видалено
  1. src/excel_handler.py  (+24 −7)
  2. src/gui.py            (+21 −2)
  3. src/scraper.py        (+97 −82)

+ 24 - 7
src/excel_handler.py

@@ -42,30 +42,28 @@ def append_to_template(products, output_path, status_callback=None):
     """
     将产品数据追加写入到指定的 Excel 文件中。
     增加文件占用检测:如果文件被打开,则暂停任务直到关闭。
+    并在第二个 Sheet 中记录商品总数。
     """
-    # 1. 使用增强的路径获取函数寻找模板
     template_path = get_resource_path(os.path.join('templates', '【进价】产品信息空表.xlsx'))
     
     if not os.path.exists(template_path):
-        # 兜底:如果打包路径没找到,尝试当前目录
         template_path = os.path.join('templates', '【进价】产品信息空表.xlsx')
 
     if not os.path.exists(template_path):
         raise FileNotFoundError(f"未找到核心模板文件: {template_path}")
 
-    # 2. 加载文件
     if os.path.exists(output_path):
         wb = load_workbook(output_path)
     else:
         os.makedirs(os.path.dirname(output_path), exist_ok=True)
         wb = load_workbook(template_path)
     
+    # 1. 写入主数据 Sheet
     ws = wb.active
     
-    # 3. 寻找起始行 (基于第 11 列“产品链接”进行判定,防止覆盖)
+    # 寻找起始行 (基于第 11 列“产品链接”进行判定,防止覆盖)
     start_row = 3
     for r in range(3, ws.max_row + 2):
-        # 第 11 列是产品链接
         val_link = ws.cell(row=r, column=11).value
         if val_link is None or str(val_link).strip() == "":
             start_row = r
@@ -73,7 +71,12 @@ def append_to_template(products, output_path, status_callback=None):
     else:
         start_row = ws.max_row + 1
     
-    # 4. 写入本批次数据
+    # 获取当前已有的链接集合,用于后续统计唯一商品
+    existing_links = set()
+    for r in range(3, start_row):
+        link = ws.cell(row=r, column=11).value
+        if link: existing_links.add(str(link).strip())
+
     for i, product in enumerate(products):
         row = start_row + i
         ws.cell(row=row, column=1, value=row - 2) 
@@ -88,8 +91,22 @@ def append_to_template(products, output_path, status_callback=None):
         ws.cell(row=row, column=10, value=product.get('wholesale_price', ''))
         ws.cell(row=row, column=11, value=product.get('link', '')) 
         ws.cell(row=row, column=12, value=product.get('supplier', ''))
+        
+        link = product.get('link')
+        if link: existing_links.add(str(link).strip())
+
+    # 2. 写入/更新计数 Sheet (第二个 Sheet)
+    sheet_names = wb.sheetnames
+    if len(sheet_names) < 2:
+        wb.create_sheet("统计状态")
+    
+    ws_stat = wb["统计状态"]
+    ws_stat.cell(row=1, column=1, value="已解析商品总数")
+    ws_stat.cell(row=1, column=2, value=len(existing_links))
+    ws_stat.cell(row=2, column=1, value="最后更新时间")
+    ws_stat.cell(row=2, column=2, value=time.strftime("%Y-%m-%d %H:%M:%S"))
 
-    # 5. 核心改进:占用检测保存循环
+    # 3. 占用检测保存循环
     while True:
         try:
             wb.save(output_path)

+ 21 - 2
src/gui.py

@@ -53,14 +53,33 @@ class ScraperThread(QThread):
             # 使用流式生成器抓取
             collected_count = 0
             
+            # 记录初始商品数,用于断点续记
+            initial_p_count = 0
+            if os.path.exists(self.output_path):
+                try:
+                    import openpyxl
+                    wb_tmp = openpyxl.load_workbook(self.output_path, data_only=True)
+                    if "统计状态" in wb_tmp.sheetnames:
+                        initial_p_count = int(wb_tmp["统计状态"].cell(row=1, column=2).value or 0)
+                    wb_tmp.close()
+                except: pass
+
+            product_index = initial_p_count
+            
             for batch_results in scraper.search_products_yield(self.keyword, total_count=self.total_count, existing_links=existing_links):
                 # 实时写入 Excel (此时 batch_results 为 10 条或页末余数)
                 append_to_template(batch_results, self.output_path, status_callback=status_cb)
                 
+                # 计算本批次包含的独立商品数量并累加
+                unique_links_in_batch = len(set(item.get('link') for item in batch_results if item.get('link')))
+                product_index += unique_links_in_batch
                 collected_count += len(batch_results)
-                self.log.emit(f"[+] 新增数据已持久化: {len(batch_results)} 条,本次共计: {collected_count}")
                 
-                prog = int((collected_count / self.total_count) * 100)
+                self.log.emit(f"[+] 解析到第 {product_index} 个商品,新增数据已持久化: {len(batch_results)} 条,本次共计: {collected_count}")
+                
+                # 进度条基于本次任务的目标数量
+                current_task_done = product_index - initial_p_count
+                prog = int((current_task_done / self.total_count) * 100)
                 self.progress.emit(min(prog, 100))
             
             duration = time.time() - start_time

+ 97 - 82
src/scraper.py

@@ -13,39 +13,33 @@ except ImportError:
         def __str__(self): return str(self.v)
     v.LooseVersion = LooseVersion
 
-import time, random, re, os, subprocess, urllib.parse, json, traceback
+import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
+from selenium import webdriver
 import undetected_chromedriver as uc 
 from selenium.webdriver.common.by import By
 from selenium.webdriver.common.action_chains import ActionChains
 from selenium_stealth import stealth
 
 class Scraper1688:
-    def __init__(self, headless=True, status_callback=None):
+    def __init__(self, headless=True, status_callback=None, log_callback=None):
         self.headless = headless
-        self.status_callback = status_callback # 用于回调 GUI 状态
+        self.status_callback = status_callback
+        self.log_callback = log_callback # 用于向 GUI 发送普通日志
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
+        self.driver = None
+        
+        # 初始化 Chrome 环境
         self._cleanup()
-
-        def create_options():
-            options = uc.ChromeOptions()
-            options.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
-            options.add_argument(f"--user-data-dir={self.user_data_path}")
-            if headless: options.add_argument('--headless=new')
-            options.add_argument('--disable-blink-features=AutomationControlled')
-            options.add_argument("--window-size=1920,1080")
-            return options
-
-        try:
-            # 关键修复:每次启动都使用 create_options() 产生的全新对象
-            self.driver = uc.Chrome(options=create_options(), headless=headless, version_main=131)
-        except:
-            # 关键修复:这里也要用全新的 options 对象
-            self.driver = uc.Chrome(options=create_options(), headless=headless)
+        self._init_chrome(headless)
         
-        stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
+        if self.driver:
+            stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
     def _cleanup(self):
-        if os.name == 'nt': subprocess.call(['taskkill', '/F', '/IM', 'chrome.exe', '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+        """ 清理残留进程和锁定文件 """
+        if os.name == 'nt':
+            subprocess.call(['taskkill', '/F', '/IM', 'chrome.exe', '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+        
         if os.path.exists(self.user_data_path):
             for root, _, files in os.walk(self.user_data_path):
                 for f in files:
@@ -53,6 +47,32 @@ class Scraper1688:
                         try: os.remove(os.path.join(root, f))
                         except: pass
 
+    def _init_chrome(self, headless):
+        """ 强化版初始化:解决 session not created 连接失败问题 """
+        def create_options():
+            opts = uc.ChromeOptions()
+            opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
+            opts.add_argument(f"--user-data-dir={self.user_data_path}")
+            if headless: opts.add_argument('--headless=new')
+            opts.add_argument('--disable-blink-features=AutomationControlled')
+            opts.add_argument("--window-size=1920,1080")
+            # 解决 session not created 报错的关键参数
+            opts.add_argument("--no-sandbox")
+            opts.add_argument("--disable-dev-shm-usage")
+            opts.add_argument("--remote-allow-origins=*")
+            opts.add_argument("--disable-gpu")
+            return opts
+
+        try:
+            # 增加 use_subprocess=True,显著提升 Windows 下的启动稳定性
+            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
+        except Exception as e:
+            print(f"[*] 首次启动失败: {e},正在执行强制清理并重试...")
+            self._cleanup() # 彻底杀掉残留进程
+            time.sleep(2)
+            # 第二次尝试,使用 subprocess 模式
+            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
+
     def clean_url(self, url):
         """极其鲁棒的 1688 URL 清洗逻辑"""
         if not url: return ""
@@ -94,9 +114,6 @@ class Scraper1688:
                 # 3. 检测惩罚/验证提示页
                 is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
                 
-                # 4. 检测是否被登出 (如果页面包含登录按钮且当前是详情/搜索页)
-                # 这部分可以根据实际情况增强,目前主要靠 URL 判定
-                
                 return is_slider or is_login or is_punish
             except: 
                 return False
@@ -117,7 +134,6 @@ class Scraper1688:
             time.sleep(3)
         return True
 
-    # def search_products_yield(self, keyword, total_count=200):
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
@@ -195,15 +211,16 @@ class Scraper1688:
 
     def scrape_detail(self, url):
         """
-        根据 /refe/req.py 订正的详情页抓取逻辑
-        获取极其精准的商品属性和价格数据,并支持将“颜色分类”拆分为多行
+        极其精准的详情页抓取逻辑
+        支持获取特定 DOM 容器 (expand-view-list-wrapper) 中的款式文字 (item-label) 
+        及对应价格 (item-price-stock),并拆分为多行。
         """
         try:
             self.driver.get(url)
             time.sleep(2)
             self.check_for_captcha()
             
-            # 执行 JS 获取 1688 详情页背后的完整数据模型
+            # 1. 执行 JS 获取完整模型
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
@@ -215,13 +232,10 @@ class Scraper1688:
                 return None
 
             def get_attr(name):
-                """从 featureAttributes 里取指定属性值"""
                 try:
-                    # 现代版
                     attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                     for item in attrs:
                         if name in item.get("name", ""): return item.get("value", "")
-                    # 老版兼容
                     attrs = model.get("detailData", {}).get("attributes", [])
                     for item in attrs:
                         if name in item.get("attributeName", ""): return item.get("value", "")
@@ -229,89 +243,90 @@ class Scraper1688:
                 return ""
 
             def safe_text(by, sel):
-                try:
-                    return self.driver.find_element(by, sel).text.strip()
+                try: return self.driver.find_element(by, sel).text.strip()
                 except: return ""
 
-            # 价格处理逻辑
+            # 基础数据
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
-            price_min = trade.get("minPrice", "") or ""
-            price_max = trade.get("maxPrice", "") or ""
-            # 老版价格补丁
-            if not price_min:
-                try: price_min = model["sku"]["priceRange"][0][1]
-                except: pass
+            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
+            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
 
-            begin_amount = trade.get("beginAmount", "")
-            
-            # 批发价区间
-            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
-                     trade.get("offerPriceModel", {}).get("currentPrices", [])
-            range_text = " / ".join(
-                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges]
-            ) if ranges else ""
-
-            # 基础数据模板
             base_data = {
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
                            or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
                 "brand": get_attr("品牌"),
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
-                         or safe_text(By.CSS_SELECTOR, "h1.d-title")
-                         or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
-                "color": "", # 待填充
+                         or safe_text(By.CSS_SELECTOR, "h1.d-title") or self.driver.title.split('-')[0],
+                "color": "", 
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
                         safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
-                "material": get_attr("材质") or get_attr("面料") or \
-                            safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
-                "price": f"{price_min}-{price_max}" if price_min and price_max and price_min != price_max else f"{price_min}" if price_min else "",
-                "moq": begin_amount or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='起订量' or span='起批量']/following-sibling::td[1]//span[@class='field-value']"),
+                "material": get_attr("材质") or get_attr("面料"),
+                "price": "", 
+                "moq": trade.get("beginAmount", ""),
                 "wholesale_price": range_text,
                 "link": url,
-                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else "")
-                           or safe_text(By.CSS_SELECTOR, "a.company-name")
-                           or safe_text(By.CSS_SELECTOR, "div.company-name"),
+                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            # --- 核心逻辑:拆分规格/颜色分类 ---
-            sku_props = []
+            # 2. 核心逻辑:智能识别并拆分变体 (款式描述 + 价格)
+            variant_data_list = []
             try:
-                # 尝试多种路径获取 SKU 属性
-                sku_props = model.get("skuModel", {}).get("skuProps", []) or \
-                            model.get("detailData", {}).get("skuProps", []) or \
-                            model.get("sku", {}).get("skuProps", [])
+                # 寻找特定 DOM 容器
+                wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
+                if wrappers:
+                    # 获取该容器下的所有子项条目
+                    items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
+                    
+                    if not items:
+                        # 兜底:直接寻找 label 和 price 元素并成对组合
+                        labels = wrappers[0].find_elements(By.CLASS_NAME, "item-label")
+                        prices = wrappers[0].find_elements(By.CLASS_NAME, "item-price-stock")
+                        for l, p in zip(labels, prices):
+                            variant_data_list.append({"label": l.text.strip(), "price": p.text.strip()})
+                    else:
+                        for item_el in items:
+                            try:
+                                label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
+                                price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                                if label: variant_data_list.append({"label": label, "price": price})
+                            except: continue
             except: pass
 
-            # 智能寻找主维度:
-            # 1. 优先找包含“颜色”、“分类”、“款式”、“花色”的维度
-            # 2. 如果没有,则取第一个 SKU 维度(例如“净含量”、“规格”等)
-            main_prop = None
-            if sku_props:
-                main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
-                if not main_prop:
-                    main_prop = sku_props[0]
+            if variant_data_list:
+                results = []
+                for vd in variant_data_list:
+                    row = base_data.copy()
+                    row["color"] = vd["label"] # 款式描述填入“颜色”列
+                    # 清洗价格,只保留数字
+                    clean_p = re.sub(r'[^\d.]', '', vd["price"])
+                    row["price"] = clean_p if clean_p else vd["price"]
+                    results.append(row)
+                return results
+
+            # 3. 如果 DOM 探测失败,尝试 JS 模型变体拆分
+            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
+            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
+            if not main_prop and sku_props: main_prop = sku_props[0]
             
             if main_prop and main_prop.get("value"):
                 variant_results = []
                 for val in main_prop["value"]:
-                    # 只有当该分类确实有名字时才记录
-                    variant_name = val.get("name")
-                    if variant_name:
+                    if val.get("name"):
                         row = base_data.copy()
-                        row["color"] = variant_name
+                        row["color"] = val.get("name")
+                        # 此处尝试获取该变体对应的价格(如果模型中有)
+                        row["price"] = trade.get("minPrice", "")
                         variant_results.append(row)
                 return variant_results
             else:
-                # 兜底:如果没有发现规格选择区,则获取单属性颜色
+                # 最终兜底
                 base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
+                base_data["price"] = trade.get("minPrice", "")
                 return [base_data]
 
         except Exception as e:
             print(f"[!] 详情页抓取异常 ({url}): {e}")
             return None
-        except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {e}")
-            return None
 
     def _extract_all_methods(self):
         """三位一体提取法:JSON + DOM + 深度搜索"""