Эх сурвалжийг харах

feat(rabbitmq): 配置订单延迟消息队列并优化监听逻辑- 修改开发环境 RabbitMQ 用户名为 guest
- 新增生产环境 RabbitMQ 配置项,包括并发消费参数
- 重构延迟消息监听器,区分订单超时与过期场景- 新增订单过期延迟消息监听器 OrderExpireDelayedMessageListener
- 调整 DelayedMessageService 方法名及路由键常量- 删除旧的订单过期定时任务类 OrderExpiredJobService
- 更新 RabbitMQ 配置类,定义两套独立的延迟消息交换机与队列
- 修改测试接口调用方法名以匹配新的业务语义

wzq 4 долоо хоног өмнө
parent
commit
ab6c2f4002

+ 1 - 1
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/pay/paytest/payController.java

@@ -269,7 +269,7 @@ public class payController {
     private final DelayedMessageService delayedMessageService;
     @GetMapping(value = "/test/rabbitTest")
     public void rabbitTest(@RequestParam("msg") String msg) {
-        delayedMessageService.send25DayMessage( msg);
+        delayedMessageService.sendOrderExpireMessage( msg);
         log.info("接口调用,发送消息成功");
     }
 

+ 0 - 25
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/quartz/job/OrderExpiredJobService.java

@@ -1,25 +0,0 @@
-package org.jeecg.modules.quartz.job;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.scheduling.annotation.Scheduled;
-
-import javax.transaction.Transactional;
-
-/**
- * 订单过期定时任务
- */
-@Slf4j
-public class OrderExpiredJobService {
-    @Scheduled(cron = "0 0 * * * ?")
-    @Transactional(rollbackOn = Exception.class)
-    public void profitSharingExecute() {
-        log.info("开始执行分账定时任务");
-        try {
-
-            //订单过期
-
-        } catch (Exception e) {
-            log.error("分账定时任务异常", e);
-        }
-    }
-}

+ 10 - 4
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageListener.java

@@ -28,14 +28,20 @@ public class DelayedMessageListener {
     private final IAppOrderService appOrderService;
     private final IAppOrderProInfoService appOrderProInfoService;
 
-    @RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
+    /**
+     * 监听订单超时未支付延迟队列
+     *
+     * @param message
+     * @param channel
+     */
+    @RabbitListener(queues = RabbitMQConfig.ORDER_DELAY_QUEUE)
     public void handleMessage(Message message, Channel channel) throws IOException {
         try {
-            String orderId = new String(message.getBody());
-            log.info("收到延迟消息,订单ID,{}:",orderId);
+            String msg = new String(message.getBody());
+            log.info("收到延迟消息,{}:",msg);
 
             // 业务逻辑处理
-            orderProcessMessage(orderId);
+            orderProcessMessage(msg);
 
             // 手动确认成功
             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

+ 5 - 5
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/DelayedMessageService.java

@@ -12,7 +12,7 @@ public class DelayedMessageService {
     @Autowired
     private RabbitTemplate rabbitTemplate;
 
-    public void send25DayMessage(String message) {
+    public void sendOrderExpireMessage(String message) {
         MessagePostProcessor processor = msg -> {
             // 设置延迟时间(25天毫秒数)
             msg.getMessageProperties().setDelay(60 * 1000);
@@ -22,8 +22,8 @@ public class DelayedMessageService {
         };
 
         rabbitTemplate.convertAndSend(
-                RabbitMQConfig.DELAY_EXCHANGE,
-                RabbitMQConfig.DELAY_ROUTING_KEY,
+                RabbitMQConfig.ORDER_DELAY_EXCHANGE,
+                RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,
                 message,
                 processor
         );
@@ -44,8 +44,8 @@ public class DelayedMessageService {
         };
 
         rabbitTemplate.convertAndSend(
-                RabbitMQConfig.DELAY_EXCHANGE,
-                RabbitMQConfig.DELAY_ROUTING_KEY,
+                RabbitMQConfig.ORDER_DELAY_EXCHANGE,
+                RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,
                 message,
                 processor
         );

+ 89 - 0
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/OrderExpireDelayedMessageListener.java

@@ -0,0 +1,89 @@
+package org.jeecg.modules.rabbitmq;
+
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ObjectUtil;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.rabbitmq.client.Channel;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.jeecg.common.constant.CommonConstant;
+import org.jeecg.modules.system.app.entity.AppOrder;
+import org.jeecg.modules.system.app.entity.AppOrderProInfo;
+import org.jeecg.modules.system.app.service.IAppOrderProInfoService;
+import org.jeecg.modules.system.app.service.IAppOrderService;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Component
+@AllArgsConstructor
+public class OrderExpireDelayedMessageListener {
+
+    private final IAppOrderService appOrderService;
+    private final IAppOrderProInfoService appOrderProInfoService;
+
+    /**
+     * 监听订单过期延迟队列
+     *
+     * @param message
+     * @param channel
+     */
+    @RabbitListener(queues = RabbitMQConfig.ORDER_EXPIRE_DELAY_QUEUE)
+    public void handleMessage(Message message, Channel channel) throws IOException {
+        try {
+            String msg = new String(message.getBody());
+            log.info("收到延迟消息,订单ID,{}:",msg);
+
+            // 业务逻辑处理
+            orderExpireMessage(msg);
+
+            // 手动确认成功
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (Exception e) {
+            // 处理失败,拒绝消息并重新入队(或进入死信队列)
+            log.error("处理延迟消息失败:{}", e.getMessage());
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+        }
+    }
+
+    private void orderExpireMessage(String orderId) {
+        // 业务处理
+        log.info("处理订单消息:{}",orderId);
+
+        //执行业务代码
+        AppOrder appOrder = appOrderService.getById(orderId);
+        if(ObjectUtil.isNotEmpty(appOrder)){
+            if (Objects.equals(appOrder.getOrderStatus(), CommonConstant.ORDER_STATUS_0) && appOrder.getRevision() == 0) {
+
+                //修改子订单状态
+                List<AppOrderProInfo> appOrderProInfoList = appOrderProInfoService.list(Wrappers.<AppOrderProInfo>lambdaQuery()
+                        .eq(AppOrderProInfo::getOrderId, orderId)
+                );
+                if (CollUtil.isNotEmpty(appOrderProInfoList)){
+                    for (AppOrderProInfo appOrderProInfo : appOrderProInfoList) {
+                        if (Objects.equals(appOrderProInfo.getOrderStatus(), CommonConstant.ORDER_STATUS_1)){
+                            log.info("修改订单:{},支付状态为已过期", orderId);
+                            appOrderProInfo.setOrderStatus(CommonConstant.ORDER_STATUS_3);
+                            appOrderProInfoService.updateById(appOrderProInfo);
+                        }
+                    }
+                }
+                if (appOrderProInfoList.stream().filter(appOrderProInfo -> Objects.equals(appOrderProInfo.getOrderStatus(),
+                        CommonConstant.ORDER_STATUS_3)).count() == appOrderProInfoList.size()){
+                    log.info("修改订单:{},支付状态为已过期", orderId);
+                    appOrder.setOrderStatus(CommonConstant.ORDER_STATUS_4);
+                    appOrderService.updateById(appOrder);
+                }
+            }
+
+        }
+    }
+}

+ 38 - 9
national-motion-module-system/national-motion-system-biz/src/main/java/org/jeecg/modules/rabbitmq/RabbitMQConfig.java

@@ -13,29 +13,58 @@ public class RabbitMQConfig {
     /**
      * ---------------------------------------------订单超时未支付取消
      */
-    public static final String DELAY_EXCHANGE = "order_delayed_exchange";
-    public static final String DELAY_QUEUE = "order_delayed_queue";
-    public static final String DELAY_ROUTING_KEY = "order_delayed_key";
+    public static final String ORDER_DELAY_EXCHANGE = "order_delayed_exchange";
+    public static final String ORDER_DELAY_QUEUE = "order_delayed_queue";
+    public static final String ORDER_DELAY_ROUTING_KEY = "order_delayed_key";
 
     // 声明延迟交换机(持久化)
     @Bean
-    public CustomExchange delayExchange() {
+    public CustomExchange orderDelayExchange() {
         Map<String, Object> args = new HashMap<>();
         args.put("x-delayed-type", "direct"); // 延迟交换机类型
-        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
+        return new CustomExchange(ORDER_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
     }
 
     // 声明持久化队列
     @Bean
-    public Queue delayQueue() {
-        return QueueBuilder.durable(DELAY_QUEUE)
+    public Queue orderDelayQueue() {
+        return QueueBuilder.durable(ORDER_DELAY_QUEUE)
                 .withArgument("x-dead-letter-exchange", "dlx_exchange") // 死信交换机(可选)
                 .build();
     }
 
     // 绑定队列到交换机
     @Bean
-    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
-        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
+    public Binding orderDelayBinding(Queue orderDelayQueue, CustomExchange orderDelayExchange) {
+        return BindingBuilder.bind(orderDelayQueue).to(orderDelayExchange).with(ORDER_DELAY_ROUTING_KEY).noargs();
+    }
+
+    /**
+     * ---------------------------------------------订单过期
+     */
+    public static final String ORDER_EXPIRE_DELAY_EXCHANGE = "order_expire_delayed_exchange";
+    public static final String ORDER_EXPIRE_DELAY_QUEUE = "order_expire_delayed_queue";
+    public static final String ORDER_EXPIRE_DELAY_ROUTING_KEY = "order_expire_delayed_key";
+
+    // 声明延迟交换机(持久化)
+    @Bean
+    public CustomExchange orderExpireDelayExchange() {
+        Map<String, Object> args = new HashMap<>();
+        args.put("x-delayed-type", "direct"); // 延迟交换机类型
+        return new CustomExchange(ORDER_EXPIRE_DELAY_EXCHANGE, "x-delayed-message", true, false, args);
+    }
+
+    // 声明持久化队列
+    @Bean
+    public Queue orderExpireDelayQueue() {
+        return QueueBuilder.durable(ORDER_EXPIRE_DELAY_QUEUE)
+                .withArgument("x-dead-letter-exchange", "dlx_exchange") // 死信交换机(可选)
+                .build();
+    }
+
+    // 绑定队列到交换机
+    @Bean
+    public Binding orderExpireDelayBinding(Queue orderExpireDelayQueue, CustomExchange orderExpireDelayExchange) {
+        return BindingBuilder.bind(orderExpireDelayQueue).to(orderExpireDelayExchange).with(ORDER_EXPIRE_DELAY_ROUTING_KEY).noargs();
     }
 }

+ 2 - 2
national-motion-module-system/national-motion-system-start/src/main/resources/application-dev.yml

@@ -49,8 +49,8 @@ spring:
   rabbitmq:
     host: localhost
     port: 5672
-    username: admin
-    password: admin123
+    username: guest
+    password: guest
     listener:
       simple:
         acknowledge-mode: manual  # 手动确认

+ 10 - 0
national-motion-module-system/national-motion-system-start/src/main/resources/application-prod.yml

@@ -46,6 +46,16 @@ spring:
       mail.smtp.auth: true
       smtp.ssl.enable: true
       mail.debug: true  # 启用调试模式(查看详细日志)
+  rabbitmq:
+    host: localhost
+    port: 5672
+    username: admin
+    password: admin123
+    listener:
+      simple:
+        acknowledge-mode: manual  # 手动确认
+        concurrency: 5
+        prefetch: 1000
   ## quartz定时任务,采用数据库方式
   quartz:
     job-store-type: jdbc