# Compatibility shim for Python 3.12+, where the stdlib `distutils` package was
# removed (PEP 632).  Third-party modules imported below (notably
# undetected_chromedriver) still do `from distutils.version import LooseVersion`,
# so we register stub modules in sys.modules before they are imported.
import sys

try:
    import distutils  # Python <= 3.11 (or setuptools shim present): nothing to do
except ImportError:
    from types import ModuleType

    d = ModuleType("distutils")
    v = ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})

    class LooseVersion:
        """Minimal stand-in for distutils.version.LooseVersion.

        Callers only need construction / comparison / str() not to crash;
        the comparison result itself is a dummy.
        """

        def __init__(self, v):
            self.v = v

        def __lt__(self, o):
            # Always report "older" — consumers merely branch on it, they do
            # not rely on a correct ordering here.
            return True

        def __str__(self):
            return str(self.v)

    v.LooseVersion = LooseVersion
import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.chrome.options import Options as ChromeOptions
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
from selenium_stealth import stealth
class Scraper1688:
    """Selenium-based scraper for 1688.com with heavy anti-detection measures.

    Strategy: prefer attaching to a real Edge browser launched with a remote
    debugging port (persistent profile => long-lived login); fall back to
    undetected_chromedriver.  All waits/scrolls are randomized to mimic a
    human operator and to reduce risk-control triggers.
    """

    def __init__(self, headless=True, status_callback=None, log_callback=None):
        """Create the driver.

        :param headless: run the browser without a window.
        :param status_callback: callable(blocked: bool, msg: str) invoked when a
            captcha/login wall is detected and when it clears.
        :param log_callback: callable(msg: str) for forwarding plain log lines
            to a GUI.
        """
        self.headless = headless
        self.status_callback = status_callback
        self.log_callback = log_callback  # added: plain log lines for the GUI
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
        self.driver = None
        edge_path = self._find_edge()
        if edge_path:
            print(f"[*] 【极致稳定模式】正在启动 Edge 深度伪装环境...")
            self._cleanup_processes()
            # Fixed, persistent session directory so the login survives restarts.
            edge_user_data = os.path.join(os.getcwd(), "1688_edge_ultimate_session")
            cmd = [
                edge_path,
                "--remote-debugging-port=9222",
                f"--user-data-dir={edge_user_data}",
                "--no-first-run",
                "--no-default-browser-check",
                "--disable-blink-features=AutomationControlled",
            ]
            if headless:
                cmd.append("--headless")
            try:
                subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                time.sleep(6)  # give the browser time to open the debug port
                opts = EdgeOptions()
                opts.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
                self.driver = webdriver.Edge(options=opts)
                print("[+] Edge 极致稳定环境接管成功!")
            except Exception as e:
                print(f"[!] Edge 启动失败: {e}")
        if not self.driver:
            # Edge unavailable or attach failed: fall back to Chrome.
            self._init_chrome(headless)
        if self.driver:
            # Scrub the most common automation fingerprints on every new page.
            try:
                self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                    "source": """
                        Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                        Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
                        Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
                    """
                })
            except Exception:
                pass  # best-effort: fingerprint scrubbing is not essential

    def _find_edge(self):
        """Return the msedge.exe path from the Windows registry, or None.

        NOTE: Windows-only (winreg); on other platforms the import raises and
        the caller falls through to the Chrome path via the exception — TODO
        confirm intended, a guard on os.name would be cleaner.
        """
        import winreg
        reg_paths = [
            (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
            (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"),
        ]
        for hkey, subkey in reg_paths:
            try:
                with winreg.OpenKey(hkey, subkey) as key:
                    path, _ = winreg.QueryValueEx(key, "")  # default value holds the path
                    if os.path.exists(path):
                        return path
            except Exception:
                continue
        return None

    def _cleanup_processes(self):
        """Kill stray browser/driver processes (Windows only) so the remote
        debugging port is free before we launch our own instance."""
        if os.name == 'nt':
            for proc in ['msedge.exe', 'msedgedriver.exe', 'chromedriver.exe']:
                subprocess.call(['taskkill', '/F', '/IM', proc, '/T'],
                                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _init_chrome(self, headless):
        """Fallback driver: undetected_chromedriver with a persistent profile."""
        def create_options():
            opts = uc.ChromeOptions()
            opts.add_argument(f"--user-data-dir={self.user_data_path}")
            return opts
        self.driver = uc.Chrome(options=create_options(), headless=headless)

    def check_for_captcha(self):
        """Block until any captcha/login/punish wall is manually cleared.

        Returns True always; after a wall clears, sleeps 120 s to let the
        site's risk-control score cool down.
        """
        def is_blocked():
            try:
                url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
                sliders = self.driver.find_elements(By.ID, "nc_1_n1z")  # slider captcha
                return ((len(sliders) > 0 and sliders[0].is_displayed())
                        or "login.1688.com" in url or "punish" in url or "哎哟喂" in src)
            except Exception:
                return False  # transient driver error: assume not blocked
        if is_blocked():
            msg = "请登录验证"
            if self.status_callback:
                self.status_callback(True, msg)
            while is_blocked():
                time.sleep(3)  # poll until the operator resolves it
            if self.status_callback:
                self.status_callback(False, "验证通过")
            cool_msg = "[*] 监测到干预完成,进入 120 秒深度冷却期以重置风控权重..."
            print(cool_msg)
            if self.log_callback:
                self.log_callback(f"{cool_msg}")
            time.sleep(120)
        return True

    def _human_behavior(self, duration=10):
        """Simulate human browsing (scrolls, mouse jitter) for ~duration seconds."""
        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                # 1. Random downward scroll.
                scroll_y = random.randint(200, 600)
                self.driver.execute_script(f"window.scrollBy(0, {scroll_y});")
                # 2. Small random mouse movement.
                actions = ActionChains(self.driver)
                actions.move_by_offset(random.randint(-5, 5), random.randint(-5, 5)).perform()
                time.sleep(random.uniform(1.5, 4.0))
                # 3. Occasionally scroll back up.
                if random.random() > 0.7:
                    self.driver.execute_script(f"window.scrollBy(0, -{random.randint(100, 300)});")
            except Exception:
                break  # e.g. cursor moved out of bounds — stop simulating

    def search_products_yield(self, keyword, total_count=200, existing_links=None):
        """Generator: search `keyword`, scrape details, yield batches of rows.

        Yields lists of product dicts (<=10 each); the generator's return
        value (StopIteration.value) is the full list of collected links.
        :param existing_links: mutable set of already-seen clean URLs (deduped
            in place); when None a fresh set is used.
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')  # 1688 expects GBK
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
        self.driver.get("https://www.1688.com")
        time.sleep(random.randint(3, 6))
        self.check_for_captcha()
        all_links = existing_links if existing_links is not None else set()
        page, initial_count = 1, len(all_links)
        # Randomized threshold (5-12 new items) for the next deep cool-down.
        next_cool_threshold = random.randint(5, 12)
        while len(all_links) < total_count + initial_count:
            print(f"[*] 正在模拟搜索: 第 {page} 页...")
            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
            self.check_for_captcha()
            # Simulate "browsing" the result list before harvesting it.
            for _ in range(random.randint(5, 8)):
                self.driver.execute_script(f"window.scrollBy(0, {random.randint(400, 800)});")
                time.sleep(random.uniform(1.5, 3.5))
                if random.random() > 0.8:
                    self.driver.execute_script("window.scrollBy(0, -300);")
            page_results = self._extract_all_methods()
            page_batch = []
            for it in page_results:
                clean_url = self.clean_url(it["link"])
                if clean_url and clean_url not in all_links:
                    all_links.add(clean_url)
                    # --- Randomized deep cool-down to break the request rhythm ---
                    new_processed = len(all_links) - initial_count
                    if new_processed >= next_cool_threshold:
                        rest = random.randint(120, 300)
                        cool_msg = f"[*] 随机触发深度保护 (已处理{new_processed}条),睡眠 {rest} 秒模拟休息..."
                        print(cool_msg)
                        if self.log_callback:
                            self.log_callback(f"{cool_msg}")
                        time.sleep(rest)
                        next_cool_threshold += random.randint(5, 12)  # schedule next checkpoint
                    print(f" [>] 详情仿真采集: {clean_url}")
                    # Large random pause before opening the detail page.
                    time.sleep(random.uniform(5, 12))
                    detail_results = self.scrape_detail(clean_url)
                    if detail_results:
                        page_batch.extend(detail_results)
                    else:
                        page_batch.append({"link": clean_url, "name": it["name"]})
                    if len(page_batch) >= 10:
                        yield page_batch
                        page_batch = []
                    # Long gap between detail-page visits.
                    time.sleep(random.uniform(30, 60))
                    if len(all_links) >= total_count + initial_count:
                        break
            if page_batch:
                yield page_batch
            page += 1
            # Every 3 pages, return to the home page to vary the navigation path.
            if page % 3 == 0:
                self.driver.get("https://www.1688.com")
                time.sleep(random.randint(10, 20))
        return list(all_links)

    def scrape_detail(self, url):
        """Scrape one detail page.

        Returns a list of row dicts (one per main SKU value, else a single
        base row), or None when the page's JS data model cannot be read.
        """
        try:
            self.driver.get(url)
            # Linger on the page like a real shopper before reading data.
            self._human_behavior(duration=random.randint(12, 25))
            self.check_for_captcha()
            # The product data model lives in one of several JS globals
            # depending on the page template version.
            model = self.driver.execute_script(
                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
            )
            if not model:
                return None

            def get_attr(name):
                # Look up a product attribute by (partial) name in either of
                # the two known attribute list layouts.
                try:
                    attrs = model.get("offerDetail", {}).get("featureAttributes", [])
                    for item in attrs:
                        if name in item.get("name", ""):
                            return item.get("value", "")
                    attrs = model.get("detailData", {}).get("attributes", [])
                    for item in attrs:
                        if name in item.get("attributeName", ""):
                            return item.get("value", "")
                except Exception:
                    pass
                return ""

            trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
            price_min = trade.get("minPrice", "") or ""
            if not price_min:
                try:
                    price_min = model["sku"]["priceRange"][0][1]  # fallback layout
                except Exception:
                    pass
            ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
            range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges])
            base_data = {
                "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
                "brand": get_attr("品牌"),
                "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
                "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
                "material": get_attr("材质") or get_attr("面料"),
                "price": price_min,
                "moq": trade.get("beginAmount", ""),
                "wholesale_price": range_text,
                "link": url,
                "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
            }
            # Expand the main SKU property (color/style) into one row per value.
            sku_props = model.get("skuModel", {}).get("skuProps", []) or model.get("detailData", {}).get("skuProps", []) or []
            main_prop = next((p for p in sku_props if any(k in p.get("prop", "") for k in ["颜色", "分类", "款式", "花色"])), None)
            if not main_prop and sku_props:
                main_prop = sku_props[0]
            if main_prop and main_prop.get("value"):
                results = []
                for val in main_prop["value"]:
                    if val.get("name"):
                        row = base_data.copy()
                        row["color"] = val.get("name")
                        results.append(row)
                return results
            return [base_data]
        except Exception:
            return None  # caller treats None as "keep link only"

    def clean_url(self, url):
        """Normalize an offer URL to its canonical form, keyed by the offer id;
        unknown URL shapes are returned unchanged, falsy input becomes ""."""
        if not url:
            return ""
        id_match = re.search(r'offer/(\d+)\.html', url)
        if id_match:
            return f"https://detail.1688.com/offer/{id_match.group(1)}.html"
        return url

    def _extract_all_methods(self):
        """Extract {name, link} items from the search-result page, trying the
        embedded JS data first, then falling back to CSS selectors."""
        results = []
        try:
            res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
            if res:
                data = json.loads(res)

                def find_list(obj):
                    # Depth-first search for the first list that looks like
                    # an offer list (items carry 'title' or 'offerId').
                    if isinstance(obj, list) and len(obj) > 0 and ('title' in obj[0] or 'offerId' in obj[0]):
                        return obj
                    if isinstance(obj, dict):
                        for k in obj:
                            f = find_list(obj[k])
                            if f:
                                return f
                    return None

                for o in (find_list(data) or []):
                    link = o.get('itemUrl', o.get('url', ''))
                    if link:
                        results.append({"name": str(o.get('title', '')), "link": link})
        except Exception:
            pass  # JS data missing/unparseable: fall through to DOM scraping
        if not results:
            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
                    try:
                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
                        if link:
                            results.append({"name": el.text.split('\n')[0][:50], "link": link})
                    except Exception:
                        continue
                if results:
                    break
        return results

    def quit(self):
        """Shut down the driver, ignoring errors (it may already be gone)."""
        try:
            self.driver.quit()
        except Exception:
            pass