LuTong 2 months ago
parent commit e2521309bc
3 files changed, 91 additions, 161 deletions
  1. src/excel_handler.py  +14 -42
  2. src/gui.py  +10 -37
  3. src/scraper.py  +67 -82

+ 14 - 42
src/excel_handler.py

@@ -1,79 +1,54 @@
-# [Updated: 2026-01-16 10:00]
+# [Version: 2026-01-16 ultra-stable build]
 import sys
 import os
 import time
 from openpyxl import load_workbook
 
 def get_resource_path(relative_path):
-    """ 获取资源绝对路径,兼容开发环境和 PyInstaller 打包环境 """
     if hasattr(sys, '_MEIPASS'):
         return os.path.join(sys._MEIPASS, relative_path)
     base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
     return os.path.join(base_dir, relative_path)
 
 def get_existing_info(file_path):
-    """
-    读取已有文件中的链接和最后一行编码
-    """
     links = set()
     last_code = 0
     if not os.path.exists(file_path):
         return links, last_code
-    
     try:
         wb = load_workbook(file_path, data_only=True)
         ws = wb.active
-        # Column A holds the code, column K the link
         for r in range(3, ws.max_row + 1):
             code_val = ws.cell(row=r, column=1).value
             link_val = ws.cell(row=r, column=11).value
-            
-            if link_val:
-                links.add(str(link_val).strip())
-            
-            if isinstance(code_val, (int, float)):
-                last_code = max(last_code, int(code_val))
-    except:
-        pass
+            if link_val: links.add(str(link_val).strip())
+            if isinstance(code_val, (int, float)): last_code = max(last_code, int(code_val))
+    except: pass
     return links, last_code
 
 def append_to_template(products, output_path, status_callback=None):
-    """
-    将产品数据追加写入到指定的 Excel 文件中。
-    并在第二个 Sheet 中记录商品总数用于断点续爬统计。
-    """
     template_path = get_resource_path(os.path.join('templates', '【进价】产品信息空表.xlsx'))
-    
-    if not os.path.exists(template_path):
-        template_path = os.path.join('templates', '【进价】产品信息空表.xlsx')
-
-    if not os.path.exists(template_path):
-        raise FileNotFoundError(f"未找到核心模板文件: {template_path}")
+    if not os.path.exists(template_path): template_path = os.path.join('templates', '【进价】产品信息空表.xlsx')
+    if not os.path.exists(template_path): raise FileNotFoundError(f"未找到模板: {template_path}")
 
-    if os.path.exists(output_path):
-        wb = load_workbook(output_path)
+    if os.path.exists(output_path): wb = load_workbook(output_path)
     else:
         os.makedirs(os.path.dirname(output_path), exist_ok=True)
         wb = load_workbook(template_path)
     
-    # 1. Write the main data sheet
     ws = wb.active
-    
-    # Find the starting row (based on column 11, "product link")
     start_row = 3
     for r in range(3, ws.max_row + 2):
         val_link = ws.cell(row=r, column=11).value
         if val_link is None or str(val_link).strip() == "":
             start_row = r
             break
-    else:
-        start_row = ws.max_row + 1
+    else: start_row = ws.max_row + 1
     
-    # Collect existing links for the Sheet2 statistics
-    all_links = set()
+    current_links = set()
     for r in range(3, start_row):
-        l = ws.cell(row=r, column=11).value
-        if l: all_links.add(str(l).strip())
+        link = ws.cell(row=r, column=11).value
+        if link: current_links.add(str(link).strip())
 
     for i, product in enumerate(products):
         row = start_row + i
@@ -89,18 +64,15 @@ def append_to_template(products, output_path, status_callback=None):
         ws.cell(row=row, column=10, value=product.get('wholesale_price', ''))
         ws.cell(row=row, column=11, value=product.get('link', '')) 
         ws.cell(row=row, column=12, value=product.get('supplier', ''))
-        if product.get('link'): all_links.add(str(product['link']).strip())
+        if product.get('link'): current_links.add(str(product['link']).strip())
 
-    # 2. Write/update the statistics sheet
-    if "统计状态" not in wb.sheetnames:
-        wb.create_sheet("统计状态")
+    if "统计状态" not in wb.sheetnames: wb.create_sheet("统计状态")
     ws_stat = wb["统计状态"]
     ws_stat.cell(row=1, column=1, value="已解析商品总数")
-    ws_stat.cell(row=1, column=2, value=len(all_links))
+    ws_stat.cell(row=1, column=2, value=len(current_links))
     ws_stat.cell(row=2, column=1, value="最后更新时间")
     ws_stat.cell(row=2, column=2, value=time.strftime("%Y-%m-%d %H:%M:%S"))
 
-    # 3. Save in a retry loop with file-lock detection
     while True:
         try:
             wb.save(output_path)
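The hunk is cut off before the exception branch; here is a minimal sketch of how the lock-detecting save loop plausibly continues, assuming a PermissionError signals that the workbook is open in Excel (the retry interval and callback message are assumptions, not the committed code):

    while True:
        try:
            wb.save(output_path)
            break  # saved successfully, exit the retry loop
        except PermissionError:
            # The output file is most likely open in Excel; warn the caller
            # via the status callback, wait, then retry the save.
            if status_callback:
                status_callback(True, "Output file is locked - close it in Excel")
            time.sleep(3)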

+ 10 - 37
src/gui.py

@@ -1,4 +1,4 @@
-# [Updated: 2026-01-16 10:00]
+# [Version: 2026-01-16 ultra-stable build]
 import sys
 import os
 import time
@@ -14,8 +14,7 @@ from src.scraper import Scraper1688
 from src.excel_handler import append_to_template, get_existing_info
 
 def get_resource_path(relative_path):
-    if hasattr(sys, '_MEIPASS'):
-        return os.path.join(sys._MEIPASS, relative_path)
+    if hasattr(sys, '_MEIPASS'): return os.path.join(sys._MEIPASS, relative_path)
     return os.path.join(os.getcwd(), relative_path)
 
 class ScraperThread(QThread):
@@ -35,49 +34,38 @@ class ScraperThread(QThread):
         start_time = time.time()
         try:
             existing_links, _ = get_existing_info(self.output_path)
-            
-            # Read the starting product index from the Excel statistics sheet
             initial_p_count = 0
             if os.path.exists(self.output_path):
                 try:
                     import openpyxl
                     wb_tmp = openpyxl.load_workbook(self.output_path, data_only=True)
                     if "统计状态" in wb_tmp.sheetnames:
-                        initial_p_count = int(wb_tmp["统计状态"].cell(row=1, column=2).value or 0)
+                        val = wb_tmp["统计状态"].cell(row=1, column=2).value
+                        initial_p_count = int(val) if val is not None else 0
                     wb_tmp.close()
                 except: pass
 
             self.log.emit(f"<b>[*] 任务启动: {self.keyword}</b>")
-            if initial_p_count > 0:
-                self.log.emit(f"[*] 发现已有商品记录: {initial_p_count} 条,将接力计数...")
-
             def status_cb(is_waiting, msg):
                 if is_waiting: self.log.emit(f"<font color='red' size='5'><b>!!! {msg} !!!</b></font>")
                 else: self.log.emit(f"<font color='green'><b>[√] {msg}</b></font>")
 
             scraper = Scraper1688(headless=self.headless, status_callback=status_cb, log_callback=self.log.emit)
-            
-            collected_count = 0 # rows of data collected in this run
-            product_index = initial_p_count # running total product index
+            collected_count = 0
+            product_index = initial_p_count
             
             for batch_results in scraper.search_products_yield(self.keyword, total_count=self.total_count, existing_links=existing_links):
                 append_to_template(batch_results, self.output_path, status_callback=status_cb)
-                
-                # Count the unique products touched by this batch
                 unique_links = len(set(item.get('link') for item in batch_results if item.get('link')))
                 product_index += unique_links
                 collected_count += len(batch_results)
-                
-                # Corrected log message wording
                 self.log.emit(f"[+] 解析到第 {product_index} 个商品,新增数据已持久化: {len(batch_results)} 条,本次共计: {collected_count}")
-                
-                # Progress bar is based on products added by this run
-                task_progress = product_index - initial_p_count
-                prog = int((task_progress / self.total_count) * 100)
+                current_task_done = product_index - initial_p_count
+                prog = int((current_task_done / self.total_count) * 100)
                 self.progress.emit(min(prog, 100))
             
             duration = time.time() - start_time
-            self.log.emit(f"<b>[完成] 任务结束,本次共解析 {product_index - initial_p_count} 个商品。</b>")
+            self.log.emit(f"<b>[完成] 任务结束,本次新增抓取 {collected_count} 条数据。</b>")
             self.log.emit(f"<b>[耗时] 处理总时间: {duration:.2f} 秒</b>")
             self.finished.emit("", scraper, duration)
         except Exception as e:
@@ -101,12 +89,9 @@ class MainWindow(QMainWindow):
         self.setGeometry(100, 100, 1100, 750)
         icon_path = get_resource_path("app.ico")
         if os.path.exists(icon_path): self.setWindowIcon(QIcon(icon_path))
-
         central_widget = QWidget()
         self.setCentralWidget(central_widget)
         main_layout = QHBoxLayout(central_widget)
-
-        # Left panel: category tree
         left_widget = QWidget()
         left_layout = QVBoxLayout(left_widget)
         self.load_category_btn = QPushButton("选择类目文件")
@@ -119,17 +104,13 @@ class MainWindow(QMainWindow):
         left_layout.addWidget(QLabel("<b>商品类目树</b>"))
         left_layout.addWidget(self.load_category_btn)
         left_layout.addWidget(self.category_tree)
-
-        # Right panel: controls
         right_widget = QWidget()
         right_layout = QVBoxLayout(right_widget)
-
         opt_layout = QHBoxLayout()
         self.show_browser_cb = QCheckBox("显示浏览器界面 (手动过验证时勾选)")
         self.show_browser_cb.setChecked(True)
         opt_layout.addWidget(self.show_browser_cb)
         right_layout.addLayout(opt_layout)
-
         path_layout = QHBoxLayout()
         self.path_display = QLabel("未选择输出路径")
         self.path_display.setStyleSheet("color: gray; border: 1px solid #ccc; padding: 5px;")
@@ -139,7 +120,6 @@ class MainWindow(QMainWindow):
         path_layout.addWidget(self.path_display, 1)
         path_layout.addWidget(self.select_path_btn)
         right_layout.addLayout(path_layout)
-
         action_layout = QHBoxLayout()
         self.category_display = QLabel("请选择二级类目")
         count_layout = QHBoxLayout()
@@ -149,22 +129,16 @@ class MainWindow(QMainWindow):
         self.count_spin.setFixedWidth(80)
         count_layout.addWidget(QLabel("抓取数量:"))
         count_layout.addWidget(self.count_spin)
-        
         self.search_btn = QPushButton("开始抓取")
         self.search_btn.setEnabled(False)
         self.search_btn.clicked.connect(self.start_scraping)
         self.search_btn.setMinimumHeight(50)
-        self.search_btn.setStyleSheet("""
-            QPushButton { background-color: #0078d4; color: white; font-weight: bold; font-size: 16px; border-radius: 4px; }
-            QPushButton:disabled { background-color: #cccccc; color: #888888; }
-        """)
-        
+        self.search_btn.setStyleSheet("QPushButton { background-color: #0078d4; color: white; font-weight: bold; font-size: 16px; border-radius: 4px; } QPushButton:disabled { background-color: #cccccc; color: #888888; }")
         action_layout.addWidget(QLabel("<font color='red'>*</font>检索类目:"))
         action_layout.addWidget(self.category_display, 1)
         action_layout.addLayout(count_layout)
         action_layout.addWidget(self.search_btn)
         right_layout.addLayout(action_layout)
-
         self.pbar = QProgressBar()
         self.log_output = QTextEdit()
         self.log_output.setReadOnly(True)
@@ -173,7 +147,6 @@ class MainWindow(QMainWindow):
         right_layout.addWidget(self.pbar)
         self.status_label = QLabel("就绪")
         right_layout.addWidget(self.status_label)
-
         splitter = QSplitter(Qt.Orientation.Horizontal)
         splitter.addWidget(left_widget)
         splitter.addWidget(right_widget)
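For context on the resume logic at the top of ScraperThread.run: a minimal standalone sketch of reading back the counter that append_to_template persists into the 统计状态 sheet (the helper name read_resume_count is hypothetical):

    import openpyxl

    def read_resume_count(output_path):
        """Return the product count persisted in the '统计状态' sheet, or 0."""
        try:
            wb = openpyxl.load_workbook(output_path, data_only=True)
            try:
                if "统计状态" in wb.sheetnames:
                    val = wb["统计状态"].cell(row=1, column=2).value
                    return int(val) if val is not None else 0
            finally:
                wb.close()
        except (FileNotFoundError, ValueError, TypeError):
            pass
        return 0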

+ 67 - 82
src/scraper.py

@@ -1,5 +1,5 @@
-# [Updated: 2026-01-16 11:00]
-# Core features: variant splitting, precise style/price extraction, mirrors the req.py logic
+# [Long-term stable build: 2026-01-16 12:00]
+# Core features: variant splitting, precise style/price extraction
 # Anti-bot hardening: much lower request rate, randomized human-like behavior, fewer login verifications
 import sys
 try:
@@ -108,9 +108,8 @@ class Scraper1688:
             if self.status_callback: self.status_callback(True, msg)
             while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
-            # After verification, add a long pause so the risk-control system cools down
-            if self.log_callback: self.log_callback("<font color='orange'>验证通过,由于风控限制,将额外休息 60 秒...</font>")
-            time.sleep(60)
+            # Forced cooldown after passing verification, to prevent an immediate re-block
+            time.sleep(random.randint(60, 120))
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
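check_for_captcha polls an is_blocked() helper that lies outside this hunk; a plausible reconstruction, assuming blocked sessions are detected from the punish/captcha redirect URL (the exact markers are guesses):

        def is_blocked():
            # Heuristic: 1688 redirects blocked sessions to a punish/captcha page.
            url = self.driver.current_url.lower()
            if "punish" in url or "captcha" in url:
                return True
            # Fall back to checking the page title for a verification prompt.
            return "验证" in (self.driver.title or "")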
@@ -118,7 +117,6 @@ class Scraper1688:
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
-        
         all_links = existing_links if existing_links is not None else set()
         page, initial_count = 1, len(all_links)
         
@@ -127,78 +125,69 @@ class Scraper1688:
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            # --- Hardened: human-like segmented scrolling to trigger lazy loading ---
-            for i in range(1, 11):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
-                time.sleep(random.uniform(1.5, 3.0))
-                if i == 5: # scroll back midway
-                    self.driver.execute_script("window.scrollBy(0, -300);")
+            # --- Key improvement: pulsed segmented scrolling to force async loading ---
+            for i in range(1, 13):
+                # Scroll down in segments
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/12});")
+                time.sleep(random.uniform(1.2, 2.8))
+                # Random scroll-back to mimic human behavior
+                if i % 4 == 0:
+                    self.driver.execute_script(f"window.scrollBy(0, -{random.randint(200, 500)});")
                     time.sleep(1.0)
-            time.sleep(random.uniform(3, 6))
+            
+            time.sleep(random.uniform(3, 6)) # final wait for the data to sync into JS variables
 
             page_results = self._extract_all_methods()
-            print(f"  [+] 本页发现 {len(page_results)} 个商品原始条目")
+            print(f"  [+] 本页解析完成:共发现 {len(page_results)} 个商品链接")
             
             page_batch = []
             for it in page_results:
-                clean_url = self.clean_url(it.get("link"))
+                clean_url = self.clean_url(it["link"])
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
                     
-                    # --- Anti-bot hardening: take a deep "rest" after every so many items ---
+                    # --- Maximum hardening: one long rest every 12 items (5-10 minutes) ---
                     new_count = len(all_links) - initial_count
-                    if new_count > 0 and new_count % random.randint(12, 18) == 0:
-                        rest_seconds = random.randint(180, 360) # 休息 3-6 分钟
-                        if self.log_callback: 
-                            self.log_callback(f"<font color='orange'>已连续抓取 {new_count} 个商品,为模拟真实行为休息 {rest_seconds} 秒...</font>")
-                        time.sleep(rest_seconds)
+                    if new_count > 0 and new_count % 12 == 0:
+                        rest_secs = random.randint(300, 600)
+                        if self.log_callback: self.log_callback(f"<font color='red'><b>保护机制:已采集12个,进入深度休眠 {rest_secs//60} 分钟...</b></font>")
+                        time.sleep(rest_secs)
 
-                    print(f"  [>] 正在启动详情抓取: {clean_url}")
+                    print(f"  [>] 详情仿真抓取: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
-                    
-                    if detail_results:
-                        page_batch.extend(detail_results)
-                    else:
-                        page_batch.append({
-                            "category": "", "brand": "", "name": it.get("name", "未知"),
-                            "color": "", "spec": "", "material": "", "price": "",
-                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                        })
+                    if detail_results: page_batch.extend(detail_results)
+                    else: page_batch.append({
+                        "category": "", "brand": "", "name": it.get("name", "未知"),
+                        "color": "", "spec": "", "material": "", "price": "",
+                        "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
+                    })
                     
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
                     
-                    # --- Anti-bot hardening: extra-long random waits between detail pages ---
-                    # Lowering the crawl rate is the most effective way to reduce verification prompts
-                    rest_between = random.uniform(25, 55)
-                    time.sleep(rest_between) 
-                    
+                    # --- Core: go silent for a random 40-80 seconds after each detail page ---
+                    # This is the single most important knob for reducing the 3-hour verification frequency
+                    time.sleep(random.uniform(40, 80)) 
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
             page += 1
-            
-            # After every two list pages, swing by the home page to break the "bot pattern"
-            if page % 2 == 0:
-                if self.log_callback: self.log_callback("<font color='gray'>处理完两页,回首页浏览以分散风控权重...</font>")
-                self.driver.get("https://www.1688.com")
-                time.sleep(random.randint(15, 30))
-                
+            # After each list page, return to the home page and rest 1-2 minutes to reset the path fingerprint
+            self.driver.get("https://www.1688.com")
+            time.sleep(random.randint(60, 120))
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 极其精准的变体拆分逻辑 (款式+价格) """
+        """ 详情页深度仿真浏览与精准拆分 """
         try:
             self.driver.get(url)
-            # Greatly extend the post-load dwell time on the detail page and scroll randomly
-            time.sleep(random.uniform(8, 15))
-            self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 600)});")
-            time.sleep(random.uniform(2, 4))
+            # Simulated reading: linger about 15-30 seconds, scrolling in random segments
+            for _ in range(random.randint(3, 6)):
+                self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 500)});")
+                time.sleep(random.uniform(3.0, 6.0))
             
             self.check_for_captcha()
-            
-            # Fetch the core data model
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
@@ -224,56 +213,47 @@ class Scraper1688:
             base_data = {
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                 "brand": get_attr("品牌"),
-                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "moq": trade.get("beginAmount", ""),
-                "wholesale_price": range_text,
-                "link": url,
+                "price": trade.get("minPrice", ""), "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
             variant_results = []
             try:
-                wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
+                # Option A: prefer the expand-view-list container
+                wrappers = self.driver.find_elements(By.CSS_SELECTOR, ".expand-view-list, .expand-view-list-wrapper")
                 if wrappers:
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
                             price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                            price_clean = re.sub(r'[^\d.]', '', price_raw)
                             if label:
                                 row = base_data.copy()
                                 row["color"] = label
-                                row["price"] = price_clean
+                                row["price"] = re.sub(r'[^\d.]', '', price_raw)
                                 variant_results.append(row)
                         except: continue
             except: pass
 
-            if variant_results:
-                return variant_results
-
-            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
-            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
-            if not main_prop and sku_props: main_prop = sku_props[0]
-            if main_prop and main_prop.get("value"):
-                results = []
-                for val in main_prop["value"]:
-                    if val.get("name"):
-                        row = base_data.copy()
-                        row["color"] = val.get("name")
-                        row["price"] = trade.get("minPrice", "")
-                        results.append(row)
-                return results
-            
-            base_data["price"] = trade.get("minPrice", "")
+            if variant_results: return variant_results
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        """ 强化版:对标 req.py 深度探测 JS 变量提取链接 """
+        """ 强化版探测:从 JS 全局变量和 DOM 中提取所有链接 """
         results = []
+        seen_ids = set()
+
+        def add_item(name, link):
+            cid = self.clean_url(link)
+            if cid and cid not in seen_ids:
+                seen_ids.add(cid)
+                results.append({"name": name, "link": cid})
+
+        # 1. Deep JS variable probing (mirroring req.py)
         scripts = [
             "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
             "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)",
@@ -294,20 +274,25 @@ class Scraper1688:
                     for product_list in find_lists(data):
                         for o in product_list:
                             link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                            if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
+                            name = str(o.get('title', o.get('subject', '')))
+                            add_item(name, link)
                     if results: return results
             except: continue
-        
+
+        # 2. Scan the latest DOM selectors
         selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
         for s in selectors:
-            elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-            if len(elements) > 2:
+            try:
+                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
                 for el in elements:
                     try:
-                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
-                        if link and "1688.com" in link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                        a_tags = el.find_elements(By.TAG_NAME, "a")
+                        for a in a_tags:
+                            href = a.get_attribute("href")
+                            if href: add_item(el.text.split('\n')[0][:50], href)
                     except: continue
-                if results: break
+            except: continue
+            
         return results
 
     def quit(self):
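clean_url and find_lists are used throughout this diff but are defined outside the changed hunks; a minimal reconstruction of what they plausibly do (both bodies are assumptions, not the committed code):

    import re

    def clean_url(self, link):
        """Normalize an offer link (or bare offerId) to a canonical detail URL."""
        if not link:
            return None
        link = str(link).strip()
        if link.isdigit():  # bare offerId pulled from JS data
            return f"https://detail.1688.com/offer/{link}.html"
        m = re.search(r"(\d+)\.html", link)
        return f"https://detail.1688.com/offer/{m.group(1)}.html" if m else None

    def find_lists(node):
        """Recursively yield every list of dicts that looks like an offer list."""
        if isinstance(node, list):
            if node and isinstance(node[0], dict) and any(
                k in node[0] for k in ("offerId", "itemUrl", "subject", "title")
            ):
                yield node
            for item in node:
                yield from find_lists(item)
        elif isinstance(node, dict):
            for value in node.values():
                yield from find_lists(value)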