Overview
How a single put is routed to the right Region in HBase
1. The client put interface
org.apache.hadoop.hbase.client.HTableInterface.put(Put put)
org.apache.hadoop.hbase.client.HTable.put
public void put(final Put put) throws IOException {
  // buffer the data client-side
  doPut(put);
  if (autoFlush) {
    // submit the buffered puts to the RegionServers
    flushCommits();
  }
}
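For context, a minimal usage sketch of this 0.9x-era client API (the table, column family, and qualifier names are hypothetical). With autoFlush left at its default of true, every put() above triggers flushCommits() immediately; turning it off batches puts in the client-side writeBuffer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "demo_table");   // hypothetical table name
Put p = new Put(Bytes.toBytes("row-0001"));
p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));  // 0.9x Put.add()
table.put(p);    // with autoFlush=true (the default) this flushes immediately
table.close();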
2. Submitting the write request
org.apache.hadoop.hbase.client.HTable.flushCommits
public void flushCommits() throws IOException {
  try {
    Object[] results = new Object[writeBuffer.size()];
    try {
      // submit the buffered writes as one batch
      this.connection.processBatch(writeBuffer, tableName, pool, results);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      ...
    }
  } finally {
    if (clearBufferOnFail) {
      writeBuffer.clear(); // clear the client-side buffer
      currentWriteBufferSize = 0;
    } else {
      // recompute the size of the data still buffered on the client
      currentWriteBufferSize = 0;
      for (Put aPut : writeBuffer) {
        currentWriteBufferSize += aPut.heapSize();
      }
    }
  }
}
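The buffer this accounting manages is tunable. A hedged sketch of the related client knobs from the same API generation (the 2 MB figure is just an example value):

Configuration conf = HBaseConfiguration.create();
// the size that the heapSize() accounting above is compared against
conf.setLong("hbase.client.write.buffer", 2 * 1024 * 1024);
HTable table = new HTable(conf, "demo_table");  // hypothetical table name
// autoFlush=false buffers puts client-side;
// clearBufferOnFail=true selects the writeBuffer.clear() branch above
table.setAutoFlush(false, true);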
3. Processing the batch write
org.apache.hadoop.hbase.client.HConnection.processBatch()
org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.processBatch()
public void processBatch(List<? extends Row> list,
    final byte[] tableName,
    ExecutorService pool,
    Object[] results) throws IOException, InterruptedException {
  // This belongs in HTable!!! Not in here. St.Ack
  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException(
        "argument results must be the same size as argument list");
  }
  // delegate to the batch-processing implementation
  processBatchCallback(list, tableName, pool, results, null);
}
// This method is the heart of the routing logic: it decides which Region each
// rowkey belongs to, then submits the data to the RegionServer hosting that Region.
public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  ...
  // step 1: break the work down per RegionServer and build the matching data structure.
  // HRegionLocation is the key object here: it carries the Region's HRegionInfo
  // plus the hosting RegionServer's hostname and port.
  Map<HRegionLocation, MultiAction<R>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<R>>();
  for (int i = 0; i < workingList.size(); i++) {
    Row row = workingList.get(i);
    if (row != null) {
      // the next line is the core of put routing: it resolves each row
      // to the Region that owns it
      HRegionLocation loc = locateRegion(tableName, row.getRow());
      byte[] regionName = loc.getRegionInfo().getRegionName();
      MultiAction<R> actions = actionsByServer.get(loc);
      if (actions == null) {
        actions = new MultiAction<R>();
        actionsByServer.put(loc, actions);
      }
      Action<R> action = new Action<R>(row, i);
      lastServers[i] = loc;
      actions.add(regionName, action);
    }
  }
  // step 2: submit each per-server batch to its RegionServer
  Map<HRegionLocation, Future<MultiResponse>> futures =
      new HashMap<HRegionLocation, Future<MultiResponse>>(actionsByServer.size());
  for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
    futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
  }
  // step 3: collect the per-row success/failure results
  ...
  // step 4: retry the writes that failed
  ...
}
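To make step 1 concrete, here is a minimal, self-contained sketch (not HBase code) of the same grouping idea: each row is resolved to the server hosting its Region, bucketed, and each bucket would become one batched RPC. locate() is a hypothetical stand-in for locateRegion():

import java.util.*;

public class RouteSketch {
  // hypothetical stand-in for locateRegion(): route by the row's first byte
  static String locate(byte[] row) {
    return (row.length > 0 && (row[0] & 0xFF) < 'm') ? "rs1:60020" : "rs2:60020";
  }

  public static void main(String[] args) {
    List<byte[]> rows = Arrays.asList(
        "apple".getBytes(), "melon".getBytes(), "zebra".getBytes());
    Map<String, List<byte[]>> actionsByServer = new HashMap<>();
    for (byte[] row : rows) {
      actionsByServer.computeIfAbsent(locate(row), k -> new ArrayList<>()).add(row);
    }
    // each entry would become one MultiAction RPC to that RegionServer
    for (Map.Entry<String, List<byte[]>> e : actionsByServer.entrySet()) {
      System.out.println(e.getKey() + " <- " + e.getValue().size() + " row(s)");
    }
  }
}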
public HRegionLocation relocateRegion(final byte[] tableName,
    final byte[] row) throws IOException {
  ...
  return locateRegion(tableName, row, false, true);
}
4. Locating the Region
org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation.locateRegion()
private HRegionLocation locateRegion(final byte[] tableName,
    final byte[] row, boolean useCache, boolean retry)
    throws IOException {
  ...
  // make sure the ZooKeeper trackers are up
  ensureZookeeperTrackers();
  if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
    ...
  } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
    ...
  } else {
    // inserts into user tables take this branch
    // Region not in the cache - have to go to the meta RS
    return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
        useCache, userRegionLock, retry);
  }
}
private HRegionLocation locateRegionInMeta(final byte[] parentTable,
    final byte[] tableName, final byte[] row, boolean useCache,
    Object regionLockObject, boolean retry)
    throws IOException {
  HRegionLocation location;
  // if the client already has a cached location, answer from the cache
  if (useCache) {
    location = getCachedLocation(tableName, row);
    if (location != null) {
      return location;
    }
  }
  // on a cache miss, the client walks ZooKeeper -> -ROOT- -> .META.,
  // caches what it finds, and then reads .META. to decide which Region
  // the row routes to
  ...
}
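The cache-miss path elided above is worth spelling out. A conceptual, self-contained sketch of the 0.9x three-level lookup chain, with all three functions as hypothetical stand-ins for the client internals (the real client scans actual catalog regions):

public class LookupChainSketch {
  static String readRootFromZooKeeper()           { return "rs-root:60020"; }
  static String scanRootForMeta(byte[] row)       { return "rs-meta:60020"; }
  static String scanMetaForUserRegion(byte[] row) { return "rs-user:60020"; }

  public static void main(String[] args) {
    byte[] row = "row-0001".getBytes();
    String rootServer = readRootFromZooKeeper();     // step 1: a ZK znode names the -ROOT- server
    String metaServer = scanRootForMeta(row);        // step 2: -ROOT- names the .META. region for this row
    String userServer = scanMetaForUserRegion(row);  // step 3: .META. names the user Region for this row
    System.out.println(rootServer + " -> " + metaServer + " -> " + userServer);
    // the result is cached client-side, so later puts skip all three hops
  }
}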
HRegionLocation getCachedLocation(final byte[] tableName,
    final byte[] row) {
  SoftValueSortedMap<byte[], HRegionLocation> tableLocations =
      getTableLocations(tableName);
  ...
  // decide whether the row falls inside this Region's key range, i.e.
  // which RegionServer the request should be sent to
  byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
  if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
      KeyValue.getRowComparator(tableName).compareRows(
          endKey, 0, endKey.length, row, 0, row.length) > 0) {
    return possibleRegion;
  }
  return null;
}
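The end-key test above is half of the containment check: a cached Region answers for a row when startKey <= row < endKey, where an empty endKey marks the table's last Region. A minimal, self-contained restatement, with compare() mirroring the lexicographic byte order that Bytes.compareTo/compareRows implement:

public class RegionRangeSketch {
  // lexicographic comparison of unsigned bytes, like Bytes.compareTo
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int d = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (d != 0) return d;
    }
    return a.length - b.length;
  }

  static boolean regionContains(byte[] startKey, byte[] endKey, byte[] row) {
    return compare(startKey, row) <= 0
        && (endKey.length == 0 || compare(endKey, row) > 0);
  }

  public static void main(String[] args) {
    byte[] start = "m".getBytes(), end = "t".getBytes();
    System.out.println(regionContains(start, end, "quick".getBytes()));        // true
    System.out.println(regionContains(start, end, "tiger".getBytes()));        // false
    System.out.println(regionContains(start, new byte[0], "zzz".getBytes()));  // true: last region
  }
}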
5. From here the RegionServer takes over: it writes the data into the MemStore and later flushes it to StoreFiles (HFiles) on disk.