"""Selenium-based scraper for 1688.com search results and offer detail pages.

The scraper first tries to attach to a locally launched Microsoft Edge through
the remote-debugging port and falls back to undetected-chromedriver when Edge
cannot be located or attached.
"""

import json
import os
import random
import re
import socket
import subprocess
import sys
import time
import traceback
import urllib.parse

# Compatibility patch for Python 3.12+, where distutils was removed.
# It must run before the third-party imports below so that any of them doing
# `from distutils.version import LooseVersion` finds the stand-in module.
try:
    import distutils  # noqa: F401
except ImportError:
    from types import ModuleType

    d, v = ModuleType("distutils"), ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})

    class LooseVersion:
        """Minimal stand-in for ``distutils.version.LooseVersion``."""

        def __init__(self, v):
            self.v = v

        def __lt__(self, o):
            # NOTE(review): ordering is a stub that always answers True; no
            # real version comparison is performed.
            return True

        def __str__(self):
            return str(self.v)

    v.LooseVersion = LooseVersion

from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium_stealth import stealth


class Scraper1688:
    """Drive a real browser to collect 1688.com offers for a keyword.

    Attributes:
        headless: Whether the browser was requested in headless mode.
        status_callback: Optional callable ``(blocked: bool, message: str)``
            invoked when a login/captcha wall appears and when it clears.
        log_callback: Optional callable ``(message: str)`` that receives
            ordinary log lines (used to forward them to a GUI).
        user_data_path: Absolute profile directory used by the Chrome fallback.
        driver: The active WebDriver instance, or ``None`` if none was created.
    """

    def __init__(self, headless=True, status_callback=None, log_callback=None):
        """Start (or attach to) a browser and install fingerprint overrides.

        Args:
            headless: Launch the browser without a visible window.
            status_callback: See the class docstring.
            log_callback: See the class docstring.
        """
        self.headless = headless
        self.status_callback = status_callback
        self.log_callback = log_callback
        self.user_data_path = os.path.abspath(
            os.path.join(os.getcwd(), "1688_user_data")
        )
        self.driver = None

        edge_path = self._find_edge()
        if edge_path:
            print("[*] 【极致稳定模式】正在启动 Edge 深度伪装环境...")
            self._cleanup_processes()
            # A fixed, persistent session directory keeps the login alive
            # across runs.
            edge_user_data = os.path.join(
                os.getcwd(), "1688_edge_ultimate_session"
            )
            cmd = [
                edge_path,
                "--remote-debugging-port=9222",
                f"--user-data-dir={edge_user_data}",
                "--no-first-run",
                "--no-default-browser-check",
                "--disable-blink-features=AutomationControlled",
            ]
            if headless:
                cmd.append("--headless")
            try:
                subprocess.Popen(
                    cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
                )
                # Give Edge time to open its debugging port before attaching.
                time.sleep(6)
                opts = EdgeOptions()
                opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
                self.driver = webdriver.Edge(options=opts)
                print("[+] Edge 极致稳定环境接管成功!")
            except Exception as e:
                print(f"[!] Edge 启动失败: {e}")

        if not self.driver:
            self._init_chrome(headless)

        if self.driver:
            # Override common automation fingerprints on every new document.
            # Best effort: a driver without CDP support simply skips this.
            try:
                self.driver.execute_cdp_cmd(
                    "Page.addScriptToEvaluateOnNewDocument",
                    {
                        "source": """
                            Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                            Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
                            Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
                        """
                    },
                )
            except Exception:
                pass

    def _find_edge(self):
        """Return the msedge.exe path from the Windows registry, or ``None``.

        Returns ``None`` on platforms without ``winreg`` so the caller can
        fall back to Chrome instead of crashing on the import.
        """
        try:
            import winreg
        except ImportError:
            return None

        subkey = r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"
        for hkey in (winreg.HKEY_LOCAL_MACHINE, winreg.HKEY_CURRENT_USER):
            try:
                with winreg.OpenKey(hkey, subkey) as key:
                    path, _ = winreg.QueryValueEx(key, "")
                    if os.path.exists(path):
                        return path
            except Exception:
                # Missing key or unreadable value: try the next hive.
                continue
        return None

    def _cleanup_processes(self):
        """Force-kill Edge and WebDriver processes (Windows only).

        Uses ``taskkill /F /IM ... /T``, which terminates every process with
        the given image name, not only those started by this scraper.
        """
        if os.name != 'nt':
            return
        for proc in ['msedge.exe', 'msedgedriver.exe', 'chromedriver.exe']:
            subprocess.call(
                ['taskkill', '/F', '/IM', proc, '/T'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

    def _init_chrome(self, headless):
        """Create the undetected-chromedriver fallback with a persistent profile."""

        def create_options():
            opts = uc.ChromeOptions()
            opts.add_argument(f"--user-data-dir={self.user_data_path}")
            return opts

        self.driver = uc.Chrome(options=create_options(), headless=headless)

    def check_for_captcha(self):
        """Block until any login/captcha wall is cleared by the user.

        When a wall is detected, ``status_callback`` is told, the method polls
        every 3 seconds until the wall disappears, then sleeps a further
        120 seconds as a cool-down.

        Returns:
            Always ``True``.
        """

        def is_blocked():
            # Any driver error is treated as "not blocked" so scraping goes on.
            try:
                url = self.driver.current_url.lower()
                src = self.driver.page_source.lower()
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
                return (
                    (len(sliders) > 0 and sliders[0].is_displayed())
                    or "login.1688.com" in url
                    or "punish" in url
                    or "哎哟喂" in src
                )
            except Exception:
                return False

        if is_blocked():
            msg = "请登录验证"
            if self.status_callback:
                self.status_callback(True, msg)
            while is_blocked():
                time.sleep(3)
            if self.status_callback:
                self.status_callback(False, "验证通过")
            cool_msg = "[*] 监测到干预完成,进入 120 秒深度冷却期以重置风控权重..."
            print(cool_msg)
            if self.log_callback:
                self.log_callback(cool_msg)
            time.sleep(120)
        return True

    def _human_behavior(self, duration=10):
        """Simulate human browsing (scrolling, mouse jitter) for ``duration`` seconds.

        The simulation stops early on the first driver error.
        """
        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                # 1. Random downward scroll.
                scroll_y = random.randint(200, 600)
                self.driver.execute_script(f"window.scrollBy(0, {scroll_y});")
                # 2. Small random mouse movement.
                actions = ActionChains(self.driver)
                actions.move_by_offset(
                    random.randint(-5, 5), random.randint(-5, 5)
                ).perform()
                time.sleep(random.uniform(1.5, 4.0))
                # 3. Occasionally scroll back up.
                if random.random() > 0.7:
                    self.driver.execute_script(
                        f"window.scrollBy(0, -{random.randint(100, 300)});"
                    )
            except Exception:
                break

    def search_products_yield(self, keyword, total_count=200, existing_links=None,
                              max_empty_pages=3):
        """Search 1688 for ``keyword`` and yield scraped rows in batches.

        Args:
            keyword: Search phrase; it is GBK percent-encoded for the URL.
            total_count: Number of *new* offer links to collect.
            existing_links: Optional set of already-seen links. It is mutated
                in place and used to skip duplicates.
            max_empty_pages: Stop after this many consecutive result pages
                that contribute no new link, so the generator ends when the
                results run out. ``None`` disables the limit.

        Yields:
            Lists of row dicts. A batch is flushed once it holds at least 10
            rows, and any remainder is flushed at the end of each page.

        Returns:
            ``list`` of all known links (available as ``StopIteration.value``).
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"

        self.driver.get("https://www.1688.com")
        time.sleep(random.randint(3, 6))
        self.check_for_captcha()

        all_links = existing_links if existing_links is not None else set()
        page, initial_count = 1, len(all_links)
        target = total_count + initial_count
        empty_streak = 0
        # Randomised threshold (5-12 new items) for the next long cool-down.
        next_cool_threshold = random.randint(5, 12)

        while len(all_links) < target:
            print(f"[*] 正在模拟搜索: 第 {page} 页...")
            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
            self.check_for_captcha()

            # Imitate a user skimming through the listing page.
            for _ in range(random.randint(5, 8)):
                self.driver.execute_script(
                    f"window.scrollBy(0, {random.randint(400, 800)});"
                )
                time.sleep(random.uniform(1.5, 3.5))
                if random.random() > 0.8:
                    self.driver.execute_script("window.scrollBy(0, -300);")

            page_results = self._extract_all_methods()
            page_batch = []
            links_before = len(all_links)

            for it in page_results:
                clean_url = self.clean_url(it["link"])
                if clean_url and clean_url not in all_links:
                    all_links.add(clean_url)

                    # Random long cool-down once enough new items were seen.
                    new_processed = len(all_links) - initial_count
                    if new_processed >= next_cool_threshold:
                        rest = random.randint(120, 300)
                        cool_msg = f"[*] 随机触发深度保护 (已处理{new_processed}条),睡眠 {rest} 秒模拟休息..."
                        print(cool_msg)
                        if self.log_callback:
                            self.log_callback(cool_msg)
                        time.sleep(rest)
                        # Schedule the next random checkpoint.
                        next_cool_threshold += random.randint(5, 12)

                    print(f" [>] 详情仿真采集: {clean_url}")
                    # Long random pause before opening the detail page.
                    time.sleep(random.uniform(5, 12))

                    detail_results = self.scrape_detail(clean_url)
                    if detail_results:
                        page_batch.extend(detail_results)
                    else:
                        page_batch.append({"link": clean_url, "name": it["name"]})

                    if len(page_batch) >= 10:
                        yield page_batch
                        page_batch = []

                    # Wide gap between consecutive detail pages.
                    time.sleep(random.uniform(30, 60))

                if len(all_links) >= target:
                    break

            if page_batch:
                yield page_batch
            page += 1

            # Without this guard the loop never ends once the search results
            # are exhausted, because no page adds a new link any more.
            if len(all_links) == links_before:
                empty_streak += 1
                if max_empty_pages is not None and empty_streak >= max_empty_pages:
                    break
            else:
                empty_streak = 0

            # Revisit the home page on every third page to vary the path.
            if page % 3 == 0:
                self.driver.get("https://www.1688.com")
                time.sleep(random.randint(10, 20))

        return list(all_links)

    def _breadcrumb_category(self):
        """Return the last breadcrumb link text, or ``""`` if it is unavailable."""
        try:
            return self.driver.find_element(
                By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"
            ).text.strip()
        except Exception:
            return ""

    def scrape_detail(self, url):
        """Open an offer page and extract its data from the page's JS model.

        Args:
            url: Offer detail URL.

        Returns:
            A list of row dicts: one per named value of the main SKU property
            (with an added ``"color"`` key), or a single row when no such
            values exist. ``None`` when the page exposes no usable model or
            any unexpected error occurs.
        """
        try:
            self.driver.get(url)
            # Linger on the page like a human reader before reading data.
            self._human_behavior(duration=random.randint(12, 25))
            self.check_for_captcha()

            model = self.driver.execute_script(
                "return (window.context && window.context.result"
                " && window.context.result.global"
                " && window.context.result.global.globalData"
                " && window.context.result.global.globalData.model)"
                " || window.__INITIAL_DATA__ || window.iDetailData"
                " || window.iDetailConfig || null;"
            )
            # Everything below reads the model through dict access.
            if not model or not isinstance(model, dict):
                return None

            def get_attr(name):
                # Look the attribute up in both known attribute layouts.
                try:
                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                    for item in attrs:
                        if name in item.get("name", ""):
                            return item.get("value", "")
                    attrs = model.get("detailData", {}).get("attributes", [])
                    for item in attrs:
                        if name in item.get("attributeName", ""):
                            return item.get("value", "")
                except Exception:
                    pass
                return ""

            trade = model.get("tradeModel", {})
            price_min = trade.get("minPrice", "") or ""
            if not price_min:
                try:
                    price_min = model["sku"]["priceRange"][0][1]
                except (KeyError, IndexError, TypeError):
                    pass

            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
            range_text = " / ".join(
                f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}"
                for r in ranges
            )

            offer = model.get("offerDetail", {})
            base_data = {
                # A missing breadcrumb must not discard the whole record.
                "category": offer.get("leafCategoryName", "") or self._breadcrumb_category(),
                "brand": get_attr("品牌"),
                "name": offer.get("subject", "") or self.driver.title.split('-')[0],
                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                "material": get_attr("材质") or get_attr("面料"),
                "price": price_min,
                "moq": trade.get("beginAmount", ""),
                "wholesale_price": range_text,
                "link": url,
                "supplier": model.get("sellerModel", {}).get("companyName", ""),
            }

            sku_props = (
                model.get("skuModel", {}).get("skuProps", [])
                or model.get("detailData", {}).get("skuProps", [])
                or []
            )
            # Prefer a colour/variant-like property; otherwise use the first.
            main_prop = next(
                (
                    p for p in sku_props
                    if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])
                ),
                None,
            )
            if not main_prop and sku_props:
                main_prop = sku_props[0]

            if main_prop and main_prop.get("value"):
                results = []
                for val in main_prop["value"]:
                    if val.get("name"):
                        row = base_data.copy()
                        row["color"] = val.get("name")
                        results.append(row)
                # If no value carried a name, keep the base record instead of
                # returning an empty list.
                if results:
                    return results
            return [base_data]
        except Exception:
            return None

    def clean_url(self, url):
        """Normalise an offer URL to ``https://detail.1688.com/offer/<id>.html``.

        Returns ``""`` for a falsy input and the input unchanged when it
        contains no ``offer/<digits>.html`` segment.
        """
        if not url:
            return ""
        id_match = re.search(r'offer/(\d+)\.html', url)
        if id_match:
            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        return url

    def _extract_all_methods(self):
        """Collect ``{"name", "link"}`` dicts from the current result page.

        First searches the page's embedded JSON (``window.data`` or
        ``window.__INITIAL_DATA__``) for an offer list. If that yields
        nothing, falls back to a series of CSS selectors and stops at the
        first selector that produces results.
        """
        results = []

        def find_list(obj):
            # Depth-first search through dict values for the first non-empty
            # list whose first element is a dict with a title or offerId key.
            if (
                isinstance(obj, list)
                and len(obj) > 0
                and isinstance(obj[0], dict)
                and ('title' in obj[0] or 'offerId' in obj[0])
            ):
                return obj
            if isinstance(obj, dict):
                for k in obj:
                    f = find_list(obj[k])
                    if f:
                        return f
            return None

        try:
            res = self.driver.execute_script(
                "return JSON.stringify(window.data || window.__INITIAL_DATA__)"
            )
            if res:
                data = json.loads(res)
                for o in (find_list(data) or []):
                    link = o.get('itemUrl', o.get('url', ''))
                    if link:
                        results.append({"name": str(o.get('title', '')), "link": link})
        except Exception:
            # Best effort: fall through to the DOM-based extraction below.
            pass

        if not results:
            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
                    try:
                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
                        if link:
                            results.append(
                                {"name": el.text.split('\n')[0][:50], "link": link}
                            )
                    except Exception:
                        continue
                if results:
                    break
        return results

    def quit(self):
        """Close the browser session, ignoring shutdown errors."""
        if self.driver is None:
            return
        try:
            self.driver.quit()
        except Exception:
            pass