LuTong пре 3 месеци
родитељ
комит
e953bdb64d
1 измењених фајлова са 150 додато и 47 уклоњено
  1. 150 47
      src/scraper.py

+ 150 - 47
src/scraper.py

@@ -24,17 +24,27 @@ class Scraper1688:
     def __init__(self, headless=True, status_callback=None, log_callback=None):
         self.headless = headless
         self.status_callback = status_callback
-        self.log_callback = log_callback
+        self.log_callback = log_callback # 用于向 GUI 发送普通日志
+        # 使用全新的独立目录,避开锁定冲突
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
         self.driver = None
+        
+        # 1. 强制清理残留,确保端口不被占用
         self._cleanup()
+        
+        # 2. 启动浏览器
         self._init_chrome(headless)
+        
         if self.driver:
             stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
     def _find_chrome(self):
+        """ 通过注册表寻找 Chrome 精准安装路径 """
         import winreg
-        reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"), (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")]
+        reg_paths = [
+            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
+            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")
+        ]
         for hkey, subkey in reg_paths:
             try:
                 with winreg.OpenKey(hkey, subkey) as key:
@@ -44,10 +54,14 @@ class Scraper1688:
         return None
 
     def _cleanup(self):
+        """ 杀掉所有残留进程,确保端口和文件未被锁定 """
         if os.name == 'nt':
             for proc in ['chrome.exe', 'chromedriver.exe']:
-                try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+                try:
+                    subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                 except: pass
+        
+        # 清理锁定文件
         if os.path.exists(self.user_data_path):
             for root, _, files in os.walk(self.user_data_path):
                 for f in files:
@@ -56,7 +70,9 @@ class Scraper1688:
                         except: pass
 
     def _init_chrome(self, headless):
+        """ 强化版启动:解决浏览器不弹出及连接重置报错 """
         chrome_path = self._find_chrome()
+        
         def create_options():
             opts = uc.ChromeOptions()
             opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
@@ -64,35 +80,72 @@ class Scraper1688:
             if headless: opts.add_argument('--headless=new')
             opts.add_argument('--disable-blink-features=AutomationControlled')
             opts.add_argument("--window-size=1920,1080")
+            # 兼容性全家桶
             opts.add_argument("--no-sandbox")
             opts.add_argument("--disable-dev-shm-usage")
             opts.add_argument("--remote-allow-origins=*")
+            opts.add_argument("--no-first-run")
+            opts.add_argument("--no-default-browser-check")
             return opts
+
+        print(f"[*] 正在物理启动 Chrome: {chrome_path}")
         try:
-            self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
-        except Exception:
-            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
+            # 增加 use_subprocess=True,显著提升在 Win11 下的连接稳定性
+            self.driver = uc.Chrome(
+                options=create_options(), 
+                headless=headless, 
+                browser_executable_path=chrome_path,
+                use_subprocess=True
+            )
+            print("[+] Chrome 浏览器已成功弹出!")
+        except Exception as e:
+            print(f"[*] 首次启动失败 ({e}),尝试自动兼容模式...")
+            try:
+                self._cleanup()
+                time.sleep(2)
+                # 兜底方案:使用 subprocess
+                self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
+                print("[+] 自动兼容模式启动成功!")
+            except Exception as e2:
+                print(f"[致命错误] 无法启动 Chrome: {e2}")
+                raise Exception("无法拉起 Chrome,请尝试关闭杀毒软件或重新安装 Chrome。")
 
     def clean_url(self, url):
+        """ 只要包含 9 位以上 ID,就强制转化为标准详情链接 """
         if not url: return ""
         url_str = str(url)
         if url_str.startswith("//"): url_str = "https:" + url_str
-        id_match = re.search(r'(\d{9,15})', url_str)
+        
+        # 1. 尝试匹配典型的 1688 offer ID 模式
+        id_match = re.search(r'offer/(\d{9,15})\.html', url_str) or \
+                   re.search(r'[?&](?:offerId|id)=(\d{9,15})', url_str)
+        
         if id_match:
             return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
+        
+        # 2. 备选方案:匹配任何 9-15 位连续数字
+        id_match_alt = re.search(r'(\d{9,15})', url_str)
+        if id_match_alt:
+            return f"https://detail.1688.com/offer/{id_match_alt.group(1)}.html"
+            
         return ""
 
     def check_for_captcha(self):
+        """ 检测登录、滑块、验证等状态 """
         def is_blocked():
             try:
-                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
+                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
-                return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
+                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
+                is_login = "login.1688.com" in url or "passport.1688.com" in url
+                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
+                return is_slider or is_login or is_punish
             except: return False
+        
         if is_blocked():
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
-            while is_blocked(): time.sleep(2)
+            while is_blocked(): time.sleep(3)
             if self.status_callback: self.status_callback(False, "验证通过")
             time.sleep(3)
         return True
@@ -102,6 +155,7 @@ class Scraper1688:
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
+        
         all_links = existing_links if existing_links is not None else set()
         page, initial_count = 1, len(all_links)
         
@@ -109,45 +163,59 @@ class Scraper1688:
             print(f"[*] 正在处理列表页: 第 {page} 页...")
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
-            for i in range(1, 6):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/5});")
-                time.sleep(1.5)
+            
+            # 增强型阶梯式滚动,确保懒加载内容全部加载
+            for i in range(1, 11):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
+                time.sleep(1.2)
+            
+            # 额外等待时间,确保 JS 渲染完成
+            time.sleep(3)
 
             page_results = self._extract_all_methods()
-            print(f"  [+] 本页发现 {len(page_results)} 个原始条目")
+            print(f"  [+] 本页发现 {len(page_results)} 个商品原始条目")
             
             page_batch = []
             for it in page_results:
                 clean_url = self.clean_url(it["link"])
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
-                    print(f"  [>] 正在执行详情抓取流程: {clean_url}")
+                    print(f"  [>] 抓取详情: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
                     if detail_results:
                         page_batch.extend(detail_results)
                     else:
-                        page_batch.append({"category": "", "brand": "", "name": it.get("name", "未知"), "color": "", "spec": "", "material": "", "price": "", "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""})
+                        page_batch.append({
+                            "category": "", "brand": "", "name": it.get("name", "未知"),
+                            "color": "", "spec": "", "material": "", "price": it.get("price", ""),
+                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
+                        })
                     
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
-                    time.sleep(random.uniform(5, 10)) 
+                    
+                    time.sleep(random.uniform(5, 10)) # 保持较慢频率,避免被封
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
             page += 1
-            if page % 3 == 0:
+            if page % 2 == 0:
                 self.driver.get("https://www.1688.com")
                 time.sleep(random.randint(10, 20))
         return list(all_links)
 
     def scrape_detail(self, url):
+        """ 极其精准的详情页解析:完全同步自 req.py """
         try:
             self.driver.get(url)
-            time.sleep(random.uniform(5, 8))
+            time.sleep(random.uniform(5, 10))
             self.check_for_captcha()
             model = self.driver.execute_script(
-                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
+                "return (window.context && window.context.result && "
+                "window.context.result.global && window.context.result.global.globalData "
+                "&& window.context.result.global.globalData.model) || "
+                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
             )
             if not model: return None
 
@@ -172,65 +240,100 @@ class Scraper1688:
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "price": "", "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
+                "price": "", 
+                "moq": trade.get("beginAmount", ""),
+                "wholesale_price": range_text,
+                "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
             variant_data_list = []
             try:
+                # 核心需求:从 expand-view-list-wrapper 中提取文字和价格
                 wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
                 if wrappers:
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
+                            # 款式描述文字 (item-label)
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
-                            price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                            if label: variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
+                            # 逐条价格 (item-price-stock)
+                            price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                            # 清洗价格,只保留数字
+                            price_clean = re.sub(r'[^\d.]', '', price_raw)
+                            if label:
+                                variant_data_list.append({"label": label, "price": price_clean})
                         except: continue
             except: pass
 
             if variant_data_list:
                 results = []
                 for vd in variant_data_list:
-                    row = base_data.copy(); row["color"] = vd["label"]; row["price"] = vd["price"]; results.append(row)
+                    row = base_data.copy()
+                    row["color"] = vd["label"]
+                    row["price"] = vd["price"]
+                    results.append(row)
                 return results
+            
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        results = []
-        scripts = ["return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)", "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"]
+        """ 强化版列表链接提取:收集所有来源并去重 """
+        all_results = []
+        seen_ids = set()
+
+        def add_item(name, link, price=""):
+            cid = self.clean_url(link)
+            if cid and cid not in seen_ids:
+                seen_ids.add(cid)
+                all_results.append({"name": name, "link": cid, "price": price})
+
+        # 1. 内存 JS 变量探测 (深度扫描)
+        scripts = [
+            "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
+            "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)",
+            "return JSON.stringify(window.pageData || null)"
+        ]
         for s in scripts:
             try:
                 res = self.driver.execute_script(s)
                 if res and res != "null":
                     data = json.loads(res)
-                    def find_list(obj):
-                        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
+                    def find_lists(obj):
+                        lists = []
+                        if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']):
+                            lists.append(obj)
                         if isinstance(obj, dict):
                             for k in obj:
-                                f = find_list(obj[k])
-                                if f: return f
-                        return None
-                    for o in (find_list(data) or []):
-                        link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                        if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
-                    if results: return results
+                                lists.extend(find_lists(obj[k]))
+                        return lists
+                    
+                    found_lists = find_lists(data)
+                    for product_list in found_lists:
+                        for o in product_list:
+                            link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
+                            name = str(o.get('title', o.get('subject', o.get('name', ''))))
+                            price = o.get('price', '')
+                            add_item(name, link, price)
             except: continue
-        
-        links = self.driver.find_elements(By.TAG_NAME, "a")
-        seen_ids = set()
-        for l in links:
+
+        # 2. 最新 DOM 选择器扫描 (补全 JS 没抓到的)
+        selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
+        for s in selectors:
             try:
-                href = l.get_attribute("href")
-                if href:
-                    id_match = re.search(r'offer/(\d{9,15})\.html', href)
-                    if id_match:
-                        oid = id_match.group(1)
-                        if oid not in seen_ids:
-                            seen_ids.add(oid); results.append({"name": l.text.split('\n')[0][:50] or f"商品-{oid}", "link": href})
+                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
+                for el in elements:
+                    try:
+                        a_tags = el.find_elements(By.TAG_NAME, "a")
+                        for a in a_tags:
+                            href = a.get_attribute("href")
+                            if href:
+                                add_item(el.text.split('\n')[0][:50], href)
+                    except: continue
             except: continue
-        return results
+            
+        return all_results
 
     def quit(self):
         try: self.driver.quit()