# Compatibility patch for Python 3.12+, where the distutils package was
# removed from the standard library: install a minimal stub so legacy
# third-party code importing distutils.version keeps working.
import sys

try:
    import distutils  # noqa: F401
except ImportError:
    from types import ModuleType

    _distutils_mod = ModuleType("distutils")
    _version_mod = ModuleType("distutils.version")
    _distutils_mod.version = _version_mod
    sys.modules.update({"distutils": _distutils_mod, "distutils.version": _version_mod})

    class LooseVersion:
        """Minimal stand-in for distutils.version.LooseVersion.

        NOTE(review): __lt__ unconditionally returns True, so ordering checks
        against this stub are meaningless — it only exists so imports and
        str() conversions do not crash.
        """

        def __init__(self, v):
            self.v = v

        def __lt__(self, other):
            return True

        def __str__(self):
            return str(self.v)

    _version_mod.LooseVersion = LooseVersion

import json
import os
import random
import re
import subprocess
import time
import traceback  # noqa: F401  (kept: may be used outside this view)
import urllib.parse

import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains  # noqa: F401
from selenium_stealth import stealth


class Scraper1688:
    """Scraper for 1688.com: paginated keyword search plus detail-page extraction."""

    def __init__(self, headless=True, status_callback=None):
        """Launch a stealth Chrome instance with a dedicated user-data profile.

        :param headless: run Chrome in the new headless mode when True.
        :param status_callback: optional callable(blocked: bool, message: str)
            used to report captcha/login status back to a GUI.
        """
        self.headless = headless
        self.status_callback = status_callback  # reports status back to the GUI
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
        self._cleanup()

        def create_options():
            # undetected_chromedriver consumes the options object, so every
            # Chrome() attempt needs a freshly built instance.
            options = uc.ChromeOptions()
            options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
            options.add_argument(f"--user-data-dir={self.user_data_path}")
            if headless:
                options.add_argument('--headless=new')
            options.add_argument('--disable-blink-features=AutomationControlled')
            options.add_argument("--window-size=1920,1080")
            return options

        try:
            # Key fix (preserved from original): each launch attempt must use
            # a brand-new options object produced by create_options().
            self.driver = uc.Chrome(options=create_options(), headless=headless, version_main=131)
        except Exception:
            # Fallback without pinning the Chrome major version (e.g. after a
            # browser upgrade); again with a brand-new options object.
            self.driver = uc.Chrome(options=create_options(), headless=headless)

        stealth(
            self.driver,
            languages=["zh-CN", "zh"],
            vendor="Google Inc.",
            platform="Win32",
            fix_hairline=True,
        )

    def _cleanup(self):
        """Kill leftover Chrome processes (Windows only) and delete stale profile lock files."""
        if os.name == 'nt':
            subprocess.call(
                ['taskkill', '/F', '/IM', 'chrome.exe', '/T'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        if os.path.exists(self.user_data_path):
            for root, _, files in os.walk(self.user_data_path):
                for f in files:
                    # "SingletonLock" is already covered by the substring test;
                    # kept explicit for clarity.
                    if "lock" in f.lower() or f == "SingletonLock":
                        try:
                            os.remove(os.path.join(root, f))
                        except OSError:
                            pass

    def clean_url(self, url):
        """Normalize any 1688 product URL to a canonical detail-page URL.

        Handles protocol-relative URLs, standard PC offer links, mobile/ad
        links carrying an offerId/id query parameter, and tracking strings
        that embed the id as ``object_id@<digits>``. Returns the input
        unchanged when no offer id can be recovered; returns "" for falsy
        input.
        """
        if not url:
            return ""
        if url.startswith("//"):
            url = "https:" + url
        # 1. Offer id embedded in the path (standard PC link).
        id_match = re.search(r'offer/(\d+)\.html', url)
        if id_match:
            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        # 2. Offer id carried as a query parameter (mobile or ad links).
        parsed = urllib.parse.urlparse(url)
        params = urllib.parse.parse_qs(parsed.query)
        oid = params.get('offerId') or params.get('id')
        if oid:
            return f"https://detail.1688.com/offer/{oid[0]}.html"
        # 3. Special encrypted/tracking links: look for an object_id payload.
        id_match_report = re.search(r'object_id@(\d+)', url)
        if id_match_report:
            return f"https://detail.1688.com/offer/{id_match_report.group(1)}.html"
        return url

    def check_for_captcha(self):
        """Detect login/slider-captcha/punish interstitials and block until cleared.

        Polls every 2 seconds while the page is blocked, invoking
        ``status_callback`` on entry and exit. Returns True when human
        intervention was required (and completed); implicitly None otherwise.
        """
        def is_blocked():
            try:
                url = self.driver.current_url.lower()
                src = self.driver.page_source.lower()
                title = self.driver.title.lower()
                # 1. Slider captcha widget.
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
                # 2. Redirected to a login page.
                is_login = "login.1688.com" in url or "passport.1688.com" in url
                # 3. Punish / verification interstitial.
                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title or "验证提示" in title
                # 4. Logged-out detection is currently URL-based only; could be
                #    strengthened by scanning detail/search pages for a login button.
                return is_slider or is_login or is_punish
            except Exception:
                return False

        if is_blocked():
            msg = "请登录验证"
            print(f"\n[!] {msg}...")
            if self.status_callback:
                self.status_callback(True, msg)
            # Keep polling until every blocking condition has cleared.
            while is_blocked():
                time.sleep(2)
            if self.status_callback:
                self.status_callback(False, "验证通过")
            print("\n[OK] 监测到人工干预已完成,3秒后恢复自动抓取...")
            time.sleep(3)
            return True

    def search_products_yield(self, keyword, total_count=200, existing_links=None):
        """Search 1688 for *keyword* and yield scraped product rows in batches of <=10.

        :param keyword: search term (URL-encoded as GBK, as 1688 expects).
        :param total_count: number of *new* links to process this run.
        :param existing_links: optional set of already-seen canonical URLs,
            mutated in place and used for de-duplication across runs.
        :return: (as StopIteration.value) the accumulated list of links.
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
        # Initial check: make sure we are not blocked (e.g. not logged in)
        # before starting to scrape.
        self.driver.get("https://www.1688.com")
        self.check_for_captcha()

        all_links = existing_links if existing_links is not None else set()
        page = 1
        consecutive_empty_pages = 0
        # Remember the starting link count so progress counts new links only.
        initial_count = len(all_links)

        while len(all_links) < total_count + initial_count and consecutive_empty_pages < 3:
            print(f"[*] 正在搜索列表页: 第 {page} 页...")
            target_url = f"{base_url}&beginPage={page}&page={page}"
            self.driver.get(target_url)
            # Mandatory wait so the first screen finishes rendering.
            time.sleep(5)
            self.check_for_captcha()
            # Deep scroll to force lazy content to load.
            for i in range(1, 4):
                self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/3});")
                time.sleep(1)

            page_results = self._extract_all_methods()
            page_batch = []
            new_on_page = 0
            for it in page_results:
                clean_url = self.clean_url(it["link"])
                if clean_url and clean_url not in all_links:
                    all_links.add(clean_url)
                    new_on_page += 1
                    # Core improvement: open the detail page for precise data.
                    print(f" [>] 抓取详情: {clean_url}")
                    detail_results = self.scrape_detail(clean_url)
                    if detail_results:
                        # detail_results is a list (one row per color variant).
                        page_batch.extend(detail_results)
                    else:
                        # Fallback row built from the list-page data only.
                        it["link"] = clean_url
                        page_batch.append({
                            "category": "", "brand": "", "name": it["name"],
                            "color": "", "spec": "", "material": "",
                            "price": it["price"], "moq": "",
                            "wholesale_price": "", "link": clean_url,
                            "supplier": "",
                        })
                    # Yield every time 10 rows have accumulated.
                    if len(page_batch) >= 10:
                        yield page_batch
                        page_batch = []
                    # Randomized pause after each detail-page fetch.
                    time.sleep(random.uniform(2, 4))
                    if len(all_links) >= total_count + initial_count:
                        break

            # End of page: flush any remainder of fewer than 10 rows.
            if page_batch:
                yield page_batch
                page_batch = []

            # Bug fix: consecutive_empty_pages was checked by the while-loop
            # condition but never updated, so the empty-page guard was dead and
            # an exhausted result set caused endless pagination.
            if new_on_page == 0:
                consecutive_empty_pages += 1
            else:
                consecutive_empty_pages = 0

            page += 1
            if len(all_links) < total_count + initial_count:
                print(f"[*] 累计已处理新链接: {len(all_links) - initial_count} 条,准备翻下一页...")
                time.sleep(3)
        return list(all_links)

    def scrape_detail(self, url):
        """Scrape one detail page into a list of row dicts (one per color variant).

        Reads 1688's in-page data model via JavaScript (logic revised per
        /refe/req.py), with DOM/XPath fallbacks for each field, and splits the
        "color" SKU dimension into multiple rows. Returns None when the page
        exposes no usable data model or an unexpected exception occurs.
        """
        try:
            self.driver.get(url)
            time.sleep(2)
            self.check_for_captcha()

            # Pull the full data model that backs the 1688 detail page.
            model = self.driver.execute_script(
                "return (window.context && window.context.result && "
                "window.context.result.global && window.context.result.global.globalData "
                "&& window.context.result.global.globalData.model) || "
                "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
            )
            if not model:
                return None

            def get_attr(name):
                """Look up an attribute value in the model (new and legacy layouts)."""
                try:
                    # Modern layout.
                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                    for item in attrs:
                        if name in item.get("name", ""):
                            return item.get("value", "")
                    # Legacy layout.
                    attrs = model.get("detailData", {}).get("attributes", [])
                    for item in attrs:
                        if name in item.get("attributeName", ""):
                            return item.get("value", "")
                except Exception:
                    pass
                return ""

            def safe_text(by, sel):
                # Best-effort DOM text lookup; "" when the element is absent.
                try:
                    return self.driver.find_element(by, sel).text.strip()
                except Exception:
                    return ""

            # --- Price handling ---
            trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
            price_min = trade.get("minPrice", "") or ""
            price_max = trade.get("maxPrice", "") or ""
            # Legacy price fallback.
            if not price_min:
                try:
                    price_min = model["sku"]["priceRange"][0][1]
                except Exception:
                    pass
            begin_amount = trade.get("beginAmount", "")
            # Wholesale price tiers.
            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or \
                trade.get("offerPriceModel", {}).get("currentPrices", [])
            range_text = " / ".join(
                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}"
                 for r in ranges]
            ) if ranges else ""

            # Base row template shared by every variant.
            base_data = {
                "category": (model.get("offerDetail", {}).get("leafCategoryName", "")
                             if isinstance(model, dict) else "")
                            or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
                "brand": get_attr("品牌"),
                "name": (model.get("offerDetail", {}).get("subject", "")
                         if isinstance(model, dict) else "")
                        or safe_text(By.CSS_SELECTOR, "h1.d-title")
                        or safe_text(By.CSS_SELECTOR, "h1[class*=title]"),
                "color": "",  # filled in per variant below
                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or
                        safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
                "material": get_attr("材质") or get_attr("面料") or
                            safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
                "price": f"{price_min}-{price_max}" if price_min and price_max and price_min != price_max
                         else f"{price_min}" if price_min else "",
                "moq": begin_amount or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='起订量' or span='起批量']/following-sibling::td[1]//span[@class='field-value']"),
                "wholesale_price": range_text,
                "link": url,
                "supplier": (model.get("sellerModel", {}).get("companyName", "")
                             if isinstance(model, dict) else "")
                            or safe_text(By.CSS_SELECTOR, "a.company-name")
                            or safe_text(By.CSS_SELECTOR, "div.company-name"),
            }

            # --- Core logic: split the SKU color/variant dimension into rows ---
            sku_props = []
            try:
                # Try several known paths for the SKU property list.
                sku_props = model.get("skuModel", {}).get("skuProps", []) or \
                    model.get("detailData", {}).get("skuProps", []) or \
                    model.get("sku", {}).get("skuProps", [])
            except Exception:
                pass

            # Pick the main dimension:
            # 1. prefer a prop whose name mentions color/category/style/pattern;
            # 2. otherwise fall back to the first SKU dimension (e.g. volume, spec).
            main_prop = None
            if sku_props:
                main_prop = next(
                    (p for p in sku_props
                     if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])),
                    None,
                )
                if not main_prop:
                    main_prop = sku_props[0]

            if main_prop and main_prop.get("value"):
                variant_results = []
                for val in main_prop["value"]:
                    # Only record variants that actually carry a name.
                    variant_name = val.get("name")
                    if variant_name:
                        row = base_data.copy()
                        row["color"] = variant_name
                        variant_results.append(row)
                return variant_results
            else:
                # Fallback: no SKU selector present, use the single color attribute.
                base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
                return [base_data]
        except Exception as e:
            # Bug fix: the original carried two identical except clauses here;
            # the second was unreachable dead code and has been removed.
            print(f"[!] 详情页抓取异常 ({url}): {e}")
            return None

    def _extract_all_methods(self):
        """Three-pronged list-page extraction: JSON model + DOM scan + regex fallback."""
        results = []
        # 1. JSON extraction (window.data or window.__INITIAL_DATA__).
        try:
            res = self.driver.execute_script(
                "return JSON.stringify(window.data || window.__INITIAL_DATA__)")
            if res:
                data = json.loads(res)

                def find_list(obj):
                    # Depth-first search for the first list of offer-like dicts.
                    if isinstance(obj, list) and len(obj) > 0:
                        if 'title' in obj[0] or 'offerId' in obj[0]:
                            return obj
                    if isinstance(obj, dict):
                        for k in obj:
                            found = find_list(obj[k])
                            if found:
                                return found
                    return None

                raw = find_list(data) or []
                for o in raw:
                    # Cleanup: the original chained two .replace('','') calls
                    # here, which are no-ops on any string and were dropped.
                    title = str(o.get('title', o.get('name', '')))
                    link = o.get('itemUrl', o.get('url', ''))
                    price = o.get('priceInfo', {}).get('price', o.get('price', '面议'))
                    if link:
                        results.append({"name": title, "link": link, "price": price})
        except Exception:
            pass

        # 2. Enhanced DOM scan.
        if not results:
            # Includes the most recent card selectors.
            selectors = [".search-offer-item", "[class*='offer-card']", ".offer-item", ".major-offer"]
            for s in selectors:
                cards = self.driver.find_elements(By.CSS_SELECTOR, s)
                if len(cards) > 3:
                    for el in cards:
                        try:
                            # 1. Link: from the card itself or a descendant <a>.
                            link = ""
                            if el.tag_name == 'a':
                                link = el.get_attribute("href")
                            else:
                                a_tags = el.find_elements(By.TAG_NAME, "a")
                                for a in a_tags:
                                    h = a.get_attribute("href")
                                    if h and ("offer" in h or "item" in h or "ci_bb" in h):
                                        link = h
                                        break
                            # 2. Patch the link from a data-* offer id if needed.
                            if not link or "1688.com" not in link:
                                oid = el.get_attribute("data-offer-id") or el.get_attribute("data-id")
                                if oid:
                                    link = f"https://detail.1688.com/offer/{oid}.html"
                            if link:
                                # 3. Title and price extraction.
                                title = el.text.split('\n')[0][:50]
                                price = "面议"
                                try:
                                    price_el = el.find_element(By.CSS_SELECTOR, ".text-main, [class*='price'], .amount")
                                    price = price_el.text.strip().replace("¥", "")
                                except Exception:
                                    pass
                                results.append({"name": title, "link": link, "price": price})
                        except Exception:
                            continue
                    if results:
                        break  # One selector succeeded; skip the rest.

        # 3. Last resort: brute-force regex over the page source.
        if not results:
            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
            for oid in set(ids):
                results.append({
                    "name": f"1688商品-{oid}",
                    "link": f"https://detail.1688.com/offer/{oid}.html",
                    "price": "面议",
                })
        return results

    def quit(self):
        """Shut down the browser, ignoring errors if it is already gone."""
        try:
            self.driver.quit()
        except Exception:
            pass