我是靠谱客的博主 忐忑大碗,这篇文章主要介绍kafka数据导入hbase,现在分享给大家,希望可以做个参考。

我们在使用kafka处理数据的过程中会使用kafka跟一下数据库进行交互,Hbase就是其中的一种。下面给大家介绍一下kafka中的数据是如何导入Hbase的。

 

本文的思路是通过consumers把数据消费到Hbase中。

 

首先在Hbase中创建表,创建表可以在Hbase客户端创建也可以通过API创建,这里介绍通过API创建表的方法:

 

创建CreatTableTest类

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; public class CreatTableTest { public static void main(String[] args) throws IOException { //设置HBase据库的连接配置参数 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.5.128"); // Zookeeper的地址 conf.set("hbase.zookeeper.property.clientPort", "42182"); String tableName = "emp"; String[] family = { "basicinfo","deptinfo"}; HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); //创建表对象 HTableDescriptor hbaseTableDesc = new HTableDescriptor(tableName); for(int i = 0; i < family.length; i++) { //设置表字段 hbaseTableDesc.addFamily(new HColumnDescriptor(family[i])); } //判断表是否存在,不存在则创建,存在则打印提示信息 if(hbaseAdmin.tableExists(tableName)) { System.out.println("TableExists!"); System.exit(0); } else{ hbaseAdmin.createTable(hbaseTableDesc); System.out.println("Create table Success!"); } } }

 

 

创建表之后我们创建一个consumer来消费数据到Hbase中

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import com.teamsun.kafka.m001.KafkaProperties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer3 extends Thread { private final ConsumerConnector consumer; private final String topic; public KafkaConsumer3(String topic) { consumer = kafka.consumer.Consumer .createJavaConsumerConnector(createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id", KafkaProperties.groupId1); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); HBaseUtils hbase = new HBaseUtils(); while (it.hasNext()) { System.out.println("3receive:" + new String(it.next().message())); try { hbase.put(new String(it.next().message())); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } // try { // sleep(300); // 每条消息延迟300ms // } catch (InterruptedException e) { // e.printStackTrace(); // } } } }

 

再创建一个HBaseUtils来指定要连接的Hbase数据库

 

 

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.io.IOException; import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; public class HBaseUtils { public void put(String string) throws IOException { //设置HBase据库的连接配置参数 Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.5.128"); // Zookeeper的地址 conf.set("hbase.zookeeper.property.clientPort", "42182"); Random random = new Random(); long a = random.nextInt(1000000000); String tableName = "emp"; String rowkey = "rowkey"+a ; String columnFamily = "basicinfo"; String column = "empname"; //String value = string; HTable table=new HTable(conf, tableName); Put put=new Put(Bytes.toBytes(rowkey)); put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string)); table.put(put);//放入表 table.close();//释放资源 } }

 

最后再加上consumer的配置文件就大功告成了。

 

 

 

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
public interface KafkaProperties { final static String zkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182"; final static String groupId1= "group1"; final static String topic = "test3"; final static String kafkaServerURL = "hadoop0,hadoop1"; final static int kafkaServerPort = 9092; final static int kafkaProducerBufferSize = 64 * 1024; final static int connectionTimeOut = 20000; final static int reconnectInterval = 10000; final static String clientId = "SimpleConsumerDemoClient"; }

 

 

 

然后执行consumer就可以了,注意要保证topic中有消息才可以消费。

 

 

 

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class KafkaConsumerProducerTest { public static void main(String[] args) { // KafkaProducer1 producerThread1 = new KafkaProducer1(KafkaProperties.topic); // producerThread1.start(); // KafkaProducer2 producerThread2 = new KafkaProducer2(KafkaProperties.topic); // producerThread2.start(); // KafkaProducer3 producerThread3 = new KafkaProducer3(KafkaProperties.topic); // producerThread3.start(); // KafkaConsumer1 consumerThread1 = new KafkaConsumer1(KafkaProperties.topic); // consumerThread1.start(); // KafkaConsumer2 consumerThread2 = new KafkaConsumer2(KafkaProperties.topic); // consumerThread2.start(); KafkaConsumer3 consumerThread3 = new KafkaConsumer3(KafkaProperties.topic); consumerThread3.start(); // KafkaConsumer4 consumerThread4 = new KafkaConsumer4(KafkaProperties.topic); // consumerThread4.start(); } }

 

 

在HBase客户端执行

            hbase(main):063:0> scan  'emp'  

就可以查看到数据了。

以上就是kafka数据进入Hbase的一个例子,当然上诉只是保证数据走通了,大家在具体项目中什么需求,还需要自行修改和完善。

 

 

---
更多文章关注公众号

更多:kafka深入理解专栏

——————————————————————————————————

作者:桃花惜春风

转载请标明出处,原文地址:  

https://blog.csdn.net/xiaoyu_bd/article/details/52305355

如果感觉本文对您有帮助,请留下您的赞,您的支持是我坚持写作最大的动力,谢谢!

 

 

 

 

 

最后

以上就是忐忑大碗最近收集整理的关于kafka数据导入hbase的全部内容,更多相关kafka数据导入hbase内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部