|
|
@@ -15,6 +15,8 @@ except ImportError:
|
|
|
|
|
|
import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
|
|
|
from selenium import webdriver
|
|
|
+from selenium.webdriver.edge.options import Options as EdgeOptions
|
|
|
+from selenium.webdriver.edge.service import Service as EdgeService
|
|
|
import undetected_chromedriver as uc
|
|
|
from selenium.webdriver.common.by import By
|
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
|
@@ -29,36 +31,60 @@ class Scraper1688:
|
|
|
|
|
|
# 1. 探测 Edge 路径
|
|
|
edge_path = self._find_edge()
|
|
|
- if not edge_path:
|
|
|
- raise Exception("电脑上未检测到 Edge 浏览器,请先安装。")
|
|
|
-
|
|
|
- print(f"[*] 检测到 Edge: {edge_path},正在自动启动...")
|
|
|
|
|
|
- # 2. 启动前强制清理残留进程,防止端口或用户目录被锁定
|
|
|
- self._cleanup_processes()
|
|
|
-
|
|
|
- # 3. 使用 undetected-chromedriver 强行驱动 Edge
|
|
|
- try:
|
|
|
- options = uc.ChromeOptions()
|
|
|
- options.binary_location = edge_path # 关键:指定使用 Edge 二进制文件
|
|
|
- options.add_argument(f"--user-data-dir={self.user_data_path}")
|
|
|
- if headless: options.add_argument('--headless=new')
|
|
|
- options.add_argument('--disable-blink-features=AutomationControlled')
|
|
|
- options.add_argument("--window-size=1920,1080")
|
|
|
+ if edge_path:
|
|
|
+ print(f"[*] 检测到 Edge: {edge_path},正在全自动启动并接管...")
|
|
|
+ # 2. 清理旧进程,确保 9222 端口可用
|
|
|
+ self._cleanup_processes()
|
|
|
|
|
|
- # 使用全新的 options 初始化,避免 reuse 错误
|
|
|
- self.driver = uc.Chrome(options=options, headless=headless)
|
|
|
- print("[+] Edge 浏览器已自动弹出并成功连接!")
|
|
|
+ # 3. 后台启动 Edge (带调试端口)
|
|
|
+ # 使用独立的用户数据目录,避免和日常使用的 Edge 冲突
|
|
|
+ edge_user_data = os.path.join(os.getcwd(), "1688_edge_profile")
|
|
|
+ cmd = [
|
|
|
+ edge_path,
|
|
|
+ "--remote-debugging-port=9222",
|
|
|
+ f"--user-data-dir={edge_user_data}",
|
|
|
+ "--no-first-run",
|
|
|
+ "--no-default-browser-check"
|
|
|
+ ]
|
|
|
+ if headless:
|
|
|
+ cmd.append("--headless")
|
|
|
|
|
|
- except Exception as e:
|
|
|
- print(f"[!] 启动失败: {traceback.format_exc()}")
|
|
|
- raise Exception(f"无法自动启动 Edge 浏览器: {e}")
|
|
|
+ try:
|
|
|
+ # 异步启动浏览器进程
|
|
|
+ subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
+ time.sleep(3) # 等待浏览器初始化
|
|
|
+
|
|
|
+ # 4. 接管 Edge
|
|
|
+ opts = EdgeOptions()
|
|
|
+ opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
|
|
|
+
|
|
|
+ # 尝试连接
|
|
|
+ try:
|
|
|
+ # 优先尝试使用系统路径中的驱动
|
|
|
+ self.driver = webdriver.Edge(options=opts)
|
|
|
+ print("[+] Edge 浏览器已成功自动弹出并接管!")
|
|
|
+ except:
|
|
|
+ # 如果连不上,尝试使用 webdriver_manager 自动下载匹配驱动
|
|
|
+ print("[*] 尝试自动下载匹配的 EdgeDriver...")
|
|
|
+ from webdriver_manager.microsoft import EdgeChromiumDriverManager
|
|
|
+ service = EdgeService(EdgeChromiumDriverManager().install())
|
|
|
+ self.driver = webdriver.Edge(service=service, options=opts)
|
|
|
+ print("[+] Edge 浏览器已通过驱动管理自动弹出并接管!")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[*] Edge 自动接管模式失败,准备回退到 Chrome: {e}")
|
|
|
+
|
|
|
+ # 5. 兜底方案:如果 Edge 启动或接管失败,启动 Chrome
|
|
|
+ if not self.driver:
|
|
|
+ print("[*] 正在启动 Chrome (undetected-chromedriver) 模式...")
|
|
|
+ self._init_chrome(headless)
|
|
|
|
|
|
- # 应用 stealth 增强隐蔽性
|
|
|
- stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
|
|
|
+ if self.driver:
|
|
|
+ stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
|
|
|
|
|
|
def _find_edge(self):
|
|
|
- """ 通过注册表获取 Windows 下 Edge 的精准路径 """
|
|
|
+ """ 通过注册表寻找 Edge 精准安装路径 """
|
|
|
import winreg
|
|
|
reg_paths = [
|
|
|
(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
|
|
|
@@ -70,21 +96,27 @@ class Scraper1688:
|
|
|
path, _ = winreg.QueryValueEx(key, "")
|
|
|
if os.path.exists(path): return path
|
|
|
except: continue
|
|
|
-
|
|
|
- # 暴力路径补丁
|
|
|
- common = [
|
|
|
- r"C:\Program Files (x86)\Microsoft\Edge\Application\msedge.exe",
|
|
|
- r"C:\Program Files\Microsoft\Edge\Application\msedge.exe"
|
|
|
- ]
|
|
|
- for p in common:
|
|
|
- if os.path.exists(p): return p
|
|
|
return None
|
|
|
|
|
|
def _cleanup_processes(self):
|
|
|
- """ 在启动前杀掉残留的 Edge 进程,确保 9222 端口可用 """
|
|
|
+ """ 清理残留的 Edge 和驱动进程 """
|
|
|
if os.name == 'nt':
|
|
|
- subprocess.call(['taskkill', '/F', '/IM', 'msedge.exe', '/T'],
|
|
|
- stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
+ for proc in ['msedge.exe', 'msedgedriver.exe']:
|
|
|
+ subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
+
|
|
|
+ def _init_chrome(self, headless):
|
|
|
+ """ 初始化 undetected-chromedriver (Chrome) """
|
|
|
+ def create_options():
|
|
|
+ opts = uc.ChromeOptions()
|
|
|
+ opts.add_argument(f"--user-data-dir={self.user_data_path}")
|
|
|
+ if headless: opts.add_argument('--headless=new')
|
|
|
+ opts.add_argument('--disable-blink-features=AutomationControlled')
|
|
|
+ opts.add_argument("--window-size=1920,1080")
|
|
|
+ return opts
|
|
|
+ try:
|
|
|
+ self.driver = uc.Chrome(options=create_options(), headless=headless)
|
|
|
+ except:
|
|
|
+ self.driver = uc.Chrome(options=create_options(), headless=headless)
|
|
|
|
|
|
def clean_url(self, url):
|
|
|
"""极其鲁棒的 1688 URL 清洗逻辑"""
|
|
|
@@ -99,14 +131,10 @@ class Scraper1688:
|
|
|
return url
|
|
|
|
|
|
def check_for_captcha(self):
|
|
|
- """
|
|
|
- 核心监控:检测登录、滑块验证、访问受限等需要人工干预的状态
|
|
|
- """
|
|
|
+ """ 检测验证码、登录等干预状态 """
|
|
|
def is_blocked():
|
|
|
try:
|
|
|
- url = self.driver.current_url.lower()
|
|
|
- src = self.driver.page_source.lower()
|
|
|
- title = self.driver.title.lower()
|
|
|
+ url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
|
|
|
sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
|
|
|
is_slider = len(sliders) > 0 and sliders[0].is_displayed()
|
|
|
is_login = "login.1688.com" in url or "passport.1688.com" in url
|
|
|
@@ -126,13 +154,12 @@ class Scraper1688:
|
|
|
gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
|
|
|
base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
|
|
|
|
|
|
- # 首页预热,检查登录
|
|
|
+ # 预热并检查验证
|
|
|
self.driver.get("https://www.1688.com")
|
|
|
self.check_for_captcha()
|
|
|
|
|
|
all_links = existing_links if existing_links is not None else set()
|
|
|
- page = 1
|
|
|
- initial_count = len(all_links)
|
|
|
+ page, initial_count = 1, len(all_links)
|
|
|
|
|
|
while len(all_links) < total_count + initial_count:
|
|
|
print(f"[*] 正在搜索列表页: 第 {page} 页...")
|
|
|
@@ -172,7 +199,7 @@ class Scraper1688:
|
|
|
return list(all_links)
|
|
|
|
|
|
def scrape_detail(self, url):
|
|
|
- """ 抓取详情并拆分规格 """
|
|
|
+ """ 抓取并解析详情页,支持主维度拆分 """
|
|
|
try:
|
|
|
self.driver.get(url)
|
|
|
time.sleep(2)
|
|
|
@@ -193,10 +220,6 @@ class Scraper1688:
|
|
|
except: pass
|
|
|
return ""
|
|
|
|
|
|
- def safe_text(by, sel):
|
|
|
- try: return self.driver.find_element(by, sel).text.strip()
|
|
|
- except: return ""
|
|
|
-
|
|
|
trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
|
|
|
price_min = trade.get("minPrice", "") or ""
|
|
|
if not price_min:
|
|
|
@@ -207,11 +230,11 @@ class Scraper1688:
|
|
|
range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
|
|
|
|
|
|
base_data = {
|
|
|
- "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
|
|
|
+ "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
|
|
|
"brand": get_attr("品牌"),
|
|
|
"name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
|
|
|
- "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
|
|
|
- "material": get_attr("材质") or get_attr("面料") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='材质']/following-sibling::td[1]//span[@class='field-value']"),
|
|
|
+ "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
|
|
|
+ "material": get_attr("材质") or get_attr("面料"),
|
|
|
"price": price_min,
|
|
|
"moq": trade.get("beginAmount", ""),
|
|
|
"wholesale_price": range_text,
|
|
|
@@ -219,15 +242,9 @@ class Scraper1688:
|
|
|
"supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
|
|
|
}
|
|
|
|
|
|
- sku_props = []
|
|
|
- try:
|
|
|
- sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or model.get("sku", {}).get("skuProps", [])
|
|
|
- except: pass
|
|
|
-
|
|
|
- main_prop = None
|
|
|
- if sku_props:
|
|
|
- main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
|
|
|
- if not main_prop: main_prop = sku_props[0]
|
|
|
+ sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
|
|
|
+ main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
|
|
|
+ if not main_prop and sku_props: main_prop = sku_props[0]
|
|
|
|
|
|
if main_prop and main_prop.get("value"):
|
|
|
results = []
|
|
|
@@ -241,7 +258,7 @@ class Scraper1688:
|
|
|
except: return None
|
|
|
|
|
|
def _extract_all_methods(self):
|
|
|
- """ 列表页提取 """
|
|
|
+ """ 列表页多方式提取 """
|
|
|
results = []
|
|
|
try:
|
|
|
res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
|