Overview
Connecting to Kerberos-Secured CDH from Code
This chapter walks through Java code that connects to a Kerberos-secured CDH (5.14.2) cluster: HDFS, HBase, Hive, and Kafka. When writing the code, check that the principal and keytab match and that krb5.conf is correct. My Kerberos setup is as follows:
- keytab : hadoop_node1.keytab
- principal : hadoop/cdh-node1@HADOOP.COM
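Before writing any code, it is worth confirming on the client machine that the keytab really contains this principal and that krb5.conf points at the right KDC. Assuming the standard MIT Kerberos client tools are installed, a quick check looks like this:
kinit -kt hadoop_node1.keytab hadoop/cdh-node1@HADOOP.COM
klist
If kinit succeeds and klist shows a ticket for hadoop/cdh-node1@HADOOP.COM, the keytab and principal match.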
I will use this principal to connect to HDFS, HBase, Hive, Kafka, and so on below. First, create a new project and add the dependencies:
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
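The plain Apache versions above approximate what CDH 5.14.2 ships. If you pin the CDH-specific artifacts instead (e.g. 2.6.0-cdh5.14.2), you would also need to declare Cloudera's Maven repository; a sketch, assuming the standard Cloudera repository URL:
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>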
Cluster Connections
- HDFS and HBase connections
- Hive connection
- Kafka connection
HDFS and HBase Connections
Authenticating through Kerberos is straightforward; per the examples, the essential code is the following:
// KrbConf is the path to krb5.conf
System.setProperty("java.security.krb5.conf", KrbConf);
// conf holds the HDFS/HBase configuration
UserGroupInformation.setConfiguration(conf);
// the principal, and the path to its keytab
UserGroupInformation.loginUserFromKeytab(KrbUser, KrbKey);
I have seen other examples add configuration entries with conf.set(key, value), but I load core-site.xml, hdfs-site.xml, hbase-site.xml, and so on directly, which avoids authentication failures caused by missing parameters.
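For comparison, the conf.set style used by those other examples would look roughly like the sketch below; the two keys shown are the usual security-related ones, and the principal value is illustrative rather than taken from my cluster:
// Illustrative only: these entries normally come from core-site.xml / hdfs-site.xml
conf.set("hadoop.security.authentication", "kerberos");
conf.set("dfs.namenode.kerberos.principal", "hdfs/_HOST@HADOOP.COM");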
- The Kerberos authentication utility:
package com.gonghf.kerberos.logUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
public class KerberosUtil {
/**
* @param conf the Hadoop/HBase configuration to load resources into
* @param KrbUser the principal to authenticate as
* @param KrbKey the path to the keytab
* @throws IOException
*/
public static void loginKerberos(Configuration conf, String KrbUser, String KrbKey) throws IOException {
String userdir = System.getProperty("user.dir") + File.separator + "config" + File.separator;
String KrbConf = userdir + "kerberos" + File.separator + "krb5.conf";
// Set the path to krb5.conf
System.setProperty("java.security.krb5.conf", KrbConf);
try {
// Load the core-site.xml and hdfs-site.xml configuration files
conf.addResource(new FileInputStream(userdir + "hadoop/core-site.xml"));
conf.addResource(new FileInputStream(userdir + "hadoop/hdfs-site.xml"));
if (conf instanceof HBaseConfiguration) {
// For an HBase connection, also load hbase-site.xml
conf.addResource(new FileInputStream(userdir + "hadoop/hbase-site.xml"));
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
// Kerberos login
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(KrbUser, KrbKey);
}
}
- HDFS authentication test
package com.gonghf.kerberos.hdfs;
import com.gonghf.kerberos.logUtil.KerberosUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
/**
* @description: HDFS Kerberos test program
*/
public class HDFSExample {
private static String KrbKey = "";
private static String KrbUser = "";
// Directory to access
private static String Home = "/hadoopHome";
private static void init() {
String userdir = System.getProperty("user.dir") + File.separator +
"config" + File.separator + "kerberos" + File.separator;
KrbKey = userdir + "hadoop_node1.keytab";
KrbUser = "hadoop/cdh-node1@HADOOP.COM";
}
private static ArrayList<String> getPathFiles(FileSystem fs, String path) {
ArrayList<String> files = null;
try {
FileStatus[] listStatus = fs.listStatus(new Path(path));
files = new ArrayList<String>(20);
for (FileStatus status : listStatus) {
files.add(status.getPath().getName());
}
System.out.println("Files under ( " + path + " ) == " + files);
} catch (IOException e) {
System.out.println("Failed to list files under ( " + path + " )!");
e.printStackTrace();
}
// Return after the try/catch rather than from a finally block, which would swallow exceptions
return files;
}
public static void main(String[] args) {
init();
Configuration conf = new Configuration();
FileSystem fs = null;
try {
KerberosUtil.loginKerberos(conf, KrbUser, KrbKey);
System.out.println(KrbUser + " authenticated successfully!");
fs = FileSystem.get(conf);
System.out.println("Obtained the HDFS FileSystem!");
} catch (IOException e) {
System.out.println(KrbUser + " authentication failed");
e.printStackTrace();
// Without a FileSystem the calls below would throw NPE, so stop here
return;
}
getPathFiles(fs, Home);
String testdir = Home + "/testdir_";
String testfile = Home + "/testdir_/data.dat";
// Create a directory
try {
fs.mkdirs(new Path(testdir));
System.out.println("在根目录下添加 " + testdir + " 文件夹!");
getPathFiles(fs, Home);
} catch (IOException e) {
e.printStackTrace();
}
// Create a file and write data to it
try {
boolean bool = fs.createNewFile(new Path(testfile));
if (bool) {
System.out.println("在 " + testdir + " 目录下创建文件 data.dat 成功!");
FSDataOutputStream outputStream = fs.create(new Path(testfile));
outputStream.write("这是一个测试".getBytes());
System.out.println("向 " + testfile + " 文件写入数据成功!");
outputStream.close();
getPathFiles(fs, testdir);
} else {
System.out.println("在 " + testdir + " 目录下创建文件 data.dat 失败!");
}
} catch (IOException e) {
e.printStackTrace();
}
// Delete the directory
try {
fs.delete(new Path(testdir), true);
System.out.println("删除 " + testdir + " 成功!");
getPathFiles(fs, Home);
} catch (IOException e) {
e.printStackTrace();
}
// Close the HDFS connection
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Run output (screenshot omitted).
- HBase authentication test
package com.gonghf.kerberos.hbase;
import com.gonghf.kerberos.logUtil.KerberosUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
/**
* @description: HBase Kerberos test
*/
public class HBaseExample {
private static String KrbKey = "";
private static String KrbUser = "";
public static Configuration HBaseConf = null;
private static final String ZK_CONNECT_STR = "cdh-node1:2181";
public static void init() {
String userdir = System.getProperty("user.dir") + File.separator + "config" + File.separator + "kerberos" + File.separator;
KrbKey = userdir + "hadoop_node1.keytab";
KrbUser = "hadoop/cdh-node1@HADOOP.COM";
}
// Create a table
public static void createTable(HBaseAdmin admin, String tableName, String[] family) throws Exception {
HTableDescriptor desc = new HTableDescriptor(tableName);
for (int i = 0; i < family.length; i++) {
desc.addFamily(new HColumnDescriptor(family[i]));
}
if (admin.tableExists(tableName)) {
System.out.println("table Exists!");
System.exit(0);
} else {
admin.createTable(desc);
}
}
// Delete a table
public static void deleteTable(HBaseAdmin admin, String tableName) throws IOException {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
public static void main(String[] args) {
init();
HBaseConf = new HBaseConfiguration();
HBaseConf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
HBaseAdmin admin = null;
try {
KerberosUtil.loginKerberos(HBaseConf, KrbUser, KrbKey);
admin = new HBaseAdmin(HBaseConf);
System.out.println(KrbUser + " authenticated successfully!");
} catch (IOException e) {
System.out.println(KrbUser + " authentication failed");
e.printStackTrace();
// Without an HBaseAdmin there is nothing left to do
return;
}
String tableName = "testtable";
String[] tableFamily = {"data1", "data2"};
try {
createTable(admin, tableName, tableFamily);
System.out.println(tableName + " created successfully!");
} catch (Exception e) {
System.out.println(tableName + " creation failed!");
e.printStackTrace();
}
try {
// List all table names
String[] tableNames = admin.getTableNames();
System.out.println("Tables=" + Arrays.toString(tableNames));
} catch (IOException e) {
e.printStackTrace();
}
try {
deleteTable(admin, tableName);
System.out.println(tableName + " deleted successfully!");
} catch (IOException e) {
System.out.println(tableName + " deletion failed!");
e.printStackTrace();
}
}
}
Run output (screenshot omitted).
Hive Connection
My Hive test connects to HiveServer2 via JDBC, because when Kerberos is enabled CDH automatically adds some configuration to hive-site.xml, which can be inspected on the HiveServer2 host (screenshot omitted):
vim /opt/cloudera-manager/cm-5.14.2/run/cloudera-scm-agent/process/218-hive-HIVESERVER2/hive-site.xml
The hive.server2.authentication.kerberos.principal property pins the principal used when connecting to HiveServer2 to the hive service principal;
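For reference, the entry typically looks like the following sketch; the exact value depends on the cluster, and _HOST is expanded to the HiveServer2 hostname:
<property>
<name>hive.server2.authentication.kerberos.principal</name>
<value>hive/_HOST@HADOOP.COM</value>
</property>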
so if I try to connect to HiveServer2 as the hadoop user with hadoop as the URL principal, an error occurs (error screenshot omitted).
The actual procedure for connecting to HiveServer2 as the hadoop user is:
- Authenticate to Kerberos as hadoop/cdh-node1@HADOOP.COM.
- Set principal=hive/cdh-node1@HADOOP.COM (the service principal) in the HiveServer2 JDBC URL.
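The same two-step split can be sanity-checked from a shell before writing any Java: authenticate as hadoop with kinit, then connect with beeline while keeping the hive service principal in the URL (paths here are illustrative):
kinit -kt config/kerberos/hadoop_node1.keytab hadoop/cdh-node1@HADOOP.COM
beeline -u "jdbc:hive2://192.168.30.173:10000/default;principal=hive/cdh-node1@HADOOP.COM"
The Java version of the same flow: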
package com.gonghf.kerberos.hive;
import com.gonghf.kerberos.logUtil.KerberosUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import java.io.File;
import java.io.IOException;
import java.sql.*;
/**
* @description: HiveServer2 test class
*/
public class HiveServer2Example {
private static String HiveUrl = "jdbc:hive2://192.168.30.173:10000/default;principal=hive/cdh-node1@HADOOP.COM";
private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
private static String KrbKey = "";
private static String KrbUser = "";
public static void init() {
String userdir = System.getProperty("user.dir") + File.separator + "config" + File.separator + "kerberos" + File.separator;
KrbKey = userdir + "hadoop_node1.keytab";
KrbUser = "hadoop/cdh-node1@HADOOP.COM";
}
public static void main(String[] args) {
// Define the HQL; each element is a single statement and must not contain ";"
String[] sqls = {
"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)",
"SELECT COUNT(*) FROM employees_info",
"DROP TABLE employees_info"
};
init();
Configuration conf = new HiveConf();
try {
KerberosUtil.loginKerberos(conf, KrbUser, KrbKey);
System.out.println(KrbUser + " authenticated successfully!");
} catch (IOException e) {
System.out.println(KrbUser + " authentication failed");
e.printStackTrace();
// A failed login means the JDBC connection below cannot succeed
return;
}
Connection connection = null;
try {
Class.forName(HIVE_DRIVER);
connection = DriverManager.getConnection(HiveUrl,"","");
execDDL(connection, sqls[0]);
System.out.println("Create table success!");
// Query
execDML(connection, sqls[1]);
// Drop the table
execDDL(connection, sqls[2]);
System.out.println("Delete table success!");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if(connection != null){
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public static void execDDL(Connection connection, String sql) throws SQLException {
PreparedStatement statement = null;
try {
statement = connection.prepareStatement(sql);
statement.execute();
} finally {
if (null != statement) {
statement.close();
}
}
}
public static void execDML(Connection connection, String sql) throws SQLException {
PreparedStatement statement = null;
ResultSet resultSet = null;
ResultSetMetaData resultMetaData = null;
try {
// Execute the HQL
statement = connection.prepareStatement(sql);
resultSet = statement.executeQuery();
// Print the column labels to the console
resultMetaData = resultSet.getMetaData();
int columnCount = resultMetaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
System.out.print(resultMetaData.getColumnLabel(i) + '\t');
}
System.out.println();
// Print the query results to the console
while (resultSet.next()) {
for (int i = 1; i <= columnCount; i++) {
System.out.print(resultSet.getString(i) + '\t');
}
System.out.println();
}
} finally {
if (null != resultSet) {
resultSet.close();
}
if (null != statement) {
statement.close();
}
}
}
}
Kafka Connection
For Kafka, it is the consumer and producer connections that need Kerberos authentication. Like HDFS and HBase, the authentication ultimately goes through JAAS; only the code looks different. At the application level we just configure JAAS and call the Kafka API.
First, the steps for authenticating to Kafka through Kerberos (using the producer as an example):
- Write a jaas.conf file. Note that keyTab must be an absolute path and must not contain "\"; that is, do not write the path as E:\IDEA_HOME\kerberostest\…
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="E:/IDEA_HOME/kerberostest/config/kerberos/hadoop_node1.keytab"
principal="hadoop/cdh-node1@HADOOP.COM";
};
- Point the JVM at the file's absolute path via a system property:
System.setProperty("java.security.auth.login.config", jaasPath);
- Connect through KafkaProducer:
Properties prop = new Properties();
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put("sasl.kerberos.service.name", "kafka");
Producer<String, String> producer = new KafkaProducer<String, String>(prop);
With the three steps above you can connect to Kafka from code and do whatever you need. Below are my examples: KafkaProperties, ProducerTest, and ConsumerTest.
- KafkaProperties: Kafka/Kerberos configuration class
package com.gonghf.kerberos.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;
/**
* @description: Kafka configuration helper
*/
public class KafkaProperties extends Properties {
private Properties properties;
private static final String JAAS_TEMPLATE =
"KafkaClient {\n" +
"com.sun.security.auth.module.Krb5LoginModule required\n" +
"useKeyTab=true\n" +
"keyTab=\"%1$s\"\n" +
"principal=\"%2$s\";\n" +
"};";
public KafkaProperties() {
properties = new Properties();
}
public KafkaProperties self() {
return this;
}
public Properties getProperties() {
return properties;
}
public KafkaProperties put(String key, String value) {
if (properties == null) {
properties = new Properties();
}
properties.put(key, value);
return self();
}
public static KafkaProperties initKerberos() {
return new KafkaProperties()
.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer")
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
.put("security.protocol", "SASL_PLAINTEXT")
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
.put("sasl.kerberos.service.name", "kafka");
}
// Generate a temporary jaas.conf file and point the JVM at it
public static void configureJAAS(String keyTab, String principal) {
// Normalize Windows backslashes to forward slashes for the JAAS file
String content = String.format(JAAS_TEMPLATE, keyTab, principal).replace('\\', '/');
System.out.println(content);
File jaasConf = null;
PrintWriter writer = null;
try {
jaasConf = File.createTempFile("jaas", ".conf");
jaasConf.deleteOnExit();
writer = new PrintWriter(jaasConf);
writer.println(content);
} catch (IOException e) {
e.printStackTrace();
// If the temp file could not be created there is nothing to point the JVM at
return;
} finally {
if (writer != null) {
writer.close();
}
}
System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
}
}
- ProducerTest: producer test code
package com.gonghf.kerberos.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.File;
import java.util.UUID;
/**
* @description: Kafka producer test program
*/
public class ProducerTest {
// Topic to produce to
public static String TOPIC_NAME = "TopicMessage";
private static String KrbKey = "";
private static String KrbUser = "";
private static String KrbConf = "";
public static void init() {
String userdir = System.getProperty("user.dir") + File.separator + "config" + File.separator + "kerberos" + File.separator;
KrbKey = userdir + "hadoop_node1.keytab";
KrbUser = "hadoop/cdh-node1@HADOOP.COM";
KrbConf = userdir + "krb5.conf";
System.setProperty("java.security.krb5.conf", KrbConf);
}
public static void main(String[] args) {
init();
KafkaProperties.configureJAAS(KrbKey, KrbUser);
System.out.println("Kerberos configuration loaded!");
// Initialize the Kerberos-related client properties
KafkaProperties props = KafkaProperties.initKerberos();
// Kafka broker addresses
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "cdh-node1:9092,cdh-node2:9092");
Producer<String, String> producer = new KafkaProducer<String, String>(props.getProperties());
System.out.println("Kerberos authentication succeeded! (producer)");
while (true) {
System.out.println("Topic---------------:" + TOPIC_NAME);
for (int i = 0; i < 10; i++) {
String key = UUID.randomUUID().toString() + i;
String message = UUID.randomUUID().toString() + i;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_NAME, key, message);
producer.send(record);
System.out.println(
"Sent record: " +
"Key=" + key + "\t" +
"Value=" + message
);
}
System.out.println();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- ConsumerTest: consumer test code
package com.gonghf.kerberos.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.io.File;
import java.util.Arrays;
/**
* @description: Kafka consumer test program
*/
public class ConsumerTest {
// Topic to consume from
public static String TOPIC_NAME = "TopicMessage";
private static String KrbKey = "";
private static String KrbUser = "";
private static String KrbConf = "";
public static void init() {
String userdir = System.getProperty("user.dir") + File.separator + "config" + File.separator + "kerberos" + File.separator;
KrbKey = userdir + "hadoop_node1.keytab";
KrbUser = "hadoop/cdh-node1@HADOOP.COM";
KrbConf = userdir + "krb5.conf";
System.setProperty("java.security.krb5.conf", KrbConf);
}
public static void main(String[] args) {
init();
KafkaProperties.configureJAAS(KrbKey, KrbUser);
System.out.println("Kerberos configuration loaded!");
KafkaProperties props = KafkaProperties.initKerberos();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cdh-node1:9092,cdh-node2:9092");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props.getProperties());
System.out.println("Kerberos authentication succeeded! (consumer)");
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// Read records from TOPIC_NAME
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
"Topic=" + record.topic() + "\t" +
"Partition=" + record.partition() + "\t" +
"Key=" + record.key() + "\t" +
"Value=" + record.value() + "\t" +
"Offset=" + record.offset()
);
}
}
}
}
Run ConsumerTest first, then run ProducerTest (output screenshots omitted).