LuTong 3 months ago
Parent
Commit
526c7e0bc6
2 changed files with 54 additions and 132 deletions
  1. src/gui.py (+3, -1)
  2. src/scraper.py (+51, -131)

+ 3 - 1
src/gui.py

@@ -219,7 +219,9 @@ class MainWindow(QMainWindow):
         if not err:
             self.status_label.setText("任务完成")
             if hasattr(self, 'current_output_file') and os.path.exists(self.current_output_file):
-                try: os.startfile(self.current_output_file)
+                try: 
+                    print("【打开文档】")
+                    os.startfile(self.current_output_file)
                 except: pass
         else: self.status_label.setText("异常终止")
 

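Note on the gui.py hunk above: os.startfile is a Windows-only API, which is why it is wrapped in try/except. As a hedged, minimal sketch (not part of this commit), a cross-platform opener for the same current_output_file could look like this:

    import os, subprocess, sys

    def open_output_file(path):
        # Open the finished output file with the platform's default application.
        if not path or not os.path.exists(path):
            return
        if sys.platform.startswith("win"):
            os.startfile(path)                      # Windows-only call used in gui.py
        elif sys.platform == "darwin":
            subprocess.run(["open", path], check=False)
        else:
            subprocess.run(["xdg-open", path], check=False)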
+ 51 - 131
src/scraper.py

@@ -1,6 +1,4 @@
-# [Version: 2026-01-16 Ultra-Stable Edition]
-# Core features: variant splitting, precise extraction of styles and prices, mirrors the req.py logic
-# Anti-bot strategy: very low-frequency scraping, long deep sleeps, obfuscated behavior paths; target: fewer than 2 verification challenges per 3 hours
+# [Version: 2026-01-16 13:45 - precise variant-and-price sync edition]
 import sys
 try:
     import distutils
@@ -66,7 +64,6 @@ class Scraper1688:
     def _init_chrome(self, headless):
         """ 强化版 Chrome 启动逻辑 """
         chrome_path = self._find_chrome()
-        
         def create_options():
             opts = uc.ChromeOptions()
             opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
@@ -78,16 +75,14 @@ class Scraper1688:
             opts.add_argument("--disable-dev-shm-usage")
             opts.add_argument("--remote-allow-origins=*")
             return opts
-
         try:
-            # Prefer subprocess-mode startup; works around the Win11 connection problem
             self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
-        except:
+        except Exception:
             # On failure, fall back to plain startup mode
             self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
 
     def clean_url(self, url):
-        """ Robust ID extraction logic """
+        """ Robust ID extraction and URL recomposition """
         if not url: return ""
         url_str = str(url)
         if url_str.startswith("//"): url_str = "https:" + url_str
@@ -108,8 +103,9 @@ class Scraper1688:
             if self.status_callback: self.status_callback(True, msg)
             while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
-            if self.log_callback: self.log_callback("<font color='orange'>验证成功,进入 120 秒冷却期重置行为指纹...</font>")
-            time.sleep(120)
+            if self.log_callback: self.log_callback(
+                "<font color='orange'>验证成功,进入 120 秒冷却期重置行为指纹...</font>")
+            time.sleep(random.randint(60, 120))
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
@@ -136,11 +132,10 @@ class Scraper1688:
                 if i % 4 == 0:
                     self.driver.execute_script(f"window.scrollBy(0, -{random.randint(200, 500)});")
                     time.sleep(1.0)
-            
-            time.sleep(random.uniform(3, 6)) # Final wait for the data to sync into the page variables
+            time.sleep(random.uniform(3, 6))
 
             page_results = self._extract_all_methods()
-            print(f"  [+] 本页解析完成:共发现 {len(page_results)} 个潜在商品链接")
+            print(f"  [+] 本页解析完成:共发现 {len(page_results)} 个潜在商品链接")
             
             page_batch = []
             for it in page_results:
@@ -148,11 +143,11 @@ class Scraper1688:
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
                     
-                    # --- Long deep sleep ---
+                    # Cooling mechanism
                     new_count = len(all_links) - initial_count
-                    if new_count > 0 and new_count % 15 == 0:
+                    if new_count > 0 and new_count % 12 == 0:
                         rest_secs = random.randint(300, 600)
-                        if self.log_callback: self.log_callback(f"<font color='red'><b>保护机制:已采集15个,进入深度休眠 {rest_secs//60} 分钟...</b></font>")
+                        if self.log_callback: self.log_callback(f"<font color='red'><b>保护机制:进入休眠 {rest_secs//60} 分钟...</b></font>")
                         time.sleep(rest_secs)
 
                     print(f"  [>] 详情仿真抓取: {clean_url}")
@@ -167,7 +162,7 @@ class Scraper1688:
                         page_batch = []
                     
                     # Random wait after each detail page
-                    time.sleep(random.uniform(40, 80)) 
+                    time.sleep(random.uniform(40, 80))
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
@@ -177,7 +172,7 @@ class Scraper1688:
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ Highly precise variant-splitting logic (style + price) """
+        """ Precise variant parsing: lock onto expand-view-list and pull each style with its per-item price """
         try:
             self.driver.get(url)
             # Simulated reading behavior
@@ -215,139 +210,64 @@ class Scraper1688:
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            # --- Core-logic fix: detect the spec/SKU area intelligently (style names and per-item prices) ---
-            variant_data_list = []
+            variant_results = []
             try:
-                # Following the user's hint, try several candidate class names in order
-                selectors = [".expand-view-list", ".expand-view-list-wrapper", ".sku-wrapper", ".obj-sku"]
-                wrapper = None
-                for s in selectors:
-                    found = self.driver.find_elements(By.CSS_SELECTOR, s)
-                    if found and found[0].is_displayed():
-                        wrapper = found[0]
-                        break
-                
-                if wrapper:
-                    # Collect every child item entry under this container
-                    items = wrapper.find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item, .obj-sku-item")
-                    
+                # Following the user's hint, lock onto the variant container precisely
+                wrappers = self.driver.find_elements(By.CSS_SELECTOR, ".expand-view-list, .expand-view-list-wrapper")
+                if wrappers:
+                    # Walk every variant child entry
+                    items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
-                            # Extract the style description text (item-label) -> Excel "颜色" (color) column
-                            label_el = item_el.find_elements(By.CLASS_NAME, "item-label")
-                            # Extract the per-item price (item-price-stock) -> Excel "单品进价(元)" (unit purchase price, CNY) column
-                            price_el = item_el.find_elements(By.CLASS_NAME, "item-price-stock")
+                            # Description text (item-label) -> color column
+                            label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
+                            # Matching per-item price (item-price-stock) -> unit purchase price column
+                            price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                            # Clean the price string
+                            price_clean = re.sub(r'[^\d.]', '', price_raw)
                             
-                            if label_el and price_el:
-                                label_text = label_el[0].text.strip()
-                                price_text = price_el[0].text.strip()
-                                # Clean the price, keeping only digits and the decimal point
-                                price_clean = re.sub(r'[^\d.]', '', price_text)
-                                
-                                if label_text:
-                                    variant_data_list.append({
-                                        "label": label_text,
-                                        "price": price_clean if price_clean else price_text
-                                    })
+                            if label:
+                                row = base_data.copy()
+                                row["color"] = label
+                                row["price"] = price_clean
+                                # If spec is still empty, fill it with the style description
+                                if not row["spec"]: row["spec"] = label
+                                variant_results.append(row)
                         except: continue
-            except Exception as e:
-                print(f"  [!] DOM 变体解析异常: {e}")
+            except: pass
 
-            if variant_data_list:
-                print(f"  [+] 成功解析到 {len(variant_data_list)} 个款式变体")
-                results = []
-                for vd in variant_data_list:
-                    row = base_data.copy()
-                    row["color"] = vd["label"]
-                    row["price"] = vd["price"]
-                    results.append(row)
-                return results
-
-            # --- Plan B: if DOM probing fails, fall back to extraction from the JS model ---
-            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
-            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
-            if not main_prop and sku_props: main_prop = sku_props[0]
-            if main_prop and main_prop.get("value"):
-                results = []
-                for val in main_prop["value"]:
-                    if val.get("name"):
-                        row = base_data.copy()
-                        row["color"] = val.get("name")
-                        row["price"] = trade.get("minPrice", "")
-                        results.append(row)
-                return results
-            
-            base_data["price"] = trade.get("minPrice", "")
+            if variant_results:
+                return variant_results
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        """ Hardened version: collect product lists from every likely in-memory variable and de-duplicate """
-        all_items = []
+        results = []
         seen_ids = set()
-
         def add_item(name, link):
-            if not link: return
-            # Normalize to the standard detail-page URL and use the extracted ID as the de-duplication key
-            url_str = str(link)
-            id_match = re.search(r'(\d{9,15})', url_str)
-            if id_match:
-                oid = id_match.group(1)
-                if oid not in seen_ids:
-                    seen_ids.add(oid)
-                    standard_url = f"https://detail.1688.com/offer/{oid}.html"
-                    all_items.append({"name": name, "link": standard_url})
+            cid = self.clean_url(link)
+            if cid and cid not in seen_ids:
+                seen_ids.add(cid); results.append({"name": name, "link": cid})
 
-        # 1. Deep in-memory probing (mirrors the req.py logic)
-        scripts = [
-            "return JSON.stringify(window.data)",
-            "return JSON.stringify(window.context?.result?.data)",
-            "return JSON.stringify(window.__INITIAL_DATA__)",
-            "return JSON.stringify(window.pageData)"
-        ]
-        
+        scripts = ["return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)", "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)", "return JSON.stringify(window.pageData || null)"]
         for s in scripts:
             try:
                 res = self.driver.execute_script(s)
                 if res and res != "null":
                     data = json.loads(res)
-                    
-                    # Recursively find every list that looks like a product list
-                    def collect_lists(obj):
-                        found = []
-                        if isinstance(obj, list) and len(obj) > 0:
-                            # If the first element contains offerId or title, treat it as a target list
-                            if isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']):
-                                found.append(obj)
-                        elif isinstance(obj, dict):
-                            for v in obj.values():
-                                found.extend(collect_lists(v))
-                        return found
-
-                    all_found_lists = collect_lists(data)
-                    for plist in all_found_lists:
-                        for o in plist:
-                            name = str(o.get('title', o.get('subject', o.get('name', ''))))
+                    def find_lists(obj):
+                        lists = []
+                        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): lists.append(obj)
+                        if isinstance(obj, dict):
+                            for k in obj: lists.extend(find_lists(obj[k]))
+                        return lists
+                    for product_list in find_lists(data):
+                        for o in product_list:
                             link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                            add_item(name, link)
+                            add_item(str(o.get('title', o.get('subject', ''))), link)
+                    if results: return results
             except: continue
-
-        # 2. Brute-force DOM fallback (fewer than 10 items from the memory variables suggests the rendering path changed)
-        if len(all_items) < 10:
-            selectors = [".sm-offer-item", ".offer-card-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
-            for s in selectors:
-                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-                for el in elements:
-                    try:
-                        # Try to find <a> tags
-                        a_tags = el.find_elements(By.TAG_NAME, "a")
-                        for a in a_tags:
-                            link = a.get_attribute("href")
-                            name = el.text.split('\n')[0][:50]
-                            add_item(name, link)
-                    except: continue
-        
-        return all_items
+        return results
 
     def quit(self):
         try: self.driver.quit()
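For reference (not part of the commit): the rewritten _extract_all_methods now de-duplicates through self.clean_url instead of an inline regex. A standalone sketch of that ID-based normalization, mirroring the 9-15 digit offer-ID pattern visible in the removed add_item helper, might look like this:

    import re

    def normalize_offer_url(link):
        # Reduce any 1688 product link (or bare offer ID) to the canonical detail URL.
        if not link:
            return ""
        url_str = str(link)
        if url_str.startswith("//"):
            url_str = "https:" + url_str
        m = re.search(r"(\d{9,15})", url_str)
        return f"https://detail.1688.com/offer/{m.group(1)}.html" if m else ""

    # e.g. normalize_offer_url("654321987654") -> "https://detail.1688.com/offer/654321987654.html"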