# [Version: 20260115 - final revision]
# Compatibility shim for Python 3.12+, where the stdlib `distutils` module was
# removed (PEP 632).  undetected_chromedriver still imports
# `distutils.version.LooseVersion`, so install a minimal stand-in module.
import sys
try:
    import distutils  # present on Python < 3.12
except ImportError:
    from types import ModuleType
    d, v = ModuleType("distutils"), ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})

    class LooseVersion:
        """Minimal LooseVersion stand-in: keeps the raw version value.

        __lt__ always returns True so any "is my driver older?" comparison
        made by undetected_chromedriver succeeds.
        """
        def __init__(self, v):
            self.v = v

        def __lt__(self, o):
            return True

        def __str__(self):
            return str(self.v)

    v.LooseVersion = LooseVersion

import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
from selenium import webdriver
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium_stealth import stealth


class Scraper1688:
    """Scrapes product listings and detail pages from 1688.com using a
    stealthed undetected-chromedriver Chrome session with a persistent
    profile (so login state survives between runs)."""

    def __init__(self, headless=True, status_callback=None, log_callback=None):
        """Start Chrome and apply selenium-stealth fingerprint patches.

        Args:
            headless: launch Chrome with the new headless mode when True.
            status_callback: optional callable(blocked: bool, msg: str),
                invoked when a captcha/login wall appears and when it clears.
            log_callback: optional logging hook (stored; not used here).
        """
        self.headless = headless
        self.status_callback = status_callback
        self.log_callback = log_callback
        # Persistent profile directory keeps cookies / login state.
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
        self.driver = None
        self._cleanup()
        self._init_chrome(headless)
        if self.driver:
            stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.",
                    platform="Win32", fix_hairline=True)

    def _find_chrome(self):
        """Locate chrome.exe via the Windows registry "App Paths" keys.

        Returns:
            The executable path, or None when not found — or when not running
            on Windows (`winreg` only exists on Windows builds of Python, so
            importing it unconditionally would crash the constructor on
            other platforms).
        """
        if os.name != 'nt':
            return None
        import winreg
        reg_paths = [
            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
        ]
        for hkey, subkey in reg_paths:
            try:
                with winreg.OpenKey(hkey, subkey) as key:
                    path, _ = winreg.QueryValueEx(key, "")
                if os.path.exists(path):
                    return path
            except OSError:
                continue  # key missing / unreadable: try the next hive
        return None

    def _cleanup(self):
        """Best-effort pre-launch cleanup: kill stray Chrome processes
        (Windows only) and delete stale profile lock files so the persistent
        profile can be reopened after a crash."""
        if os.name == 'nt':
            for proc in ['chrome.exe', 'chromedriver.exe']:
                try:
                    subprocess.call(['taskkill', '/F', '/IM', proc, '/T'],
                                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                except Exception:
                    pass  # taskkill unavailable or failed: not fatal
        if os.path.exists(self.user_data_path):
            for root, _, files in os.walk(self.user_data_path):
                for f in files:
                    # Chrome leaves Singleton*/lock files behind after a crash.
                    if "lock" in f.lower() or f == "SingletonLock":
                        try:
                            os.remove(os.path.join(root, f))
                        except OSError:
                            pass  # file still held open; ignore

    def _init_chrome(self, headless):
        """Create the undetected-chromedriver instance.

        A fresh ChromeOptions object is built for each launch attempt because
        an options object cannot be reused after a failed uc.Chrome() call.
        Falls back to uc's own browser auto-detection when launching with the
        registry-resolved path fails.
        """
        chrome_path = self._find_chrome()

        def create_options():
            opts = uc.ChromeOptions()
            opts.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
            opts.add_argument(f"--user-data-dir={self.user_data_path}")
            if headless:
                opts.add_argument('--headless=new')
            opts.add_argument('--disable-blink-features=AutomationControlled')
            opts.add_argument("--window-size=1920,1080")
            opts.add_argument("--no-sandbox")
            opts.add_argument("--disable-dev-shm-usage")
            opts.add_argument("--remote-allow-origins=*")
            return opts

        try:
            self.driver = uc.Chrome(options=create_options(), headless=headless,
                                    browser_executable_path=chrome_path, use_subprocess=True)
        except Exception:
            # Retry and let undetected_chromedriver find the browser itself.
            self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)

    def clean_url(self, url):
        """Normalize any 1688 product reference to a canonical detail URL.

        Extracts the first run of 9-15 digits (the 1688 offer-ID signature)
        from the input and rebuilds a standard detail.1688.com URL.

        Returns:
            The canonical URL string, or "" when no offer ID is found.
        """
        if not url:
            return ""
        url_str = str(url)
        # Protocol-relative links ("//detail.1688.com/...") need a scheme.
        if url_str.startswith("//"):
            url_str = "https:" + url_str
        id_match = re.search(r'(\d{9,15})', url_str)
        if id_match:
            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        return ""

    def check_for_captcha(self):
        """Block until any slider captcha / login wall / punish page clears.

        While blocked, polls every 2 s and notifies status_callback(True, msg);
        once cleared, notifies status_callback(False, msg) and waits 3 s for
        the page to settle.  Always returns True.
        """
        def is_blocked():
            try:
                url = self.driver.current_url.lower()
                src = self.driver.page_source.lower()
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                return ((len(sliders) > 0 and sliders[0].is_displayed())
                        or "login.1688.com" in url or "punish" in url or "哎哟喂" in src)
            except Exception:
                return False  # page mid-navigation: treat as not blocked

        if is_blocked():
            msg = "请登录验证"
            if self.status_callback:
                self.status_callback(True, msg)
            while is_blocked():
                time.sleep(2)
            if self.status_callback:
                self.status_callback(False, "验证通过")
            time.sleep(3)
        return True

    def search_products_yield(self, keyword, total_count=200, existing_links=None):
        """Generator: crawl search result pages for `keyword` and yield
        batches (lists of row dicts) of scraped product details.

        Args:
            keyword: search term; URL-encoded as GBK, which s.1688.com expects.
            total_count: number of NEW links to collect beyond existing_links.
            existing_links: optional set of already-seen detail URLs for
                de-duplication; mutated in place when provided.

        Yields:
            Lists of up to ~10 row dicts.  The generator's return value (via
            StopIteration) is the full list of collected links.
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
        self.driver.get("https://www.1688.com")
        self.check_for_captcha()
        all_links = existing_links if existing_links is not None else set()
        page, initial_count = 1, len(all_links)
        while len(all_links) < total_count + initial_count:
            print(f"[*] 正在处理列表页: 第 {page} 页...")
            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
            self.check_for_captcha()
            # Scroll down in quarters so lazy-loaded list items render.
            for i in range(1, 5):
                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
                time.sleep(1.2)
            # Probe the page for raw entries (JS globals first, DOM fallback).
            page_results = self._extract_all_methods()
            print(f" [+] 本页发现 {len(page_results)} 个原始条目")
            page_batch = []
            for it in page_results:
                raw_link = it.get("link")
                normalized_url = self.clean_url(raw_link)
                if not normalized_url:
                    continue
                if normalized_url in all_links:
                    print(f" [-] 跳过已存在商品: {normalized_url}")
                    continue
                all_links.add(normalized_url)
                print(f" [>] 正在执行详情抓取流程: {normalized_url}")
                detail_results = self.scrape_detail(normalized_url)
                if detail_results:
                    page_batch.extend(detail_results)
                else:
                    # Record a stub row on detail failure so progress continues.
                    page_batch.append({
                        "category": "", "brand": "", "name": it.get("name", "未知"),
                        "color": "", "spec": "", "material": "", "price": "",
                        "moq": "", "wholesale_price": "", "link": normalized_url,
                        "supplier": ""
                    })
                if len(page_batch) >= 10:
                    yield page_batch
                    page_batch = []
                # Long random pause between detail fetches to evade rate limits.
                time.sleep(random.uniform(15, 25))
                if len(all_links) >= total_count + initial_count:
                    break
            if page_batch:
                yield page_batch
            page += 1
            # Periodically revisit the home page to look less bot-like.
            if page % 3 == 0:
                self.driver.get("https://www.1688.com")
                time.sleep(random.randint(10, 20))
        return list(all_links)

    def scrape_detail(self, url):
        """Scrape one product detail page.

        Returns:
            A list of row dicts — one per SKU variant when variants are found,
            otherwise a single base row — or None on any failure.
        """
        try:
            self.driver.get(url)
            time.sleep(random.uniform(5, 8))
            self.check_for_captcha()
            # Pull the page's data model straight out of the JS globals.
            model = self.driver.execute_script(
                "return (window.context && window.context.result && "
                "window.context.result.global && window.context.result.global.globalData "
                "&& window.context.result.global.globalData.model) || "
                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
            )
            if not model:
                return None

            def get_attr(name):
                # Look the attribute up under both model schemas seen in the wild.
                try:
                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                    for item in attrs:
                        if name in item.get("name", ""):
                            return item.get("value", "")
                    attrs = model.get("detailData", {}).get("attributes", [])
                    for item in attrs:
                        if name in item.get("attributeName", ""):
                            return item.get("value", "")
                except Exception:
                    pass  # model may not be a dict; fall through to ""
                return ""

            trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
            range_text = " / ".join(
                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}"
                 for r in (trade.get("disPriceRanges") or trade.get("currentPrices") or [])]
            )
            base_data = {
                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
                            or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                "brand": get_attr("品牌"),
                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
                        or self.driver.title.split('-')[0],
                # "" keeps the schema consistent with variant/stub rows;
                # overwritten per-variant below when SKUs are found.
                "color": "",
                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                "material": get_attr("材质") or get_attr("面料"),
                "price": "",
                "moq": trade.get("beginAmount", ""),
                "wholesale_price": range_text,
                "link": url,
                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
            }
            variant_data_list = []
            try:
                # Plan A: read SKU labels and prices from the expand-view list.
                wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
                if wrappers:
                    items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
                    for item_el in items:
                        try:
                            label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
                            price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
                            if label:
                                # Keep only digits and dots from the price text.
                                variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
                        except Exception:
                            continue  # malformed SKU entry: skip it
            except Exception:
                pass  # variant extraction is best-effort
            if variant_data_list:
                results = []
                for vd in variant_data_list:
                    row = base_data.copy()
                    row["color"] = vd["label"]
                    row["price"] = vd["price"]
                    results.append(row)
                return results
            return [base_data]
        except Exception:
            return None

    def _extract_all_methods(self):
        """Probe the search-results page for product entries.

        Plan 1: scan in-memory JS globals for a list of offer-like dicts.
        Plan 2: fall back to brute-force DOM selectors.

        Returns:
            A list of {"name": str, "link": str} dicts (possibly empty).
        """
        results = []
        # Plan 1: deep scan of known in-memory data variables.
        scripts = [
            "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
            "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"
        ]
        for s in scripts:
            try:
                res = self.driver.execute_script(s)
                if res and res != "null":
                    data = json.loads(res)

                    def find_list(obj):
                        # Depth-first search for a list of offer-like dicts.
                        if (isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict)
                                and any(k in obj[0] for k in ['offerId', 'title', 'subject'])):
                            return obj
                        if isinstance(obj, dict):
                            for k in obj:
                                f = find_list(obj[k])
                                if f:
                                    return f
                        return None

                    for o in (find_list(data) or []):
                        link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
                        if link:
                            results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
                    if results:
                        return results
            except Exception:
                continue  # script failed or JSON malformed: try the next probe
        # Plan 2: brute-force DOM selectors as a safety net.
        for s in [".sm-offer-item", ".offer-card-item", ".pc-search-offer-item", "[class*='offer-card']", ".offer-item"]:
            for el in self.driver.find_elements(By.CSS_SELECTOR, s):
                try:
                    a = el.find_element(By.TAG_NAME, "a")
                    link = a.get_attribute("href")
                    if link and "1688.com" in link:
                        results.append({"name": el.text.split('\n')[0][:50], "link": link})
                except Exception:
                    continue  # element without a usable anchor: skip
            if results:
                break
        return results

    def quit(self):
        """Close the browser, ignoring errors if it is already gone."""
        try:
            self.driver.quit()
        except Exception:
            pass