原创

RabbitMQ

一、RabbitMQ介绍

1.1 引言

1、模块之间的耦合度高,导致其中一个模块宕机后,全部功能都不能使用

2、同步通讯的成本问题

1.2 RabbitMQ的介绍

市面上比较火的几款MQ:

ActiveMQ、RokeMQ、Kafka、RabbitMQ

  1. 语言的支持:ActiveMQ和RokeMQ只支持Java语言,Kafka和RabbitMQ支持多种语言
  2. 效率方面:ActiveMQ、RokeMQ和Kafka效率都是毫秒级别,RabbitMQ是微秒级别
  3. 消息丢失,消息重复问题:RabbitMQ针对消息的持久化和重复问题都有比较成熟的解决方案
  4. 学习成本:RabbitMQ较为简单

RabbitMQ严格遵循AMQP协议(高级消息队列协议),帮助我们在进程之间传递异步消息

二、RabbitMQ安装

==使用docker-compose安装==

version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq

三、RabbitMQ架构

3.1 官方简单架构图

  1. Publisher-生产者:发布消息到RabbitMQ中的Exchange
  2. Consumer-消费者:监听RabbitMQ中的Queue的消息
  3. Exchange-交换机:和生产者建立连接并接收生产者的消息
  4. Queue-队列:Exchange会将消息分发到指定的Queue,然后Queue和消费者进行交互
  5. Routes-路由:交换机以什么样的策略将消息发布到Queue

img

3.2完整架构图

3.3查看图形化界面并创建一个virtualhost

创建一个全新的用户和全新的virtualhost,并将test用户设置可以操作/test的权限






四、RabbitMQ的使用

4.1、RabbitMQ常用的六种通讯方式






4.2、Java连接RabbitMQ

创建maven项目

导入依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

创建工具类连接RabbitMQ

package com.bingo;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMQClient {
    public static Connection getConnection(){
        //创建connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("*.*.*.*");
        factory.setPort(5672);
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setVirtualHost("/test");
        //创建connection
        Connection connection = null;
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

测试

package com.bingo;

import com.rabbitmq.client.Connection;
import org.junit.Test;

import java.io.IOException;

public class Demo1 {

    @Test
    public void getConnection() throws IOException {
        Connection connection = RabbitMQClient.getConnection();
        System.out.println(connection);
        connection.close();
    }
}

提示:虚拟需要关闭防火墙,阿里云服务器需要添加安全组访问规则(打开16572和5672端口)






4.3、Hello-world

一个生产者、一个默认交换机、一个队列和一个消费者

img

创建生产者,创建一个channel、发布消息到exchange,指定路由规则

package com.bingo.helloworld;

import com.bingo.RabbitMQClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;


public class Publish {

    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 发布消息到exchange
        String msg = "Hello-world!";
        /**
         * 参数1:指定exchange,使用“”
         * 参数2:指定路由的规则,使用具体的队列名称
         * 参数3:指定传递的消息所携带的properties,使用null
         * 参数4:指定发布的具体消息,byte[]类型
         */
        channel.basicPublish("","HelloWorld",null,msg.getBytes());
        //exchange是不会将消息持久化到本地,Queue才会将消息持久化
        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

创建消费者,创建一个channel,创建一个队列并且消费当前队列

package com.bingo.helloworld;
import com.bingo.RabbitMQClient;
import com.rabbitmq.client.*;
import org.junit.Test;              	 	
import java.io.IOException;

public class Consumer {

    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 声明队列,Hello-World
        /**
         * 参数1. queue-指定队列名称
         * 参数2. durable-当前队列是否需要持久化
         * 参数3. exchange-是否排外(执行conn.close()之后-当前队列会被自动删除,当前队列只能被一个消费者消费)
         * 参数4. autoDelete-如果这个队列没有消费者在消费,队列自动删除
         * 参数5. arguments-指定当前队列的其他信息
         */
        channel.queueDeclare("HelloWorld",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息"+new String(body,"UTF-8"));
            }
        };
        /**
         * 参数1. queue-指定消费哪一个队列
         * 参数2. deliverCallback-指定是否自动ACK(当值为true时,接收消息后会立即高速RabbitMQ)
         * 参数3. consumer-指定消费回调
         */
        channel.basicConsume("HelloWorld",true,consumer);
        System.out.println("开始监听队列");
        //System.in.read()
        System.in.read();

        //5. 释放资源
        channel.close();
        connection.close();

    }
}





4.4、Work

一个生产者、一个默认交换机、一个队列和两个消费者

img

循环发送消息的生产者

package com.work;
import com.RabbitMQClient;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.jupiter.api.Test;
/**
 * @author chenpingping
 * @version 1.0
 * @date 2021/1/15 13:52
 */
public class Publisher {
    @Test
    public void publish() throws Exception {
        //1. 获取Connection
        Connection connection = RabbitMQClient.getConnection();
        //2. 创建channel
        Channel channel = connection.createChannel();
        //3. 发布消息到exchange
        for (int i = 0; i < 10; i++) {
            String msg = "Work-->"+i;
            channel.basicPublish("","Work",null,msg.getBytes());
            /**
             * 参数1:指定exchange,使用“”
             * 参数2:指定路由的规则,使用具体的队列名称
             * 参数3:指定传递的消息所携带的properties,使用null
             * 参数4:指定发布的具体消息,byte[]类型
             */
        }

        //exchange是不会将消息持久化到本地,Queue才会将消息持久化
        System.out.println("生产者发布消息成功!");

        //4. 释放资源
        channel.close();
        connection.close();
    }
}

  1. 创建多个消费者(手动设置ACK,每一个消费者消费能力不同)
package com.work;
import com.RabbitMQClient;
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;

/**
 * @author chenpingping
 * @version 1.0
 * @date 2021/1/15 13:56
 */
public class Consumer1 {
    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RabbitMQClient.getConnection();
        //2. 创建channel
        final Channel channel = connection.createChannel();
        //3. 声明队列,Hello-World
        /**
         * 参数1. queue-指定队列名称
         * 参数2. durable-当前队列是否需要持久化
         * 参数3. exchange-是否排外(执行conn.close()之后-当前队列会被自动删除,当前队列只能被一个消费者消费)
         * 参数4. autoDelete-如果这个队列没有消费者在消费,队列自动删除
         * 参数5. arguments-指定当前队列的其他信息
         */
        channel.queueDeclare("Work",true,false,false,null);
        //3.5 指定当前消费者,一次消费多少个消息
        channel.basicQos(1);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1接收到消息"+new String(body,"UTF-8"));
                //手动ACK
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * 参数1. queue-指定消费哪一个队列
         * 参数2. deliverCallback-指定是否自动ACK(当值为true时,接收消息后会立即高速RabbitMQ)
         * 参数3. consumer-指定消费回调
         */
        channel.basicConsume("Work",false,consumer);
        System.out.println("开始监听队列");
        //System.in.read()
        System.in.read();

        //5. 释放资源
        channel.close();
        connection.close();

    }
}






4.5、Publish/Subscribe

一个生产者、一个自己创建的交换机、两个队列和两个消费者

img

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定

    @Test
    public void publish() throws Exception {
        //1. 获取Connection、2. 创建channel
		...
        //3. 创建exchange,绑定一个队列
        /**
         * 参数1. exchange的名称
         * 参数2. exchange的类型 FANOUT-publish、DIRECT-Routing、TOPIC-Topics
         */
        channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("publish-queue1","publish-exchange","");
        channel.queueBind("publish-queue2","publish-exchange","");
        //4. 发布消息到exchange
        for (int i = 0; i < 10; i++) {
            String msg = "pubsub-->"+i;
            channel.basicPublish("publish-exchange","",null,msg.getBytes());
        }
        //exchange是不会将消息持久化到本地,Queue才会将消息持久化
        System.out.println("生产者发布消息成功!");

        //5. 释放资源
        ...
    }

消费者依旧正常监听某一个队列即可

与Work的不同之处在于publish-subscribe将同一个消息发布到多个队列






4.6、Routing

一个生产者、一个交换机、两个队列和两个消费者

img

创建一个DIRECT类型的exchange,并且根据RoutingKey去绑定指定的队列

生产者在创建DIRECT类型的exchange后绑定响应的队列,并且在发送消息时指定具体的RoutingKey即可

        //1. 获取Connection、2. 创建channel
        ...
        //3. 创建exchange,routing-queue-error,routing-queue-info
        channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
        channel.queueBind("routing-queue-error","routing-exchange","ERROR");
        channel.queueBind("routing-queue-info","routing-exchange","INFO");
        //4. 发布消息到exchange
        channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
        channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
        System.out.println("生产者发布消息成功!");

        //5. 释放资源
        ...
  1. 消费者基本不变





4.7、Topic

一个生产者、一个交换机、两个队列和两个消费者

img

生产者创建Topic的Exchange并且绑定到队列中,此次绑定通过*和#关键字,对指定routingKey的内容

*-->占位符、#-->通配符,在发送消息时,指定具体的routingKey

        //1. 获取Connection、2. 创建channel
        ...
        //3. 创建exchange,绑定队列 topic-queue-1和topic-queue-2
        // *-->占位符
        // #-->通配符
        channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
        channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
        channel.queueBind("topic-queue-2","topic-exchange","fast.#");
        channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
        //4. 发布消息到exchange
        channel.basicPublish("topic-exchange","fast.red.monkey",null,"快红猴".getBytes());
        channel.basicPublish("topic-exchange","slow.black.dog",null,"慢黑狗".getBytes());
        channel.basicPublish("topic-exchange","fast.white.cat",null,"快百猫".getBytes());
        //5. 释放资源
        ...
  1. 消费者基本不变





五、RabbitMQ整合SpringBoot

5.1、springboot整合RabbitMQ

  1. 创建SpringBoot工程
  1. 导入依赖
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
  1. 编写配置文件
spring:
  rabbitmq:
    host: *.*.*.*
    port: 5672
    username: test
    password: test
    virtual-host: /test

编写配置类,声明exchange和queue,并且绑定在一起

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    //1. 创建exchange-topic
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("boot-topic-exchange",true,false);
    }

    //2. 创建queue
    @Bean
    public Queue getQueue(){
        return new Queue("boot-queue",true,false,false,null);
    }

    //3. 绑定在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange,Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
}

发布消息到RabbitMQ

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Test
	void contextLoads() {
		rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色的大狼狗".getBytes());
	}

创建消费者监听消息(使用@RabbitListener注解)

package com.example.listen;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMassage(Object message){
        System.out.println("接收到消息:"+message.toString());
    }
}





5.2、手动ACK

添加配置文件

spring:
  rabbitmq:
    host: *.*.*.*
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual

在消费消息的位置修改方法,再手动ACK

package com.example.listen;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMassage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:"+msg);
        //手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}





六、RabbitMQ的其它操作

6.1、消息的可靠性

RabbitMQ的事务:事务可以保证消息的100%传递,可以通过事务回滚去记录日志,后面定时再发送当前消息。但是事务的操作效率太低,加入事务后效率比不加事务慢至少100倍

6.1.1、Confirm机制(保证生产者发布消息到exchange,无法保证发送到queue)

RabbitMQ除了事务,还提供了Confirm的确认机制,这种机制的效率比事务高很多

普通Confirm方式

        //1. 获取Connection、2. 创建channel
		...
        //3. 发布消息到exchange
        //开启confirm
        channel.confirmSelect();
        String msg = "Hello-world!";
        channel.basicPublish("","HelloWorld",null,msg.getBytes());
		//判断消息是否发送成功
        if (channel.waitForConfirms()){
            System.out.println("生产者发布消息成功!");
        }else {
            System.out.println("消息发送失败");
        }
        //4. 释放资源
        ...

批量Confirm方式

        //1. 获取Connection、2. 创建channel
		...
		//3. 发布消息到exchange
        //开启confirm
        channel.confirmSelect();
		//批量发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "Hello-world"+i;
            channel.basicPublish("","HelloWorld",null,msg.getBytes());
        }

		//确认消息是否发送成功
        //当发送的全部消息有一个失败时,就直接全部失败并抛出异常(IOException)
        channel.waitForConfirmsOrDie();

        //4. 释放资源
		...

异步Confirm方式

        //1. 获取Connection、2. 创建channel
		...
		//3. 发布消息到exchange
        //开启confirm
        channel.confirmSelect();
		//批量发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "Hello-world"+i;
            channel.basicPublish("","HelloWorld",null,msg.getBytes());
        }

		//确认消息是否发送成功
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
            }
        });

        //4. 释放资源
		...





6.1.2、Return机制

Confirm只能保证到exchange,无法保证可以被exchange分发到queue

而且exchange不能持久化消息,queue才可以持久化消息

采用Return机制来监听消息是否从exchange发送到queue中

开启return机制后,要是程序执行回调函数必须使用另一个带mandatory参数(设置为true)的消息构造方法

        //1. 获取Connection、2. 创建channel
        ...
        //开启return机制
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //当消息没有送达queue是才会执行
                System.out.println(new String(body,"UTF-8")+"没有送达Queue中");
            }
        });
        //3. 发布消息到exchange

        //开启confirm
        channel.confirmSelect();
        for (int i = 0; i < 10; i++) {
            String msg = "Hello-world"+i;
            //消息的构造方法不一样,多一个参数pros开启保证return执行
            channel.basicPublish("","bingo",true,null,msg.getBytes());
        }

        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送成功,标识:"+deliveryTag+",是否是批量"+multiple);
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息发送失败,标识:"+deliveryTag+",是否是批量"+multiple);
            }
        });

        System.in.read();
        //4. 释放资源
        ...





6.1.3、SpringBoot实现Confirm和Return

编写配置文件,克莱齐Confirm和Return机制

spring:
  rabbitmq:
    host: *.*.*.*
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual
    publisher-confirm-type: simple
    publisher-returns: true

指定RabbitTemplate对象,开启Confirm和Return,并且编写了回调函数

package com.example.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Component
public class PublishConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void initMethod(){
        //初始化的时候告诉RabbitMQ开启了Confirm和Returns
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            System.out.println("消息已经送达exchange");
        }else {
            System.out.println("消息没有送到exchange");
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("消息没有送达queue");
    }
}





6.2.1、消息的重复消费

重复消费消息,会对非幂等操作造成问题(幂等性操作:执行多次操作不影响最终结果,例如删除操作)

重复消费消息的原因是消费者没有给RabbitMQ一个ack

为了解决消息被重复消费的问题,可以采用Redis,在消费者消费之前,将消息id放到redis中

id-0(正在执行业务)

id-1(执行业务成功)

如果ack失败,在RabbitMQ将消息交给其他消费者时,消费者先执行setnx(redis方法),如果key已经存在则获取key的值,当获取到的值为0时,消费者什么都不做,当获取到的值为1时,消费者直接ack.

极端情况:第一个消费者执行业务时出现了死锁,在setnx基础上,再给key设置一个生存时间。

生产者发送消息时指定messageID

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(1).messageId(UUID.randomUUID().toString()).build();
        String msg = "Hello-world";
        channel.basicPublish("", "bingo", true, properties, msg.getBytes());
  1. 消费者在消费消息时根据具体业务逻辑操作redis
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Jedis jedis = new Jedis("*.*.*.*", 6379);
                String messageId = properties.getMessageId();
                String result = jedis.set(messageId, "0", "NX", "EX", 10);//存活10s
                if (result!=null&&result.equalsIgnoreCase("OK")) {
                    System.out.println("接收到消息" + new String(body, "UTF-8"));
                    jedis.set(messageId,"1");
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }else {
                    String s = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)){
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                }
            }
        };

需要先导入redis依赖






6.2.2、SpringBoot实现

导入依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

编写配置文件

spring:
  rabbitmq:
    ...
  redis:
    host: *.*.*.*
    port: 6379

修改生产者

	@Test
	void contextLoads() {
		CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
		rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色的大狼狗".getBytes(),messageId);
	}

修改消费者

package com.example.listen;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;


@Component
public class Consumer {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @RabbitListener(queues = "boot-queue")
    public void getMassage(String msg, Channel channel, Message message) throws IOException {
        //0. 获取MessageId
        String messageId = message.getMessageProperties().getMessageId();
        //1. 设置key到redis
        if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
            //2. 消费消息
            System.out.println("消费消息");
            //3. 设置key的value为1
            redisTemplate.opsForValue().set(messageId, "1");
            //4. 手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } else {
            //5. 获取redis中的value即可,如果是1,手动ack
            if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    }
}





  • 作者:管理员(联系作者)
  • 发表时间:2021-01-17 00:01
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 公众号转载:请在文末添加作者公众号二维码
  • 评论

    随心 QQ
    去年5月份的时候内容来过,又增加了好多新的内容,真好,学习了
    和煦  @ 随心 游客
    sdasasdsa
    随心 QQ
    去年5月份的时候内容来过,又增加了好多新的内容,真好,学习了
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈
    嘎嘎嘎  @ 随心 游客
    哈哈哈哈