LuTong 3 months ago
parent commit f381f841c6
1 changed file with 53 additions and 47 deletions
src/scraper.py  + 53 - 47

@@ -26,7 +26,7 @@ from selenium_stealth import stealth
 class Scraper1688:
     def __init__(self, headless=True, status_callback=None):
         self.headless = headless
-        self.status_callback = status_callback # callback used to report status to the GUI
+        self.status_callback = status_callback
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
         self.driver = None
         
@@ -35,10 +35,9 @@ class Scraper1688:
         
         if edge_path:
             print(f"[*] 检测到 Edge: {edge_path},正在全自动启动并接管...")
-            # 2. Clean up stale processes so port 9222 is free
             self._cleanup_processes()
             
-            # 3. Launch Edge in the background (with a debugging port)
+            # 2. Launch Edge in the background (opens port 9222)
             edge_user_data = os.path.join(os.getcwd(), "1688_edge_profile")
             cmd = [
                 edge_path,
@@ -47,54 +46,47 @@ class Scraper1688:
                 "--no-first-run",
                 "--no-default-browser-check"
             ]
-            if headless: 
-                cmd.append("--headless")
+            if headless: cmd.append("--headless")
             
             try:
-                # launch the browser process asynchronously
                 subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-                time.sleep(3) # wait for the browser to initialize
+                time.sleep(3) 
                 
-                # 4. Take over Edge
+                # 3. Attach through the debugging port
                 opts = EdgeOptions()
                 opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
                 
-                # try to connect
                 try:
-                    # connect with the real Edge driver class to fix the "unrecognized Chrome version" error
                     self.driver = webdriver.Edge(options=opts)
                     print("[+] Edge 浏览器已成功自动弹出并接管!")
                 except:
-                    # if the connection fails, let webdriver_manager download a matching driver
                    print("[*] Trying to auto-download a matching EdgeDriver...")
                     from webdriver_manager.microsoft import EdgeChromiumDriverManager
                     service = EdgeService(EdgeChromiumDriverManager().install())
                     self.driver = webdriver.Edge(service=service, options=opts)
-                    print("[+] Edge 浏览器已通过驱动管理接管成功!")
+                    print("[+] Edge 浏览器已接管成功!")
                     
             except Exception as e:
-                print(f"[*] Edge 自动接管模式失败,准备回退到 Chrome: {e}")
+                print(f"[*] Edge 自动接管失败,准备回退到 Chrome: {e}")
         
-        # 5. Last resort: if launching or taking over Edge fails, start Chrome
+        # 4. Last resort
         if not self.driver:
             print("[*] 正在启动 Chrome (undetected-chromedriver) 模式...")
             self._init_chrome(headless)
 
         if self.driver:
-            # Key point: only apply stealth when running in Chrome mode
-            # Edge in takeover mode is a real browser process and is inherently very stealthy
-            if "chrome" in str(type(self.driver)).lower() and "edge" not in str(type(self.driver)).lower():
+            # Only apply stealth in Chrome mode; Edge in takeover mode already looks genuine
+            if "edge" not in str(type(self.driver)).lower():
                 stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
             else:
-                # lightweight anti-bot patch for Edge (avoids the library's type-check error)
-                self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
-                    "source": """
-                        Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
-                    """
-                })
+                # apply a lightweight patch for Edge
+                try:
+                    self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
+                        "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
+                    })
+                except: pass
 
     def _find_edge(self):
-        """ 通过注册表寻找 Edge 精准安装路径 """
         import winreg
         reg_paths = [
             (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
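
For readers unfamiliar with the takeover trick this hunk implements: the browser is started as an ordinary process with --remote-debugging-port, and Selenium then attaches to that running instance instead of spawning its own. A minimal standalone sketch, assuming msedge is on PATH and port 9222 is free; the profile directory is illustrative:

# Minimal sketch of the launch-then-attach pattern used above.
# Assumes msedge is on PATH and port 9222 is free; the profile dir is illustrative.
import subprocess
import time

from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions

subprocess.Popen(
    ["msedge", "--remote-debugging-port=9222",
     "--user-data-dir=./edge_debug_profile",
     "--no-first-run", "--no-default-browser-check"],
    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
time.sleep(3)  # give the browser time to open the debugging port

opts = EdgeOptions()
opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
driver = webdriver.Edge(options=opts)  # attaches to the running browser
print(driver.title)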
@@ -109,19 +101,16 @@ class Scraper1688:
         return None
 
     def _cleanup_processes(self):
-        """ 清理残留的 Edge 和驱动进程 """
         if os.name == 'nt':
-            for proc in ['msedge.exe', 'msedgedriver.exe']:
+            for proc in ['msedge.exe', 'msedgedriver.exe', 'chromedriver.exe']:
                 subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 
     def _init_chrome(self, headless):
-        """ 初始化 undetected-chromedriver (Chrome) """
         def create_options():
             opts = uc.ChromeOptions()
             opts.add_argument(f"--user-data-dir={self.user_data_path}")
             if headless: opts.add_argument('--headless=new')
             opts.add_argument('--disable-blink-features=AutomationControlled')
-            opts.add_argument("--window-size=1920,1080")
             return opts
         try:
             self.driver = uc.Chrome(options=create_options(), headless=headless)
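
Worth noting in _init_chrome: options come from a factory because undetected-chromedriver refuses to reuse a ChromeOptions object across instantiations, so the retry needs a fresh copy. A minimal sketch of the same pattern, with an illustrative profile path:

# Sketch of the retry-once pattern above; undetected-chromedriver raises if a
# ChromeOptions object is reused, hence the factory. Profile path is illustrative.
import undetected_chromedriver as uc

def make_options():
    opts = uc.ChromeOptions()
    opts.add_argument("--user-data-dir=./uc_profile")  # persist cookies/login
    opts.add_argument("--disable-blink-features=AutomationControlled")
    return opts

try:
    driver = uc.Chrome(options=make_options())
except Exception:
    driver = uc.Chrome(options=make_options())  # fresh options for the retry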
@@ -129,7 +118,6 @@ class Scraper1688:
             self.driver = uc.Chrome(options=create_options(), headless=headless)
 
     def clean_url(self, url):
-        """极其鲁棒的 1688 URL 清洗逻辑"""
         if not url: return ""
         if url.startswith("//"): url = "https:" + url
         id_match = re.search(r'offer/(\d+)\.html', url)
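
The visible part of clean_url normalizes protocol-relative links and extracts the numeric offer id; the rebuild step falls between hunks, but presumably reconstructs the canonical detail URL. A self-contained sketch of that logic:

# Self-contained sketch of the URL canonicalization; the rebuild step is
# elided between hunks, so the f-string below is an inference.
import re

def canonical_1688_url(url: str) -> str:
    if not url:
        return ""
    if url.startswith("//"):  # protocol-relative links scraped from the DOM
        url = "https:" + url
    m = re.search(r"offer/(\d+)\.html", url)
    if m:
        return f"https://detail.1688.com/offer/{m.group(1)}.html"
    return url

print(canonical_1688_url("//detail.1688.com/offer/123456.html?spm=a26352"))
# https://detail.1688.com/offer/123456.html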
@@ -141,7 +129,6 @@ class Scraper1688:
         return url
 
     def check_for_captcha(self):
-        """ 检测验证码、登录等干预状态 """
         def is_blocked():
             try:
                 url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
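
is_blocked samples three signals at once: the current URL, the page source, and the title, all lower-cased. The markers it scans for are outside this diff, so the list below is only a guess for illustration:

# Illustrative probe in the style of is_blocked(); the marker list is an
# assumption, since the real keywords fall outside this diff.
BLOCK_MARKERS = ("login", "captcha", "punish", "验证")

def looks_blocked(driver) -> bool:
    try:
        haystacks = (driver.current_url.lower(),
                     driver.page_source.lower(),
                     driver.title.lower())
        return any(m in h for m in BLOCK_MARKERS for h in haystacks)
    except Exception:  # a dead session should not crash the check
        return False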
@@ -164,7 +151,6 @@ class Scraper1688:
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         
-        # warm up and check for verification
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
 
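The search URL is built with the keyword percent-encoded as GBK rather than UTF-8, presumably because s.1688.com expects GBK for the keywords parameter. The difference is easy to see:

# Why encoding='gbk' matters: the same keyword percent-encodes differently.
import urllib.parse

kw = "水杯"
print(urllib.parse.quote(kw, encoding="gbk"))  # %CB%AE%B1%AD   (GBK, 2 bytes/char)
print(urllib.parse.quote(kw))                  # %E6%B0%B4%E6%9D%AF (UTF-8 default)
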
@@ -172,16 +158,21 @@ class Scraper1688:
         page, initial_count = 1, len(all_links)
         
         while len(all_links) < total_count + initial_count:
-            print(f"[*] 正在搜索列表页: 第 {page} 页...")
-            target_url = f"{base_url}&beginPage={page}&page={page}"
-            self.driver.get(target_url)
+            print(f"[*] 正在处理列表页: 第 {page} 页...")
+            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            for i in range(1, 4):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
+            for i in range(1, 5):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
                 time.sleep(1)
 
             page_results = self._extract_all_methods()
+            if not page_results:
+                print(f"[!] 第 {page} 页无结果,尝试刷新...")
+                self.driver.refresh()
+                time.sleep(5)
+                page_results = self._extract_all_methods()
+
             page_batch = []
             for it in page_results:
                 clean_url = self.clean_url(it["link"])
@@ -194,7 +185,7 @@ class Scraper1688:
                     else:
                         page_batch.append({
                             "category": "", "brand": "", "name": it["name"],
-                            "color": "", "spec": "", "material": "", "price": it["price"],
+                            "color": "", "spec": "", "material": "", "price": "",
                             "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
                         })
                     
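The page loop above combines stepped scrolling (now in quarters instead of thirds) with a refresh-and-retry when extraction comes back empty. The scroll part in isolation:

# Isolated sketch of the stepped scroll used to trigger lazy-loaded cards.
import time

def scroll_through(driver, steps: int = 4, pause: float = 1.0):
    for i in range(1, steps + 1):
        driver.execute_script(
            f"window.scrollTo(0, document.body.scrollHeight * {i / steps});")
        time.sleep(pause)  # let newly revealed cards render
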
@@ -206,10 +197,11 @@ class Scraper1688:
             
             if page_batch: yield page_batch
             page += 1
+            if page > 100: break
         return list(all_links)
 
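Because results are yielded page by page (if page_batch: yield page_batch), a caller can stream rows into the GUI as they arrive, and the new page > 100 guard bounds the loop if the target count is never reached. The search method's name and full signature are not visible in this diff, so the call below is hypothetical:

# Hypothetical consumer of the page-batch generator; search_products and its
# parameters are assumptions, only Scraper1688 and quit() appear in this file.
scraper = Scraper1688(headless=False)
for batch in scraper.search_products("水杯", total_count=60):
    for row in batch:
        print(row["name"], row["link"])
scraper.quit()
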
     def scrape_detail(self, url):
-        """ 抓取并解析详情页,支持主维度拆分 """
+        """ 精准抓取详情页 """
         try:
             self.driver.get(url)
             time.sleep(2)
@@ -242,7 +234,7 @@ class Scraper1688:
             base_data = {
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                 "brand": get_attr("品牌"),
-                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
                 "price": price_min,
@@ -268,8 +260,9 @@ class Scraper1688:
         except: return None
 
     def _extract_all_methods(self):
-        """ 列表页提取 """
+        """ 列表页全能提取 """
         results = []
+        # 1. JSON
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             if res:
@@ -282,15 +275,28 @@ class Scraper1688:
                             if f: return f
                     return None
                 for o in (find_list(data) or []):
-                    results.append({"name": str(o.get('title', '')), "link": o.get('itemUrl', ''), "price": ""})
+                    link = o.get('itemUrl', o.get('url', ''))
+                    if link: results.append({"name": str(o.get('title', '')), "link": link})
         except: pass
+
+        # 2. DOM
+        if not results:
+            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
+                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
+                if len(elements) > 2:
+                    for el in elements:
+                        try:
+                            a = el.find_element(By.TAG_NAME, "a")
+                            link = a.get_attribute("href")
+                            if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                        except: continue
+                    if results: break
+        
+        # 3. Regex
         if not results:
-            for s in [".search-offer-item", "[class*='offer-card']"]:
-                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
-                    try:
-                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
-                        results.append({"name": el.text.split('\n')[0], "link": link, "price": ""})
-                    except: continue
+            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
+            for oid in set(ids):
+                results.append({"name": f"1688 item {oid}", "link": f"https://detail.1688.com/offer/{oid}.html"})
         return results
 
     def quit(self):
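
Taken together, a detail-page run against this class looks roughly like the following; the offer URL is illustrative, and scrape_detail returns None when parsing fails:

# Hypothetical end-to-end detail scrape; the offer URL is illustrative.
scraper = Scraper1688(headless=True)
url = scraper.clean_url("//detail.1688.com/offer/123456789.html?spm=x")
detail = scraper.scrape_detail(url)
if detail:          # None signals a failed parse (bare except above)
    print(detail)
scraper.quit()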