MQ
大约 4 分钟
消息队列MQ
常见的消息队列有:
- ActiveMQ
- RabbitMQ
- Kafka
- RocketMQ
接下来,我们主要掌握 RabbitMQ,官方文档 https://www.rabbitmq.com/
安装方式,可以选择 Docker 安装,或 二进制包安装
如果用 Docker 安装:
$ docker run -d --restart=always -m 2g --memory-swap 2g --cpus 1.1 \
-v $PWD/rabbitmq-data:/var/lib/rabbitmq -v $PWD/log:/var/log/rabbitmq \
--hostname rabbitmq01 --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin123 \
-p 5672:5672 -p 15672:15672 -p 15692:15692 rabbitmq:3.9.7-management
如果使用二进制包安装,Ubuntu 下安装步骤如下:
# 更新源
$ sudo apt-get update
# 安装 RabbitMQ
$ sudo apt-get install rabbitmq-server
# 添加 admin 用户, 密码设置为 admin123
$ sudo rabbitmqctl add_user admin admin123
# 将 admin 用户设置为管理员角色
$ sudo rabbitmqctl set_user_tags admin administrator
# 设置 admin 赋权
$ sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
# 启用图形管理界面
$ sudo rabbitmq-plugins enable rabbitmq_management
安装完成后,管理界面在 15672 端口,浏览器访问
http://your-rabbitmq-server-ip:15672
服务管理命令
sudo service rabbitmq-server stop
sudo service rabbitmq-server start
sudo service rabbitmq-server restart
Java 中调用,添加 pom 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
Java 代码如下
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Demo {
public static void main(String[] args) throws IOException, TimeoutException {
publish1();
publish2();
//consume();
}
private static void publish1() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin123");
//设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setPort(5672);
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明队列。
//参数1:队列名
//参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
//参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
//参数4:当所有消费者客户端连接断开时是否自动删除队列
//参数5:队列的其他参数
channel.queueDeclare("msg", true, false, false, null);
//发布消息
String message = "hello";
// 基本发布消息
// 第一个参数为交换机名称(空)
// 第二个参数为队列映射的路由key(直接使用队列名)
// 第三个参数为消息的其他属性、
// 第四个参数为发送信息的主体
channel.basicPublish("", "msg", MessageProperties.MINIMAL_PERSISTENT_BASIC, message.getBytes(StandardCharsets.UTF_8));
channel.close();
conn.close();
}
private static void publish2() throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin123");
//设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setPort(5672);
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "/chat";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列。
//参数1:队列名
//参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
//参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
//参数4:当所有消费者客户端连接断开时是否自动删除队列
//参数5:队列的其他参数
channel.queueDeclare("msg", true, false, false, null);
//队列绑定到交换机
String routingKey = "tag1";
channel.queueBind("msg", "/chat", routingKey);
//发布消息
String message = "hello";
// 基本发布消息
// 第一个参数为交换机名称、
// 第二个参数为队列映射的路由key、
// 第三个参数为消息的其他属性 指定持久化 (创建队列也需要配置持久化)
// 第四个参数为发送信息的主体
channel.basicPublish("/chat", "tag1", MessageProperties.MINIMAL_PERSISTENT_BASIC, message.getBytes(StandardCharsets.UTF_8));
channel.close();
conn.close();
}
private static void consume() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin123");
//设置 RabbitMQ 地址
factory.setHost("localhost");
factory.setPort(5672);
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明队列
channel.queueDeclare("msg", true, false, false, null);
while (true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume("msg", autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
sleep(1000);
//确认消息
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
});
}
}
private static void sleep(long t) {
try {
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Loading...