# scraper.py
  1. # 【版本:20260115-终极订正版】
  2. # 针对 Python 3.12+ 移除 distutils 的兼容性补丁
  3. import sys
  4. try:
  5. import distutils
  6. except ImportError:
  7. from types import ModuleType
  8. d, v = ModuleType("distutils"), ModuleType("distutils.version")
  9. d.version = v
  10. sys.modules.update({"distutils": d, "distutils.version": v})
  11. class LooseVersion:
  12. def __init__(self, v): self.v = v
  13. def __lt__(self, o): return True
  14. def __str__(self): return str(self.v)
  15. v.LooseVersion = LooseVersion
  16. import time, random, re, os, subprocess, urllib.parse, json, traceback, socket
  17. from selenium import webdriver
  18. import undetected_chromedriver as uc
  19. from selenium.webdriver.common.by import By
  20. from selenium.webdriver.common.action_chains import ActionChains
  21. from selenium_stealth import stealth
  22. class Scraper1688:
  23. def __init__(self, headless=True, status_callback=None, log_callback=None):
  24. self.headless = headless
  25. self.status_callback = status_callback
  26. self.log_callback = log_callback
  27. self.user_data_path = os.path.abspath(os.path.join(os.getcwd(), "chrome_stable_profile"))
  28. self.driver = None
  29. self._cleanup()
  30. self._init_chrome(headless)
  31. if self.driver:
  32. stealth(self.driver, languages=["zh-CN", "zh"], vendor="Google Inc.", platform="Win32", fix_hairline=True)
  33. def _find_chrome(self):
  34. """ 强力锁定 Chrome 安装路径 """
  35. import winreg
  36. reg_paths = [
  37. (winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe"),
  38. (winreg.HKEY_CURRENT_USER, r"SOFTWARE\Microsoft\Windows\CurrentVersion\App Paths\chrome.exe")
  39. ]
  40. for hkey, subkey in reg_paths:
  41. try:
  42. with winreg.OpenKey(hkey, subkey) as key:
  43. path, _ = winreg.QueryValueEx(key, "")
  44. if os.path.exists(path): return path
  45. except: continue
  46. return None
  47. def _cleanup(self):
  48. if os.name == 'nt':
  49. for proc in ['chrome.exe', 'chromedriver.exe']:
  50. try: subprocess.call(['taskkill', '/F', '/IM', proc, '/T'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
  51. except: pass
  52. if os.path.exists(self.user_data_path):
  53. for root, _, files in os.walk(self.user_data_path):
  54. for f in files:
  55. if "lock" in f.lower() or f == "SingletonLock":
  56. try: os.remove(os.path.join(root, f))
  57. except: pass
  58. def _init_chrome(self, headless):
  59. chrome_path = self._find_chrome()
  60. def create_options():
  61. opts = uc.ChromeOptions()
  62. opts.add_argument(f'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36')
  63. opts.add_argument(f"--user-data-dir={self.user_data_path}")
  64. if headless: opts.add_argument('--headless=new')
  65. opts.add_argument('--disable-blink-features=AutomationControlled')
  66. opts.add_argument("--window-size=1920,1080")
  67. opts.add_argument("--no-sandbox")
  68. opts.add_argument("--disable-dev-shm-usage")
  69. opts.add_argument("--remote-allow-origins=*")
  70. return opts
  71. try:
  72. self.driver = uc.Chrome(options=create_options(), headless=headless, browser_executable_path=chrome_path, use_subprocess=True)
  73. except Exception as e:
  74. self.driver = uc.Chrome(options=create_options(), headless=headless, use_subprocess=True)
  75. def clean_url(self, url):
  76. """ 【关键订正】极其简化的 ID 提取逻辑,只要是商品就必须进入详情页 """
  77. if not url: return ""
  78. # 强制转换为字符串并处理
  79. url_str = str(url)
  80. if url_str.startswith("//"): url_str = "https:" + url_str
  81. # 只要能匹配到连续的 9-15 位数字(1688 商品 ID 特征),就重组
  82. id_match = re.search(r'(\d{9,15})', url_str)
  83. if id_match:
  84. standard_url = f"https://detail.1688.com/offer/{id_match.group(1)}.html"
  85. return standard_url
  86. return ""
  87. def check_for_captcha(self):
  88. def is_blocked():
  89. try:
  90. url, src = self.driver.current_url.lower(), self.driver.page_source.lower()
  91. sliders = self.driver.find_elements(By.ID, "nc_1_n1z")
  92. return (len(sliders) > 0 and sliders[0].is_displayed()) or "login.1688.com" in url or "punish" in url or "哎哟喂" in src
  93. except: return False
  94. if is_blocked():
  95. msg = "请登录验证"
  96. if self.status_callback: self.status_callback(True, msg)
  97. while is_blocked(): time.sleep(2)
  98. if self.status_callback: self.status_callback(False, "验证通过")
  99. time.sleep(3)
  100. return True
  101. def search_products_yield(self, keyword, total_count=200, existing_links=None):
  102. gbk_keyword = urllib.parse.quote(keyword, encoding='gbk')
  103. base_url = f"https://s.1688.com/selloffer/offer_search.htm?keywords={gbk_keyword}&n=y&netType=1%2C11%2C16"
  104. self.driver.get("https://www.1688.com")
  105. self.check_for_captcha()
  106. all_links = existing_links if existing_links is not None else set()
  107. page, initial_count = 1, len(all_links)
  108. while len(all_links) < total_count + initial_count:
  109. print(f"[*] 正在处理列表页: 第 {page} 页...")
  110. self.driver.get(f"{base_url}&beginPage={page}&page={page}")
  111. self.check_for_captcha()
  112. for i in range(1, 5):
  113. self.driver.execute_script(f"window.scrollTo(0, document.body.scrollHeight * {i/4});")
  114. time.sleep(1.2)
  115. # 获取本页链接 (完全对标 req.py 变量探测)
  116. page_results = self._extract_all_methods()
  117. print(f" [+] 本页发现 {len(page_results)} 个原始条目")
  118. page_batch = []
  119. for it in page_results:
  120. raw_link = it.get("link")
  121. clean_url = self.clean_url(raw_link)
  122. if not clean_url:
  123. continue
  124. if clean_url in all_links:
  125. print(f" [-] 跳过已存在商品: {clean_url}")
  126. continue
  127. all_links.add(clean_url)
  128. # 【强制日志】只要进入这里,就一定会打印并执行详情抓取
  129. print(f" [>] 正在执行详情抓取流程: {clean_url}")
  130. detail_results = self.scrape_detail(clean_url)
  131. if detail_results:
  132. page_batch.extend(detail_results)
  133. else:
  134. # 即使详情失败也记录基本信息,防止死循环
  135. page_batch.append({
  136. "category": "", "brand": "", "name": it.get("name", "未知"),
  137. "color": "", "spec": "", "material": "", "price": "",
  138. "moq": "", "wholesale_price": "", "link": clean_url, "supplier": ""
  139. })
  140. if len(page_batch) >= 10:
  141. yield page_batch
  142. page_batch = []
  143. time.sleep(random.uniform(15, 25))
  144. if len(all_links) >= total_count + initial_count: break
  145. if page_batch: yield page_batch
  146. page += 1
  147. if page % 3 == 0:
  148. self.driver.get("https://www.1688.com")
  149. time.sleep(random.randint(10, 20))
  150. return list(all_links)
  151. def scrape_detail(self, url):
  152. """ 精准解析:完全同步自 req.py 的模型获取逻辑 """
  153. try:
  154. self.driver.get(url)
  155. time.sleep(random.uniform(5, 8))
  156. self.check_for_captcha()
  157. # 执行 JS 获取核心模型 (完全对标 req.py)
  158. model = self.driver.execute_script(
  159. "return (window.context && window.context.result && "
  160. "window.context.result.global && window.context.result.global.globalData "
  161. "&& window.context.result.global.globalData.model) || "
  162. "window.__INITIAL_DATA__ || window.iDetailData || window.iDetailConfig || null;"
  163. )
  164. if not model: return None
  165. def get_attr(name):
  166. try:
  167. attrs = model.get("offerDetail", {}).get("featureAttributes", [])
  168. for item in attrs:
  169. if name in item.get("name", ""): return item.get("value", "")
  170. attrs = model.get("detailData", {}).get("attributes", [])
  171. for item in attrs:
  172. if name in item.get("attributeName", ""): return item.get("value", "")
  173. except: pass
  174. return ""
  175. trade = model.get("tradeModel", {}) if isinstance(model, dict) else {}
  176. range_text = " / ".join([f"{r.get('beginAmount')}起 ¥{r.get('price') or r.get('discountPrice')}" for r in (trade.get("disPriceRanges") or trade.get("currentPrices") or [])])
  177. base_data = {
  178. "category": (model.get("offerDetail", {}).get("leafCategoryName", "") if isinstance(model, dict) else "") or self.driver.find_element(By.CSS_SELECTOR, "div[class*=breadcrumb] a:last-child").text.strip(),
  179. "brand": get_attr("品牌"),
  180. "name": (model.get("offerDetail", {}).get("subject", "") if isinstance(model, dict) else "") or self.driver.title.split('-')[0],
  181. "spec": get_attr("尺码") or get_attr("规格") or get_attr("型号"),
  182. "material": get_attr("材质") or get_attr("面料"),
  183. "price": "", "moq": trade.get("beginAmount", ""), "wholesale_price": range_text, "link": url,
  184. "supplier": (model.get("sellerModel", {}).get("companyName", "") if isinstance(model, dict) else ""),
  185. }
  186. variant_data_list = []
  187. try:
  188. # 方案 A: 优先使用 expand-view-list-wrapper 获取款式和价格
  189. wrappers = self.driver.find_elements(By.CLASS_NAME, "expand-view-list-wrapper")
  190. if wrappers:
  191. items = wrappers[0].find_elements(By.CSS_SELECTOR, ".expand-view-list-item, [class*='list-item'], .sku-item")
  192. for item_el in items:
  193. try:
  194. label = item_el.find_element(By.CLASS_NAME, "item-label").text.strip()
  195. price = item_el.find_element(By.CLASS_NAME, "item-price-stock").text.strip()
  196. if label: variant_data_list.append({"label": label, "price": re.sub(r'[^\d.]', '', price)})
  197. except: continue
  198. except: pass
  199. if variant_data_list:
  200. results = []
  201. for vd in variant_data_list:
  202. row = base_data.copy(); row["color"] = vd["label"]; row["price"] = vd["price"]; results.append(row)
  203. return results
  204. return [base_data]
  205. except: return None
  206. def _extract_all_methods(self):
  207. """ 强化版:全力探测 1688 列表页数据 (对标 req.py) """
  208. results = []
  209. # 1. 深度内存变量扫描
  210. scripts = [
  211. "return JSON.stringify(window.data || window.context?.result?.data || window.__INITIAL_DATA__)",
  212. "return JSON.stringify(window.context?.result?.global?.globalData?.data || null)"
  213. ]
  214. for s in scripts:
  215. try:
  216. res = self.driver.execute_script(s)
  217. if res and res != "null":
  218. data = json.loads(res)
  219. def find_list(obj):
  220. if isinstance(obj, list) and len(obj) > 0 and isinstance(obj[0], dict) and any(k in obj[0] for k in ['offerId', 'title', 'subject']): return obj
  221. if isinstance(obj, dict):
  222. for k in obj:
  223. f = find_list(obj[k])
  224. if f: return f
  225. return None
  226. for o in (find_list(data) or []):
  227. link = o.get('itemUrl', o.get('url', '')) or str(o.get('offerId', ''))
  228. if link: results.append({"name": str(o.get('title', o.get('subject', ''))), "link": link})
  229. if results: return results
  230. except: continue
  231. # 2. 暴力 DOM 选择器保底
  232. for s in [".sm-offer-item", ".offer-card-item", ".pc-search-offer-item", "[class*='offer-card']", ".offer-item"]:
  233. for el in self.driver.find_elements(By.CSS_SELECTOR, s):
  234. try:
  235. a = el.find_element(By.TAG_NAME, "a")
  236. link = a.get_attribute("href")
  237. if link and "1688.com" in link: results.append({"name": el.text.split('\n')[0][:50], "link": link})
  238. except: continue
  239. if results: break
  240. return results
  241. def quit(self):
  242. try: self.driver.quit()
  243. except: pass