我是靠谱客的博主 称心朋友,最近开发中收集的这篇文章主要介绍rabbitmq学习以及spring项目spring-amqp配置rabbitmq学习spring-amqp项目,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
rabbitmq学习
rabbimq是一种由erlang语言开发的,程序和程序之间通讯工具,因此,在使用rabbitmq时候需要安装erlang的环境,这里就略了。
装好rabbitmq后,我们登录127.0.0.1:15672 ,就可以进入页面视图,进行账户相关的设置,这里也略过.
我们主要学习rabbitmq的消息通讯学习,我们使用pom工程,引入rabbitmq的依赖,坐标是:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
rabbitmq消息通讯主要有5种模式:简单模式,work模式,订阅模式,路由模式,通配符模式.
- 简单模式
/** 发送者 **/
public class Send {
private static String QUEUE_NAME= "test_queue";
public static void main(String[] args) throws IOException {
/*
* 步骤:
* 1.建立连接
* 2.创建通道
* 3.声明队列
* 4.设定消息内容,连接通道
* 5.关闭通道和连接
*/
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello rabbitmq";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
}
}
/**连接工具类,通过工具类连接开头设置的host/账号/密码**/
public class ConnectionUtil {
public static Connection getConnection() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/huawei");
factory.setUsername("huawei");
factory.setPassword("huawei");
Connection connection = factory.newConnection();
return connection;
}
}
/**接收者**/
public class Recev {
private final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception {
/*
* 步骤:
* 1.建立连接
* 2.创建通道
* 3.声明队列
* 4.设定消息者,监听队列
* 5.获取消息
*/
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//监听 ,true表示消息自动确认
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("receve:"+message);
}
}
}
代码写好后,先运行接收者,让其处理待接收状态,然后运行发送者,这样接收者就能接收到消息,并打印出消息内容,在页面的queue木块也能查看到队列名以及队列消 息运行历史.
- work模式
work模式和简单模式的区别就是,简单模式是一个发送者一个接收者,但是work模式可以是一个发送者,多个接收者,缺点就是多个接收者接收到的消息都是消息的一部分,并不是全部 .
/**
消息发送者 **/
public class Send {
private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
String message = "" + i ;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
/** 消息接收者1 **/
public class Recev {
private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启能者多劳模式,同一时间只会发送过一条消息给消费者
channel.basicQos(1);
QueueingConsumer consumer
= new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received :" + message);
Thread.sleep(10);
}
}
}
/**
消息接收者2
**/
public class Recev2 {
private static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启能者多劳模式,同一时间只会发送过一条消息给消费者
channel.basicQos(1);
QueueingConsumer consumer
= new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true,consumer);//true表示消息自动确认
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received2 :" + message);
Thread.sleep(1000);
}
}
}
- 订阅模式
订阅模式解决了work模式的缺点,使得多个接收者都能够接收到所有的消息,解决的机制是通过一个交换机,消息经过发送者发送后,到达交换机,交换机再将消息分发到多个队列中,再由接收者接收
/** 发送者 **/
public class Send{
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange 交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("send: " + message + "'");
channel.close();
connection.close();
}
}
/**
接收者1 **/
public class Recv {
private final static String QUEUE_NAME = "test_queue_work";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**
接收者2 **/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
- 路由模式
订阅模式看起来已经比较完善了,但是还是有个缺点,就是不能满足我们的特殊需求,比如我们消息分为很多中,有的是更新消息,有的是插入删除消息,如果需要使不同的消费者处理不同的消息,可以使用路由模式,路由模式的原理是,在交换机设置的地方设置routingkey ,消费者根据发送者发送的routingkey来匹配是否需要处理。
/** 发送者 **/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//设定交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String messge = "hello rabbitmq";
//这里第二个参数,设定了routingkey为insert
channel.basicPublish(EXCHANGE_NAME, "insert", null, messge.getBytes());
channel.close();
connection.close();
}
}
/**
消费者1 **/
public class Recev {
private final static String QUEUE_NAME = "test_queue_work";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定到交换机,第三个参数为routingkey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**
消费者2 **/
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,第三个参数为routingkey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
- 通配符模式
通配符模式是路由模式的升级版,是对routingkey的匹配上做了规则改进,可以使用#号或星号指定routingkey的匹配规则, #匹配一个或多个单词,而 星号 匹配不多不少一个单词
/** 发送者
**/
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "{item:123123,type:insert}";
channel.basicPublish(EXCHANGE_NAME, "key.insert", null, message.getBytes());
channel.close();
connection.close();
}
}
/** 接收者1 **/
public class Recev {
private final static String QUEUE_NAME = "test_queue_topic_work";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
/**
接收者2 **/
public class Recev2 {
private final static String QUEUE_NAME = "test_queue_topic_work2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
spring-amqp项目
spring-amqp项目是spring整合了rabbitmq,使得我们的项目中可以通过配置文件配置rabbitmq来使用
首先导入rabbitmq以及spring整合rabbitmq的依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
applicationContext-rabbitmq.xml的配置文件配置:
<!-- amqp 需要配置的信息有
1.连接工厂
2.管理器(管理队列和交换机的)
3.交换机(路由)
4.amqp模板(用于发送消息)
5.队列
6.监听(消费者用于监听队列的消息)
-->
<!-- 1.定义连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="huawei" password="huawei" virtual-host="/huawei"/>
<!-- 2.定义管理器,需要依赖连接工厂 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 3.定义交换机 -->
<rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding
queue="myQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- <rabbit:topic-exchange name="">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="foo.*" />
</rabbit:bindings>
</rabbit:topic-exchange> -->
<!-- 4.定义模板 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="fanoutExchange">
</rabbit:template>
<!-- 5.定义队列 -->
<rabbit:queue name="myQueue" auto-declare="true"/>
<!-- 6.定义监听 ,需要依赖连接工厂,定义bean,方法 ,队列名-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="foo" method="listen" queue-names="myQueue"/>
</rabbit:listener-container>
<bean id="foo"
class="com.hw.rabbitmq.spring.Foo"/>
加载配置文件
public class SpringMain {
public static void main(String[] args) throws Exception {
AbstractApplicationContext context =
new ClassPathXmlApplicationContext("classpath:spring/rabbitmq-context.xml");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
try {
template.convertAndSend("hello rabbitmq");
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(1000);
context.destroy();
}
}
public class Foo {
public
void listen(String foo){
System.out.println(foo);
}
}
当我们运行SpringMain时候,Foo就会打印出收到的消息
最后
以上就是称心朋友为你收集整理的rabbitmq学习以及spring项目spring-amqp配置rabbitmq学习spring-amqp项目的全部内容,希望文章能够帮你解决rabbitmq学习以及spring项目spring-amqp配置rabbitmq学习spring-amqp项目所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复