Jelajahi Sumber

Edge Repair

LuTong 3 bulan lalu
induk
melakukan
2a924d2130
1 berkas diubah dengan 84 tambahan dan 71 penghapusan
  1. +84 −71
      src/scraper.py

+ 84 - 71
src/scraper.py

@@ -30,44 +30,55 @@ class Scraper1688:
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
         self.driver = None
         
+        # 1. 强力锁定长效 Session
         edge_path = self._find_edge()
         if edge_path:
-            print(f"[*] 检测到 Edge: {edge_path},正在全自动启动并接管...")
+            print(f"[*] 【长效稳定模式】正在启动 Edge 独立环境...")
             self._cleanup_processes()
-            edge_user_data = os.path.join(os.getcwd(), "1688_edge_profile")
-            cmd = [edge_path, "--remote-debugging-port=9222", f"--user-data-dir={edge_user_data}", "--no-first-run", "--no-default-browser-check"]
+            # 使用固定且持久的 Session 目录,确保 12 小时以上免登录
+            edge_user_data = os.path.join(os.getcwd(), "1688_edge_stable_session")
+            cmd = [
+                edge_path, 
+                "--remote-debugging-port=9222", 
+                f"--user-data-dir={edge_user_data}", 
+                "--no-first-run", 
+                "--no-default-browser-check",
+                "--disable-blink-features=AutomationControlled"
+            ]
             if headless: cmd.append("--headless")
             try:
                 subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-                time.sleep(3) 
+                time.sleep(5) # 预留更充足的启动时间
                 opts = EdgeOptions()
                 opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
                 try:
                     self.driver = webdriver.Edge(options=opts)
-                    print("[+] Edge 浏览器已成功接管!")
+                    print("[+] Edge 长效环境连接成功!")
                 except:
                     from webdriver_manager.microsoft import EdgeChromiumDriverManager
                     service = EdgeService(EdgeChromiumDriverManager().install())
                     self.driver = webdriver.Edge(service=service, options=opts)
-                    print("[+] Edge 浏览器已通过驱动管理接管!")
             except Exception as e:
-                print(f"[*] Edge 启动失败: {e}")
+                print(f"[!] Edge 启动失败: {e}")
         
         if not self.driver:
-            print("[*] 正在启动 Chrome (undetected-chromedriver) 模式...")
+            print("[*] 正在启动备用 Chrome 模式...")
             self._init_chrome(headless)
 
         if self.driver:
-            if "edge" not in str(type(self.driver)).lower():
-                stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
-            else:
+            # 基础特征隐藏补丁
+            try:
                 self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                     "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
                 })
+            except: pass
 
     def _find_edge(self):
         import winreg
-        reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"), (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe")]
+        reg_paths = [
+            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
+            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe")
+        ]
         for hkey, subkey in reg_paths:
             try:
                 with winreg.OpenKey(hkey, subkey) as key:
@@ -85,46 +96,49 @@ class Scraper1688:
         def create_options():
             opts = uc.ChromeOptions()
             opts.add_argument(f"--user-data-dir={self.user_data_path}")
-            if headless: opts.add_argument('--headless=new')
-            opts.add_argument('--disable-blink-features=AutomationControlled')
             return opts
-        try: self.driver = uc.Chrome(options=create_options(), headless=headless)
-        except: self.driver = uc.Chrome(options=create_options(), headless=headless)
-
-    def clean_url(self, url):
-        if not url: return ""
-        if url.startswith("//"): url = "https:" + url
-        id_match = re.search(r'offer/(\d+)\.html', url)
-        if id_match: return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
-        parsed = urllib.parse.urlparse(url)
-        params = urllib.parse.parse_qs(parsed.query)
-        oid = params.get('offerId') or params.get('id')
-        if oid: return f"https://detail.1688.com/offer/{oid[0]}.html"
-        return url
+        try:
+            self.driver = uc.Chrome(options=create_options(), headless=headless)
+        except:
+            self.driver = uc.Chrome(options=create_options(), headless=headless)
 
     def check_for_captcha(self):
+        # Anti-bot gate: detect any 1688 block wall (slider captcha, login
+        # redirect, punish page) and wait here until it is cleared manually.
         def is_blocked():
             try:
-                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
+                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
+                # "nc_1_n1z" is the drag handle of Alibaba's slider captcha.
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
-                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                is_login = "login.1688.com" in url or "passport.1688.com" in url
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
-                return is_slider or is_login or is_punish
+                # NOTE(review): this revision drops the passport.1688.com and
+                # page-title "验证码" checks — confirm that is intentional.
+                return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
             except: return False
+        
         if is_blocked():
+            # Notify the UI (when a callback is wired) that manual action is
+            # needed, then poll every 3 s until the block disappears.
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
-            while is_blocked(): time.sleep(2)
+            while is_blocked(): time.sleep(3)
             if self.status_callback: self.status_callback(False, "验证通过")
-            time.sleep(3)
+            # 60-second quiet period after unblocking, to avoid immediately
+            # re-triggering the anti-bot checks.
+            print("[*] 解封成功,强制进入 60 秒安全静默期...")
+            time.sleep(60) 
         return True
 
+    def _human_interact(self):
+        """Simulate light human interaction — a few jittery mouse moves and a
+        short scroll — to lower the chance of bot detection.  Best-effort:
+        any WebDriver error is silently swallowed."""
+        try:
+            actions = ActionChains(self.driver)
+            # 1. Small random mouse movements.
+            # NOTE(review): move_by_offset offsets accumulate relative to the
+            # current pointer position, so repeated calls can drift out of the
+            # viewport and raise MoveTargetOutOfBoundsException, which the bare
+            # except below absorbs — confirm this best-effort behavior is intended.
+            for _ in range(random.randint(3, 6)):
+                actions.move_by_offset(random.randint(-10, 10), random.randint(-10, 10)).perform()
+                time.sleep(random.uniform(0.3, 0.8))
+            # 2. Scroll down a short random distance.
+            self.driver.execute_script(f"window.scrollBy(0, {random.randint(100, 400)});")
+            time.sleep(random.uniform(1.0, 2.0))
+        except: pass
+
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
+        
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
+
         all_links = existing_links if existing_links is not None else set()
         page, initial_count = 1, len(all_links)
         
@@ -133,12 +147,15 @@ class Scraper1688:
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            # --- 优化:模拟人类不均匀滚动,降低滑块频率 ---
-            scroll_steps = random.randint(5, 10)
-            for i in range(1, scroll_steps + 1):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/scroll_steps});")
-                time.sleep(random.uniform(0.5, 2.0))
-            
+            # 高级模拟滚动:滑下去,停顿,回滑一点
+            steps = random.randint(6, 10)
+            for i in range(1, steps + 1):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/steps});")
+                time.sleep(random.uniform(1.2, 3.0))
+                if i == random.randint(3, 5):
+                    self.driver.execute_script("window.scrollBy(0, -250);")
+                    time.sleep(1.0)
+
             page_results = self._extract_all_methods()
             if not page_results:
                 print(f"[!] 第 {page} 页无结果,尝试刷新...")
@@ -152,17 +169,15 @@ class Scraper1688:
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
                     
-                    # --- 优化:引入强制休息机制 ---
-                    current_new_count = len(all_links) - initial_count
-                    if current_new_count > 0 and current_new_count % 15 == 0:
-                        rest_time = random.randint(15, 30)
-                        print(f"[*] 为了账号安全,强制休息 {rest_time} 秒...")
-                        time.sleep(rest_time)
+                    # --- 核心频率控制:深度保护机制 ---
+                    current_new = len(all_links) - initial_count
+                    if current_new > 0 and current_new % 8 == 0:
+                        rest = random.randint(90, 240)
+                        print(f"[*] 已连续作业8个详情,触发深度冷却 {rest} 秒以维持 12 小时 Session...")
+                        time.sleep(rest)
 
-                    print(f"  [>] 抓取详情: {clean_url}")
-                    
-                    # 进入详情页前的微睡眠
-                    time.sleep(random.uniform(1.5, 3.5))
+                    print(f"  [>] 详情解析: {clean_url}")
+                    time.sleep(random.uniform(4.0, 7.0)) # 访问前随机静默
                     
                     detail_results = self.scrape_detail(clean_url)
                     if detail_results: page_batch.extend(detail_results)
@@ -172,8 +187,8 @@ class Scraper1688:
                         yield page_batch
                         page_batch = []
                     
-                    # --- 优化:详情页之间的大幅随机等待 ---
-                    time.sleep(random.uniform(6, 12))
+                    # --- 极低频率抓取:详情页间超长等待 ---
+                    time.sleep(random.uniform(20, 45)) 
                     
                     if len(all_links) >= total_count + initial_count: break
             
@@ -183,11 +198,13 @@ class Scraper1688:
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 精准抓取详情页 """
+        """ 拟人化的详情数据提取 """
         try:
             self.driver.get(url)
-            time.sleep(random.uniform(2.5, 4.5)) # 详情页加载等待
+            time.sleep(random.uniform(5.0, 10.0)) # 充分加载
+            self._human_interact() # 执行随机交互
             self.check_for_captcha()
+            
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
             )
@@ -209,7 +226,6 @@ class Scraper1688:
             if not price_min:
                 try: price_min = model["sku"]["priceRange"][0][1]
                 except: pass
-            
             ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
             range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
 
@@ -241,8 +257,14 @@ class Scraper1688:
             return [base_data]
         except: return None
 
+    def clean_url(self, url):
+        """Normalize a 1688 product URL to the canonical
+        ``https://detail.1688.com/offer/<id>.html`` form.
+
+        Returns "" for falsy input, and the input unchanged when no numeric
+        offer id is found in the path.
+
+        NOTE(review): the previous revision of this method (removed earlier in
+        this diff) also handled protocol-relative "//" URLs and the
+        offerId/id query parameters — confirm dropping those cases is intended.
+        """
+        if not url: return ""
+        id_match = re.search(r'offer/(\d+)\.html', url)
+        if id_match: return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
+        return url
+
     def _extract_all_methods(self):
-        """ 列表页提取 """
+        """ 全力提取列表页链接 """
         results = []
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
@@ -259,23 +281,14 @@ class Scraper1688:
                     link = o.get('itemUrl', o.get('url', ''))
                     if link: results.append({"name": str(o.get('title', '')), "link": link})
         except: pass
-
         if not results:
             for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
-                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-                if len(elements) > 2:
-                    for el in elements:
-                        try:
-                            a = el.find_element(By.TAG_NAME, "a")
-                            link = a.get_attribute("href")
-                            if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
-                        except: continue
-                    if results: break
-        
-        if not results:
-            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
-            for oid in set(ids):
-                results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html"})
+                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                    try:
+                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
+                        if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                    except: continue
+                if results: break
         return results
 
     def quit(self):