|
@@ -0,0 +1,241 @@
|
|
|
|
|
+package shop.alien.lawyer.config;
|
|
|
|
|
+
|
|
|
|
|
+import cn.hutool.core.collection.CollectionUtil;
|
|
|
|
|
+import com.alibaba.fastjson2.JSONObject;
|
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
|
+import com.google.common.collect.Lists;
|
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
|
|
+import org.springframework.context.ApplicationContextAware;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+import org.springframework.util.ObjectUtils;
|
|
|
|
|
+import shop.alien.entity.store.LifeBlacklist;
|
|
|
|
|
+import shop.alien.entity.store.LifeMessage;
|
|
|
|
|
+import shop.alien.entity.store.vo.WebSocketVo;
|
|
|
|
|
+import shop.alien.mapper.LifeBlacklistMapper;
|
|
|
|
|
+import shop.alien.mapper.LifeMessageMapper;
|
|
|
|
|
+import shop.alien.util.common.safe.ImageReviewServiceEnum;
|
|
|
|
|
+import shop.alien.util.common.safe.TextModerationResultVO;
|
|
|
|
|
+import shop.alien.util.common.safe.TextModerationUtil;
|
|
|
|
|
+import shop.alien.util.common.safe.TextReviewServiceEnum;
|
|
|
|
|
+
|
|
|
|
|
+import javax.websocket.*;
|
|
|
|
|
+import javax.websocket.server.PathParam;
|
|
|
|
|
+import javax.websocket.server.ServerEndpoint;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+import java.util.Set;
|
|
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @author ssk
|
|
|
|
|
+ * @version 1.0
|
|
|
|
|
+ * @date 2024/2/29 14:42
|
|
|
|
|
+ */
|
|
|
|
|
+@Slf4j
|
|
|
|
|
+@Component
|
|
|
|
|
+@ServerEndpoint(value = "/socket/{sendId}")
|
|
|
|
|
+public class WebSocketProcess implements ApplicationContextAware {
|
|
|
|
|
+
|
|
|
|
|
+ private static ApplicationContext applicationContext;
|
|
|
|
|
+ private static LifeBlacklistMapper lifeBlacklistMapper;
|
|
|
|
|
+ private static LifeMessageMapper lifeMessageMapper;
|
|
|
|
|
+ private static BaseRedisService baseRedisService;
|
|
|
|
|
+ private static TextModerationUtil textModerationUtil;
|
|
|
|
|
+ /*
|
|
|
|
|
+ * 持有每个webSocket对象,以key-value存储到线程安全ConcurrentHashMap,
|
|
|
|
|
+ */
|
|
|
|
|
+ private static final ConcurrentHashMap<String, WebSocketProcess> concurrentHashMap = new ConcurrentHashMap<>(12);
|
|
|
|
|
+
|
|
|
|
|
+ @Override
|
|
|
|
|
+ public void setApplicationContext(ApplicationContext context) {
|
|
|
|
|
+ WebSocketProcess.applicationContext = context;
|
|
|
|
|
+ WebSocketProcess.lifeBlacklistMapper = context.getBean(LifeBlacklistMapper.class);
|
|
|
|
|
+ WebSocketProcess.lifeMessageMapper = context.getBean(LifeMessageMapper.class);
|
|
|
|
|
+ WebSocketProcess.baseRedisService = context.getBean(BaseRedisService.class);
|
|
|
|
|
+ WebSocketProcess.textModerationUtil = context.getBean(TextModerationUtil.class);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 会话对象
|
|
|
|
|
+ **/
|
|
|
|
|
+ private Session session;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 接收到客户端消息时触发
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnMessage
|
|
|
|
|
+ public void onMessage(@PathParam("sendId") String id, String message) throws Exception {
|
|
|
|
|
+ try {
|
|
|
|
|
+ // 解析消息
|
|
|
|
|
+ WebSocketVo webSocketVo = JSONObject.parseObject(message, WebSocketVo.class);
|
|
|
|
|
+
|
|
|
|
|
+ // 过滤心跳
|
|
|
|
|
+ if (null == webSocketVo || "heartbeat".equals(webSocketVo.getCategory())) return;
|
|
|
|
|
+
|
|
|
|
|
+ // 记录已读消息id
|
|
|
|
|
+ if ("receipt".equals(webSocketVo.getCategory())) {
|
|
|
|
|
+ baseRedisService.setListRight("readMessageIdKey", String.valueOf(webSocketVo.getMessageId()));
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 检查消息合规性
|
|
|
|
|
+ if (!checkCompliance(webSocketVo)) {
|
|
|
|
|
+ webSocketVo.setType("7");
|
|
|
|
|
+ webSocketVo.setText("发送内容存在违规行为");
|
|
|
|
|
+ sendMessage(webSocketVo.getSenderId(), JSONObject.from(webSocketVo).toJSONString());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.info("webSocketVo----------------{}", JSONObject.from(webSocketVo).toJSONString());
|
|
|
|
|
+ log.info("concurrentHashMap----------{}", concurrentHashMap.keySet());
|
|
|
|
|
+
|
|
|
|
|
+ // 保存消息记录
|
|
|
|
|
+ LifeMessage lifeMessage = new LifeMessage();
|
|
|
|
|
+ lifeMessage.setSenderId(webSocketVo.getSenderId());
|
|
|
|
|
+ lifeMessage.setReceiverId(webSocketVo.getReceiverId());
|
|
|
|
|
+ lifeMessage.setContent(webSocketVo.getText());
|
|
|
|
|
+ lifeMessage.setType(webSocketVo.getType());
|
|
|
|
|
+ lifeMessage.setBusinessId(webSocketVo.getBusinessId());
|
|
|
|
|
+ // 查询自己是否在对方的黑名单中
|
|
|
|
|
+ if (baseRedisService.hasKey("blackList_" + webSocketVo.getSenderId())) {
|
|
|
|
|
+ List<String> blackList = baseRedisService.getList("blackList_" + webSocketVo.getSenderId());
|
|
|
|
|
+ if (blackList.contains(webSocketVo.getReceiverId())) {
|
|
|
|
|
+ lifeMessage.setDeletePhoneId(webSocketVo.getReceiverId());
|
|
|
|
|
+ lifeMessageMapper.insert(lifeMessage);
|
|
|
|
|
+ // 发送消息
|
|
|
|
|
+ webSocketVo.setMessageId(lifeMessage.getId());
|
|
|
|
|
+ webSocketVo.setCategory("message");
|
|
|
|
|
+ webSocketVo.setCreatedTime(lifeMessage.getCreatedTime());
|
|
|
|
|
+ sendMessage(webSocketVo.getSenderId(), JSONObject.from(webSocketVo).toJSONString());
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ lifeMessageMapper.insert(lifeMessage);
|
|
|
|
|
+ // 发送消息
|
|
|
|
|
+ webSocketVo.setMessageId(lifeMessage.getId());
|
|
|
|
|
+ webSocketVo.setCategory("message");
|
|
|
|
|
+ webSocketVo.setCreatedTime(lifeMessage.getCreatedTime());
|
|
|
|
|
+ sendMessage(webSocketVo.getSenderId(), JSONObject.from(webSocketVo).toJSONString());
|
|
|
|
|
+ sendMessage(webSocketVo.getReceiverId(), JSONObject.from(webSocketVo).toJSONString());
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.onMessage()----Exception----Message={}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发送消息到指定客户端
|
|
|
|
|
+ */
|
|
|
|
|
+ public void sendMessage(String id, String message) throws Exception {
|
|
|
|
|
+ try {
|
|
|
|
|
+ log.info("WebSocketProcess.sendMessage()--readySend----id={},message={}", id, message);
|
|
|
|
|
+ // 根据id,从map中获取存储的webSocket对象
|
|
|
|
|
+ WebSocketProcess webSocketProcess = concurrentHashMap.get(id);
|
|
|
|
|
+ if (!ObjectUtils.isEmpty(webSocketProcess)) {
|
|
|
|
|
+ // 当客户端是Open状态时,才能发送消息
|
|
|
|
|
+ if (webSocketProcess.session.isOpen()) {
|
|
|
|
|
+ webSocketProcess.session.getBasicRemote().sendText(message);
|
|
|
|
|
+ log.info("WebSocketProcess.sendMessage()---sendSuccess---id={},message={}", id, message);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.error("WebSocketProcess.sendMessage()---sendError----websocket session:{} is closed ", id);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.error("WebSocketProcess.sendMessage()---sendError----websocket session:{} is not exit ", id);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.sendMessage()---Exception----Message={}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 客户端创建连接时触发
|
|
|
|
|
+ * */
|
|
|
|
|
+ @OnOpen
|
|
|
|
|
+ public void onOpen(@PathParam("sendId") String id, Session session) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ //每新建立一个连接,就把当前客户id为key,this为value存储到map中
|
|
|
|
|
+ this.session = session;
|
|
|
|
|
+ concurrentHashMap.put(id, this);
|
|
|
|
|
+ log.info("WebSocketProcess.onOpen() Open a websocket. id={}", id);
|
|
|
|
|
+
|
|
|
|
|
+ // 获取拉黑自己的用户信息
|
|
|
|
|
+ LambdaQueryWrapper<LifeBlacklist> wrapper = new LambdaQueryWrapper<>();
|
|
|
|
|
+ wrapper.eq(LifeBlacklist::getBlockedPhoneId, id);
|
|
|
|
|
+ List<String> blackList = lifeBlacklistMapper.selectList(wrapper).stream().map(LifeBlacklist::getBlockerPhoneId).collect(Collectors.toList());
|
|
|
|
|
+ if (CollectionUtil.isNotEmpty(blackList)) baseRedisService.setSaveOrOverwriteScriptList("blackList_" + id, blackList);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.onOpen()----Exception----Message={}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 客户端连接关闭时触发
|
|
|
|
|
+ **/
|
|
|
|
|
+ @OnClose
|
|
|
|
|
+ public void onClose(Session session, @PathParam("sendId") String id) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ //客户端连接关闭时,移除map中存储的键值对
|
|
|
|
|
+ concurrentHashMap.remove(id);
|
|
|
|
|
+ log.info("WebSocketProcess.onClose() close a websocket, concurrentHashMap remove sessionId= {}", id);
|
|
|
|
|
+ if (baseRedisService.hasKey("blackList_" + id)) baseRedisService.delete("blackList_" + id);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.onClose()----Exception----Message={}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 连接发生异常时候触发
|
|
|
|
|
+ */
|
|
|
|
|
+ @OnError
|
|
|
|
|
+ public void onError(@PathParam("sendId") String id, Throwable error) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ log.error("WebSocketProcess.onError() Error,id={}, Msg=", id, error);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.onError()----Exception----Message={}", e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 验证消息合规性
|
|
|
|
|
+ */
|
|
|
|
|
+ private boolean checkCompliance(WebSocketVo websocketVo) throws Exception {
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<String> servicesList = Lists.newArrayList();
|
|
|
|
|
+ TextModerationResultVO textModerationResultVO = null;
|
|
|
|
|
+ if ("1".equals(websocketVo.getType())) {
|
|
|
|
|
+ servicesList.add(TextReviewServiceEnum.AD_COMPLIANCE_DETECTION_PRO.getService());
|
|
|
|
|
+ servicesList.add(TextReviewServiceEnum.CHAT_DETECTION_PRO.getService());
|
|
|
|
|
+ textModerationResultVO = textModerationUtil.invokeFunction(websocketVo.getText(), servicesList);
|
|
|
|
|
+
|
|
|
|
|
+ } else if ("2".equals(websocketVo.getType())) {
|
|
|
|
|
+ servicesList.add(ImageReviewServiceEnum.TONALITY_IMPROVE.getService());
|
|
|
|
|
+ textModerationResultVO = textModerationUtil.invokeFunction(websocketVo.getText(), servicesList);
|
|
|
|
|
+ }
|
|
|
|
|
+ return !(null != textModerationResultVO && "high".equals(textModerationResultVO.getRiskLevel()));
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("WebSocketProcess.checkCompliance()----Exception----Message={}", e.getMessage());
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 发送消息到所有客户端
|
|
|
|
|
+ */
|
|
|
|
|
+ public void sendAllMessage(String msg) throws Exception {
|
|
|
|
|
+ Set<Map.Entry<String, WebSocketProcess>> entries = concurrentHashMap.entrySet();
|
|
|
|
|
+ for (Map.Entry<String, WebSocketProcess> entry : entries) {
|
|
|
|
|
+ String cid = entry.getKey();
|
|
|
|
|
+ WebSocketProcess webSocketProcess = entry.getValue();
|
|
|
|
|
+ boolean sessionOpen = webSocketProcess.session.isOpen();
|
|
|
|
|
+ if (sessionOpen) {
|
|
|
|
|
+ webSocketProcess.session.getBasicRemote().sendText(msg);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ log.info("cid={} is closed,ignore send text", cid);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|