LuTong 3 месяца назад
Родитель
Commit
501aa88e27
1 изменённый файл: 36 добавлений и 57 удалений
  1. 36 57
      src/scraper.py

+ 36 - 57
src/scraper.py

@@ -30,68 +30,44 @@ class Scraper1688:
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
         self.driver = None
         
-        # 1. 探测 Edge 路径
         edge_path = self._find_edge()
-        
         if edge_path:
             print(f"[*] 检测到 Edge: {edge_path},正在全自动启动并接管...")
             self._cleanup_processes()
-            
-            # 2. 后台启动 Edge (开启 9222 端口)
             edge_user_data = os.path.join(os.getcwd(), "1688_edge_profile")
-            cmd = [
-                edge_path,
-                "--remote-debugging-port=9222",
-                f"--user-data-dir={edge_user_data}",
-                "--no-first-run",
-                "--no-default-browser-check"
-            ]
+            cmd = [edge_path, "--remote-debugging-port=9222", f"--user-data-dir={edge_user_data}", "--no-first-run", "--no-default-browser-check"]
             if headless: cmd.append("--headless")
-            
             try:
                 subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                 time.sleep(3) 
-                
-                # 3. 通过调试端口接管
                 opts = EdgeOptions()
                 opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
-                
                 try:
                     self.driver = webdriver.Edge(options=opts)
-                    print("[+] Edge 浏览器已成功自动弹出并接管!")
+                    print("[+] Edge 浏览器已成功接管!")
                 except:
-                    print("[*] 尝试自动下载匹配的 EdgeDriver...")
                     from webdriver_manager.microsoft import EdgeChromiumDriverManager
                     service = EdgeService(EdgeChromiumDriverManager().install())
                     self.driver = webdriver.Edge(service=service, options=opts)
-                    print("[+] Edge 浏览器已接管成功!")
-                    
+                    print("[+] Edge 浏览器已通过驱动管理接管!")
             except Exception as e:
-                print(f"[*] Edge 自动接管失败,准备回退到 Chrome: {e}")
+                print(f"[*] Edge 启动失败: {e}")
         
-        # 4. 兜底方案
         if not self.driver:
             print("[*] 正在启动 Chrome (undetected-chromedriver) 模式...")
             self._init_chrome(headless)
 
         if self.driver:
-            # 只有在 Chrome 模式下才应用 stealth,Edge 接管模式本身就很真实
             if "edge" not in str(type(self.driver)).lower():
                 stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
             else:
-                # 给 Edge 一个轻量级补丁
-                try:
-                    self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
-                        "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
-                    })
-                except: pass
+                self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
+                    "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
+                })
 
     def _find_edge(self):
         import winreg
-        reg_paths = [
-            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
-            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
-        ]
+        reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"), (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe")]
         for hkey, subkey in reg_paths:
             try:
                 with winreg.OpenKey(hkey, subkey) as key:
@@ -112,10 +88,8 @@ class Scraper1688:
             if headless: opts.add_argument('--headless=new')
             opts.add_argument('--disable-blink-features=AutomationControlled')
             return opts
-        try:
-            self.driver = uc.Chrome(options=create_options(), headless=headless)
-        except:
-            self.driver = uc.Chrome(options=create_options(), headless=headless)
+        try: self.driver = uc.Chrome(options=create_options(), headless=headless)
+        except: self.driver = uc.Chrome(options=create_options(), headless=headless)
 
     def clean_url(self, url):
         if not url: return ""
@@ -138,7 +112,6 @@ class Scraper1688:
                 is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
                 return is_slider or is_login or is_punish
             except: return False
-        
         if is_blocked():
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
@@ -150,10 +123,8 @@ class Scraper1688:
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
-        
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
-
         all_links = existing_links if existing_links is not None else set()
         page, initial_count = 1, len(all_links)
         
@@ -162,10 +133,12 @@ class Scraper1688:
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            for i in range(1, 5):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
-                time.sleep(1)
-
+            # --- 优化:模拟人类不均匀滚动,降低滑块频率 ---
+            scroll_steps = random.randint(5, 10)
+            for i in range(1, scroll_steps + 1):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/scroll_steps});")
+                time.sleep(random.uniform(0.5, 2.0))
+            
             page_results = self._extract_all_methods()
             if not page_results:
                 print(f"[!] 第 {page} 页无结果,尝试刷新...")
@@ -178,21 +151,30 @@ class Scraper1688:
                 clean_url = self.clean_url(it["link"])
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
+                    
+                    # --- 优化:引入强制休息机制 ---
+                    current_new_count = len(all_links) - initial_count
+                    if current_new_count > 0 and current_new_count % 15 == 0:
+                        rest_time = random.randint(15, 30)
+                        print(f"[*] 为了账号安全,强制休息 {rest_time} 秒...")
+                        time.sleep(rest_time)
+
                     print(f"  [>] 抓取详情: {clean_url}")
+                    
+                    # 进入详情页前的微睡眠
+                    time.sleep(random.uniform(1.5, 3.5))
+                    
                     detail_results = self.scrape_detail(clean_url)
-                    if detail_results:
-                        page_batch.extend(detail_results)
-                    else:
-                        page_batch.append({
-                            "category": "", "brand": "", "name": it["name"],
-                            "color": "", "spec": "", "material": "", "price": "",
-                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                        })
+                    if detail_results: page_batch.extend(detail_results)
+                    else: page_batch.append({"link": clean_url, "name": it["name"]})
                     
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
-                    time.sleep(random.uniform(2, 4))
+                    
+                    # --- 优化:详情页之间的大幅随机等待 ---
+                    time.sleep(random.uniform(6, 12))
+                    
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
@@ -204,7 +186,7 @@ class Scraper1688:
         """ 精准抓取详情页 """
         try:
             self.driver.get(url)
-            time.sleep(2)
+            time.sleep(random.uniform(2.5, 4.5)) # 详情页加载等待
             self.check_for_captcha()
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
@@ -260,9 +242,8 @@ class Scraper1688:
         except: return None
 
     def _extract_all_methods(self):
-        """ 列表页全能提取 """
+        """ 列表页提取 """
         results = []
-        # 1. JSON
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             if res:
@@ -279,7 +260,6 @@ class Scraper1688:
                     if link: results.append({"name": str(o.get('title', '')), "link": link})
         except: pass
 
-        # 2. DOM
         if not results:
             for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
                 elements = self.driver.find_elements(By.CSS_SELECTOR, s)
@@ -292,7 +272,6 @@ class Scraper1688:
                         except: continue
                     if results: break
         
-        # 3. Regex
         if not results:
             ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
             for oid in set(ids):