Procházet zdrojové kódy

MQ配置第一次提交

liudongzhi před 1 měsícem
rodič
revize
0d2c37a28f

+ 10 - 0
alien-config/pom.xml

@@ -61,6 +61,16 @@
         </dependency>
 
         <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.retry</groupId>
+            <artifactId>spring-retry</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>com.baomidou</groupId>
             <artifactId>mybatis-plus-extension</artifactId>
         </dependency>

+ 164 - 0
alien-config/src/main/java/shop/alien/config/RabbitMq/RabbitMQConfig.java

@@ -0,0 +1,164 @@
+
+package shop.alien.config.RabbitMq;
+
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.retry.annotation.EnableRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Configuration
+@EnableRetry
+public class RabbitMQConfig {
+
+    private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
+
+    @Value("${spring.rabbitmq.host}")
+    private String host;
+
+    @Value("${spring.rabbitmq.port}")
+    private int port;
+
+    @Value("${spring.rabbitmq.username}")
+    private String username;
+
+    @Value("${spring.rabbitmq.password}")
+    private String password;
+
+    @Value("${spring.rabbitmq.virtual-host}")
+    private String virtualHost;
+
+    /**
+     * 连接工厂
+     */
+    @Bean
+    public ConnectionFactory connectionFactory() {
+        CachingConnectionFactory factory = new CachingConnectionFactory();
+        factory.setHost(host);
+        factory.setPort(port);
+        factory.setUsername(username);
+        factory.setPassword(password);
+        factory.setVirtualHost(virtualHost);
+        
+        // 启用发布确认和返回
+        factory.setPublisherConfirms(true);
+        factory.setPublisherReturns(true);
+        
+        // 连接池配置优化
+        factory.setChannelCacheSize(25); // 缓存通道数量
+        factory.setConnectionCacheSize(1); // 连接缓存数量
+        
+        logger.info("RabbitMQ 连接工厂配置完成 - Host: {}, Port: {}, VirtualHost: {}", host, port, virtualHost);
+        return factory;
+    }
+
+    /**
+     * JSON 消息转换器
+     */
+    @Bean
+    public MessageConverter jsonMessageConverter() {
+        return new Jackson2JsonMessageConverter();
+    }
+
+    /**
+     * RabbitTemplate 配置
+     */
+    @Bean
+    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
+        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+        rabbitTemplate.setMessageConverter(messageConverter);
+
+        // 设置确认回调 - 消息发送到交换机的确认
+        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
+            if (ack) {
+                logger.debug("消息发送成功到交换机,correlationData: {}", correlationData);
+            } else {
+                logger.error("消息发送失败到交换机,原因: {}, correlationData: {}", cause, correlationData);
+            }
+        });
+
+        // 设置返回回调 - 消息无法路由到队列时的回调
+//        rabbitTemplate.setReturnsCallback(returnedMessage -> {
+//            logger.warn("消息被退回 - Exchange: {}, RoutingKey: {}, ReplyCode: {}, ReplyText: {}, Message: {}",
+//                    returnedMessage.getExchange(),
+//                    returnedMessage.getRoutingKey(),
+//                    returnedMessage.getReplyCode(),
+//                    returnedMessage.getReplyText(),
+//                    new String(returnedMessage.getMessage().getBody()));
+//        });
+
+        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
+            if (!ack) {
+                logger.error("消息发送失败到交换机,原因: {}, correlationData: {}", cause, correlationData);
+            }
+        });
+
+
+
+
+        // 设置强制标志,确保消息无法路由时触发返回回调
+        rabbitTemplate.setMandatory(true);
+        
+        logger.info("RabbitTemplate 配置完成");
+        return rabbitTemplate;
+    }
+
+    // ==================== 队列、交换机、绑定配置 ====================
+
+    /**
+     * 订单队列
+     */
+    @Bean
+    public Queue orderQueue() {
+        return QueueBuilder.durable("order.queue")
+                .withArgument("x-message-ttl", 60000) // 消息 TTL 60 秒
+                .build();
+    }
+
+    /**
+     * 订单交换机
+     */
+    @Bean
+    public DirectExchange orderExchange() {
+        return ExchangeBuilder.directExchange("order.exchange").durable(true).build();
+    }
+
+    /**
+     * 订单路由键
+     */
+    @Bean
+    public Binding orderBinding(Queue orderQueue, DirectExchange orderExchange) {
+        return BindingBuilder.bind(orderQueue).to(orderExchange).with("order.routing.key");
+    }
+
+    /**
+     * 用户通知队列
+     */
+    @Bean
+    public Queue userNoticeQueue() {
+        return QueueBuilder.durable("user.notice.queue").build();
+    }
+
+    /**
+     * 用户通知交换机(主题交换机)
+     */
+    @Bean
+    public TopicExchange userNoticeExchange() {
+        return ExchangeBuilder.topicExchange("user.notice.exchange").durable(true).build();
+    }
+
+    /**
+     * 用户通知绑定
+     */
+    @Bean
+    public Binding userNoticeBinding(Queue userNoticeQueue, TopicExchange userNoticeExchange) {
+        return BindingBuilder.bind(userNoticeQueue).to(userNoticeExchange).with("user.notice.#");
+    }
+}