LuTong 2 tháng trước
mục cha
commit
415bbb1849
3 tập tin đã thay đổi với 115 bổ sung và 246 xóa
  1. 12 9
      src/excel_handler.py
  2. 6 2
      src/gui.py
  3. 97 235
      src/scraper.py

+ 12 - 9
src/excel_handler.py

@@ -96,15 +96,18 @@ def append_to_template(products, output_path, status_callback=None):
         if link: existing_links.add(str(link).strip())
 
     # 2. 写入/更新计数 Sheet (第二个 Sheet)
-    sheet_names = wb.sheetnames
-    if len(sheet_names) < 2:
-        wb.create_sheet("统计状态")
-    
-    ws_stat = wb["统计状态"]
-    ws_stat.cell(row=1, column=1, value="已解析商品总数")
-    ws_stat.cell(row=1, column=2, value=len(existing_links))
-    ws_stat.cell(row=2, column=1, value="最后更新时间")
-    ws_stat.cell(row=2, column=2, value=time.strftime("%Y-%m-%d %H:%M:%S"))
+    try:
+        sheet_names = wb.sheetnames
+        if "统计状态" not in sheet_names:
+            wb.create_sheet("统计状态")
+        
+        ws_stat = wb["统计状态"]
+        ws_stat.cell(row=1, column=1, value="已解析商品总数")
+        ws_stat.cell(row=1, column=2, value=len(existing_links))
+        ws_stat.cell(row=2, column=1, value="最后更新时间")
+        ws_stat.cell(row=2, column=2, value=time.strftime("%Y-%m-%d %H:%M:%S"))
+    except Exception as e:
+        print(f"[!] 统计状态 Sheet 更新失败: {e}")
 
     # 3. 占用检测保存循环
     while True:

+ 6 - 2
src/gui.py

@@ -59,10 +59,14 @@ class ScraperThread(QThread):
                 try:
                     import openpyxl
                     wb_tmp = openpyxl.load_workbook(self.output_path, data_only=True)
+                    # 关键修改:先检查 Sheet 是否存在,避免 KeyError
                     if "统计状态" in wb_tmp.sheetnames:
-                        initial_p_count = int(wb_tmp["统计状态"].cell(row=1, column=2).value or 0)
+                        val = wb_tmp["统计状态"].cell(row=1, column=2).value
+                        initial_p_count = int(val) if val is not None else 0
                     wb_tmp.close()
-                except: pass
+                except Exception as e:
+                    print(f"[*] 读取初始商品计数失败 (可能文件尚不包含统计页): {e}")
+                    initial_p_count = 0
 
             product_index = initial_p_count
             

+ 97 - 235
src/scraper.py

@@ -25,18 +25,21 @@ class Scraper1688:
         self.headless = headless
         self.status_callback = status_callback
         self.log_callback = log_callback # 用于向 GUI 发送普通日志
-        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
+        # 使用全新的独立目录,避开锁定冲突
+        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
         self.driver = None
         
-        # 初始化 Chrome 环境
+        # 1. 强制清理残留,解决 ConnectionResetError
         self._cleanup()
+        
+        # 2. 启动浏览器
         self._init_chrome(headless)
         
         if self.driver:
             stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
     def _find_chrome(self):
-        """ 强力锁定 Chrome 安装路径 """
+        """ 通过注册表寻找 Chrome 精准安装路径 """
         import winreg
         reg_paths = [
             (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
@@ -51,10 +54,14 @@ class Scraper1688:
         return None
 
     def _cleanup(self):
-        """ 清理残留进程和锁定文件 """
+        """ 杀掉所有残留进程,确保端口和文件未被锁定 """
         if os.name == 'nt':
-            subprocess.call(['taskkill', '/F', '/IM', 'chrome.exe', '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+            for proc in ['chrome.exe', 'chromedriver.exe']:
+                try:
+                    subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+                except: pass
         
+        # 清理锁定文件
         if os.path.exists(self.user_data_path):
             for root, _, files in os.walk(self.user_data_path):
                 for f in files:
@@ -63,7 +70,7 @@ class Scraper1688:
                         except: pass
 
     def _init_chrome(self, headless):
-        """ 终极初始化:解决浏览器不弹出及连接失败问题 """
+        """ 强化版启动:解决浏览器不弹出及连接重置报错 """
         chrome_path = self._find_chrome()
         
         def create_options():
@@ -77,191 +84,129 @@ class Scraper1688:
             opts.add_argument("--no-sandbox")
             opts.add_argument("--disable-dev-shm-usage")
             opts.add_argument("--remote-allow-origins=*")
-            opts.add_argument("--disable-gpu")
-            opts.add_argument("--disable-software-rasterizer")
-            # 强制不检查默认浏览器
-            opts.add_argument("--no-default-browser-check")
             opts.add_argument("--no-first-run")
+            opts.add_argument("--no-default-browser-check")
             return opts
 
+        print(f"[*] 正在物理启动 Chrome: {chrome_path}")
         try:
-            print(f"[*] 正在通过路径启动 Chrome: {chrome_path}")
-            # 显式指定浏览器路径 (browser_executable_path)
+            # 增加 use_subprocess=True,显著提升在 Win11 下的连接稳定性
             self.driver = uc.Chrome(
                 options=create_options(), 
                 headless=headless, 
-                browser_executable_path=chrome_path
+                browser_executable_path=chrome_path,
+                use_subprocess=True
             )
             print("[+] Chrome 浏览器已成功弹出!")
         except Exception as e:
-            print(f"[*] 首次路径启动失败: {e},尝试自动兼容模式...")
+            print(f"[*] 首次启动失败 ({e}),尝试自动兼容模式...")
             try:
                 self._cleanup()
                 time.sleep(2)
                 # 兜底方案
-                self.driver = uc.Chrome(options=create_options(), headless=headless)
-                print("[+] Chrome 自动兼容模式启动成功!")
+                self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
+                print("[+] 自动兼容模式启动成功!")
             except Exception as e2:
                 print(f"[致命错误] 无法启动 Chrome: {e2}")
-                raise Exception("请检查是否安装了 Chrome 浏览器,并尝试关闭杀毒软件后运行。")
+                raise Exception("无法拉起 Chrome,请尝试关闭杀毒软件或重新安装 Chrome。")
 
     def clean_url(self, url):
-        """极其鲁棒的 1688 URL 清洗逻辑"""
+        """ 【核心订正】极其鲁棒的 ID 提取逻辑,强制转化为详情页链接,过滤店铺页 """
         if not url: return ""
         if url.startswith("//"): url = "https:" + url
         
-        # 1. 尝试从路径中匹配 offer ID (标准 PC 链接)
-        id_match = re.search(r'offer/(\d+)\.html', url)
-        if id_match: return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
+        # 1. 尝试从各种路径模式中提取纯数字商品 ID
+        id_match = re.search(r'offer/(\d+)\.html', url) or \
+                   re.search(r'[?&](?:offerId|id)=(\d+)', url) or \
+                   re.search(r'object_id@(\d+)', url)
         
-        # 2. 尝试从查询参数中提取 offerId (移动端或广告链接)
-        parsed = urllib.parse.urlparse(url)
-        params = urllib.parse.parse_qs(parsed.query)
-        oid = params.get('offerId') or params.get('id')
-        if oid: return f"https://detail.1688.com/offer/{oid[0]}.html"
+        if id_match:
+            # 只有提取到 ID 的才被认为是商品,统一转化为标准详情页格式
+            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
         
-        # 3. 针对某些特殊加密链接,尝试寻找 data-aplus-report 或类似字符串中的 ID
-        id_match_report = re.search(r'object_id@(\d+)', url)
-        if id_match_report: return f"https://detail.1688.com/offer/{id_match_report.group(1)}.html"
-        
-        return url
+        # 2. 如果没提取到 ID(说明是店铺首页、广告页等),返回空以过滤掉
+        return ""
 
     def check_for_captcha(self):
-        """
-        核心监控:检测登录、滑块验证、访问受限等需要人工干预的状态
-        """
+        """ 检测登录、滑块、验证等状态 """
         def is_blocked():
             try:
-                url = self.driver.current_url.lower()
-                src = self.driver.page_source.lower()
-                title = self.driver.title.lower()
-                
-                # 1. 检测滑块验证码
+                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                 is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                
-                # 2. 检测登录页面 (如果跳转到了登录页)
                 is_login = "login.1688.com" in url or "passport.1688.com" in url
-                
-                # 3. 检测惩罚/验证提示页
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
-                
+                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
                 return is_slider or is_login or is_punish
-            except: 
-                return False
+            except: return False
         
         if is_blocked():
             msg = "请登录验证"
-            print(f"\n[!] {msg}...")
-            if self.status_callback:
-                self.status_callback(True, msg)
+            if self.status_callback: self.status_callback(True, msg)
+            while is_blocked(): time.sleep(3)
+            if self.status_callback: self.status_callback(False, "验证通过")
             
-            # 持续监控,直到上述所有拦截状态消失
-            while is_blocked():
-                time.sleep(2)
-                
-            if self.status_callback:
-                self.status_callback(False, "验证通过")
-            print("\n[OK] 监测到人工干预已完成,3秒后恢复自动抓取...")
-            time.sleep(3)
+            cool_msg = "[*] 解封成功,进入 120 秒冷却期以规避风控追溯..."
+            if self.log_callback: self.log_callback(f"<font color='orange'>{cool_msg}</font>")
+            time.sleep(120) 
         return True
 
     def search_products_yield(self, keyword, total_count=200, existing_links=None):
         gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
         base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
-        
-        # 初始检查:确保在开始抓取前没被拦截(比如没登录)
         self.driver.get("https://www.1688.com")
         self.check_for_captcha()
-
-        all_links = existing_links if existing_links is not None else set()
-        page = 1
-        consecutive_empty_pages = 0
         
-        # 记录初始抓取的链接数,用于计算进度
-        initial_count = len(all_links)
+        all_links = existing_links if existing_links is not None else set()
+        page, initial_count = 1, len(all_links)
         
-        while len(all_links) < total_count + initial_count and consecutive_empty_pages < 3:
-            print(f"[*] 正在搜索列表页: 第 {page} 页...")
-            target_url = f"{base_url}&beginPage={page}&page={page}"
-            self.driver.get(target_url)
-            
-            # 关键:首屏强制等待渲染
-            time.sleep(5)
+        while len(all_links) < total_count + initial_count:
+            print(f"[*] 正在处理列表页: 第 {page} 页...")
+            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
-
-            # 深度滚动确保加载
-            for i in range(1, 4):
-                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
-                time.sleep(1)
+            for i in range(1, 5):
+                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
+                time.sleep(1.5)
 
             page_results = self._extract_all_methods()
-            
             page_batch = []
             for it in page_results:
                 clean_url = self.clean_url(it["link"])
+                # 核心改进:只处理清洗成功的商品链接
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
-                    
-                    # 核心改进:进入详情页抓取精准数据
                     print(f"  [>] 抓取详情: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
-                    if detail_results:
-                        # detail_results 现在是一个列表 (包含多个颜色分类)
-                        page_batch.extend(detail_results)
-                    else:
-                        # 兜底
-                        it["link"] = clean_url
-                        page_batch.append({
-                            "category": "", "brand": "", "name": it["name"],
-                            "color": "", "spec": "", "material": "", "price": it["price"],
-                            "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
-                        })
+                    if detail_results: page_batch.extend(detail_results)
+                    else: page_batch.append({
+                        "category": "", "brand": "", "name": it["name"],
+                        "color": "", "spec": "", "material": "", "price": it.get("price", ""),
+                        "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
+                    })
                     
-                    # 每满 10 条 yield 一次
                     if len(page_batch) >= 10:
                         yield page_batch
                         page_batch = []
-
-                    # 详情页抓取后的随机等待
-                    time.sleep(random.uniform(2, 4))
                     
-                    if len(all_links) >= total_count + initial_count:
-                        break
+                    time.sleep(random.uniform(15, 30)) # 保持慢速,确保长效稳定
+                    if len(all_links) >= total_count + initial_count: break
             
-            # 每页结束,将不足 10 条的余数 yield 出去
-            if page_batch:
-                yield page_batch
-                page_batch = []
-
+            if page_batch: yield page_batch
             page += 1
-            if len(all_links) < total_count + initial_count:
-                print(f"[*] 累计已处理新链接: {len(all_links) - initial_count} 条,准备翻下一页...")
-                time.sleep(3)
-
+            if page % 2 == 0:
+                self.driver.get("https://www.1688.com")
+                time.sleep(random.randint(10, 20))
         return list(all_links)
 
     def scrape_detail(self, url):
-        """
-        极其精准的详情页抓取逻辑
-        支持获取特定 DOM 容器 (expand-view-list-wrapper) 中的款式文字 (item-label) 
-        及对应价格 (item-price-stock),并拆分为多行。
-        """
+        """ 精准拆分款式与价格 """
         try:
             self.driver.get(url)
-            time.sleep(2)
+            time.sleep(random.uniform(5, 10))
             self.check_for_captcha()
-            
-            # 1. 执行 JS 获取完整模型
             model = self.driver.execute_script(
-                "return (window.context && window.context.result && "
-                "window.context.result.global && window.context.result.global.globalData "
-                "&& window.context.result.global.globalData.model) || "
-                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
+                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
             )
-            
-            if not model:
-                return None
+            if not model: return None
 
             def get_attr(name):
                 try:
@@ -274,24 +219,15 @@ class Scraper1688:
                 except: pass
                 return ""
 
-            def safe_text(by, sel):
-                try: return self.driver.find_element(by, sel).text.strip()
-                except: return ""
-
-            # 基础数据
             trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
             ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
             range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
 
             base_data = {
-                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
-                           or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
+                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                 "brand": get_attr("品牌"),
-                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
-                         or safe_text(By.CSS_SELECTOR, "h1.d-title") or self.driver.title.split('-')[0],
-                "color": "", 
-                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or \
-                        safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
+                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
+                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
                 "price": "", 
                 "moq": trade.get("beginAmount", ""),
@@ -300,134 +236,60 @@ class Scraper1688:
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            # 2. 核心逻辑:智能识别并拆分变体 (款式描述 + 价格)
             variant_data_list = []
             try:
-                # 寻找特定 DOM 容器
                 wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
                 if wrappers:
-                    # 获取该容器下的所有子项条目
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
-                    
-                    if not items:
-                        # 兜底:直接寻找 label 和 price 元素并成对组合
-                        labels = wrappers[0].find_elements(By.CLASS_NAME, "item-label")
-                        prices = wrappers[0].find_elements(By.CLASS_NAME, "item-price-stock")
-                        for l, p in zip(labels, prices):
-                            variant_data_list.append({"label": l.text.strip(), "price": p.text.strip()})
-                    else:
-                        for item_el in items:
-                            try:
-                                label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
-                                price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                                if label: variant_data_list.append({"label": label, "price": price})
-                            except: continue
+                    for item_el in items:
+                        try:
+                            label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
+                            price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
+                            if label: variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
+                        except: continue
             except: pass
 
             if variant_data_list:
                 results = []
                 for vd in variant_data_list:
                     row = base_data.copy()
-                    row["color"] = vd["label"] # 款式描述填入“颜色”列
-                    # 清洗价格,只保留数字
-                    clean_p = re.sub(r'[^\d.]', '', vd["price"])
-                    row["price"] = clean_p if clean_p else vd["price"]
+                    row["color"] = vd["label"]
+                    row["price"] = vd["price"]
                     results.append(row)
                 return results
-
-            # 3. 如果 DOM 探测失败,尝试 JS 模型变体拆分
-            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
-            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
-            if not main_prop and sku_props: main_prop = sku_props[0]
-            
-            if main_prop and main_prop.get("value"):
-                variant_results = []
-                for val in main_prop["value"]:
-                    if val.get("name"):
-                        row = base_data.copy()
-                        row["color"] = val.get("name")
-                        # 此处尝试获取该变体对应的价格(如果模型中有)
-                        row["price"] = trade.get("minPrice", "")
-                        variant_results.append(row)
-                return variant_results
-            else:
-                # 最终兜底
-                base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
-                base_data["price"] = trade.get("minPrice", "")
-                return [base_data]
-
-        except Exception as e:
-            print(f"[!] 详情页抓取异常 ({url}): {e}")
-            return None
+            return [base_data]
+        except: return None
 
     def _extract_all_methods(self):
-        """三位一体提取法:JSON + DOM + 深度搜索"""
+        """ 列表页提取 """
         results = []
-        
-        # 1. JSON 提取 (window.data 或 window.__INITIAL_DATA__)
         try:
             res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
             if res:
                 data = json.loads(res)
                 def find_list(obj):
-                    if isinstance(obj, list) and len(obj) > 0:
-                        if 'title' in obj[0] or 'offerId' in obj[0]: return obj
+                    if isinstance(obj, list) and len(obj) > 0 and ('title' in obj[0] or 'offerId' in obj[0]): return obj
                     if isinstance(obj, dict):
                         for k in obj:
-                            found = find_list(obj[k])
-                            if found: return found
+                            f = find_list(obj[k])
+                            if f: return f
                     return None
-                raw = find_list(data) or []
-                for o in raw:
-                    title = str(o.get('title', o.get('name', ''))).replace('<em>','').replace('</em>','')
+                for o in (find_list(data) or []):
                     link = o.get('itemUrl', o.get('url', ''))
-                    price = o.get('priceInfo', {}).get('price', o.get('price', '面议'))
-                    if link: results.append({"name": title, "link": link, "price": price})
+                    # 过滤干扰链接
+                    if link and "similar_search" not in link:
+                        results.append({"name": str(o.get('title', '')), "link": link})
         except: pass
-
-        # 2. 增强版 DOM 扫描
-        if not results:
-            # 包含最新的选择器
-            selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
-            for s in selectors:
-                cards = self.driver.find_elements(By.CSS_SELECTOR, s)
-                if len(cards) > 3:
-                    for el in cards:
-                        try:
-                            # 1. 链接提取:自身或子孙节点
-                            link = ""
-                            if el.tag_name == 'a':
-                                link = el.get_attribute("href")
-                            else:
-                                a_tags = el.find_elements(By.TAG_NAME, "a")
-                                for a in a_tags:
-                                    h = a.get_attribute("href")
-                                    if h and ("offer" in h or "item" in h or "ci_bb" in h):
-                                        link = h; break
-                            
-                            # 2. ID 补丁
-                            if not link or "1688.com" not in link:
-                                oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
-                                if oid: link = f"https://detail.1688.com/offer/{oid}.html"
-                            
-                            if link:
-                                # 3. 标题和价格提取
-                                title = el.text.split('\n')[0][:50]
-                                price = "面议"
-                                try:
-                                    price_el = el.find_element(By.CSS_SELECTOR, ".text-main, [class*='price'], .amount")
-                                    price = price_el.text.strip().replace("¥", "")
-                                except: pass
-                                results.append({"name": title, "link": link, "price": price})
-                        except: continue
-                    if results: break # 成功一次就不再尝试其他选择器
-
-        # 3. 最后的保底:正则源码提取 (极其暴力)
         if not results:
-            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
-            for oid in set(ids):
-                results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html", "price": "面议"})
-                
+            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
+                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                    try:
+                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
+                        # 只有包含详情特征的链接才提取
+                        if link and ("offer" in link or "item" in link) and "similar_search" not in link:
+                            results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                    except: continue
+                if results: break
         return results
 
     def quit(self):