token.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # -*- coding: utf-8 -*-
  2. from datetime import datetime
  3. from typing import Optional
  4. from flask import Request, request
  5. from ruoyi_common.exception import ServiceException
  6. from ruoyi_common.utils import AddressUtil, IpUtil, TokenUtil
  7. from ruoyi_common.utils.base import UserAgentUtil
  8. from ruoyi_common.constant import Constants
  9. from ruoyi_common.domain.entity import LoginUser
  10. from ruoyi_framework.config import TokenConfig
try:
    from ruoyi_admin.ext import redis_cache
except Exception:
    # The redis extension could not be imported (for example because it has
    # not been initialised yet). Fall back to None; code in this module
    # checks redis_cache for truthiness before using it.
    redis_cache = None
  16. class TokenService:
  17. @classmethod
  18. def create_token(cls, user:LoginUser) -> str:
  19. """
  20. 创建token
  21. Args:
  22. user(LoginUser): 登录用户
  23. Returns:
  24. str: token
  25. """
  26. cls.set_useragent(user)
  27. cls.refresh_token(user)
  28. claims = {
  29. Constants.LOGIN_USER_KEY: user.token.hex
  30. }
  31. return cls.create_token_by_claims(claims)
  32. @classmethod
  33. def create_token_by_claims(cls, claims:dict) -> str:
  34. """
  35. 根据claims创建token
  36. Args:
  37. claims (dict): token的参数claims
  38. Returns:
  39. str: token
  40. """
  41. token = TokenUtil.encode(claims,TokenConfig.secret)
  42. return token
  43. @classmethod
  44. def verify_token(cls,user:LoginUser):
  45. '''
  46. 验证token有效期
  47. Args:
  48. user(LoginUser): 登录用户
  49. '''
  50. expire_time = user.expire_time
  51. current_time = datetime.now()
  52. if (expire_time - current_time).min < 20:
  53. cls.refresh_token(user)
  54. @classmethod
  55. def refresh_token(cls, user:LoginUser):
  56. '''
  57. 刷新token
  58. Args:
  59. user(LoginUser): 登录用户
  60. '''
  61. user.login_time = datetime.now()
  62. expire_delta = TokenConfig.expire_time()
  63. user.expire_time = user.login_time + expire_delta
  64. expire_seconds = TokenConfig.expire_seconds()
  65. usertoken_key = cls.get_token_key(user.token.hex)
  66. user_json = user.model_dump_json()
  67. if redis_cache:
  68. redis_cache.set(usertoken_key, user_json, ex=expire_seconds)
  69. @classmethod
  70. def set_useragent(cls, user:LoginUser):
  71. '''
  72. 设置浏览器信息
  73. Args:
  74. user(LoginUser): 登录用户
  75. '''
  76. agent = request.user_agent
  77. user.ip_addr = IpUtil.get_ip()
  78. user.login_location = AddressUtil.get_address(user.ip_addr)
  79. browser = getattr(agent, "browser", None)
  80. if not browser:
  81. browser = UserAgentUtil.browser()
  82. if not browser and agent:
  83. browser = agent.string
  84. user.browser = browser
  85. platform = getattr(agent, "platform", None)
  86. if not platform:
  87. platform = UserAgentUtil.os()
  88. if not platform and agent:
  89. platform = agent.string
  90. user.os = platform
  91. @classmethod
  92. def parse_token(cls, token:str) -> dict:
  93. '''
  94. 解析token
  95. Args:
  96. token(str): token
  97. Returns:
  98. dict: token参数claims
  99. '''
  100. payload = TokenUtil.decode(token, TokenConfig.secret)
  101. return payload
  102. @classmethod
  103. def get_login_user(cls, request:Request) -> Optional[LoginUser]:
  104. '''
  105. 获取登录用户信息
  106. Args:
  107. request(Request): 请求对象
  108. Returns:
  109. LoginUser: 登录用户信息
  110. '''
  111. token = cls.get_token(request)
  112. if token:
  113. try:
  114. claims = cls.parse_token(token)
  115. except Exception:
  116. return None
  117. token_uuid = claims.get(Constants.LOGIN_USER_KEY)
  118. usertoken_key = cls.get_token_key(token_uuid)
  119. if redis_cache:
  120. jsoned_user = redis_cache.get(usertoken_key)
  121. else:
  122. jsoned_user = None
  123. if not jsoned_user:
  124. return None
  125. login_user = LoginUser.model_validate_json(jsoned_user)
  126. if login_user:
  127. return login_user
  128. else:
  129. raise ServiceException("Token信息不存在")
  130. @classmethod
  131. def get_token(cls,request:Request) -> str:
  132. '''
  133. 获取token
  134. Args:
  135. request(Request): 请求对象
  136. Returns:
  137. str: token
  138. '''
  139. token = request.headers.get(Constants.TOKEN_HEADER)
  140. if token and token.startswith(Constants.TOKEN_PREFIX):
  141. token = token[len(Constants.TOKEN_PREFIX):]
  142. token = token.strip()
  143. return token
  144. @classmethod
  145. def get_token_key(cls, uuid:str) -> str:
  146. '''
  147. 获取token缓存key
  148. Args:
  149. uuid(str): token的uuid
  150. Returns:
  151. str: token缓存key
  152. '''
  153. return Constants.LOGIN_TOKEN_KEY + uuid
  154. @classmethod
  155. def set_login_user(cls, user:LoginUser):
  156. '''
  157. 设置登录用户信息
  158. Args:
  159. user(LoginUser): 登录用户信息
  160. '''
  161. if user and user.token:
  162. cls.refresh_token(user)
  163. @classmethod
  164. def del_login_user(cls, token:str):
  165. '''
  166. 删除登录用户信息
  167. Args:
  168. token(str): token
  169. '''
  170. if token:
  171. user_key:str = cls.get_token_key(token)
  172. if redis_cache:
  173. redis_cache.delete(user_key)