# Compatibility patch for Python 3.12+, where distutils was removed from the
# standard library (undetected-chromedriver still imports distutils.version).
import sys
try:
    import distutils
except ImportError:
    from types import ModuleType
    d, v = ModuleType("distutils"), ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})

    class LooseVersion:
        """Minimal stand-in for distutils.version.LooseVersion.

        NOTE: __lt__ unconditionally returns True — this is a deliberate hack
        so that consumers' "is my driver older than X" checks always pass;
        it does NOT implement real version comparison.
        """
        def __init__(self, v):
            self.v = v

        def __lt__(self, o):
            return True

        def __str__(self):
            return str(self.v)

    v.LooseVersion = LooseVersion

import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium_stealth import stealth


class Scraper1688:
    """Selenium-based scraper for 1688.com product listings and detail pages.

    Startup strategy: prefer attaching to a locally installed Edge via the
    remote-debugging port (harder to fingerprint as automation); fall back to
    undetected-chromedriver Chrome. Anti-bot stealth patches are applied to
    whichever driver wins.
    """

    def __init__(self, headless=True, status_callback=None):
        """
        :param headless: run the browser without a visible window.
        :param status_callback: optional callable(blocked: bool, msg: str),
            invoked when a captcha/login wall is detected and again when it
            clears (see check_for_captcha).
        """
        self.headless = headless
        self.status_callback = status_callback
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
        self.driver = None
        edge_path = self._find_edge()
        if edge_path:
            print(f"[*] 检测到 Edge: {edge_path},正在全自动启动并接管...")
            self._cleanup_processes()
            edge_user_data = os.path.join(os.getcwd(), "1688_edge_profile")
            # Launch a real Edge with a debugging port so we can attach to it
            # instead of spawning an obviously-automated instance.
            cmd = [edge_path, "--remote-debugging-port=9222",
                   f"--user-data-dir={edge_user_data}",
                   "--no-first-run", "--no-default-browser-check"]
            if headless:
                cmd.append("--headless")
            try:
                subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(3)  # give the browser time to open the debug port
                opts = EdgeOptions()
                opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
                try:
                    self.driver = webdriver.Edge(options=opts)
                    print("[+] Edge 浏览器已成功接管!")
                except Exception:
                    # Selenium Manager could not supply a driver; fall back to
                    # webdriver_manager's downloaded msedgedriver.
                    from webdriver_manager.microsoft import EdgeChromiumDriverManager
                    service = EdgeService(EdgeChromiumDriverManager().install())
                    self.driver = webdriver.Edge(service=service, options=opts)
                    print("[+] Edge 浏览器已通过驱动管理接管!")
            except Exception as e:
                print(f"[*] Edge 启动失败: {e}")
        if not self.driver:
            print("[*] 正在启动 Chrome (undetected-chromedriver) 模式...")
            self._init_chrome(headless)
        if self.driver:
            if "edge" not in str(type(self.driver)).lower():
                # Chrome path: full selenium-stealth patching.
                stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.",
                        platform="Win32", fix_hairline=True)
            else:
                # Edge path: at minimum hide navigator.webdriver on every page.
                self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                    "source": "Object.defineProperty(navigator, 'webdriver', { get: () => undefined });"
                })

    def _find_edge(self):
        """Locate msedge.exe via the Windows registry App Paths keys.

        Returns the executable path, or None when Edge is absent or the
        platform is not Windows (winreg only exists on Windows).
        """
        if os.name != 'nt':
            return None
        import winreg
        reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
                     (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe")]
        for hkey, subkey in reg_paths:
            try:
                with winreg.OpenKey(hkey, subkey) as key:
                    path, _ = winreg.QueryValueEx(key, "")
                    if os.path.exists(path):
                        return path
            except Exception:
                continue
        return None

    def _cleanup_processes(self):
        """Best-effort kill of stale Edge/driver processes (Windows only) so the
        remote-debugging launch gets a clean profile lock."""
        if os.name == 'nt':
            for proc in ['msedge.exe', 'msedgedriver.exe', 'chromedriver.exe']:
                subprocess.call(['taskkill', '/F', '/IM', proc, '/T'],
                                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _init_chrome(self, headless):
        """Start undetected-chromedriver Chrome with a persistent profile."""
        def create_options():
            # A fresh ChromeOptions per attempt: uc consumes/mutates the
            # options object, so it cannot be reused across Chrome() calls.
            opts = uc.ChromeOptions()
            opts.add_argument(f"--user-data-dir={self.user_data_path}")
            if headless:
                opts.add_argument('--headless=new')
            opts.add_argument('--disable-blink-features=AutomationControlled')
            return opts
        try:
            self.driver = uc.Chrome(options=create_options(), headless=headless)
        except Exception:
            # uc occasionally fails on the first launch (e.g. stale driver
            # binary / profile lock); retry exactly once before giving up.
            self.driver = uc.Chrome(options=create_options(), headless=headless)

    def clean_url(self, url):
        """Normalize any 1688 product link to the canonical
        https://detail.1688.com/offer/<id>.html form; returns "" for empty
        input and the original URL when no offer id can be extracted."""
        if not url:
            return ""
        if url.startswith("//"):
            url = "https:" + url  # protocol-relative links
        id_match = re.search(r'offer/(\d+)\.html', url)
        if id_match:
            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        parsed = urllib.parse.urlparse(url)
        params = urllib.parse.parse_qs(parsed.query)
        oid = params.get('offerId') or params.get('id')
        if oid:
            return f"https://detail.1688.com/offer/{oid[0]}.html"
        return url

    def check_for_captcha(self):
        """Detect a slider captcha / login wall / punish page and block until a
        human clears it.

        Fires status_callback(True, msg) on detection and
        status_callback(False, msg) once cleared. Returns True if a block was
        encountered, False otherwise.
        """
        def is_blocked():
            try:
                url, src, title = (self.driver.current_url.lower(),
                                   self.driver.page_source.lower(),
                                   self.driver.title.lower())
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")  # Aliyun slider knob
                is_slider = len(sliders) > 0 and sliders[0].is_displayed()
                is_login = "login.1688.com" in url or "passport.1688.com" in url
                is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
                return is_slider or is_login or is_punish
            except Exception:
                # Transient driver errors (page mid-navigation) count as not blocked.
                return False
        if is_blocked():
            msg = "请登录验证"
            if self.status_callback:
                self.status_callback(True, msg)
            while is_blocked():
                time.sleep(2)  # poll until the operator resolves the challenge
            if self.status_callback:
                self.status_callback(False, "验证通过")
            time.sleep(3)  # settle time after verification
            return True
        return False

    def search_products_yield(self, keyword, total_count=200, existing_links=None):
        """Generator: search 1688 for *keyword* and yield batches (lists of up
        to 10 dicts) of scraped product data until *total_count* NEW links have
        been collected beyond those already in *existing_links*.

        :param keyword: search term (URL-encoded as GBK, which 1688 expects).
        :param total_count: number of new product links to collect.
        :param existing_links: optional set of already-seen canonical links;
            mutated in place when provided.
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
        self.driver.get("https://www.1688.com")
        self.check_for_captcha()
        all_links = existing_links if existing_links is not None else set()
        page, initial_count = 1, len(all_links)
        while len(all_links) < total_count + initial_count:
            print(f"[*] 正在处理列表页: 第 {page} 页...")
            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
            self.check_for_captcha()
            # Simulate uneven human scrolling to reduce slider-captcha frequency.
            scroll_steps = random.randint(5, 10)
            for i in range(1, scroll_steps + 1):
                self.driver.execute_script(
                    f"window.scrollTo(0, document.body.scrollHeight * {i/scroll_steps});")
                time.sleep(random.uniform(0.5, 2.0))
            page_results = self._extract_all_methods()
            if not page_results:
                print(f"[!] 第 {page} 页无结果,尝试刷新...")
                self.driver.refresh()
                time.sleep(5)
                page_results = self._extract_all_methods()
            page_batch = []
            for it in page_results:
                detail_url = self.clean_url(it["link"])
                if detail_url and detail_url not in all_links:
                    all_links.add(detail_url)
                    # Forced rest every 15 new items, for account safety.
                    current_new_count = len(all_links) - initial_count
                    if current_new_count > 0 and current_new_count % 15 == 0:
                        rest_time = random.randint(15, 30)
                        print(f"[*] 为了账号安全,强制休息 {rest_time} 秒...")
                        time.sleep(rest_time)
                    print(f" [>] 抓取详情: {detail_url}")
                    # Short pause before entering the detail page.
                    time.sleep(random.uniform(1.5, 3.5))
                    detail_results = self.scrape_detail(detail_url)
                    if detail_results:
                        page_batch.extend(detail_results)
                    else:
                        # Detail scrape failed: keep at least the link + name.
                        page_batch.append({"link": detail_url, "name": it["name"]})
                    if len(page_batch) >= 10:
                        yield page_batch
                        page_batch = []
                    # Large randomized wait between detail pages.
                    time.sleep(random.uniform(6, 12))
                    if len(all_links) >= total_count + initial_count:
                        break
            if page_batch:
                yield page_batch
            page += 1
            if page > 100:  # hard safety cap on pagination
                break
        # Generator return value (available via StopIteration.value).
        return list(all_links)

    def scrape_detail(self, url):
        """Scrape one product detail page.

        Pulls the page's embedded JS data model and returns a list of row
        dicts — one per SKU color/style value when SKU props exist, otherwise
        a single base row. Returns None when the model is missing or any
        error occurs (caller treats that as a soft failure).
        """
        try:
            self.driver.get(url)
            time.sleep(random.uniform(2.5, 4.5))  # wait for detail page load
            self.check_for_captcha()
            # 1688 exposes its data model under several globals depending on
            # page version; probe them in order of preference.
            model = self.driver.execute_script(
                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
            )
            if not model:
                return None

            def get_attr(name):
                # Look up a product attribute by (substring of) its Chinese
                # label, across the two schema variants seen in the wild.
                try:
                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                    for item in attrs:
                        if name in item.get("name", ""):
                            return item.get("value", "")
                    attrs = model.get("detailData", {}).get("attributes", [])
                    for item in attrs:
                        if name in item.get("attributeName", ""):
                            return item.get("value", "")
                except Exception:
                    pass
                return ""

            trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
            price_min = trade.get("minPrice", "") or ""
            if not price_min:
                try:
                    price_min = model["sku"]["priceRange"][0][1]
                except Exception:
                    pass
            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
            range_text = " / ".join(
                [f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
            base_data = {
                # Fall back to DOM scraping when the model lacks a field.
                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "")
                            or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                "brand": get_attr("品牌"),
                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "")
                        or self.driver.title.split('-')[0],
                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                "material": get_attr("材质") or get_attr("面料"),
                "price": price_min,
                "moq": trade.get("beginAmount", ""),
                "wholesale_price": range_text,
                "link": url,
                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
            }
            sku_props = (model.get("skuModel", {}).get("skuProps", [])
                         or model.get("detailData", {}).get("skuProps", []) or [])
            # Prefer the color/style-like SKU axis; otherwise take the first.
            main_prop = next((p for p in sku_props
                              if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
            if not main_prop and sku_props:
                main_prop = sku_props[0]
            if main_prop and main_prop.get("value"):
                results = []
                for val in main_prop["value"]:
                    if val.get("name"):
                        row = base_data.copy()
                        row["color"] = val.get("name")
                        results.append(row)
                return results
            return [base_data]
        except Exception:
            # Best-effort: detail failures must not abort the crawl.
            return None

    def _extract_all_methods(self):
        """Extract {name, link} items from a search-results page, trying three
        strategies in order: embedded JSON data, CSS card selectors, and
        finally a raw data-offer-id regex over the page source."""
        results = []
        # Strategy 1: embedded JSON model.
        try:
            res = self.driver.execute_script(
                "return JSON.stringify(window.data || window.__INITIAL_DATA__)")
            if res:
                data = json.loads(res)

                def find_list(obj):
                    # Depth-first search for the first list of offer-like dicts.
                    if isinstance(obj, list) and len(obj) > 0 and ('title' in obj[0] or 'offerId' in obj[0]):
                        return obj
                    if isinstance(obj, dict):
                        for k in obj:
                            f = find_list(obj[k])
                            if f:
                                return f
                    return None

                for o in (find_list(data) or []):
                    link = o.get('itemUrl', o.get('url', ''))
                    if link:
                        results.append({"name": str(o.get('title', '')), "link": link})
        except Exception:
            pass
        # Strategy 2: known result-card CSS selectors.
        if not results:
            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
                elements = self.driver.find_elements(By.CSS_SELECTOR, s)
                if len(elements) > 2:  # fewer matches are likely false positives
                    for el in elements:
                        try:
                            a = el.find_element(By.TAG_NAME, "a")
                            link = a.get_attribute("href")
                            if link:
                                results.append({"name": el.text.split('\n')[0][:50], "link": link})
                        except Exception:
                            continue
                    if results:
                        break
        # Strategy 3: regex over raw HTML for offer ids.
        if not results:
            ids = re.findall(r'data-offer-id="(\d+)"', self.driver.page_source)
            for oid in set(ids):
                results.append({"name": f"1688商品-{oid}",
                                "link": f"https://detail.1688.com/offer/{oid}.html"})
        return results

    def quit(self):
        """Shut down the browser, swallowing errors if it is already gone."""
        try:
            self.driver.quit()
        except Exception:
            pass