我是靠谱客的博主 俊逸自行车,最近开发中收集的这篇文章主要介绍pulsar的客户端权限控制功能(一),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

1. 概述

最近在研究pulsar的权限验证部分,权限验证包含两部分:

  • 客户端连接
  • 客户端的访问控制

pulsar提供了authentication和authorization两种方式实现上述两个功能,每一种方式提供了接口。

2. 版本

pulsar 2.8.0

3. 客户端连接验证

默认情况下,pulsar是不会开启连接验证的,即客户端到broker之间、broker到broker之间的访问都没有任何限制。但是在线上环境中,对于权限的控制往往是很重要的。

3.1 在broker.conf文件中开启客户端连接认证

# Enable authentication
authenticationEnabled=true

# Authentication provider name list, which is comma separated list of class names
# 可以提供N个处理验证的处理类,然后broker接收到客户端连接后就会调用此类的方法进行处理
authenticationProviders=auth.server.VVAuthenticationProvider

# Interval of time for checking for expired authentication credentials
authenticationRefreshCheckSeconds=60

# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientTlsEnabled=false
# 这里是broker之间连接时,broker客户端用到的处理类。通常可以和客户端的认证处理类一样。
# 正式项目中,需要在broker端判断是哪一种连接,分别做好权限认证。
brokerClientAuthenticationPlugin=auth.client.VVAuthentication
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=

3.2 实现连接验证处理类

权限验证分为客户端和服务端,服务端即broker,客户端即我们自己编写的producer或者consumer。服务端需要实现org.apache.pulsar.broker.authentication.AuthenticationProvider接口,客户端需要实现org.apache.pulsar.client.api.Authentication
下面分别给出服务端和客户端的代码。

服务端认证代码
package auth.server;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.naming.AuthenticationException;
import java.io.IOException;
import java.util.Set;

/**
 * @author cc
 * @function
 * @date 2021/7/27 14:19
 */
public class VVAuthenticationProvider implements AuthenticationProvider {
    private static final Logger log = LoggerFactory.getLogger(VVAuthenticationProvider.class);
    private static final String methodName = "vv_auth_v2";

    private String header = "vv_auth";

    @Override
    public void initialize(ServiceConfiguration config) throws IOException {
        log.info(methodName + " initialize");
        if (config == null) {
            return;
        }

        Set<String> superRoles = config.getSuperUserRoles();
        if (superRoles == null) {
            return;
        }
        for (String role : superRoles) {
            log.info(methodName + " initialize " + role);
        }
    }

    @Override
    public String getAuthMethodName() {
        log.info(methodName + " getAuthMethodName");
        return methodName;
    }

    @Override
    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
        log.info(methodName + " authenticate");

        String roleToken = "unknown";
        if (authData.hasDataFromCommand()) {
            roleToken = authData.getCommandData();
        } else if (authData.hasDataFromHttp()) {
            roleToken = authData.getHttpHeader(header);
        } else {
            throw new AuthenticationException("Authentication data source does not have a role token");
        }

        log.info(methodName + " authenticate " + roleToken);
        return roleToken;
    }

    @Override
    public void close() throws IOException {
        log.info(methodName + " close");
    }
}


客户端连接认证代码
package auth.client;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
 * @author cc
 * @function
 * @date 2021/7/27 14:00
 */
public class VVAuthentication implements Authentication {
    private static final Logger log = LoggerFactory.getLogger(VVAuthentication.class);
    private static final String methodName = "vv_auth_v2";

    @Override
    public String getAuthMethodName() {
        log.info(methodName + " getAuthMethodName");
        return methodName;
    }

    @Override
    public AuthenticationDataProvider getAuthData() throws PulsarClientException {
        log.info(methodName + " getAuthData");
        return new VVAuthenticationDataProvider();
    }

    @Override
    public void configure(Map<String, String> authParams) {
        log.info(methodName + " configure");
        if (authParams == null) {
            return;
        }

        authParams.forEach((key, value) -> {
            log.info(methodName + " configure " + key + "=" + value);
        });
    }

    @Override
    public void start() throws PulsarClientException {
        log.info(methodName + " start");
    }

    @Override
    public void close() throws IOException {
        log.info(methodName + " close");
    }
}
package auth.client;

import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @author cc
 * @function
 * @date 2021/7/27 14:08
 */
public class VVAuthenticationDataProvider implements AuthenticationDataProvider {
    private static final Logger log = LoggerFactory.getLogger(VVAuthenticationDataProvider.class);
    private static final String methodName = "vv_auth_v2";

    private String header = "vv_auth";
    private String token = "vv-role";

    @Override
    public boolean hasDataForHttp() {
        log.info(methodName + " hasDataForHttp");
        return true;
    }

    @Override
    public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
        log.info(methodName + " getHttpHeaders");
        Map<String, String> headers = new HashMap<>();
        headers.put(header, token);
        return headers.entrySet();

    }

    @Override
    public boolean hasDataFromCommand() {
        log.info(methodName + " hasDataFromCommand");
        return true;
    }

    @Override
    public String getCommandData() {
        log.info(methodName + " getCommandData");
        return token;
    }
}

编写好代码后用maven打包,然后放到pulsar的lib下,重启broker组件即可。

4. 测试

package auth;

import auth.client.VVAuthentication;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * @author cc
 * @function
 * @date 2021/7/19 10:47
 */
public class AuthTest {
    private static final Logger log = LoggerFactory.getLogger(AuthTest.class);

    public static void main(String[] args) throws Exception {
        AuthTest main = new AuthTest();
        main.run();
    }

    String pulsarUrl = "pulsar://x.x.x.x:6650";
    String topic = "persistent://tenant_vv/ns1/auth_test";

    Authentication authentication = new VVAuthentication();

    void run() throws Exception {
        PulsarClient client = PulsarClient.builder()
                .authentication(authentication)
                .serviceUrl(pulsarUrl)
                .build();
        send(client);
        consume(client);

        System.out.println("connect successed ");

        client.close();
    }

    void consume(PulsarClient client) throws Exception {
        Consumer consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("consumer-test")
                .subscribe();

        while (true) {
            Message m = consumer.receive();
            if (m != null) {
                log.info("recv " + new String(m.getData()));
                consumer.acknowledge(m);
            } else {
                break;
            }
        }
    }

    void send(PulsarClient client) throws Exception {
        Producer p = client.newProducer()
                .topic(topic)
                .create();

        for (int i=0; i<10; i++) {
            p.newMessage().key("aaa").value(("hello " + i).getBytes()).send();
            log.info("send " + i);
            Thread.sleep(1000);
        }
        p.flush();
        p.close();
        System.out.println("send done");
    }

}

最后

以上就是俊逸自行车为你收集整理的pulsar的客户端权限控制功能(一)的全部内容,希望文章能够帮你解决pulsar的客户端权限控制功能(一)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部