概述
目录
- 一、准备工作
- 二、代码实现
- 1.添加依赖
- 2.创建一个常量类存放公共参数
- 3.调用HTTP协议的SDK 发送普通消息
- 4.调用HTTP协议的SDK 订阅普通消息
- 三、配置main的日志输出级别
- 四、测试效果
- 五、完成代码
一、准备工作
登录阿里云官网,先申请rocketMQ,再申请Topic、Group ID,然后就是参考阿里云的JAVA SDK进行编程实现。
环境要求:
安装JDK 1.6或以上版本
安装Maven
安装Java SDK
参照 阿里云 官方文档,来一步一步操作。
文档提供的SDK有TCP和Http协议
,这里使用HTTP协议来实现rocketMQ消息的发送与消费。
二、代码实现
调用HTTP协议的SDK收发普通消息
1.添加依赖
创建Springboot项目,添加 SDK依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--aliyun mq sdk-->
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.2</version>
</dependency>
注意:aliyun mq sdk的版本信息,请参见版本说明
2.创建一个常量类存放公共参数
package com.example.rocketdemo.config;
/**
* @author qzz
*/
public class MqConfigParams {
/**
* 你的topic
*/
public static final String TOPIC = "你的topic";
/**
* 消息标签 *:代表全部
*/
public static final String TAG = "你的tag";
/**
* 你的Group_ID
*/
public static final String GROUP_ID = "你的Group_ID";
/**
*你的accessKey
*/
public static final String ACCESS_KEY = "你的accessKey";
/**
*你的secretKey
*/
public static final String SECRET_KEY = "你的secretKey";
/**
* 实例ID
*/
public static final String INSTANCE_ID = "你的实例ID";
/**
*设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看
*/
public static final String HTTP_ENDPOINT = "http接入点地址";
}
3.调用HTTP协议的SDK 发送普通消息
package com.example.rocketdemo.util;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
/**
* 生产 阿里云 RocketMQ 消息
* @author qzz
*/
@Slf4j
public class AliyunMessageProducerTest {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
MqConfigParams.HTTP_ENDPOINT,
// AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.ACCESS_KEY,
// AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.SECRET_KEY
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
// 不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
final String topic = MqConfigParams.TOPIC;
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
final String instanceId = MqConfigParams.INSTANCE_ID;
// 获取Topic的生产者。
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循环发送2条消息。
for (int i = 0; i < 2; i++) {
TopicMessage pubMsg; // 普通消息。
pubMsg = new TopicMessage(
// 消息内容。
"hello mq 111!".getBytes(),
// 消息标签。
MqConfigParams.TAG
);
// 设置消息的自定义属性。
pubMsg.getProperties().put("a", String.valueOf(i));
// 设置消息的Key。
pubMsg.setMessageKey("MessageKey");
// 同步发送消息,只要不抛异常就是成功。
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送消息,只要不抛异常就是成功。
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
4.调用HTTP协议的SDK 订阅普通消息
订阅普通消息的代码如下:
package com.example.rocketdemo.util;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import com.example.rocketdemo.config.MqConfigParams;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 消费 阿里云 RocketMQ 消息
* @author qzz
*/
@Slf4j
public class AliyunMessageConsumerTest {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
MqConfigParams.HTTP_ENDPOINT,
// AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.ACCESS_KEY,
// AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
MqConfigParams.SECRET_KEY
);
// 消息所属的Topic,在消息队列RocketMQ版控制台创建。
//不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
final String topic = MqConfigParams.TOPIC;
// 您在消息队列RocketMQ版控制台创建的Group ID。
final String groupId = MqConfigParams.GROUP_ID;
// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
final String instanceId = MqConfigParams.INSTANCE_ID;
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。
do {
List<Message> messages = null;
try {
// 长轮询消费消息。
// 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。
messages = consumer.consumeMessage(
1,// 一次最多消费1条消息(最多可设置为16条)。
3// 长轮询时间3秒(最多可设置为30秒)。
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// Topic中没有消息可消费。
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// 处理业务逻辑。
for (Message message : messages) {
System.out.println("Receive message: " + message+",topic:"+message.getMessageTag());
}
// 消息重试时间到达前若不确认消息消费成功,则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费的时间戳都不一样。
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}
三、配置main的日志输出级别
启动执行时,发现问题:控制台打印巨多debug日志
默认情况下,如果项目中集成了Logback等日志框架,在执行main方法时通过其进行日志打印,那么默认的日志级别是debug的。
此时,如果是http请求,甚至可以把请求的具体报文信息都打印出来,特别是三方框架的。为了不影响查看正常的日志,可以将main方法的日志级别进行调整
。
解决方法:
resources目录下增加logback.xml
logback.xml:
<configuration debug="false">
<logger name="org.apache" level="INFO" />
<logger name="org.apache.http.wire" level="INFO" />
<logger name="org.apache.http.headers" level="INFO" />
<property name="CONSOLE_LOG_PATTERN"
value="%date{yyyy-MM-dd HH:mm:ss} %highlight(%-5level) %magenta(%-4relative) --- [%yellow(%15.15thread)] %cyan(%-40.40logger{39}) : %msg%n"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
</encoder>
</appender>
<root level="ERROR">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
再次启动,打印日志就简洁明多了。
四、测试效果
生产者启动运行,结果如下:
消费者启动运行,结果如下:
"D:Program FilesJavajdk1.8.0_40binjava.exe" "-javaagent:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3libidea_rt.jar=51390:C:Program FilesJetBrainsIntelliJ IDEA 2021.1.3bin" -Dfile.encoding=UTF-8 -classpath "D:Program FilesJavajdk1.8.0_40jrelibcharsets.jar;D:Program FilesJavajdk1.8.0_40jrelibdeploy.jar;D:Program FilesJavajdk1.8.0_40jrelibextaccess-bridge-64.jar;D:Program FilesJavajdk1.8.0_40jrelibextcldrdata.jar;D:Program FilesJavajdk1.8.0_40jrelibextdnsns.jar;D:Program FilesJavajdk1.8.0_40jrelibextjaccess.jar;D:Program FilesJavajdk1.8.0_40jrelibextjfxrt.jar;D:Program FilesJavajdk1.8.0_40jrelibextlocaledata.jar;D:Program FilesJavajdk1.8.0_40jrelibextnashorn.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunec.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunjce_provider.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunmscapi.jar;D:Program FilesJavajdk1.8.0_40jrelibextsunpkcs11.jar;D:Program FilesJavajdk1.8.0_40jrelibextzipfs.jar;D:Program FilesJavajdk1.8.0_40jrelibjavaws.jar;D:Program FilesJavajdk1.8.0_40jrelibjce.jar;D:Program FilesJavajdk1.8.0_40jrelibjfr.jar;D:Program FilesJavajdk1.8.0_40jrelibjfxswt.jar;D:Program FilesJavajdk1.8.0_40jrelibjsse.jar;D:Program FilesJavajdk1.8.0_40jrelibmanagement-agent.jar;D:Program FilesJavajdk1.8.0_40jrelibplugin.jar;D:Program FilesJavajdk1.8.0_40jrelibresources.jar;D:Program FilesJavajdk1.8.0_40jrelibrt.jar;F:study-projectrocket-demotargetclasses;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-web2.7.5spring-boot-starter-web-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter2.7.5spring-boot-starter-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot2.7.5spring-boot-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-autoconfigure2.7.5spring-boot-autoconfigure-2.7.5.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-logging2.7.5spring-boot-starter-logging-2.7.5.jar;D:mvnrepositorychqoslogbacklogback-classic1.2.11logback-classic-1.2.11.jar;D:mvnrepositorychqoslogbacklogback-core1.2.11logback-core-1.2.11.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-to-slf4j2.17.2log4j-to-slf4j-2.17.2.jar;D:mvnrepositoryorgslf4jjul-to-slf4j1.7.36jul-to-slf4j-1.7.36.jar;D:mvnrepositoryjakartaannotationjakarta.annotation-api1.3.5jakarta.annotation-api-1.3.5.jar;D:mvnrepositoryorgyamlsnakeyaml1.30snakeyaml-1.30.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-json2.7.5spring-boot-starter-json-2.7.5.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-databind2.13.4.2jackson-databind-2.13.4.2.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-annotations2.13.4jackson-annotations-2.13.4.jar;D:mvnrepositorycomfasterxmljacksoncorejackson-core2.13.4jackson-core-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jdk82.13.4jackson-datatype-jdk8-2.13.4.jar;D:mvnrepositorycomfasterxmljacksondatatypejackson-datatype-jsr3102.13.4jackson-datatype-jsr310-2.13.4.jar;D:mvnrepositorycomfasterxmljacksonmodulejackson-module-parameter-names2.13.4jackson-module-parameter-names-2.13.4.jar;D:mvnrepositoryorgspringframeworkbootspring-boot-starter-tomcat2.7.5spring-boot-starter-tomcat-2.7.5.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-core9.0.68tomcat-embed-core-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-el9.0.68tomcat-embed-el-9.0.68.jar;D:mvnrepositoryorgapachetomcatembedtomcat-embed-websocket9.0.68tomcat-embed-websocket-9.0.68.jar;D:mvnrepositoryorgspringframeworkspring-web5.3.23spring-web-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-beans5.3.23spring-beans-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-webmvc5.3.23spring-webmvc-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-aop5.3.23spring-aop-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-context5.3.23spring-context-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-expression5.3.23spring-expression-5.3.23.jar;D:mvnrepositoryorgprojectlomboklombok1.18.24lombok-1.18.24.jar;D:mvnrepositorycomaliyunmqmq-http-sdk1.0.2mq-http-sdk-1.0.2.jar;D:mvnrepositoryorgapachehttpcomponentshttpasyncclient4.1.5httpasyncclient-4.1.5.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore4.4.15httpcore-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpcore-nio4.4.15httpcore-nio-4.4.15.jar;D:mvnrepositoryorgapachehttpcomponentshttpclient4.5.13httpclient-4.5.13.jar;D:mvnrepositorycommons-codeccommons-codec1.15commons-codec-1.15.jar;D:mvnrepositoryorgapachecommonscommons-lang33.12.0commons-lang3-3.12.0.jar;D:mvnrepositorycommons-loggingcommons-logging1.1.3commons-logging-1.1.3.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-api2.17.2log4j-api-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-core2.17.2log4j-core-2.17.2.jar;D:mvnrepositoryorgapachelogginglog4jlog4j-jcl2.17.2log4j-jcl-2.17.2.jar;D:mvnrepositorycomgooglecodegsongson2.9.1gson-2.9.1.jar;D:mvnrepositoryorgslf4jslf4j-api1.7.36slf4j-api-1.7.36.jar;D:mvnrepositoryorgspringframeworkspring-core5.3.23spring-core-5.3.23.jar;D:mvnrepositoryorgspringframeworkspring-jcl5.3.23spring-jcl-5.3.23.jar" com.example.rocketdemo.util.AliyunMessageConsumerTest
Receive message: Message{MessageID:3AD19CA3000E681A95157491C815BDC8,MessageMD5:39C636AD390E69FDA0821CA29F8E8CC9,RequestID:637DC89A3944310E00E92833,Properties:{a=0, KEYS=MessageKey, __BORNHOST=58.209.156.163}, receiptHandle='3AD19CA3000E681A95157491C815BDC8-MSAxNjY5MTg3NzM4NjYxIDMwMDAwMCAyIDAgY24taGFuZ3pob3Utc2hhcmUtMDItMSA1IDE=', publishTime=1669187710997, nextConsumeTime=1669188038661, firstConsumeTime=1669187738661, consumedTimes=1, messageTag='Safety_Hat_Message_tt', errorMessage=null},topic:Safety_Hat_Message_tt
Process finished with exit code -1
接收到消息,订阅消息成功。
五、完成代码
可点击此处进行下载
参考文档:阿里云 消息队列 RocketMQ Java SDK
最后
以上就是火星上火车为你收集整理的java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码的全部内容,希望文章能够帮你解决java实现阿里云rocketMQ消息的发送与消费(http协议sdk)一、准备工作二、代码实现三、配置main的日志输出级别四、测试效果五、完成代码所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复