我是靠谱客的博主 坦率故事,最近开发中收集的这篇文章主要介绍阿里云rocketmq简单接入,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

直接上代码
参考文档:阿里云消息队列 RocketMQ 版
配置文件:

rocketmq:
  access-key:  # AccessKeyId 阿里云身份验证,在阿里云用户信息管理控制台获取。
  secret-key: # AccessKeySecret 阿里云身份验证,在阿里云用户信息管理控制台获取。
  consume-thread-nums: # 消费者线程数
  wangfa-name-srv-addr:  # 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
  wangfa-group-id: # 您在控制台创建的Group ID。
  wangfa-topic: # 普通消息所属的Topic,切勿使用普通消息的Topic来收发其他类型的消息。
  wangfa-tag: # Message Tag可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                    # Tag的具体格式和设置方法,请参见Topic与Tag最佳实践。

参数工具类


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Properties;

/**
 * Holds the RocketMQ credentials and consumer thread count injected from the
 * {@code rocketmq.*} configuration keys, and turns them into the base
 * {@link Properties} object handed to the ONS client.
 *
 * <p>NOTE(review): {@code @Data} generates a {@code toString()} that includes
 * {@code secretKey}; consider excluding that field so it cannot end up in logs.
 */
@Data
@Configuration
public class RocketmqProperties {
    /** AccessKey ID, read from {@code rocketmq.access-key}. */
    @Value("${rocketmq.access-key}")
    private String accessKey;

    /** AccessKey secret, read from {@code rocketmq.secret-key}. */
    @Value("${rocketmq.secret-key}")
    private String secretKey;

    /**
     * Number of consumer threads, read from {@code rocketmq.consume-thread-nums}.
     *
     * <p>The {@code :20} placeholder default makes the key optional; without it a
     * missing key fails injection and the field initializer is never used as a
     * fallback. The initializer is kept for instances created outside Spring.
     */
    @Value("${rocketmq.consume-thread-nums:20}")
    private String consumeThreadNums = "20";

    /**
     * Builds a fresh {@link Properties} carrying the access key, secret key and
     * consumer thread count.
     *
     * @return a new, independent {@link Properties} instance on every call, so
     *         callers may add their own entries without affecting each other
     */
    public Properties newMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);

        return properties;
    }
}

订阅消息:


import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Spring configuration that registers the ONS {@link ConsumerBean} for the
 * topic/tag configured under {@code rocketmq.wangfa-*}.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class LcemRocketmqConfig {
    private final RocketmqProperties rocketmqProperties;
    private final LcemTopicMessageListener lcemTopicMessageListener;

    /** TCP endpoint of the RocketMQ instance. */
    @Value("${rocketmq.wangfa-name-srv-addr}")
    private String nameSrvAddr;

    /** Consumer group id. */
    @Value("${rocketmq.wangfa-group-id}")
    private String groupId;

    /** Topic to subscribe to. */
    @Value("${rocketmq.wangfa-topic}")
    private String topic;

    /** Tag expression used as the subscription filter. */
    @Value("${rocketmq.wangfa-tag}")
    private String tag;

    /**
     * Creates the consumer bean. Spring calls {@code start} after creation and
     * {@code shutdown} on context close, as declared on the {@code @Bean}.
     *
     * @return the configured, not yet started consumer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean orderStatusConsumer() {
        log.info("init rocketmq................");
        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(consumerProperties());
        consumer.setSubscriptionTable(subscriptions());
        return consumer;
    }

    /**
     * Base client properties plus the name-server address and group id.
     * The consumer thread count is already part of the base properties
     * returned by {@code newMqProperties()}.
     */
    private Properties consumerProperties() {
        Properties props = rocketmqProperties.newMqProperties();
        props.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        props.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        return props;
    }

    /** Single-entry subscription table mapping the configured topic/tag to the listener. */
    private Map<Subscription, MessageListener> subscriptions() {
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(tag);

        Map<Subscription, MessageListener> table = new HashMap<>();
        table.put(subscription, lcemTopicMessageListener);
        return table;
    }
}


import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@RequiredArgsConstructor
@Component
public class LcemTopicMessageListener implements MessageListener {



    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            byte[] body = message.getBody();
            log.info("receive message LcemTopicMessageListener, id:{}, topic:{}, tag:{}, body:{}",
                      message.getMsgID(),
                      message.getTopic(),
                      message.getTag(),
                      new String(body, StandardCharsets.UTF_8));
            //todo 调用会员中心接口,发放勋章


            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("consume LcemTopicMessageListener fail, id:{}, topic:{}, tag:{}",
                      message.getMsgID(),
                      message.getTopic(),
                      message.getTag(),
                      e);
            return Action.ReconsumeLater;
        }
    }
}

发送消息


import cn.hutool.core.lang.UUID;
import cn.hutool.json.JSONUtil;
import com.aliyun.openservices.ons.api.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * @author :xuwei
 * @description:TODO
 * @date :2022/9/15 13:48
 */
@Slf4j
@RequiredArgsConstructor
@Service
public class LcemRocketmqProducer implements LcemRocketMqService {

    @Value("${rocketmq.access-key}")
    private  String accessKey;

    @Value("${rocketmq.secret-key}")
    private String secretKey;

    @Value("${rocketmq.consume-thread-nums}")
    private String consumeThreadNums = "20";

    @Value("${rocketmq.wangfa-name-srv-addr}")
    private String nameSrvAddr;

    @Value("${rocketmq.wangfa-topic}")
    private String topic;

    @Value("${rocketmq.wangfa-tag}")
    private String tag;

    private  static String timeMIlls = "3000";

    @Override
    public void medalIssueMq() {
        //配置文件
        Properties properties = properties();
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //发送消息。
        //todo 触发会员中心发放勋章
        Message msg = new Message(
                "topic_lcem_uat",
                "medalIssue",
                "Hello MQ".getBytes());
        msg.setKey(UUID.fastUUID().toString());
        try {
            SendResult sendResult = producer.send(msg);
            log.info("produce LcemRocketmqProducer result{}", JSONUtil.toJsonStr(sendResult));
        } catch (Exception e) {
            log.info("produce LcemRocketmqProducer exception{}", e.getMessage());
        }

        producer.shutdown();
    }


    public  Properties properties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeMIlls);
        return properties;
    }
}

最后

以上就是坦率故事为你收集整理的阿里云rocketmq简单接入的全部内容,希望文章能够帮你解决阿里云rocketmq简单接入所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部