LuTong 3 months ago
parent
commit
40cd77f652
1 changed file with 93 additions and 175 deletions
  1. src/scraper.py (+93, −175)

+ 93 - 175
src/scraper.py

@@ -15,8 +15,7 @@ except ImportError:
 
 import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
 from selenium import webdriver
-from selenium.webdriver.edge.options import Options as EdgeOptions
-from selenium.webdriver.edge.service import Service as EdgeService
+import undetected_chromedriver as uc 
 from selenium.webdriver.common.by import By
 from selenium.webdriver.common.action_chains import ActionChains
 from selenium_stealth import stealth
@@ -26,60 +25,77 @@ class Scraper1688:
         self.headless = headless
         self.status_callback = status_callback # 用于回调 GUI 状态
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
-        
         self.driver = None
         
-        # 1. 检查调试端口是否开启
-        if not self._is_port_open(9222):
-            print("[!] 未检测到开启 9222 端口的 Edge 浏览器")
-            raise Exception("请启动Edge浏览器")
+        # 1. 探测 Edge 路径
+        edge_path = self._find_edge()
+        if not edge_path:
+            raise Exception("电脑上未检测到 Edge 浏览器,请先安装。")
 
-        print("[*] 监测到 Edge 调试端口已开启,正在尝试接管...")
+        print(f"[*] 检测到 Edge: {edge_path},正在自动启动...")
         
+        # 2. 启动前强制清理残留进程,防止端口或用户目录被锁定
+        self._cleanup_processes()
+
+        # 3. 使用 undetected-chromedriver 强行驱动 Edge
         try:
-            options = EdgeOptions()
-            # 核心:连接到手动打开的浏览器
-            options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
+            options = uc.ChromeOptions()
+            options.binary_location = edge_path # 关键:指定使用 Edge 二进制文件
+            options.add_argument(f"--user-data-dir={self.user_data_path}")
+            if headless: options.add_argument('--headless=new')
+            options.add_argument('--disable-blink-features=AutomationControlled')
+            options.add_argument("--window-size=1920,1080")
             
-            # 接管模式通常不需要 service,但为了防止 selenium 报错,传入一个
-            self.driver = webdriver.Edge(options=options)
-            
-            # 测试连接
-            _ = self.driver.current_url
-            print("[+] 成功接管 Edge 浏览器!")
+            # 使用全新的 options 初始化,避免 reuse 错误
+            self.driver = uc.Chrome(options=options, headless=headless)
+            print("[+] Edge 浏览器已自动弹出并成功连接!")
             
         except Exception as e:
-            print(f"[!] 接管失败: {e}")
-            raise Exception("请启动Edge浏览器")
+            print(f"[!] 启动失败: {traceback.format_exc()}")
+            raise Exception(f"无法自动启动 Edge 浏览器: {e}")
 
         # 应用 stealth 增强隐蔽性
         stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
-    def _is_port_open(self, port):
-        """ 检查本地端口是否开放 """
-        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
-            s.settimeout(1)
-            return s.connect_ex(('127.0.0.1', port)) == 0
+    def _find_edge(self):
+        """ 通过注册表获取 Windows 下 Edge 的精准路径 """
+        import winreg
+        reg_paths = [
+            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
+            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
+        ]
+        for hkey, subkey in reg_paths:
+            try:
+                with winreg.OpenKey(hkey, subkey) as key:
+                    path, _ = winreg.QueryValueEx(key, "")
+                    if os.path.exists(path): return path
+            except: continue
+        
+        # 暴力路径补丁
+        common = [
+            r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
+            r"C:\Program Files\Microsoft\Edge\Application\msedge.exe"
+        ]
+        for p in common:
+            if os.path.exists(p): return p
+        return None
+
+    def _cleanup_processes(self):
+        """ 在启动前杀掉残留的 Edge 进程,确保 9222 端口可用 """
+        if os.name == 'nt':
+            subprocess.call(['taskkill', '/F', '/IM', 'msedge.exe', '/T'], 
+                            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
 
     def clean_url(self, url):
         """极其鲁棒的 1688 URL 清洗逻辑"""
         if not url: return ""
         if url.startswith("//"): url = "https:" + url
-        
-        # 1. 尝试从路径中匹配 offer ID (标准 PC 链接)
         id_match = re.search(r'offer/(\d+)\.html', url)
         if id_match: return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
-        
-        # 2. 尝试从查询参数中提取 offerId (移动端或广告链接)
         parsed = urllib.parse.urlparse(url)
         params = urllib.parse.parse_qs(parsed.query)
         oid = params.get('offerId') or params.get('id')
         if oid: return f"https://detail.1688.com/offer/{oid[0]}.html"
-        
-        # 3. 针对某些特殊加密链接,尝试寻找 data-aplus-report 或类似字符串中的 ID
-        id_match_report = re.search(r'object_id@(\d+)', url)
-        if id_match_report: return f"https://detail.1688.com/offer/{id_match_report.group(1)}.html"
-        
         return url
 
     def check_for_captcha(self):
@@ -91,34 +107,18 @@ class Scraper1688:
                 url = self.driver.current_url.lower()
                 src = self.driver.page_source.lower()
                 title = self.driver.title.lower()
-                
-                # 1. 检测滑块验证码
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                 is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                
-                # 2. 检测登录页面 (如果跳转到了登录页)
                 is_login = "login.1688.com" in url or "passport.1688.com" in url
-                
-                # 3. 检测惩罚/验证提示页
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
-                
+                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
                 return is_slider or is_login or is_punish
-            except: 
-                return False
+            except: return False
         
         if is_blocked():
             msg = "请登录验证"
-            print(f"\n[!] {msg}...")
-            if self.status_callback:
-                self.status_callback(True, msg)
-            
-            # 持续监控,直到上述所有拦截状态消失
-            while is_blocked():
-                time.sleep(2)
-                
-            if self.status_callback:
-                self.status_callback(False, "验证通过")
-            print("\n[OK] 监测到人工干预已完成,3秒后恢复自动抓取...")
+            if self.status_callback: self.status_callback(True, msg)
+            while is_blocked(): time.sleep(2)
+            if self.status_callback: self.status_callback(False, "验证通过")
             time.sleep(3)
         return True
 
@@ -126,41 +126,35 @@ class Scraper1688:
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         
-        # 初始检查
+        # 首页预热,检查登录
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
 
         all_links = existing_links if existing_links is not None else set()
         page = 1
-        consecutive_empty_pages = 0
         initial_count = len(all_links)
         
-        while len(all_links) < total_count + initial_count and consecutive_empty_pages < 3:
+        while len(all_links) < total_count + initial_count:
             print(f"[*] 正在搜索列表页: 第 {page} 页...")
             target_url = f"{base_url}&beginPage={page}&page={page}"
             self.driver.get(target_url)
-            
-            time.sleep(5)
             self.check_for_captcha()
-
+            
             for i in range(1, 4):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
                 time.sleep(1)
 
             page_results = self._extract_all_methods()
-            
             page_batch = []
             for it in page_results:
                 clean_url = self.clean_url(it["link"])
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
-                    
                     print(f"  [>] 抓取详情: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
                     if detail_results:
                         page_batch.extend(detail_results)
                     else:
-                        it["link"] = clean_url
                         page_batch.append({
                             "category": "", "brand": "", "name": it["name"],
                             "color": "", "spec": "", "material": "", "price": it["price"],
@@ -170,42 +164,23 @@ class Scraper1688:
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
-
                     time.sleep(random.uniform(2, 4))
-                    
-                    if len(all_links) >= total_count + initial_count:
-                        break
+                    if len(all_links) >= total_count + initial_count: break
             
-            if page_batch:
-                yield page_batch
-                page_batch = []
-
+            if page_batch: yield page_batch
             page += 1
-            if len(all_links) < total_count + initial_count:
-                print(f"[*] 累计已处理新链接: {len(all_links) - initial_count} 条,准备翻下一页...")
-                time.sleep(3)
-
         return list(all_links)
 
     def scrape_detail(self, url):
-        """
-        根据 /refe/req.py 订正的详情页抓取逻辑
-        获取极其精准的商品属性和价格数据,并支持将规格拆分为多行
-        """
+        """ 抓取详情并拆分规格 """
         try:
             self.driver.get(url)
             time.sleep(2)
             self.check_for_captcha()
-            
             model = self.driver.execute_script(
-                "return (window.context && window.context.result && "
-                "window.context.result.global && window.context.result.global.globalData "
-                "&& window.context.result.global.globalData.model) || "
-                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
+                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
             )
-            
-            if not model:
-                return None
+            if not model: return None
 
             def get_attr(name):
                 try:
@@ -219,133 +194,76 @@ class Scraper1688:
                 return ""
 
             def safe_text(by, sel):
-                try:
-                    return self.driver.find_element(by, sel).text.strip()
+                try: return self.driver.find_element(by, sel).text.strip()
                 except: return ""
 
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
             price_min = trade.get("minPrice", "") or ""
-            price_max = trade.get("maxPrice", "") or ""
             if not price_min:
                 try: price_min = model["sku"]["priceRange"][0][1]
                 except: pass
-
-            begin_amount = trade.get("beginAmount", "")
             
-            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
-                     trade.get("offerPriceModel", {}).get("currentPrices", [])
-            range_text = " / ".join(
-                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges]
-            ) if ranges else ""
+            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
+            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
 
             base_data = {
-                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
-                           or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
+                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
                 "brand": get_attr("品牌"),
-                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
-                         or safe_text(By.CSS_SELECTOR, "h1.d-title")
-                         or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
-                "color": "", 
-                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
-                        safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
-                "material": get_attr("材质") or get_attr("面料") or \
-                            safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
-                "price": f"{price_min}-{price_max}" if price_min and price_max and price_min != price_max else f"{price_min}" if price_min else "",
-                "moq": begin_amount or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='起订量' or span='起批量']/following-sibling::td[1]//span[@class='field-value']"),
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
+                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
+                "material": get_attr("材质") or get_attr("面料") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
+                "price": price_min,
+                "moq": trade.get("beginAmount", ""),
                 "wholesale_price": range_text,
                 "link": url,
-                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else "")
-                           or safe_text(By.CSS_SELECTOR, "a.company-name")
-                           or safe_text(By.CSS_SELECTOR, "div.company-name"),
+                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
             sku_props = []
             try:
-                sku_props = model.get("skuModel", {}).get("skuProps", []) or \
-                            model.get("detailData", {}).get("skuProps", []) or \
-                            model.get("sku", {}).get("skuProps", [])
+                sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or model.get("sku", {}).get("skuProps", [])
             except: pass
 
             main_prop = None
             if sku_props:
                 main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
-                if not main_prop:
-                    main_prop = sku_props[0]
+                if not main_prop: main_prop = sku_props[0]
             
             if main_prop and main_prop.get("value"):
-                variant_results = []
+                results = []
                 for val in main_prop["value"]:
-                    variant_name = val.get("name")
-                    if variant_name:
+                    if val.get("name"):
                         row = base_data.copy()
-                        row["color"] = variant_name
-                        variant_results.append(row)
-                return variant_results
-            else:
-                base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
-                return [base_data]
-
-        except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {traceback.format_exc()}")
-            return None
+                        row["color"] = val.get("name")
+                        results.append(row)
+                return results
+            return [base_data]
+        except: return None
 
     def _extract_all_methods(self):
-        """三位一体提取法:JSON + DOM + 深度搜索"""
+        """ 列表页提取 """
         results = []
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             if res:
                 data = json.loads(res)
                 def find_list(obj):
-                    if isinstance(obj, list) and len(obj) > 0:
-                        if 'title' in obj[0] or 'offerId' in obj[0]: return obj
+                    if isinstance(obj, list) and len(obj) > 0 and ('title' in obj[0] or 'offerId' in obj[0]): return obj
                     if isinstance(obj, dict):
                         for k in obj:
-                            found = find_list(obj[k])
-                            if found: return found
+                            f = find_list(obj[k])
+                            if f: return f
                     return None
-                raw = find_list(data) or []
-                for o in raw:
-                    title = str(o.get('title', o.get('name', ''))).replace('<em>','').replace('</em>','')
-                    link = o.get('itemUrl', o.get('url', ''))
-                    price = o.get('priceInfo', {}).get('price', o.get('price', '面议'))
-                    if link: results.append({"name": title, "link": link, "price": price})
+                for o in (find_list(data) or []):
+                    results.append({"name": str(o.get('title', '')), "link": o.get('itemUrl', ''), "price": ""})
         except: pass
-
-        if not results:
-            selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
-            for s in selectors:
-                cards = self.driver.find_elements(By.CSS_SELECTOR, s)
-                if len(cards) > 3:
-                    for el in cards:
-                        try:
-                            link = ""
-                            if el.tag_name == 'a':
-                                link = el.get_attribute("href")
-                            else:
-                                a_tags = el.find_elements(By.TAG_NAME, "a")
-                                for a in a_tags:
-                                    h = a.get_attribute("href")
-                                    if h and ("offer" in h or "item" in h or "ci_bb" in h):
-                                        link = h; break
-                            if not link or "1688.com" not in link:
-                                oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
-                                if oid: link = f"https://detail.1688.com/offer/{oid}.html"
-                            if link:
-                                title = el.text.split('\n')[0][:50]
-                                price = "面议"
-                                try:
-                                    price_el = el.find_element(By.CSS_SELECTOR, ".text-main, [class*='price'], .amount")
-                                    price = price_el.text.strip().replace("¥", "")
-                                except: pass
-                                results.append({"name": title, "link": link, "price": price})
-                        except: continue
-                    if results: break
-
         if not results:
-            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
-            for oid in set(ids):
-                results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html", "price": "面议"})
+            for s in [".search-offer-item", "[class*='offer-card']"]:
+                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                    try:
+                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
+                        results.append({"name": el.text.split('\n')[0], "link": link, "price": ""})
+                    except: continue
         return results
 
     def quit(self):