LuTong vor 2 Monaten
Ursprung
Commit
c3d3ef2a8e
1 geänderte Datei mit 44 neuen und 70 gelöschten Zeilen
  1. src/scraper.py (+44 −70)

+ 44 - 70
src/scraper.py

@@ -34,10 +34,7 @@ class Scraper1688:
 
     def _find_chrome(self):
         import winreg
-        reg_paths = [
-            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
-            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")
-        ]
+        reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"), (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")]
         for hkey, subkey in reg_paths:
             try:
                 with winreg.OpenKey(hkey, subkey) as key:
@@ -73,15 +70,15 @@ class Scraper1688:
             return opts
         try:
             self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
-        except Exception as e:
+        except:
             self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
 
     def clean_url(self, url):
+        """ 极其鲁棒的 ID 提取并转化为详情链接 """
         if not url: return ""
         if url.startswith("//"): url = "https:" + url
-        id_match = re.search(r'offer(?:Id|Ids)?/(\d+)\.html', url) or \
-                   re.search(r'[?&](?:offerId|offerIds|id)=(\d+)', url) or \
-                   re.search(r'object_id@(\d+)', url)
+        # 只要能提取出 9 位以上数字 ID,就视为合法商品
+        id_match = re.search(r'(\d{9,15})', url)
         if id_match:
             return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
         return ""
@@ -89,17 +86,14 @@ class Scraper1688:
     def check_for_captcha(self):
         def is_blocked():
             try:
-                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
+                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
-                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                is_login = "login.1688.com" in url or "passport.1688.com" in url
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
-                return is_slider or is_login or is_punish
+                return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
             except: return False
         if is_blocked():
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
-            while is_blocked(): time.sleep(3)
+            while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
             time.sleep(3)
         return True
@@ -118,46 +112,43 @@ class Scraper1688:
             self.check_for_captcha()
             for i in range(1, 6):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/5});")
-                time.sleep(1.5)
+                time.sleep(1.2)
 
             page_results = self._extract_all_methods()
             print(f"  [+] 本页发现 {len(page_results)} 个商品链接")
             
             page_batch = []
             for it in page_results:
-                clean_url = self.clean_url(it["link"])
+                raw_link = it.get("link")
+                clean_url = self.clean_url(raw_link)
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
-                    print(f"  [>] 抓取详情: {clean_url}")
+                    # 关键修改:此处必须进入详情页抓取
+                    print(f"  [>] 正在启动详情抓取: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
                     if detail_results:
                         page_batch.extend(detail_results)
                     else:
-                        page_batch.append({
-                            "category": "", "brand": "", "name": it.get("name", ""),
-                            "color": "", "spec": "", "material": "", "price": "",
-                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                        })
+                        page_batch.append({"link": clean_url, "name": it.get("name", "未知"), "price": ""})
                     
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
-                    
-                    time.sleep(random.uniform(15, 30))
+                    time.sleep(random.uniform(15, 25)) 
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
             page += 1
-            if page % 2 == 0:
+            if page % 3 == 0:
                 self.driver.get("https://www.1688.com")
                 time.sleep(random.randint(10, 20))
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 深度提取变体逻辑:款式描述 + 逐条价格 """
+        """ 精准拆分款式与价格 """
         try:
             self.driver.get(url)
-            time.sleep(random.uniform(5, 10))
+            time.sleep(random.uniform(5, 8))
             self.check_for_captcha()
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
@@ -188,10 +179,7 @@ class Scraper1688:
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "price": "", 
-                "moq": trade.get("beginAmount", ""),
-                "wholesale_price": range_text,
-                "link": url,
+                "price": "", "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
@@ -211,51 +199,37 @@ class Scraper1688:
             if variant_data_list:
                 results = []
                 for vd in variant_data_list:
-                    row = base_data.copy()
-                    row["color"] = vd["label"]
-                    row["price"] = vd["price"]
-                    results.append(row)
+                    row = base_data.copy(); row["color"] = vd["label"]; row["price"] = vd["price"]; results.append(row)
                 return results
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        """ 强化版:对标 req.py 的 JS 变量探测 """
+        """ 全力提取列表页链接 """
         results = []
-        scripts = [
-            "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
-            "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"
-        ]
-        for s in scripts:
-            try:
-                res = self.driver.execute_script(s)
-                if res and res != "null":
-                    data = json.loads(res)
-                    def find_list(obj):
-                        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
-                        if isinstance(obj, dict):
-                            for k in obj:
-                                f = find_list(obj[k])
-                                if f: return f
-                        return None
-                    for o in (find_list(data) or []):
-                        link = o.get('itemUrl', o.get('url', ''))
-                        if link and "similar_search" not in link:
-                            results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
-                    if results: return results
-            except: continue
-        
-        selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
-        for s in selectors:
-            elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-            if len(elements) > 2:
-                for el in elements:
-                    try:
-                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
-                        if link and "similar_search" not in link:
-                            results.append({"name": el.text.split('\n')[0][:50], "link": link})
-                    except: continue
-                if results: break
+        try:
+            res = self.driver.execute_script("return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)")
+            if res and res != "null":
+                data = json.loads(res)
+                def find_list(obj):
+                    if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
+                    if isinstance(obj, dict):
+                        for k in obj:
+                            f = find_list(obj[k])
+                            if f: return f
+                    return None
+                for o in (find_list(data) or []):
+                    link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
+                    if link: results.append({"name": str(o.get('title', '')), "link": link})
+                if results: return results
+        except: pass
+        for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']"]:
+            for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                try:
+                    a = el.find_element(By.TAG_NAME, "a"); link = a.get_attribute("href")
+                    if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                except: continue
+            if results: break
         return results
 
     def quit(self):