我是靠谱客的博主 忐忑丝袜,这篇文章主要介绍RocketMQ--生产者与消费者的简单示例,现在分享给大家,希望可以做个参考。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>dym-rocketmq-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
    </dependencies>
</project>

Producer.java

package com.dym.simple;

//发送消息

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        //1. 谁来发?
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2. 发给谁?
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        // 3. 怎么发?
        // 4. 发什么?
        String msg="hello rocketmq";
        Message message = new Message("topic1", "tag1", msg.getBytes());
        SendResult sendResult = producer.send(message);
        // 5. 发的结果是什么?
        System.out.println(sendResult);
        //6. 打扫战场
        producer.shutdown();
    }
}

Consumer.java

package com.dym.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1. 谁来收
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2. 从哪里收消息
        consumer.setNamesrvAddr("localhost:9876");
        //3. 监听哪个消息队列
        consumer.subscribe("topic1","*");
        //4. 处理业务流程  注册监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //写我们的业务逻辑
                for(MessageExt msg:msgs){
                    System.out.println(msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("消费者起起来了");
    }
}

 

最后

以上就是忐忑丝袜最近收集整理的关于RocketMQ--生产者与消费者的简单示例的全部内容,更多相关RocketMQ--生产者与消费者内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部