RabbitMQ实用组件汇总

RabbitMQ实用组件汇总

1.消费端自定义监听组件

需继承DefaultConsumer,并且重写方法handleDelivery,然后在消费者方做消息确认(ACK)或拒绝(NACK)处理
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        super.handleDelivery(consumerTag, envelope, properties, body);
        try {
            Integer age = (Integer) properties.getHeaders().get("age");
            if (age != 0) {
                System.out.println("消费消息"+ new String(body));
                channel.basicAck( envelope.getDeliveryTag(),false);
            }else{
                throw new RuntimeException("Exception");
            }
        }catch (Exception e) {
            System.out.println("异常消费消息:"+new String(body));
          //  channel.basicNack(envelope.getDeliveryTag(),false,true);
            channel.basicNack(envelope.getDeliveryTag(),false,false);
        }

2.生产端自定义消息确认组件

需实现ConfirmListener接口,重写handleAck和handleNack方法,根据生产者发送消息到Broker的签收状态可在生产者方处理业务逻辑

3.消费端消息限流

消费端新增channel.basicQos(消息大小,限流数目,限流级别),实现一个自定义消费监听组件,进行手动或自动消息签收处理。

4.生产者ReturnListener消息不可达监听组件

由于routingkey或者exchange不存在导致消息不能路由到队列上,生产者可启用ReturnListener组件,具体为需实现ReturnListener接口,并重写handleReturn方法,可以进行业务逻辑处理。

  @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("处理Return_listener消息,记录等.."+replyCode+routingKey+replyText+properties.getHeaders()+(new String(body)));
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容