LuTong 2 月之前
父節點
當前提交
482f792582
共有 1 個文件被更改,包括 85 次插入和 169 次删除
  1. 85 169
      src/scraper.py

+ 85 - 169
src/scraper.py

@@ -1,3 +1,4 @@
+# 【版本:20260115-终极订正版】
 # 针对 Python 3.12+ 移除 distutils 的兼容性补丁
 import sys
 try:
@@ -27,18 +28,13 @@ class Scraper1688:
         self.log_callback = log_callback
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
         self.driver = None
-        
-        # 1. 初始化清理
         self._cleanup()
-        
-        # 2. 启动浏览器
         self._init_chrome(headless)
-        
         if self.driver:
             stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
     def _find_chrome(self):
-        """ 通过注册表寻找 Chrome 精准安装路径 """
+        """ 强力锁定 Chrome 安装路径 """
         import winreg
         reg_paths = [
             (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
@@ -53,14 +49,10 @@ class Scraper1688:
         return None
 
     def _cleanup(self):
-        """ 杀掉所有残留进程,确保端口和文件未被锁定 """
         if os.name == 'nt':
             for proc in ['chrome.exe', 'chromedriver.exe']:
-                try:
-                    subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+                try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                 except: pass
-        
-        # 清理锁定文件
         if os.path.exists(self.user_data_path):
             for root, _, files in os.walk(self.user_data_path):
                 for f in files:
@@ -69,9 +61,7 @@ class Scraper1688:
                         except: pass
 
     def _init_chrome(self, headless):
-        """ 强化版启动:解决浏览器不弹出及连接重置报错 """
         chrome_path = self._find_chrome()
-        
         def create_options():
             opts = uc.ChromeOptions()
             opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
@@ -79,161 +69,120 @@ class Scraper1688:
             if headless: opts.add_argument('--headless=new')
             opts.add_argument('--disable-blink-features=AutomationControlled')
             opts.add_argument("--window-size=1920,1080")
-            # 兼容性全家桶
             opts.add_argument("--no-sandbox")
             opts.add_argument("--disable-dev-shm-usage")
             opts.add_argument("--remote-allow-origins=*")
-            opts.add_argument("--no-first-run")
-            opts.add_argument("--no-default-browser-check")
             return opts
-
-        print(f"[*] 正在物理启动 Chrome: {chrome_path}")
         try:
-            # 增加 use_subprocess=True,显著提升在 Win11 下的连接稳定性
-            self.driver = uc.Chrome(
-                options=create_options(), 
-                headless=headless, 
-                browser_executable_path=chrome_path,
-                use_subprocess=True
-            )
-            print("[+] Chrome 浏览器已成功弹出!")
+            self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
         except Exception as e:
-            print(f"[*] 首次启动失败 ({e}),尝试自动兼容模式...")
-            try:
-                self._cleanup()
-                time.sleep(2)
-                # 兜底方案
-                self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
-                print("[+] 自动兼容模式启动成功!")
-            except Exception as e2:
-                print(f"[致命错误] 无法启动 Chrome: {e2}")
-                raise Exception("无法拉起 Chrome,请尝试关闭杀毒软件或重新安装 Chrome。")
+            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
 
     def clean_url(self, url):
-        """ 提取链接中的 ID 并重组为标准详情页链接 """
+        """ 【关键订正】极其简化的 ID 提取逻辑,只要是商品就必须进入详情页 """
         if not url: return ""
-        if isinstance(url, str) and url.startswith("//"): url = "https:" + url
+        # 强制转换为字符串并处理
+        url_str = str(url)
+        if url_str.startswith("//"): url_str = "https:" + url_str
         
-        # 只要能提取出 9 位以上数字 ID,就视为合法商品链接
-        id_match = re.search(r'(\d{9,15})', str(url))
+        # 只要能匹配到连续的 9-15 位数字(1688 商品 ID 特征),就重组
+        id_match = re.search(r'(\d{9,15})', url_str)
         if id_match:
-            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
+            standard_url = f"https://detail.1688.com/offer/{id_match.group(1)}.html"
+            return standard_url
         return ""
 
     def check_for_captcha(self):
-        """ 检测登录、滑块、验证等状态 """
         def is_blocked():
             try:
-                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
+                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
-                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                is_login = "login.1688.com" in url or "passport.1688.com" in url
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
-                return is_slider or is_login or is_punish
+                return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
             except: return False
-        
         if is_blocked():
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
-            while is_blocked(): time.sleep(3)
+            while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
-            
-            cool_msg = "[*] 解封成功,进入 120 秒冷却期以规避风控追溯..."
-            if self.log_callback: self.log_callback(f"<font color='orange'>{cool_msg}</font>")
-            time.sleep(120) 
+            time.sleep(3)
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
-        
-        # 首页预热,检查登录
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
-
+        
         all_links = existing_links if existing_links is not None else set()
-        page = 1
-        initial_count = len(all_links)
+        page, initial_count = 1, len(all_links)
         
         while len(all_links) < total_count + initial_count:
             print(f"[*] 正在处理列表页: 第 {page} 页...")
-            target_url = f"{base_url}&beginPage={page}&page={page}"
-            self.driver.get(target_url)
+            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
-            
-            # 模拟人类翻页滚动
             for i in range(1, 5):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
-                time.sleep(1.5)
+                time.sleep(1.2)
 
-            # --- 核心订正:此处必须获取到 page_results ---
+            # 获取本页链接 (完全对标 req.py 变量探测)
             page_results = self._extract_all_methods()
-            print(f"  [+] 本页发现 {len(page_results)} 个潜在商品链接")
+            print(f"  [+] 本页发现 {len(page_results)} 个原始条目")
             
             page_batch = []
             for it in page_results:
                 raw_link = it.get("link")
                 clean_url = self.clean_url(raw_link)
                 
-                if clean_url and clean_url not in all_links:
-                    all_links.add(clean_url)
-                    
-                    # --- 核心改进:显式打印详情抓取日志并进入循环 ---
-                    print(f"  [>] 正在启动详情抓取: {clean_url}")
-                    detail_results = self.scrape_detail(clean_url)
-                    
-                    if detail_results:
-                        page_batch.extend(detail_results)
-                    else:
-                        # 兜底:如果详情页抓取失败,至少保留列表页基本信息
-                        page_batch.append({
-                            "category": "", "brand": "", "name": it.get("name", "未知商品"),
-                            "color": "", "spec": "", "material": "", "price": "",
-                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                        })
-                    
-                    # 每满 10 条(或此时的批次)yield 一次给 GUI
-                    if len(page_batch) >= 10:
-                        yield page_batch
-                        page_batch = []
-                    
-                    # 抓取后的随机等待,维持长效免验证
-                    time.sleep(random.uniform(15, 30)) 
-                    
-                    if len(all_links) >= total_count + initial_count:
-                        break
-            
-            # 页末清算
-            if page_batch:
-                yield page_batch
-                page_batch = []
+                if not clean_url:
+                    continue
+                
+                if clean_url in all_links:
+                    print(f"  [-] 跳过已存在商品: {clean_url}")
+                    continue
 
+                all_links.add(clean_url)
+                # 【强制日志】只要进入这里,就一定会打印并执行详情抓取
+                print(f"  [>] 正在执行详情抓取流程: {clean_url}")
+                
+                detail_results = self.scrape_detail(clean_url)
+                if detail_results:
+                    page_batch.extend(detail_results)
+                else:
+                    # 即使详情失败也记录基本信息,防止死循环
+                    page_batch.append({
+                        "category": "", "brand": "", "name": it.get("name", "未知"),
+                        "color": "", "spec": "", "material": "", "price": "",
+                        "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
+                    })
+                
+                if len(page_batch) >= 10:
+                    yield page_batch
+                    page_batch = []
+                
+                time.sleep(random.uniform(15, 25)) 
+                if len(all_links) >= total_count + initial_count: break
+            
+            if page_batch: yield page_batch
             page += 1
-            if page % 2 == 0:
-                self.driver.get("https://www.1688.com") # 随机回首页,重置指纹
+            if page % 3 == 0:
+                self.driver.get("https://www.1688.com")
                 time.sleep(random.randint(10, 20))
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 
-        完全对标 req.py 的详情页精准解析逻辑
-        支持 expand-view-list-wrapper 中的款式描述 + 价格 拆分
-        """
+        """ 精准解析:完全同步自 req.py 的模型获取逻辑 """
         try:
             self.driver.get(url)
-            time.sleep(random.uniform(5, 10)) # 给页面充分加载时间
+            time.sleep(random.uniform(5, 8))
             self.check_for_captcha()
-            
-            # 1. 执行 JS 获取完整模型 (req.py 核心思路)
+            # 执行 JS 获取核心模型 (完全对标 req.py)
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
                 "&& window.context.result.global.globalData.model) || "
                 "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
             )
-            
-            if not model:
-                return None
+            if not model: return None
 
             def get_attr(name):
                 try:
@@ -246,67 +195,45 @@ class Scraper1688:
                 except: pass
                 return ""
 
-            def safe_text(by, sel):
-                try: return self.driver.find_element(by, sel).text.strip()
-                except: return ""
-
-            # 解析批发价区间
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
-            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
-            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
+            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in (trade.get("disPriceRanges") or trade.get("currentPrices") or [])])
 
             base_data = {
-                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
+                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                 "brand": get_attr("品牌"),
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
-                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
+                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "price": "", # 待填充
-                "moq": trade.get("beginAmount", ""),
-                "wholesale_price": range_text,
-                "link": url,
+                "price": "", "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            # 2. 核心需求:智能识别并拆分变体 (款式 + 价格)
-            variant_results = []
+            variant_data_list = []
             try:
-                # 寻找用户指定的 DOM 区域
+                # 方案 A: 优先使用 expand-view-list-wrapper 获取款式和价格
                 wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
                 if wrappers:
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
-                            # 提取款式 (item-label) 和 价格 (item-price-stock)
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
-                            price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                            # 过滤掉非数字
-                            price_clean = re.sub(r'[^\d.]', '', price_raw)
-                            
-                            if label:
-                                row = base_data.copy()
-                                row["color"] = label
-                                row["price"] = price_clean
-                                variant_results.append(row)
+                            price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                            if label: variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
                         except: continue
             except: pass
 
-            if variant_results:
-                return variant_results
-            
-            # 3. 兜底方案:如果没有变体列表,获取主价格
-            base_data["price"] = trade.get("minPrice", "")
-            base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
+            if variant_data_list:
+                results = []
+                for vd in variant_data_list:
+                    row = base_data.copy(); row["color"] = vd["label"]; row["price"] = vd["price"]; results.append(row)
+                return results
             return [base_data]
-
-        except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {e}")
-            return None
+        except: return None
 
     def _extract_all_methods(self):
-        """ 强化版列表页提取:对标 req.py 的 JS 变量探测 """
+        """ 强化版:全力探测 1688 列表页数据 (对标 req.py) """
         results = []
-        # 1. 优先使用脚本直接获取内存中的列表数据 (req.py 方式)
+        # 1. 深度内存变量扫描
         scripts = [
             "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
             "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"
@@ -323,31 +250,20 @@ class Scraper1688:
                                 f = find_list(obj[k])
                                 if f: return f
                         return None
-                    
-                    found_items = find_list(data) or []
-                    for o in found_items:
+                    for o in (find_list(data) or []):
                         link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                        if link:
-                            results.append({
-                                "name": str(o.get('title', o.get('subject', ''))),
-                                "link": link
-                            })
+                        if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
                     if results: return results
             except: continue
-
-        # 2. 暴力 DOM 扫描 (如果 JS 变量失效)
-        selectors = [".sm-offer-item", ".offer-card-item", ".pc-search-offer-item", "[class*='offer-card']", ".offer-item"]
-        for s in selectors:
-            elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-            if len(elements) > 2:
-                for el in elements:
-                    try:
-                        a_tag = el.find_element(By.TAG_NAME, "a")
-                        link = a_tag.get_attribute("href")
-                        if link:
-                            results.append({"name": el.text.split('\n')[0][:50], "link": link})
-                    except: continue
-                if results: break
+        # 2. 暴力 DOM 选择器保底
+        for s in [".sm-offer-item", ".offer-card-item", ".pc-search-offer-item", "[class*='offer-card']", ".offer-item"]:
+            for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                try:
+                    a = el.find_element(By.TAG_NAME, "a")
+                    link = a.get_attribute("href")
+                    if link and "1688.com" in link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                except: continue
+            if results: break
         return results
 
     def quit(self):