token.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from typing import Optional

from flask import Request, request

from ruoyi_common.exception import ServiceException
from ruoyi_common.utils import AddressUtil, IpUtil, TokenUtil
from ruoyi_common.constant import Constants
from ruoyi_common.domain.entity import LoginUser
from ruoyi_framework.config import TokenConfig

try:
    from ruoyi_admin.ext import redis_cache
except Exception:
    # If the redis extension could not be imported/initialised, run without
    # a cache: every cache access in this module is guarded by `if redis_cache`.
    redis_cache = None
  15. class TokenService:
  16. @classmethod
  17. def create_token(cls, user:LoginUser) -> str:
  18. """
  19. 创建token
  20. Args:
  21. user(LoginUser): 登录用户
  22. Returns:
  23. str: token
  24. """
  25. cls.set_useragent(user)
  26. cls.refresh_token(user)
  27. claims = {
  28. Constants.LOGIN_USER_KEY: user.token.hex
  29. }
  30. return cls.create_token_by_claims(claims)
  31. @classmethod
  32. def create_token_by_claims(cls, claims:dict) -> str:
  33. """
  34. 根据claims创建token
  35. Args:
  36. claims (dict): token的参数claims
  37. Returns:
  38. str: token
  39. """
  40. token = TokenUtil.encode(claims,TokenConfig.secret)
  41. return token
  42. @classmethod
  43. def verify_token(cls,user:LoginUser):
  44. '''
  45. 验证token有效期
  46. Args:
  47. user(LoginUser): 登录用户
  48. '''
  49. expire_time = user.expire_time
  50. current_time = datetime.now()
  51. if (expire_time - current_time).min < 20:
  52. cls.refresh_token(user)
  53. @classmethod
  54. def refresh_token(cls, user:LoginUser):
  55. '''
  56. 刷新token
  57. Args:
  58. user(LoginUser): 登录用户
  59. '''
  60. user.login_time = datetime.now()
  61. user.expire_time = user.login_time + TokenConfig.expire_time()
  62. usertoken_key = cls.get_token_key(user.token.hex)
  63. user_json = user.model_dump_json()
  64. if redis_cache:
  65. redis_cache.set(usertoken_key, user_json, TokenConfig.expire_time() * 60)
  66. @classmethod
  67. def set_useragent(cls, user:LoginUser):
  68. '''
  69. 设置浏览器信息
  70. Args:
  71. user(LoginUser): 登录用户
  72. '''
  73. user.ip_addr = IpUtil.get_ip()
  74. user.login_location = AddressUtil.get_address(user.ip_addr)
  75. user.browser = request.user_agent.browser
  76. user.os = request.user_agent.platform
  77. @classmethod
  78. def parse_token(cls, token:str) -> dict:
  79. '''
  80. 解析token
  81. Args:
  82. token(str): token
  83. Returns:
  84. dict: token参数claims
  85. '''
  86. payload = TokenUtil.decode(token, TokenConfig.secret)
  87. return payload
  88. @classmethod
  89. def get_login_user(cls, request:Request) -> Optional[LoginUser]:
  90. '''
  91. 获取登录用户信息
  92. Args:
  93. request(Request): 请求对象
  94. Returns:
  95. LoginUser: 登录用户信息
  96. '''
  97. token = cls.get_token(request)
  98. if token:
  99. try:
  100. claims = cls.parse_token(token)
  101. except Exception:
  102. return None
  103. token_uuid = claims.get(Constants.LOGIN_USER_KEY)
  104. usertoken_key = cls.get_token_key(token_uuid)
  105. if redis_cache:
  106. jsoned_user = redis_cache.get(usertoken_key)
  107. else:
  108. jsoned_user = None
  109. if not jsoned_user:
  110. return None
  111. login_user = LoginUser.model_validate_json(jsoned_user)
  112. if login_user:
  113. return login_user
  114. else:
  115. raise ServiceException("Token信息不存在")
  116. @classmethod
  117. def get_token(cls,request:Request) -> str:
  118. '''
  119. 获取token
  120. Args:
  121. request(Request): 请求对象
  122. Returns:
  123. str: token
  124. '''
  125. token = request.headers.get(Constants.TOKEN_HEADER)
  126. if token and token.startswith(Constants.TOKEN_PREFIX):
  127. token = token[len(Constants.TOKEN_PREFIX):]
  128. token = token.strip()
  129. return token
  130. @classmethod
  131. def get_token_key(cls, uuid:str) -> str:
  132. '''
  133. 获取token缓存key
  134. Args:
  135. uuid(str): token的uuid
  136. Returns:
  137. str: token缓存key
  138. '''
  139. return Constants.LOGIN_TOKEN_KEY + uuid
  140. @classmethod
  141. def set_login_user(cls, user:LoginUser):
  142. '''
  143. 设置登录用户信息
  144. Args:
  145. user(LoginUser): 登录用户信息
  146. '''
  147. if user and user.token:
  148. cls.refresh_token(user)
  149. @classmethod
  150. def del_login_user(cls, token:str):
  151. '''
  152. 删除登录用户信息
  153. Args:
  154. token(str): token
  155. '''
  156. if token:
  157. user_key:str = cls.get_token_key(token)
  158. if redis_cache:
  159. redis_cache.delete(user_key)