文章目录
- 一、创建maven-java项目
- 二、依赖引入
- 三、java代码
- 3.1、编写连接工具类
- 3.2、编写发送信息类
- 3.3、查看消息
- 3.3.1、进入本地浏览器页面,输入账号密码信息
- 3.3.2、进入指定的queue中,查看相关信息
- 3.4、自动获取消息
- 3.4.1、使用一个已过时的方式获取
- 3.4.2、使用新方式监听并获取相关的队列中的消息
- 四、2019.11.04 增加笔记
- 五、增加绑定消息队列参数说明(2022.02.26)
- 六、增加消息发送参数说明
- 七、增加消费者拉取mq服务器消息方法参数说明
一、创建maven-java项目
创建maven项目工程,实现简单消息队列操作。
简单消息图示信息解释:
P:消息生产者
C 消息消费者
二、依赖引入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.10</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> </dependencies>
三、java代码
3.1、编写连接工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MqConnectUtil { /** * 获取mq的连接 * @return * @throws TimeoutException * @throws IOException */ public static Connection getMqConnection() throws IOException, TimeoutException{ //1、定义一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2、设置连接地址等信息 factory.setHost("127.0.0.1"); //amqp协议端口 factory.setPort(5672); //设置具体的vhost(想象成数据库) factory.setVirtualHost("/xiangjiao"); //设置用户名密码信息 factory.setUsername("xiangjiao"); factory.setPassword("bunana"); return factory.newConnection(); } }
3.2、编写发送信息类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import cn.linkpower.util.MqConnectUtil; /** * 简单消息队列 生产者 * @author 76519 * */ public class Send { private static String simpleQueueName = "test_simple_queue"; public static void main(String[] args) throws Exception{ //创建一个连接 Connection conn = MqConnectUtil.getMqConnection(); //构建一条通道 --- 从连接中获取一个通道 Channel createChannel = conn.createChannel(); //申明一个指定的队列 createChannel.queueDeclare(simpleQueueName, false, false, false, null); //发送数据信息 String msg = "hello xiangjiao bunana"; //发送数据 createChannel.basicPublish("", simpleQueueName, null, msg.getBytes()); System.out.println("---send msg = "+msg); //关闭流和连接 createChannel.close(); conn.close(); } }
我们在操作 rabbitmq
时,可以将其看作一个数据库(mysql、oracle等),相关操作和数据库jdbc操作很类似。在使用前,需要指明详细的哪个消息队列;在使用完成后,因为他的操作属于IO流操作,所以为了节省资源和避免出现不必要bug,需要及时的对流进行关闭操作。
3.3、查看消息
在运行上述代码之后,我们如何才能知道消息是否添加至队列中了呢?
3.3.1、进入本地浏览器页面,输入账号密码信息
前提:需要保证
RabbitMq
队列服务器,开启了视图管理
。
3.3.2、进入指定的queue中,查看相关信息
点击java代码中设定的名称后,我们会进入另外一个页面,找到Get Messages
项,展开列表
选择 Get Message按钮后,则会出现上述已发送的信息。
这里需要注意一点:当获取了其中的消息后,这个队列中也就没有这个消息了。
实际开发中,我们不可能每次手动去网址上取数据,所以必须使用一个另外的代码去自动获取相关的消息,当有消息生产者提供了消息后,消息消费者则需要自动获取相关的消息。如何用代码实现自动获取呢?
3.4、自动获取消息
3.4.1、使用一个已过时的方式获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; import cn.linkpower.util.MqConnectUtil; /** * 使用程序代码获取send端发送的消息(从mq中自动取得) * @author 76519 * */ public class GetMqMsg_old { private static String simpleQueueName = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { //获取连接 Connection mqConnection = MqConnectUtil.getMqConnection(); //构建通道 Channel createChannel = mqConnection.createChannel(); //定义一个队列的消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(createChannel); //监听 createChannel.basicConsume(simpleQueueName, true,queueingConsumer); //获取数据 while(true){ Delivery nextDelivery = queueingConsumer.nextDelivery(); String msg = new String(nextDelivery.getBody()); System.out.println("get old msg == "+msg); } } }
过时的方式,我们使用的案例为创建一个指定消息队列的监听对象,又创建了一个死循环不断地去打印新的消息信息。但死循环毕竟不是最好的实现方式,所以我们需要使用新的方式实现相同的技术。
3.4.2、使用新方式监听并获取相关的队列中的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.linkpower.util.MqConnectUtil; public class GetMqMsg_new { private static String simpleQueueName = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 获取连接 Connection mqConnection = MqConnectUtil.getMqConnection(); // 构建通道 Channel createChannel = mqConnection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(createChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" get msg new = " + message ); } }; //监听队列 createChannel.basicConsume(simpleQueueName, true, consumer); } }
四、2019.11.04 增加笔记
不管是生产者生产消息还是消费者消费消息,都需要申明消息队列(配置消息队列)
1
2
3createChannel.queueDeclare(simpleQueueName, false, false, false, null);
其实原因大家都会知道的,如果在 localhost:15672中的Queues没有相关的队列信息时,如下所示:
没有指定的 test_simple_queue。
消息消费者或生产者启动若没有进行消息队列的设置,则会出现一个异常信息。
出现这个问题的原因就在于,创建连接和通信管道后,此时的消息队列中不存在指定的消息,消息队列不知道是那种类型的队列(轮询、公平分发等等),才出现的报错。
之前有说到:消费者不需要再设置。
原则上是这样的,但消费者在java中采取的是监听机制,他可以在消息生产者生产消息之前就进行启动的,如果是这样的话,在消息队列中依旧不存在申明好的消息队列,运行也会出现报错信息。
五、增加绑定消息队列参数说明(2022.02.26)
将信道 channel
绑定至制定的消息队列中,采取如下所示的方式实现基本的绑定操作:
1
2
3createChannel.queueDeclare(simpleQueueName, false, false, false, null);
在源码中的对应方法参数类型如下所示:
1
2
3
4
5
6Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
可以发现,需要设备具体的几种类型参数信息,针对各项参数信息进行以下说明补充。
参数位置 | 参数描述 |
---|---|
参数一 | 设定消息队列的名称ID |
参数二 | 是否持久化队列 。false表示不将队列进行持久化操作。当MQ重启或者停掉时,会将该队列进行删除。 |
参数三 | 是否将当前队列私有化。 false表示所有的消费者均能访问当前消息队列。 true时表示该队列只能第一次与拥有他的消费者进行连接使用。 |
参数四 | 是否自动删除当前队列。 false表示当前连接关闭后,不会自动删除该队列。 |
第五个参数 | 用于配置其他信息。 参照后期博客:消息应答(autoAck)、队列持久化(durable)以及消息持久化 |
六、增加消息发送参数说明
消息队列建立连接后,进行了制定信道和队列的绑定操作,此时需要采取如下方式,进行将消息 入 队列
操作:
针对channel.basicPublish
方法,增加如下各种参数信息说明补充:
参数位置 | 参数描述 |
---|---|
参数一 | 指定交换机 的名称ID如果不存在对应的交换机,则只需要传递 "" 即可。mq底层会使用默认的交换机进行消息的发送。 |
参数二 | 消息队列的名称ID。 指定交换机需要向哪个队列推送消息。 |
参数三 | 用来配置当前消息的参数。 参见博客:消息应答(autoAck)、队列持久化(durable)以及消息持久化 |
参数四 | 需要发送的消息的字节数组 。 |
七、增加消费者拉取mq服务器消息方法参数说明
消费者
需要从指定MQ服务器
中拉取消息信息,通常采取如下方式进行数据的获取:
1
2channel.basicConsume(queue_name, false,consumer);
该方法同样具有一些参数设置,针对其参数信息做如下描述补充:
参数位置 | 参数描述 |
---|---|
参数一 | 目标消息队列名称ID 指定从哪个消息队列中获取消息。 |
参数二 | Boolean 类型,用于表示当前消息是否自动应答 。即:设置true,消息发送给消费者后,自动从消息队列中删除指定数据。 但mq不建议使用,比如发送给指定消费者,但出异常后,消息到底是消费了还是未消费?造成消息丢失。 |
参数三 | 当前的消费对象。 如案例中给出的 new DefaultConsumer 。 |
最后
以上就是碧蓝硬币最近收集整理的关于rabbitmq学习(2)——java实现简单队列一、创建maven-java项目二、依赖引入三、java代码四、2019.11.04 增加笔记五、增加绑定消息队列参数说明(2022.02.26)六、增加消息发送参数说明七、增加消费者拉取mq服务器消息方法参数说明的全部内容,更多相关rabbitmq学习(2)——java实现简单队列一、创建maven-java项目二、依赖引入三、java代码四、2019.11.04内容请搜索靠谱客的其他文章。
发表评论 取消回复