|
|
@@ -1,6 +1,6 @@
|
|
|
-# 【更新时间:2026-01-16 11:00】
|
|
|
+# 【最终稳定版:2026-01-16 11:45】
|
|
|
# 核心功能:支持变体拆分、精准提取款式与价格、对标 req.py 逻辑
|
|
|
-# 反爬加固:大幅降低频率,增加随机人类行为,减少登录验证
|
|
|
+# 反爬策略:极低频抓取、大跨度深度休眠、行为路径混淆,目标 3小时 < 2次验证
|
|
|
import sys
|
|
|
try:
|
|
|
import distutils
|
|
|
@@ -27,7 +27,6 @@ class Scraper1688:
|
|
|
self.headless = headless
|
|
|
self.status_callback = status_callback
|
|
|
self.log_callback = log_callback
|
|
|
- # 使用独立的 Profile 目录,避免并发冲突
|
|
|
self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
|
|
|
self.driver = None
|
|
|
self._cleanup()
|
|
|
@@ -51,22 +50,13 @@ class Scraper1688:
|
|
|
return None
|
|
|
|
|
|
def _cleanup(self):
|
|
|
- """ 强制杀掉残留进程,确保环境纯净 """
|
|
|
if os.name == 'nt':
|
|
|
for proc in ['chrome.exe', 'chromedriver.exe']:
|
|
|
try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
except: pass
|
|
|
- if os.path.exists(self.user_data_path):
|
|
|
- for root, _, files in os.walk(self.user_data_path):
|
|
|
- for f in files:
|
|
|
- if "lock" in f.lower() or f == "SingletonLock":
|
|
|
- try: os.remove(os.path.join(root, f))
|
|
|
- except: pass
|
|
|
|
|
|
def _init_chrome(self, headless):
|
|
|
- """ 强化版 Chrome 启动逻辑 """
|
|
|
chrome_path = self._find_chrome()
|
|
|
-
|
|
|
def create_options():
|
|
|
opts = uc.ChromeOptions()
|
|
|
opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
|
|
|
@@ -76,21 +66,15 @@ class Scraper1688:
|
|
|
opts.add_argument("--window-size=1920,1080")
|
|
|
opts.add_argument("--no-sandbox")
|
|
|
opts.add_argument("--disable-dev-shm-usage")
|
|
|
- opts.add_argument("--remote-allow-origins=*")
|
|
|
return opts
|
|
|
-
|
|
|
try:
|
|
|
- # 优先使用 subprocess 模式启动,解决 Win11 连接难题
|
|
|
self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
|
|
|
except:
|
|
|
- # 失败则尝试普通模式,每次都使用 fresh options
|
|
|
self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
|
|
|
|
|
|
def clean_url(self, url):
|
|
|
- """ 鲁棒的 ID 提取 logic """
|
|
|
if not url: return ""
|
|
|
url_str = str(url)
|
|
|
- if url_str.startswith("//"): url_str = "https:" + url_str
|
|
|
id_match = re.search(r'(\d{9,15})', url_str)
|
|
|
if id_match:
|
|
|
return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
|
|
|
@@ -108,9 +92,8 @@ class Scraper1688:
|
|
|
if self.status_callback: self.status_callback(True, msg)
|
|
|
while is_blocked(): time.sleep(2)
|
|
|
if self.status_callback: self.status_callback(False, "验证通过")
|
|
|
- # 验证后增加一段长停顿,让风控系统冷静下来
|
|
|
- if self.log_callback: self.log_callback("<font color='orange'>验证通过,由于风控限制,将额外休息 60 秒...</font>")
|
|
|
- time.sleep(60)
|
|
|
+ if self.log_callback: self.log_callback("<font color='orange'>验证成功,进入 180 秒深度冷却期重置行为指纹...</font>")
|
|
|
+ time.sleep(180)
|
|
|
return True
|
|
|
|
|
|
def search_products_yield(self, keyword, total_count=200, existing_links=None):
|
|
|
@@ -118,93 +101,66 @@ class Scraper1688:
|
|
|
base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
|
|
|
self.driver.get("https://www.1688.com")
|
|
|
self.check_for_captcha()
|
|
|
-
|
|
|
all_links = existing_links if existing_links is not None else set()
|
|
|
page, initial_count = 1, len(all_links)
|
|
|
|
|
|
while len(all_links) < total_count + initial_count:
|
|
|
- print(f"[*] 正在处理列表页: 第 {page} 页...")
|
|
|
+ print(f"[*] 列表页采集: 第 {page} 页...")
|
|
|
self.driver.get(f"{base_url}&beginPage={page}&page={page}")
|
|
|
self.check_for_captcha()
|
|
|
|
|
|
- # --- 强化:模拟真实人类分段滚动,触发懒加载 ---
|
|
|
+ # 极慢速分段滚动
|
|
|
for i in range(1, 11):
|
|
|
self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/10});")
|
|
|
- time.sleep(random.uniform(1.5, 3.0))
|
|
|
- if i == 5: # 中途回滑
|
|
|
- self.driver.execute_script("window.scrollBy(0, -300);")
|
|
|
- time.sleep(1.0)
|
|
|
- time.sleep(random.uniform(3, 6))
|
|
|
-
|
|
|
+ time.sleep(random.uniform(2.0, 4.5))
|
|
|
+
|
|
|
page_results = self._extract_all_methods()
|
|
|
- print(f" [+] 本页发现 {len(page_results)} 个商品原始条目")
|
|
|
+ print(f" [+] 本页解析完成:共发现 {len(page_results)} 个商品链接")
|
|
|
|
|
|
page_batch = []
|
|
|
for it in page_results:
|
|
|
- clean_url = self.clean_url(it.get("link"))
|
|
|
+ clean_url = self.clean_url(it["link"])
|
|
|
if clean_url and clean_url not in all_links:
|
|
|
all_links.add(clean_url)
|
|
|
|
|
|
- # --- 反爬加固:每抓取一定数量,进行一次深度“休息” ---
|
|
|
+ # --- 极致加固:随机触发长效深度休眠 ---
|
|
|
new_count = len(all_links) - initial_count
|
|
|
- if new_count > 0 and new_count % random.randint(12, 18) == 0:
|
|
|
- rest_seconds = random.randint(180, 360) # 休息 3-6 分钟
|
|
|
- if self.log_callback:
|
|
|
- self.log_callback(f"<font color='orange'>已连续抓取 {new_count} 个商品,为模拟真实行为休息 {rest_seconds} 秒...</font>")
|
|
|
- time.sleep(rest_seconds)
|
|
|
+ if new_count > 0 and new_count % random.randint(10, 15) == 0:
|
|
|
+ rest_secs = random.randint(300, 600) # 5-10分钟
|
|
|
+ if self.log_callback: self.log_callback(f"<font color='red'><b>已采集 {new_count} 个,进入保护性长休眠 {rest_secs} 秒...</b></font>")
|
|
|
+ time.sleep(rest_secs)
|
|
|
|
|
|
- print(f" [>] 正在启动详情抓取: {clean_url}")
|
|
|
+ print(f" [>] 详情仿真抓取: {clean_url}")
|
|
|
detail_results = self.scrape_detail(clean_url)
|
|
|
-
|
|
|
- if detail_results:
|
|
|
- page_batch.extend(detail_results)
|
|
|
- else:
|
|
|
- page_batch.append({
|
|
|
- "category": "", "brand": "", "name": it.get("name", "未知"),
|
|
|
- "color": "", "spec": "", "material": "", "price": "",
|
|
|
- "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
|
|
|
- })
|
|
|
+ if detail_results: page_batch.extend(detail_results)
|
|
|
+ else: page_batch.append({"link": clean_url, "name": it.get("name", "未知")})
|
|
|
|
|
|
if len(page_batch) >= 10:
|
|
|
yield page_batch
|
|
|
page_batch = []
|
|
|
|
|
|
- # --- 反爬加固:详情页之间的随机超长等待 ---
|
|
|
- # 降低采集频率,是减少验证最有效的方法
|
|
|
- rest_between = random.uniform(25, 55)
|
|
|
- time.sleep(rest_between)
|
|
|
-
|
|
|
+ # --- 核心:每条详情抓取后,随机静默 40-80 秒 ---
|
|
|
+ time.sleep(random.uniform(40, 80))
|
|
|
if len(all_links) >= total_count + initial_count: break
|
|
|
|
|
|
if page_batch: yield page_batch
|
|
|
page += 1
|
|
|
-
|
|
|
- # 每处理两页列表,回首页转一圈,打破“机器人模式”
|
|
|
- if page % 2 == 0:
|
|
|
- if self.log_callback: self.log_callback("<font color='gray'>处理完两页,回首页浏览以分散风控权重...</font>")
|
|
|
- self.driver.get("https://www.1688.com")
|
|
|
- time.sleep(random.randint(15, 30))
|
|
|
-
|
|
|
+ # 处理完一页列表,回首页随机浏览重置路径指纹
|
|
|
+ self.driver.get("https://www.1688.com")
|
|
|
+ time.sleep(random.randint(30, 60))
|
|
|
return list(all_links)
|
|
|
|
|
|
def scrape_detail(self, url):
|
|
|
- """ 极其精准的变体拆分逻辑 (款式+价格) """
|
|
|
+ """ 详情页深度仿真浏览 """
|
|
|
try:
|
|
|
self.driver.get(url)
|
|
|
- # 大幅拉长详情页加载后的停留时间,并模拟随机滚动
|
|
|
- time.sleep(random.uniform(8, 15))
|
|
|
- self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 600)});")
|
|
|
- time.sleep(random.uniform(2, 4))
|
|
|
+ # 仿真阅读:停留 15-30 秒并随机分段滚动
|
|
|
+ for _ in range(random.randint(3, 6)):
|
|
|
+ self.driver.execute_script(f"window.scrollBy(0, {random.randint(200, 500)});")
|
|
|
+ time.sleep(random.uniform(3.0, 6.0))
|
|
|
|
|
|
self.check_for_captcha()
|
|
|
-
|
|
|
- # 获取核心模型
|
|
|
- model = self.driver.execute_script(
|
|
|
- "return (window.context && window.context.result && "
|
|
|
- "window.context.result.global && window.context.result.global.globalData "
|
|
|
- "&& window.context.result.global.globalData.model) || "
|
|
|
- "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
|
|
|
- )
|
|
|
+ model = self.driver.execute_script("return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;")
|
|
|
if not model: return None
|
|
|
|
|
|
def get_attr(name):
|
|
|
@@ -219,12 +175,13 @@ class Scraper1688:
|
|
|
return ""
|
|
|
|
|
|
trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
|
|
|
- range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in (trade.get("disPriceRanges") or trade.get("currentPrices") or [])])
|
|
|
+ ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
|
|
|
+ range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
|
|
|
|
|
|
base_data = {
|
|
|
"category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
|
|
|
"brand": get_attr("品牌"),
|
|
|
- "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else ""),
|
|
|
+ "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
|
|
|
"spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
|
|
|
"material": get_attr("材质") or get_attr("面料"),
|
|
|
"moq": trade.get("beginAmount", ""),
|
|
|
@@ -235,14 +192,20 @@ class Scraper1688:
|
|
|
|
|
|
variant_results = []
|
|
|
try:
|
|
|
- wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
|
|
|
+ # 方案 A: 优先使用用户发现的 expand-view-list (或 expand-view-list-wrapper)
|
|
|
+ wrappers = self.driver.find_elements(By.CSS_SELECTOR, ".expand-view-list, .expand-view-list-wrapper")
|
|
|
if wrappers:
|
|
|
+ # 获取该容器下的所有变体条目
|
|
|
items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
|
|
|
for item_el in items:
|
|
|
try:
|
|
|
+ # 提取款式描述文字 (item-label) -> 颜色列
|
|
|
label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
|
|
|
+ # 提取逐条价格 (item-price-stock) -> 价格列
|
|
|
price_raw = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
|
|
|
+ # 清洗价格,只保留数字和小数点
|
|
|
price_clean = re.sub(r'[^\d.]', '', price_raw)
|
|
|
+
|
|
|
if label:
|
|
|
row = base_data.copy()
|
|
|
row["color"] = label
|
|
|
@@ -251,9 +214,8 @@ class Scraper1688:
|
|
|
except: continue
|
|
|
except: pass
|
|
|
|
|
|
- if variant_results:
|
|
|
- return variant_results
|
|
|
-
|
|
|
+ if variant_results: return variant_results
|
|
|
+
|
|
|
sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
|
|
|
main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色", "净含量"])), None)
|
|
|
if not main_prop and sku_props: main_prop = sku_props[0]
|
|
|
@@ -266,46 +228,32 @@ class Scraper1688:
|
|
|
row["price"] = trade.get("minPrice", "")
|
|
|
results.append(row)
|
|
|
return results
|
|
|
-
|
|
|
- base_data["price"] = trade.get("minPrice", "")
|
|
|
return [base_data]
|
|
|
except: return None
|
|
|
|
|
|
def _extract_all_methods(self):
|
|
|
- """ 强化版:对标 req.py 深度探测 JS 变量提取链接 """
|
|
|
results = []
|
|
|
- scripts = [
|
|
|
- "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
|
|
|
- "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)",
|
|
|
- "return JSON.stringify(window.pageData || null)"
|
|
|
- ]
|
|
|
- for s in scripts:
|
|
|
- try:
|
|
|
- res = self.driver.execute_script(s)
|
|
|
- if res and res != "null":
|
|
|
- data = json.loads(res)
|
|
|
- def find_lists(obj):
|
|
|
- lists = []
|
|
|
- if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']):
|
|
|
- lists.append(obj)
|
|
|
- if isinstance(obj, dict):
|
|
|
- for k in obj: lists.extend(find_lists(obj[k]))
|
|
|
- return lists
|
|
|
- for product_list in find_lists(data):
|
|
|
- for o in product_list:
|
|
|
- link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
|
|
|
- if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
|
|
|
- if results: return results
|
|
|
- except: continue
|
|
|
-
|
|
|
- selectors = [".sm-offer-item", ".offer-card-item", ".search-offer-item", "[class*='offer-card']", ".offer-item"]
|
|
|
- for s in selectors:
|
|
|
- elements = self.driver.find_elements(By.CSS_SELECTOR, s)
|
|
|
- if len(elements) > 2:
|
|
|
- for el in elements:
|
|
|
+ try:
|
|
|
+ res = self.driver.execute_script("return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)")
|
|
|
+ if res and res != "null":
|
|
|
+ data = json.loads(res)
|
|
|
+ def find_list(obj):
|
|
|
+ if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title']): return obj
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ for k in obj:
|
|
|
+ f = find_list(obj[k])
|
|
|
+ if f: return f
|
|
|
+ return None
|
|
|
+ for o in (find_list(data) or []):
|
|
|
+ link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
|
|
|
+ if link: results.append({"name": str(o.get('title', '')), "link": link})
|
|
|
+ except: pass
|
|
|
+ if not results:
|
|
|
+ for s in [".sm-offer-item", ".offer-card-item", "[class*='offer-card']", ".offer-item"]:
|
|
|
+ for el in self.driver.find_elements(By.CSS_SELECTOR, s):
|
|
|
try:
|
|
|
link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
|
|
|
- if link and "1688.com" in link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
|
|
|
+ if link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
|
|
|
except: continue
|
|
|
if results: break
|
|
|
return results
|