pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the RocketMQ producer/consumer sample.
  Declares a single dependency: the Apache RocketMQ Java client.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>org.example</groupId>
    <artifactId>dym-rocketmq-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!--
          The sources contain non-ASCII string literals and comments. Without an
          explicit encoding Maven falls back to the platform default and the build
          becomes platform dependent.
          NOTE(review): assumes the .java files are saved as UTF-8 - confirm.
        -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--
          Pin the language level instead of relying on the compiler plugin's default.
          NOTE(review): 1.8 is assumed to be the minimum JDK for this client version - confirm.
        -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- RocketMQ client library used by Producer and Consumer -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
    </dependencies>
</project>
Producer.java
package com.dym.simple;
//发送消息
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
 * Minimal RocketMQ producer example: sends a single message with a blocking
 * {@code send} call and prints the returned {@link SendResult}.
 */
public class Producer {

    /**
     * Creates a producer in group {@code "group1"}, points it at the name server
     * {@code localhost:9876}, sends one message to {@code "topic1"} with tag
     * {@code "tag1"}, prints the send result and shuts the producer down.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException    propagated from {@code start()} or {@code send()}
     * @throws RemotingException    propagated from {@code send()}
     * @throws InterruptedException propagated from {@code send()}
     * @throws MQBrokerException    propagated from {@code send()}
     */
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 1. Who sends? A producer that belongs to producer group "group1".
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2. Send to whom? The name server address used by this producer.
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            // 3./4. How and what to send: one message built from a plain string.
            String msg = "hello rocketmq";
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            // Fully qualified so that no additional import line is required.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message message = new Message("topic1", "tag1", payload);
            SendResult sendResult = producer.send(message);
            // 5. What was the outcome of the send?
            System.out.println(sendResult);
        } finally {
            // 6. Clean up: shut the producer down even when send() throws.
            producer.shutdown();
        }
    }
}
Consumer.java
package com.dym.simple;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
//1. 谁来收
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2. 从哪里收消息
consumer.setNamesrvAddr("localhost:9876");
//3. 监听哪个消息队列
consumer.subscribe("topic1","*");
//4. 处理业务流程 注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//写我们的业务逻辑
for(MessageExt msg:msgs){
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者起起来了");
}
}
最后
以上就是忐忑丝袜最近收集整理的关于 RocketMQ 生产者与消费者的简单示例的全部内容，更多与 RocketMQ 生产者与消费者相关的内容，请搜索靠谱客的其他文章。
本图文内容来源于网友提供，作为学习参考使用，或来自网络收集整理，版权属于原作者所有。
发表评论 取消回复