123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- package leveldb
- import (
- "errors"
- "math/rand"
- "runtime"
- "sync"
- "sync/atomic"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
// errInvalidInternalKey describes an invalid internal key seen by an iterator.
var errInvalidInternalKey = errors.New("leveldb: Iterator: invalid internal key")
- type memdbReleaser struct {
- once sync.Once
- m *memDB
- }
- func (mr *memdbReleaser) Release() {
- mr.once.Do(func() {
- mr.m.decref()
- })
- }
- func (db *DB) newRawIterator(auxm *memDB, auxt tFiles, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- strict := opt.GetStrict(db.s.o.Options, ro, opt.StrictReader)
- em, fm := db.getMems()
- v := db.s.version()
- tableIts := v.getIterators(slice, ro)
- n := len(tableIts) + len(auxt) + 3
- its := make([]iterator.Iterator, 0, n)
- if auxm != nil {
- ami := auxm.NewIterator(slice)
- ami.SetReleaser(&memdbReleaser{m: auxm})
- its = append(its, ami)
- }
- for _, t := range auxt {
- its = append(its, v.s.tops.newIterator(t, slice, ro))
- }
- emi := em.NewIterator(slice)
- emi.SetReleaser(&memdbReleaser{m: em})
- its = append(its, emi)
- if fm != nil {
- fmi := fm.NewIterator(slice)
- fmi.SetReleaser(&memdbReleaser{m: fm})
- its = append(its, fmi)
- }
- its = append(its, tableIts...)
- mi := iterator.NewMergedIterator(its, db.s.icmp, strict)
- mi.SetReleaser(&versionReleaser{v: v})
- return mi
- }
- func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
- var islice *util.Range
- if slice != nil {
- islice = &util.Range{}
- if slice.Start != nil {
- islice.Start = makeInternalKey(nil, slice.Start, keyMaxSeq, keyTypeSeek)
- }
- if slice.Limit != nil {
- islice.Limit = makeInternalKey(nil, slice.Limit, keyMaxSeq, keyTypeSeek)
- }
- }
- rawIter := db.newRawIterator(auxm, auxt, islice, ro)
- iter := &dbIter{
- db: db,
- icmp: db.s.icmp,
- iter: rawIter,
- seq: seq,
- strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader),
- key: make([]byte, 0),
- value: make([]byte, 0),
- }
- atomic.AddInt32(&db.aliveIters, 1)
- runtime.SetFinalizer(iter, (*dbIter).Release)
- return iter
- }
- func (db *DB) iterSamplingRate() int {
- return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
- }
// dir is the state of a dbIter: either a sentinel (released, start or end of
// iteration) or the direction of the last successful move. The order of the
// values matters: dbIter treats every value above dirEOI as "positioned on
// an entry".
type dir int

const (
	// dirReleased (-1) is set by dbIter.Release.
	dirReleased = dir(iota - 1)
	// dirSOI (0, the zero value) is set when backward movement runs out of
	// entries, and by First/Seek just before scanning forward.
	dirSOI
	// dirEOI (1) is set when forward movement runs out of entries.
	dirEOI
	// dirBackward (2) is set by the backward scan (prev).
	dirBackward
	// dirForward (3) is set by the forward scan (next).
	dirForward
)
// dbIter represents the state of an iterator over a database session.
type dbIter struct {
	// db is the owning database; it receives seek samples and holds the
	// alive-iterator counter that Release decrements.
	db *DB
	// icmp supplies uCompare, used to compare user keys.
	icmp *iComparer
	// iter is the underlying iterator; its keys are parsed as internal keys.
	iter iterator.Iterator
	// seq is the sequence number limit: entries with a larger sequence
	// number are ignored.
	seq uint64
	// strict, when true, turns an unparsable internal key into the iterator
	// error; when false such entries are skipped.
	strict bool
	// smaplingGap (sic, "sampling gap") is the number of bytes that may
	// still be read before the next seek sample is reported to db.
	smaplingGap int
	// dir is the current state/direction; see the dir constants.
	dir dir
	// key and value hold copies of the current user key and value.
	key []byte
	value []byte
	// err is the first error recorded; once set, the movement methods and
	// Valid return false.
	err error
	// releaser, if set, is invoked once by Release.
	releaser util.Releaser
}
- func (i *dbIter) sampleSeek() {
- ikey := i.iter.Key()
- i.smaplingGap -= len(ikey) + len(i.iter.Value())
- for i.smaplingGap < 0 {
- i.smaplingGap += i.db.iterSamplingRate()
- i.db.sampleSeek(ikey)
- }
- }
- func (i *dbIter) setErr(err error) {
- i.err = err
- i.key = nil
- i.value = nil
- }
- func (i *dbIter) iterErr() {
- if err := i.iter.Error(); err != nil {
- i.setErr(err)
- }
- }
- func (i *dbIter) Valid() bool {
- return i.err == nil && i.dir > dirEOI
- }
- func (i *dbIter) First() bool {
- if i.err != nil {
- return false
- } else if i.dir == dirReleased {
- i.err = ErrIterReleased
- return false
- }
- if i.iter.First() {
- i.dir = dirSOI
- return i.next()
- }
- i.dir = dirEOI
- i.iterErr()
- return false
- }
- func (i *dbIter) Last() bool {
- if i.err != nil {
- return false
- } else if i.dir == dirReleased {
- i.err = ErrIterReleased
- return false
- }
- if i.iter.Last() {
- return i.prev()
- }
- i.dir = dirSOI
- i.iterErr()
- return false
- }
- func (i *dbIter) Seek(key []byte) bool {
- if i.err != nil {
- return false
- } else if i.dir == dirReleased {
- i.err = ErrIterReleased
- return false
- }
- ikey := makeInternalKey(nil, key, i.seq, keyTypeSeek)
- if i.iter.Seek(ikey) {
- i.dir = dirSOI
- return i.next()
- }
- i.dir = dirEOI
- i.iterErr()
- return false
- }
- func (i *dbIter) next() bool {
- for {
- if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
- i.sampleSeek()
- if seq <= i.seq {
- switch kt {
- case keyTypeDel:
- // Skip deleted key.
- i.key = append(i.key[:0], ukey...)
- i.dir = dirForward
- case keyTypeVal:
- if i.dir == dirSOI || i.icmp.uCompare(ukey, i.key) > 0 {
- i.key = append(i.key[:0], ukey...)
- i.value = append(i.value[:0], i.iter.Value()...)
- i.dir = dirForward
- return true
- }
- }
- }
- } else if i.strict {
- i.setErr(kerr)
- break
- }
- if !i.iter.Next() {
- i.dir = dirEOI
- i.iterErr()
- break
- }
- }
- return false
- }
- func (i *dbIter) Next() bool {
- if i.dir == dirEOI || i.err != nil {
- return false
- } else if i.dir == dirReleased {
- i.err = ErrIterReleased
- return false
- }
- if !i.iter.Next() || (i.dir == dirBackward && !i.iter.Next()) {
- i.dir = dirEOI
- i.iterErr()
- return false
- }
- return i.next()
- }
- func (i *dbIter) prev() bool {
- i.dir = dirBackward
- del := true
- if i.iter.Valid() {
- for {
- if ukey, seq, kt, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
- i.sampleSeek()
- if seq <= i.seq {
- if !del && i.icmp.uCompare(ukey, i.key) < 0 {
- return true
- }
- del = (kt == keyTypeDel)
- if !del {
- i.key = append(i.key[:0], ukey...)
- i.value = append(i.value[:0], i.iter.Value()...)
- }
- }
- } else if i.strict {
- i.setErr(kerr)
- return false
- }
- if !i.iter.Prev() {
- break
- }
- }
- }
- if del {
- i.dir = dirSOI
- i.iterErr()
- return false
- }
- return true
- }
- func (i *dbIter) Prev() bool {
- if i.dir == dirSOI || i.err != nil {
- return false
- } else if i.dir == dirReleased {
- i.err = ErrIterReleased
- return false
- }
- switch i.dir {
- case dirEOI:
- return i.Last()
- case dirForward:
- for i.iter.Prev() {
- if ukey, _, _, kerr := parseInternalKey(i.iter.Key()); kerr == nil {
- i.sampleSeek()
- if i.icmp.uCompare(ukey, i.key) < 0 {
- goto cont
- }
- } else if i.strict {
- i.setErr(kerr)
- return false
- }
- }
- i.dir = dirSOI
- i.iterErr()
- return false
- }
- cont:
- return i.prev()
- }
- func (i *dbIter) Key() []byte {
- if i.err != nil || i.dir <= dirEOI {
- return nil
- }
- return i.key
- }
- func (i *dbIter) Value() []byte {
- if i.err != nil || i.dir <= dirEOI {
- return nil
- }
- return i.value
- }
- func (i *dbIter) Release() {
- if i.dir != dirReleased {
- // Clear the finalizer.
- runtime.SetFinalizer(i, nil)
- if i.releaser != nil {
- i.releaser.Release()
- i.releaser = nil
- }
- i.dir = dirReleased
- i.key = nil
- i.value = nil
- i.iter.Release()
- i.iter = nil
- atomic.AddInt32(&i.db.aliveIters, -1)
- i.db = nil
- }
- }
- func (i *dbIter) SetReleaser(releaser util.Releaser) {
- if i.dir == dirReleased {
- panic(util.ErrReleased)
- }
- if i.releaser != nil && releaser != nil {
- panic(util.ErrHasReleaser)
- }
- i.releaser = releaser
- }
- func (i *dbIter) Error() error {
- return i.err
- }
|