欧美在线专区-欧美在线伊人-欧美在线一区二区三区欧美-欧美在线一区二区三区-pornodoxxx中国妞-pornodoldoo欧美另类

position>home>Softball

是什么讓Spring5放棄了使用Guava Cache?

[導讀]Caffeine是什讓使用一個高性能,高命中率,放棄低內存占用,什讓使用near optimal 的放棄本地緩存,簡單來說它是什讓使用Guava Cache的優化加強版,有些文章把Caffeine稱為“新一代的放棄緩存”、“現代緩存之王”。什讓使用本文將重點講解Caffeine的放棄高性能設計,以及對應部分的什讓使用源碼分析。

來源:https://albenw.github.io/posts/a4ae1aa2/

是什么讓Spring5放棄了使用Guava Cache?

概要

Caffeine是放棄一個高性能,高命中率,什讓使用低內存占用,放棄near optimal 的什讓使用本地緩存,簡單來說它是放棄Guava Cache的優化加強版,有些文章把Caffeine稱為“新一代的什讓使用緩存”、“現代緩存之王”。本文將重點講解Caffeine的高性能設計,以及對應部分的源碼分析。

與Guava Cache比較

大家都知道,Spring5即將放棄掉Guava Cache作為緩存機制,而改用Caffeine作為新的本地Cache的組件,這對于Caffeine來說是一個很大的肯定。為什么Spring會這樣做呢?其實在Caffeine的Benchmarks里給出了好靚仔的數據,對讀和寫的場景,還有跟其他幾個緩存工具進行了比較,Caffeine的性能都表現很突出。

使用Caffeine

Caffeine為了方便大家使用以及從Guava Cache切換過來(很有針對性啊~),借鑒了Guava Cache大部分的概念(諸如核心概念Cache、LoadingCache、CacheLoader、CacheBuilder等等),對于Caffeine的理解只要把它當作Guava Cache就可以了。

使用上,大家只要把Caffeine的包引進來,然后換一下cache的實現類,基本應該就沒問題了。這對與已經使用過Guava Cache的同學來說沒有任何難度,甚至還有一點熟悉的味道,如果你之前沒有使用過Guava Cache,可以查看Caffeine的官方API說明文檔,其中Population,Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等這些特性都是跟Guava Cache基本一樣的。

下面給出一個例子說明怎樣創建一個Cache:

private?static?LoadingCache?cache?=?Caffeine.newBuilder()
????????????//最大個數限制
????????????.maximumSize(256L)
????????????//初始化容量
????????????.initialCapacity(1)
????????????//訪問后過期(包括讀和寫)
????????????.expireAfterAccess(2,?TimeUnit.DAYS)
????????????//寫后過期
????????????.expireAfterWrite(2,?TimeUnit.HOURS)
????????????//寫后自動異步刷新
????????????.refreshAfterWrite(1,?TimeUnit.HOURS)
????????????//記錄下緩存的一些統計數據,例如命中率等
????????????.recordStats()
????????????//cache對緩存寫的通知回調
????????????.writer(new?CacheWriter()?{
????????????????@Override
????????????????public?void?write(@NonNull?Object?key,?@NonNull?Object?value)?{
????????????????????log.info("key={ },?CacheWriter?write",?key);
????????????????}

????????????????@Override
????????????????public?void?delete(@NonNull?Object?key,?@Nullable?Object?value,?@NonNull?RemovalCause?cause)?{
????????????????????log.info("key={ },?cause={ },?CacheWriter?delete",?key,?cause);
????????????????}
????????????})
????????????//使用CacheLoader創建一個LoadingCache
????????????.build(new?CacheLoader()?{
????????????????//同步加載數據
????????????????@Nullable
????????????????@Override
????????????????public?String?load(@NonNull?String?key)?throws?Exception?{
????????????????????return?"value_"?+?key;
????????????????}

????????????????//異步加載數據
????????????????@Nullable
????????????????@Override
????????????????public?String?reload(@NonNull?String?key,?@NonNull?String?oldValue)?throws?Exception?{
????????????????????return?"value_"?+?key;
????????????????}
????????????});

Caffeine的高性能設計

判斷一個緩存的好壞最核心的指標就是命中率,影響緩存命中率有很多因素,包括業務場景、淘汰策略、清理策略、緩存容量等等。如果作為本地緩存, 它的性能的情況,資源的占用也都是一個很重要的指標。下面

我們來看看Caffeine在這幾個方面是怎么著手的,如何做優化的。

(注:本文不會分析Caffeine全部源碼,只會對核心設計的實現進行分析,但我建議讀者把Caffeine的源碼都涉獵一下,有個overview才能更好理解本文。如果你看過Guava Cache的源碼也行,代碼的數據結構和處理邏輯很類似的。

源碼基于:(caffeine-2.8.0.jar

W-TinyLFU整體設計

上面說到淘汰策略是影響緩存命中率的因素之一,一般比較簡單的緩存就會直接用到LFU(Least Frequently Used,即最不經常使用)或者LRU(Least Recently Used,即最近最少使用),而Caffeine就是使用了W-TinyLFU算法。

W-TinyLFU看名字就能大概猜出來,它是LFU的變種,也是一種緩存淘汰算法。那為什么要使用W-TinyLFU呢?

LRU和LFU的缺點
  • LRU實現簡單,在一般情況下能夠表現出很好的命中率,是一個“性價比”很高的算法,平時也很常用。雖然LRU對突發性的稀疏流量(sparse bursts)表現很好,但同時也會產生緩存污染,舉例來說,如果偶然性的要對全量數據進行遍歷,那么“歷史訪問記錄”就會被刷走,造成污染。

  • 如果數據的分布在一段時間內是固定的話,那么LFU可以達到最高的命中率。但是LFU有兩個缺點,第一,它需要給每個記錄項維護頻率信息,每次訪問都需要更新,這是個巨大的開銷;第二,對突發性的稀疏流量無力,因為前期經常訪問的記錄已經占用了緩存,偶然的流量不太可能會被保留下來,而且過去的一些大量被訪問的記錄在將來也不一定會使用上,這樣就一直把“坑”占著了。

無論LRU還是LFU都有其各自的缺點,不過,現在已經有很多針對其缺點而改良、優化出來的變種算法。

TinyLFU

TinyLFU就是其中一個優化算法,它是專門為了解決LFU上述提到的兩個問題而被設計出來的。

解決第一個問題是采用了Count–Min Sketch算法。

解決第二個問題是讓記錄盡量保持相對的“新鮮”(Freshness Mechanism),并且當有新的記錄插入時,可以讓它跟老的記錄進行“PK”,輸者就會被淘汰,這樣一些老的、不再需要的記錄就會被剔除。

下圖是TinyLFU設計圖(來自官方)

統計頻率Count–Min Sketch算法

如何對一個key進行統計,但又可以節省空間呢?(不是簡單的使用HashMap,這太消耗內存了),注意哦,不需要精確的統計,只需要一個近似值就可以了,怎么樣,這樣場景是不是很熟悉,如果你是老司機,或許已經聯想到布隆過濾器(Bloom Filter)的應用了。

沒錯,將要介紹的Count–Min Sketch的原理跟Bloom Filter一樣,只不過Bloom Filter只有0和1的值,那么你可以把Count–Min Sketch看作是“數值”版的Bloom Filter。

更多關于Count–Min Sketch的介紹請自行搜索。

在TinyLFU中,近似頻率的統計如下圖所示:

對一個key進行多次hash函數后,index到多個數組位置后進行累加,查詢時取多個值中的最小值即可。

Caffeine對這個算法的實現在FrequencySketch類。但Caffeine對此有進一步的優化,例如Count–Min Sketch使用了二維數組,Caffeine只是用了一個一維的數組;再者,如果是數值類型的話,這個數需要用int或long來存儲,但是Caffeine認為緩存的訪問頻率不需要用到那么大,只需要15就足夠,一般認為達到15次的頻率算是很高的了,而且Caffeine還有另外一個機制來使得這個頻率進行衰退減半(下面就會講到)。如果最大是15的話,那么只需要4個bit就可以滿足了,一個long有64bit,可以存儲16個這樣的統計數,Caffeine就是這樣的設計,使得存儲效率提高了16倍。

Caffeine對緩存的讀寫(afterReadafterWrite方法)都會調用onAccesss方法,而onAccess方法里有一句:

frequencySketch().increment(key);

這句就是追加記錄的頻率,下面我們看看具體實現

//FrequencySketch的一些屬性

//種子數
static?final?long[]?SEED?=?{ ?//?A?mixture?of?seeds?from?FNV-1a,?CityHash,?and?Murmur3
????0xc3a5c85c97cb3127L,?0xb492b66fbe98f273L,?0x9ae16a3b2f90404fL,?0xcbf29ce484222325L};
static?final?long?RESET_MASK?=?0x7777777777777777L;
static?final?long?ONE_MASK?=?0x1111111111111111L;

int?sampleSize;
//為了快速根據hash值得到table的index值的掩碼
//table的長度size一般為2的n次方,而tableMask為size-1,這樣就可以通過&操作來模擬取余操作,速度快很多,老司機都知道
int?tableMask;
//存儲數據的一維long數組
long[]?table;
int?size;

/**
?*?Increments?the?popularity?of?the?element?if?it?does?not?exceed?the?maximum?(15).?The?popularity
?*?of?all?elements?will?be?periodically?down?sampled?when?the?observed?events?exceeds?a?threshold.
?*?This?process?provides?a?frequency?aging?to?allow?expired?long?term?entries?to?fade?away.
?*
?*?@param?e?the?element?to?add
?*/
public?void?increment(@NonNull?E?e)?{
??if?(isNotInitialized())?{
????return;
??}

??//根據key的hashCode通過一個哈希函數得到一個hash值
??//本來就是hashCode了,為什么還要再做一次hash?怕原來的hashCode不夠均勻分散,再打散一下。
??int?hash?=?spread(e.hashCode());
??//這句光看有點難理解
??//就如我剛才說的,Caffeine把一個long的64bit劃分成16個等分,每一等分4個bit。
??//這個start就是用來定位到是哪一個等分的,用hash值低兩位作為隨機數,再左移2位,得到一個小于16的值
??int?start?=?(hash?&?3)?<
??//indexOf方法的意思就是,根據hash值和不同種子得到table的下標index
??//這里通過四個不同的種子,得到四個不同的下標index
??int?index0?=?indexOf(hash,?0);
??int?index1?=?indexOf(hash,?1);
??int?index2?=?indexOf(hash,?2);
??int?index3?=?indexOf(hash,?3);

??//根據index和start(+1,?+2,?+3)的值,把table[index]對應的等分追加1
??//這個incrementAt方法有點難理解,看我下面的解釋
??boolean?added?=?incrementAt(index0,?start);
??added?|=?incrementAt(index1,?start?+?1);
??added?|=?incrementAt(index2,?start?+?2);
??added?|=?incrementAt(index3,?start?+?3);

??//這個reset等下說
??if?(added?&&?(++size?==?sampleSize))?{
????reset();
??}
}

/**
?*?Increments?the?specified?counter?by?1?if?it?is?not?already?at?the?maximum?value?(15).
?*
?*?@param?i?the?table?index?(16?counters)
?*?@param?j?the?counter?to?increment
?*?@return?if?incremented
?*/
boolean?incrementAt(int?i,?int?j)?{
??//這個j表示16個等分的下標,那么offset就是相當于在64位中的下標(這個自己想想)
??int?offset?=?j?<??//上面提到Caffeine把頻率統計最大定為15,即0xfL
??//mask就是在64位中的掩碼,即1111后面跟很多個0
??long?mask?=?(0xfL?<??//如果&的結果不等于15,那么就追加1。等于15就不會再加了
??if?((table[i]?&?mask)?!=?mask)?{
????table[i]?+=?(1L?<????return?true;
??}
??return?false;
}

/**
?*?Returns?the?table?index?for?the?counter?at?the?specified?depth.
?*
?*?@param?item?the?element's?hash
?*?@param?i?the?counter?depth
?*?@return?the?table?index
?*/
int?indexOf(int?item,?int?i)?{
??long?hash?=?SEED[i]?*?item;
??hash?+=?hash?>>>?32;
??return?((int)?hash)?&?tableMask;
}

/**
?*?Applies?a?supplemental?hash?function?to?a?given?hashCode,?which?defends?against?poor?quality
?*?hash?functions.
?*/
int?spread(int?x)?{
??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
??x?=?((x?>>>?16)?^?x)?*?0x45d9f3b;
??return?(x?>>>?16)?^?x;
}

知道了追加方法,那么讀取方法frequency就很容易理解了。

/**
?*?Returns?the?estimated?number?of?occurrences?of?an?element,?up?to?the?maximum?(15).
?*
?*?@param?e?the?element?to?count?occurrences?of
?*?@return?the?estimated?number?of?occurrences?of?the?element;?possibly?zero?but?never?negative
?*/
@NonNegative
public?int?frequency(@NonNull?E?e)?{
??if?(isNotInitialized())?{
????return?0;
??}

??//得到hash值,跟上面一樣
??int?hash?=?spread(e.hashCode());
??//得到等分的下標,跟上面一樣
??int?start?=?(hash?&?3)?<??int?frequency?=?Integer.MAX_VALUE;
??//循環四次,分別獲取在table數組中不同的下標位置
??for?(int?i?=?0;?i?????int?index?=?indexOf(hash,?i);
????//這個操作就不多說了,其實跟上面incrementAt是一樣的,定位到table[index]?+?等分的位置,再根據mask取出計數值
????int?count?=?(int)?((table[index]?>>>?((start?+?i)?<????//取四個中的較小值
????frequency?=?Math.min(frequency,?count);
??}
??return?frequency;
}

通過代碼和注釋或者讀者可能難以理解,下圖是我畫出來幫助大家理解的結構圖。

注意紫色虛線框,其中藍色小格就是需要計算的位置:

保新機制

為了讓緩存保持“新鮮”,剔除掉過往頻率很高但之后不經常的緩存,Caffeine有一個Freshness Mechanism。做法很簡答,就是當整體的統計計數(當前所有記錄的頻率統計之和,這個數值內部維護)達到某一個值時,那么所有記錄的頻率統計除以2。

從上面的代碼

//size變量就是所有記錄的頻率統計之,即每個記錄加1,這個size都會加1
//sampleSize一個閾值,從FrequencySketch初始化可以看到它的值為maximumSize的10倍
if?(added?&&?(++size?==?sampleSize))?{
??????reset();
}

看到reset方法就是做這個事情

/**?Reduces?every?counter?by?half?of?its?original?value.?*/
void?reset()?{
??int?count?=?0;
??for?(int?i?=?0;?i?????count?+=?Long.bitCount(table[i]?&?ONE_MASK);
????table[i]?=?(table[i]?>>>?1)?&?RESET_MASK;
??}
??size?=?(size?>>>?1)?-?(count?>>>?2);
}

關于這個reset方法,為什么是除以2,而不是其他,及其正確性,在最下面的參考資料的TinyLFU論文中3.3章節給出了數學證明,大家有興趣可以看看。

增加一個Window?

Caffeine通過測試發現TinyLFU在面對突發性的稀疏流量(sparse bursts)時表現很差,因為新的記錄(new items)還沒來得及建立足夠的頻率就被剔除出去了,這就使得命中率下降。

于是Caffeine設計出一種新的policy,即Window Tiny LFU(W-TinyLFU),并通過實驗和實踐發現W-TinyLFU比TinyLFU表現的更好。

W-TinyLFU的設計如下所示(兩圖等價):

它主要包括兩個緩存模塊,主緩存是SLRU(Segmented LRU,即分段LRU),SLRU包括一個名為protected和一個名為probation的緩存區。通過增加一個緩存區(即Window Cache),當有新的記錄插入時,會先在window區呆一下,就可以避免上述說的sparse bursts問題。

淘汰策略(eviction policy)

當window區滿了,就會根據LRU把candidate(即淘汰出來的元素)放到probation區,如果probation區也滿了,就把candidate和probation將要淘汰的元素victim,兩個進行“PK”,勝者留在probation,輸者就要被淘汰了。

而且經過實驗發現當window區配置為總容量的1%,剩余的99%當中的80%分給protected區,20%分給probation區時,這時整體性能和命中率表現得最好,所以Caffeine默認的比例設置就是這個。

不過這個比例Caffeine會在運行時根據統計數據(statistics)去動態調整,如果你的應用程序的緩存隨著時間變化比較快的話,那么增加window區的比例可以提高命中率,相反緩存都是比較固定不變的話,增加Main Cache區(protected區 +probation區)的比例會有較好的效果。

下面我們看看上面說到的淘汰策略是怎么實現的:

一般緩存對讀寫操作后都有后續的一系列“維護”操作,Caffeine也不例外,這些操作都在maintenance方法,我們將要說到的淘汰策略也在里面。

這方法比較重要,下面也會提到,所以這里只先說跟“淘汰策略”有關的evictEntriesclimb

/**
???*?Performs?the?pending?maintenance?work?and?sets?the?state?flags?during?processing?to?avoid
???*?excess?scheduling?attempts.?The?read?buffer,?write?buffer,?and?reference?queues?are
???*?drained,?followed?by?expiration,?and?size-based?eviction.
???*
???*?@param?task?an?additional?pending?task?to?run,?or?{ @code?null}?if?not?present
???*/
??@GuardedBy("evictionLock")
??void?maintenance(@Nullable?Runnable?task)?{
????lazySetDrainStatus(PROCESSING_TO_IDLE);

????try?{
??????drainReadBuffer();

??????drainWriteBuffer();
??????if?(task?!=?null)?{
????????task.run();
??????}

??????drainKeyReferences();
??????drainValueReferences();

??????expireEntries();
??????//把符合條件的記錄淘汰掉
??????evictEntries();
??????//動態調整window區和protected區的大小
??????climb();
????}?finally?{
??????if?((drainStatus()?!=?PROCESSING_TO_IDLE)?||?!casDrainStatus(PROCESSING_TO_IDLE,?IDLE))?{
????????lazySetDrainStatus(REQUIRED);
??????}
????}
??}
?```

?先說一下Caffeine對上面說到的W-TinyLFU策略的實現用到的數據結構:

?```
?//最大的個數限制
long?maximum;
//當前的個數
long?weightedSize;
//window區的最大限制
long?windowMaximum;
//window區當前的個數
long?windowWeightedSize;
//protected區的最大限制
long?mainProtectedMaximum;
//protected區當前的個數
long?mainProtectedWeightedSize;
//下一次需要調整的大小(還需要進一步計算)
double?stepSize;
//window區需要調整的大小
long?adjustment;
//命中計數
int?hitsInSample;
//不命中的計數
int?missesInSample;
//上一次的緩存命中率
double?previousSampleHitRate;

final?FrequencySketch?sketch;
//window區的LRU?queue(FIFO)
final?AccessOrderDeque>?accessOrderWindowDeque;
//probation區的LRU?queue(FIFO)
final?AccessOrderDeque>?accessOrderProbationDeque;
//protected區的LRU?queue(FIFO)
final?AccessOrderDeque>?accessOrderProtectedDeque;

以及默認比例設置(意思看注釋)

/**?The?initial?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main?space.?*/
static?final?double?PERCENT_MAIN?=?0.99d;
/**?The?percent?of?the?maximum?weighted?capacity?dedicated?to?the?main's?protected?space.?*/
static?final?double?PERCENT_MAIN_PROTECTED?=?0.80d;
/**?The?difference?in?hit?rates?that?restarts?the?climber.?*/
static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
/**?The?percent?of?the?total?size?to?adapt?the?window?by.?*/
static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
/**?The?rate?to?decrease?the?step?size?to?adapt?by.?*/
static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
/**?The?maximum?number?of?entries?that?can?be?transfered?between?queues.?*/

重點來了,evictEntriesclimb方法:

/**?Evicts?entries?if?the?cache?exceeds?the?maximum.?*/
@GuardedBy("evictionLock")
void?evictEntries()?{
??if?(!evicts())?{
????return;
??}
??//淘汰window區的記錄
??int?candidates?=?evictFromWindow();
??//淘汰Main區的記錄
??evictFromMain(candidates);
}

/**
?*?Evicts?entries?from?the?window?space?into?the?main?space?while?the?window?size?exceeds?a
?*?maximum.
?*
?*?@return?the?number?of?candidate?entries?evicted?from?the?window?space
?*/
//根據W-TinyLFU,新的數據都會無條件的加到admission?window
//但是window是有大小限制,所以要“定期”做一下“維護”
@GuardedBy("evictionLock")
int?evictFromWindow()?{
??int?candidates?=?0;
??//查看window?queue的頭部節點
??Node?node?=?accessOrderWindowDeque().peek();
??//如果window區超過了最大的限制,那么就要把“多出來”的記錄做處理
??while?(windowWeightedSize()?>?windowMaximum())?{
????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
????if?(node?==?null)?{
??????break;
????}
????//下一個節點
????Node?next?=?node.getNextInAccessOrder();
????if?(node.getWeight()?!=?0)?{
??????//把node定位在probation區
??????node.makeMainProbation();
??????//從window區去掉
??????accessOrderWindowDeque().remove(node);
??????//加入到probation?queue,相當于把節點移動到probation區(晉升了)
??????accessOrderProbationDeque().add(node);
??????candidates++;
??????//因為移除了一個節點,所以需要調整window的size
??????setWindowWeightedSize(windowWeightedSize()?-?node.getPolicyWeight());
????}
????//處理下一個節點
????node?=?next;
??}

??return?candidates;
}

evictFromMain方法:

/**
?*?Evicts?entries?from?the?main?space?if?the?cache?exceeds?the?maximum?capacity.?The?main?space
?*?determines?whether?admitting?an?entry?(coming?from?the?window?space)?is?preferable?to?retaining
?*?the?eviction?policy's?victim.?This?is?decision?is?made?using?a?frequency?filter?so?that?the
?*?least?frequently?used?entry?is?removed.
?*
?*?The?window?space?candidates?were?previously?placed?in?the?MRU?position?and?the?eviction
?*?policy's?victim?is?at?the?LRU?position.?The?two?ends?of?the?queue?are?evaluated?while?an
?*?eviction?is?required.?The?number?of?remaining?candidates?is?provided?and?decremented?on
?*?eviction,?so?that?when?there?are?no?more?candidates?the?victim?is?evicted.
?*
?*?@param?candidates?the?number?of?candidate?entries?evicted?from?the?window?space
?*/
//根據W-TinyLFU,從window晉升過來的要跟probation區的進行“PK”,勝者才能留下
@GuardedBy("evictionLock")
void?evictFromMain(int?candidates)?{
??int?victimQueue?=?PROBATION;
??//victim是probation?queue的頭部
??Node?victim?=?accessOrderProbationDeque().peekFirst();
??//candidate是probation?queue的尾部,也就是剛從window晉升來的
??Node?candidate?=?accessOrderProbationDeque().peekLast();
??//當cache不夠容量時才做處理
??while?(weightedSize()?>?maximum())?{
????//?Stop?trying?to?evict?candidates?and?always?prefer?the?victim
????if?(candidates?==?0)?{
??????candidate?=?null;
????}

????//對candidate為null且victim為bull的處理
????if?((candidate?==?null)?&&?(victim?==?null))?{
??????if?(victimQueue?==?PROBATION)?{
????????victim?=?accessOrderProtectedDeque().peekFirst();
????????victimQueue?=?PROTECTED;
????????continue;
??????}?else?if?(victimQueue?==?PROTECTED)?{
????????victim?=?accessOrderWindowDeque().peekFirst();
????????victimQueue?=?WINDOW;
????????continue;
??????}

??????//?The?pending?operations?will?adjust?the?size?to?reflect?the?correct?weight
??????break;
????}

????//對節點的weight為0的處理
????if?((victim?!=?null)?&&?(victim.getPolicyWeight()?==?0))?{
??????victim?=?victim.getNextInAccessOrder();
??????continue;
????}?else?if?((candidate?!=?null)?&&?(candidate.getPolicyWeight()?==?0))?{
??????candidate?=?candidate.getPreviousInAccessOrder();
??????candidates--;
??????continue;
????}

????//?Evict?immediately?if?only?one?of?the?entries?is?present
????if?(victim?==?null)?{
??????@SuppressWarnings("NullAway")
??????Node?previous?=?candidate.getPreviousInAccessOrder();
??????Node?evict?=?candidate;
??????candidate?=?previous;
??????candidates--;
??????evictEntry(evict,?RemovalCause.SIZE,?0L);
??????continue;
????}?else?if?(candidate?==?null)?{
??????Node?evict?=?victim;
??????victim?=?victim.getNextInAccessOrder();
??????evictEntry(evict,?RemovalCause.SIZE,?0L);
??????continue;
????}

????//?Evict?immediately?if?an?entry?was?collected
????K?victimKey?=?victim.getKey();
????K?candidateKey?=?candidate.getKey();
????if?(victimKey?==?null)?{
??????@NonNull?Node?evict?=?victim;
??????victim?=?victim.getNextInAccessOrder();
??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
??????continue;
????}?else?if?(candidateKey?==?null)?{
??????candidates--;
??????@NonNull?Node?evict?=?candidate;
??????candidate?=?candidate.getPreviousInAccessOrder();
??????evictEntry(evict,?RemovalCause.COLLECTED,?0L);
??????continue;
????}

????//放不下的節點直接處理掉
????if?(candidate.getPolicyWeight()?>?maximum())?{
??????candidates--;
??????Node?evict?=?candidate;
??????candidate?=?candidate.getPreviousInAccessOrder();
??????evictEntry(evict,?RemovalCause.SIZE,?0L);
??????continue;
????}

????//根據節點的統計頻率frequency來做比較,看看要處理掉victim還是candidate
????//admit是具體的比較規則,看下面
????candidates--;
????//如果candidate勝出則淘汰victim
????if?(admit(candidateKey,?victimKey))?{
??????Node?evict?=?victim;
??????victim?=?victim.getNextInAccessOrder();
??????evictEntry(evict,?RemovalCause.SIZE,?0L);
??????candidate?=?candidate.getPreviousInAccessOrder();
????}?else?{
??????//如果是victim勝出,則淘汰candidate
??????Node?evict?=?candidate;
??????candidate?=?candidate.getPreviousInAccessOrder();
??????evictEntry(evict,?RemovalCause.SIZE,?0L);
????}
??}
}

/**
?*?Determines?if?the?candidate?should?be?accepted?into?the?main?space,?as?determined?by?its
?*?frequency?relative?to?the?victim.?A?small?amount?of?randomness?is?used?to?protect?against?hash
?*?collision?attacks,?where?the?victim's?frequency?is?artificially?raised?so?that?no?new?entries
?*?are?admitted.
?*
?*?@param?candidateKey?the?key?for?the?entry?being?proposed?for?long?term?retention
?*?@param?victimKey?the?key?for?the?entry?chosen?by?the?eviction?policy?for?replacement
?*?@return?if?the?candidate?should?be?admitted?and?the?victim?ejected
?*/
@GuardedBy("evictionLock")
boolean?admit(K?candidateKey,?K?victimKey)?{
??//分別獲取victim和candidate的統計頻率
??//frequency這個方法的原理和實現上面已經解釋了
??int?victimFreq?=?frequencySketch().frequency(victimKey);
??int?candidateFreq?=?frequencySketch().frequency(candidateKey);
??//誰大誰贏
??if?(candidateFreq?>?victimFreq)?{
????return?true;

????//如果相等,candidate小于5都當輸了
??}?else?if?(candidateFreq?<=?5)?{
????//?The?maximum?frequency?is?15?and?halved?to?7?after?a?reset?to?age?the?history.?An?attack
????//?exploits?that?a?hot?candidate?is?rejected?in?favor?of?a?hot?victim.?The?threshold?of?a?warm
????//?candidate?reduces?the?number?of?random?acceptances?to?minimize?the?impact?on?the?hit?rate.
????return?false;
??}
??//如果相等且candidate大于5,則隨機淘汰一個
??int?random?=?ThreadLocalRandom.current().nextInt();
??return?((random?&?127)?==?0);
}

climb方法主要是用來調整window size的,使得Caffeine可以適應你的應用類型(如OLAP或OLTP)表現出最佳的命中率。

下圖是官方測試的數據:

我們看看window size的調整是怎么實現的。

調整時用到的默認比例數據:

//與上次命中率之差的閾值
static?final?double?HILL_CLIMBER_RESTART_THRESHOLD?=?0.05d;
//步長(調整)的大小(跟最大值maximum的比例)
static?final?double?HILL_CLIMBER_STEP_PERCENT?=?0.0625d;
//步長的衰減比例
static?final?double?HILL_CLIMBER_STEP_DECAY_RATE?=?0.98d;
?/**?Adapts?the?eviction?policy?to?towards?the?optimal?recency?/?frequency?configuration.?*/
//climb方法的主要作用就是動態調整window區的大小(相應的,main區的大小也會發生變化,兩個之和為100%)。
//因為區域的大小發生了變化,那么區域內的數據也可能需要發生相應的移動。
@GuardedBy("evictionLock")
void?climb()?{
??if?(!evicts())?{
????return;
??}
??//確定window需要調整的大小
??determineAdjustment();
??//如果protected區有溢出,把溢出部分移動到probation區。因為下面的操作有可能需要調整到protected區。
??demoteFromMainProtected();
??long?amount?=?adjustment();
??if?(amount?==?0)?{
????return;
??}?else?if?(amount?>?0)?{
????//增加window的大小
????increaseWindow();
??}?else?{
????//減少window的大小
????decreaseWindow();
??}
}

下面分別展開每個方法來解釋:

/**?Calculates?the?amount?to?adapt?the?window?by?and?sets?{ @link?#adjustment()}?accordingly.?*/
@GuardedBy("evictionLock")
void?determineAdjustment()?{
??//如果frequencySketch還沒初始化,則返回
??if?(frequencySketch().isNotInitialized())?{
????setPreviousSampleHitRate(0.0);
????setMissesInSample(0);
????setHitsInSample(0);
????return;
??}
??//總請求量?=?命中?+?miss
??int?requestCount?=?hitsInSample()?+?missesInSample();
??//沒達到sampleSize則返回
??//默認下sampleSize = 10?* maximum。用sampleSize來判斷緩存是否足夠”熱“。
??if?(requestCount?????return;
??}

??//命中率的公式?=?命中?/?總請求
??double?hitRate?=?(double)?hitsInSample()?/?requestCount;
??//命中率的差值
??double?hitRateChange?=?hitRate?-?previousSampleHitRate();
??//本次調整的大小,是由命中率的差值和上次的stepSize決定的
??double?amount?=?(hitRateChange?>=?0)???stepSize()?:?-stepSize();
??//下次的調整大小:如果命中率的之差大于0.05,則重置為0.065 * maximum,否則按照0.98來進行衰減
??double?nextStepSize?=?(Math.abs(hitRateChange)?>=?HILL_CLIMBER_RESTART_THRESHOLD)
????????HILL_CLIMBER_STEP_PERCENT?*?maximum()?*?(amount?>=?0???1?:?-1)
??????:?HILL_CLIMBER_STEP_DECAY_RATE?*?amount;
??setPreviousSampleHitRate(hitRate);
??setAdjustment((long)?amount);
??setStepSize(nextStepSize);
??setMissesInSample(0);
??setHitsInSample(0);
}

/**?Transfers?the?nodes?from?the?protected?to?the?probation?region?if?it?exceeds?the?maximum.?*/

//這個方法比較簡單,減少protected區溢出的部分
@GuardedBy("evictionLock")
void?demoteFromMainProtected()?{
??long?mainProtectedMaximum?=?mainProtectedMaximum();
??long?mainProtectedWeightedSize?=?mainProtectedWeightedSize();
??if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
????return;
??}

??for?(int?i?=?0;?i?????if?(mainProtectedWeightedSize?<=?mainProtectedMaximum)?{
??????break;
????}

????Node?demoted?=?accessOrderProtectedDeque().poll();
????if?(demoted?==?null)?{
??????break;
????}
????demoted.makeMainProbation();
????accessOrderProbationDeque().add(demoted);
????mainProtectedWeightedSize?-=?demoted.getPolicyWeight();
??}
??setMainProtectedWeightedSize(mainProtectedWeightedSize);
}

/**
?*?Increases?the?size?of?the?admission?window?by?shrinking?the?portion?allocated?to?the?main
?*?space.?As?the?main?space?is?partitioned?into?probation?and?protected?regions?(80%?/?20%),?for
?*?simplicity?only?the?protected?is?reduced.?If?the?regions?exceed?their?maximums,?this?may?cause
?*?protected?items?to?be?demoted?to?the?probation?region?and?probation?items?to?be?demoted?to?the
?*?admission?window.
?*/

//增加window區的大小,這個方法比較簡單,思路就像我上面說的
@GuardedBy("evictionLock")
void?increaseWindow()?{
??if?(mainProtectedMaximum()?==?0)?{
????return;
??}

??long?quota?=?Math.min(adjustment(),?mainProtectedMaximum());
??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
??setWindowMaximum(windowMaximum()?+?quota);
??demoteFromMainProtected();

??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderProbationDeque().peek();
????boolean?probation?=?true;
????if?((candidate?==?null)?||?(quota???????candidate?=?accessOrderProtectedDeque().peek();
??????probation?=?false;
????}
????if?(candidate?==?null)?{
??????break;
????}

????int?weight?=?candidate.getPolicyWeight();
????if?(quota???????break;
????}

????quota?-=?weight;
????if?(probation)?{
??????accessOrderProbationDeque().remove(candidate);
????}?else?{
??????setMainProtectedWeightedSize(mainProtectedWeightedSize()?-?weight);
??????accessOrderProtectedDeque().remove(candidate);
????}
????setWindowWeightedSize(windowWeightedSize()?+?weight);
????accessOrderWindowDeque().add(candidate);
????candidate.makeWindow();
??}

??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
??setWindowMaximum(windowMaximum()?-?quota);
??setAdjustment(quota);
}

/**?Decreases?the?size?of?the?admission?window?and?increases?the?main's?protected?region.?*/
//同上increaseWindow差不多,反操作
@GuardedBy("evictionLock")
void?decreaseWindow()?{
??if?(windowMaximum()?<=?1)?{
????return;
??}

??long?quota?=?Math.min(-adjustment(),?Math.max(0,?windowMaximum()?-?1));
??setMainProtectedMaximum(mainProtectedMaximum()?+?quota);
??setWindowMaximum(windowMaximum()?-?quota);

??for?(int?i?=?0;?i?????Node?candidate?=?accessOrderWindowDeque().peek();
????if?(candidate?==?null)?{
??????break;
????}

????int?weight?=?candidate.getPolicyWeight();
????if?(quota???????break;
????}

????quota?-=?weight;
????setMainProtectedWeightedSize(mainProtectedWeightedSize()?+?weight);
????setWindowWeightedSize(windowWeightedSize()?-?weight);
????accessOrderWindowDeque().remove(candidate);
????accessOrderProbationDeque().add(candidate);
????candidate.makeMainProbation();
??}

??setMainProtectedMaximum(mainProtectedMaximum()?-?quota);
??setWindowMaximum(windowMaximum()?+?quota);
??setAdjustment(-quota);
}

以上,是Caffeine的W-TinyLFU策略的設計原理及代碼實現解析。

異步的高性能讀寫

一般的緩存每次對數據處理完之后(讀的話,已經存在則直接返回,不存在則load數據,保存,再返回;寫的話,則直接插入或更新),但是因為要維護一些淘汰策略,則需要一些額外的操作,諸如:

  • 計算和比較數據的是否過期

  • 統計頻率(像LFU或其變種)

  • 維護read queue和write queue

  • 淘汰符合條件的數據

  • 等等。。。

這種數據的讀寫伴隨著緩存狀態的變更,Guava Cache的做法是把這些操作和讀寫操作放在一起,在一個同步加鎖的操作中完成,雖然Guava Cache巧妙地利用了JDK的ConcurrentHashMap(分段鎖或者無鎖CAS)來降低鎖的密度,達到提高并發度的目的。但是,對于一些熱點數據,這種做法還是避免不了頻繁的鎖競爭。Caffeine借鑒了數據庫系統的WAL(Write-Ahead Logging)思想,即先寫日志再執行操作,這種思想同樣適合緩存的,執行讀寫操作時,先把操作記錄在緩沖區,然后在合適的時機異步、批量地執行緩沖區中的內容。但在執行緩沖區的內容時,也是需要在緩沖區加上同步鎖的,不然存在并發問題,只不過這樣就可以把對鎖的競爭從緩存數據轉移到對緩沖區上。

ReadBuffer

在Caffeine的內部實現中,為了很好的支持不同的Features(如Eviction,Removal,Refresh,Statistics,Cleanup,Policy等等),擴展了很多子類,它們共同的父類是BoundedLocalCache,而readBuffer就是作為它們共有的屬性,即都是用一樣的readBuffer,看定義:

final?Buffer>?readBuffer;

readBuffer?=?evicts()?||?collectKeys()?||?collectValues()?||?expiresAfterAccess()
??????????new?BoundedBuffer<>()
????????:?Buffer.disabled();

上面提到Caffeine對每次緩存的讀操作都會觸發afterRead

/**
?*?Performs?the?post-processing?work?required?after?a?read.
?*
?*?@param?node?the?entry?in?the?page?replacement?policy
?*?@param?now?the?current?time,?in?nanoseconds
?*?@param?recordHit?if?the?hit?count?should?be?incremented
?*/
void?afterRead(Node?node,?long?now,?boolean?recordHit)?{
??if?(recordHit)?{
????statsCounter().recordHits(1);
??}
??//把記錄加入到readBuffer
??//判斷是否需要立即處理readBuffer
??//注意這里無論offer是否成功都可以走下去的,即允許寫入readBuffer丟失,因為這個
??boolean?delayable?=?skipReadBuffer()?||?(readBuffer.offer(node)?!=?Buffer.FULL);
??if?(shouldDrainBuffers(delayable))?{
????scheduleDrainBuffers();
??}
??refreshIfNeeded(node,?now);
}

?/**
???*?Returns?whether?maintenance?work?is?needed.
???*
???*?@param?delayable?if?draining?the?read?buffer?can?be?delayed
???*/

??//caffeine用了一組狀態來定義和管理“維護”的過程
??boolean?shouldDrainBuffers(boolean?delayable)?{
????switch?(drainStatus())?{
??????case?IDLE:
????????return?!delayable;
??????case?REQUIRED:
????????return?true;
??????case?PROCESSING_TO_IDLE:
??????case?PROCESSING_TO_REQUIRED:
????????return?false;
??????default:
????????throw?new?IllegalStateException();
????}
??}

重點看BoundedBuffer

/**
?*?A?striped,?non-blocking,?bounded?buffer.
?*
?*?@author?ben.manes@gmail.com?(Ben?Manes)
?*?@param??the?type?of?elements?maintained?by?this?buffer
?*/
final?class?BoundedBuffer?extends?StripedBuffer

它是一個striped、非阻塞、有界限的buffer,繼承于StripedBuffer類。下面看看StripedBuffer的實現:

/**
?*?A?base?class?providing?the?mechanics?for?supporting?dynamic?striping?of?bounded?buffers.?This
?*?implementation?is?an?adaption?of?the?numeric?64-bit?{ @link?java.util.concurrent.atomic.Striped64}
?*?class,?which?is?used?by?atomic?counters.?The?approach?was?modified?to?lazily?grow?an?array?of
?*?buffers?in?order?to?minimize?memory?usage?for?caches?that?are?not?heavily?contended?on.
?*
?*?@author?dl@cs.oswego.edu?(Doug?Lea)
?*?@author?ben.manes@gmail.com?(Ben?Manes)
?*/

abstract?class?StripedBuffer?implements?Buffer

這個StripedBuffer設計的思想是跟Striped64類似的,通過擴展結構把競爭熱點分離。

具體實現是這樣的,StripedBuffer維護一個Buffer[]數組,每個元素就是一個RingBuffer,每個線程用自己threadLocalRandomProbe屬性作為hash值,這樣就相當于每個線程都有自己“專屬”的RingBuffer,就不會產生競爭啦,而不是用key的hashCode作為hash值,因為會產生熱點數據問題。

看看StripedBuffer的屬性

/**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
//RingBuffer數組
transient?volatile?Buffer?@Nullable[]?table;

//當進行resize時,需要整個table鎖住。tableBusy作為CAS的標記。
static?final?long?TABLE_BUSY?=?UnsafeAccess.objectFieldOffset(StripedBuffer.class,?"tableBusy");
static?final?long?PROBE?=?UnsafeAccess.objectFieldOffset(Thread.class,?"threadLocalRandomProbe");

/**?Number?of?CPUS.?*/
static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();

/**?The?bound?on?the?table?size.?*/
//table最大size
static?final?int?MAXIMUM_TABLE_SIZE?=?4?*?ceilingNextPowerOfTwo(NCPU);

/**?The?maximum?number?of?attempts?when?trying?to?expand?the?table.?*/
//如果發生競爭時(CAS失敗)的嘗試次數
static?final?int?ATTEMPTS?=?3;

/**?Table?of?buffers.?When?non-null,?size?is?a?power?of?2.?*/
//核心數據結構
transient?volatile?Buffer?@Nullable[]?table;

/**?Spinlock?(locked?via?CAS)?used?when?resizing?and/or?creating?Buffers.?*/
transient?volatile?int?tableBusy;

/**?CASes?the?tableBusy?field?from?0?to?1?to?acquire?lock.?*/
final?boolean?casTableBusy()?{
??return?UnsafeAccess.UNSAFE.compareAndSwapInt(this,?TABLE_BUSY,?0,?1);
}

/**
?*?Returns?the?probe?value?for?the?current?thread.?Duplicated?from?ThreadLocalRandom?because?of
?*?packaging?restrictions.
?*/
static?final?int?getProbe()?{
??return?UnsafeAccess.UNSAFE.getInt(Thread.currentThread(),?PROBE);
}

offer方法,當沒初始化或存在競爭時,則擴容為2倍。

實際是調用RingBuffer的offer方法,把數據追加到RingBuffer后面。

@Override
public?int?offer(E?e)?{
??int?mask;
??int?result?=?0;
??Buffer?buffer;
??//是否不存在競爭
??boolean?uncontended?=?true;
??Buffer[]?buffers?=?table
??//是否已經初始化
??if?((buffers?==?null)
??????||?(mask?=?buffers.length?-?1)???????//用thread的隨機值作為hash值,得到對應位置的RingBuffer
??????||?(buffer?=?buffers[getProbe()?&?mask])?==?null
??????//檢查追加到RingBuffer是否成功
??????||?!(uncontended?=?((result?=?buffer.offer(e))?!=?Buffer.FAILED)))?{
????//其中一個符合條件則進行擴容
????expandOrRetry(e,?uncontended);
??}
??return?result;
}

/**
?*?Handles?cases?of?updates?involving?initialization,?resizing,?creating?new?Buffers,?and/or
?*?contention.?See?above?for?explanation.?This?method?suffers?the?usual?non-modularity?problems?of
?*?optimistic?retry?code,?relying?on?rechecked?sets?of?reads.
?*
?*?@param?e?the?element?to?add
?*?@param?wasUncontended?false?if?CAS?failed?before?call
?*/

//這個方法比較長,但思路還是相對清晰的。
@SuppressWarnings("PMD.ConfusingTernary")
final?void?expandOrRetry(E?e,?boolean?wasUncontended)?{
??int?h;
??if?((h?=?getProbe())?==?0)?{
????ThreadLocalRandom.current();?//?force?initialization
????h?=?getProbe();
????wasUncontended?=?true;
??}
??boolean?collide?=?false;?//?True?if?last?slot?nonempty
??for?(int?attempt?=?0;?attempt?????Buffer[]?buffers;
????Buffer?buffer;
????int?n;
????if?(((buffers?=?table)?!=?null)?&&?((n?=?buffers.length)?>?0))?{
??????if?((buffer?=?buffers[(n?-?1)?&?h])?==?null)?{
????????if?((tableBusy?==?0)?&&?casTableBusy())?{ ?//?Try?to?attach?new?Buffer
??????????boolean?created?=?false;
??????????try?{ ?//?Recheck?under?lock
????????????Buffer[]?rs;
????????????int?mask,?j;
????????????if?(((rs?=?table)?!=?null)?&&?((mask?=?rs.length)?>?0)
????????????????&&?(rs[j?=?(mask?-?1)?&?h]?==?null))?{
??????????????rs[j]?=?create(e);
??????????????created?=?true;
????????????}
??????????}?finally?{
????????????tableBusy?=?0;
??????????}
??????????if?(created)?{
????????????break;
??????????}
??????????continue;?//?Slot?is?now?non-empty
????????}
????????collide?=?false;
??????}?else?if?(!wasUncontended)?{ ?//?CAS?already?known?to?fail
????????wasUncontended?=?true;??????//?Continue?after?rehash
??????}?else?if?(buffer.offer(e)?!=?Buffer.FAILED)?{
????????break;
??????}?else?if?(n?>=?MAXIMUM_TABLE_SIZE?||?table?!=?buffers)?{
????????collide?=?false;?//?At?max?size?or?stale
??????}?else?if?(!collide)?{
????????collide?=?true;
??????}?else?if?(tableBusy?==?0?&&?casTableBusy())?{
????????try?{
??????????if?(table?==?buffers)?{ ?//?Expand?table?unless?stale
????????????table?=?Arrays.copyOf(buffers,?n?<??????????}
????????}?finally?{
??????????tableBusy?=?0;
????????}
????????collide?=?false;
????????continue;?//?Retry?with?expanded?table
??????}
??????h?=?advanceProbe(h);
????}?else?if?((tableBusy?==?0)?&&?(table?==?buffers)?&&?casTableBusy())?{
??????boolean?init?=?false;
??????try?{ ?//?Initialize?table
????????if?(table?==?buffers)?{
??????????@SuppressWarnings({ "unchecked",?"rawtypes"})
??????????Buffer[]?rs?=?new?Buffer[1];
??????????rs[0]?=?create(e);
??????????table?=?rs;
??????????init?=?true;
????????}
??????}?finally?{
????????tableBusy?=?0;
??????}
??????if?(init)?{
????????break;
??????}
????}
??}
}

最后看看RingBuffer,注意RingBuffer是BoundedBuffer的內部類。

/**?The?maximum?number?of?elements?per?buffer.?*/
static?final?int?BUFFER_SIZE?=?16;

//?Assume?4-byte?references?and?64-byte?cache?line?(16?elements?per?line)
//256長度,但是是以16為單位,所以最多存放16個元素
static?final?int?SPACED_SIZE?=?BUFFER_SIZE?<static?final?int?SPACED_MASK?=?SPACED_SIZE?-?1;
static?final?int?OFFSET?=?16;????
//RingBuffer數組
final?AtomicReferenceArray?buffer;

?//插入方法
?@Override
?public?int?offer(E?e)?{
???long?head?=?readCounter;
???long?tail?=?relaxedWriteCounter();
???//用head和tail來限制個數
???long?size?=?(tail?-?head);
???if?(size?>=?SPACED_SIZE)?{
?????return?Buffer.FULL;
???}
???//tail追加16
???if?(casWriteCounter(tail,?tail?+?OFFSET))?{
?????//用tail“取余”得到下標
?????int?index?=?(int)?(tail?&?SPACED_MASK);
?????//用unsafe.putOrderedObject設值
?????buffer.lazySet(index,?e);
?????return?Buffer.SUCCESS;
???}
???//如果CAS失敗則返回失敗
???return?Buffer.FAILED;
?}

?//用consumer來處理buffer的數據
?@Override
?public?void?drainTo(Consumer?consumer)?{
???long?head?=?readCounter;
???long?tail?=?relaxedWriteCounter();
???//判斷數據多少
???long?size?=?(tail?-?head);
???if?(size?==?0)?{
?????return;
???}
???do?{
?????int?index?=?(int)?(head?&?SPACED_MASK);
?????E?e?=?buffer.get(index);
?????if?(e?==?null)?{
???????//?not?published?yet
???????break;
?????}
?????buffer.lazySet(index,?null);
?????consumer.accept(e);
?????//head也跟tail一樣,每次遞增16
?????head?+=?OFFSET;
???}?while?(head?!=?tail);
???lazySetReadCounter(head);
?}

注意,ring buffer的size(固定是16個)是不變的,變的是head和tail而已。

總的來說ReadBuffer有如下特點:

  • 使用 Striped-RingBuffer來提升對buffer的讀寫

  • 用thread的hash來避開熱點key的競爭

  • 允許寫入的丟失

WriteBuffer

writeBuffer跟readBuffer不一樣,主要體現在使用場景的不一樣。本來緩存的一般場景是讀多寫少的,讀的并發會更高,且afterRead顯得沒那么重要,允許延遲甚至丟失。寫不一樣,寫afterWrite不允許丟失,且要求盡量馬上執行。Caffeine使用MPSC(Multiple Producer / Single Consumer)作為buffer數組,實現在MpscGrowableArrayQueue類,它是仿照JCTools的MpscGrowableArrayQueue來寫的。

MPSC允許無鎖的高并發寫入,但只允許一個消費者,同時也犧牲了部分操作。

MPSC我打算另外分析,這里不展開了。

TimerWheel

除了支持expireAfterAccess和expireAfterWrite之外(Guava Cache也支持這兩個特性),Caffeine還支持expireAfter。因為expireAfterAccess和expireAfterWrite都只能是固定的過期時間,這可能滿足不了某些場景,譬如記錄的過期時間是需要根據某些條件而不一樣的,這就需要用戶自定義過期時間。

先看看expireAfter的用法

private?static?LoadingCache?cache?=?Caffeine.newBuilder()
????????.maximumSize(256L)
????????.initialCapacity(1)
????????//.expireAfterAccess(2,?TimeUnit.DAYS)
????????//.expireAfterWrite(2,?TimeUnit.HOURS)
????????.refreshAfterWrite(1,?TimeUnit.HOURS)
????????//自定義過期時間
????????.expireAfter(new?Expiry()?{
????????????//返回創建后的過期時間
????????????@Override
????????????public?long?expireAfterCreate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime)?{
????????????????return?0;
????????????}

????????????//返回更新后的過期時間
????????????@Override
????????????public?long?expireAfterUpdate(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
????????????????return?0;
????????????}

????????????//返回讀取后的過期時間
????????????@Override
????????????public?long?expireAfterRead(@NonNull?String?key,?@NonNull?String?value,?long?currentTime,?@NonNegative?long?currentDuration)?{
????????????????return?0;
????????????}
????????})
????????.recordStats()
????????.build(new?CacheLoader()?{
????????????@Nullable
????????????@Override
????????????public?String?load(@NonNull?String?key)?throws?Exception?{
????????????????return?"value_"?+?key;
????????????}
????????});

通過自定義過期時間,使得不同的key可以動態的得到不同的過期時間。

注意,我把expireAfterAccess和expireAfterWrite注釋了,因為這兩個特性不能跟expireAfter一起使用。

而當使用了expireAfter特性后,Caffeine會啟用一種叫“時間輪”的算法來實現這個功能。

好,重點來了,為什么要用時間輪?

對expireAfterAccess和expireAfterWrite的實現是用一個AccessOrderDeque雙端隊列,它是FIFO的,因為它們的過期時間是固定的,所以在隊列頭的數據肯定是最早過期的,要處理過期數據時,只需要首先看看頭部是否過期,然后再挨個檢查就可以了。但是,如果過期時間不一樣的話,這需要對accessOrderQueue進行排序&插入,這個代價太大了。于是,Caffeine用了一種更加高效、優雅的算法-時間輪。

時間輪的結構:

因為在我的對時間輪分析的文章里已經說了時間輪的原理和機制了,所以我就不展開Caffeine對時間輪的實現了。

Caffeine對時間輪的實現在TimerWheel,它是一種多層時間輪(hierarchical timing wheels )。

看看元素加入到時間輪的schedule方法:

/**
?*?Schedules?a?timer?event?for?the?node.
?*
?*?@param?node?the?entry?in?the?cache
?*/
public?void?schedule(@NonNull?Node?node)?{
??Node?sentinel?=?findBucket(node.getVariableTime());
??link(sentinel,?node);
}

/**
?*?Determines?the?bucket?that?the?timer?event?should?be?added?to.
?*
?*?@param?time?the?time?when?the?event?fires
?*?@return?the?sentinel?at?the?head?of?the?bucket
?*/
Node?findBucket(long?time)?{
??long?duration?=?time?-?nanos;
??int?length?=?wheel.length?-?1;
??for?(int?i?=?0;?i?????if?(duration???????long?ticks?=?(time?>>>?SHIFT[i]);
??????int?index?=?(int)?(ticks?&?(wheel[i].length?-?1));
??????return?wheel[i][index];
????}
??}
??return?wheel[length][0];
}

/**?Adds?the?entry?at?the?tail?of?the?bucket's?list.?*/
void?link(Node?sentinel,?Node?node)?{
??node.setPreviousInVariableOrder(sentinel.getPreviousInVariableOrder());
??node.setNextInVariableOrder(sentinel);

??sentinel.getPreviousInVariableOrder().setNextInVariableOrder(node);
??sentinel.setPreviousInVariableOrder(node);
}

其他

Caffeine還有其他的優化性能的手段,如使用軟引用和弱引用、消除偽共享、CompletableFuture異步等等。

總結

Caffeien是一個優秀的本地緩存,通過使用W-TinyLFU算法, 高性能的readBuffer和WriteBuffer,時間輪算法等,使得它擁有高性能,高命中率(near optimal),低內存占用等特點。

參考資料

  • TinyLFU論文

  • Design Of A Modern Cache

  • Design Of A Modern Cache—Part Deux

  • Caffeine的github

特別推薦一個分享架構+算法的優質內容,還沒關注的小伙伴,可以長按關注一下:

長按訂閱更多精彩▼

如有收獲,點個在看,誠摯感謝

免責聲明:本文內容由21ic獲得授權后發布,版權歸原作者所有,本平臺僅提供信息存儲服務。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯系我們,謝謝!

Popular articles

主站蜘蛛池模板: 无翼乌口工| 欧美性大战久久久久久久蜜桃| 亚洲欧美日韩在线不卡| 色老太bbw| 国产又爽又色在线观看| 三上悠亚国产精品一区| 一本一本久久a久久精品综合| 欧美军人男男同videos可播放| 农夫山泉有点甜高清2在线观看| 美女扒开大腿让我爽| 精品国产一区二区三区久久| 亚洲午夜久久久精品电影院| 美女在线免费观看| 国产欧美精品一区二区色综合| 强挺进小y头的小花苞漫画| 18观看免费永久视频| 日本v电影| 遭绝伦三个老头侵犯波多野结衣| chinese乱子伦xxxx视频播放| 在线免费中文字幕| 美女毛片在线观看| 欧美亚洲国产精品久久高清| 成人福利电影在线观看| 岛国视频在线观看免费播放| 高h全肉动漫在线观看| 老子影院午夜伦手机不卡6080| 一本色道久久88加勒比—综合| 亚洲欧美日韩精品久久| a国产乱理伦片在线观看夜| 国产精品国产三级国产潘金莲| 性芭蕾k8经典| 欧美日韩精品| 国产黄色片91| 福利一区二区三区视频在线观看 | gav男人天堂| 波多野结衣电影免费在线观看| 粉色视频在线播放| 中文国产成人精品久久app| 久久精品国产一区二区三区肥胖| 久久亚洲精品中文字幕| 四虎成人影院网址|