上一篇 <<<Rabbitmq解决分布式事务思路
下一篇 >>>Rabbitmq环境安装

核心思路
补单队列和主程序不在一台机器
1、使用扇形交换机
2、生成者只需要发送消息到交换机,扇形交换机让绑定的队列进行消费
3、派单消费者正常消费,补单消费者做好幂等性,在主程序报错的情况下进行补单操作
核心代码
1.配置信息,必须开启生产者消息确认机制
spring:
rabbitmq:
host: 10.211.55.16
port: 5672
username: jarye
password: 123456
virtual-host: /mytest1205
###开启消息确认机制 confirms
publisher-confirms: true
publisher-returns: true
listener:
simple:
retry:
enabled: true
max-attempts: 5
initial-interval: 3000ms
acknowledge-mode: manual
2.生产者必须继承ConfirmCallback,保证消息的可达性,消费者不确认的话则手动重试
@Component
@Slf4j
public class OrderProducer implements RabbitTemplate.ConfirmCallback {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public String send() {
// 1.创建订单
String orderId = System.currentTimeMillis() + "";
UserEntity orderEntity = createOrder(orderId);
//2.将订单添加到数据库中(步骤一 先往数据库中添加一条数据)
int result = orderMapper.addOrder(orderEntity);
if (result <= 0) {
return orderId;
}
//3.使用消息中间件异步 ,分配订单
String sendMsgJson = JSONObject.toJSONString(orderEntity);
send(sendMsgJson);
//抛出异常,让本地回滚,但消息已经发送
int i = 1 / 0;
return orderId;
}
public UserEntity createOrder(String orderId) {
UserEntity orderEntity = new UserEntity();
orderEntity.setName("张三");
orderEntity.setId(1L);
// 价格是300元
orderEntity.setAge(18);
return orderEntity;
}
private void send(String sendMsg) {
log.info(">>>生产者发送订单数据:" + sendMsg);
// 设置生产者消息确认机制
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
// 构建回调返回参数
CorrelationData correlationData = new CorrelationData(sendMsg);
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE_NAME, "", sendMsg, correlationData);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
String sendMsg = correlationData.getId();
System.out.println("生产者开始消息确认orderId:" + sendMsg);
//如果没有发送成功,则考虑重试
if (!ack) {
//递归调用发送,可以考虑重试次数
send(sendMsg);
return;
}
System.out.println("生产者消息确认orderId:" + sendMsg);
}
}
3.消费者保证幂等性及手动消息确认
@Component
public class DistriLeafleConsumer {
@RabbitListener(queues = "order_dic_queue")
public void distriLeafleConsumer(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("派代服务平台消费msg:" + msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
// 订单id
String orderId = jsonObject.getString("orderId");
// 手动ack 删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 假设派单userID 1234
// Long userId = 1234L;
/* DispatchEntity dispatchEntity = new DispatchEntity(orderId, userId);
int result = dispatchMapper.insertDistribute(dispatchEntity);
if (result >= 0) {
// 手动ack 删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}*/
}
}
@Component
public class CreateOrderConsumer {
@Autowired
private OrderMapper orderMapper;
@RabbitListener(queues = "order_create_queue")
public void createOrderConsumer(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("补单服务平台消费msg:" + msg);
UserEntity orderEntity = JSONObject.parseObject(msg, UserEntity.class);
Long orderId = orderEntity.getId();
// 根据订单号码查询该笔订单是否创建
UserEntity dbOrderEntity = orderMapper.findOrderId(orderId);
if (dbOrderEntity != null) {
// 手动ack 删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
// 手动ack 删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明
