我是靠谱客的博主 火星上火车,最近开发中收集的这篇文章主要介绍java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

  • 一、准备工作
  • 二、代码实现
    • 1.添加依赖
    • 2.创建一个常量类存放公共参数
    • 3.调用HTTP协议的SDK 发送普通消息
    • 4.调用HTTP协议的SDK 订阅普通消息
  • 三、配置main的日志输出级别
  • 四、测试效果
  • 五、完成代码

一、准备工作

登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。

环境要求:
安装JDK 1.6或以上版本
安装Maven
安装Java SDK

参照 阿里云 官方文档,来一步一步操作。
文档提供的SDK有TCP和Http协议,这里使用HTTP协议来实现rocketMQ消息的发送与消费。

在这里插入图片描述

二、代码实现

调用HTTP协议的SDK收发普通消息

1.添加依赖

创建Springboot项目,添加 SDK依赖:

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <!--aliyun mq sdk-->
 <dependency>
     <groupId>com.aliyun.mq</groupId>
     <artifactId>mq-http-sdk</artifactId>
     <version>1.0.2</version>
 </dependency>

注意:aliyun mq sdk的版本信息,请参见版本说明

2.创建一个常量类存放公共参数

package com.example.rocketdemo.config;

/**
 * @author qzz
 */
public class MqConfigParams {

    /**
     * 你的topic
     */
    public static final String TOPIC = "你的topic";
    /**
     * 消息标签  *:代表全部
     */
    public static final String TAG = "你的tag";
     /**
     * 你的Group_ID
     */
    public static final String GROUP_ID = "你的Group_ID";
    /**
     *你的accessKey
     */
    public static final String ACCESS_KEY = "你的accessKey";
    /**
     *你的secretKey
     */
    public static final String SECRET_KEY = "你的secretKey";
    /**
     * 实例ID
     */
    public static final String INSTANCE_ID = "你的实例ID";
    /**
     *设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看
     */
    public static final String HTTP_ENDPOINT = "http接入点地址";
}

3.调用HTTP协议的SDK 发送普通消息

package com.example.rocketdemo.util;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;

/**
 * 生产 阿里云 RocketMQ 消息
 * @author qzz
 */
@Slf4j
public class AliyunMessageProducerTest {
    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                MqConfigParams.HTTP_ENDPOINT,
                // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
                MqConfigParams.ACCESS_KEY,
                // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
                MqConfigParams.SECRET_KEY
        );

        // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
        // 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
        final String topic = MqConfigParams.TOPIC;
        // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
        // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
        final String instanceId = MqConfigParams.INSTANCE_ID;

        // 获取Topic的生产者。
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // 循环发送2条消息。
            for (int i = 0; i < 2; i++) {
                TopicMessage pubMsg;        // 普通消息。
                pubMsg = new TopicMessage(
                        // 消息内容。
                        "hello mq 111!".getBytes(),
                        // 消息标签。
                        MqConfigParams.TAG
                );
                // 设置消息的自定义属性。
                pubMsg.getProperties().put("a", String.valueOf(i));
                // 设置消息的Key。
                pubMsg.setMessageKey("MessageKey");

                // 同步发送消息,只要不抛异常就是成功。
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功。
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }


}

4.调用HTTP协议的SDK 订阅普通消息

订阅普通消息的代码如下:

package com.example.rocketdemo.util;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

/**
 * 消费 阿里云 RocketMQ 消息
 * @author qzz
 */
@Slf4j
public class AliyunMessageConsumerTest {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                MqConfigParams.HTTP_ENDPOINT,
                // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
                MqConfigParams.ACCESS_KEY,
                // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
                MqConfigParams.SECRET_KEY
        );

        // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
        //不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
        final String topic = MqConfigParams.TOPIC;
        // 您在消息队列RocketMQ版控制台创建的Group ID。
        final String groupId = MqConfigParams.GROUP_ID;
        // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
        // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
        final String instanceId = MqConfigParams.INSTANCE_ID;

        final MQConsumer consumer;
        if (instanceId != null && instanceId != "") {
            consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
        } else {
            consumer = mqClient.getConsumer(topic, groupId);
        }

        // 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
        do {
            List<Message> messages = null;

            try {
                // 长轮询消费消息。
                // 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。
                messages = consumer.consumeMessage(
                        1,// 一次最多消费1条消息(最多可设置为16条)。
                        3// 长轮询时间3秒(最多可设置为30秒)。
                );
            } catch (Throwable e) {
                e.printStackTrace();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
            // Topic中没有消息可消费。
            if (messages == null || messages.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }

            // 处理业务逻辑。
            for (Message message : messages) {
                System.out.println("Receive message: " + message+",topic:"+message.getMessageTag());
            }

            // 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。
            // 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。
            {
                List<String> handles = new ArrayList<String>();
                for (Message message : messages) {
                    handles.add(message.getReceiptHandle());
                }

                try {
                    consumer.ackMessage(handles);
                } catch (Throwable e) {
                    // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
                    if (e instanceof AckMessageException) {
                        AckMessageException errors = (AckMessageException) e;
                        System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                        if (errors.getErrorMessages() != null) {
                            for (String errorHandle :errors.getErrorMessages().keySet()) {
                                System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                        + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                            }
                        }
                       continue;
                    }
                    e.printStackTrace();
                }
            }
        } while (true);
    }
}

三、配置main的日志输出级别

启动执行时,发现问题:控制台打印巨多debug日志

默认情况下,如果项目中集成了Logback等日志框架,在执行main方法时通过其进行日志打印,那么默认的日志级别是debug的

此时,如果是http请求,甚至可以把请求的具体报文信息都打印出来,特别是三方框架的。为了不影响查看正常的日志,可以将main方法的日志级别进行调整

解决方法:
resources目录下增加logback.xml

logback.xml:

<configuration debug="false">
    <logger name="org.apache" level="INFO" />
    <logger name="org.apache.http.wire" level="INFO" />
    <logger name="org.apache.http.headers" level="INFO" />
    <property name="CONSOLE_LOG_PATTERN"
              value="%date{yyyy-MM-dd HH:mm:ss}  %highlight(%-5level) %magenta(%-4relative) --- [%yellow(%15.15thread)] %cyan(%-40.40logger{39}) : %msg%n"/>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <root level="ERROR">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

再次启动,打印日志就简洁明多了。

四、测试效果

生产者启动运行,结果如下:
在这里插入图片描述

消费者启动运行,结果如下:

"D:Program FilesJavajdk1.8.0_40binjava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3libidea_rt.jar=51390:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3bin" -Dfile.encoding=UTF-8 -classpath "D:Program FilesJavajdk1.8.0_40jrelibcharsets.jar;D:Program FilesJavajdk1.8.0_40jrelibdeploy.jar;D:Program FilesJavajdk1.8.0_40jrelibextaccess-bridge-64.jar;D:Program FilesJavajdk1.8.0_40jrelibextcldrdata.jar;D:Program FilesJavajdk1.8.0_40jrelibextdnsns.jar;D:Program FilesJavajdk1.8.0_40jrelibextjaccess.jar;D:Program FilesJavajdk1.8.0_40jrelibextjfxrt.jar;D:Program FilesJavajdk1.8.0_40jrelibextlocaledata.jar;D:Program FilesJavajdk1.8.0_40jrelibextnashorn.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunec.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunjce_provider.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunmscapi.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunpkcs11.jar;D:Program FilesJavajdk1.8.0_40jrelibextzipfs.jar;D:Program FilesJavajdk1.8.0_40jrelibjavaws.jar;D:Program FilesJavajdk1.8.0_40jrelibjce.jar;D:Program FilesJavajdk1.8.0_40jrelibjfr.jar;D:Program FilesJavajdk1.8.0_40jrelibjfxswt.jar;D:Program FilesJavajdk1.8.0_40jrelibjsse.jar;D:Program FilesJavajdk1.8.0_40jrelibmanagement-agent.jar;D:Program FilesJavajdk1.8.0_40jrelibplugin.jar;D:Program FilesJavajdk1.8.0_40jrelibresources.jar;D:Program FilesJavajdk1.8.0_40jrelibrt.jar;F:study-projectrocket-demotargetclasses;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-web2.7.5spring-boot-starter-web-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter2.7.5spring-boot-starter-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot2.7.5spring-boot-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-autoconfigure2.7.5spring-boot-autoconfigure-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-logging2.7.5spring-boot-starter-logging-2.7.5.jar;D:mvnrepositorychqoslogbacklogback-classic1.2.11logback-classic-1.2.11.jar;D:mvnrepositorychqoslogbacklogback-core1.2.11logback-core-1.2.11.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-to-slf4j2.17.2log4j-to-slf4j-2.17.2.jar;D:mvnrepositoryorgslf4jjul-to-slf4j1.7.36jul-to-slf4j-1.7.36.jar;D:mvnrepositoryjakartaannotationjakarta.annotation-api1.3.5jakarta.annotation-api-1.3.5.jar;D:mvnrepositoryorgyamlsnakeyaml1.30snakeyaml-1.30.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-json2.7.5spring-boot-starter-json-2.7.5.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-databind2.13.4.2jackson-databind-2.13.4.2.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-annotations2.13.4jackson-annotations-2.13.4.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-core2.13.4jackson-core-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jdk82.13.4jackson-datatype-jdk8-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jsr3102.13.4jackson-datatype-jsr310-2.13.4.jar;D:mvnrepositorycomfasterxmljacksonmodulejackson-module-parameter-names2.13.4jackson-module-parameter-names-2.13.4.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-tomcat2.7.5spring-boot-starter-tomcat-2.7.5.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-core9.0.68tomcat-embed-core-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-el9.0.68tomcat-embed-el-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-websocket9.0.68tomcat-embed-websocket-9.0.68.jar;D:mvnrepositoryorgspringframeworkspring-web5.3.23spring-web-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-beans5.3.23spring-beans-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-webmvc5.3.23spring-webmvc-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-aop5.3.23spring-aop-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-context5.3.23spring-context-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-expression5.3.23spring-expression-5.3.23.jar;D:mvnrepositoryorgprojectlomboklombok1.18.24lombok-1.18.24.jar;D:mvnrepositorycomaliyunmqmq-http-sdk1.0.2mq-http-sdk-1.0.2.jar;D:mvnrepositoryorgapachehttpcomponentshttpasyncclient4.1.5httpasyncclient-4.1.5.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore4.4.15httpcore-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore-nio4.4.15httpcore-nio-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpclient4.5.13httpclient-4.5.13.jar;D:mvnrepositorycommons-codeccommons-codec1.15commons-codec-1.15.jar;D:mvnrepositoryorgapachecommonscommons-lang33.12.0commons-lang3-3.12.0.jar;D:mvnrepositorycommons-loggingcommons-logging1.1.3commons-logging-1.1.3.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-api2.17.2log4j-api-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-core2.17.2log4j-core-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-jcl2.17.2log4j-jcl-2.17.2.jar;D:mvnrepositorycomgooglecodegsongson2.9.1gson-2.9.1.jar;D:mvnrepositoryorgslf4jslf4j-api1.7.36slf4j-api-1.7.36.jar;D:mvnrepositoryorgspringframeworkspring-core5.3.23spring-core-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-jcl5.3.23spring-jcl-5.3.23.jar" com.example.rocketdemo.util.AliyunMessageConsumerTest
Receive message: Message{MessageID:3AD19CA3000E681A95157491C815BDC8,MessageMD5:39C636AD390E69FDA0821CA29F8E8CC9,RequestID:637DC89A3944310E00E92833,Properties:{a=0, KEYS=MessageKey, __BORNHOST=58.209.156.163}, receiptHandle='3AD19CA3000E681A95157491C815BDC8-MSAxNjY5MTg3NzM4NjYxIDMwMDAwMCAyIDAgY24taGFuZ3pob3Utc2hhcmUtMDItMSA1IDE=', publishTime=1669187710997, nextConsumeTime=1669188038661, firstConsumeTime=1669187738661, consumedTimes=1, messageTag='Safety_Hat_Message_tt', errorMessage=null},topic:Safety_Hat_Message_tt

Process finished with exit code -1

接收到消息,订阅消息成功。

五、完成代码

可点击此处进行下载

参考文档:阿里云 消息队列 RocketMQ Java SDK

最后

以上就是火星上火车为你收集整理的java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码的全部内容,希望文章能够帮你解决java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(86)

评论列表共有 0 条评论

立即
投稿
返回
顶部