token.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. # -*- coding: utf-8 -*-
  2. from datetime import datetime
  3. from typing import Optional
  4. from flask import Request, request
  5. from ruoyi_common.exception import ServiceException
  6. from ruoyi_common.utils import AddressUtil, IpUtil, TokenUtil
  7. from ruoyi_common.utils.base import UserAgentUtil
  8. from ruoyi_common.constant import Constants
  9. from ruoyi_common.domain.entity import LoginUser
  10. from ruoyi_framework.config import TokenConfig
  11. try:
  12. from ruoyi_admin.ext import redis_cache
  13. except Exception:
  14. # 如果redis扩展未初始化,则使用模拟的redis缓存
  15. redis_cache = None
  16. class TokenService:
  17. refresh_threshold_minutes = 20
  18. @classmethod
  19. def create_token(cls, user:LoginUser) -> str:
  20. """
  21. 创建token
  22. Args:
  23. user(LoginUser): 登录用户
  24. Returns:
  25. str: token
  26. """
  27. cls.set_useragent(user)
  28. cls.refresh_token(user)
  29. claims = {
  30. Constants.LOGIN_USER_KEY: user.token.hex
  31. }
  32. return cls.create_token_by_claims(claims)
  33. @classmethod
  34. def create_token_by_claims(cls, claims:dict) -> str:
  35. """
  36. 根据claims创建token
  37. Args:
  38. claims (dict): token的参数claims
  39. Returns:
  40. str: token
  41. """
  42. token = TokenUtil.encode(claims,TokenConfig.secret)
  43. return token
  44. @classmethod
  45. def verify_token(cls,user:LoginUser):
  46. '''
  47. 验证token有效期
  48. Args:
  49. user(LoginUser): 登录用户
  50. '''
  51. expire_time = user.expire_time
  52. current_time = datetime.now()
  53. if not expire_time:
  54. cls.refresh_token(user)
  55. return
  56. remaining = (expire_time - current_time).total_seconds() / 60
  57. if remaining <= cls.refresh_threshold_minutes:
  58. cls.refresh_token(user)
  59. @classmethod
  60. def refresh_token(cls, user:LoginUser):
  61. '''
  62. 刷新token
  63. Args:
  64. user(LoginUser): 登录用户
  65. '''
  66. user.login_time = datetime.now()
  67. expire_delta = TokenConfig.expire_time()
  68. user.expire_time = user.login_time + expire_delta
  69. expire_seconds = TokenConfig.expire_seconds()
  70. usertoken_key = cls.get_token_key(user.token.hex)
  71. if redis_cache:
  72. user_json = user.model_dump_json()
  73. redis_cache.set(usertoken_key, user_json, ex=expire_seconds)
  74. @classmethod
  75. def set_useragent(cls, user:LoginUser):
  76. '''
  77. 设置浏览器信息
  78. Args:
  79. user(LoginUser): 登录用户
  80. '''
  81. agent = request.user_agent
  82. user.ip_addr = IpUtil.get_ip()
  83. user.login_location = AddressUtil.get_address(user.ip_addr)
  84. browser = getattr(agent, "browser", None)
  85. if not browser:
  86. browser = UserAgentUtil.browser()
  87. if not browser and agent:
  88. browser = agent.string
  89. user.browser = browser
  90. platform = getattr(agent, "platform", None)
  91. if not platform:
  92. platform = UserAgentUtil.os()
  93. if not platform and agent:
  94. platform = agent.string
  95. user.os = platform
  96. @classmethod
  97. def parse_token(cls, token:str) -> dict:
  98. '''
  99. 解析token
  100. Args:
  101. token(str): token
  102. Returns:
  103. dict: token参数claims
  104. '''
  105. payload = TokenUtil.decode(token, TokenConfig.secret)
  106. return payload
  107. @classmethod
  108. def get_login_user(cls, request:Request) -> Optional[LoginUser]:
  109. '''
  110. 获取登录用户信息
  111. Args:
  112. request(Request): 请求对象
  113. Returns:
  114. LoginUser: 登录用户信息
  115. '''
  116. token = cls.get_token(request)
  117. if token:
  118. try:
  119. claims = cls.parse_token(token)
  120. except Exception:
  121. return None
  122. token_uuid = claims.get(Constants.LOGIN_USER_KEY)
  123. usertoken_key = cls.get_token_key(token_uuid)
  124. if redis_cache:
  125. jsoned_user = redis_cache.get(usertoken_key)
  126. else:
  127. jsoned_user = None
  128. if not jsoned_user:
  129. return None
  130. login_user = LoginUser.model_validate_json(jsoned_user)
  131. if login_user:
  132. # 根据剩余有效期选择性刷新TTL,避免每次请求都重写缓存
  133. cls.verify_token(login_user)
  134. return login_user
  135. else:
  136. raise ServiceException("Token信息不存在")
  137. @classmethod
  138. def get_token(cls,request:Request) -> str:
  139. '''
  140. 获取token
  141. Args:
  142. request(Request): 请求对象
  143. Returns:
  144. str: token
  145. '''
  146. token = request.headers.get(Constants.TOKEN_HEADER)
  147. if token and token.startswith(Constants.TOKEN_PREFIX):
  148. token = token[len(Constants.TOKEN_PREFIX):]
  149. token = token.strip()
  150. return token
  151. @classmethod
  152. def get_token_key(cls, uuid:str) -> str:
  153. '''
  154. 获取token缓存key
  155. Args:
  156. uuid(str): token的uuid
  157. Returns:
  158. str: token缓存key
  159. '''
  160. return Constants.LOGIN_TOKEN_KEY + uuid
  161. @classmethod
  162. def set_login_user(cls, user:LoginUser):
  163. '''
  164. 设置登录用户信息
  165. Args:
  166. user(LoginUser): 登录用户信息
  167. '''
  168. if user and user.token:
  169. cls.refresh_token(user)
  170. @classmethod
  171. def del_login_user(cls, token:str):
  172. '''
  173. 删除登录用户信息
  174. Args:
  175. token(str): token
  176. '''
  177. if token:
  178. user_key:str = cls.get_token_key(token)
  179. if redis_cache:
  180. redis_cache.delete(user_key)