| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
# Compatibility shim for Python 3.12+, where ``distutils`` was removed from the
# standard library.  Third-party code imported later in this file may still do
# ``from distutils.version import LooseVersion``, so when the real package is
# missing we register minimal stand-in modules in ``sys.modules``.
import sys

try:
    import distutils  # noqa: F401  (only probing for availability)
except ImportError:
    import re
    from types import ModuleType

    d, v = ModuleType("distutils"), ModuleType("distutils.version")
    d.version = v
    sys.modules.update({"distutils": d, "distutils.version": v})

    class LooseVersion:
        """Minimal re-implementation of ``distutils.version.LooseVersion``.

        The version string is split into numeric and alphabetic components
        (``"1.5.2b2"`` -> ``[1, 5, 2, "b", 2]``) which are exposed as
        ``version``; the raw string is exposed as ``vstring``.  The original
        argument is also kept as ``v``.
        """

        component_re = re.compile(r"(\d+ | [a-z]+ | \.)", re.VERBOSE)

        def __init__(self, v):
            self.v = v
            self.parse(str(v))

        def parse(self, vstring):
            """Split *vstring* into comparable components."""
            self.vstring = vstring
            components = []
            for part in self.component_re.split(vstring):
                if not part or part == ".":
                    continue
                try:
                    components.append(int(part))
                except ValueError:
                    components.append(part)
            self.version = components

        def _key(self):
            # Tag each component so ints and strings never get compared
            # directly (that raises TypeError on Python 3); numbers sort
            # before letters.
            return [(0, c) if isinstance(c, int) else (1, c) for c in self.version]

        def _cmp(self, other):
            if isinstance(other, str):
                other = LooseVersion(other)
            elif not isinstance(other, LooseVersion):
                return NotImplemented
            mine, theirs = self._key(), other._key()
            if mine == theirs:
                return 0
            return -1 if mine < theirs else 1

        def __eq__(self, o):
            c = self._cmp(o)
            return c if c is NotImplemented else c == 0

        def __lt__(self, o):
            c = self._cmp(o)
            return c if c is NotImplemented else c < 0

        def __le__(self, o):
            c = self._cmp(o)
            return c if c is NotImplemented else c <= 0

        def __gt__(self, o):
            c = self._cmp(o)
            return c if c is NotImplemented else c > 0

        def __ge__(self, o):
            c = self._cmp(o)
            return c if c is NotImplemented else c >= 0

        def __hash__(self):
            return hash(tuple(self._key()))

        def __str__(self):
            return str(self.v)

        def __repr__(self):
            return f"LooseVersion ('{self}')"

    v.LooseVersion = LooseVersion
- import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
- from selenium import webdriver
- from selenium.webdriver.edge.options import Options as EdgeOptions
- from selenium.webdriver.edge.service import Service as EdgeService
- from selenium.webdriver.chrome.options import Options as ChromeOptions
- import undetected_chromedriver as uc
- from selenium.webdriver.common.by import By
- from selenium.webdriver.common.action_chains import ActionChains
- from selenium_stealth import stealth
class Scraper1688:
    """Selenium-driven scraper for 1688.com search results and offer pages.

    On construction the scraper first tries to launch Microsoft Edge with a
    remote-debugging port and attach Selenium to it; if Edge cannot be found
    or attached, it falls back to ``undetected_chromedriver``.

    Args:
        headless: Run the browser without a visible window.
        status_callback: Optional ``callable(blocked: bool, message: str)``
            invoked when a login/captcha wall appears and when it clears.
        log_callback: Optional ``callable(html: str)`` receiving log lines
            (HTML-formatted) for display in a GUI.
    """

    # Address of the remote-debugging endpoint Edge is launched with.
    DEBUG_ADDRESS = "127.0.0.1:9222"
    # Extracts the numeric offer id from any 1688 offer URL.
    _OFFER_ID_RE = re.compile(r'offer/(\d+)\.html')
    _OFFER_URL_TEMPLATE = "https://detail.1688.com/offer/{}.html"

    def __init__(self, headless=True, status_callback=None, log_callback=None):
        self.headless = headless
        self.status_callback = status_callback
        self.log_callback = log_callback  # used to forward plain log lines to the GUI
        self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "1688_user_data"))
        self.driver = None
        self._edge_proc = None  # Popen handle of the Edge instance we started, if any

        edge_path = self._find_edge()
        if edge_path:
            self._launch_edge(edge_path, headless)

        if not self.driver:
            self._init_chrome(headless)
        if self.driver:
            self._mask_automation_fingerprint()

    def _launch_edge(self, edge_path, headless):
        """Start Edge with remote debugging enabled and attach Selenium to it.

        Leaves ``self.driver`` as ``None`` on failure so the caller can fall
        back to Chrome; a half-started Edge process is terminated.
        """
        print("[*] 【极致稳定模式】正在启动 Edge 深度伪装环境...")
        self._cleanup_processes()
        # Fixed, persistent profile directory so the login session survives runs.
        edge_user_data = os.path.join(os.getcwd(), "1688_edge_ultimate_session")
        cmd = [
            edge_path,
            "--remote-debugging-port=9222",
            f"--user-data-dir={edge_user_data}",
            "--no-first-run",
            "--no-default-browser-check",
            "--disable-blink-features=AutomationControlled",
        ]
        if headless:
            cmd.append("--headless")
        try:
            self._edge_proc = subprocess.Popen(
                cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
            time.sleep(6)  # give the browser time to open the debugging port
            opts = EdgeOptions()
            opts.add_experimental_option("debuggerAddress", self.DEBUG_ADDRESS)
            self.driver = webdriver.Edge(options=opts)
            print("[+] Edge 极致稳定环境接管成功!")
        except Exception as e:
            print(f"[!] Edge 启动失败: {e}")
            self.driver = None
            # Do not leave an orphaned browser behind while Chrome takes over.
            self._stop_edge_process()

    def _mask_automation_fingerprint(self):
        """Best-effort: inject a script overriding common automation markers."""
        try:
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
                    Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh'] });
                    Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5] });
                """
            })
        except Exception:
            # Not every driver exposes CDP; the scraper still works without it.
            pass

    def _find_edge(self):
        """Return the path of ``msedge.exe`` from the Windows registry.

        Returns ``None`` when Edge is not registered, the registered path does
        not exist, or the platform has no ``winreg`` module (non-Windows).
        """
        try:
            import winreg
        except ImportError:
            return None
        subkey = r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\msedge.exe"
        for hive in (winreg.HKEY_LOCAL_MACHINE, winreg.HKEY_CURRENT_USER):
            try:
                with winreg.OpenKey(hive, subkey) as key:
                    path, _ = winreg.QueryValueEx(key, "")
            except OSError:
                continue
            if isinstance(path, str) and os.path.exists(path):
                return path
        return None

    def _cleanup_processes(self):
        """Force-kill every running Edge / driver process (Windows only).

        NOTE: this also closes Edge windows the user opened themselves.
        """
        if os.name == 'nt':
            for proc in ['msedge.exe', 'msedgedriver.exe', 'chromedriver.exe']:
                subprocess.call(
                    ['taskkill', '/F', '/IM', proc, '/T'],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )

    def _stop_edge_process(self):
        """Terminate the Edge process started by ``_launch_edge``, if any."""
        proc = getattr(self, "_edge_proc", None)
        self._edge_proc = None
        if proc is None:
            return
        try:
            proc.terminate()
            proc.wait(timeout=5)
        except (OSError, subprocess.TimeoutExpired):
            pass

    def _init_chrome(self, headless):
        """Fallback: start ``undetected_chromedriver`` with a persistent profile."""
        opts = uc.ChromeOptions()
        opts.add_argument(f"--user-data-dir={self.user_data_path}")
        self.driver = uc.Chrome(options=opts, headless=headless)

    def _is_blocked(self):
        """Return True if the current page looks like a login/captcha wall."""
        try:
            url = self.driver.current_url.lower()
            src = self.driver.page_source.lower()
            sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
            slider_visible = len(sliders) > 0 and sliders[0].is_displayed()
            return slider_visible or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
        except Exception:
            return False

    def check_for_captcha(self):
        """Block until any login/captcha wall has been cleared by the user.

        While blocked, ``status_callback(True, ...)`` is fired once and the
        page is polled every 3 seconds.  After the wall clears,
        ``status_callback(False, ...)`` is fired and the scraper sleeps for
        120 seconds before continuing.

        Returns:
            Always ``True``.
        """
        if self._is_blocked():
            msg = "请登录验证"
            if self.status_callback:
                self.status_callback(True, msg)
            while self._is_blocked():
                time.sleep(3)
            if self.status_callback:
                self.status_callback(False, "验证通过")

            cool_msg = "[*] 监测到干预完成,进入 120 秒深度冷却期以重置风控权重..."
            print(cool_msg)
            if self.log_callback:
                self.log_callback(f"<font color='orange'>{cool_msg}</font>")
            time.sleep(120)
        return True

    def _human_behavior(self, duration=10):
        """Simulate a human reading the page for roughly *duration* seconds.

        Each round scrolls down a random amount, nudges the mouse, pauses, and
        occasionally scrolls back up.  The loop stops early only if the page
        can no longer be scripted.
        """
        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                # 1. random downward scroll
                scroll_y = random.randint(200, 600)
                self.driver.execute_script(f"window.scrollBy(0, {scroll_y});")
            except Exception:
                break
            try:
                # 2. small random mouse jitter.  A relative move may be
                # rejected (e.g. it would leave the viewport); that must not
                # cut the whole dwell time short, so it is ignored.
                actions = ActionChains(self.driver)
                actions.move_by_offset(random.randint(-5, 5), random.randint(-5, 5)).perform()
            except Exception:
                pass
            time.sleep(random.uniform(1.5, 4.0))
            try:
                # 3. occasionally scroll back up
                if random.random() > 0.7:
                    self.driver.execute_script(f"window.scrollBy(0, -{random.randint(100, 300)});")
            except Exception:
                break

    def search_products_yield(self, keyword, total_count=200, existing_links=None, max_empty_pages=3):
        """Search 1688 for *keyword* and yield scraped rows in batches.

        This is a generator.  For every search-result page it visits each new
        offer's detail page (with long randomized pauses in between) and
        yields lists of row dicts: a batch is flushed once it holds at least
        10 rows, and any remainder is flushed at the end of each page.

        Args:
            keyword: Search term; it is percent-encoded as GBK for the URL.
            total_count: Number of *new* offer links to collect.
            existing_links: Links already processed.  If a ``set`` is passed
                it is updated in place; any other iterable is copied.
            max_empty_pages: Stop after this many consecutive result pages
                that contribute no new link (prevents looping forever once
                the results are exhausted).  A falsy value disables the check.

        Returns:
            The generator's return value (``StopIteration.value``) is the list
            of all known links.

        Raises:
            UnicodeEncodeError: if *keyword* cannot be encoded as GBK.
        """
        gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
        base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"

        self.driver.get("https://www.1688.com")
        time.sleep(random.randint(3, 6))
        self.check_for_captcha()

        if existing_links is None:
            all_links = set()
        elif isinstance(existing_links, set):
            all_links = existing_links  # shared with the caller on purpose
        else:
            all_links = set(existing_links)

        page, initial_count = 1, len(all_links)
        target = total_count + initial_count
        # Number of processed offers after which the next long rest happens (5-12).
        next_cool_threshold = random.randint(5, 12)
        empty_pages = 0

        while len(all_links) < target:
            print(f"[*] 正在模拟搜索: 第 {page} 页...")
            self.driver.get(f"{base_url}&beginPage={page}&page={page}")
            self.check_for_captcha()

            # Imitate a user browsing through the result list.
            for _ in range(random.randint(5, 8)):
                self.driver.execute_script(f"window.scrollBy(0, {random.randint(400, 800)});")
                time.sleep(random.uniform(1.5, 3.5))
                if random.random() > 0.8:
                    self.driver.execute_script("window.scrollBy(0, -300);")

            page_results = self._extract_all_methods()
            links_before_page = len(all_links)
            page_batch = []
            for it in page_results:
                clean_url = self.clean_url(it["link"])
                if clean_url and clean_url not in all_links:
                    all_links.add(clean_url)

                    # Randomized long rest between groups of offers.
                    new_processed = len(all_links) - initial_count
                    if new_processed >= next_cool_threshold:
                        rest = random.randint(120, 300)
                        cool_msg = f"[*] 随机触发深度保护 (已处理{new_processed}条),睡眠 {rest} 秒模拟休息..."
                        print(cool_msg)
                        if self.log_callback:
                            self.log_callback(f"<font color='orange'><b>{cool_msg}</b></font>")
                        time.sleep(rest)
                        next_cool_threshold += random.randint(5, 12)  # schedule the next checkpoint
                    print(f" [>] 详情仿真采集: {clean_url}")

                    # Long random pause before opening the detail page.
                    time.sleep(random.uniform(5, 12))

                    detail_results = self.scrape_detail(clean_url)
                    if detail_results:
                        page_batch.extend(detail_results)
                    else:
                        page_batch.append({"link": clean_url, "name": it["name"]})

                    if len(page_batch) >= 10:
                        yield page_batch
                        page_batch = []

                    # Wide gap between consecutive detail pages.
                    time.sleep(random.uniform(30, 60))

                if len(all_links) >= target:
                    break

            if page_batch:
                yield page_batch

            if len(all_links) == links_before_page:
                empty_pages += 1
                if max_empty_pages and empty_pages >= max_empty_pages:
                    print(f"[!] 连续 {empty_pages} 页未发现新商品,停止翻页。")
                    break
            else:
                empty_pages = 0

            page += 1
            # Every third page, revisit the home page so the navigation path varies.
            if page % 3 == 0:
                self.driver.get("https://www.1688.com")
                time.sleep(random.randint(10, 20))
        return list(all_links)

    def scrape_detail(self, url):
        """Open an offer detail page and return its data rows.

        Returns:
            A list of row dicts (one per main SKU variant, with a ``"color"``
            key, or a single row without it when there are no named
            variants), or ``None`` when the page exposes no usable data model
            or any step fails.
        """
        try:
            self.driver.get(url)
            # Dwell on the page like a reader before touching its data.
            self._human_behavior(duration=random.randint(12, 25))
            self.check_for_captcha()

            model = self.driver.execute_script(
                "return (window.context && window.context.result && window.context.result.global && window.context.result.global.globalData && window.context.result.global.globalData.model) || window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
            )
            if not isinstance(model, dict) or not model:
                return None
            return self._rows_from_model(model, url)
        except Exception:
            return None

    @staticmethod
    def _section(model, key):
        """Return ``model[key]`` if it is a dict, else an empty dict."""
        value = model.get(key)
        return value if isinstance(value, dict) else {}

    def _breadcrumb_category(self):
        """Read the last breadcrumb link text from the page, or ``""``."""
        try:
            crumbs = self.driver.find_elements(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child")
            return crumbs[0].text.strip() if crumbs else ""
        except Exception:
            return ""

    def _rows_from_model(self, model, url):
        """Convert a detail-page data model (dict) into output rows."""
        offer = self._section(model, "offerDetail")
        detail = self._section(model, "detailData")
        trade = self._section(model, "tradeModel")

        def get_attr(name):
            """Look up an attribute value whose label contains *name*."""
            try:
                for item in offer.get("featureAttributes") or []:
                    if name in (item.get("name") or ""):
                        return item.get("value", "")
                for item in detail.get("attributes") or []:
                    if name in (item.get("attributeName") or ""):
                        return item.get("value", "")
            except Exception:
                pass
            return ""

        price_min = trade.get("minPrice", "") or ""
        if not price_min:
            try:
                price_min = model["sku"]["priceRange"][0][1]
            except (KeyError, IndexError, TypeError):
                pass

        ranges = trade.get("disPriceRanges") or trade.get("currentPrices") or []
        range_text = " / ".join(
            f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in ranges
        )

        base_data = {
            # Fall back to what the rendered page shows when the model lacks a field.
            "category": offer.get("leafCategoryName", "") or self._breadcrumb_category(),
            "brand": get_attr("品牌"),
            "name": offer.get("subject", "") or self.driver.title.split('-')[0],
            "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
            "material": get_attr("材质") or get_attr("面料"),
            "price": price_min,
            "moq": trade.get("beginAmount", ""),
            "wholesale_price": range_text,
            "link": url,
            "supplier": self._section(model, "sellerModel").get("companyName", ""),
        }

        sku_props = self._section(model, "skuModel").get("skuProps") or detail.get("skuProps") or []
        variant_keys = ["颜色", "分类", "款式", "花色"]
        main_prop = next(
            (p for p in sku_props if any(k in (p.get("prop") or "") for k in variant_keys)),
            None,
        )
        if not main_prop and sku_props:
            main_prop = sku_props[0]

        if main_prop and main_prop.get("value"):
            results = []
            for val in main_prop["value"]:
                if val.get("name"):
                    row = base_data.copy()
                    row["color"] = val.get("name")
                    results.append(row)
            # If no variant carried a name, keep the base row rather than
            # returning an empty list and losing the scraped data.
            return results or [base_data]
        return [base_data]

    def clean_url(self, url):
        """Normalize an offer URL to ``https://detail.1688.com/offer/<id>.html``.

        Returns ``""`` for a falsy *url* and the input unchanged when no offer
        id can be found in it.
        """
        if not url:
            return ""
        id_match = self._OFFER_ID_RE.search(url)
        if id_match:
            return self._OFFER_URL_TEMPLATE.format(id_match.group(1))
        return url

    def _extract_all_methods(self):
        """Collect ``{"name", "link"}`` dicts for the offers on a result page.

        First looks for an offer list inside the page's embedded JSON state;
        if that yields nothing, falls back to scanning offer cards in the DOM.
        """
        def find_list(obj):
            # An offer list is a non-empty list whose first item is a dict
            # carrying 'title' or 'offerId'.  Lists of anything else are skipped.
            if isinstance(obj, list):
                if obj and isinstance(obj[0], dict) and ('title' in obj[0] or 'offerId' in obj[0]):
                    return obj
                return None
            if isinstance(obj, dict):
                for value in obj.values():
                    found = find_list(value)
                    if found:
                        return found
            return None

        results = []
        try:
            res = self.driver.execute_script("return JSON.stringify(window.data || window.__INITIAL_DATA__)")
            if res:
                data = json.loads(res)
                for o in (find_list(data) or []):
                    if not isinstance(o, dict):
                        continue
                    link = o.get('itemUrl') or o.get('url') or ''
                    if not link and o.get('offerId'):
                        # No URL in the record: build the canonical one from the id.
                        link = self._OFFER_URL_TEMPLATE.format(o['offerId'])
                    if link:
                        results.append({"name": str(o.get('title', '')), "link": link})
        except Exception:
            pass

        if not results:
            for s in [".search-offer-item", "[class*='offer-card']", ".offer-item"]:
                for el in self.driver.find_elements(By.CSS_SELECTOR, s):
                    try:
                        link = el.find_element(By.TAG_NAME, "a").get_attribute("href")
                        if link:
                            results.append({"name": el.text.split('\n')[0][:50], "link": link})
                    except Exception:
                        continue
                if results:
                    break
        return results

    def quit(self):
        """Close the browser session and any Edge process this object started.

        Safe to call more than once.
        """
        driver = getattr(self, "driver", None)
        if driver is not None:
            try:
                driver.quit()
            except Exception:
                pass
        self.driver = None
        self._stop_edge_process()
|