123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- // Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- package leveldb
- import (
- "errors"
- "sync"
- "time"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
// errTransactionDone is reported by Transaction methods that are invoked
// after the transaction has already been committed or discarded.
var errTransactionDone = errors.New("leveldb: transaction already closed")
- // Transaction is the transaction handle.
- type Transaction struct {
- db *DB
- lk sync.RWMutex
- seq uint64
- mem *memDB
- tables tFiles
- ikScratch []byte
- rec sessionRecord
- stats cStatStaging
- closed bool
- }
- // Get gets the value for the given key. It returns ErrNotFound if the
- // DB does not contains the key.
- //
- // The returned slice is its own copy, it is safe to modify the contents
- // of the returned slice.
- // It is safe to modify the contents of the argument after Get returns.
- func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
- tr.lk.RLock()
- defer tr.lk.RUnlock()
- if tr.closed {
- return nil, errTransactionDone
- }
- return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
- }
- // Has returns true if the DB does contains the given key.
- //
- // It is safe to modify the contents of the argument after Has returns.
- func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
- tr.lk.RLock()
- defer tr.lk.RUnlock()
- if tr.closed {
- return false, errTransactionDone
- }
- return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
- }
- // NewIterator returns an iterator for the latest snapshot of the transaction.
- // The returned iterator is not safe for concurrent use, but it is safe to use
- // multiple iterators concurrently, with each in a dedicated goroutine.
- // It is also safe to use an iterator concurrently while writes to the
- // transaction. The resultant key/value pairs are guaranteed to be consistent.
- //
- // Slice allows slicing the iterator to only contains keys in the given
- // range. A nil Range.Start is treated as a key before all keys in the
- // DB. And a nil Range.Limit is treated as a key after all keys in
- // the DB.
- //
- // WARNING: Any slice returned by interator (e.g. slice returned by calling
- // Iterator.Key() or Iterator.Key() methods), its content should not be modified
- // unless noted otherwise.
- //
- // The iterator must be released after use, by calling Release method.
- //
- // Also read Iterator documentation of the leveldb/iterator package.
- func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- tr.lk.RLock()
- defer tr.lk.RUnlock()
- if tr.closed {
- return iterator.NewEmptyIterator(errTransactionDone)
- }
- tr.mem.incref()
- return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
- }
- func (tr *Transaction) flush() error {
- // Flush memdb.
- if tr.mem.Len() != 0 {
- tr.stats.startTimer()
- iter := tr.mem.NewIterator(nil)
- t, n, err := tr.db.s.tops.createFrom(iter)
- iter.Release()
- tr.stats.stopTimer()
- if err != nil {
- return err
- }
- if tr.mem.getref() == 1 {
- tr.mem.Reset()
- } else {
- tr.mem.decref()
- tr.mem = tr.db.mpoolGet(0)
- tr.mem.incref()
- }
- tr.tables = append(tr.tables, t)
- tr.rec.addTableFile(0, t)
- tr.stats.write += t.size
- tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
- }
- return nil
- }
- func (tr *Transaction) put(kt keyType, key, value []byte) error {
- tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
- if tr.mem.Free() < len(tr.ikScratch)+len(value) {
- if err := tr.flush(); err != nil {
- return err
- }
- }
- if err := tr.mem.Put(tr.ikScratch, value); err != nil {
- return err
- }
- tr.seq++
- return nil
- }
- // Put sets the value for the given key. It overwrites any previous value
- // for that key; a DB is not a multi-map.
- // Please note that the transaction is not compacted until committed, so if you
- // writes 10 same keys, then those 10 same keys are in the transaction.
- //
- // It is safe to modify the contents of the arguments after Put returns.
- func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
- tr.lk.Lock()
- defer tr.lk.Unlock()
- if tr.closed {
- return errTransactionDone
- }
- return tr.put(keyTypeVal, key, value)
- }
- // Delete deletes the value for the given key.
- // Please note that the transaction is not compacted until committed, so if you
- // writes 10 same keys, then those 10 same keys are in the transaction.
- //
- // It is safe to modify the contents of the arguments after Delete returns.
- func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
- tr.lk.Lock()
- defer tr.lk.Unlock()
- if tr.closed {
- return errTransactionDone
- }
- return tr.put(keyTypeDel, key, nil)
- }
- // Write apply the given batch to the transaction. The batch will be applied
- // sequentially.
- // Please note that the transaction is not compacted until committed, so if you
- // writes 10 same keys, then those 10 same keys are in the transaction.
- //
- // It is safe to modify the contents of the arguments after Write returns.
- func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
- if b == nil || b.Len() == 0 {
- return nil
- }
- tr.lk.Lock()
- defer tr.lk.Unlock()
- if tr.closed {
- return errTransactionDone
- }
- return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
- return tr.put(kt, k, v)
- })
- }
- func (tr *Transaction) setDone() {
- tr.closed = true
- tr.db.tr = nil
- tr.mem.decref()
- <-tr.db.writeLockC
- }
- // Commit commits the transaction. If error is not nil, then the transaction is
- // not committed, it can then either be retried or discarded.
- //
- // Other methods should not be called after transaction has been committed.
- func (tr *Transaction) Commit() error {
- if err := tr.db.ok(); err != nil {
- return err
- }
- tr.lk.Lock()
- defer tr.lk.Unlock()
- if tr.closed {
- return errTransactionDone
- }
- if err := tr.flush(); err != nil {
- // Return error, lets user decide either to retry or discard
- // transaction.
- return err
- }
- if len(tr.tables) != 0 {
- // Committing transaction.
- tr.rec.setSeqNum(tr.seq)
- tr.db.compCommitLk.Lock()
- tr.stats.startTimer()
- var cerr error
- for retry := 0; retry < 3; retry++ {
- cerr = tr.db.s.commit(&tr.rec)
- if cerr != nil {
- tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
- select {
- case <-time.After(time.Second):
- case <-tr.db.closeC:
- tr.db.logf("transaction@commit exiting")
- tr.db.compCommitLk.Unlock()
- return cerr
- }
- } else {
- // Success. Set db.seq.
- tr.db.setSeq(tr.seq)
- break
- }
- }
- tr.stats.stopTimer()
- if cerr != nil {
- // Return error, lets user decide either to retry or discard
- // transaction.
- return cerr
- }
- // Update compaction stats. This is safe as long as we hold compCommitLk.
- tr.db.compStats.addStat(0, &tr.stats)
- // Trigger table auto-compaction.
- tr.db.compTrigger(tr.db.tcompCmdC)
- tr.db.compCommitLk.Unlock()
- // Additionally, wait compaction when certain threshold reached.
- // Ignore error, returns error only if transaction can't be committed.
- tr.db.waitCompaction()
- }
- // Only mark as done if transaction committed successfully.
- tr.setDone()
- return nil
- }
- func (tr *Transaction) discard() {
- // Discard transaction.
- for _, t := range tr.tables {
- tr.db.logf("transaction@discard @%d", t.fd.Num)
- if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
- tr.db.s.reuseFileNum(t.fd.Num)
- }
- }
- }
- // Discard discards the transaction.
- //
- // Other methods should not be called after transaction has been discarded.
- func (tr *Transaction) Discard() {
- tr.lk.Lock()
- if !tr.closed {
- tr.discard()
- tr.setDone()
- }
- tr.lk.Unlock()
- }
- func (db *DB) waitCompaction() error {
- if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
- return db.compTriggerWait(db.tcompCmdC)
- }
- return nil
- }
- // OpenTransaction opens an atomic DB transaction. Only one transaction can be
- // opened at a time. Subsequent call to Write and OpenTransaction will be blocked
- // until in-flight transaction is committed or discarded.
- // The returned transaction handle is safe for concurrent use.
- //
- // Transaction is expensive and can overwhelm compaction, especially if
- // transaction size is small. Use with caution.
- //
- // The transaction must be closed once done, either by committing or discarding
- // the transaction.
- // Closing the DB will discard open transaction.
- func (db *DB) OpenTransaction() (*Transaction, error) {
- if err := db.ok(); err != nil {
- return nil, err
- }
- // The write happen synchronously.
- select {
- case db.writeLockC <- struct{}{}:
- case err := <-db.compPerErrC:
- return nil, err
- case <-db.closeC:
- return nil, ErrClosed
- }
- if db.tr != nil {
- panic("leveldb: has open transaction")
- }
- // Flush current memdb.
- if db.mem != nil && db.mem.Len() != 0 {
- if _, err := db.rotateMem(0, true); err != nil {
- return nil, err
- }
- }
- // Wait compaction when certain threshold reached.
- if err := db.waitCompaction(); err != nil {
- return nil, err
- }
- tr := &Transaction{
- db: db,
- seq: db.seq,
- mem: db.mpoolGet(0),
- }
- tr.mem.incref()
- db.tr = tr
- return tr, nil
- }
|