概述
直接上代码
参考文档:阿里云rocketmq
配置文件:
rocketmq:
  access-key:            # AccessKeyId 阿里云身份验证,在阿里云用户信息管理控制台获取。
  secret-key:            # AccessKeySecret 阿里云身份验证,在阿里云用户信息管理控制台获取。
  consume-thread-nums:   # 消费者线程数
  wangfa-name-srv-addr:  # 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
  wangfa-group-id:       # 您在控制台创建的Group ID。
  wangfa-topic:          # 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
  wangfa-tag:            # Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                         # Tag的具体格式和设置方法,请参见Topic与Tag最佳实践。
参数工具类
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import java.util.List;
import java.util.Properties;
/**
 * Shared RocketMQ client settings read from the {@code rocketmq.*} configuration keys.
 *
 * <p>Holds the access key, the secret key and the consumer thread count, and converts them
 * into a {@link Properties} object keyed by {@link PropertyKeyConst} constants.
 */
@Data
@Configuration
public class RocketmqProperties {

    /** Value of {@code rocketmq.access-key}. */
    @Value("${rocketmq.access-key}")
    private String accessKey;

    /** Value of {@code rocketmq.secret-key}. */
    @Value("${rocketmq.secret-key}")
    private String secretKey;

    /**
     * Value of {@code rocketmq.consume-thread-nums}, defaulting to {@code 20}.
     *
     * <p>The default is declared inside the placeholder ({@code :20}) because a plain field
     * initializer does not act as a fallback for {@code @Value}: without it a missing key is an
     * unresolved placeholder instead of "use 20". The initializer is kept so that instances
     * created outside Spring still start at 20.
     */
    @Value("${rocketmq.consume-thread-nums:20}")
    private String consumeThreadNums = "20";

    /**
     * Builds a fresh {@link Properties} holding the access key, the secret key and the consumer
     * thread count.
     *
     * <p>A new object is created on every call, so callers may add their own entries to it.
     *
     * @return a new, mutable {@link Properties} with the three shared entries set
     * @throws NullPointerException if any of the three values is {@code null}
     *         (raised by {@link Properties#setProperty(String, String)})
     */
    public Properties newMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        return properties;
    }
}
订阅消息:
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Spring configuration that registers the RocketMQ {@link ConsumerBean} for the topic and tag
 * configured under {@code rocketmq.wangfa-*}.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class LcemRocketmqConfig {

    private final RocketmqProperties rocketmqProperties;
    private final LcemTopicMessageListener lcemTopicMessageListener;

    @Value("${rocketmq.wangfa-name-srv-addr}")
    private String nameSrvAddr;

    @Value("${rocketmq.wangfa-group-id}")
    private String groupId;

    @Value("${rocketmq.wangfa-topic}")
    private String topic;

    @Value("${rocketmq.wangfa-tag}")
    private String tag;

    /**
     * Creates the consumer bean; the bean definition names {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return a consumer configured with the client properties and a single subscription that
     *         routes the configured topic/tag to {@link LcemTopicMessageListener}
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean orderStatusConsumer() {
        log.info("init rocketmq................");

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(consumerProperties());
        consumer.setSubscriptionTable(subscriptionTable());
        return consumer;
    }

    /**
     * Client properties for the consumer: the shared entries from
     * {@link RocketmqProperties#newMqProperties()} plus the name-server address and group id.
     * The consumer thread count is not set here; it comes with the shared entries.
     */
    private Properties consumerProperties() {
        Properties clientProps = rocketmqProperties.newMqProperties();
        clientProps.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        clientProps.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        return clientProps;
    }

    /** Subscription relationship: one entry mapping the configured topic/tag to the listener. */
    private Map<Subscription, MessageListener> subscriptionTable() {
        Subscription wangfaSubscription = new Subscription();
        wangfaSubscription.setTopic(topic);
        wangfaSubscription.setExpression(tag);

        Map<Subscription, MessageListener> table = new HashMap<>();
        table.put(wangfaSubscription, lcemTopicMessageListener);
        return table;
    }
}
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Message listener that logs each received message and acknowledges it.
 *
 * <p>The business step (granting the medal) is still a TODO, so at present the listener only
 * logs the message and commits it.
 */
@Slf4j
@RequiredArgsConstructor
@Component
public class LcemTopicMessageListener implements MessageListener {

    /**
     * Logs the message id, topic, tag and UTF-8 decoded body.
     *
     * @param message the delivered message
     * @param context the consume context (not used)
     * @return {@link Action#CommitMessage} when handling completes without an exception,
     *         {@link Action#ReconsumeLater} when any exception is thrown
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        Action outcome;
        try {
            String payload = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("receive message LcemTopicMessageListener, id:{}, topic:{}, tag:{}, body:{}",
                    message.getMsgID(), message.getTopic(), message.getTag(), payload);
            // TODO: call the member-center API to grant the medal
            outcome = Action.CommitMessage;
        } catch (Exception e) {
            // The trailing exception argument has no placeholder, so the logger records it as the cause.
            log.error("consume LcemTopicMessageListener fail, id:{}, topic:{}, tag:{}",
                    message.getMsgID(), message.getTopic(), message.getTag(), e);
            outcome = Action.ReconsumeLater;
        }
        return outcome;
    }
}
发送消息
import cn.hutool.core.lang.UUID;
import cn.hutool.json.JSONUtil;
import com.aliyun.openservices.ons.api.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Properties;
/**
* @author :xuwei
* @description:TODO
* @date :2022/9/15 13:48
*/
@Slf4j
@RequiredArgsConstructor
@Service
public class LcemRocketmqProducer implements LcemRocketMqService {
@Value("${rocketmq.access-key}")
private String accessKey;
@Value("${rocketmq.secret-key}")
private String secretKey;
@Value("${rocketmq.consume-thread-nums}")
private String consumeThreadNums = "20";
@Value("${rocketmq.wangfa-name-srv-addr}")
private String nameSrvAddr;
@Value("${rocketmq.wangfa-topic}")
private String topic;
@Value("${rocketmq.wangfa-tag}")
private String tag;
private static String timeMIlls = "3000";
@Override
public void medalIssueMq() {
//配置文件
Properties properties = properties();
Producer producer = ONSFactory.createProducer(properties);
producer.start();
//发送消息。
//todo 触发会员中心发放勋章
Message msg = new Message(
"topic_lcem_uat",
"medalIssue",
"Hello MQ".getBytes());
msg.setKey(UUID.fastUUID().toString());
try {
SendResult sendResult = producer.send(msg);
log.info("produce LcemRocketmqProducer result{}", JSONUtil.toJsonStr(sendResult));
} catch (Exception e) {
log.info("produce LcemRocketmqProducer exception{}", e.getMessage());
}
producer.shutdown();
}
public Properties properties() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeMIlls);
return properties;
}
}
最后
以上就是坦率故事为你收集整理的阿里云rocketmq简单接入的全部内容,希望文章能够帮你解决阿里云rocketmq简单接入所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复