123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- // Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- package leveldb
- import (
- "errors"
- "sync/atomic"
- "time"
- "github.com/syndtr/goleveldb/leveldb/journal"
- "github.com/syndtr/goleveldb/leveldb/memdb"
- "github.com/syndtr/goleveldb/leveldb/storage"
- )
// errHasFrozenMem is returned by newMem when a frozen memdb is still present.
var errHasFrozenMem = errors.New("has frozen mem")
- type memDB struct {
- db *DB
- *memdb.DB
- ref int32
- }
- func (m *memDB) getref() int32 {
- return atomic.LoadInt32(&m.ref)
- }
- func (m *memDB) incref() {
- atomic.AddInt32(&m.ref, 1)
- }
- func (m *memDB) decref() {
- if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
- // Only put back memdb with std capacity.
- if m.Capacity() == m.db.s.o.GetWriteBuffer() {
- m.Reset()
- m.db.mpoolPut(m.DB)
- }
- m.db = nil
- m.DB = nil
- } else if ref < 0 {
- panic("negative memdb ref")
- }
- }
- // Get latest sequence number.
- func (db *DB) getSeq() uint64 {
- return atomic.LoadUint64(&db.seq)
- }
- // Atomically adds delta to seq.
- func (db *DB) addSeq(delta uint64) {
- atomic.AddUint64(&db.seq, delta)
- }
- func (db *DB) setSeq(seq uint64) {
- atomic.StoreUint64(&db.seq, seq)
- }
- func (db *DB) sampleSeek(ikey internalKey) {
- v := db.s.version()
- if v.sampleSeek(ikey) {
- // Trigger table compaction.
- db.compTrigger(db.tcompCmdC)
- }
- v.release()
- }
- func (db *DB) mpoolPut(mem *memdb.DB) {
- if !db.isClosed() {
- select {
- case db.memPool <- mem:
- default:
- }
- }
- }
- func (db *DB) mpoolGet(n int) *memDB {
- var mdb *memdb.DB
- select {
- case mdb = <-db.memPool:
- default:
- }
- if mdb == nil || mdb.Capacity() < n {
- mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
- }
- return &memDB{
- db: db,
- DB: mdb,
- }
- }
- func (db *DB) mpoolDrain() {
- ticker := time.NewTicker(30 * time.Second)
- for {
- select {
- case <-ticker.C:
- select {
- case <-db.memPool:
- default:
- }
- case <-db.closeC:
- ticker.Stop()
- // Make sure the pool is drained.
- select {
- case <-db.memPool:
- case <-time.After(time.Second):
- }
- close(db.memPool)
- return
- }
- }
- }
- // Create new memdb and froze the old one; need external synchronization.
- // newMem only called synchronously by the writer.
- func (db *DB) newMem(n int) (mem *memDB, err error) {
- fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
- w, err := db.s.stor.Create(fd)
- if err != nil {
- db.s.reuseFileNum(fd.Num)
- return
- }
- db.memMu.Lock()
- defer db.memMu.Unlock()
- if db.frozenMem != nil {
- return nil, errHasFrozenMem
- }
- if db.journal == nil {
- db.journal = journal.NewWriter(w)
- } else {
- db.journal.Reset(w)
- db.journalWriter.Close()
- db.frozenJournalFd = db.journalFd
- }
- db.journalWriter = w
- db.journalFd = fd
- db.frozenMem = db.mem
- mem = db.mpoolGet(n)
- mem.incref() // for self
- mem.incref() // for caller
- db.mem = mem
- // The seq only incremented by the writer. And whoever called newMem
- // should hold write lock, so no need additional synchronization here.
- db.frozenSeq = db.seq
- return
- }
- // Get all memdbs.
- func (db *DB) getMems() (e, f *memDB) {
- db.memMu.RLock()
- defer db.memMu.RUnlock()
- if db.mem != nil {
- db.mem.incref()
- } else if !db.isClosed() {
- panic("nil effective mem")
- }
- if db.frozenMem != nil {
- db.frozenMem.incref()
- }
- return db.mem, db.frozenMem
- }
- // Get effective memdb.
- func (db *DB) getEffectiveMem() *memDB {
- db.memMu.RLock()
- defer db.memMu.RUnlock()
- if db.mem != nil {
- db.mem.incref()
- } else if !db.isClosed() {
- panic("nil effective mem")
- }
- return db.mem
- }
- // Check whether we has frozen memdb.
- func (db *DB) hasFrozenMem() bool {
- db.memMu.RLock()
- defer db.memMu.RUnlock()
- return db.frozenMem != nil
- }
- // Get frozen memdb.
- func (db *DB) getFrozenMem() *memDB {
- db.memMu.RLock()
- defer db.memMu.RUnlock()
- if db.frozenMem != nil {
- db.frozenMem.incref()
- }
- return db.frozenMem
- }
- // Drop frozen memdb; assume that frozen memdb isn't nil.
- func (db *DB) dropFrozenMem() {
- db.memMu.Lock()
- if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
- db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
- } else {
- db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
- }
- db.frozenJournalFd = storage.FileDesc{}
- db.frozenMem.decref()
- db.frozenMem = nil
- db.memMu.Unlock()
- }
- // Clear mems ptr; used by DB.Close().
- func (db *DB) clearMems() {
- db.memMu.Lock()
- db.mem = nil
- db.frozenMem = nil
- db.memMu.Unlock()
- }
- // Set closed flag; return true if not already closed.
- func (db *DB) setClosed() bool {
- return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
- }
- // Check whether DB was closed.
- func (db *DB) isClosed() bool {
- return atomic.LoadUint32(&db.closed) != 0
- }
- // Check read ok status.
- func (db *DB) ok() error {
- if db.isClosed() {
- return ErrClosed
- }
- return nil
- }
|