RabbitMQ实用组件汇总
1.消费端自定义监听组件
需继承DefaultConsumer,并且重写方法handleDelivery,然后在消费者方做消息确认(ACK)或拒绝(NACK)处理
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
try {
Integer age = (Integer) properties.getHeaders().get("age");
if (age != 0) {
System.out.println("消费消息"+ new String(body));
channel.basicAck( envelope.getDeliveryTag(),false);
}else{
throw new RuntimeException("Exception");
}
}catch (Exception e) {
System.out.println("异常消费消息:"+new String(body));
// channel.basicNack(envelope.getDeliveryTag(),false,true);
channel.basicNack(envelope.getDeliveryTag(),false,false);
}
2.生产端自定义消息确认组件
需实现ConfirmListener接口,重写handleAck和handleNack方法,根据生产者发送消息到Broker的签收状态可在生产者方处理业务逻辑
3.消费端消息限流
消费端新增channel.basicQos(消息大小,限流数目,限流级别),实现一个自定义消费监听组件,进行手动或自动消息签收处理。
4.生产者ReturnListener消息不可达监听组件
由于routingkey或者exchange不存在导致消息不能路由到队列上,生产者可启用ReturnListener组件,具体为需实现ReturnListener接口,并重写handleReturn方法,可以进行业务逻辑处理。
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("处理Return_listener消息,记录等.."+replyCode+routingKey+replyText+properties.getHeaders()+(new String(body)));
}
