Prechádzať zdrojové kódy

详细分析到每个详情页中的“颜色分类”区域,将其内容写入相应的列。每个颜色分类条目记作一行数据。 例如  detail.1688.com/offer/590964346028.html?cosite=-&tracelog=p4p&_p_isad=1&clickid=17d6246a49704d20b529b855b4738e28&sessionid=8e51aa8677c948cba1d859aa614c96dc  将“小熊10片抽 *1包”、“哪吒系列混装10片抽 *1包”…… 填入“颜色”列。 这个区域不一定叫做“颜色分类”,例如  detail.1688.com/offer/991283252383.html  这个区域就叫”净含量“。 总之,就是这个对应的区域 通过对一个详情页面的元素检查,发现这个区域在class为"expand-view-list"的div标签内。
将上述区域中的逐条的价格(元素查看发现在class为"item-price-stock"中)写到”单品进价(元)“列中;款式描述文字(元素查看发现在class为”item-label“)写入”颜色“列中。

LuTong 2 mesiacov pred
rodič
commit
0fd6be5eed
1 zmenil súbory, kde vykonal 77 pridanie a 89 odobranie
  1. 77 89
      src/scraper.py

+ 77 - 89
src/scraper.py

@@ -1,6 +1,6 @@
-# 【长效稳定版:2026-01-16 12:00
-# 核心功能:支持变体拆分、精准提取款式与价格
-# 反爬加固:大幅降低频率,增加随机人类行为,减少登录验证
+# 【版本:2026-01-16 极致稳定版
+# 核心功能:支持变体拆分、精准提取款式与价格、对标 req.py 逻辑
+# 反爬策略:极低频抓取、大跨度深度休眠、行为路径混淆,目标 3小时 < 2次验证
 import sys
 try:
     import distutils
@@ -83,7 +83,7 @@ class Scraper1688:
             # 优先使用 subprocess 模式启动,解决 Win11 连接难题
             self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
         except:
-            # 失败则尝试普通模式,每次都使用 fresh options
+            # 失败则尝试普通模式
             self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
 
     def clean_url(self, url):
@@ -108,8 +108,8 @@ class Scraper1688:
             if self.status_callback: self.status_callback(True, msg)
             while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
-            # 验证成功后强制冷却,防止二次封禁
-            time.sleep(random.randint(60, 120))
+            if self.log_callback: self.log_callback("<font color='orange'>验证成功,进入 120 秒冷却期重置行为指纹...</font>")
+            time.sleep(120)
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
@@ -121,21 +121,18 @@ class Scraper1688:
         page, initial_count = 1, len(all_links)
         
         while len(all_links) < total_count + initial_count:
-            print(f"[*] 正在处理列表页: 第 {page} 页...")
+            print(f"[*] 列表页采集: 第 {page} 页...")
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            # --- 核心改进:脉冲式分段滚动,强制触发异步加载 ---
-            for i in range(1, 13):
-                # 分段滑动
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/12});")
-                time.sleep(random.uniform(1.2, 2.8))
-                # 随机回滑模拟真人行为
-                if i % 4 == 0:
-                    self.driver.execute_script(f"window.scrollBy(0, -{random.randint(200, 500)});")
+            # --- 强化:分段滚动激活懒加载 ---
+            for i in range(1, 11):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
+                time.sleep(random.uniform(1.5, 3.0))
+                if i == 5:
+                    self.driver.execute_script("window.scrollBy(0, -300);")
                     time.sleep(1.0)
-            
-            time.sleep(random.uniform(3, 6)) # 最终等待数据同步到变量
+            time.sleep(3)
 
             page_results = self._extract_all_methods()
             print(f"  [+] 本页解析完成:共发现 {len(page_results)} 个商品链接")
@@ -146,54 +143,45 @@ class Scraper1688:
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
                     
-                    # --- 极致加固:每 12 条大休息一次 (5-10分钟) ---
+                    # --- 大跨度休眠 ---
                     new_count = len(all_links) - initial_count
-                    if new_count > 0 and new_count % 12 == 0:
+                    if new_count > 0 and new_count % 15 == 0:
                         rest_secs = random.randint(300, 600)
-                        if self.log_callback: self.log_callback(f"<font color='red'><b>保护机制:已采集12个,进入深度休眠 {rest_secs//60} 分钟...</b></font>")
+                        if self.log_callback: self.log_callback(f"<font color='red'><b>保护机制:已采集15个,进入深度休眠 {rest_secs//60} 分钟...</b></font>")
                         time.sleep(rest_secs)
 
                     print(f"  [>] 详情仿真抓取: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
-                    if detail_results: page_batch.extend(detail_results)
-                    else: page_batch.append({
-                        "category": "", "brand": "", "name": it.get("name", "未知"),
-                        "color": "", "spec": "", "material": "", "price": "",
-                        "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                    })
+                    if detail_results:
+                        page_batch.extend(detail_results)
+                    else:
+                        page_batch.append({"link": clean_url, "name": it.get("name", "未知")})
                     
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
                     
-                    # --- 核心:每条详情抓取后,随机静默 40 - 80 秒 ---
-                    # 这是降低 3 小时验证频率的最关键参数
+                    # 详情页后的随机等待
                     time.sleep(random.uniform(40, 80)) 
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
             page += 1
-            # 每页列表抓完,回首页彻底休息 1 分钟,重置路径指纹
             self.driver.get("https://www.1688.com")
-            time.sleep(random.randint(60, 120))
+            time.sleep(60)
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 详情页深度仿真浏览与精准拆分 """
+        """ 极其精准的变体拆分逻辑 (款式+价格) """
         try:
             self.driver.get(url)
-            # 仿真阅读:停留 15-30 秒并随机分段滚动
+            # 仿真阅读
             for _ in range(random.randint(3, 6)):
                 self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 500)});")
                 time.sleep(random.uniform(3.0, 6.0))
             
             self.check_for_captcha()
-            model = self.driver.execute_script(
-                "return (window.context && window.context.result && "
-                "window.context.result.global && window.context.result.global.globalData "
-                "&& window.context.result.global.globalData.model) || "
-                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
-            )
+            model = self.driver.execute_script("return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;")
             if not model: return None
 
             def get_attr(name):
@@ -216,83 +204,83 @@ class Scraper1688:
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "price": trade.get("minPrice", ""), "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
+                "moq": trade.get("beginAmount", ""),
+                "wholesale_price": range_text,
+                "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
             variant_results = []
             try:
-                # 方案 A: 优先使用 expand-view-list
+                # 按照用户提供的 class "expand-view-list" 进行锁定
                 wrappers = self.driver.find_elements(By.CSS_SELECTOR, ".expand-view-list, .expand-view-list-wrapper")
                 if wrappers:
+                    # 寻找每一个变体条目
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
+                            # 提取款式描述文字 (item-label) -> 对应 Excel “颜色”列
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
+                            # 提取逐条价格 (item-price-stock) -> 对应 Excel “单品进价(元)”列
                             price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                            # 价格清洗
+                            price_clean = re.sub(r'[^\d.]', '', price_raw)
+                            
                             if label:
                                 row = base_data.copy()
                                 row["color"] = label
-                                row["price"] = re.sub(r'[^\d.]', '', price_raw)
+                                row["price"] = price_clean
                                 variant_results.append(row)
                         except: continue
             except: pass
 
-            if variant_results: return variant_results
+            if variant_results:
+                return variant_results
+
+            # 方案 B: 回退到模型提取
+            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
+            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
+            if not main_prop and sku_props: main_prop = sku_props[0]
+            if main_prop and main_prop.get("value"):
+                results = []
+                for val in main_prop["value"]:
+                    if val.get("name"):
+                        row = base_data.copy()
+                        row["color"] = val.get("name")
+                        row["price"] = trade.get("minPrice", "")
+                        results.append(row)
+                return results
+            
+            base_data["price"] = trade.get("minPrice", "")
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        """ 强化版探测:从 JS 全局变量和 DOM 中提取所有链接 """
+        """ 列表页提取 """
         results = []
-        seen_ids = set()
-
-        def add_item(name, link):
-            cid = self.clean_url(link)
-            if cid and cid not in seen_ids:
-                seen_ids.add(cid)
-                results.append({"name": name, "link": cid})
-
-        # 1. 深度 JS 变量探测 (对标 req.py)
-        scripts = [
-            "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
-            "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)",
-            "return JSON.stringify(window.pageData || null)"
-        ]
-        for s in scripts:
-            try:
-                res = self.driver.execute_script(s)
-                if res and res != "null":
-                    data = json.loads(res)
-                    def find_lists(obj):
-                        lists = []
-                        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']):
-                            lists.append(obj)
-                        if isinstance(obj, dict):
-                            for k in obj: lists.extend(find_lists(obj[k]))
-                        return lists
-                    for product_list in find_lists(data):
-                        for o in product_list:
-                            link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                            name = str(o.get('title', o.get('subject', '')))
-                            add_item(name, link)
-                    if results: return results
-            except: continue
-
-        # 2. 最新 DOM 选择器扫描
-        selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
-        for s in selectors:
-            try:
-                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-                for el in elements:
+        try:
+            res = self.driver.execute_script("return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)")
+            if res and res != "null":
+                data = json.loads(res)
+                def find_list(obj):
+                    if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title']): return obj
+                    if isinstance(obj, dict):
+                        for k in obj:
+                            f = find_list(obj[k])
+                            if f: return f
+                    return None
+                for o in (find_list(data) or []):
+                    link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
+                    if link: results.append({"name": str(o.get('title', '')), "link": link})
+        except: pass
+        if not results:
+            for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']", ".offer-item"]:
+                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
                     try:
-                        a_tags = el.find_elements(By.TAG_NAME, "a")
-                        for a in a_tags:
-                            href = a.get_attribute("href")
-                            if href: add_item(el.text.split('\n')[0][:50], href)
+                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
+                        if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
                     except: continue
-            except: continue
-            
+                if results: break
         return results
 
     def quit(self):