I'm 坦率故事, a blogger on 靠谱客. This article covers a simple integration with Alibaba Cloud RocketMQ, shared here as a reference.

Straight to the code.
Reference documentation: Alibaba Cloud RocketMQ
Configuration file:

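rocketmq:
  # AccessKeyId for Alibaba Cloud authentication; obtain it from the Alibaba Cloud user information management console.
  access-key:
  # AccessKeySecret for Alibaba Cloud authentication; obtain it from the same console.
  secret-key:
  # Number of consumer threads.
  consume-thread-nums:
  # TCP endpoint; see the Endpoints area on the instance details page of the Message Queue for Apache RocketMQ console.
  wangfa-name-srv-addr:
  # The Group ID you created in the console.
  wangfa-group-id:
  # Topic for normal messages; never use a normal-message topic to send or receive other message types.
  wangfa-topic:
  # A message tag is comparable to a Gmail label: it sub-classifies messages so that a consumer
  # can specify filter conditions, which are applied on the RocketMQ server.
  # For tag format and setup, see "Topic and Tag best practices".
  wangfa-tag: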

Properties helper class:

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

@Data
@Configuration
public class RocketmqProperties {

    @Value("${rocketmq.access-key}")
    private String accessKey;

    @Value("${rocketmq.secret-key}")
    private String secretKey;

    // A field initializer is overwritten by @Value injection, so the default
    // of 20 is declared inside the placeholder instead.
    @Value("${rocketmq.consume-thread-nums:20}")
    private String consumeThreadNums;

    /** Builds the base client properties shared by every RocketMQ client. */
    public Properties newMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        return properties;
    }
}
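The configuration template above ships with empty values, and an empty access key or a non-numeric thread count would only surface later, when the client starts. As a rough sketch, a fail-fast check could look like the following. `MqPropertiesValidator` is a hypothetical helper, not part of the Alibaba Cloud SDK, and the property keys are passed in by the caller (in this project they would presumably be the `PropertyKeyConst` constants).

```java
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical helper (not part of the Alibaba Cloud SDK): fails fast when a
 * required client property is blank or the thread count is not a positive integer.
 */
final class MqPropertiesValidator {

    private MqPropertiesValidator() {
    }

    static void validate(Properties properties, List<String> requiredKeys, String threadCountKey) {
        // Every required key must be present and non-blank.
        for (String key : requiredKeys) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing RocketMQ property: " + key);
            }
        }
        // The thread count is optional, but when present it must parse as a positive integer.
        String threads = properties.getProperty(threadCountKey);
        if (threads != null) {
            int parsed;
            try {
                parsed = Integer.parseInt(threads.trim());
            } catch (NumberFormatException e) {
                throw new IllegalStateException("Thread count is not an integer: " + threads, e);
            }
            if (parsed <= 0) {
                throw new IllegalStateException("Thread count must be positive: " + parsed);
            }
        }
    }
}
```

One possible place to call it is at the end of `newMqProperties()`, so that a misconfigured environment stops at startup rather than at the first message.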

Subscribing to messages:

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class LcemRocketmqConfig {

    private final RocketmqProperties rocketmqProperties;
    private final LcemTopicMessageListener lcemTopicMessageListener;

    @Value("${rocketmq.wangfa-name-srv-addr}")
    private String nameSrvAddr;

    @Value("${rocketmq.wangfa-group-id}")
    private String groupId;

    @Value("${rocketmq.wangfa-topic}")
    private String topic;

    @Value("${rocketmq.wangfa-tag}")
    private String tag;

    // Spring calls start() after creating the bean and shutdown() when the context closes.
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean orderStatusConsumer() {
        log.info("init rocketmq................");
        ConsumerBean consumerBean = new ConsumerBean();

        // Client configuration. The consumer thread count (default 20) is
        // already set inside newMqProperties().
        Properties properties = rocketmqProperties.newMqProperties();
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        consumerBean.setProperties(properties);

        // Subscription table: which listener handles which topic/tag.
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        subscriptionTable.put(getSubscription(), lcemTopicMessageListener);
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }

    private Subscription getSubscription() {
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);
        return subscription;
    }
}
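The value passed to `setExpression` is a tag filter. As far as I understand RocketMQ's tag syntax, `*` subscribes to every tag and `TagA || TagB` subscribes to either one. The real filtering happens on the server, so the following is only a local illustration of those semantics. `TagExpressionSketch` is a hypothetical class, and treating an untagged message as a non-match is an assumption of this sketch.

```java
import java.util.Arrays;

/**
 * Hypothetical local illustration of tag-expression matching.
 * The real filter runs on the RocketMQ server; this is not SDK code.
 */
final class TagExpressionSketch {

    private TagExpressionSketch() {
    }

    static boolean matches(String expression, String messageTag) {
        // A missing, empty, or "*" expression means "subscribe to all tags".
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        // Assumption of this sketch: an untagged message matches no specific tag.
        if (messageTag == null) {
            return false;
        }
        // "TagA || TagB" matches when the message tag equals any listed tag.
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }
}
```

For example, `TagExpressionSketch.matches("medalIssue || other", "medalIssue")` is true, while `TagExpressionSketch.matches("medalIssue", "other")` is false.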

Message listener:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@RequiredArgsConstructor
@Component
public class LcemTopicMessageListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            byte[] body = message.getBody();
            log.info("receive message LcemTopicMessageListener, id:{}, topic:{}, tag:{}, body:{}",
                    message.getMsgID(), message.getTopic(), message.getTag(),
                    new String(body, StandardCharsets.UTF_8));
            // TODO: call the member-center API to issue the medal
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume LcemTopicMessageListener fail, id:{}, topic:{}, tag:{}",
                    message.getMsgID(), message.getTopic(), message.getTag(), e);
            // Ask the broker to deliver this message again later.
            return Action.ReconsumeLater;
        }
    }
}
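Because the listener returns `Action.ReconsumeLater` on failure, the same message can arrive more than once, so the medal-issuing step probably needs to be idempotent. Below is a minimal in-memory sketch of that idea, keyed by something like the message key. `IdempotentHandler` is a hypothetical class; it only works inside a single JVM, and a real service would more likely record processed keys in a database or cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory de-duplication helper: runs an action at most once
 * per business key. Illustration only; state is lost on restart.
 */
final class IdempotentHandler {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Returns true if the action ran, false if the key was already handled. */
    boolean handleOnce(String businessKey, Runnable action) {
        // add() returns false when the key is already present: a duplicate delivery.
        if (!processedKeys.add(businessKey)) {
            return false;
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivered message can retry the action.
            processedKeys.remove(businessKey);
            throw e;
        }
    }
}
```

Inside `consume`, the call might look like `handler.handleOnce(message.getKey(), () -> issueMedal(message))`, where `issueMedal` stands for whatever replaces the TODO.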

Sending messages:

import cn.hutool.core.lang.UUID;
import cn.hutool.json.JSONUtil;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * @author :xuwei
 * @description:TODO
 * @date :2022/9/15 13:48
 */
@Slf4j
@RequiredArgsConstructor
@Service
public class LcemRocketmqProducer implements LcemRocketMqService {

    // Send timeout in milliseconds.
    private static final String SEND_TIMEOUT_MILLIS = "3000";

    @Value("${rocketmq.access-key}")
    private String accessKey;

    @Value("${rocketmq.secret-key}")
    private String secretKey;

    @Value("${rocketmq.wangfa-name-srv-addr}")
    private String nameSrvAddr;

    @Value("${rocketmq.wangfa-topic}")
    private String topic;

    @Value("${rocketmq.wangfa-tag}")
    private String tag;

    @Override
    public void medalIssueMq() {
        // Client configuration
        Producer producer = ONSFactory.createProducer(properties());
        producer.start();
        try {
            // Send the message.
            // TODO: trigger the member center to issue the medal
            // Topic and tag come from configuration; this assumes
            // rocketmq.wangfa-tag holds a single tag, not a filter expression.
            Message msg = new Message(topic, tag, "Hello MQ".getBytes(StandardCharsets.UTF_8));
            msg.setKey(UUID.fastUUID().toString());
            SendResult sendResult = producer.send(msg);
            log.info("produce LcemRocketmqProducer result: {}", JSONUtil.toJsonStr(sendResult));
        } catch (Exception e) {
            log.error("produce LcemRocketmqProducer exception", e);
        } finally {
            // Release the producer even when sending fails.
            producer.shutdown();
        }
    }

    public Properties properties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, SEND_TIMEOUT_MILLIS);
        return properties;
    }
}
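The producer above is created, started, and shut down on every call, which is simple but likely costly if messages are sent often. One alternative is to start a producer once and reuse it until the application stops. The sketch below shows only that lifecycle. `MqClient` is a hypothetical stand-in for the SDK producer and `SharedProducer` is an illustrative wrapper; neither is part of the Alibaba Cloud SDK.

```java
/** Hypothetical stand-in for an SDK producer; only the lifecycle matters here. */
interface MqClient {
    void start();

    void send(String body);

    void shutdown();
}

/** Starts the wrapped client on first use and shuts it down at most once. */
final class SharedProducer implements AutoCloseable {

    private final MqClient client;
    private boolean started;
    private boolean closed;

    SharedProducer(MqClient client) {
        this.client = client;
    }

    void send(String body) {
        ensureStarted();
        client.send(body);
    }

    // Synchronized so that concurrent first sends start the client only once.
    private synchronized void ensureStarted() {
        if (closed) {
            throw new IllegalStateException("producer already closed");
        }
        if (!started) {
            client.start();
            started = true;
        }
    }

    @Override
    public synchronized void close() {
        // Shut down only a client that was actually started, and only once.
        if (started && !closed) {
            client.shutdown();
        }
        closed = true;
    }
}
```

In a Spring application the same effect is usually achieved by registering the producer as a bean with start and shutdown hooks, as the consumer configuration earlier does with `initMethod` and `destroyMethod`.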
