|
|
@@ -27,14 +27,23 @@ class Scraper1688:
|
|
|
self.log_callback = log_callback
|
|
|
self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
|
|
|
self.driver = None
|
|
|
+
|
|
|
+ # 1. 初始化清理
|
|
|
self._cleanup()
|
|
|
+
|
|
|
+ # 2. 启动浏览器
|
|
|
self._init_chrome(headless)
|
|
|
+
|
|
|
if self.driver:
|
|
|
stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
|
|
|
|
|
|
def _find_chrome(self):
|
|
|
+ """ 通过注册表寻找 Chrome 精准安装路径 """
|
|
|
import winreg
|
|
|
- reg_paths = [(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"), (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")]
|
|
|
+ reg_paths = [
|
|
|
+ (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
|
|
|
+ (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")
|
|
|
+ ]
|
|
|
for hkey, subkey in reg_paths:
|
|
|
try:
|
|
|
with winreg.OpenKey(hkey, subkey) as key:
|
|
|
@@ -44,10 +53,14 @@ class Scraper1688:
|
|
|
return None
|
|
|
|
|
|
def _cleanup(self):
|
|
|
+ """ 杀掉所有残留进程,确保端口和文件未被锁定 """
|
|
|
if os.name == 'nt':
|
|
|
for proc in ['chrome.exe', 'chromedriver.exe']:
|
|
|
- try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
+ try:
|
|
|
+ subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
except: pass
|
|
|
+
|
|
|
+ # 清理锁定文件
|
|
|
if os.path.exists(self.user_data_path):
|
|
|
for root, _, files in os.walk(self.user_data_path):
|
|
|
for f in files:
|
|
|
@@ -56,7 +69,9 @@ class Scraper1688:
|
|
|
except: pass
|
|
|
|
|
|
def _init_chrome(self, headless):
|
|
|
+ """ 强化版启动:解决浏览器不弹出及连接重置报错 """
|
|
|
chrome_path = self._find_chrome()
|
|
|
+
|
|
|
def create_options():
|
|
|
opts = uc.ChromeOptions()
|
|
|
opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
|
|
|
@@ -64,99 +79,161 @@ class Scraper1688:
|
|
|
if headless: opts.add_argument('--headless=new')
|
|
|
opts.add_argument('--disable-blink-features=AutomationControlled')
|
|
|
opts.add_argument("--window-size=1920,1080")
|
|
|
+ # 兼容性全家桶
|
|
|
opts.add_argument("--no-sandbox")
|
|
|
opts.add_argument("--disable-dev-shm-usage")
|
|
|
opts.add_argument("--remote-allow-origins=*")
|
|
|
+ opts.add_argument("--no-first-run")
|
|
|
+ opts.add_argument("--no-default-browser-check")
|
|
|
return opts
|
|
|
+
|
|
|
+ print(f"[*] 正在物理启动 Chrome: {chrome_path}")
|
|
|
try:
|
|
|
- self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
|
|
|
- except:
|
|
|
- self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
|
|
|
+ # 增加 use_subprocess=True,显著提升在 Win11 下的连接稳定性
|
|
|
+ self.driver = uc.Chrome(
|
|
|
+ options=create_options(),
|
|
|
+ headless=headless,
|
|
|
+ browser_executable_path=chrome_path,
|
|
|
+ use_subprocess=True
|
|
|
+ )
|
|
|
+ print("[+] Chrome 浏览器已成功弹出!")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[*] 首次启动失败 ({e}),尝试自动兼容模式...")
|
|
|
+ try:
|
|
|
+ self._cleanup()
|
|
|
+ time.sleep(2)
|
|
|
+ # 兜底方案
|
|
|
+ self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
|
|
|
+ print("[+] 自动兼容模式启动成功!")
|
|
|
+ except Exception as e2:
|
|
|
+ print(f"[致命错误] 无法启动 Chrome: {e2}")
|
|
|
+ raise Exception("无法拉起 Chrome,请尝试关闭杀毒软件或重新安装 Chrome。")
|
|
|
|
|
|
def clean_url(self, url):
|
|
|
- """ 极其鲁棒的 ID 提取并转化为详情链接 """
|
|
|
+ """ 提取链接中的 ID 并重组为标准详情页链接 """
|
|
|
if not url: return ""
|
|
|
- if url.startswith("//"): url = "https:" + url
|
|
|
- # 只要能提取出 9 位以上数字 ID,就视为合法商品
|
|
|
- id_match = re.search(r'(\d{9,15})', url)
|
|
|
+ if isinstance(url, str) and url.startswith("//"): url = "https:" + url
|
|
|
+
|
|
|
+ # 只要能提取出 9 位以上数字 ID,就视为合法商品链接
|
|
|
+ id_match = re.search(r'(\d{9,15})', str(url))
|
|
|
if id_match:
|
|
|
return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
|
|
|
return ""
|
|
|
|
|
|
def check_for_captcha(self):
|
|
|
+ """ 检测登录、滑块、验证等状态 """
|
|
|
def is_blocked():
|
|
|
try:
|
|
|
- url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
|
|
|
+ url, src, title = self.driver.current_url.lower(), self.driver.page_source.lower(), self.driver.title.lower()
|
|
|
sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
|
|
|
- return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
|
|
|
+ is_slider = len(sliders) > 0 and sliders[0].is_displayed()
|
|
|
+ is_login = "login.1688.com" in url or "passport.1688.com" in url
|
|
|
+ is_punish = "punish" in url or "哎哟喂" in src or "验证码" in title
|
|
|
+ return is_slider or is_login or is_punish
|
|
|
except: return False
|
|
|
+
|
|
|
if is_blocked():
|
|
|
msg = "请登录验证"
|
|
|
if self.status_callback: self.status_callback(True, msg)
|
|
|
- while is_blocked(): time.sleep(2)
|
|
|
+ while is_blocked(): time.sleep(3)
|
|
|
if self.status_callback: self.status_callback(False, "验证通过")
|
|
|
- time.sleep(3)
|
|
|
+
|
|
|
+ cool_msg = "[*] 解封成功,进入 120 秒冷却期以规避风控追溯..."
|
|
|
+ if self.log_callback: self.log_callback(f"<font color='orange'>{cool_msg}</font>")
|
|
|
+ time.sleep(120)
|
|
|
return True
|
|
|
|
|
|
def search_products_yield(self, keyword, total_count=200, existing_links=None):
|
|
|
gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
|
|
|
base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
|
|
|
+
|
|
|
+ # 首页预热,检查登录
|
|
|
self.driver.get("https://www.1688.com")
|
|
|
self.check_for_captcha()
|
|
|
+
|
|
|
all_links = existing_links if existing_links is not None else set()
|
|
|
- page, initial_count = 1, len(all_links)
|
|
|
+ page = 1
|
|
|
+ initial_count = len(all_links)
|
|
|
|
|
|
while len(all_links) < total_count + initial_count:
|
|
|
print(f"[*] 正在处理列表页: 第 {page} 页...")
|
|
|
- self.driver.get(f"{base_url}&beginPage={page}&page={page}")
|
|
|
+ target_url = f"{base_url}&beginPage={page}&page={page}"
|
|
|
+ self.driver.get(target_url)
|
|
|
self.check_for_captcha()
|
|
|
- for i in range(1, 6):
|
|
|
- self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/5});")
|
|
|
- time.sleep(1.2)
|
|
|
+
|
|
|
+ # 模拟人类翻页滚动
|
|
|
+ for i in range(1, 5):
|
|
|
+ self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
|
|
|
+ time.sleep(1.5)
|
|
|
|
|
|
+ # --- 核心订正:此处必须获取到 page_results ---
|
|
|
page_results = self._extract_all_methods()
|
|
|
- print(f" [+] 本页发现 {len(page_results)} 个商品链接")
|
|
|
+ print(f" [+] 本页发现 {len(page_results)} 个潜在商品链接")
|
|
|
|
|
|
page_batch = []
|
|
|
for it in page_results:
|
|
|
raw_link = it.get("link")
|
|
|
clean_url = self.clean_url(raw_link)
|
|
|
+
|
|
|
if clean_url and clean_url not in all_links:
|
|
|
all_links.add(clean_url)
|
|
|
- # 关键修改:此处必须进入详情页抓取
|
|
|
+
|
|
|
+ # --- 核心改进:显式打印详情抓取日志并进入循环 ---
|
|
|
print(f" [>] 正在启动详情抓取: {clean_url}")
|
|
|
detail_results = self.scrape_detail(clean_url)
|
|
|
+
|
|
|
if detail_results:
|
|
|
page_batch.extend(detail_results)
|
|
|
else:
|
|
|
- page_batch.append({"link": clean_url, "name": it.get("name", "未知"), "price": ""})
|
|
|
+ # 兜底:如果详情页抓取失败,至少保留列表页基本信息
|
|
|
+ page_batch.append({
|
|
|
+ "category": "", "brand": "", "name": it.get("name", "未知商品"),
|
|
|
+ "color": "", "spec": "", "material": "", "price": "",
|
|
|
+ "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
|
|
|
+ })
|
|
|
|
|
|
+ # 每满 10 条(或此时的批次)yield 一次给 GUI
|
|
|
if len(page_batch) >= 10:
|
|
|
yield page_batch
|
|
|
page_batch = []
|
|
|
- time.sleep(random.uniform(15, 25))
|
|
|
- if len(all_links) >= total_count + initial_count: break
|
|
|
+
|
|
|
+ # 抓取后的随机等待,维持长效免验证
|
|
|
+ time.sleep(random.uniform(15, 30))
|
|
|
+
|
|
|
+ if len(all_links) >= total_count + initial_count:
|
|
|
+ break
|
|
|
|
|
|
- if page_batch: yield page_batch
|
|
|
+ # 页末清算
|
|
|
+ if page_batch:
|
|
|
+ yield page_batch
|
|
|
+ page_batch = []
|
|
|
+
|
|
|
page += 1
|
|
|
- if page % 3 == 0:
|
|
|
- self.driver.get("https://www.1688.com")
|
|
|
+ if page % 2 == 0:
|
|
|
+ self.driver.get("https://www.1688.com") # 随机回首页,重置指纹
|
|
|
time.sleep(random.randint(10, 20))
|
|
|
return list(all_links)
|
|
|
|
|
|
def scrape_detail(self, url):
|
|
|
- """ 精准拆分款式与价格 """
|
|
|
+ """
|
|
|
+ 完全对标 req.py 的详情页精准解析逻辑
|
|
|
+ 支持 expand-view-list-wrapper 中的款式描述 + 价格 拆分
|
|
|
+ """
|
|
|
try:
|
|
|
self.driver.get(url)
|
|
|
- time.sleep(random.uniform(5, 8))
|
|
|
+ time.sleep(random.uniform(5, 10)) # 给页面充分加载时间
|
|
|
self.check_for_captcha()
|
|
|
+
|
|
|
+ # 1. 执行 JS 获取完整模型 (req.py 核心思路)
|
|
|
model = self.driver.execute_script(
|
|
|
"return (window.context && window.context.result && "
|
|
|
"window.context.result.global && window.context.result.global.globalData "
|
|
|
"&& window.context.result.global.globalData.model) || "
|
|
|
"window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
|
|
|
)
|
|
|
- if not model: return None
|
|
|
+
|
|
|
+ if not model:
|
|
|
+ return None
|
|
|
|
|
|
def get_attr(name):
|
|
|
try:
|
|
|
@@ -169,67 +246,108 @@ class Scraper1688:
|
|
|
except: pass
|
|
|
return ""
|
|
|
|
|
|
+ def safe_text(by, sel):
|
|
|
+ try: return self.driver.find_element(by, sel).text.strip()
|
|
|
+ except: return ""
|
|
|
+
|
|
|
+ # 解析批发价区间
|
|
|
trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
|
|
|
ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
|
|
|
range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
|
|
|
|
|
|
base_data = {
|
|
|
- "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
|
|
|
+ "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or safe_text(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child"),
|
|
|
"brand": get_attr("品牌"),
|
|
|
"name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
|
|
|
- "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
|
|
|
+ "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号") or safe_text(By.XPATH, "//div[@id='productAttributes']//th[span='尺码' or span='规格']/following-sibling::td[1]//span[@class='field-value']"),
|
|
|
"material": get_attr("材质") or get_attr("面料"),
|
|
|
- "price": "", "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
|
|
|
+ "price": "", # 待填充
|
|
|
+ "moq": trade.get("beginAmount", ""),
|
|
|
+ "wholesale_price": range_text,
|
|
|
+ "link": url,
|
|
|
"supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
|
|
|
}
|
|
|
|
|
|
- variant_data_list = []
|
|
|
+ # 2. 核心需求:智能识别并拆分变体 (款式 + 价格)
|
|
|
+ variant_results = []
|
|
|
try:
|
|
|
+ # 寻找用户指定的 DOM 区域
|
|
|
wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
|
|
|
if wrappers:
|
|
|
items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
|
|
|
for item_el in items:
|
|
|
try:
|
|
|
+ # 提取款式 (item-label) 和 价格 (item-price-stock)
|
|
|
label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
|
|
|
- price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
|
|
|
- if label: variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
|
|
|
+ price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
|
|
|
+ # 过滤掉非数字
|
|
|
+ price_clean = re.sub(r'[^\d.]', '', price_raw)
|
|
|
+
|
|
|
+ if label:
|
|
|
+ row = base_data.copy()
|
|
|
+ row["color"] = label
|
|
|
+ row["price"] = price_clean
|
|
|
+ variant_results.append(row)
|
|
|
except: continue
|
|
|
except: pass
|
|
|
|
|
|
- if variant_data_list:
|
|
|
- results = []
|
|
|
- for vd in variant_data_list:
|
|
|
- row = base_data.copy(); row["color"] = vd["label"]; row["price"] = vd["price"]; results.append(row)
|
|
|
- return results
|
|
|
+ if variant_results:
|
|
|
+ return variant_results
|
|
|
+
|
|
|
+ # 3. 兜底方案:如果没有变体列表,获取主价格
|
|
|
+ base_data["price"] = trade.get("minPrice", "")
|
|
|
+ base_data["color"] = get_attr("颜色") or get_attr("颜色分类") or ""
|
|
|
return [base_data]
|
|
|
- except: return None
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[!] 详情页抓取异常 ({url}): {e}")
|
|
|
+ return None
|
|
|
|
|
|
def _extract_all_methods(self):
|
|
|
- """ 全力提取列表页链接 """
|
|
|
+ """ 强化版列表页提取:对标 req.py 的 JS 变量探测 """
|
|
|
results = []
|
|
|
- try:
|
|
|
- res = self.driver.execute_script("return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)")
|
|
|
- if res and res != "null":
|
|
|
- data = json.loads(res)
|
|
|
- def find_list(obj):
|
|
|
- if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
|
|
|
- if isinstance(obj, dict):
|
|
|
- for k in obj:
|
|
|
- f = find_list(obj[k])
|
|
|
- if f: return f
|
|
|
- return None
|
|
|
- for o in (find_list(data) or []):
|
|
|
- link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
|
|
|
- if link: results.append({"name": str(o.get('title', '')), "link": link})
|
|
|
- if results: return results
|
|
|
- except: pass
|
|
|
- for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']"]:
|
|
|
- for el in self.driver.find_elements(By.CSS_SELECTOR, s):
|
|
|
- try:
|
|
|
- a = el.find_element(By.TAG_NAME, "a"); link = a.get_attribute("href")
|
|
|
- if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
|
|
|
- except: continue
|
|
|
- if results: break
|
|
|
+ # 1. 优先使用脚本直接获取内存中的列表数据 (req.py 方式)
|
|
|
+ scripts = [
|
|
|
+ "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
|
|
|
+ "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"
|
|
|
+ ]
|
|
|
+ for s in scripts:
|
|
|
+ try:
|
|
|
+ res = self.driver.execute_script(s)
|
|
|
+ if res and res != "null":
|
|
|
+ data = json.loads(res)
|
|
|
+ def find_list(obj):
|
|
|
+ if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ for k in obj:
|
|
|
+ f = find_list(obj[k])
|
|
|
+ if f: return f
|
|
|
+ return None
|
|
|
+
|
|
|
+ found_items = find_list(data) or []
|
|
|
+ for o in found_items:
|
|
|
+ link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
|
|
|
+ if link:
|
|
|
+ results.append({
|
|
|
+ "name": str(o.get('title', o.get('subject', ''))),
|
|
|
+ "link": link
|
|
|
+ })
|
|
|
+ if results: return results
|
|
|
+ except: continue
|
|
|
+
|
|
|
+ # 2. 暴力 DOM 扫描 (如果 JS 变量失效)
|
|
|
+ selectors = [".sm-offer-item", ".offer-card-item", ".pc-search-offer-item", "[class*='offer-card']", ".offer-item"]
|
|
|
+ for s in selectors:
|
|
|
+ elements = self.driver.find_elements(By.CSS_SELECTOR, s)
|
|
|
+ if len(elements) > 2:
|
|
|
+ for el in elements:
|
|
|
+ try:
|
|
|
+ a_tag = el.find_element(By.TAG_NAME, "a")
|
|
|
+ link = a_tag.get_attribute("href")
|
|
|
+ if link:
|
|
|
+ results.append({"name": el.text.split('\n')[0][:50], "link": link})
|
|
|
+ except: continue
|
|
|
+ if results: break
|
|
|
return results
|
|
|
|
|
|
def quit(self):
|