- // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- // Package cache provides the interface and implementation of a caching algorithm.
- package cache
- import (
- "sync"
- "sync/atomic"
- "unsafe"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- // Cacher provides an interface to implement caching functionality.
- // An implementation must be safe for concurrent use.
- type Cacher interface {
- // Capacity returns cache capacity.
- Capacity() int
- // SetCapacity sets cache capacity.
- SetCapacity(capacity int)
- // Promote promotes the 'cache node'.
- Promote(n *Node)
- // Ban evicts the 'cache node' and prevents subsequent 'promote'.
- Ban(n *Node)
- // Evict evicts the 'cache node'.
- Evict(n *Node)
- // EvictNS evicts all 'cache nodes' in the given namespace.
- EvictNS(ns uint64)
- // EvictAll evicts all 'cache nodes'.
- EvictAll()
- // Close closes the 'cache tree'.
- Close() error
- }
- // Value is a 'cacheable object'. It may implement util.Releaser; if
- // so, the Release method will be called once the object is released.
- type Value interface{}
- // NamespaceGetter provides a convenient wrapper for a namespace.
- type NamespaceGetter struct {
- Cache *Cache
- NS uint64
- }
- // Get simply calls the Cache.Get method.
- func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
- return g.Cache.Get(g.NS, key, setFunc)
- }
- // The hash tables implementation is based on:
- // "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
- // Kunlong Zhang, and Michael Spear.
- // ACM Symposium on Principles of Distributed Computing, Jul 2014.
- const (
- mInitialSize = 1 << 4
- mOverflowThreshold = 1 << 5
- mOverflowGrowThreshold = 1 << 7
- )
- type mBucket struct {
- mu sync.Mutex
- node []*Node
- frozen bool
- }
- func (b *mBucket) freeze() []*Node {
- b.mu.Lock()
- defer b.mu.Unlock()
- if !b.frozen {
- b.frozen = true
- }
- return b.node
- }
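- // get looks up the node for (hash, ns, key) in this bucket, creating it when
- // noset is false and it is absent. It returns done == false when the bucket has
- // been frozen by a concurrent resize, in which case the caller must retry
- // against the current table. Creating a node may trigger growing the table.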
- func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
- b.mu.Lock()
- if b.frozen {
- b.mu.Unlock()
- return
- }
- // Scan the node.
- for _, n := range b.node {
- if n.hash == hash && n.ns == ns && n.key == key {
- atomic.AddInt32(&n.ref, 1)
- b.mu.Unlock()
- return true, false, n
- }
- }
- // Get only.
- if noset {
- b.mu.Unlock()
- return true, false, nil
- }
- // Create node.
- n = &Node{
- r: r,
- hash: hash,
- ns: ns,
- key: key,
- ref: 1,
- }
- // Add node to bucket.
- b.node = append(b.node, n)
- bLen := len(b.node)
- b.mu.Unlock()
- // Update counter.
- grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
- if bLen > mOverflowThreshold {
- grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
- }
- // Grow.
- if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
- nhLen := len(h.buckets) << 1
- nh := &mNode{
- buckets: make([]unsafe.Pointer, nhLen),
- mask: uint32(nhLen) - 1,
- pred: unsafe.Pointer(h),
- growThreshold: int32(nhLen * mOverflowThreshold),
- shrinkThreshold: int32(nhLen >> 1),
- }
- ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
- if !ok {
- panic("BUG: failed swapping head")
- }
- go nh.initBuckets()
- }
- return true, true, n
- }
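- // delete removes the unreferenced node for (ns, key) from this bucket,
- // releasing its value and running its onDel callbacks. It returns done == false
- // when the bucket has been frozen by a concurrent resize, and a successful
- // removal may trigger shrinking the table.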
- func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
- b.mu.Lock()
- if b.frozen {
- b.mu.Unlock()
- return
- }
- // Scan the node.
- var (
- n *Node
- bLen int
- )
- for i := range b.node {
- n = b.node[i]
- if n.ns == ns && n.key == key {
- if atomic.LoadInt32(&n.ref) == 0 {
- deleted = true
- // Call releaser.
- if n.value != nil {
- if r, ok := n.value.(util.Releaser); ok {
- r.Release()
- }
- n.value = nil
- }
- // Remove node from bucket.
- b.node = append(b.node[:i], b.node[i+1:]...)
- bLen = len(b.node)
- }
- break
- }
- }
- b.mu.Unlock()
- if deleted {
- // Call OnDel.
- for _, f := range n.onDel {
- f()
- }
- // Update counter.
- atomic.AddInt32(&r.size, int32(n.size)*-1)
- shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
- if bLen >= mOverflowThreshold {
- atomic.AddInt32(&h.overflow, -1)
- }
- // Shrink.
- if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
- nhLen := len(h.buckets) >> 1
- nh := &mNode{
- buckets: make([]unsafe.Pointer, nhLen),
- mask: uint32(nhLen) - 1,
- pred: unsafe.Pointer(h),
- growThreshold: int32(nhLen * mOverflowThreshold),
- shrinkThreshold: int32(nhLen >> 1),
- }
- ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
- if !ok {
- panic("BUG: failed swapping head")
- }
- go nh.initBuckets()
- }
- }
- return true, deleted
- }
- type mNode struct {
- buckets []unsafe.Pointer // []*mBucket
- mask uint32
- pred unsafe.Pointer // *mNode
- resizeInProgess int32
- overflow int32
- growThreshold int32
- shrinkThreshold int32
- }
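- // initBucket lazily builds bucket i of this table from the predecessor table:
- // on grow it freezes the parent bucket and keeps only the nodes that rehash to
- // i; on shrink it freezes and merges the two parent buckets that fold into i.
- // The result is installed with a CAS so concurrent callers agree on one bucket.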
- func (n *mNode) initBucket(i uint32) *mBucket {
- if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
- return b
- }
- p := (*mNode)(atomic.LoadPointer(&n.pred))
- if p != nil {
- var node []*Node
- if n.mask > p.mask {
- // Grow.
- pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
- if pb == nil {
- pb = p.initBucket(i & p.mask)
- }
- m := pb.freeze()
- // Split nodes.
- for _, x := range m {
- if x.hash&n.mask == i {
- node = append(node, x)
- }
- }
- } else {
- // Shrink.
- pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
- if pb0 == nil {
- pb0 = p.initBucket(i)
- }
- pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
- if pb1 == nil {
- pb1 = p.initBucket(i + uint32(len(n.buckets)))
- }
- m0 := pb0.freeze()
- m1 := pb1.freeze()
- // Merge nodes.
- node = make([]*Node, 0, len(m0)+len(m1))
- node = append(node, m0...)
- node = append(node, m1...)
- }
- b := &mBucket{node: node}
- if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
- if len(node) > mOverflowThreshold {
- atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
- }
- return b
- }
- }
- return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
- }
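- // initBuckets eagerly initializes every bucket of this table and then drops the
- // predecessor pointer so the old table can be garbage collected.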
- func (n *mNode) initBuckets() {
- for i := range n.buckets {
- n.initBucket(uint32(i))
- }
- atomic.StorePointer(&n.pred, nil)
- }
- // Cache is a 'cache map'.
- type Cache struct {
- mu sync.RWMutex
- mHead unsafe.Pointer // *mNode
- nodes int32
- size int32
- cacher Cacher
- closed bool
- }
- // NewCache creates a new 'cache map'. The cacher is optional and
- // may be nil.
- func NewCache(cacher Cacher) *Cache {
- h := &mNode{
- buckets: make([]unsafe.Pointer, mInitialSize),
- mask: mInitialSize - 1,
- growThreshold: int32(mInitialSize * mOverflowThreshold),
- shrinkThreshold: 0,
- }
- for i := range h.buckets {
- h.buckets[i] = unsafe.Pointer(&mBucket{})
- }
- r := &Cache{
- mHead: unsafe.Pointer(h),
- cacher: cacher,
- }
- return r
- }
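- // The following is an illustrative sketch, not part of the original source:
- // with a nil Cacher the Cache behaves as a plain concurrent map. The namespace,
- // key, size and value are made up for the example.
- func exampleBasicUse() {
- c := NewCache(nil)
- h := c.Get(0, 1, func() (size int, value Value) { return 8, "payload" })
- if h != nil {
- _ = h.Value() // "payload"
- h.Release()
- }
- _ = c.Close()
- }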
- func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
- h := (*mNode)(atomic.LoadPointer(&r.mHead))
- i := hash & h.mask
- b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
- if b == nil {
- b = h.initBucket(i)
- }
- return h, b
- }
- func (r *Cache) delete(n *Node) bool {
- for {
- h, b := r.getBucket(n.hash)
- done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
- if done {
- return deleted
- }
- }
- }
- // Nodes returns the number of 'cache nodes' in the map.
- func (r *Cache) Nodes() int {
- return int(atomic.LoadInt32(&r.nodes))
- }
- // Size returns the sum of 'cache node' sizes in the map.
- func (r *Cache) Size() int {
- return int(atomic.LoadInt32(&r.size))
- }
- // Capacity returns cache capacity.
- func (r *Cache) Capacity() int {
- if r.cacher == nil {
- return 0
- }
- return r.cacher.Capacity()
- }
- // SetCapacity sets cache capacity.
- func (r *Cache) SetCapacity(capacity int) {
- if r.cacher != nil {
- r.cacher.SetCapacity(capacity)
- }
- }
- // Get gets the 'cache node' with the given namespace and key.
- // If the 'cache node' is not found and setFunc is not nil, Get will atomically
- // create the 'cache node' by calling setFunc. Otherwise Get will return nil.
- //
- // The returned 'cache handle' should be released after use by calling its
- // Release method.
- func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.closed {
- return nil
- }
- hash := murmur32(ns, key, 0xf00)
- for {
- h, b := r.getBucket(hash)
- done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
- if done {
- if n != nil {
- n.mu.Lock()
- if n.value == nil {
- if setFunc == nil {
- n.mu.Unlock()
- n.unref()
- return nil
- }
- n.size, n.value = setFunc()
- if n.value == nil {
- n.size = 0
- n.mu.Unlock()
- n.unref()
- return nil
- }
- atomic.AddInt32(&r.size, int32(n.size))
- }
- n.mu.Unlock()
- if r.cacher != nil {
- r.cacher.Promote(n)
- }
- return &Handle{unsafe.Pointer(n)}
- }
- break
- }
- }
- return nil
- }
- // Delete removes and bans the 'cache node' with the given namespace and key.
- // A banned 'cache node' will never be inserted into the 'cache tree'. The ban
- // only applies to that particular 'cache node', so when a 'cache node' with
- // the same namespace and key is recreated it will not be banned.
- //
- // If onDel is not nil, then it will be executed if such 'cache node'
- // doesn't exist or once the 'cache node' is released.
- //
- // Delete returns true if such 'cache node' exists.
- func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.closed {
- return false
- }
- hash := murmur32(ns, key, 0xf00)
- for {
- h, b := r.getBucket(hash)
- done, _, n := b.get(r, h, hash, ns, key, true)
- if done {
- if n != nil {
- if onDel != nil {
- n.mu.Lock()
- n.onDel = append(n.onDel, onDel)
- n.mu.Unlock()
- }
- if r.cacher != nil {
- r.cacher.Ban(n)
- }
- n.unref()
- return true
- }
- break
- }
- }
- if onDel != nil {
- onDel()
- }
- return false
- }
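- // The following is an illustrative sketch, not part of the original source:
- // Delete bans the entry, and the optional onDel callback runs once the last
- // handle is released (or immediately if no such entry exists). The namespace
- // and key are assumptions made for the example.
- func exampleDelete(c *Cache) {
- released := make(chan struct{})
- if !c.Delete(0, 42, func() { close(released) }) {
- return // no such entry; onDel has already run
- }
- <-released // unblocks once all outstanding handles are released
- }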
- // Evict evicts the 'cache node' with the given namespace and key. This will
- // simply call Cacher.Evict.
- //
- // Evict returns true if such 'cache node' exists.
- func (r *Cache) Evict(ns, key uint64) bool {
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.closed {
- return false
- }
- hash := murmur32(ns, key, 0xf00)
- for {
- h, b := r.getBucket(hash)
- done, _, n := b.get(r, h, hash, ns, key, true)
- if done {
- if n != nil {
- if r.cacher != nil {
- r.cacher.Evict(n)
- }
- n.unref()
- return true
- }
- break
- }
- }
- return false
- }
- // EvictNS evicts all 'cache nodes' in the given namespace. This will
- // simply call Cacher.EvictNS.
- func (r *Cache) EvictNS(ns uint64) {
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.closed {
- return
- }
- if r.cacher != nil {
- r.cacher.EvictNS(ns)
- }
- }
- // EvictAll evicts all 'cache nodes'. This will simply call Cacher.EvictAll.
- func (r *Cache) EvictAll() {
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.closed {
- return
- }
- if r.cacher != nil {
- r.cacher.EvictAll()
- }
- }
- // Close closes the 'cache map' and forcefully releases all 'cache nodes'.
- func (r *Cache) Close() error {
- r.mu.Lock()
- if !r.closed {
- r.closed = true
- h := (*mNode)(r.mHead)
- h.initBuckets()
- for i := range h.buckets {
- b := (*mBucket)(h.buckets[i])
- for _, n := range b.node {
- // Call releaser.
- if n.value != nil {
- if r, ok := n.value.(util.Releaser); ok {
- r.Release()
- }
- n.value = nil
- }
- // Call OnDel.
- for _, f := range n.onDel {
- f()
- }
- n.onDel = nil
- }
- }
- }
- r.mu.Unlock()
- // Avoid deadlock.
- if r.cacher != nil {
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
- return nil
- }
- // CloseWeak closes the 'cache map' and evicts all 'cache nodes' from the cacher,
- // but unlike Close it doesn't forcefully release the 'cache nodes'.
- func (r *Cache) CloseWeak() error {
- r.mu.Lock()
- if !r.closed {
- r.closed = true
- }
- r.mu.Unlock()
- // Avoid deadlock.
- if r.cacher != nil {
- r.cacher.EvictAll()
- if err := r.cacher.Close(); err != nil {
- return err
- }
- }
- return nil
- }
- // Node is a 'cache node'.
- type Node struct {
- r *Cache
- hash uint32
- ns, key uint64
- mu sync.Mutex
- size int
- value Value
- ref int32
- onDel []func()
- CacheData unsafe.Pointer
- }
- // NS returns this 'cache node' namespace.
- func (n *Node) NS() uint64 {
- return n.ns
- }
- // Key returns this 'cache node' key.
- func (n *Node) Key() uint64 {
- return n.key
- }
- // Size returns this 'cache node' size.
- func (n *Node) Size() int {
- return n.size
- }
- // Value returns this 'cache node' value.
- func (n *Node) Value() Value {
- return n.value
- }
- // Ref returns this 'cache node' ref counter.
- func (n *Node) Ref() int32 {
- return atomic.LoadInt32(&n.ref)
- }
- // GetHandle returns a handle for this 'cache node'.
- func (n *Node) GetHandle() *Handle {
- if atomic.AddInt32(&n.ref, 1) <= 1 {
- panic("BUG: Node.GetHandle on zero ref")
- }
- return &Handle{unsafe.Pointer(n)}
- }
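- // unref decrements the node's reference counter and removes the node from the
- // map once it reaches zero. unrefLocked does the same while holding the cache's
- // read lock, skipping the removal if the cache has already been closed.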
- func (n *Node) unref() {
- if atomic.AddInt32(&n.ref, -1) == 0 {
- n.r.delete(n)
- }
- }
- func (n *Node) unrefLocked() {
- if atomic.AddInt32(&n.ref, -1) == 0 {
- n.r.mu.RLock()
- if !n.r.closed {
- n.r.delete(n)
- }
- n.r.mu.RUnlock()
- }
- }
- // Handle is a 'cache handle' of a 'cache node'.
- type Handle struct {
- n unsafe.Pointer // *Node
- }
- // Value returns the value of the 'cache node'.
- func (h *Handle) Value() Value {
- n := (*Node)(atomic.LoadPointer(&h.n))
- if n != nil {
- return n.value
- }
- return nil
- }
- // Release releases this 'cache handle'.
- // It is safe to call Release multiple times.
- func (h *Handle) Release() {
- nPtr := atomic.LoadPointer(&h.n)
- if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
- n := (*Node)(nPtr)
- n.unrefLocked()
- }
- }
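- // murmur32 mixes the namespace and key into a 32-bit hash using
- // MurmurHash2-style multiply-and-shift rounds with the given seed.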
- func murmur32(ns, key uint64, seed uint32) uint32 {
- const (
- m = uint32(0x5bd1e995)
- r = 24
- )
- k1 := uint32(ns >> 32)
- k2 := uint32(ns)
- k3 := uint32(key >> 32)
- k4 := uint32(key)
- k1 *= m
- k1 ^= k1 >> r
- k1 *= m
- k2 *= m
- k2 ^= k2 >> r
- k2 *= m
- k3 *= m
- k3 ^= k3 >> r
- k3 *= m
- k4 *= m
- k4 ^= k4 >> r
- k4 *= m
- h := seed
- h *= m
- h ^= k1
- h *= m
- h ^= k2
- h *= m
- h ^= k3
- h *= m
- h ^= k4
- h ^= h >> 13
- h *= m
- h ^= h >> 15
- return h
- }
|