Commit c64db5eca3 — LuTong, 3 months ago (parent commit)
1 file changed: 32 insertions(+), 129 deletions(-)
src/scraper.py (+32 −129)

@@ -13,12 +13,10 @@ except ImportError:
         def __str__(self): return str(self.v)
         def __str__(self): return str(self.v)
     v.LooseVersion = LooseVersion
     v.LooseVersion = LooseVersion
 
 
-import time, random, re, os, subprocess, urllib.parse, json, traceback
+import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
 from selenium import webdriver
 from selenium import webdriver
 from selenium.webdriver.edge.options import Options as EdgeOptions
 from selenium.webdriver.edge.options import Options as EdgeOptions
 from selenium.webdriver.edge.service import Service as EdgeService
 from selenium.webdriver.edge.service import Service as EdgeService
-from webdriver_manager.microsoft import EdgeChromiumDriverManager
-import undetected_chromedriver as uc 
 from selenium.webdriver.common.by import By
 from selenium.webdriver.common.by import By
 from selenium.webdriver.common.action_chains import ActionChains
 from selenium.webdriver.common.action_chains import ActionChains
 from selenium_stealth import stealth
 from selenium_stealth import stealth
@@ -28,92 +26,40 @@ class Scraper1688:
         self.headless = headless
         self.headless = headless
         self.status_callback = status_callback # 用于回调 GUI 状态
         self.status_callback = status_callback # 用于回调 GUI 状态
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
-        self._cleanup()
         
         
-        # 1. 优先探测 Edge
-        edge_path = self._find_edge()
         self.driver = None
         self.driver = None
         
         
-        if edge_path:
-            print(f"[*] 检测到 Edge 浏览器,尝试启动: {edge_path}")
-            from selenium.webdriver.edge.options import Options as EdgeOptions
-            from selenium.webdriver.edge.service import Service as EdgeService
-            from webdriver_manager.microsoft import EdgeChromiumDriverManager
-            
+        # 1. 检查调试端口是否开启
+        if not self._is_port_open(9222):
+            print("[!] 未检测到开启 9222 端口的 Edge 浏览器")
+            raise Exception("请启动Edge浏览器")
+
+        print("[*] 监测到 Edge 调试端口已开启,正在尝试接管...")
+        
+        try:
             options = EdgeOptions()
             options = EdgeOptions()
-            options.binary_location = edge_path
-            options.add_argument(f"--user-data-dir={self.user_data_path}")
-            if headless: options.add_argument('--headless=new')
-            options.add_argument('--disable-blink-features=AutomationControlled')
-            options.add_argument("--window-size=1920,1080")
+            # 核心:连接到手动打开的浏览器
+            options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
             
             
-            # 解决权限和被占用的常见参数
-            options.add_argument("--no-sandbox")
-            options.add_argument("--disable-dev-shm-usage")
-            options.add_experimental_option("excludeSwitches", ["enable-automation"])
-            options.add_experimental_option('useAutomationExtension', False)
+            # 接管模式通常不需要 service,但为了防止 selenium 报错,传入一个
+            self.driver = webdriver.Edge(options=options)
+            
+            # 测试连接
+            _ = self.driver.current_url
+            print("[+] 成功接管 Edge 浏览器!")
             
             
-            try:
-                # 尝试自动安装驱动
-                driver_path = EdgeChromiumDriverManager().install()
-                print(f"[*] 使用 EdgeDriver: {driver_path}")
-                service = EdgeService(driver_path)
-                self.driver = webdriver.Edge(service=service, options=options)
-                print("[+] Edge 启动成功!")
-            except Exception as e:
-                # 捕获并打印完整错误
-                err_msg = traceback.format_exc()
-                print(f"[!] Edge 启动异常: \n{err_msg}")
-                print("[*] 正在尝试回退到 Chrome 模式...")
-        
-        if not self.driver:
-            print("[×] 没有Edge?🐈...")
-            # print("[*] 正在初始化 Chrome (undetected-chromedriver)...")
-            # self._init_chrome(headless)
-
-        if self.driver:
-            stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
-
-    def _find_edge(self):
-        """ 探测 Windows 下 Edge 的常见安装路径 """
-        paths = [
-            r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
-            r"C:\Program Files\Microsoft\Edge\Application\msedge.exe",
-            os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\Edge\Application\msedge.exe")
-        ]
-        for p in paths:
-            if os.path.exists(p): return p
-        return None
-
-    def _init_chrome(self, headless):
-        """ 原有的 Chrome (undetected-chromedriver) 初始化逻辑 """
-        def create_options():
-            opts = uc.ChromeOptions()
-            opts.add_argument(f"--user-data-dir={self.user_data_path}")
-            if headless: opts.add_argument('--headless=new')
-            opts.add_argument('--disable-blink-features=AutomationControlled')
-            opts.add_argument("--window-size=1920,1080")
-            return opts
-
-        try:
-            # 第一尝试:指定版本
-            self.driver = uc.Chrome(options=create_options(), headless=headless, version_main=131)
         except Exception as e:
         except Exception as e:
-            print(f"[*] 指定版本 Chrome 启动失败,尝试自动匹配: {e}")
-            # 关键:创建全新的 options 对象,避免 reuse 错误
-            self.driver = uc.Chrome(options=create_options(), headless=headless)
+            print(f"[!] 接管失败: {e}")
+            raise Exception("请启动Edge浏览器")
+
+        # 应用 stealth 增强隐蔽性
+        stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
 
-    def _cleanup(self):
-        if os.name == 'nt':
-            for proc in ['chrome.exe', 'msedge.exe', 'edgedriver.exe', 'chromedriver.exe']:
-                subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], 
-                                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-        if os.path.exists(self.user_data_path):
-            for root, _, files in os.walk(self.user_data_path):
-                for f in files:
-                    if "lock" in f.lower() or f == "SingletonLock":
-                        try: os.remove(os.path.join(root, f))
-                        except: pass
+    def _is_port_open(self, port):
+        """ 检查本地端口是否开放 """
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            s.settimeout(1)
+            return s.connect_ex(('127.0.0.1', port)) == 0
 
 
     def clean_url(self, url):
     def clean_url(self, url):
         """极其鲁棒的 1688 URL 清洗逻辑"""
         """极其鲁棒的 1688 URL 清洗逻辑"""
@@ -156,9 +102,6 @@ class Scraper1688:
                 # 3. 检测惩罚/验证提示页
                 # 3. 检测惩罚/验证提示页
                 is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
                 is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
                 
                 
-                # 4. 检测是否被登出 (如果页面包含登录按钮且当前是详情/搜索页)
-                # 这部分可以根据实际情况增强,目前主要靠 URL 判定
-                
                 return is_slider or is_login or is_punish
                 return is_slider or is_login or is_punish
             except: 
             except: 
                 return False
                 return False
@@ -179,20 +122,17 @@ class Scraper1688:
             time.sleep(3)
             time.sleep(3)
         return True
         return True
 
 
-    # def search_products_yield(self, keyword, total_count=200):
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
         
         
-        # 初始检查:确保在开始抓取前没被拦截(比如没登录)
+        # 初始检查
         self.driver.get("https://www.1688.com")
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
         self.check_for_captcha()
 
 
         all_links = existing_links if existing_links is not None else set()
         all_links = existing_links if existing_links is not None else set()
         page = 1
         page = 1
         consecutive_empty_pages = 0
         consecutive_empty_pages = 0
-        
-        # 记录初始抓取的链接数,用于计算进度
         initial_count = len(all_links)
         initial_count = len(all_links)
         
         
         while len(all_links) < total_count + initial_count and consecutive_empty_pages < 3:
         while len(all_links) < total_count + initial_count and consecutive_empty_pages < 3:
@@ -200,11 +140,9 @@ class Scraper1688:
             target_url = f"{base_url}&beginPage={page}&page={page}"
             target_url = f"{base_url}&beginPage={page}&page={page}"
             self.driver.get(target_url)
             self.driver.get(target_url)
             
             
-            # 关键:首屏强制等待渲染
             time.sleep(5)
             time.sleep(5)
             self.check_for_captcha()
             self.check_for_captcha()
 
 
-            # 深度滚动确保加载
             for i in range(1, 4):
             for i in range(1, 4):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
                 time.sleep(1)
                 time.sleep(1)
@@ -217,14 +155,11 @@ class Scraper1688:
                 if clean_url and clean_url not in all_links:
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
                     all_links.add(clean_url)
                     
                     
-                    # 核心改进:进入详情页抓取精准数据
                     print(f"  [>] 抓取详情: {clean_url}")
                     print(f"  [>] 抓取详情: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
                     detail_results = self.scrape_detail(clean_url)
                     if detail_results:
                     if detail_results:
-                        # detail_results 现在是一个列表 (包含多个颜色分类)
                         page_batch.extend(detail_results)
                         page_batch.extend(detail_results)
                     else:
                     else:
-                        # 兜底
                         it["link"] = clean_url
                         it["link"] = clean_url
                         page_batch.append({
                         page_batch.append({
                             "category": "", "brand": "", "name": it["name"],
                             "category": "", "brand": "", "name": it["name"],
@@ -232,18 +167,15 @@ class Scraper1688:
                             "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
                             "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
                         })
                         })
                     
                     
-                    # 每满 10 条 yield 一次
                     if len(page_batch) >= 10:
                     if len(page_batch) >= 10:
                         yield page_batch
                         yield page_batch
                         page_batch = []
                         page_batch = []
 
 
-                    # 详情页抓取后的随机等待
                     time.sleep(random.uniform(2, 4))
                     time.sleep(random.uniform(2, 4))
                     
                     
                     if len(all_links) >= total_count + initial_count:
                     if len(all_links) >= total_count + initial_count:
                         break
                         break
             
             
-            # 每页结束,将不足 10 条的余数 yield 出去
             if page_batch:
             if page_batch:
                 yield page_batch
                 yield page_batch
                 page_batch = []
                 page_batch = []
@@ -258,14 +190,13 @@ class Scraper1688:
     def scrape_detail(self, url):
     def scrape_detail(self, url):
         """
         """
         根据 /refe/req.py 订正的详情页抓取逻辑
         根据 /refe/req.py 订正的详情页抓取逻辑
-        获取极其精准的商品属性和价格数据,并支持将“颜色分类”拆分为多行
+        获取极其精准的商品属性和价格数据,并支持将规格拆分为多行
         """
         """
         try:
         try:
             self.driver.get(url)
             self.driver.get(url)
             time.sleep(2)
             time.sleep(2)
             self.check_for_captcha()
             self.check_for_captcha()
             
             
-            # 执行 JS 获取 1688 详情页背后的完整数据模型
             model = self.driver.execute_script(
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
                 "window.context.result.global && window.context.result.global.globalData "
@@ -277,13 +208,10 @@ class Scraper1688:
                 return None
                 return None
 
 
             def get_attr(name):
             def get_attr(name):
-                """从 featureAttributes 里取指定属性值"""
                 try:
                 try:
-                    # 现代版
                     attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                     attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                     for item in attrs:
                     for item in attrs:
                         if name in item.get("name", ""): return item.get("value", "")
                         if name in item.get("name", ""): return item.get("value", "")
-                    # 老版兼容
                     attrs = model.get("detailData", {}).get("attributes", [])
                     attrs = model.get("detailData", {}).get("attributes", [])
                     for item in attrs:
                     for item in attrs:
                         if name in item.get("attributeName", ""): return item.get("value", "")
                         if name in item.get("attributeName", ""): return item.get("value", "")
@@ -295,25 +223,21 @@ class Scraper1688:
                     return self.driver.find_element(by, sel).text.strip()
                     return self.driver.find_element(by, sel).text.strip()
                 except: return ""
                 except: return ""
 
 
-            # 价格处理逻辑
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
             price_min = trade.get("minPrice", "") or ""
             price_min = trade.get("minPrice", "") or ""
             price_max = trade.get("maxPrice", "") or ""
             price_max = trade.get("maxPrice", "") or ""
-            # 老版价格补丁
             if not price_min:
             if not price_min:
                 try: price_min = model["sku"]["priceRange"][0][1]
                 try: price_min = model["sku"]["priceRange"][0][1]
                 except: pass
                 except: pass
 
 
             begin_amount = trade.get("beginAmount", "")
             begin_amount = trade.get("beginAmount", "")
             
             
-            # 批发价区间
             ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
             ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
                      trade.get("offerPriceModel", {}).get("currentPrices", [])
                      trade.get("offerPriceModel", {}).get("currentPrices", [])
             range_text = " / ".join(
             range_text = " / ".join(
                 [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges]
                 [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges]
             ) if ranges else ""
             ) if ranges else ""
 
 
-            # 基础数据模板
             base_data = {
             base_data = {
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
                 "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
                            or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
                            or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
@@ -321,7 +245,7 @@ class Scraper1688:
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
                          or safe_text(By.CSS_SELECTOR, "h1.d-title")
                          or safe_text(By.CSS_SELECTOR, "h1.d-title")
                          or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
                          or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
-                "color": "", # 待填充
+                "color": "", 
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
                         safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
                         safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
                 "material": get_attr("材质") or get_attr("面料") or \
                 "material": get_attr("材质") or get_attr("面料") or \
@@ -335,18 +259,13 @@ class Scraper1688:
                            or safe_text(By.CSS_SELECTOR, "div.company-name"),
                            or safe_text(By.CSS_SELECTOR, "div.company-name"),
             }
             }
 
 
-            # --- 核心逻辑:拆分规格/颜色分类 ---
             sku_props = []
             sku_props = []
             try:
             try:
-                # 尝试多种路径获取 SKU 属性
                 sku_props = model.get("skuModel", {}).get("skuProps", []) or \
                 sku_props = model.get("skuModel", {}).get("skuProps", []) or \
                             model.get("detailData", {}).get("skuProps", []) or \
                             model.get("detailData", {}).get("skuProps", []) or \
                             model.get("sku", {}).get("skuProps", [])
                             model.get("sku", {}).get("skuProps", [])
             except: pass
             except: pass
 
 
-            # 智能寻找主维度:
-            # 1. 优先找包含“颜色”、“分类”、“款式”、“花色”的维度
-            # 2. 如果没有,则取第一个 SKU 维度(例如“净含量”、“规格”等)
             main_prop = None
             main_prop = None
             if sku_props:
             if sku_props:
                 main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
                 main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
@@ -356,7 +275,6 @@ class Scraper1688:
             if main_prop and main_prop.get("value"):
             if main_prop and main_prop.get("value"):
                 variant_results = []
                 variant_results = []
                 for val in main_prop["value"]:
                 for val in main_prop["value"]:
-                    # 只有当该分类确实有名字时才记录
                     variant_name = val.get("name")
                     variant_name = val.get("name")
                     if variant_name:
                     if variant_name:
                         row = base_data.copy()
                         row = base_data.copy()
@@ -364,22 +282,16 @@ class Scraper1688:
                         variant_results.append(row)
                         variant_results.append(row)
                 return variant_results
                 return variant_results
             else:
             else:
-                # 兜底:如果没有发现规格选择区,则获取单属性颜色
                 base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
                 base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
                 return [base_data]
                 return [base_data]
 
 
         except Exception as e:
         except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {e}")
-            return None
-        except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {e}")
+            print(f"[!] 详情页抓取异常 ({url}): {traceback.format_exc()}")
             return None
             return None
 
 
     def _extract_all_methods(self):
     def _extract_all_methods(self):
         """三位一体提取法:JSON + DOM + 深度搜索"""
         """三位一体提取法:JSON + DOM + 深度搜索"""
         results = []
         results = []
-        
-        # 1. JSON 提取 (window.data 或 window.__INITIAL_DATA__)
         try:
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             if res:
             if res:
@@ -400,16 +312,13 @@ class Scraper1688:
                     if link: results.append({"name": title, "link": link, "price": price})
                     if link: results.append({"name": title, "link": link, "price": price})
         except: pass
         except: pass
 
 
-        # 2. 增强版 DOM 扫描
         if not results:
         if not results:
-            # 包含最新的选择器
             selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
             selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
             for s in selectors:
             for s in selectors:
                 cards = self.driver.find_elements(By.CSS_SELECTOR, s)
                 cards = self.driver.find_elements(By.CSS_SELECTOR, s)
                 if len(cards) > 3:
                 if len(cards) > 3:
                     for el in cards:
                     for el in cards:
                         try:
                         try:
-                            # 1. 链接提取:自身或子孙节点
                             link = ""
                             link = ""
                             if el.tag_name == 'a':
                             if el.tag_name == 'a':
                                 link = el.get_attribute("href")
                                 link = el.get_attribute("href")
@@ -419,14 +328,10 @@ class Scraper1688:
                                     h = a.get_attribute("href")
                                     h = a.get_attribute("href")
                                     if h and ("offer" in h or "item" in h or "ci_bb" in h):
                                     if h and ("offer" in h or "item" in h or "ci_bb" in h):
                                         link = h; break
                                         link = h; break
-                            
-                            # 2. ID 补丁
                             if not link or "1688.com" not in link:
                             if not link or "1688.com" not in link:
                                 oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
                                 oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
                                 if oid: link = f"https://detail.1688.com/offer/{oid}.html"
                                 if oid: link = f"https://detail.1688.com/offer/{oid}.html"
-                            
                             if link:
                             if link:
-                                # 3. 标题和价格提取
                                 title = el.text.split('\n')[0][:50]
                                 title = el.text.split('\n')[0][:50]
                                 price = "面议"
                                 price = "面议"
                                 try:
                                 try:
@@ -435,14 +340,12 @@ class Scraper1688:
                                 except: pass
                                 except: pass
                                 results.append({"name": title, "link": link, "price": price})
                                 results.append({"name": title, "link": link, "price": price})
                         except: continue
                         except: continue
-                    if results: break # 成功一次就不再尝试其他选择器
+                    if results: break
 
 
-        # 3. 最后的保底:正则源码提取 (极其暴力)
         if not results:
         if not results:
             ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
             ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
             for oid in set(ids):
             for oid in set(ids):
                 results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html", "price": "面议"})
                 results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html", "price": "面议"})
-                
         return results
         return results
 
 
     def quit(self):
     def quit(self):