概述
1. 概述
最近在研究pulsar的权限验证部分,权限验证包含两部分:
- 客户端连接
- 客户端的访问控制
Pulsar 分别通过 authentication(认证)和 authorization(授权)两套机制来实现上述两个功能,每套机制都提供了可供扩展的接口。
2. 版本
pulsar 2.8.0
3. 客户端连接验证
默认情况下,pulsar是不会开启连接验证的,即客户端到broker之间、broker到broker之间的访问都没有任何限制。但是在线上环境中,对于权限的控制往往是很重要的。
3.1 在broker.conf文件中开启客户端连接认证
# Enable authentication
authenticationEnabled=true
# Authentication provider name list, which is comma separated list of class names
# Any number of provider classes may be listed; once the broker accepts a client connection
# it calls the methods of these classes to handle the authentication.
authenticationProviders=auth.server.VVAuthenticationProvider
# Interval of time for checking for expired authentication credentials
authenticationRefreshCheckSeconds=60
# Authentication settings of the broker itself. Used when the broker connects to other brokers,
# either in same or other clusters
brokerClientTlsEnabled=false
# Authentication plugin used by the broker's own client for broker-to-broker connections.
# It can usually be the same class that ordinary clients use.
# In a production project the broker side needs to tell the two kinds of connection apart
# and authenticate each of them appropriately.
brokerClientAuthenticationPlugin=auth.client.VVAuthentication
brokerClientAuthenticationParameters=
brokerClientTrustCertsFilePath=
3.2 实现连接验证处理类
权限验证分为服务端和客户端两部分:服务端即 broker,客户端即我们自己编写的 producer 或者 consumer。服务端需要实现 org.apache.pulsar.broker.authentication.AuthenticationProvider 接口,客户端需要实现 org.apache.pulsar.client.api.Authentication 接口。
下面分别给出服务端和客户端的代码。
服务端认证代码
package auth.server;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.AuthenticationException;
import java.io.IOException;
import java.util.Set;
/**
 * Broker-side authentication provider for the custom {@code vv_auth_v2} method.
 *
 * <p>The credential presented by the client is used directly as the role name. It is read from
 * the command data when the data source has it, otherwise from the {@code vv_auth} HTTP header.
 *
 * @author cc
 * @date 2021/7/27 14:19
 */
public class VVAuthenticationProvider implements AuthenticationProvider {
    private static final Logger log = LoggerFactory.getLogger(VVAuthenticationProvider.class);

    /** Name reported by {@link #getAuthMethodName()}; used as the prefix of every log line. */
    private static final String AUTH_METHOD_NAME = "vv_auth_v2";

    /** HTTP header that carries the role token. */
    private static final String HTTP_HEADER_NAME = "vv_auth";

    /**
     * Logs the super-user roles found in the broker configuration, if any.
     *
     * @param config broker configuration; may be {@code null}, in which case nothing more is logged
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void initialize(ServiceConfiguration config) throws IOException {
        log.info(AUTH_METHOD_NAME + " initialize");
        if (config == null) {
            return;
        }
        Set<String> superRoles = config.getSuperUserRoles();
        if (superRoles == null) {
            return;
        }
        for (String role : superRoles) {
            log.info(AUTH_METHOD_NAME + " initialize " + role);
        }
    }

    /**
     * @return the authentication method name, {@code vv_auth_v2}
     */
    @Override
    public String getAuthMethodName() {
        log.info(AUTH_METHOD_NAME + " getAuthMethodName");
        return AUTH_METHOD_NAME;
    }

    /**
     * Extracts the role token from the supplied authentication data and returns it as the role.
     *
     * @param authData authentication data of the incoming connection or HTTP request
     * @return the non-blank role token
     * @throws AuthenticationException if the data source carries neither command data nor HTTP
     *     data, or if the token it carries is missing or blank
     */
    @Override
    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
        log.info(AUTH_METHOD_NAME + " authenticate");
        final String roleToken;
        if (authData.hasDataFromCommand()) {
            roleToken = authData.getCommandData();
        } else if (authData.hasDataFromHttp()) {
            roleToken = authData.getHttpHeader(HTTP_HEADER_NAME);
        } else {
            throw new AuthenticationException("Authentication data source does not have a role token");
        }
        // A request without the header (or with empty command data) must be rejected here;
        // otherwise a null/blank role would be handed back as if authentication had succeeded.
        if (roleToken == null || roleToken.trim().isEmpty()) {
            throw new AuthenticationException("Authentication data source does not have a role token");
        }
        // NOTE(review): in this demo scheme the token doubles as the role name, so it is logged.
        // Do not log the value if it is ever replaced by a real secret.
        log.info(AUTH_METHOD_NAME + " authenticate " + roleToken);
        return roleToken;
    }

    /**
     * Logs the shutdown; this provider holds no resources to release.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        log.info(AUTH_METHOD_NAME + " close");
    }
}
客户端连接认证代码
package auth.client;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
 * Client-side {@link Authentication} implementation for the {@code vv_auth_v2} method.
 *
 * <p>Every lifecycle callback logs its invocation; the actual credential is supplied by the
 * {@link VVAuthenticationDataProvider} returned from {@link #getAuthData()}.
 *
 * @author cc
 * @date 2021/7/27 14:00
 */
public class VVAuthentication implements Authentication {
    private static final Logger log = LoggerFactory.getLogger(VVAuthentication.class);
    private static final String methodName = "vv_auth_v2";

    /**
     * Logs each configured authentication parameter as {@code key=value}.
     *
     * @param authParams parameters handed to the plugin; {@code null} is tolerated
     */
    @Override
    public void configure(Map<String, String> authParams) {
        log.info(methodName + " configure");
        if (authParams != null) {
            for (Map.Entry<String, String> param : authParams.entrySet()) {
                log.info(methodName + " configure " + param.getKey() + "=" + param.getValue());
            }
        }
    }

    /** Logs the start callback; no other work is performed. */
    @Override
    public void start() throws PulsarClientException {
        log.info(methodName + " start");
    }

    /**
     * @return the authentication method name, {@code vv_auth_v2}
     */
    @Override
    public String getAuthMethodName() {
        log.info(methodName + " getAuthMethodName");
        return methodName;
    }

    /**
     * @return a freshly created {@link VVAuthenticationDataProvider} on every call
     */
    @Override
    public AuthenticationDataProvider getAuthData() throws PulsarClientException {
        log.info(methodName + " getAuthData");
        AuthenticationDataProvider provider = new VVAuthenticationDataProvider();
        return provider;
    }

    /** Logs the close callback; no other work is performed. */
    @Override
    public void close() throws IOException {
        log.info(methodName + " close");
    }
}
package auth.client;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
 * Supplies the fixed {@code vv-role} token for the {@code vv_auth_v2} method, both as command
 * data and as the {@code vv_auth} HTTP header.
 *
 * @author cc
 * @date 2021/7/27 14:08
 */
public class VVAuthenticationDataProvider implements AuthenticationDataProvider {
    private static final Logger log = LoggerFactory.getLogger(VVAuthenticationDataProvider.class);
    private static final String methodName = "vv_auth_v2";

    // Header name under which the token is exposed to HTTP requests.
    private String header = "vv_auth";
    // Token value returned for both the command and the HTTP path.
    private String token = "vv-role";

    /**
     * @return always {@code true}: command data is available
     */
    @Override
    public boolean hasDataFromCommand() {
        log.info(methodName + " hasDataFromCommand");
        return true;
    }

    /**
     * @return the token to send as command data
     */
    @Override
    public String getCommandData() {
        log.info(methodName + " getCommandData");
        return token;
    }

    /**
     * @return always {@code true}: HTTP header data is available
     */
    @Override
    public boolean hasDataForHttp() {
        log.info(methodName + " hasDataForHttp");
        return true;
    }

    /**
     * Builds a new single-entry header map on each call.
     *
     * @return the entry set of a map holding the {@code vv_auth} header and its token
     */
    @Override
    public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
        log.info(methodName + " getHttpHeaders");
        Map<String, String> httpHeaders = new HashMap<>();
        httpHeaders.put(header, token);
        Set<Map.Entry<String, String>> entries = httpHeaders.entrySet();
        return entries;
    }
}
编写好代码后用 Maven 打成 jar 包,放到 Pulsar 的 lib 目录下,然后重启 broker 即可。
4. 测试
package auth;
import auth.client.VVAuthentication;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
/**
 * Manual smoke test for the custom {@code vv_auth_v2} authentication plugin: connects to the
 * broker with {@link VVAuthentication}, publishes ten messages and then reads them back.
 *
 * @author cc
 * @date 2021/7/19 10:47
 */
public class AuthTest {
    private static final Logger log = LoggerFactory.getLogger(AuthTest.class);

    /** How long {@link #consume} waits for the next message before treating the topic as drained. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 5;

    public static void main(String[] args) throws Exception {
        AuthTest main = new AuthTest();
        main.run();
    }

    String pulsarUrl = "pulsar://x.x.x.x:6650";
    String topic = "persistent://tenant_vv/ns1/auth_test";
    Authentication authentication = new VVAuthentication();

    /**
     * Builds an authenticated client, runs the send and consume phases, and closes the client.
     *
     * @throws Exception if connecting, sending or consuming fails
     */
    void run() throws Exception {
        // try-with-resources closes the client even when send/consume throws.
        try (PulsarClient client = PulsarClient.builder()
                .authentication(authentication)
                .serviceUrl(pulsarUrl)
                .build()) {
            send(client);
            consume(client);
            System.out.println("connect successed ");
        }
    }

    /**
     * Receives and acknowledges messages until none arrives within the receive timeout.
     *
     * @param client connected Pulsar client
     * @throws Exception if subscribing, receiving or acknowledging fails
     */
    void consume(PulsarClient client) throws Exception {
        // The subscription is created after send() has already published, so it must start at
        // the earliest position; the default (Latest) would skip those messages on a first run.
        try (Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("consumer-test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {
            while (true) {
                // The untimed receive() blocks forever and never returns null; the timed
                // variant returns null on timeout, which is what lets this loop terminate.
                Message<byte[]> m = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (m == null) {
                    break;
                }
                log.info("recv " + new String(m.getData(), StandardCharsets.UTF_8));
                consumer.acknowledge(m);
            }
        }
    }

    /**
     * Publishes ten keyed messages, one per second, then flushes and closes the producer.
     *
     * @param client connected Pulsar client
     * @throws Exception if creating the producer or sending fails
     */
    void send(PulsarClient client) throws Exception {
        try (Producer<byte[]> p = client.newProducer()
                .topic(topic)
                .create()) {
            for (int i = 0; i < 10; i++) {
                p.newMessage()
                        .key("aaa")
                        .value(("hello " + i).getBytes(StandardCharsets.UTF_8))
                        .send();
                log.info("send " + i);
                Thread.sleep(1000);
            }
            p.flush();
        }
        System.out.println("send done");
    }
}
最后
以上就是俊逸自行车为你收集整理的pulsar的客户端权限控制功能(一)的全部内容,希望文章能够帮你解决pulsar的客户端权限控制功能(一)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复