HBase put一条数据 Region 路由规则
1.客户端put接口
org.apache.hadoop.hbase.client.HTableInterface.put(Put put)
org.apache.hadoop.hbase.client.HTable.put

public void put(final Put put) throws IOException {
  // 缓存数据
  doPut(put);
  if (autoFlush) {
    // 提交数据刷写到磁盘请求
    flushCommits();
  }
}
2.提交写请求
org.apache.hadoop.hbase.client.HTable.flushCommits

public void flushCommits() throws IOException {
  try {
    Object[] results = new Object[writeBuffer.size()];
    try {
      // 提交数据刷写请求
      this.connection.processBatch(writeBuffer, tableName, pool, results);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      ...
    }
  } finally {
    if (clearBufferOnFail) {
      writeBuffer.clear(); // 清理客户端缓存数据
      currentWriteBufferSize = 0;
    } else {
      // 计算客户端缓存数据大小
      currentWriteBufferSize = 0;
      for (Put aPut : writeBuffer) {
        currentWriteBufferSize += aPut.heapSize();
      }
    }
  }
}
3.处理批量写
org.apache.hadoop.hbase.client.HConnection.processBatch()
org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.processBatch()

public void processBatch(List<? extends Row> list,
    final byte[] tableName,
    ExecutorService pool,
    Object[] results) throws IOException, InterruptedException {
  // This belongs in HTable!!! Not in here. St.Ack
  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException("argument results must be the same size as argument list");
  }
  // 处理批量写
  processBatchCallback(list, tableName, pool, results, null);
}

// 这个方法非常重要,在这里定义了哪些rowkey对应的数据应该存放到哪个Region上,然后将相应的数据提交到Region对应的RegionServer上
public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  ...
  // step 1: 分解RegionServer块并构建对应的数据结构
  // HRegionLocation这个对象至关重要,他定义了Region相关的信息HRegionInfo,RegionServer Name、port
  Map<HRegionLocation, MultiAction<R>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<R>>();
  for (int i = 0; i < workingList.size(); i++) {
    Row row = workingList.get(i);
    if (row != null) {
      // 下面这句是整个put数据路由的核心,将提交的数据根据row分类到不同的Region上
      HRegionLocation loc = locateRegion(tableName, row.getRow());
      byte[] regionName = loc.getRegionInfo().getRegionName();
      MultiAction<R> actions = actionsByServer.get(loc);
      if (actions == null) {
        actions = new MultiAction<R>();
        actionsByServer.put(loc, actions);
      }
      Action<R> action = new Action<R>(row, i);
      lastServers[i] = loc;
      actions.add(regionName, action);
    }
  }

  // step 2: 提交请求到相应的RegionServer上去处理
  Map<HRegionLocation, Future<MultiResponse>> futures =
      new HashMap<HRegionLocation, Future<MultiResponse>>(actionsByServer.size());
  for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
    futures.put(e.getKey(),
        pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
  }

  // step 3: 收集写入成功或失败的返回结果
  ...
  // step 4: 对写入失败的数据进行重试
  ...
}
public HRegionLocation relocateRegion(final byte[] tableName, final byte[] row)
    throws IOException {
  ...
  return locateRegion(tableName, row, false, true);
}
4.定位row所在的Region(locateRegion路由查找)
org.apache.hadoop.hbase.client.HConnectionImplementation.locateRegion()

private HRegionLocation locateRegion(final byte[] tableName,
    final byte[] row, boolean useCache, boolean retry) throws IOException {
  ...
  // 确保ZK是正常的
  ensureZookeeperTrackers();
  if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
    ...
  } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
    ...
  } else { // 用户表的数据插入是调用下面的这个操作
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
        useCache, userRegionLock, retry);
  }
}

private HRegionLocation locateRegionInMeta(final byte[] parentTable,
    final byte[] tableName, final byte[] row, boolean useCache,
    Object regionLockObject, boolean retry) throws IOException {
  HRegionLocation location;
  // 如果客户端保存了缓存,从缓存中直接查询
  if (useCache) {
    location = getCachedLocation(tableName, row);
    if (location != null) {
      return location;
    }
  }
  // 以下是如果客户端没有缓存,从ZOOKEEPER -> -ROOT- -> .META.将这些数据缓存到客户端,然后再从.META.表中的数据去判定row应该路由到哪个Region上
  ...
}

HRegionLocation getCachedLocation(final byte[] tableName, final byte[] row) {
  SoftValueSortedMap<byte[], HRegionLocation> tableLocations =
      getTableLocations(tableName);
  ...
  // 判断row对应的rowkey应该在哪个Region上,应该将请求发给哪个RegionServer
  byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
  if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
      || KeyValue.getRowComparator(tableName).compareRows(
          endKey, 0, endKey.length, row, 0, row.length) > 0) {
    return possibleRegion;
  }
  return null;
}
5.下面的事情就是RegionServer去将数据写到MemStore、StoreFile(HFile)了
最后
以上就是包容冰棍最近收集整理的关于HBase 写入数据Region路由机制的全部内容,更多相关HBase内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复