我是靠谱客的博主 无辜小馒头,最近开发中收集的这篇文章主要介绍Thingsboard源码探索(1)初识项目分析服务引用,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

  • 该系列文章基于Thingsboard release-3.0分支的源码进行分析,可能与最新的特性有所区别。

初识项目

拉代码

把我们把代码pull下来,打开IDEA,相信大多数人的反应会是,卧槽,这什么项目,怎么这么多模块!

我刚把项目拉下来的时候也是一脸懵逼,完全不知道这么多module是怎么划分的,脸上大写的两个字,卧槽!

整理思路

既来之则安之,虽然是满脸的卧槽,但也只能静下心来慢慢分析了。其实tb的源码模块化做的还是不错的,起码让我写这代码我写不到这程度,不理解还是由于模块之间的依赖关系不清晰导致的。

找官方文档

根据tb的官网我们知道tb主要由一下这几种微服务

tb-http-transport 
tb-mqtt-transport
tb-coap-transport
tb-core
tb-rule-engine
tb-js-executor
tb-web-ui

官网链接:[https://thingsboard.io/docs/reference/](https://thingsboard.io/docs/reference/)

从源码里面找到对应的服务

既然都已经知道有哪些服务了,那我只要在这一堆模块中找到对应服务的启动类就完事儿了,然后按照这个逻辑不就一下子就能找到对应服务的代码了嘛。照着这个思路,一番查找之后找到了一下几个服务

tb-http-transport---./transport/http
tb-mqtt-transport---./transport/mqtt
tb-coap-transport---./transport/coap
tb-js-executor---./msa/js-executor
tb-web-ui---./ui-ngx

其中js-executor和web-ui因为是ng和前端写的,所以可以分开来看,但是这几个transport就一个启动类是个什么鬼,而且也没有找到tb-core以及tb-rule-engine,只有ThingsboardInstallApplication和ThingsboardServerApplication这俩莫名其妙的启动类,这时候心里已经开始骂娘了。

虽然没法一下子就搞清楚项目,但起码还是通过几个启动类把服务大致划分了一下,接下来那就只能看一下这几个启动类对应服务的依赖关系了,看看能不能找到一些线索。

分析服务引用

分析transport服务的引用

由于transport是一个比较独立的模块,且不同的transport功能基本一致,所以先从这个开始分析,我先选取了mqtt-transport这个模块进行分析,它的pom文件展示了它的依赖关系

        <dependency>
            <groupId>org.thingsboard.common.transport</groupId>
            <artifactId>mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.thingsboard.common</groupId>
            <artifactId>queue</artifactId>
        </dependency>

可以看到这两个主要的依赖,从名字上基本上可以猜到queue肯定是transport的管道服务,因为之前搭建过tb的集群,所以基本猜到里面可能会有与kafka的管道相关的代码,而mqtt这个依赖基本上可以猜到是一个mqttserver服务,接下来就是分析这两个模块的源码。

分析mqtt模块源码

源码路径位于 ./common/transport/mqtt

果然这个模块就是一个用netty封装的mqttserver,而且还通过配置文件的条件来判断是否要初始化这个mqttserver服务。

package org.thingsboard.server.transport.mqtt;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @author Andrew Shvayka
 */
@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}

对于熟悉netty的朋友来说这个就比较简单了,就是根据handler来处理接收到的消息,这个其实就没什么好多说了。不熟悉netty的朋友可以自己去百度一下就好了,就是简单的接收到消息之后丢给org.thingsboard.server.transport.mqtt.MqttTransportHandler这个类来处理而已,具体的消息处理则都是通过handler来进行。

分析MqttTransportHandler

消息都是从channelRead方法进入到handler,然后丢到processMqttMsg这个方法进行具体的分类处理。

    private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            log.info("[{}:{}] Invalid message received", address.getHostName(), address.getPort());
            processDisconnect(ctx);
            return;
        }
        deviceSessionCtx.setChannel(ctx);
        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx, msg)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
                    transportService.reportActivity(sessionInfo);
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx, msg)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;
        }
    }

通过这个handler,消息会按照类型来做不通的处理。越往下面看就会有更多的模块之间的依赖,将模块独立出来分析反而不是很方便,所以接下来换一种思路,从一个完整的鉴权流程来分析源码以及模块之间的依赖关系。

分析连接鉴权流程

tb默认的设备鉴权方式是token的方式,所以先挑这个来分析整个的数据上下行流程,因此我们详细分析一下processAuthTokenConnect这个方法,源码如下。

    private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        String userName = msg.payload().userName();
        log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
        if (StringUtils.isEmpty(userName)) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
            ctx.close();
        } else {
            transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                        @Override
                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                            onValidateDeviceResponse(msg, ctx);
                        }

                        @Override
                        public void onError(Throwable e) {
                            log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                            ctx.close();
                        }
                    });
        }
    }

通过将token提取出来,然后拼装成一个ValidateDeviceTokenRequestMsg对象之后调用transportService服务。其中transportService服务的实现类为org.thingsboard.server.common.transport.service.DefaultTransportService,该类属于transport-api模块。

@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport'")
public class DefaultTransportService implements TransportService {

    @Value("${transport.rate_limits.enabled}")
    private boolean rateLimitEnabled;
    @Value("${transport.rate_limits.tenant}")

通过process将消息发送出去,process的时候又调用了transportApiRequestTemplate这个对象

@Override
    public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", msg);
        TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
        AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg),
                response -> callback.onSuccess(response.getValue().getValidateTokenResponseMsg()), callback::onError, transportCallbackExecutor);
    }

这个对象对应的实现类位于queue模块,org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate,他的send方法会返回一个ListenableFuture

@Override
    public ListenableFuture<Response> send(Request request) {
        if (tickSize > maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
        request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
        request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
        pendingRequests.putIfAbsent(requestId, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                log.trace("[{}] Request sent: {}", requestId, metadata);
            }

            @Override
            public void onFailure(Throwable t) {
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
        return future;
    }

send方法通过requestTemplate进行消息的发送,因为使用的是Kafka作为消息管道,所以其实现类为queue模块的org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate类,调用其send方法将消息真实发送出去,其中producer就是一个kafka的producer实现,最后将信息发送到tb_transport.api.requests这个topical中。

@Override
    public void send(TopicPartitionInfo tpi, T msg, TbQueueCallback callback) {
        createTopicIfNotExist(tpi);
        String key = msg.getKey().toString();
        byte[] data = msg.getData();
        ProducerRecord<String, byte[]> record;
        Iterable<Header> headers = msg.getHeaders().getData().entrySet().stream().map(e -> new RecordHeader(e.getKey(), e.getValue())).collect(Collectors.toList());
        record = new ProducerRecord<>(tpi.getFullTopicName(), null, key, data, headers);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                if (callback != null) {
                    callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
                }
            } else {
                if (callback != null) {
                    callback.onFailure(exception);
                } else {
                    log.warn("Producer template failure: {}", exception.getMessage(), exception);
                }
            }
        });
    }

乍看至下认证的流程已经完成了,其实仔细一看里面问题还很多,首先需要问的是这个认证信息是如何返回给mqtt-transport的?

像kafka这类消息管道都是异步进行数据的传输的,所以发送消息之后最多会收到一个是否发送成功的信息,实际上是没有认证的返回信息的,但是在MqttTransportHandler的processAuthTokenConnect方法里面明确是有对返回信息做进一步核实的代码的,那这个返回信息到底是怎么来的呢?

如何拿到认证的返回信息

在tb的代码中有很多类似的异步操作,并且都是要对返回信息进行确认的。比如这个认证信息,其实是通过queue模块的org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate方法来实现的,下面是处理的源码

    @Override
    public void init() {
        queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
        this.requestTemplate.init();
        tickTs = System.currentTimeMillis();
        responseTemplate.subscribe();
        executor.submit(() -> {
            long nextCleanupMs = 0L;
            while (!stopped) {
                try {
                    List<Response> responses = responseTemplate.poll(pollInterval);
                    if (responses.size() > 0) {
                        log.trace("Polling responses completed, consumer records count [{}]", responses.size());
                    } else {
                        continue;
                    }
                    responses.forEach(response -> {
                        byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
                        UUID requestId;
                        if (requestIdHeader == null) {
                            log.error("[{}] Missing requestId in header and body", response);
                        } else {
                            requestId = bytesToUuid(requestIdHeader);
                            log.trace("[{}] Response received: {}", requestId, response);
                            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
                            if (expectedResponse == null) {
                                log.trace("[{}] Invalid or stale request", requestId);
                            } else {
                                expectedResponse.future.set(response);
                            }
                        }
                    });
                    responseTemplate.commit();
                    tickTs = System.currentTimeMillis();
                    tickSize = pendingRequests.size();
                    if (nextCleanupMs < tickTs) {
                        //cleanup;
                        pendingRequests.forEach((key, value) -> {
                            if (value.expTime < tickTs) {
                                ResponseMetaData<Response> staleRequest = pendingRequests.remove(key);
                                if (staleRequest != null) {
                                    log.trace("[{}] Request timeout detected, expTime [{}], tickTs [{}]", key, staleRequest.expTime, tickTs);
                                    staleRequest.future.setException(new TimeoutException());
                                }
                            }
                        });
                        nextCleanupMs = tickTs + maxRequestTimeout;
                    }
                } catch (Throwable e) {
                    log.warn("Failed to obtain responses from queue.", e);
                    try {
                        Thread.sleep(pollInterval);
                    } catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new responses", e2);
                    }
                }
            }
        });
    }

服务启动的时候会创建一个response对应topical的消费者,对应的topical格式为tb_transport.api.responses.hostname,然后从这个topical拉取消息。

DefaultTbQueueRequestTemplate.send()方法在发送的时候会将当前需要发送的消息存入pendingRequests对象,以requestId为唯一ID,将返回的Future装箱之后作为value。当从topical中获取到消息,且ID可以对应上的时候会将pendingRequests对应key的value的状态设置为已完成,然后出发回调。

回调之后就可以拿到core返回的认证信息了,这样就完成了一个完整的认证流程。



作者:哦呵呵_3579
链接:https://www.jianshu.com/p/33ac5d394f68
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

最后

以上就是无辜小馒头为你收集整理的Thingsboard源码探索(1)初识项目分析服务引用的全部内容,希望文章能够帮你解决Thingsboard源码探索(1)初识项目分析服务引用所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部