sys_user.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. # -*- coding: utf-8 -*-
  2. # @Author : YY
  3. from typing import List, Optional
  4. from flask import g
  5. from sqlalchemy import and_, or_, func, insert, select, update
  6. from ruoyi_common.base.model import ExtraModel
  7. from ruoyi_common.domain.entity import SysDept, SysRole, SysUser
  8. from ruoyi_common.sqlalchemy.model import ColumnEntityList
  9. from ruoyi_common.sqlalchemy.transaction import Transactional
  10. from ruoyi_system.domain.po import SysDeptPo, SysRolePo, SysUserPo, \
  11. SysUserRolePo
  12. from ruoyi_admin.ext import db
  13. from ruoyi_common.utils import security_util as SecurityUtil
class SysUserMapper:
    """
    Data-access layer for user records.
    """
    # Names of the SysUserPo fields selected by default when loading users.
    default_fields = {
        "user_id", "dept_id", "user_name", "nick_name", "email", "phonenumber",
        "avatar", "status", "password", "sex", "del_flag", "login_ip",
        "login_date", "create_by", "create_time", "update_by", "update_time",
        "remark"
    }
    # Column list built from default_fields. The third positional argument is
    # False -- presumably the same switch that other methods of this class
    # pass as alia_prefix=False; TODO confirm against ColumnEntityList.
    default_columns = ColumnEntityList(SysUserPo, default_fields, False)
  25. @classmethod
  26. def select_user_list(cls, user: SysUser) -> List[SysUser]:
  27. """
  28. 根据条件,查询用户列表
  29. Args:
  30. user: 用户传输条件信息
  31. Returns:
  32. 用户信息列表
  33. """
  34. print(user)
  35. dept_vo_fields = {"dept_name", "leader"}
  36. user_columns = ColumnEntityList(SysUserPo, cls.default_fields)
  37. dept_columns = ColumnEntityList(SysDeptPo, dept_vo_fields)
  38. criterions = [SysUserPo.del_flag == "0"]
  39. if user.user_id is not None and user.user_id != 0:
  40. criterions.append(SysUserPo.user_id == user.user_id)
  41. if user.user_name is not None and user.user_name != '':
  42. criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
  43. if user.status is not None and user.status != 0:
  44. criterions.append(SysUserPo.status == user.status)
  45. if user.phonenumber is not None and user.phonenumber != '':
  46. criterions.append(SysUserPo.phonenumber.like(f"%{user.phonenumber}%"))
  47. if user.dept_id is not None and user.dept_id != 0:
  48. subquery = select(SysDeptPo.dept_id).where(or_(
  49. SysDeptPo.dept_id == user.dept_id,
  50. func.find_in_set(user.dept_id, SysDeptPo.ancestors)
  51. )).subquery()
  52. criterions.append(SysUserPo.dept_id.in_(subquery))
  53. if g.criterian_meta.extra:
  54. extra: ExtraModel = g.criterian_meta.extra
  55. if extra.start_time and extra.end_time:
  56. criterions.append(SysUserPo.create_time >= extra.start_time)
  57. criterions.append(SysUserPo.create_time <= extra.end_time)
  58. # 检查是否需要应用数据范围过滤
  59. # 只有当用户不是超级管理员时才应用数据范围过滤
  60. login_user = SecurityUtil.get_login_user()
  61. if g.criterian_meta.scope and (not login_user or not SecurityUtil.is_user_admin(login_user.user)):
  62. criterions.append(g.criterian_meta.scope)
  63. stmt = select(*user_columns, *dept_columns) \
  64. .join(SysDeptPo, SysUserPo.dept_id == SysDeptPo.dept_id, isouter=True) \
  65. .where(*criterions)
  66. print("Generated SQL:")
  67. print(stmt.compile(db.session.bind, compile_kwargs={"literal_binds": True}))
  68. if g.criterian_meta.page:
  69. g.criterian_meta.page.stmt = stmt
  70. rows = db.session.execute(stmt).all()
  71. eos = list()
  72. for row in rows:
  73. user = user_columns.cast(row, SysUser)
  74. user.dept = dept_columns.cast(row, SysDept)
  75. eos.append(user)
  76. return eos
  77. @classmethod
  78. def select_allocated_list(cls, user: SysUser) -> List[SysUser]:
  79. """
  80. 根据条件,查询已配用户角色列表
  81. Args:
  82. user (SysUser): 用户传输条件信息
  83. Returns:
  84. List[SysUser]: 用户信息列表
  85. """
  86. fields = {"user_id", "dept_id", "user_name", "nick_name", "email", \
  87. "phonenumber", "status", "create_time"}
  88. columns = ColumnEntityList(SysUserPo, fields, alia_prefix=False)
  89. criterions = [SysUserPo.del_flag == "0"]
  90. if user.user_name:
  91. criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
  92. if user.phonenumber:
  93. criterions.append(SysUserPo.phonenumber.like(f"%{user.phonenumber}%"))
  94. # 检查是否需要应用数据范围过滤
  95. login_user = SecurityUtil.get_login_user()
  96. if "criterian_meta" in g and g.criterian_meta.scope and (not login_user or not SecurityUtil.is_user_admin(login_user.user)):
  97. criterions.append(g.criterian_meta.scope)
  98. stmt = select(*columns).distinct() \
  99. .join(SysDeptPo, SysUserPo.dept_id == SysDeptPo.dept_id) \
  100. .join(SysUserRolePo, SysUserPo.user_id == SysUserRolePo.user_id) \
  101. .join(SysRolePo, SysUserRolePo.role_id == SysRolePo.role_id) \
  102. .where(*criterions)
  103. rows = db.session.execute(stmt).all()
  104. return [columns.cast(row, SysUser) for row in rows]
  105. @classmethod
  106. def select_unallocated_list(cls, user: SysUser) -> List[SysUser]:
  107. """
  108. 根据条件,查询未分配用户角色列表
  109. Args:
  110. user (SysUser): 用户传输条件信息
  111. Returns:
  112. List[SysUser]: 用户信息列表
  113. """
  114. fields = {
  115. "user_id", "dept_id", "user_name", "nick_name", "email", \
  116. "phonenumber", "status", "create_time"
  117. }
  118. columns = ColumnEntityList(SysUserPo, fields, False)
  119. criterions = [SysUserPo.del_flag == "0"]
  120. subquery = select(SysUserPo.user_id) \
  121. .join(SysUserRolePo, and_(
  122. SysUserPo.user_id == SysUserRolePo.user_id,
  123. SysUserRolePo.role_id == user.role_id
  124. )) \
  125. .subquery()
  126. criterions.append(or_(
  127. SysRolePo.role_id != user.role_id,
  128. SysRolePo.role_id.is_(None)
  129. ))
  130. criterions.append(SysUserPo.user_id.notin_(subquery))
  131. if user.user_name:
  132. criterions.append(SysUserPo.user_name.like(f"%{user.user_name}%"))
  133. if user.phonenumber:
  134. criterions.append(SysUserPo.phonenumber.like(f"%{user.phonenumber}%"))
  135. # 检查是否需要应用数据范围过滤
  136. login_user = SecurityUtil.get_login_user()
  137. if "criterian_meta" in g and g.criterian_meta.scope and (not login_user or not SecurityUtil.is_user_admin(login_user.user)):
  138. criterions.append(g.criterian_meta.scope)
  139. stmt = select(*columns).distinct() \
  140. .join(SysDeptPo, SysUserPo.dept_id == SysDeptPo.dept_id) \
  141. .join(SysUserRolePo, SysUserPo.user_id == SysUserRolePo.user_id) \
  142. .join(SysRolePo, SysUserRolePo.role_id == SysRolePo.role_id) \
  143. .where(*criterions)
  144. rows = db.session.execute(stmt).all()
  145. return [columns.cast(row, SysUser) for row in rows]
  146. @classmethod
  147. def select_user_by_user_name(cls, user_name: str) -> Optional[SysUser]:
  148. """
  149. 通过用户名查询用户
  150. Args:
  151. user_name (str): 用户名
  152. Returns:
  153. Optional[SysUser]: 用户信息
  154. """
  155. return cls.select_user_by_unique_map("user_name", user_name)
  156. @classmethod
  157. def select_user_by_id(cls, user_id: int) -> Optional[SysUser]:
  158. """
  159. 通过用户ID查询用户
  160. Args:
  161. user_id (int): 用户ID
  162. Returns:
  163. Optional[SysUser]: 用户信息
  164. """
  165. return cls.select_user_by_unique_map("user_id", user_id)
  166. @classmethod
  167. def select_user_by_unique_map(
  168. cls,
  169. key: str,
  170. value: int | str
  171. ) -> Optional[SysUser]:
  172. """
  173. 通过含有唯一键的条件,查询用户
  174. Args:
  175. key (str): 唯一键名
  176. value (int|str): 唯一键值
  177. Returns:
  178. Optional[SysUser]: 用户信息
  179. """
  180. dept_vo_fields = {
  181. "dept_id", "parent_id", "dept_name", "order_num", "leader",
  182. "status"
  183. }
  184. role_vo_fields = {
  185. "role_id", "role_name", "role_key", "role_sort", "data_scope",
  186. "status"
  187. }
  188. user_columns = ColumnEntityList(SysUserPo, cls.default_fields)
  189. dept_columns = ColumnEntityList(SysDeptPo, dept_vo_fields)
  190. role_columns = ColumnEntityList(SysRolePo, role_vo_fields)
  191. column = getattr(SysUserPo, key)
  192. stmt = select(*user_columns, *dept_columns, *role_columns).distinct() \
  193. .join(
  194. SysDeptPo,
  195. SysUserPo.dept_id == SysDeptPo.dept_id,
  196. isouter=True
  197. ) \
  198. .join(
  199. SysUserRolePo,
  200. SysUserPo.user_id == SysUserRolePo.user_id,
  201. isouter=True
  202. ) \
  203. .join(
  204. SysRolePo,
  205. SysUserRolePo.role_id == SysRolePo.role_id,
  206. isouter=True
  207. ) \
  208. .where(column == value)
  209. rows = db.session.execute(stmt).all()
  210. eo_tmp = {}
  211. role_pk_label = role_columns[0].key if role_columns else None
  212. for row in rows:
  213. if key in eo_tmp:
  214. user = eo_tmp[key]
  215. else:
  216. user = user_columns.cast(row, SysUser)
  217. user.dept = dept_columns.cast(row, SysDept)
  218. eo_tmp[key] = user
  219. if role_pk_label and getattr(row, role_pk_label) is not None:
  220. user.roles.append(role_columns.cast(row, SysRole))
  221. return eo_tmp[key] if rows else None
  222. @classmethod
  223. @Transactional(db.session)
  224. def insert_user(cls, user: SysUser) -> int:
  225. """
  226. 新增用户信息
  227. Args:
  228. user (SysUser): 用户信息
  229. Returns:
  230. int: 新增记录的ID
  231. """
  232. fields = {
  233. "user_id", "dept_id", "user_name", "nick_name", "email", "avatar",
  234. "phonenumber", "sex", "password", "status", "create_by", "remark",
  235. "create_time", "update_time"
  236. }
  237. data = user.model_dump(
  238. include=fields,
  239. exclude_unset=True,
  240. exclude_none=True
  241. )
  242. if user.password is not None:
  243. data["password"] = user.password
  244. stmt = insert(SysUserPo).values(data)
  245. out = db.session.execute(stmt).inserted_primary_key
  246. return out[0] if out else 0
  247. @classmethod
  248. @Transactional(db.session)
  249. def update_user(cls, user: SysUser) -> int:
  250. """
  251. 修改用户信息
  252. Args:
  253. user (SysUser): 用户信息
  254. Returns:
  255. int: 修改数量
  256. """
  257. fields = {
  258. "dept_id", "user_name", "nick_name", "email", "avatar", "login_ip",
  259. "phonenumber", "sex", "password", "login_date", "status", "update_by",
  260. "remark", "update_time"
  261. }
  262. data = user.model_dump(
  263. include=fields,
  264. exclude_unset=True,
  265. exclude_none=True
  266. )
  267. # 如果密码为空字符串或 None,则不更新密码字段,避免把密码清空
  268. if not user.password:
  269. data.pop("password", None)
  270. else:
  271. data["password"] = user.password
  272. stmt = update(SysUserPo) \
  273. .where(SysUserPo.user_id == user.user_id) \
  274. .values(data)
  275. return db.session.execute(stmt).rowcount
  276. @classmethod
  277. @Transactional(db.session)
  278. def update_user_login_info(cls, user: SysUser) -> int:
  279. """
  280. 更新用户登录信息(登录IP、时间等)
  281. Args:
  282. user (SysUser): 用户信息(需包含 user_id)
  283. Returns:
  284. int: 修改数量
  285. """
  286. fields = {"login_ip", "login_date", "update_time"}
  287. data = user.model_dump(
  288. include=fields,
  289. exclude_unset=True,
  290. exclude_none=True
  291. )
  292. if not data:
  293. return 0
  294. stmt = update(SysUserPo) \
  295. .where(SysUserPo.user_id == user.user_id) \
  296. .values(data)
  297. return db.session.execute(stmt).rowcount
  298. @classmethod
  299. @Transactional(db.session)
  300. def update_user_avatar(cls, user_name: str, avatar: str) -> int:
  301. """
  302. 修改用户头像
  303. Args:
  304. user_name (str): 用户名
  305. avatar (str): 头像地址
  306. Returns:
  307. int: 修改数量
  308. """
  309. stmt = update(SysUserPo) \
  310. .where(SysUserPo.user_name == user_name) \
  311. .values(**{'avatar': avatar})
  312. return db.session.execute(stmt).rowcount
  313. @classmethod
  314. @Transactional(db.session)
  315. def reset_user_pwd(cls, user_name: str, password: str) -> int:
  316. """
  317. 重置用户密码
  318. Args:
  319. user_name (str): 用户名
  320. password (str): 密码
  321. Returns:
  322. int: 修改数量
  323. """
  324. stmt = update(SysUserPo) \
  325. .where(SysUserPo.user_name == user_name) \
  326. .values(password=password)
  327. return db.session.execute(stmt).rowcount
  328. @classmethod
  329. @Transactional(db.session)
  330. def delete_user_by_id(cls, user_id: int) -> int:
  331. """
  332. 通过用户ID删除用户
  333. Args:
  334. user_id (int): 用户ID
  335. Returns:
  336. int: 删除数量
  337. """
  338. stmt = update(SysUserPo).where(SysUserPo.user_id == user_id) \
  339. .values(del_flag="2")
  340. num = db.session.execute(stmt).rowcount
  341. return num
  342. @classmethod
  343. @Transactional(db.session)
  344. def delete_user_by_ids(cls, user_ids: List[int]) -> int:
  345. """
  346. 批量删除用户信息
  347. Args:
  348. user_ids (List[int]): 用户ID列表
  349. Returns:
  350. int: 删除数量
  351. """
  352. stmt = update(SysUserPo).where(SysUserPo.user_id.in_(user_ids)) \
  353. .values(del_flag="2")
  354. num = db.session.execute(stmt).rowcount
  355. return num
  356. @classmethod
  357. def check_user_name_unique(cls, user_name: str) -> int:
  358. """
  359. 校验用户名称是否唯一
  360. Args:
  361. user_name (str): 用户名称
  362. Returns:
  363. int: 0-唯一,大于0-已存在
  364. """
  365. stmt = select(func.count()).select_from(SysUserPo) \
  366. .where(SysUserPo.user_name == user_name)
  367. return db.session.execute(stmt).scalar() or 0
  368. @classmethod
  369. def check_phone_unique(cls, phone_number: str) -> Optional[SysUser]:
  370. """
  371. 校验手机号码是否唯一
  372. Args:
  373. phone_number (str): 手机号码
  374. Returns:
  375. Optional[SysUser]: 用户信息
  376. """
  377. fields = {"user_id", "phonenumber"}
  378. columns = ColumnEntityList(
  379. SysUserPo, fields, alia_prefix=False
  380. )
  381. stmt = select(*columns) \
  382. .where(SysUserPo.phonenumber == phone_number)
  383. row = db.session.execute(stmt).one_or_none()
  384. return columns.cast(row, SysUser) if row else None
  385. @classmethod
  386. def check_email_unique(cls, email: str) -> Optional[SysUser]:
  387. """
  388. 校验email是否唯一
  389. Args:
  390. email (str): email
  391. Returns:
  392. Optional[SysUser]: 用户信息
  393. """
  394. fields = {"user_id", "email"}
  395. columns = ColumnEntityList(
  396. SysUserPo, fields, alia_prefix=False
  397. )
  398. stmt = select(*columns).where(SysUserPo.email == email)
  399. row = db.session.execute(stmt).one_or_none()
  400. return columns.cast(row, SysUser) if row else None