概述
哈希表在数据库的实现中可以被很多算子使用,比如aggregation,join或者distinct这些物理算子。构建哈希表的时候,必须指定有多少个bucket,下面我们以6个bucket为例,当我们指定哈希规则为模6的话,哈希表初始化的时候是建立6个bucket。首先由6个指针分别指向这6个bucket,然后申请具体的6个bucket所需要的空间,且每个bucket所需要的空间后面有两个指针,这两个指针一个指向这个bucket中的空闲位置,还有一个是指向当此bucket溢出之后重新申请空间的bucket,比如图中的第一个bucket分区。
存入哈希表中数据的时候,有分配函数首先检验是否还有空间提供,如果没有空间代表桶已经溢出,当溢出的时候,重新申请空间放在最后一个桶之后,如果还有空间即分配。分配函数在多线程插入哈希表的时候可以加锁,或者将所得粒度减的更小,将锁放在bucket中维护。使并发度增大。见代码:
/*
* HashTable.cpp
*
*
Created on: 2013-8-29
*
Author: casa
*/
#include "HashTable.h"
HashTable::HashTable(unsigned n_buckets,unsigned bucket_size,unsigned tuple_size)
:n_buckets(n_buckets),bucket_size(bucket_size),tuple_size(tuple_size){
//首先,申请指针空间,指向hashtable中的某一块
buckets_start=(void **)malloc(sizeof(void *)*n_buckets);
//先用block大小来对齐cache line的空间,再来申请内存
repo_size=n_buckets*get_cacheline_aligned_space(bucket_size);
//申请内存
char *cur_repo=(char*)malloc(repo_size);
repo_start=cur_repo;
//内存清零
memset(cur_repo,0,repo_size);
//将这个内存仓库装进仓库list中,因为HashTable构造的时候是先分配一部分,
//然后再插入的时候,可能hash表中一个hash值的bucket的个数不够,或者已
//经仓库中的内存用完,就再要申请一些内存来分配
repo_list.push_back(cur_repo);
//已经从cur_repo中分出去的内存是0
cur_repo_dis=0;
for(unsigned i=0;i<n_buckets;i++){
//如果剩下的空间不足cache line对齐的一块,再申请一个内存仓库
if(get_cacheline_aligned_space(bucket_size)>(repo_size-cur_repo_dis)){
cur_repo=(char*)malloc(repo_size);
memset(cur_repo,0,repo_size);
repo_list.push_back(cur_repo);
cur_repo_dis=0;
}
//这里是以一个get_cacheline_aligned_space(bucket_size)的大小,而
//并不是bucket_size的大小来分配的
buckets_start[i]=(void *)(cur_repo+cur_repo_dis);
cur_repo_dis+=get_cacheline_aligned_space(bucket_size);
//一个bucket中的“最后一位和倒数第二位”是指向指针的指针
//分别指向的是next(下一块)记录和free记录
void **free=(void **)((char *)buckets_start[i]+bucket_size);
void **next=(void **)((char *)buckets_start[i]+bucket_size+sizeof(void *));
*free=buckets_start[i];
*next=0;
}
}
HashTable::~HashTable() {
for(unsigned i=0;i<repo_list.size();i++){
free(repo_list.at(i));
}
free(buckets_start);
}
//这个函数给tuple分配内存,参数是key经过hash之后应该放在hashtable中的哪个块中,
//然后返回的是分配之后的地址
void * HashTable::allocate(unsigned offset){
//得到指向第offset块的指针
void *data=(char *)repo_start+get_cacheline_aligned_space(bucket_size);
//找到free的指针
void **free_p=(void **)((char *)data+bucket_size);
//定义一个返回的指针ret
void *ret;
//如果还能存下一块的话,就他妈存
if((*free_p)<(char *)data+bucket_size-tuple_size){
ret=*free_p;
//更新那个free_p的值,因为你的free改变了
*free_p=(char *)(*free_p)+tuple_size;
return ret;
}
/*如果到这里了,证明相应的key所对应的bucket没有空间了,所以现在就是要再在内存仓
库中申请内存,如果内存仓库中也没有了,那就再申请一个内存仓库*/
//首先获取到现在的repo的地址
char *cur_repo_=repo_list.at(repo_list.size()-1);
if(get_cacheline_aligned_space(bucket_size)>(repo_size-cur_repo_dis)){
cur_repo_=(char *)malloc(repo_size);
memset(cur_repo_,0,repo_size);
repo_list.push_back(cur_repo_);
cur_repo_dis=0;
}
ret=cur_repo_+cur_repo_dis;
cur_repo_dis+=get_cacheline_aligned_space(bucket_size);
//将下一块的地址填入相应的位置
void **next_p=(void **)((char *)data+bucket_size+sizeof(void *));
*next_p=ret;
//设置新申请的块的最后两个标志
free_p=(void **)((char *)ret+bucket_size);
*free_p=ret;
next_p=(void **)((char *)ret+bucket_size-sizeof(void *));
//最后一块
*next_p=0;
return ret;
}
/*
* HashTable.h
*
*
Created on: 2013-8-29
*
Author: casa
*/
#ifndef HASHTABLE_H_
#define HASHTABLE_H_
#include <stdlib.h>
#include <string.h>
#include <vector>
using namespace std;
#define CACHE_LINE 64
//如果不和cacheline对齐的话性能损失有多大?
inline unsigned get_cacheline_aligned_space(unsigned bucket_size){
return ((bucket_size+2*sizeof(void*)-1)/CACHE_LINE+1)*CACHE_LINE;
}
class HashTable {
public:
class HashTableIterator{
friend class HashTable;
public:
HashTableIterator(unsigned bucket_size,unsigned tuple_size)
:bucket_size_it(bucket_size),tuple_size_it(tuple_size),cur_it(0),
free_it(0),next_it(0){
};
virtual ~HashTableIterator(){};
//这个函数读取下一个tuple,返回下一个tuple的位置
inline void *readnext(){
void *ret;
//some ugly here
//TODO: do something strong
if(cur_it<free_it){
ret=cur_it;
cur_it=(char *)cur_it+tuple_size_it;
return ret;
}
else if(next_it!=0){
//在这个链中还有bucket没有被遍历的时候,切换到下一个bucket
ret=next_it;
cur_it=(char *)next_it+tuple_size_it;
//这个指针指向一个地址,转化为指向指针的指针
free_it=*(void **)((char *)next_it+bucket_size_it);
next_it=*(void **)((char *)next_it+bucket_size_it+sizeof(void *));
return ret< free_it? free_it: 0;
}
else{
return 0;
}
}
private:
unsigned bucket_size_it;
unsigned tuple_size_it;
void *cur_it;
void *free_it;
void *next_it;
void *bucketlink_start_it;
};
HashTable(unsigned n_buckets,unsigned bucket_size,unsigned tuple_size);
virtual ~HashTable();
void *allocate(unsigned offset);
bool placeIterator(HashTableIterator &it,unsigned int offset){
if(offset>=n_buckets)
return false;
//这个地址是key的hash值所对应的hash链中的首地址
void *start=repo_start+offset*get_cacheline_aligned_space(bucket_size);
it.cur_it=start;
//取到的是free的地址
it.free_it=*(void **)((char *)start+bucket_size);
it.next_it=*(void **)((char *)start+bucket_size+sizeof(void *));
it.tuple_size_it=tuple_size;
it.bucketlink_start_it=start;
it.bucket_size_it=bucket_size;
return true;
}
HashTable::HashTableIterator createIterator(){
return HashTable::HashTableIterator(bucket_size,tuple_size);
}
private:
//hash表有几个bucket
unsigned n_buckets;
//一个bucket多大
unsigned bucket_size;
//一个tuple大小
unsigned tuple_size;
//指向bucket指针的起始地址
void **buckets_start;
//一个内存仓库的大小,每个bucket从这个内存仓库中取内存,如果没有内存了,就再建一个内存仓库
unsigned repo_size;
char *repo_start;
//当前内存仓库分配的内存大小
unsigned cur_repo_dis;
//内存仓库的list
std::vector<char *> repo_list;
};
#endif /* HASHTABLE_H_ */
此哈希表的好处:1,由于数据定长,所以利用这种结构,内存逻辑地址连续。
2,加锁的粒度小,并发量可以保证,且如果数据尽量平铺,不需要申请空间。
最后
以上就是粗暴流沙为你收集整理的C++实现高性能哈希表的全部内容,希望文章能够帮你解决C++实现高性能哈希表所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复