我是靠谱客的博主 忧虑太阳,这篇文章主要介绍Eureka Server的多级缓存和过期机制,现在分享给大家,希望可以做个参考。

Eureka Server的多级缓存和过期机制

多级缓存

之前写eureka-client客户端全量请求的时候,会走到缓存这边,下面就具体的看一下,server的多级缓存是怎么回事?

eureka client初始化的时候,就会自动发送个请求到eureka server拉一次清抓取全量的注册表,这一讲,我们来看看eureka server端如何处理抓取全量注册表的请求的,eureka client发送的请求是:http://localhost:8080/v2/apps/,get请求 ApplicationsResource的getContainers()方法,获取全量注册表的方法

com.netflix.eureka.registry.ResponseCache,响应缓存接口,接口代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ResponseCache { String get(Key key); byte[] getGZIP(Key key); void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress); AtomicLong getVersionDelta(); AtomicLong getVersionDeltaWithRegions(); }

com.netflix.eureka.registry.Key,缓存键。实现代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public class Key { public enum KeyType { JSON, XML } /** * An enum to define the entity that is stored in this cache for this key. */ public enum EntityType { Application, VIP, SVIP } /** * 实体名 */ private final String entityName; /** * TODO[0009]:RemoteRegionRegistry */ private final String[] regions; /** * 请求参数类型 */ private final KeyType requestType; /** * 请求 API 版本号 */ private final Version requestVersion; /** * hashKey */ private final String hashKey; /** * 实体类型 * * {@link EntityType} */ private final EntityType entityType; /** * {@link EurekaAccept} */ private final EurekaAccept eurekaAccept; public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) { this.regions = regions; this.entityType = entityType; this.entityName = entityName; this.requestType = type; this.requestVersion = v; this.eurekaAccept = eurekaAccept; hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name(); } public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions) { this.regions = regions; this.entityType = entityType; this.entityName = entityName; this.requestType = type; this.requestVersion = v; this.eurekaAccept = eurekaAccept; hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "") + requestType.name() + requestVersion.name() + this.eurekaAccept.name(); } @Override public int hashCode() { String hashKey = getHashKey(); return hashKey.hashCode(); } @Override public boolean equals(Object other) { if (other instanceof Key) { return getHashKey().equals(((Key) other).getHashKey()); } else { return false; } } }

具体的缓存是在它的实现类ResponseCacheImpl中,在ResponseCacheImpl初始化的时候,就会定义一个readWriteCacheMap,指定它的一些策略。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) //这里的话,就是如果在缓存中找不到的话,就会generatePayload从这里获取,抓取全量注册表,后面会单独写个demo测试一下。 .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } }); if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); } }

下面的代码就是先走

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public String get(final Key key) { return get(key, shouldUseReadOnlyResponseCache); } @VisibleForTesting String get(final Key key, boolean useReadOnlyCache) { Value payload = getValue(key, useReadOnlyCache); if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { return null; } else { return payload.getPayload(); } } @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { if (useReadOnlyCache) { final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { payload = currentPayload; } else { payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) { logger.error("Cannot get value for key : {}", key, t); } return payload; }

#get(key, useReadOnlyCache) 方法,读取缓存。其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 :true ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 readOnlyCacheMap ,性能会有一定的下降。

过期策略

主动过期

image-20211009145734568

image-20211009145806582

进行搜索一下,发现注册,下线、故障的时候,都会调用这个方法,那就去看一下这个方法到底做了什么

image-20211009145920592

应用实例注册、下线、过期时,调用 ResponseCacheImpl#invalidate() 方法,主动过期读写缓存( readWriteCacheMap ),实现代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } } public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } }

将readWriteCacheMap中的ALL_APPS缓存key,对应的缓存给过期掉

被动过期

  • 配置 eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

image-20211009150624516

定时刷新

初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒。对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { //对比,如果不一致的话,将会读取读写缓存中的记录,将只读 readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; }

测试

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SneakyThrows public static void main(String[] args) { LoadingCache<String,String> cahceBuilder= CacheBuilder .newBuilder() .expireAfterWrite(2L, TimeUnit.SECONDS) .build(new CacheLoader<String, String>(){ @Override public String load(String key) throws Exception { System.out.println("缓存中没有。。。"); String strProValue="hello "+key+"!"; return strProValue; } }); System.out.println("jerry value:"+cahceBuilder.get("jerry")); System.out.println("jerry value:"+cahceBuilder.get("jerry")); Thread.sleep(2000); System.out.println("jerry value:"+cahceBuilder.get("jerry")); }

image-20211011170527631

最后

以上就是忧虑太阳最近收集整理的关于Eureka Server的多级缓存和过期机制的全部内容,更多相关Eureka内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(94)

评论列表共有 0 条评论

立即
投稿
返回
顶部