我是靠谱客的博主 火星上火车,这篇文章主要介绍java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码,现在分享给大家,希望可以做个参考。

目录

  • 一、准备工作
  • 二、代码实现
    • 1.添加依赖
    • 2.创建一个常量类存放公共参数
    • 3.调用HTTP协议的SDK 发送普通消息
    • 4.调用HTTP协议的SDK 订阅普通消息
  • 三、配置main的日志输出级别
  • 四、测试效果
  • 五、完成代码

一、准备工作

登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。

环境要求:
安装JDK 1.6或以上版本
安装Maven
安装Java SDK

参照 阿里云 官方文档,来一步一步操作。
文档提供的SDK有TCP和Http协议,这里使用HTTP协议来实现rocketMQ消息的发送与消费。

在这里插入图片描述

二、代码实现

调用HTTP协议的SDK收发普通消息

1.添加依赖

创建Springboot项目,添加 SDK依赖:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--aliyun mq sdk--> <dependency> <groupId>com.aliyun.mq</groupId> <artifactId>mq-http-sdk</artifactId> <version>1.0.2</version> </dependency>

注意:aliyun mq sdk的版本信息,请参见版本说明

2.创建一个常量类存放公共参数

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.rocketdemo.config; /** * @author qzz */ public class MqConfigParams { /** * 你的topic */ public static final String TOPIC = "你的topic"; /** * 消息标签 *:代表全部 */ public static final String TAG = "你的tag"; /** * 你的Group_ID */ public static final String GROUP_ID = "你的Group_ID"; /** *你的accessKey */ public static final String ACCESS_KEY = "你的accessKey"; /** *你的secretKey */ public static final String SECRET_KEY = "你的secretKey"; /** * 实例ID */ public static final String INSTANCE_ID = "你的实例ID"; /** *设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 */ public static final String HTTP_ENDPOINT = "http接入点地址"; }

3.调用HTTP协议的SDK 发送普通消息

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.example.rocketdemo.util; import com.aliyun.mq.http.MQClient; import com.aliyun.mq.http.MQProducer; import com.aliyun.mq.http.model.TopicMessage; import com.example.rocketdemo.config.MqConfigParams; import lombok.extern.slf4j.Slf4j; import java.util.Date; /** * 生产 阿里云 RocketMQ 消息 * @author qzz */ @Slf4j public class AliyunMessageProducerTest { public static void main(String[] args) { MQClient mqClient = new MQClient( // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 MqConfigParams.HTTP_ENDPOINT, // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 MqConfigParams.ACCESS_KEY, // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。 MqConfigParams.SECRET_KEY ); // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 // 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。 final String topic = MqConfigParams.TOPIC; // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 final String instanceId = MqConfigParams.INSTANCE_ID; // 获取Topic的生产者。 MQProducer producer; if (instanceId != null && instanceId != "") { producer = mqClient.getProducer(instanceId, topic); } else { producer = mqClient.getProducer(topic); } try { // 循环发送2条消息。 for (int i = 0; i < 2; i++) { TopicMessage pubMsg; // 普通消息。 pubMsg = new TopicMessage( // 消息内容。 "hello mq 111!".getBytes(), // 消息标签。 MqConfigParams.TAG ); // 设置消息的自定义属性。 pubMsg.getProperties().put("a", String.valueOf(i)); // 设置消息的Key。 pubMsg.setMessageKey("MessageKey"); // 同步发送消息,只要不抛异常就是成功。 TopicMessage pubResultMsg = producer.publishMessage(pubMsg); // 同步发送消息,只要不抛异常就是成功。 System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId() + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5()); } } catch (Throwable e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + topic); e.printStackTrace(); } mqClient.close(); } }

4.调用HTTP协议的SDK 订阅普通消息

订阅普通消息的代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package com.example.rocketdemo.util; import com.aliyun.mq.http.MQClient; import com.aliyun.mq.http.MQConsumer; import com.aliyun.mq.http.common.AckMessageException; import com.aliyun.mq.http.model.Message; import com.example.rocketdemo.config.MqConfigParams; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; /** * 消费 阿里云 RocketMQ 消息 * @author qzz */ @Slf4j public class AliyunMessageConsumerTest { public static void main(String[] args) { MQClient mqClient = new MQClient( // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 MqConfigParams.HTTP_ENDPOINT, // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。 MqConfigParams.ACCESS_KEY, // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。 MqConfigParams.SECRET_KEY ); // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 //不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。 final String topic = MqConfigParams.TOPIC; // 您在消息队列RocketMQ版控制台创建的Group ID。 final String groupId = MqConfigParams.GROUP_ID; // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 final String instanceId = MqConfigParams.INSTANCE_ID; final MQConsumer consumer; if (instanceId != null && instanceId != "") { consumer = mqClient.getConsumer(instanceId, topic, groupId, null); } else { consumer = mqClient.getConsumer(topic, groupId); } // 在当前线程循环消费消息,建议多开个几个线程并发消费消息。 do { List<Message> messages = null; try { // 长轮询消费消息。 // 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。 messages = consumer.consumeMessage( 1,// 一次最多消费1条消息(最多可设置为16条)。 3// 长轮询时间3秒(最多可设置为30秒)。 ); } catch (Throwable e) { e.printStackTrace(); try { Thread.sleep(2000); } catch (InterruptedException e1) { e1.printStackTrace(); } } // Topic中没有消息可消费。 if (messages == null || messages.isEmpty()) { System.out.println(Thread.currentThread().getName() + ": no new message, continue!"); continue; } // 处理业务逻辑。 for (Message message : messages) { System.out.println("Receive message: " + message+",topic:"+message.getMessageTag()); } // 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。 // 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。 { List<String> handles = new ArrayList<String>(); for (Message message : messages) { handles.add(message.getReceiptHandle()); } try { consumer.ackMessage(handles); } catch (Throwable e) { // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 if (e instanceof AckMessageException) { AckMessageException errors = (AckMessageException) e; System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:"); if (errors.getErrorMessages() != null) { for (String errorHandle :errors.getErrorMessages().keySet()) { System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode() + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage()); } } continue; } e.printStackTrace(); } } } while (true); } }

三、配置main的日志输出级别

启动执行时,发现问题:控制台打印巨多debug日志

默认情况下,如果项目中集成了Logback等日志框架,在执行main方法时通过其进行日志打印,那么默认的日志级别是debug的

此时,如果是http请求,甚至可以把请求的具体报文信息都打印出来,特别是三方框架的。为了不影响查看正常的日志,可以将main方法的日志级别进行调整

解决方法:
resources目录下增加logback.xml

logback.xml:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<configuration debug="false"> <logger name="org.apache" level="INFO" /> <logger name="org.apache.http.wire" level="INFO" /> <logger name="org.apache.http.headers" level="INFO" /> <property name="CONSOLE_LOG_PATTERN" value="%date{yyyy-MM-dd HH:mm:ss} %highlight(%-5level) %magenta(%-4relative) --- [%yellow(%15.15thread)] %cyan(%-40.40logger{39}) : %msg%n"/> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> </encoder> </appender> <root level="ERROR"> <appender-ref ref="STDOUT"/> </root> </configuration>

再次启动,打印日志就简洁明多了。

四、测试效果

生产者启动运行,结果如下:
在这里插入图片描述

消费者启动运行,结果如下:

复制代码
1
2
3
4
5
6
"D:Program FilesJavajdk1.8.0_40binjava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3libidea_rt.jar=51390:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3bin" -Dfile.encoding=UTF-8 -classpath "D:Program FilesJavajdk1.8.0_40jrelibcharsets.jar;D:Program FilesJavajdk1.8.0_40jrelibdeploy.jar;D:Program FilesJavajdk1.8.0_40jrelibextaccess-bridge-64.jar;D:Program FilesJavajdk1.8.0_40jrelibextcldrdata.jar;D:Program FilesJavajdk1.8.0_40jrelibextdnsns.jar;D:Program FilesJavajdk1.8.0_40jrelibextjaccess.jar;D:Program FilesJavajdk1.8.0_40jrelibextjfxrt.jar;D:Program FilesJavajdk1.8.0_40jrelibextlocaledata.jar;D:Program FilesJavajdk1.8.0_40jrelibextnashorn.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunec.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunjce_provider.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunmscapi.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunpkcs11.jar;D:Program FilesJavajdk1.8.0_40jrelibextzipfs.jar;D:Program FilesJavajdk1.8.0_40jrelibjavaws.jar;D:Program FilesJavajdk1.8.0_40jrelibjce.jar;D:Program FilesJavajdk1.8.0_40jrelibjfr.jar;D:Program FilesJavajdk1.8.0_40jrelibjfxswt.jar;D:Program FilesJavajdk1.8.0_40jrelibjsse.jar;D:Program FilesJavajdk1.8.0_40jrelibmanagement-agent.jar;D:Program FilesJavajdk1.8.0_40jrelibplugin.jar;D:Program FilesJavajdk1.8.0_40jrelibresources.jar;D:Program FilesJavajdk1.8.0_40jrelibrt.jar;F:study-projectrocket-demotargetclasses;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-web2.7.5spring-boot-starter-web-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter2.7.5spring-boot-starter-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot2.7.5spring-boot-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-autoconfigure2.7.5spring-boot-autoconfigure-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-logging2.7.5spring-boot-starter-logging-2.7.5.jar;D:mvnrepositorychqoslogbacklogback-classic1.2.11logback-classic-1.2.11.jar;D:mvnrepositorychqoslogbacklogback-core1.2.11logback-core-1.2.11.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-to-slf4j2.17.2log4j-to-slf4j-2.17.2.jar;D:mvnrepositoryorgslf4jjul-to-slf4j1.7.36jul-to-slf4j-1.7.36.jar;D:mvnrepositoryjakartaannotationjakarta.annotation-api1.3.5jakarta.annotation-api-1.3.5.jar;D:mvnrepositoryorgyamlsnakeyaml1.30snakeyaml-1.30.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-json2.7.5spring-boot-starter-json-2.7.5.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-databind2.13.4.2jackson-databind-2.13.4.2.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-annotations2.13.4jackson-annotations-2.13.4.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-core2.13.4jackson-core-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jdk82.13.4jackson-datatype-jdk8-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jsr3102.13.4jackson-datatype-jsr310-2.13.4.jar;D:mvnrepositorycomfasterxmljacksonmodulejackson-module-parameter-names2.13.4jackson-module-parameter-names-2.13.4.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-tomcat2.7.5spring-boot-starter-tomcat-2.7.5.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-core9.0.68tomcat-embed-core-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-el9.0.68tomcat-embed-el-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-websocket9.0.68tomcat-embed-websocket-9.0.68.jar;D:mvnrepositoryorgspringframeworkspring-web5.3.23spring-web-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-beans5.3.23spring-beans-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-webmvc5.3.23spring-webmvc-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-aop5.3.23spring-aop-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-context5.3.23spring-context-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-expression5.3.23spring-expression-5.3.23.jar;D:mvnrepositoryorgprojectlomboklombok1.18.24lombok-1.18.24.jar;D:mvnrepositorycomaliyunmqmq-http-sdk1.0.2mq-http-sdk-1.0.2.jar;D:mvnrepositoryorgapachehttpcomponentshttpasyncclient4.1.5httpasyncclient-4.1.5.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore4.4.15httpcore-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore-nio4.4.15httpcore-nio-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpclient4.5.13httpclient-4.5.13.jar;D:mvnrepositorycommons-codeccommons-codec1.15commons-codec-1.15.jar;D:mvnrepositoryorgapachecommonscommons-lang33.12.0commons-lang3-3.12.0.jar;D:mvnrepositorycommons-loggingcommons-logging1.1.3commons-logging-1.1.3.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-api2.17.2log4j-api-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-core2.17.2log4j-core-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-jcl2.17.2log4j-jcl-2.17.2.jar;D:mvnrepositorycomgooglecodegsongson2.9.1gson-2.9.1.jar;D:mvnrepositoryorgslf4jslf4j-api1.7.36slf4j-api-1.7.36.jar;D:mvnrepositoryorgspringframeworkspring-core5.3.23spring-core-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-jcl5.3.23spring-jcl-5.3.23.jar" com.example.rocketdemo.util.AliyunMessageConsumerTest Receive message: Message{MessageID:3AD19CA3000E681A95157491C815BDC8,MessageMD5:39C636AD390E69FDA0821CA29F8E8CC9,RequestID:637DC89A3944310E00E92833,Properties:{a=0, KEYS=MessageKey, __BORNHOST=58.209.156.163}, receiptHandle='3AD19CA3000E681A95157491C815BDC8-MSAxNjY5MTg3NzM4NjYxIDMwMDAwMCAyIDAgY24taGFuZ3pob3Utc2hhcmUtMDItMSA1IDE=', publishTime=1669187710997, nextConsumeTime=1669188038661, firstConsumeTime=1669187738661, consumedTimes=1, messageTag='Safety_Hat_Message_tt', errorMessage=null},topic:Safety_Hat_Message_tt Process finished with exit code -1

接收到消息,订阅消息成功。

五、完成代码

可点击此处进行下载

参考文档:阿里云 消息队列 RocketMQ Java SDK

最后

以上就是火星上火车最近收集整理的关于java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码的全部内容,更多相关java实现阿里云rocketMQ消息内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(86)

评论列表共有 0 条评论

立即
投稿
返回
顶部