概述
数据库数据缓存类设计
对于需要从数据库中获取的数据,如果频繁的进行数据库操作,在高并发的情况下会对数据库造成较大的压力, 此时可以对每次数据的查询结果进行缓存,并进行定期的更新,可以有效的解决这个问题。
在游戏中,关于服务端对于非玩家本身数据的加载获取与更新, 多设计到数据库的操作, 其中关于好友和战队成员等信息往往需要从数据库中获取,但如果每次都从数据库获取,那么大量玩家操作的情况下就会对数据库造成一定的压力, 此外好友和战队队员的信息在游戏中不断的变化,玩家想要获取这种变化就不得不去更新这些数据,从数据库操作会造成数据库的压力, 直接由变化的玩家向外辐射又会造成大量的夸进程rpc通信,那么一个折中的策略就是缓存。缓存的概念比较古老,技术上最鲜为人知的应用就是电脑磁盘与内存,早期内存空间有限, 访问速度较快, 磁盘空间大但速度很慢, 这使得缓存技术有了较大的发展。 鉴于此,我们可以在每个游戏进程上对需要从数据库中获得的信息进行缓存, 这样可以避免夸进程的通信。既然是缓存就会存在数据过期的情况, 更新数据的时机和策略就是显得十分重要。
class DataCacheManager(object):
"""
数据库内数据缓存管理器, 每个进程一份, 提供从数据库读取数据并进行缓存,
缓存空间有限, 当前采用FIFO的策略, 之后可改进LRU
为分布式数据提供数据访问点, 避免夸进程通信
"""
def __init__(self):
"""
初始化数据缓存管理器
"""
super(DataCacheManager, self).__init__()
self.logger = LogManager.get_logger("server.DataCacheManager")
# 数据缓存字典,以数据表名称进行分类缓存, 方便进行动态添加
self._data_cache_dict = {}
# 数据获取所需的必要信息
self._data_fetch_info = {}
def register_data_type_info(self, db_name, key_str, fields, cache_time, opacity):
"""
注册想要缓存的数据表数据的相关信息, 可重复注册, 但是会删除之前注册与缓存的所有信息哦
@ db_name: 数据表的名称
@ key_str: 数据表中查询的键值名称, 必须是创建了索引的键值!!!!!!
@ fields: 需要缓存的字段名称列表
@ cache_time: 数据缓存的时间
@ opacity: 缓存容量的大小
"""
# 检查参数类型
if not isinstance(fields, list):
return
if key_str not in fields:
fields.append(key_str)
# 添加数据查询信息
self._data_fetch_info[db_name] = {
'key_str': key_str,
'fields': fields,
}
# 创建数据缓冲对象
self._data_cache_dict[db_name] = LRUData(cache_time, opacity)
def unregister_data_type_info(self, db_name):
"""
数据表信息注册取消
@ db_name: 数据表的名称
"""
self._data_fetch_info.pop(db_name, 0)
self._data_cache_dict.pop(db_name, 0)
def fetch_data(self, db_name, key_id_list, done_cb=None, refresh=False):
"""
获取缓存的数据, 单个或者多个都可以
@ db_name: 数据表的名称
@ key_id_list: 需要查询的数据索引Key列表
@ done_cb: 获取数据后的回调函数
@ refresh: 是否直接绕过缓存从数据库中获取与更新
"""
# 检查想要获取的数据类型是否已经注册必要信息
data_fetch_info = self._data_fetch_info.get(db_name, {})
if not data_fetch_info:
return
# 包装key_id
if not isinstance(key_id_list, list):
key_id_list = [key_id_list]
# 计算需要查询的数据
no_cached_key_list = []
if refresh:
no_cached_key_list = key_id_list
else:
data_cache_obj = self._data_cache_dict[db_name]
for key_id in key_id_list:
if data_cache_obj.get_data(key_id) is None and key_id not in no_cached_key_list:
no_cached_key_list.append(key_id)
# 如果不需要查询就直接返回结果
if not no_cached_key_list:
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
return
# 从数据库中查询数据需要请求的数据
query = {data_fetch_info.get('key_str'): {'$in': no_cached_key_list, }}
fields = data_fetch_info.get('fields')
db_mgr.find(db_name, query, fields, limit=len(no_cached_key_list),
callback=lambda docs: self.fetch_data_cb(db_name, docs, key_id_list, done_cb))
def fetch_data_cb(self, db_name, docs, key_id_list, done_cb):
"""
数据库信息加载回调
"""
# 规范查询结果
if not docs:
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
return
if isinstance(docs, dict):
docs = [docs]
# 获取键值
data_fetch_info = self._data_fetch_info.get(db_name, {})
if not data_fetch_info:
return
key_str = data_fetch_info.get('key_str')
# 缓存数据
data_cache_obj = self._data_cache_dict[db_name]
for info in docs:
uid = info.get(key_str)
data_cache_obj.push_data(uid, info)
# 查询缓存中的数据
self.get_data_from_cache(db_name, key_id_list, done_cb=done_cb)
def get_data_from_cache(self, db_name, key_id_list, done_cb=None):
"""
从缓存中获取数据, 并执行获取数据后的回调
"""
# 从缓存中获取数据
fetch_result_info = {}
data_cache_obj = self._data_cache_dict.get(db_name)
if data_cache_obj is None:
return fetch_result_info
for key_id in key_id_list:
data = data_cache_obj.get_data(key_id)
if data:
fetch_result_info[key_id] = data
# 执行数据获取后的回调
done_cb and done_cb(fetch_result_info)
return fetch_result_info
数据库缓存类DataCacheManager通过 register_data_type_info和 unregister_data_type_info 接口对数据库操作进行注册, 注册_id 和要查询的字段, 注册必要的查询信息后就能从数据表中查询获取必要的信息。
fetch_data 接口则是从缓存中取出数据,其包含从数据库中获取和从缓存中获取, 具体看数据是否在缓存中和数据是否过期,而数据对象LRUData 提供了数据的基本操作, 是真正缓存数据的对象。 同时获取数据有回调函数, 在完成数据的获取后就会执行回调。此外接口还提供了强行更新的参数,以便需要最新数据时直接从数据库进行查询,而非缓存。
LRUData 类是数据缓存类, 提供了两个接口, 数据的获取和插入, 数据在插入时会有一个时间戳, 以便判定数据在取出时是否过期, 缓存空间的大小必须有限,因此空间不足时需要对数据进行剔除, 这是需要数据移除策略, 常用的策略有FIFO、LRU、频率最低等。
FIFO缓存策略:
首先缓存两个基本的特点, 一个是缓存空间有限, 另一个是数据过期,缓存空间有限就需要我们对有限的控件进行利用,数据的置换策略非常重要,基本的置换策略有FIFO、LRU、LFU,其中FIFO最为简单,其基本假设就是最近被加载进来的数据下次使用到的可能性大于之前被加载进来的数据,对于符合这种假设的场景较为适用。其python代码如下:
class FIFOCache(object):
"""
数据缓存类, 当前使用FIFO策略, 设置过期时间戳
"""
def __init__(self, cache_time, opacity):
"""
数据缓存类初始化
"""
# 数据缓存的时间长短, 秒为单位
self.cache_time = cache_time
# 数据池的大小
self.opacity = opacity
# 当前数据的索引
self._index = 0
# 缓存的字典key的列表
self._key_list = [None for i in xrange(self.opacity)]
# 缓存数据的查询字典
self._data_cache = {}
def push_data(self, key, data):
"""
将数据缓存
"""
if not isinstance(data, dict):
return
# 数据进来的时候的时间戳
data['_time_stamps'] = int(time.time())
# 如果key已经在字典中, 直接更新, 保持list中key不同
if key in self._data_cache:
self._data_cache[key] = data
return
# 索引循环
if self._index >= self.opacity:
self._index = 0
# 删除老的数据
old_key = self._key_list[self._index]
self._data_cache.pop(old_key, 0)
# 加入新的数据
self._data_cache[key] = data
# 索引递增
self._key_list[self._index] = key
self._index += 1
def get_data(self, key):
"""
取出缓存的数据
"""
# 检查下出来时候时间戳
data = self._data_cache.get(key, {})
time_stamp = data.get('_time_stamps', 0)
if time.time() - time_stamp < self.cache_time:
return data
else:
return None
def destory(self):
"""
缓存的销毁
"""
self._data_cache.clear()
self._key_list = []
LRU缓存算法:
LRU缓存算法采取的缓存置换策略是, 当缓存空间满时新来的数据置换到未使用时间最长的那个,实现中采用双端队列, 将每次访问到的数据放在队列的最前端,从而保证队列里的数据是按使用时间有序的。Python代码如下:
class ListNode(object):
"""
双向链表节点类
"""
def __init__(self, data=None):
""" 双向链表节点类的初始化 """
self.data = data
self.pre_node = None
self.next_node = None
def clear_pointer(self):
""" 清空指针避免不必要的链接错误 """
self.pre_node = None
self.next_node = None
def destory(self):
""" 节点的销毁 """
self.clear_pointer()
self.data = None
class DoubleLinkList(object):
"""
双向链表类
"""
def __init__(self):
""" 双向链表节点类的初始化 """
self._head_node = ListNode()
self._tail_node = ListNode()
self._head_node.next_node = self._tail_node
self._tail_node.pre_node = self._head_node
self._size = 0
def get_head(self):
""" 获取头结点 """
if self._head_node.next_node is self._tail_node:
return None
return self._head_node.next_node
def get_tail(self):
""" 获取尾节点 """
if self._tail_node.pre_node is self._head_node:
return None
return self._tail_node.pre_node
def show_link_list(self):
""" 显示下链表 """
next_node = self._head_node.next_node
index = 1
while next_node:
temp_node = next_node.next_node
print '---->', index, next_node.data
index += 1
next_node = temp_node
def _insert_node(self, node, pre_node, next_node):
""" 将节点插入到两个节点之间 """
node.clear_pointer()
pre_node.next_node = node
next_node.pre_node = node
node.pre_node = pre_node
node.next_node = next_node
self._size += 1
def _remove_node(self, node):
""" 将节点移除双向链表 """
if node is self._head_node or node is self._tail_node:
return None
pre_node = node.pre_node
next_node = node.next_node
if pre_node:
pre_node.next_node = next_node
if next_node:
next_node.pre_node = pre_node
self._size -= 1
node.clear_pointer()
return node
def push_back(self, node):
""" 队尾添加节点 """
if not isinstance(node, ListNode):
return
pre_node = self._tail_node.pre_node
next_node = self._tail_node
self._insert_node(node, pre_node, next_node)
def push_front(self, node):
""" 队头添加元素 """
if not isinstance(node, ListNode):
return
pre_node = self._head_node
next_node = self._head_node.next_node
self._insert_node(node, pre_node, next_node)
def pop_back(self):
""" 队尾推出 """
return self._remove_node(self._tail_node.pre_node)
def pop_front(self):
""" 队头推出 """
return self._remove_node(self._head_node.next_node)
def remove_node(self, node):
""" 去除指定的链表节点 """
return self._remove_node(node)
def get_size(self):
""" 计算链表的长度 """
return self._size
def clear_node(self):
""" 清除所有的链表节点 """
next_node = self._head_node.next_node
while next_node:
temp_node = next_node.next_node
next_node.destory()
next_node = temp_node
self._head_node.next_node = self._tail_node
self._tail_node.pre_node = self._head_node
self._size = 0
def destory(self):
""" 链表的销毁 """
self.clear_node()
self._head_node and self._head_node.destory()
self._tail_node and self._tail_node.destory()
class LRUCache(object):
"""
最近未使用的缓存策略类
"""
def __init__(self, cache_time, opacity):
"""
最近未使用策略缓存
"""
# 缓存类的参数设置
self._dity_time = cache_time
self._opacity = opacity
# 数据缓存类的内部变量
self._data_map = {} # {key: Node}
self._link_list = DoubleLinkList() # 双向链表
def push_data(self, key, data):
"""
数据的存储
"""
if not isinstance(data, dict):
return
# 数据进来的时候的时间戳
data['_key'] = key
data['_time_stamps'] = int(time.time())
# 数据已经在字典中直接更新
if key in self._data_map:
if self._data_map[key]:
self._data_map[key].data = data
return
else:
self._data_map.pop(key, 0)
# 将数据插入到双端队列头部
node = ListNode(data)
if self._link_list.get_size() >= self._opacity:
removed_node = self._link_list.pop_back()
if removed_node and removed_node.data:
self._data_map.pop(removed_node.data.get('_key'), 0)
self._link_list.push_front(node)
self._data_map[key] = node
def get_data(self, key):
"""
数据的访问
"""
# 检出数据的时间戳
node = self._data_map.get(key)
if not node or not node.data:
return None
time_stamp = node.data.get('_time_stamps', 0)
if time.time() - time_stamp < self._dity_time:
# 将最近访问的数据放在双端队列的队头
self._link_list.remove_node(node)
self._link_list.push_front(node)
return node.data
else:
return None
def destory(self):
"""
缓存的清空
"""
self._data_map.clear()
self._link_list.destory()
LFU缓存策略:
LFU缓存算法采取的缓存置换策略是, 当缓存空间满时新来的数据置换使用频率最低的那个缓存数据,实现中采用最小堆维护数据的访问次数,每次插入都从堆的顶部删除数据, 再将新的数据插入。Python代码如下:
class HeapNode(object):
"""
数据堆的节点
"""
def __init__(self, data):
""" 数据堆节点初始化 """
self.index = 0
self.data = data
self.use_times = 1
def __str__(self):
return self.data
def __gt__(self, other):
""" 大于运算符 """
return self.use_times > other.use_times
def __lt__(self, other):
""" 小于运算符 """
return self.use_times < other.use_times
def destory(self):
""" 数据释放 """
self.data = None
self.use_times = 1
class EasyHeap(object):
"""
简单的堆结构, 不用内置的原因是内置的堆操作列表事件复杂度为O(n)
"""
def __init__(self, opacity):
""" 数据结构的初始化 """
# 参数初始化
self._opacity = opacity
self._heap = [None for i in xrange(self._opacity)]
self._size = 0
def _get_parent(self, index):
""" 获取父节点 """
if index <= 0:
return None
parent_index = (index - 1) / 2
return self._heap[parent_index]
def _get_left_child(self, index):
""" 获取左孩子 """
if 2 * index + 1 < self._size:
return self._heap[2 * index + 1]
return None
def _get_right_child(self, index):
""" 获取右节点 """
if 2 * index + 2 < self._size:
return self._heap[2 * index + 2]
return None
def _heapfix_up(self, node):
""" 节点上升 """
if not node:
return
index = node.index
# 节点上升
parent_node = self._get_parent(index)
if parent_node and node < parent_node:
self._exchange_node(parent_node, node)
self._heapfix_up(node)
def _heapfix_down(self, node):
""" 节点下降 """
if not node:
return
index = node.index
# 挑选父子节点中最小的节点
min_child = node
left_node = self._get_left_child(index)
min_child = left_node if left_node and left_node < min_child else min_child
right_node = self._get_right_child(index)
min_child = right_node if right_node and right_node < min_child else min_child
# 选择最小的节点进行下降
if min_child is not node:
self._exchange_node(min_child, node)
self._heapfix_down(node)
def _exchange_node(self, node1, node2):
""" 交换两个节点 """
node1.index, node2.index = node2.index, node1.index
self._heap[node1.index] = node1
self._heap[node2.index] = node2
def heapfix(self, node):
""" 调整节点位置 """
self._heapfix_up(node)
self._heapfix_down(node)
def get_head(self):
""" 获取头部节点 """
if self._size <= 0:
return None
return self._heap[0]
def heappop(self):
""" 推出头部节点 """
if self._size <= 0:
return None
# 交换头尾节点
head_node = self._heap[0]
tail_node = self._heap[self._size - 1]
self._exchange_node(head_node, tail_node)
self._size -= 1
# 节点调整
self.heapfix(self._heap[0])
return head_node
def heappush(self, node):
""" 将节点加到到堆中 """
node.index = self._size
self._heap[self._size] = node
self._size += 1
# 节点调整
self.heapfix(self._heap[self._size - 1])
def get_size(self):
""" 获取堆的尺寸 """
return self._size
def show_heap(self):
""" 显示整个堆 """
result = []
for node in self._heap:
node and result.append([node.data.get('_key'), node.use_times, node.index])
print '======', result
def destory(self):
""" 缓存的清空 """
for node in self._heap:
node and node.destory()
class LFUCache(object):
"""
最近最少使用的缓存策略类
"""
def __init__(self, cache_time, opacity):
"""
最近未使用策略缓存
"""
# 缓存类的参数设置
self._dity_time = cache_time
self._opacity = opacity
# 数据缓存类的内部变量
self._data_map = {} # {key: Node}
self._heap = EasyHeap(self._opacity) # 数据堆
def push_data(self, key, data):
"""
数据的存储
"""
if not isinstance(data, dict):
return
# 数据进来的时候的时间戳
data['_key'] = key
data['_time_stamps'] = int(time.time())
# 数据已经在字典中直接更新
if key in self._data_map:
if self._data_map[key]:
self._data_map[key].data = data
return
else:
self._data_map.pop(key, 0)
# 将数据插入到最小堆中
node = HeapNode(data)
if self._heap.get_size() >= self._opacity:
removed_node = self._heap.heappop()
if removed_node and removed_node.data:
self._data_map.pop(removed_node.data.get('_key'), 0)
self._heap.heappush(node)
self._data_map[key] = node
def get_data(self, key):
"""
数据的访问
"""
# 检出数据的时间戳
node = self._data_map.get(key)
if not node or not node.data:
return None
time_stamp = node.data.get('_time_stamps', 0)
if time.time() - time_stamp < self._dity_time:
node.use_times += 1
self._heap.heapfix(node)
return node.data
else:
return None
def destory(self):
"""
缓存的清空
"""
self._data_map.clear()
self._heap.destory()
最后
以上就是细腻白猫为你收集整理的数据缓存类设计数据库数据缓存类设计的全部内容,希望文章能够帮你解决数据缓存类设计数据库数据缓存类设计所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复