MQ

xss大约 4 分钟MQMQ

消息队列MQ

常见的消息队列有:

  • ActiveMQ
  • RabbitMQ
  • Kafka
  • RocketMQ

接下来,我们主要掌握 RabbitMQ,官方文档 https://www.rabbitmq.com/open in new window

安装方式,可以选择 Docker 安装,或 二进制包安装

如果用 Docker 安装:

$ docker run -d --restart=always -m 2g --memory-swap 2g --cpus 1.1 \
-v $PWD/rabbitmq-data:/var/lib/rabbitmq -v $PWD/log:/var/log/rabbitmq \
--hostname rabbitmq01 --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin123 \
-p 5672:5672 -p 15672:15672 -p 15692:15692 rabbitmq:3.9.7-management

如果使用二进制包安装,Ubuntu 下安装步骤如下:

# 更新源
$ sudo apt-get update

# 安装 RabbitMQ
$ sudo apt-get install rabbitmq-server

# 添加 admin 用户, 密码设置为 admin123
$ sudo rabbitmqctl add_user admin admin123

# 将 admin 用户设置为管理员角色
$ sudo rabbitmqctl set_user_tags admin administrator

# 设置 admin 赋权
$ sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

# 启用图形管理界面
$ sudo rabbitmq-plugins enable rabbitmq_management

安装完成后,管理界面在 15672 端口,浏览器访问

http://your-rabbitmq-server-ip:15672open in new window

服务管理命令

sudo service rabbitmq-server stop
sudo service rabbitmq-server start
sudo service rabbitmq-server restart

Java 中调用,添加 pom 依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

Java 代码如下

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Demo {
    public static void main(String[] args) throws IOException, TimeoutException {

        publish1();
        publish2();

        //consume();
    }


    private static void publish1() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("admin");
        factory.setPassword("admin123");

        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setPort(5672);

        //建立到代理服务器到连接
        Connection conn = factory.newConnection();

        //获得信道
        Channel channel = conn.createChannel();

        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare("msg", true, false, false, null);

        //发布消息
        String message = "hello";

        // 基本发布消息
        // 第一个参数为交换机名称(空)
        // 第二个参数为队列映射的路由key(直接使用队列名)
        // 第三个参数为消息的其他属性、
        // 第四个参数为发送信息的主体
        channel.basicPublish("", "msg", MessageProperties.MINIMAL_PERSISTENT_BASIC, message.getBytes(StandardCharsets.UTF_8));

        channel.close();
        conn.close();
    }

    private static void publish2() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("admin");
        factory.setPassword("admin123");

        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setPort(5672);

        //建立到代理服务器到连接
        Connection conn = factory.newConnection();

        //获得信道
        Channel channel = conn.createChannel();

        //声明交换器
        String exchangeName = "/chat";
        channel.exchangeDeclare(exchangeName, "direct", true);


        //声明队列。
        //参数1:队列名
        //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
        //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
        //参数4:当所有消费者客户端连接断开时是否自动删除队列
        //参数5:队列的其他参数
        channel.queueDeclare("msg", true, false, false, null);

        //队列绑定到交换机
        String routingKey = "tag1";
        channel.queueBind("msg", "/chat", routingKey);


        //发布消息
        String message = "hello";


        // 基本发布消息
        // 第一个参数为交换机名称、
        // 第二个参数为队列映射的路由key、
        // 第三个参数为消息的其他属性 指定持久化 (创建队列也需要配置持久化)
        // 第四个参数为发送信息的主体
        channel.basicPublish("/chat", "tag1", MessageProperties.MINIMAL_PERSISTENT_BASIC, message.getBytes(StandardCharsets.UTF_8));


        channel.close();
        conn.close();
    }

    private static void consume() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin123");

        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setPort(5672);

        //建立到代理服务器到连接
        Connection conn = factory.newConnection();

        //获得信道
        Channel channel = conn.createChannel();

        //声明队列
        channel.queueDeclare("msg", true, false, false, null);

        while (true) {
            //消费消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume("msg", autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {

                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();

                    System.out.println("消费的路由键:" + routingKey);
                    System.out.println("消费的内容类型:" + contentType);

                    System.out.println("消费的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                    sleep(1000);

                    //确认消息
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, false);

                }
            });
        }
    }

    private static void sleep(long t) {
        try {
            Thread.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Loading...