LuTong 2 месяцев назад
Родитель
Commit
c7b5a1c817
3 измененных файлов с 124 добавлено и 133 удалено
  1. 34 9
      src/excel_handler.py
  2. 16 8
      src/gui.py
  3. 74 116
      src/scraper.py

+ 34 - 9
src/excel_handler.py

@@ -1,34 +1,52 @@
+# 【更新时间:2026-01-16 10:00】
 import sys
 import os
 import time
 from openpyxl import load_workbook
 
 def get_resource_path(relative_path):
+    """ 获取资源绝对路径,兼容开发环境和 PyInstaller 打包环境 """
     if hasattr(sys, '_MEIPASS'):
         return os.path.join(sys._MEIPASS, relative_path)
     base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
     return os.path.join(base_dir, relative_path)
 
 def get_existing_info(file_path):
+    """
+    读取已有文件中的链接和最后一行编码
+    """
     links = set()
     last_code = 0
     if not os.path.exists(file_path):
         return links, last_code
+    
     try:
         wb = load_workbook(file_path, data_only=True)
         ws = wb.active
+        # A 列是编码,K 列是链接
         for r in range(3, ws.max_row + 1):
             code_val = ws.cell(row=r, column=1).value
             link_val = ws.cell(row=r, column=11).value
-            if link_val: links.add(str(link_val).strip())
-            if isinstance(code_val, (int, float)): last_code = max(last_code, int(code_val))
-    except: pass
+            
+            if link_val:
+                links.add(str(link_val).strip())
+            
+            if isinstance(code_val, (int, float)):
+                last_code = max(last_code, int(code_val))
+    except:
+        pass
     return links, last_code
 
 def append_to_template(products, output_path, status_callback=None):
+    """
+    将产品数据追加写入到指定的 Excel 文件中。
+    并在第二个 Sheet 中记录商品总数用于断点续爬统计。
+    """
     template_path = get_resource_path(os.path.join('templates', '【进价】产品信息空表.xlsx'))
+    
     if not os.path.exists(template_path):
         template_path = os.path.join('templates', '【进价】产品信息空表.xlsx')
+
     if not os.path.exists(template_path):
         raise FileNotFoundError(f"未找到核心模板文件: {template_path}")
 
@@ -38,19 +56,24 @@ def append_to_template(products, output_path, status_callback=None):
         os.makedirs(os.path.dirname(output_path), exist_ok=True)
         wb = load_workbook(template_path)
     
+    # 1. 写入主数据 Sheet
     ws = wb.active
+    
+    # 寻找起始行 (基于第 11 列“产品链接”判定)
     start_row = 3
     for r in range(3, ws.max_row + 2):
         val_link = ws.cell(row=r, column=11).value
         if val_link is None or str(val_link).strip() == "":
             start_row = r
             break
-    else: start_row = ws.max_row + 1
+    else:
+        start_row = ws.max_row + 1
     
-    current_links = set()
+    # 获取已有链接用于 Sheet2 统计
+    all_links = set()
     for r in range(3, start_row):
-        link = ws.cell(row=r, column=11).value
-        if link: current_links.add(str(link).strip())
+        l = ws.cell(row=r, column=11).value
+        if l: all_links.add(str(l).strip())
 
     for i, product in enumerate(products):
         row = start_row + i
@@ -66,16 +89,18 @@ def append_to_template(products, output_path, status_callback=None):
         ws.cell(row=row, column=10, value=product.get('wholesale_price', ''))
         ws.cell(row=row, column=11, value=product.get('link', '')) 
         ws.cell(row=row, column=12, value=product.get('supplier', ''))
-        if product.get('link'): current_links.add(str(product['link']).strip())
+        if product.get('link'): all_links.add(str(product['link']).strip())
 
+    # 2. 写入/更新统计 Sheet
     if "统计状态" not in wb.sheetnames:
         wb.create_sheet("统计状态")
     ws_stat = wb["统计状态"]
     ws_stat.cell(row=1, column=1, value="已解析商品总数")
-    ws_stat.cell(row=1, column=2, value=len(current_links))
+    ws_stat.cell(row=1, column=2, value=len(all_links))
     ws_stat.cell(row=2, column=1, value="最后更新时间")
     ws_stat.cell(row=2, column=2, value=time.strftime("%Y-%m-%d %H:%M:%S"))
 
+    # 3. 占用检测循环保存
     while True:
         try:
             wb.save(output_path)

+ 16 - 8
src/gui.py

@@ -1,3 +1,4 @@
+# 【更新时间:2026-01-16 10:00】
 import sys
 import os
 import time
@@ -35,43 +36,48 @@ class ScraperThread(QThread):
         try:
             existing_links, _ = get_existing_info(self.output_path)
             
+            # 从 Excel 统计页获取起始商品序号
             initial_p_count = 0
             if os.path.exists(self.output_path):
                 try:
                     import openpyxl
                     wb_tmp = openpyxl.load_workbook(self.output_path, data_only=True)
                     if "统计状态" in wb_tmp.sheetnames:
-                        val = wb_tmp["统计状态"].cell(row=1, column=2).value
-                        initial_p_count = int(val) if val is not None else 0
+                        initial_p_count = int(wb_tmp["统计状态"].cell(row=1, column=2).value or 0)
                     wb_tmp.close()
                 except: pass
 
             self.log.emit(f"<b>[*] 任务启动: {self.keyword}</b>")
-            
+            if initial_p_count > 0:
+                self.log.emit(f"[*] 发现已有商品记录: {initial_p_count} 条,将接力计数...")
+
             def status_cb(is_waiting, msg):
                 if is_waiting: self.log.emit(f"<font color='red' size='5'><b>!!! {msg} !!!</b></font>")
                 else: self.log.emit(f"<font color='green'><b>[√] {msg}</b></font>")
 
             scraper = Scraper1688(headless=self.headless, status_callback=status_cb, log_callback=self.log.emit)
             
-            collected_count = 0
-            product_index = initial_p_count
+            collected_count = 0 # 本次抓取的数据行数
+            product_index = initial_p_count # 商品总序号计数
             
             for batch_results in scraper.search_products_yield(self.keyword, total_count=self.total_count, existing_links=existing_links):
                 append_to_template(batch_results, self.output_path, status_callback=status_cb)
                 
+                # 计算本次批次涉及的独立商品数
                 unique_links = len(set(item.get('link') for item in batch_results if item.get('link')))
                 product_index += unique_links
                 collected_count += len(batch_results)
                 
+                # 订正后的日志文字格式
                 self.log.emit(f"[+] 解析到第 {product_index} 个商品,新增数据已持久化: {len(batch_results)} 条,本次共计: {collected_count}")
                 
-                current_task_done = product_index - initial_p_count
-                prog = int((current_task_done / self.total_count) * 100)
+                # 进度条基于本次任务新增的商品数
+                task_progress = product_index - initial_p_count
+                prog = int((task_progress / self.total_count) * 100)
                 self.progress.emit(min(prog, 100))
             
             duration = time.time() - start_time
-            self.log.emit(f"<b>[完成] 任务结束,本次新增抓取 {collected_count} 条数据。</b>")
+            self.log.emit(f"<b>[完成] 任务结束,本次共解析 {product_index - initial_p_count} 个商品。</b>")
             self.log.emit(f"<b>[耗时] 处理总时间: {duration:.2f} 秒</b>")
             self.finished.emit("", scraper, duration)
         except Exception as e:
@@ -100,6 +106,7 @@ class MainWindow(QMainWindow):
         self.setCentralWidget(central_widget)
         main_layout = QHBoxLayout(central_widget)
 
+        # 左侧类目树
         left_widget = QWidget()
         left_layout = QVBoxLayout(left_widget)
         self.load_category_btn = QPushButton("选择类目文件")
@@ -113,6 +120,7 @@ class MainWindow(QMainWindow):
         left_layout.addWidget(self.load_category_btn)
         left_layout.addWidget(self.category_tree)
 
+        # 右侧操作区
         right_widget = QWidget()
         right_layout = QVBoxLayout(right_widget)
 

+ 74 - 116
src/scraper.py

@@ -1,4 +1,5 @@
-# 针对 Python 3.12+ 移除 distutils 的兼容性补丁
+# 【更新时间:2026-01-16 10:00】
+# 核心功能:支持变体拆分、精准提取款式与价格、对标 req.py 逻辑
 import sys
 try:
     import distutils
@@ -24,22 +25,17 @@ class Scraper1688:
     def __init__(self, headless=True, status_callback=None, log_callback=None):
         self.headless = headless
         self.status_callback = status_callback
-        self.log_callback = log_callback # 用于向 GUI 发送普通日志
-        # 使用全新的独立目录,避开锁定冲突
+        self.log_callback = log_callback
+        # 使用独立的 Profile 目录,避免并发冲突
         self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
         self.driver = None
-        
-        # 1. 强制清理残留,确保端口不被占用
         self._cleanup()
-        
-        # 2. 启动浏览器
         self._init_chrome(headless)
-        
         if self.driver:
             stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
 
     def _find_chrome(self):
-        """ 通过注册表寻找 Chrome 精准安装路径 """
+        """ 强力锁定 Chrome 安装路径 """
         import winreg
         reg_paths = [
             (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
@@ -54,14 +50,11 @@ class Scraper1688:
         return None
 
     def _cleanup(self):
-        """ 杀掉所有残留进程,确保端口和文件未被锁定 """
+        """ 强制杀掉残留进程,确保环境纯净 """
         if os.name == 'nt':
             for proc in ['chrome.exe', 'chromedriver.exe']:
-                try:
-                    subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+                try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                 except: pass
-        
-        # 清理锁定文件
         if os.path.exists(self.user_data_path):
             for root, _, files in os.walk(self.user_data_path):
                 for f in files:
@@ -70,7 +63,7 @@ class Scraper1688:
                         except: pass
 
     def _init_chrome(self, headless):
-        """ 强化版启动:解决浏览器不弹出及连接重置报错 """
+        """ 强化版 Chrome 启动逻辑 """
         chrome_path = self._find_chrome()
         
         def create_options():
@@ -80,72 +73,39 @@ class Scraper1688:
             if headless: opts.add_argument('--headless=new')
             opts.add_argument('--disable-blink-features=AutomationControlled')
             opts.add_argument("--window-size=1920,1080")
-            # 兼容性全家桶
             opts.add_argument("--no-sandbox")
             opts.add_argument("--disable-dev-shm-usage")
             opts.add_argument("--remote-allow-origins=*")
-            opts.add_argument("--no-first-run")
-            opts.add_argument("--no-default-browser-check")
             return opts
 
-        print(f"[*] 正在物理启动 Chrome: {chrome_path}")
         try:
-            # 增加 use_subprocess=True,显著提升在 Win11 下的连接稳定性
-            self.driver = uc.Chrome(
-                options=create_options(), 
-                headless=headless, 
-                browser_executable_path=chrome_path,
-                use_subprocess=True
-            )
-            print("[+] Chrome 浏览器已成功弹出!")
-        except Exception as e:
-            print(f"[*] 首次启动失败 ({e}),尝试自动兼容模式...")
-            try:
-                self._cleanup()
-                time.sleep(2)
-                # 兜底方案:使用 subprocess
-                self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
-                print("[+] 自动兼容模式启动成功!")
-            except Exception as e2:
-                print(f"[致命错误] 无法启动 Chrome: {e2}")
-                raise Exception("无法拉起 Chrome,请尝试关闭杀毒软件或重新安装 Chrome。")
+            # 优先使用 subprocess 模式启动,解决 Win11 连接难题
+            self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
+        except:
+            # 失败则尝试普通模式,每次都使用 fresh options
+            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
 
     def clean_url(self, url):
-        """ 只要包含 9 位以上 ID,就强制转化为标准详情链接 """
+        """ 鲁棒的 ID 提取 logic """
         if not url: return ""
         url_str = str(url)
         if url_str.startswith("//"): url_str = "https:" + url_str
-        
-        # 1. 尝试匹配典型的 1688 offer ID 模式
-        id_match = re.search(r'offer/(\d{9,15})\.html', url_str) or \
-                   re.search(r'[?&](?:offerId|id)=(\d{9,15})', url_str)
-        
+        id_match = re.search(r'(\d{9,15})', url_str)
         if id_match:
             return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
-        
-        # 2. 备选方案:匹配任何 9-15 位连续数字
-        id_match_alt = re.search(r'(\d{9,15})', url_str)
-        if id_match_alt:
-            return f"https://detail.1688.com/offer/{id_match_alt.group(1)}.html"
-            
         return ""
 
     def check_for_captcha(self):
-        """ 检测登录、滑块、验证等状态 """
         def is_blocked():
             try:
-                url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
+                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
                 sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
-                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
-                is_login = "login.1688.com" in url or "passport.1688.com" in url
-                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
-                return is_slider or is_login or is_punish
+                return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
             except: return False
-        
         if is_blocked():
             msg = "请登录验证"
             if self.status_callback: self.status_callback(True, msg)
-            while is_blocked(): time.sleep(3)
+            while is_blocked(): time.sleep(2)
             if self.status_callback: self.status_callback(False, "验证通过")
             time.sleep(3)
         return True
@@ -164,12 +124,13 @@ class Scraper1688:
             self.driver.get(f"{base_url}&beginPage={page}&page={page}")
             self.check_for_captcha()
             
-            # 增强型阶梯式滚动,确保懒加载内容全部加载
+            # --- 强化:模拟真实人类分段滚动,触发懒加载 ---
             for i in range(1, 11):
                 self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
-                time.sleep(1.2)
-            
-            # 额外等待时间,确保 JS 渲染完成
+                time.sleep(1.5)
+                if i == 5: # 中途回滑
+                    self.driver.execute_script("window.scrollBy(0, -300);")
+                    time.sleep(1.0)
             time.sleep(3)
 
             page_results = self._extract_all_methods()
@@ -177,17 +138,18 @@ class Scraper1688:
             
             page_batch = []
             for it in page_results:
-                clean_url = self.clean_url(it["link"])
+                clean_url = self.clean_url(it.get("link"))
                 if clean_url and clean_url not in all_links:
                     all_links.add(clean_url)
-                    print(f"  [>] 抓取详情: {clean_url}")
+                    print(f"  [>] 正在启动详情抓取: {clean_url}")
                     detail_results = self.scrape_detail(clean_url)
+                    
                     if detail_results:
                         page_batch.extend(detail_results)
                     else:
                         page_batch.append({
                             "category": "", "brand": "", "name": it.get("name", "未知"),
-                            "color": "", "spec": "", "material": "", "price": it.get("price", ""),
+                            "color": "", "spec": "", "material": "", "price": "",
                             "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
                         })
                     
@@ -195,7 +157,7 @@ class Scraper1688:
                         yield page_batch
                         page_batch = []
                     
-                    time.sleep(random.uniform(5, 10)) # 保持较慢频率,避免被封
+                    time.sleep(random.uniform(15, 25)) 
                     if len(all_links) >= total_count + initial_count: break
             
             if page_batch: yield page_batch
@@ -206,11 +168,13 @@ class Scraper1688:
         return list(all_links)
 
     def scrape_detail(self, url):
-        """ 极其精准的详情页解析:完全同步自 req.py """
+        """ 极其精准的变体拆分逻辑 (款式+价格) """
         try:
             self.driver.get(url)
             time.sleep(random.uniform(5, 10))
             self.check_for_captcha()
+            
+            # 1. 对标 req.py 获取 JS 模型
             model = self.driver.execute_script(
                 "return (window.context && window.context.result && "
                 "window.context.result.global && window.context.result.global.globalData "
@@ -240,56 +204,60 @@ class Scraper1688:
                 "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                 "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                 "material": get_attr("材质") or get_attr("面料"),
-                "price": "", 
                 "moq": trade.get("beginAmount", ""),
                 "wholesale_price": range_text,
                 "link": url,
                 "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
             }
 
-            variant_data_list = []
+            # 2. 核心:拆分 expand-view-list-wrapper 区域 (款式名称与价格)
+            variant_results = []
             try:
-                # 核心需求:从 expand-view-list-wrapper 中提取文字和价格
                 wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
                 if wrappers:
                     items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                     for item_el in items:
                         try:
-                            # 款式描述文字 (item-label)
+                            # 提取款式描述文字 (item-label)
                             label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
-                            # 逐条价格 (item-price-stock)
+                            # 提取逐条价格 (item-price-stock)
                             price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
-                            # 清洗价格,只保留数字
+                            # 清洗价格
                             price_clean = re.sub(r'[^\d.]', '', price_raw)
+                            
                             if label:
-                                variant_data_list.append({"label": label, "price": price_clean})
+                                row = base_data.copy()
+                                row["color"] = label
+                                row["price"] = price_clean
+                                variant_results.append(row)
                         except: continue
             except: pass
 
-            if variant_data_list:
+            if variant_results:
+                return variant_results
+
+            # 3. 兜底:如果 DOM 探测失败,尝试从 JS 模型提取 SKU
+            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
+            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
+            if not main_prop and sku_props: main_prop = sku_props[0]
+            
+            if main_prop and main_prop.get("value"):
                 results = []
-                for vd in variant_data_list:
-                    row = base_data.copy()
-                    row["color"] = vd["label"]
-                    row["price"] = vd["price"]
-                    results.append(row)
+                for val in main_prop["value"]:
+                    if val.get("name"):
+                        row = base_data.copy()
+                        row["color"] = val.get("name")
+                        row["price"] = trade.get("minPrice", "")
+                        results.append(row)
                 return results
             
+            base_data["price"] = trade.get("minPrice", "")
             return [base_data]
         except: return None
 
     def _extract_all_methods(self):
-        """ 强化版列表链接提取:收集所有来源并去重 """
-        all_results = []
-        seen_ids = set()
-
-        def add_item(name, link, price=""):
-            cid = self.clean_url(link)
-            if cid and cid not in seen_ids:
-                seen_ids.add(cid)
-                all_results.append({"name": name, "link": cid, "price": price})
-
-        # 1. 内存 JS 变量探测 (深度扫描)
+        """ 强化版:对标 req.py 深度探测 JS 变量提取链接 """
+        results = []
         scripts = [
             "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
             "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)",
@@ -305,35 +273,25 @@ class Scraper1688:
                         if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']):
                             lists.append(obj)
                         if isinstance(obj, dict):
-                            for k in obj:
-                                lists.extend(find_lists(obj[k]))
+                            for k in obj: lists.extend(find_lists(obj[k]))
                         return lists
-                    
-                    found_lists = find_lists(data)
-                    for product_list in found_lists:
+                    for product_list in find_lists(data):
                         for o in product_list:
                             link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
-                            name = str(o.get('title', o.get('subject', o.get('name', ''))))
-                            price = o.get('price', '')
-                            add_item(name, link, price)
+                            if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
+                    if results: return results
             except: continue
-
-        # 2. 最新 DOM 选择器扫描 (补全 JS 没抓到的)
-        selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
-        for s in selectors:
-            try:
-                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
-                for el in elements:
-                    try:
-                        a_tags = el.find_elements(By.TAG_NAME, "a")
-                        for a in a_tags:
-                            href = a.get_attribute("href")
-                            if href:
-                                add_item(el.text.split('\n')[0][:50], href)
-                    except: continue
-            except: continue
-            
-        return all_results
+        
+        # DOM 选择器保底
+        for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']", ".offer-item"]:
+            for el in self.driver.find_elements(By.CSS_SELECTOR, s):
+                try:
+                    a = el.find_element(By.TAG_NAME, "a")
+                    link = a.get_attribute("href")
+                    if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
+                except: continue
+            if results: break
+        return results
 
     def quit(self):
         try: self.driver.quit()