Browse Source

1688爬虫

LuTong 3 months ago
commit
aadae472ab

+ 57 - 0
src/excel_handler.py

@@ -0,0 +1,57 @@
+import pandas as pd
+from openpyxl import load_workbook
+import os
+
def append_to_template(products, output_path):
    """Append product rows to an Excel workbook based on a fixed template.

    If *output_path* already exists it is loaded and rows are appended after
    the last filled row; otherwise the template workbook
    '【进价】产品信息空表.xlsx' is copied as the starting point.

    Args:
        products: iterable of dicts with keys such as 'category', 'brand',
            'name', 'image', 'color', 'spec', 'material', 'price', 'moq',
            'wholesale_price', 'desc', 'link', 'supplier'. Missing keys are
            written as empty strings.
        output_path: path of the .xlsx file to create or update.

    Raises:
        FileNotFoundError: when the template workbook is missing.
    """
    # 1. Locate the template (this exact filename is required).
    base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    template_path = os.path.join(base_dir, 'templates', '【进价】产品信息空表.xlsx')

    if not os.path.exists(template_path):
        raise FileNotFoundError(f"未找到核心模板文件: {template_path}")

    # 2. Load the existing output file if present, otherwise start from the template.
    if os.path.exists(output_path):
        wb = load_workbook(output_path)
    else:
        out_dir = os.path.dirname(output_path)
        # os.makedirs('') raises FileNotFoundError, so guard against a bare
        # filename being passed as output_path.
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        wb = load_workbook(template_path)

    ws = wb.active

    # 3. Find the first empty data row. Rows 1-2 are title/header; column A
    #    (the running code) is treated as mandatory, so the first row whose
    #    A cell is empty marks the end of existing data.
    last_row = 2
    for r in range(3, ws.max_row + 2):
        if ws.cell(row=r, column=1).value is None:
            last_row = r - 1
            break

    start_row = last_row + 1

    # Column layout of the template, starting at column 2
    # (column 1 holds the auto-incremented code).
    field_order = ('category', 'brand', 'name', 'image', 'color', 'spec',
                   'material', 'price', 'moq', 'wholesale_price', 'desc',
                   'link', 'supplier')

    # 4. Write this batch.
    for i, product in enumerate(products):
        row = start_row + i
        # Code in column A is the overall data-row count (header offset of 2).
        ws.cell(row=row, column=1, value=row - 2)
        for col, key in enumerate(field_order, start=2):
            ws.cell(row=row, column=col, value=product.get(key, ''))

    wb.save(output_path)
    print(f"[*] 批次数据({len(products)}条)已写入: {output_path}")

+ 208 - 0
src/gui.py

@@ -0,0 +1,208 @@
+import sys
+import os
+import time
+import traceback
+import pandas as pd
+from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, 
+                             QHBoxLayout, QLineEdit, QPushButton, QTextEdit, 
+                             QLabel, QFileDialog, QProgressBar, QTreeView, QSplitter, QCheckBox)
+from PyQt6.QtCore import QThread, pyqtSignal, Qt
+from PyQt6.QtGui import QStandardItemModel, QStandardItem
+
+from src.scraper import Scraper1688
+from src.excel_handler import append_to_template
+
class ScraperThread(QThread):
    """Background worker that scrapes 1688 and streams results to Excel.

    Signals:
        progress(int): percentage of the target count collected (0-100).
        log(str): HTML-formatted log line for the GUI log pane.
        finished(str, object): traceback text ('' on success) and the
            Scraper1688 instance, so the GUI can keep the browser alive.
    """
    progress = pyqtSignal(int)
    log = pyqtSignal(str)
    # NOTE(review): this shadows QThread's built-in parameterless `finished`
    # signal; connections made on this class receive (err, scraper) instead.
    # Data is written to disk batch-by-batch, so no payload travels here.
    finished = pyqtSignal(str, object)

    def __init__(self, keyword, output_path, headless=True):
        super().__init__()
        self.keyword = keyword          # search keyword (second-level category name)
        self.output_path = output_path  # target .xlsx file, written batch by batch
        self.headless = headless        # run Chrome without a visible window

    def run(self):
        scraper = None
        try:
            self.log.emit(f"<b>[*] 任务启动: {self.keyword}</b>")
            scraper = Scraper1688(headless=self.headless)

            # Stream results page by page instead of collecting everything first.
            total_target = 200
            collected_count = 0

            for page_results in scraper.search_products_yield(self.keyword, total_count=total_target):
                # Persist each batch immediately so a crash loses at most one page.
                append_to_template(page_results, self.output_path)

                collected_count += len(page_results)
                self.log.emit(f"[+] 批次写入成功: {len(page_results)} 条,当前总计: {collected_count}")

                # The final batch may overshoot the target, so cap at 100%.
                prog = min(100, int((collected_count / total_target) * 100))
                self.progress.emit(prog)

            self.log.emit(f"<b>[完成] 任务结束,共抓取 {collected_count} 条数据。</b>")
            self.finished.emit("", scraper)
        except Exception as e:
            err = traceback.format_exc()
            print(err)
            self.log.emit(f"<font color='red'>[错误] {str(e)}</font>")
            self.finished.emit(err, scraper)
+
class MainWindow(QMainWindow):
    """Main application window: category tree on the left, controls/log on the right."""

    def __init__(self):
        super().__init__()
        # Currently selected level-1/level-2 category names (empty until picked).
        self.selected_category_1 = ""
        self.selected_category_2 = ""
        # Root directory chosen by the user; output files go under <root>/选品/...
        self.output_base_path = ""
        # Last scraper instance handed back by the worker thread, kept so the
        # browser session stays referenced after a run finishes.
        self.active_scraper = None
        self.initUI()
        self.load_default_categories()

    def initUI(self):
        """Build the widget tree: left category pane and right control/log pane."""
        self.setWindowTitle("1688 产品信息实时抓取工具 v3.0")
        self.setGeometry(100, 100, 1100, 750)
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QHBoxLayout(central_widget)

        # Left pane: category tree loaded from an Excel workbook.
        left_widget = QWidget()
        left_layout = QVBoxLayout(left_widget)
        self.load_category_btn = QPushButton("选择类目文件")
        self.load_category_btn.clicked.connect(self.select_category_file)
        self.category_tree = QTreeView()
        self.category_tree.setHeaderHidden(True)
        self.category_model = QStandardItemModel()
        self.category_tree.setModel(self.category_model)
        self.category_tree.clicked.connect(self.on_category_clicked)
        left_layout.addWidget(QLabel("<b>商品类目树</b>"))
        left_layout.addWidget(self.load_category_btn)
        left_layout.addWidget(self.category_tree)

        # Right pane: options, output path, action button, log and progress.
        right_widget = QWidget()
        right_layout = QVBoxLayout(right_widget)

        opt_layout = QHBoxLayout()
        # Checked == show the browser window (needed for manual captcha solving).
        self.show_browser_cb = QCheckBox("显示浏览器界面 (手动过验证时勾选)")
        self.show_browser_cb.setChecked(True)
        opt_layout.addWidget(self.show_browser_cb)
        right_layout.addLayout(opt_layout)

        path_layout = QHBoxLayout()
        self.path_display = QLabel("未选择输出路径")
        self.path_display.setStyleSheet("color: gray; border: 1px solid #ccc; padding: 5px;")
        self.select_path_btn = QPushButton("选择输出目录")
        self.select_path_btn.clicked.connect(self.select_output_path)
        path_layout.addWidget(QLabel("<font color='red'>*</font>输出路径:"))
        path_layout.addWidget(self.path_display, 1)
        path_layout.addWidget(self.select_path_btn)
        right_layout.addLayout(path_layout)

        action_layout = QHBoxLayout()
        self.category_display = QLabel("请选择二级类目")
        self.search_btn = QPushButton("开始抓取")
        self.search_btn.clicked.connect(self.start_scraping)
        self.search_btn.setMinimumHeight(50)
        self.search_btn.setStyleSheet("QPushButton { background-color: #0078d4; color: white; font-weight: bold; font-size: 16px; }")
        action_layout.addWidget(QLabel("<font color='red'>*</font>检索类目:"))
        action_layout.addWidget(self.category_display, 1)
        action_layout.addWidget(self.search_btn)
        right_layout.addLayout(action_layout)

        self.pbar = QProgressBar()
        self.log_output = QTextEdit()
        self.log_output.setReadOnly(True)
        right_layout.addWidget(QLabel("<b>任务日志:</b>"))
        right_layout.addWidget(self.log_output)
        right_layout.addWidget(self.pbar)
        self.status_label = QLabel("就绪")
        right_layout.addWidget(self.status_label)

        # Splitter gives the right pane ~3x the width of the category tree.
        splitter = QSplitter(Qt.Orientation.Horizontal)
        splitter.addWidget(left_widget)
        splitter.addWidget(right_widget)
        splitter.setStretchFactor(1, 3)
        main_layout.addWidget(splitter)

    def load_default_categories(self):
        """Auto-load the bundled category workbook if present (cwd-relative path)."""
        p = os.path.join('templates', '商品类目.xlsx')
        if os.path.exists(p): self.load_categories(p)

    def select_category_file(self):
        """Let the user pick a category .xlsx file and load it into the tree."""
        f, _ = QFileDialog.getOpenFileName(self, "选择类目文件", "templates", "Excel (*.xlsx)")
        if f: self.load_categories(f)

    def load_categories(self, f_path):
        """Populate the tree from an Excel sheet whose first two columns are
        level-1 and level-2 category names (blank level-1 cells, e.g. from
        merged cells, are forward-filled)."""
        try:
            df = pd.read_excel(f_path)
            c1, c2 = df.columns[0], df.columns[1]
            df[c1] = df[c1].ffill()  # fill down blank level-1 cells
            self.category_model.clear()
            cats = {}
            for _, row in df.iterrows():
                v1, v2 = str(row[c1]), str(row[c2])
                if v1 not in cats:
                    # Level-1 nodes are group headers only, not selectable.
                    p = QStandardItem(v1); p.setSelectable(False)
                    self.category_model.appendRow(p); cats[v1] = p
                # Store the parent name on the child for retrieval on click.
                child = QStandardItem(v2); child.setData(v1, Qt.ItemDataRole.UserRole)
                cats[v1].appendRow(child)
            self.category_tree.expandAll()
        except: pass  # NOTE(review): bare except silently ignores malformed files

    def on_category_clicked(self, index):
        """Record the clicked level-2 category (level-1 headers are not selectable)."""
        item = self.category_model.itemFromIndex(index)
        if item.isSelectable():
            self.selected_category_2 = item.text()
            self.selected_category_1 = item.data(Qt.ItemDataRole.UserRole)
            self.update_displays()

    def update_displays(self):
        """Refresh the category label and the derived output-file path label."""
        if self.selected_category_1 and self.selected_category_2:
            self.category_display.setText(f"{self.selected_category_1} / <font color='#0078d4'><b>{self.selected_category_2}</b></font>")
            if self.output_base_path:
                # Output layout: <root>/选品/<level1>/<level2>.xlsx
                full_p = os.path.normpath(os.path.join(self.output_base_path, "选品", self.selected_category_1, f"{self.selected_category_2}.xlsx"))
                self.path_display.setText(full_p)

    def select_output_path(self):
        """Ask for the root output directory and refresh the path preview."""
        p = QFileDialog.getExistingDirectory(self, "选择保存根目录")
        if p: self.output_base_path = p; self.update_displays()

    def start_scraping(self):
        """Validate selections, remove any stale output file, and launch the worker."""
        if not self.selected_category_2 or not self.output_base_path:
            self.log_output.append("<font color='red'>[错误] 请选择类目和输出路径</font>")
            return
        
        target_dir = os.path.join(self.output_base_path, "选品", self.selected_category_1)
        file_path = os.path.join(target_dir, f"{self.selected_category_2}.xlsx")
        
        # Delete any previous output so each run restarts from the template.
        if os.path.exists(file_path):
            try: os.remove(file_path)
            except: pass  # NOTE(review): a locked file is silently kept and appended to

        self.search_btn.setEnabled(False)
        self.log_output.clear()
        self.pbar.setValue(0)
        # Checkbox checked == show the browser == headless off.
        headless = not self.show_browser_cb.isChecked()
        
        self.thread = ScraperThread(self.selected_category_2, file_path, headless)
        self.thread.log.connect(self.log_output.append)
        self.thread.progress.connect(self.pbar.setValue)
        self.thread.finished.connect(self.on_finished)
        self.thread.start()

    def on_finished(self, err, scraper):
        """Worker completion slot: re-enable UI and retain the scraper handle."""
        self.search_btn.setEnabled(True)
        if scraper: self.active_scraper = scraper
        self.status_label.setText("任务完成" if not err else "异常终止")
+
if __name__ == "__main__":
    # Allow launching the GUI module directly (the packaged entry point is src/main.py).
    app = QApplication(sys.argv)
    win = MainWindow()
    win.show()
    sys.exit(app.exec())

+ 17 - 0
src/main.py

@@ -0,0 +1,17 @@
+import sys
+import os
+
+# 确保可以导入 src 模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from src.gui import QApplication, MainWindow
+
def main():
    """Create the Qt application, show the main window, and run the event loop."""
    app = QApplication(sys.argv)
    win = MainWindow()
    win.show()
    print("[MAIN] 窗口已显示,等待操作...")
    # app.exec() blocks until the window closes; its exit code is propagated.
    sys.exit(app.exec())

if __name__ == "__main__":
    main()

+ 215 - 0
src/scraper.py

@@ -0,0 +1,215 @@
# Compatibility shim for Python 3.12+, where distutils was removed from the
# stdlib. When the real package is absent, fabricate minimal stub modules so
# that downstream `from distutils.version import LooseVersion` imports succeed
# (presumably needed by undetected_chromedriver — TODO confirm).
import sys
try:
    import distutils
except ImportError:
    from types import ModuleType
    d, v = ModuleType("distutils"), ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})
    class LooseVersion:
        # HACK: __lt__ unconditionally returns True, so every version
        # comparison reports "older". Only construction and str() are faithful.
        def __init__(self, v): self.v = v
        def __lt__(self, o): return True
        def __str__(self): return str(self.v)
    v.LooseVersion = LooseVersion
+
+import time, random, re, os, subprocess, urllib.parse, json, traceback
+import undetected_chromedriver as uc 
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.action_chains import ActionChains
+from selenium_stealth import stealth
+
class Scraper1688:
    """undetected-chromedriver based scraper for 1688.com keyword search pages."""

    def __init__(self, headless=True):
        """Start a stealth Chrome session with a persistent user-data directory.

        headless: run Chrome without a visible window. Pass False when manual
        captcha solving is expected.
        """
        self.headless = headless
        # Persistent profile directory so session/captcha state survives runs.
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
        self._cleanup()
        options = uc.ChromeOptions()
        options.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
        options.add_argument(f"--user-data-dir={self.user_data_path}")
        if headless: options.add_argument('--headless=new')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument("--window-size=1920,1080")
        try:
            # Try pinning the driver to Chrome major version 137 first;
            # fall back to auto-detection if that raises.
            self.driver = uc.Chrome(options=options, headless=headless, version_main=137)
        except:
            self.driver = uc.Chrome(options=options, headless=headless)
        stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)

    def _cleanup(self):
        """Kill stray Chrome processes (Windows only) and remove stale profile locks."""
        # NOTE(review): this kills ALL chrome.exe processes, including any
        # browser window the user has open.
        if os.name == 'nt': subprocess.call(['taskkill', '/F', '/IM', 'chrome.exe', '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if os.path.exists(self.user_data_path):
            for root, _, files in os.walk(self.user_data_path):
                for f in files:
                    if "lock" in f.lower() or f == "SingletonLock":
                        try: os.remove(os.path.join(root, f))
                        except: pass

    def clean_url(self, url):
        """Normalize a 1688 product URL to the canonical
        ``https://detail.1688.com/offer/<id>.html`` form.

        Returns "" for falsy input, and the input unchanged when no offer id
        can be extracted by any of the three strategies below.
        """
        if not url: return ""
        if url.startswith("//"): url = "https:" + url
        
        # 1. Offer id embedded in the path (standard PC links).
        id_match = re.search(r'offer/(\d+)\.html', url)
        if id_match: return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        
        # 2. Offer id carried as a query parameter (mobile / ad links).
        parsed = urllib.parse.urlparse(url)
        params = urllib.parse.parse_qs(parsed.query)
        oid = params.get('offerId') or params.get('id')
        if oid: return f"https://detail.1688.com/offer/{oid[0]}.html"
        
        # 3. Tracking-style links carrying the id as "object_id@<id>".
        id_match_report = re.search(r'object_id@(\d+)', url)
        if id_match_report: return f"https://detail.1688.com/offer/{id_match_report.group(1)}.html"
        
        return url

    def check_for_captcha(self):
        """Block until any anti-bot challenge is solved manually; always returns True."""
        def is_blocked():
            # Heuristics: a visible slider widget, a "punish" URL, or captcha
            # marker strings in the page source / title.
            try:
                src, url, title = self.driver.page_source.lower(), self.driver.current_url.lower(), self.driver.title.lower()
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
                return is_slider or "punish" in url or "哎哟喂" in src or "验证码" in title
            except: return False
        
        if is_blocked():
            print("\n[!] 触发拦截,请手动完成验证...")
            while is_blocked(): time.sleep(2)  # poll until the user clears the challenge
            print("[+] 验证通过!")
            time.sleep(3)
        return True

    def search_products_yield(self, keyword, total_count=200):
        """Generator: search *keyword* and yield one batch per result page.

        Each batch is a list of dicts with keys 'name', 'link' (uncleaned at
        this point; the caller of clean_url below dedupes on the cleaned form)
        and 'price'. Stops once *total_count* unique links are collected or
        three consecutive pages produce nothing new.
        """
        # 1688's search endpoint expects GBK-encoded keywords in the query string.
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
        
        all_links = set()  # cleaned links seen so far (dedupe across pages)
        page = 1
        consecutive_empty_pages = 0
        
        while len(all_links) < total_count and consecutive_empty_pages < 3:
            print(f"[*] 正在抓取第 {page} 页...")
            target_url = f"{base_url}&beginPage={page}&page={page}"
            self.driver.get(target_url)
            
            # Fixed wait for the first render before checking for captchas.
            time.sleep(10)
            self.check_for_captcha()

            # Scroll down in five steps to trigger lazy-loaded result cards.
            for i in range(1, 6):
                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/5});")
                time.sleep(1.2)

            page_results = self._extract_all_methods()
            
            # Keep only links not seen on previous pages.
            new_batch = []
            for it in page_results:
                it["link"] = self.clean_url(it["link"])
                if it["link"] and it["link"] not in all_links:
                    all_links.add(it["link"])
                    new_batch.append(it)
            
            if new_batch:
                consecutive_empty_pages = 0
                yield new_batch
            else:
                # Nothing new: refresh once and retry extraction before
                # counting this page as empty.
                print(f"[-] 第 {page} 页未发现新数据,尝试刷新重试...")
                self.driver.refresh()
                time.sleep(8)
                retry_results = self._extract_all_methods()
                new_retry = []
                for it in retry_results:
                    it["link"] = self.clean_url(it["link"])
                    if it["link"] and it["link"] not in all_links:
                        all_links.add(it["link"]); new_retry.append(it)
                
                if new_retry:
                    yield new_retry
                else:
                    consecutive_empty_pages += 1
                    print(f"[!] 连续 {consecutive_empty_pages} 页无数据")

            page += 1
            if len(all_links) < total_count:
                print(f"[*] 累计抓取: {len(all_links)} 条,准备翻页...")
                time.sleep(5)

        # Generator return value — only reachable via StopIteration.value;
        # callers iterating normally never see it.
        return list(all_links)

    def _extract_all_methods(self):
        """Extract products from the current page via three fallbacks, in order:
        embedded JSON payload, DOM card scanning, then raw-source regex."""
        results = []
        
        # 1. JSON payload (window.data or window.__INITIAL_DATA__): walk the
        #    object tree for the first list whose items look like offers.
        try:
            res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
            if res:
                data = json.loads(res)
                def find_list(obj):
                    # Depth-first search for a non-empty list of offer-like dicts.
                    if isinstance(obj, list) and len(obj) > 0:
                        if 'title' in obj[0] or 'offerId' in obj[0]: return obj
                    if isinstance(obj, dict):
                        for k in obj:
                            found = find_list(obj[k])
                            if found: return found
                    return None
                raw = find_list(data) or []
                for o in raw:
                    title = str(o.get('title', o.get('name', ''))).replace('<em>','').replace('</em>','')
                    link = o.get('itemUrl', o.get('url', ''))
                    price = o.get('priceInfo', {}).get('price', o.get('price', '面议'))
                    if link: results.append({"name": title, "link": link, "price": price})
        except: pass

        # 2. DOM scan over known result-card selectors.
        if not results:
            # Candidate selectors, newest layout first.
            selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
            for s in selectors:
                cards = self.driver.find_elements(By.CSS_SELECTOR, s)
                if len(cards) > 3:  # fewer than 4 matches is likely a false positive
                    for el in cards:
                        try:
                            # 2a. Link: the card itself or any descendant anchor.
                            link = ""
                            if el.tag_name == 'a':
                                link = el.get_attribute("href")
                            else:
                                a_tags = el.find_elements(By.TAG_NAME, "a")
                                for a in a_tags:
                                    h = a.get_attribute("href")
                                    if h and ("offer" in h or "item" in h or "ci_bb" in h):
                                        link = h; break
                            
                            # 2b. Fall back to data attributes carrying the offer id.
                            if not link or "1688.com" not in link:
                                oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
                                if oid: link = f"https://detail.1688.com/offer/{oid}.html"
                            
                            if link:
                                # 2c. Title = first text line (capped at 50 chars);
                                #     price from common price selectors.
                                title = el.text.split('\n')[0][:50]
                                price = "面议"
                                try:
                                    price_el = el.find_element(By.CSS_SELECTOR, ".text-main, [class*='price'], .amount")
                                    price = price_el.text.strip().replace("¥", "")
                                except: pass
                                results.append({"name": title, "link": link, "price": price})
                        except: continue
                    if results: break  # first selector that yields anything wins

        # 3. Last resort: regex the raw page source for offer ids.
        if not results:
            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
            for oid in set(ids):
                results.append({"name": f"1688商品-{oid}", "link": f"https://detail.1688.com/offer/{oid}.html", "price": "面议"})
                
        return results

    def quit(self):
        """Close the browser; safe to call even if the driver is already gone."""
        try: self.driver.quit()
        except: pass

BIN
templates/【进价】产品信息空表.xlsx


BIN
templates/【进价】办公用品产品信息01.02.xlsx


BIN
templates/商品类目.xlsx