Преглед на файлове

反爬加固:每抓取一定数量,进行一次深度“休息”

LuTong преди 2 месеца
родител
ревизия
6d56eff267
променени са 1 файла, в които са добавени 44 реда и са изтрити 27 реда
  1 файл: +44 реда, −27 реда
      src/scraper.py

+ 44 - 27
src/scraper.py

@@ -1,5 +1,6 @@
-# 【更新时间:2026-01-16 10:00】
+# 【更新时间:2026-01-16 11:00】
 # 核心功能:支持变体拆分、精准提取款式与价格、对标 req.py 逻辑
+# 反爬加固:大幅降低频率,增加随机人类行为,减少登录验证
 import sys
 try:
     import distutils
@@ -107,7 +108,9 @@ class Scraper1688:
             if self.status_callback: self.status_callback(True, msg)
             while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
-            time.sleep(3)
+            # 验证后增加一段长停顿,让风控系统冷静下来
+            if self.log_callback: self.log_callback("<font color='orange'>验证通过,由于风控限制,将额外休息 60 秒...</font>")
+            time.sleep(60)
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
@@ -127,11 +130,11 @@ class Scraper1688:
             # --- 强化:模拟真实人类分段滚动,触发懒加载 ---
             for i in range(1, 11):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
-                time.sleep(1.5)
+                time.sleep(random.uniform(1.5, 3.0))
                 if i == 5: # 中途回滑
                     self.driver.execute_script("window.scrollBy(0, -300);")
                     time.sleep(1.0)
-            time.sleep(3)
+            time.sleep(random.uniform(3, 6))
 
             page_results = self._extract_all_methods()
             print(f"  [+] 本页发现 {len(page_results)} 个商品原始条目")
@@ -141,6 +144,15 @@ class Scraper1688:
                 clean_url = self.clean_url(it.get("link"))
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
+                    
+                    # --- 反爬加固:每抓取一定数量,进行一次深度“休息” ---
+                    new_count = len(all_links) - initial_count
+                    if new_count > 0 and new_count % random.randint(12, 18) == 0:
+                        rest_seconds = random.randint(180, 360) # 休息 3-6 分钟
+                        if self.log_callback: 
+                            self.log_callback(f"<font color='orange'>已连续抓取 {new_count} 个商品,为模拟真实行为休息 {rest_seconds} 秒...</font>")
+                        time.sleep(rest_seconds)
+
                     print(f"  [>] 正在启动详情抓取: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
                     
@@ -157,24 +169,36 @@ class Scraper1688:
                         yield page_batch
                         page_batch = []
                     
-                    time.sleep(random.uniform(15, 25)) 
+                    # --- 反爬加固:详情页之间的随机超长等待 ---
+                    # 降低采集频率,是减少验证最有效的方法
+                    rest_between = random.uniform(25, 55)
+                    time.sleep(rest_between) 
+                    
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
             page += 1
+            
+            # 每处理两页列表,回首页转一圈,打破“机器人模式”
             if page % 2 == 0:
+                if self.log_callback: self.log_callback("<font color='gray'>处理完两页,回首页浏览以分散风控权重...</font>")
                 self.driver.get("https://www.1688.com")
-                time.sleep(random.randint(10, 20))
+                time.sleep(random.randint(15, 30))
+                
         return list(all_links)
 
     def scrape_detail(self, url):
         """ 极其精准的变体拆分逻辑 (款式+价格) """
         try:
             self.driver.get(url)
-            time.sleep(random.uniform(5, 10))
+            # 大幅拉长详情页加载后的停留时间,并模拟随机滚动
+            time.sleep(random.uniform(8, 15))
+            self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 600)});")
+            time.sleep(random.uniform(2, 4))
+            
             self.check_for_captcha()
             
-            # 1. 对标 req.py 获取 JS 模型
+            # 获取核心模型
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
@@ -195,13 +219,12 @@ class Scraper1688:
                 return ""
 
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
-            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
-            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
+            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in (trade.get("disPriceRanges") or trade.get("currentPrices") or [])])
 
             base_data = {
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                 "brand": get_attr("品牌"),
-                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
                 "moq": trade.get("beginAmount", ""),
@@ -210,7 +233,6 @@ class Scraper1688:
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            # 2. 核心:拆分 expand-view-list-wrapper 区域 (款式名称与价格)
             variant_results = []
             try:
                 wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
@@ -218,13 +240,9 @@ class Scraper1688:
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
-                            # 提取款式描述文字 (item-label)
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
-                            # 提取逐条价格 (item-price-stock)
                             price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                            # 清洗价格
                             price_clean = re.sub(r'[^\d.]', '', price_raw)
-                            
                             if label:
                                 row = base_data.copy()
                                 row["color"] = label
@@ -236,11 +254,9 @@ class Scraper1688:
             if variant_results:
                 return variant_results
 
-            # 3. 兜底:如果 DOM 探测失败,尝试从 JS 模型提取 SKU
             sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
             main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
             if not main_prop and sku_props: main_prop = sku_props[0]
-            
             if main_prop and main_prop.get("value"):
                 results = []
                 for val in main_prop["value"]:
@@ -282,15 +298,16 @@ class Scraper1688:
                     if results: return results
             except: continue
         
-        # DOM 选择器保底
-        for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']", ".offer-item"]:
-            for el in self.driver.find_elements(By.CSS_SELECTOR, s):
-                try:
-                    a = el.find_element(By.TAG_NAME, "a")
-                    link = a.get_attribute("href")
-                    if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
-                except: continue
-            if results: break
+        selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
+        for s in selectors:
+            elements = self.driver.find_elements(By.CSS_SELECTOR, s)
+            if len(elements) > 2:
+                for el in elements:
+                    try:
+                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
+                        if link and "1688.com" in link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                    except: continue
+                if results: break
         return results
 
     def quit(self):