Selaa lähdekoodia

feat(channel): 添加渠道方订单推送补偿功能

- 新增渠道方订单推送补偿接口,补偿已完成且备注为“通过补偿修复处理”的订单
- 实现推送充电订单信息给渠道方并扣减渠道方账户余额的补偿逻辑
- 补偿推送失败时记录日志但不回滚,保证补偿流程的容错性
- 优化渠道方账户余额扣减,新增资金流水记录功能
- 订单处理流程中针对渠道方订单调用补偿推送逻辑
- 用户账户余额更新增加行锁,防止余额并发更新丢失
- 用户订单状态更新新增基于当前状态的条件更新,避免并发覆盖问题
- 订单支付前增加订单存在及状态验证,确保业务流程正确性
- 在创建订单及支付接口上添加防重复提交注解,防止重复请求造成数据异常
wzq 1 viikko sitten
vanhempi
commit
1cf7cd0647

+ 11 - 0
src/main/java/com/zsElectric/boot/business/controller/applet/AppletChargeController.java

@@ -168,4 +168,15 @@ public class AppletChargeController {
         return Result.success(apiLog.getDecryptedRequestData());
     }
 
+    /**
+     * 渠道方订单推送补偿
+     * 针对已完成(status=3)、备注为"通过补偿修复处理"的渠道方订单,补推送消息 + 扣减余额
+     */
+    @Operation(summary = "渠道方订单推送补偿", description = "补偿已完成的渠道方订单:推送消息+扣减余额")
+    @PostMapping("/compensateChannelOrderPush")
+    public Result<String> compensateChannelOrderPush() {
+        String result = chargeOrderInfoService.compensateChannelOrderPush();
+        return Result.success(result);
+    }
+
 }

+ 2 - 0
src/main/java/com/zsElectric/boot/business/controller/applet/AppletWFTOrderController.java

@@ -46,6 +46,7 @@ public class AppletWFTOrderController {
      */
     @Operation(summary = "创建订单")
     @PostMapping("/createOrder")
+    @RepeatSubmit(expire = 5)
     @Log(value = "创建订单", module = LogModuleEnum.APP_ORDER)
     public Result<AppUserPayForm> createOrder(@Valid @RequestBody AppLevelOrderForm appLevelOrderForm,HttpServletRequest request) {
         String IP = IPUtils.getIpAddr(request);
@@ -62,6 +63,7 @@ public class AppletWFTOrderController {
      */
     @Operation(summary = "订单-支付")
     @PutMapping("/payOrder/{orderId}")
+    @RepeatSubmit(expire = 5)
     @Log(value = "订单-支付", module = LogModuleEnum.APP_ORDER)
     public Result<AppUserPayForm> payOrder(@PathVariable("orderId") String orderId,HttpServletRequest request){
         String ip = IPUtils.getIpAddr(request);

+ 8 - 0
src/main/java/com/zsElectric/boot/business/mapper/UserAccountMapper.java

@@ -47,4 +47,12 @@ public interface UserAccountMapper extends BaseMapper<UserAccount> {
      */
     int executeCompensation(@Param("startTime") LocalDateTime startTime);
 
+    /**
+     * 按用户ID查询账户并加行锁,避免余额并发更新丢失。
+     *
+     * @param userId 用户ID
+     * @return 用户账户
+     */
+    UserAccount selectByUserIdForUpdate(@Param("userId") Long userId);
+
 }

+ 15 - 0
src/main/java/com/zsElectric/boot/business/mapper/UserOrderInfoMapper.java

@@ -10,6 +10,7 @@ import com.zsElectric.boot.business.model.vo.UserOrderInfoVO;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 
+import java.time.LocalDateTime;
 import java.util.List;
 
 /**
@@ -37,4 +38,18 @@ public interface UserOrderInfoMapper extends BaseMapper<UserOrderInfo> {
      * @return 用户支付订单信息导出列表
      */
     List<UserOrderInfoExportDTO> listExportUserOrderInfo(@Param("queryParams") UserOrderInfoQuery queryParams);
+
+    /**
+     * 仅在当前状态匹配时更新订单状态,避免并发覆盖。
+     *
+     * @param orderNo 订单号
+     * @param currentStatus 当前状态
+     * @param targetStatus 目标状态
+     * @param updateTime 更新时间
+     * @return 受影响行数
+     */
+    int updateOrderStatusIfCurrent(@Param("orderNo") String orderNo,
+                                   @Param("currentStatus") Integer currentStatus,
+                                   @Param("targetStatus") Integer targetStatus,
+                                   @Param("updateTime") LocalDateTime updateTime);
 }

+ 8 - 0
src/main/java/com/zsElectric/boot/business/service/ChargeOrderInfoService.java

@@ -115,4 +115,12 @@ public interface ChargeOrderInfoService extends IService<ChargeOrderInfo> {
      */
     String compensateUnprocessedOrders();
 
+    /**
+     * 补偿渠道方订单:补推送消息 + 扣减余额
+     * 针对已完成(status=3)、备注为"通过补偿修复处理"的渠道方订单
+     *
+     * @return 补偿结果描述
+     */
+    String compensateChannelOrderPush();
+
 }

+ 18 - 4
src/main/java/com/zsElectric/boot/business/service/WFTOrderService.java

@@ -218,6 +218,12 @@ public class WFTOrderService {
 
     public AppUserPayForm payOrder(String orderId, String ip) {
         UserOrderInfo orderInfo = userOrderInfoMapper.selectById(orderId);
+        if (orderInfo == null) {
+            throw new BusinessException("订单不存在");
+        }
+        if (!SystemConstants.STATUS_ONE.equals(orderInfo.getOrderStatus())) {
+            throw new BusinessException("当前订单状态不允许发起支付");
+        }
         //构建支付表单
         AppUserPayForm payForm = new AppUserPayForm();
         payForm.setOrderId(orderInfo.getId()).setOrderNo(orderInfo.getOrderNo());
@@ -286,12 +292,20 @@ public class WFTOrderService {
         if (ObjectUtil.isNull(userOrderInfo)) {
             throw new BusinessException("订单不存在");
         }
-        userOrderInfo.setOrderStatus(SystemConstants.STATUS_THREE);
-        int i = userOrderInfoMapper.updateById(userOrderInfo);
-        if (i > 0) {
+        if (SystemConstants.STATUS_THREE.equals(userOrderInfo.getOrderStatus())) {
             return Boolean.TRUE;
         }
-        return Boolean.FALSE;
+        if (!SystemConstants.STATUS_ONE.equals(userOrderInfo.getOrderStatus())) {
+            log.warn("订单状态不是待支付,拒绝关闭: orderNo={}, orderStatus={}", orderNo, userOrderInfo.getOrderStatus());
+            return Boolean.FALSE;
+        }
+        int updated = userOrderInfoMapper.updateOrderStatusIfCurrent(
+                orderNo,
+                SystemConstants.STATUS_ONE,
+                SystemConstants.STATUS_THREE,
+                LocalDateTime.now()
+        );
+        return updated > 0;
     }
 
     /**

+ 169 - 0
src/main/java/com/zsElectric/boot/business/service/impl/ChargeOrderInfoServiceImpl.java

@@ -68,6 +68,9 @@ import org.redisson.api.RLock;
 import org.redisson.api.RedissonClient;
 import java.util.concurrent.TimeUnit;
 
+import com.zsElectric.boot.business.model.entity.FirmAccountLog;
+import com.zsElectric.boot.common.util.OkHttpUtil;
+
 import static com.zsElectric.boot.business.service.WFTOrderService.USER_FUND_LOCK_KEY;
 import static com.zsElectric.boot.business.service.WFTOrderService.USER_FUND_LOCK_EXPIRE;
 
@@ -113,6 +116,8 @@ public class ChargeOrderInfoServiceImpl extends ServiceImpl<ChargeOrderInfoMappe
     private final ObjectMapper objectMapper;
     private final DictItemService dictItemService;
     private final RedissonClient redissonClient;
+    private final OkHttpUtil okHttpUtil;
+    private final FirmAccountLogMapper firmAccountLogMapper;
 
     //充电订单号前缀
     private final String ORDER_NO_PREFIX = "CD";
@@ -1124,6 +1129,11 @@ public class ChargeOrderInfoServiceImpl extends ServiceImpl<ChargeOrderInfoMappe
                 orderSettlement(order.getId());
                 log.info("订单{}余额扣减完成", order.getChargeOrderNo());
             }
+
+            // 渠道方订单:推送充电订单信息 + 渠道方账户余额扣减
+            if (Objects.equals(order.getOrderType(), SystemConstants.CHARGE_ORDER_TYPE_CHANNEL)) {
+                compensateChannelOrder(order, apiLog.getDecryptedRequestData());
+            }
             
             log.info("订单{}通过API日志处理成功,实际费用: {}", order.getChargeOrderNo(), order.getRealCost());
             return true;
@@ -1296,6 +1306,11 @@ public class ChargeOrderInfoServiceImpl extends ServiceImpl<ChargeOrderInfoMappe
                     orderSettlement(order.getId());
                     log.info("补偿任务: 订单{}余额扣减完成", order.getChargeOrderNo());
                 }
+
+                // 渠道方订单:推送充电订单信息 + 渠道方账户余额扣减
+                if (Objects.equals(order.getOrderType(), SystemConstants.CHARGE_ORDER_TYPE_CHANNEL)) {
+                    compensateChannelOrder(order, null);
+                }
                 
                 successCount++;
                 chargeStatusCount++;
@@ -1316,4 +1331,158 @@ public class ChargeOrderInfoServiceImpl extends ServiceImpl<ChargeOrderInfoMappe
         return result;
     }
 
+    /**
+     * 渠道方订单补偿:推送充电订单信息给渠道方 + 渠道方账户余额扣减
+     * 推送失败只记日志,不回滚补偿
+     *
+     * @param order        补偿完成的订单
+     * @param apiLogData   API日志原始数据(可为null,则从订单字段构建推送数据)
+     */
+    private void compensateChannelOrder(ChargeOrderInfo order, String apiLogData) {
+        try {
+            FirmInfo firmInfo = firmInfoMapper.selectById(order.getFirmId());
+            if (firmInfo == null) {
+                log.warn("补偿任务: 订单{}找不到渠道方信息,firmId: {}", order.getChargeOrderNo(), order.getFirmId());
+                return;
+            }
+
+            // 1. 推送充电订单信息给渠道方
+            pushChargeOrderInfoToChannel(order, firmInfo, apiLogData);
+
+            // 2. 渠道方账户余额扣减
+            deductChannelFirmBalance(order, firmInfo);
+
+        } catch (Exception e) {
+            log.error("补偿任务: 渠道方订单{}补偿处理异常: {}", order.getChargeOrderNo(), e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 推送充电订单信息给渠道方(参考 /notification_charge_order_info 接口推送格式)
+     * 失败只记日志,不影响补偿结果
+     */
+    private void pushChargeOrderInfoToChannel(ChargeOrderInfo order, FirmInfo firmInfo, String apiLogData) {
+        try {
+            // 构建推送数据
+            Map<String, Object> pushData;
+            if (apiLogData != null) {
+                // 使用API日志原始数据
+                pushData = objectMapper.readValue(apiLogData, Map.class);
+            } else {
+                // 从order字段构建推送数据
+                pushData = new HashMap<>();
+                pushData.put("StartChargeSeq", order.getStartChargeSeq());
+                pushData.put("ConnectorID", order.getConnectorId());
+                pushData.put("StartTime", order.getStartTime());
+                pushData.put("EndTime", order.getEndTime());
+                pushData.put("TotalPower", order.getTotalCharge());
+                pushData.put("TotalElecMoney", order.getThirdPartyElecfee());
+                pushData.put("TotalSeviceMoney", order.getThirdPartyServerfee());
+                pushData.put("TotalMoney", order.getThirdPartyTotalCost());
+                pushData.put("StopReason", order.getStopReason());
+            }
+            pushData.put("chargeOrderNo", order.getChargeOrderNo());
+
+            String url = firmInfo.getChannelUrl() + "/notification_charge_order_info";
+            String requestBody = com.alibaba.fastjson2.JSONObject.toJSONString(pushData);
+
+            int maxRetries = 3;
+            int retryIntervalMs = 5000;
+            for (int attempt = 1; attempt <= maxRetries; attempt++) {
+                try {
+                    JsonNode response = okHttpUtil.doPostJson(url, requestBody, null);
+                    log.info("补偿任务: 渠道方推送充电订单信息成功 - chargeOrderNo: {}, firmId: {}, response: {}",
+                            order.getChargeOrderNo(), order.getFirmId(), response);
+                    return;
+                } catch (Exception e) {
+                    log.error("补偿任务: 渠道方推送充电订单信息失败(第{}次) - chargeOrderNo: {}, firmId: {}, url: {}, 错误: {}",
+                            attempt, order.getChargeOrderNo(), order.getFirmId(), url, e.getMessage(), e);
+                    if (attempt < maxRetries) {
+                        try {
+                            Thread.sleep(retryIntervalMs);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("补偿任务: 构建渠道方推送数据失败 - chargeOrderNo: {}, 错误: {}", order.getChargeOrderNo(), e.getMessage(), e);
+        }
+    }
+
+    /**
+     * 渠道方账户余额扣减 + 记录资金流水
+     */
+    private void deductChannelFirmBalance(ChargeOrderInfo order, FirmInfo firmInfo) {
+        BigDecimal cost = order.getRealCost();
+        if (cost == null || cost.compareTo(BigDecimal.ZERO) <= 0) {
+            log.info("补偿任务: 订单{}实际费用为0,跳过渠道方余额扣减", order.getChargeOrderNo());
+            return;
+        }
+
+        // 记录资金流水
+        FirmAccountLog accountLog = new FirmAccountLog();
+        accountLog.setFirmId(firmInfo.getId());
+        accountLog.setFirmType(firmInfo.getFirmType());
+        accountLog.setEventDesc("渠道方充电订单下账(补偿)");
+        accountLog.setSerialNo(order.getChargeOrderNo());
+        accountLog.setIncomeType(2);
+        accountLog.setBeforeChange(firmInfo.getBalance());
+        accountLog.setAfterChange(firmInfo.getBalance().subtract(cost));
+        accountLog.setMoneyChange(cost);
+        firmAccountLogMapper.insert(accountLog);
+
+        // 渠道方账户余额修改
+        firmInfo.setBalance(firmInfo.getBalance().subtract(cost));
+        firmInfoMapper.updateById(firmInfo);
+        log.info("补偿任务: 订单{}渠道方余额扣减完成,扣减金额: {}, 扣减后余额: {}", 
+                order.getChargeOrderNo(), cost, firmInfo.getBalance());
+    }
+
+    @Override
+    @Transactional(rollbackFor = Exception.class)
+    public String compensateChannelOrderPush() {
+        log.info("开始执行渠道方订单推送补偿...");
+
+        // 查询已完成(status=3)、备注为"通过补偿修复处理"的渠道方订单
+        List<ChargeOrderInfo> channelOrders = this.list(Wrappers.<ChargeOrderInfo>lambdaQuery()
+                .eq(ChargeOrderInfo::getStatus, 3)
+                .eq(ChargeOrderInfo::getOrderType, SystemConstants.CHARGE_ORDER_TYPE_CHANNEL)
+                .eq(ChargeOrderInfo::getRemark, "通过补偿修复处理")
+        );
+
+        if (channelOrders.isEmpty()) {
+            log.info("渠道方推送补偿: 没有找到需要补偿的渠道方订单");
+            return "没有找到需要补偿的渠道方订单";
+        }
+
+        log.info("渠道方推送补偿: 找到{}个需要补偿的渠道方订单", channelOrders.size());
+
+        int totalCount = 0;
+        int successCount = 0;
+        List<String> failedOrders = new ArrayList<>();
+
+        for (ChargeOrderInfo order : channelOrders) {
+            totalCount++;
+            try {
+                compensateChannelOrder(order, null);
+                successCount++;
+                log.info("渠道方推送补偿: 订单{}补偿成功", order.getChargeOrderNo());
+            } catch (Exception e) {
+                log.error("渠道方推送补偿: 订单{}补偿失败: {}", order.getChargeOrderNo(), e.getMessage(), e);
+                failedOrders.add(order.getChargeOrderNo());
+            }
+        }
+
+        String result = String.format("渠道方推送补偿完成!总计: %d, 成功: %d, 失败: %d",
+                totalCount, successCount, failedOrders.size());
+        if (!failedOrders.isEmpty()) {
+            result += ", 失败订单: " + String.join(",", failedOrders);
+        }
+        log.info(result);
+        return result;
+    }
+
 }

+ 10 - 6
src/main/java/com/zsElectric/boot/business/service/impl/UserAccountServiceImpl.java

@@ -1,6 +1,5 @@
 package com.zsElectric.boot.business.service.impl;
 
-import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.zsElectric.boot.business.mapper.UserAccountLogMapper;
 import com.zsElectric.boot.business.mapper.UserInfoIntegralLogMapper;
 import com.zsElectric.boot.business.model.entity.UserAccountLog;
@@ -126,20 +125,25 @@ public class UserAccountServiceImpl extends ServiceImpl<UserAccountMapper, UserA
      * @return 更新后的用户账户信息
      */
     @Override
+    @Transactional(rollbackFor = Exception.class)
     public UserAccount updateAccountBalanceAndLog(Long userId, BigDecimal changeAmount,
                                                   Integer changeType, String changeNote, Long changeId) {
         // 查询用户账户
-        UserAccount userAccount = this.getOne(Wrappers.lambdaQuery(UserAccount.class).eq(UserAccount::getUserId, userId).last("limit 1"));
+        UserAccount userAccount = this.baseMapper.selectByUserIdForUpdate(userId);
+        Assert.notNull(userAccount, "用户账户不存在");
+
+        BigDecimal balance = userAccount.getBalance() == null ? BigDecimal.ZERO : userAccount.getBalance();
+        BigDecimal redeemBalance = userAccount.getRedeemBalance() == null ? BigDecimal.ZERO : userAccount.getRedeemBalance();
         
         // 创建账户变动日志
         UserAccountLog accountLog = new UserAccountLog();
-        accountLog.setBeforeBalance(userAccount.getBalance().add(userAccount.getRedeemBalance()));
+        accountLog.setBeforeBalance(balance.add(redeemBalance));
         
         // 计算变更后余额
         BigDecimal finalBalance;
         if (SystemConstants.CHANGE_TYPE_ADD.equals(changeType)) {
             // 增加余额
-            finalBalance = userAccount.getBalance().add(changeAmount);
+            finalBalance = balance.add(changeAmount);
             // 更新账户余额
             userAccount.setBalance(finalBalance);
         }
@@ -158,7 +162,7 @@ public class UserAccountServiceImpl extends ServiceImpl<UserAccountMapper, UserA
 
         if (SystemConstants.CHANGE_TYPE_REDUCE.equals(changeType)){
             // 减少余额
-            finalBalance = userAccount.getBalance().subtract(changeAmount);
+            finalBalance = balance.subtract(changeAmount);
             // 减去账户余额
             userAccount.setBalance(finalBalance);
 
@@ -200,7 +204,7 @@ public class UserAccountServiceImpl extends ServiceImpl<UserAccountMapper, UserA
         accountLog.setChangeNote(changeNote);
         accountLog.setChangeId(changeId);
         accountLog.setAccountType(SystemConstants.ACCOUNT_TYPE_PERSONAL);
-        accountLog.setChangeBalance(userAccount.getBalance().add(userAccount.getRedeemBalance()));
+        accountLog.setChangeBalance(userAccount.getBalance().add(redeemBalance));
         userAccountLogMapper.insert(accountLog);
         
         return userAccount;

+ 19 - 0
src/main/resources/mapper/business/UserAccountMapper.xml

@@ -63,4 +63,23 @@
         WHERE acc.is_deleted = 0
     </update>
 
+    <select id="selectByUserIdForUpdate" resultType="com.zsElectric.boot.business.model.entity.UserAccount">
+        SELECT
+            id,
+            user_id,
+            balance,
+            redeem_balance,
+            integral,
+            create_time,
+            create_by,
+            update_time,
+            update_by,
+            is_deleted
+        FROM c_user_account
+        WHERE user_id = #{userId}
+          AND is_deleted = 0
+        LIMIT 1
+        FOR UPDATE
+    </select>
+
 </mapper>

+ 9 - 0
src/main/resources/mapper/business/UserOrderInfoMapper.xml

@@ -122,4 +122,13 @@
         ORDER BY a.create_time DESC
     </select>
 
+    <update id="updateOrderStatusIfCurrent">
+        UPDATE c_user_order_info
+        SET order_status = #{targetStatus},
+            update_time = #{updateTime}
+        WHERE order_no = #{orderNo}
+          AND order_status = #{currentStatus}
+          AND is_deleted = 0
+    </update>
+
 </mapper>