- // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- package leveldb
- import (
- "fmt"
- "sort"
- "sync/atomic"
- "github.com/syndtr/goleveldb/leveldb/cache"
- "github.com/syndtr/goleveldb/leveldb/iterator"
- "github.com/syndtr/goleveldb/leveldb/opt"
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/table"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
- // tFile holds basic information about a table.
- type tFile struct {
- fd storage.FileDesc
- seekLeft int32
- size int64
- imin, imax internalKey
- }
- // Returns true if the given key is after the largest key of this table.
- func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
- return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
- }
- // Returns true if the given key is before the smallest key of this table.
- func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
- return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
- }
- // Returns true if the given key range overlaps with this table's key range.
- func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
- return !t.after(icmp, umin) && !t.before(icmp, umax)
- }
- // Consumes one seek and returns the number of seeks left.
- func (t *tFile) consumeSeek() int32 {
- return atomic.AddInt32(&t.seekLeft, -1)
- }
- // Creates new tFile.
- func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
- f := &tFile{
- fd: fd,
- size: size,
- imin: imin,
- imax: imax,
- }
- // We arrange to automatically compact this file after
- // a certain number of seeks. Let's assume:
- // (1) One seek costs 10ms
- // (2) Writing or reading 1MB costs 10ms (100MB/s)
- // (3) A compaction of 1MB does 25MB of IO:
- // 1MB read from this level
- // 10-12MB read from next level (boundaries may be misaligned)
- // 10-12MB written to next level
- // This implies that 25 seeks cost the same as the compaction
- // of 1MB of data. I.e., one seek costs approximately the
- // same as the compaction of 40KB of data. We are a little
- // conservative and allow approximately one seek for every 16KB
- // of data before triggering a compaction.
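- // For example, with the constants below a 2 MiB table is allowed
- // 2*1024*1024/16384 = 128 seeks before it becomes a compaction candidate;
- // smaller tables are clamped to a floor of 100 seeks.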
- f.seekLeft = int32(size / 16384)
- if f.seekLeft < 100 {
- f.seekLeft = 100
- }
- return f
- }
- func tableFileFromRecord(r atRecord) *tFile {
- return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
- }
- // tFiles holds multiple tFile.
- type tFiles []*tFile
- func (tf tFiles) Len() int { return len(tf) }
- func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
- func (tf tFiles) nums() string {
- x := "[ "
- for i, f := range tf {
- if i != 0 {
- x += ", "
- }
- x += fmt.Sprint(f.fd.Num)
- }
- x += " ]"
- return x
- }
- // Returns true if the smallest key of table i is less than that of table j.
- // This is used for sorting tables by key in ascending order.
- func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
- a, b := tf[i], tf[j]
- n := icmp.Compare(a.imin, b.imin)
- if n == 0 {
- return a.fd.Num < b.fd.Num
- }
- return n < 0
- }
- // Returns true if the file number of table i is greater than that of table j.
- // This is used for sorting tables by file number in descending order.
- func (tf tFiles) lessByNum(i, j int) bool {
- return tf[i].fd.Num > tf[j].fd.Num
- }
- // Sorts tables by key in ascending order.
- func (tf tFiles) sortByKey(icmp *iComparer) {
- sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
- }
- // Sorts tables by file number in descending order.
- func (tf tFiles) sortByNum() {
- sort.Sort(&tFilesSortByNum{tFiles: tf})
- }
- // Returns the total size of all tables.
- func (tf tFiles) size() (sum int64) {
- for _, t := range tf {
- sum += t.size
- }
- return sum
- }
- // Returns the smallest index of the tables whose smallest key is greater
- // than or equal to the given key.
- func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
- return sort.Search(len(tf), func(i int) bool {
- return icmp.Compare(tf[i].imin, ikey) >= 0
- })
- }
- // Returns the smallest index of the tables whose largest key is greater
- // than or equal to the given key.
- func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
- return sort.Search(len(tf), func(i int) bool {
- return icmp.Compare(tf[i].imax, ikey) >= 0
- })
- }
- // Returns true if the given key range overlaps the key range of one or
- // more tables. If unsorted is true then binary search will not be used.
- func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
- if unsorted {
- // Check against all files.
- for _, t := range tf {
- if t.overlaps(icmp, umin, umax) {
- return true
- }
- }
- return false
- }
- i := 0
- if len(umin) > 0 {
- // Find the earliest possible internal key for min.
- i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
- }
- if i >= len(tf) {
- // Beginning of range is after all files, so no overlap.
- return false
- }
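- // The tables here are sorted by key, so the range overlaps a table if and
- // only if the candidate found above does not lie entirely after umax.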
- return !tf[i].before(icmp, umax)
- }
- // Returns the tables whose key range overlaps the given key range.
- // The range is expanded if a user key is found to span table boundaries.
- // If overlapped is true then the search is restarted whenever umax is
- // expanded.
- // The content of dst will be overwritten.
- func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
- dst = dst[:0]
- for i := 0; i < len(tf); {
- t := tf[i]
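- // A user key may span table boundaries, so whenever this table widens the
- // search range the scan is restarted from the first table; for umax this
- // only happens when overlapped is true.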
- if t.overlaps(icmp, umin, umax) {
- if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
- umin = t.imin.ukey()
- dst = dst[:0]
- i = 0
- continue
- } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
- umax = t.imax.ukey()
- // Restart search if it is overlapped.
- if overlapped {
- dst = dst[:0]
- i = 0
- continue
- }
- }
- dst = append(dst, t)
- }
- i++
- }
- return dst
- }
- // Returns the combined key range of the tables.
- func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
- for i, t := range tf {
- if i == 0 {
- imin, imax = t.imin, t.imax
- continue
- }
- if icmp.Compare(t.imin, imin) < 0 {
- imin = t.imin
- }
- if icmp.Compare(t.imax, imax) > 0 {
- imax = t.imax
- }
- }
- return
- }
- // Creates an iterator indexer from the tables.
- func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
- if slice != nil {
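- // Restrict the indexer to tables that may contain keys within the slice.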
- var start, limit int
- if slice.Start != nil {
- start = tf.searchMax(icmp, internalKey(slice.Start))
- }
- if slice.Limit != nil {
- limit = tf.searchMin(icmp, internalKey(slice.Limit))
- } else {
- limit = tf.Len()
- }
- tf = tf[start:limit]
- }
- return iterator.NewArrayIndexer(&tFilesArrayIndexer{
- tFiles: tf,
- tops: tops,
- icmp: icmp,
- slice: slice,
- ro: ro,
- })
- }
- // Tables iterator index.
- type tFilesArrayIndexer struct {
- tFiles
- tops *tOps
- icmp *iComparer
- slice *util.Range
- ro *opt.ReadOptions
- }
- func (a *tFilesArrayIndexer) Search(key []byte) int {
- return a.searchMax(a.icmp, internalKey(key))
- }
- func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
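- // Only the first and last tables can be cut by the slice; tables in
- // between are fully contained in the range and need no restriction.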
- if i == 0 || i == a.Len()-1 {
- return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
- }
- return a.tops.newIterator(a.tFiles[i], nil, a.ro)
- }
- // Helper type for sortByKey.
- type tFilesSortByKey struct {
- tFiles
- icmp *iComparer
- }
- func (x *tFilesSortByKey) Less(i, j int) bool {
- return x.lessByKey(x.icmp, i, j)
- }
- // Helper type for sortByNum.
- type tFilesSortByNum struct {
- tFiles
- }
- func (x *tFilesSortByNum) Less(i, j int) bool {
- return x.lessByNum(i, j)
- }
- // Table operations.
- type tOps struct {
- s *session
- noSync bool
- evictRemoved bool
- cache *cache.Cache
- bcache *cache.Cache
- bpool *util.BufferPool
- }
- // Creates an empty table and returns table writer.
- func (t *tOps) create() (*tWriter, error) {
- fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
- fw, err := t.s.stor.Create(fd)
- if err != nil {
- return nil, err
- }
- return &tWriter{
- t: t,
- fd: fd,
- w: fw,
- tw: table.NewWriter(fw, t.s.o.Options),
- }, nil
- }
- // Builds table from src iterator.
- func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
- w, err := t.create()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- w.drop()
- }
- }()
- for src.Next() {
- err = w.append(src.Key(), src.Value())
- if err != nil {
- return
- }
- }
- err = src.Error()
- if err != nil {
- return
- }
- n = w.tw.EntriesLen()
- f, err = w.finish()
- return
- }
- // Opens table. It returns a cache handle, which should
- // be released after use.
- func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
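- // Table readers are cached under namespace 0, keyed by file number; the
- // function below only runs on a cache miss.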
- ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
- var r storage.Reader
- r, err = t.s.stor.Open(f.fd)
- if err != nil {
- return 0, nil
- }
- var bcache *cache.NamespaceGetter
- if t.bcache != nil {
- bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
- }
- var tr *table.Reader
- tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
- if err != nil {
- r.Close()
- return 0, nil
- }
- return 1, tr
- })
- if ch == nil && err == nil {
- err = ErrClosed
- }
- return
- }
- // Finds key/value pair whose key is greater than or equal to the
- // given key.
- func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
- ch, err := t.open(f)
- if err != nil {
- return nil, nil, err
- }
- defer ch.Release()
- return ch.Value().(*table.Reader).Find(key, true, ro)
- }
- // Finds key that is greater than or equal to the given key.
- func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
- ch, err := t.open(f)
- if err != nil {
- return nil, err
- }
- defer ch.Release()
- return ch.Value().(*table.Reader).FindKey(key, true, ro)
- }
- // Returns approximate offset of the given key.
- func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
- ch, err := t.open(f)
- if err != nil {
- return
- }
- defer ch.Release()
- return ch.Value().(*table.Reader).OffsetOf(key)
- }
- // Creates an iterator from the given table.
- func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
- ch, err := t.open(f)
- if err != nil {
- return iterator.NewEmptyIterator(err)
- }
- iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
- iter.SetReleaser(ch)
- return iter
- }
- // Removes the table from persistent storage. It waits until the table is
- // no longer in use.
- func (t *tOps) remove(f *tFile) {
- t.cache.Delete(0, uint64(f.fd.Num), func() {
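- // This callback runs once every handle to the cached reader has been
- // released, so the file is only deleted when it is no longer in use.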
- if err := t.s.stor.Remove(f.fd); err != nil {
- t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
- } else {
- t.s.logf("table@remove removed @%d", f.fd.Num)
- }
- if t.evictRemoved && t.bcache != nil {
- t.bcache.EvictNS(uint64(f.fd.Num))
- }
- })
- }
- // Closes the table ops instance. It closes all tables, regardless of
- // whether they are still in use.
- func (t *tOps) close() {
- t.bpool.Close()
- t.cache.Close()
- if t.bcache != nil {
- t.bcache.CloseWeak()
- }
- }
- // Creates new initialized table ops instance.
- func newTableOps(s *session) *tOps {
- var (
- cacher cache.Cacher
- bcache *cache.Cache
- bpool *util.BufferPool
- )
- if s.o.GetOpenFilesCacheCapacity() > 0 {
- cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
- }
- if !s.o.GetDisableBlockCache() {
- var bcacher cache.Cacher
- if s.o.GetBlockCacheCapacity() > 0 {
- bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
- }
- bcache = cache.NewCache(bcacher)
- }
- if !s.o.GetDisableBufferPool() {
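- // Pooled buffers are sized for a full block plus the 5-byte per-block
- // trailer (a compression-type byte and a 32-bit checksum).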
- bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
- }
- return &tOps{
- s: s,
- noSync: s.o.GetNoSync(),
- evictRemoved: s.o.GetBlockCacheEvictRemoved(),
- cache: cache.NewCache(cacher),
- bcache: bcache,
- bpool: bpool,
- }
- }
- // tWriter wraps the table writer. It keeps track of the file descriptor
- // and the added key range.
- type tWriter struct {
- t *tOps
- fd storage.FileDesc
- w storage.Writer
- tw *table.Writer
- first, last []byte
- }
- // Appends a key/value pair to the table.
- func (w *tWriter) append(key, value []byte) error {
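- // The key is copied because the caller may reuse its backing slice on the
- // next iteration.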
- if w.first == nil {
- w.first = append([]byte{}, key...)
- }
- w.last = append(w.last[:0], key...)
- return w.tw.Append(key, value)
- }
- // Returns true if the table is empty.
- func (w *tWriter) empty() bool {
- return w.first == nil
- }
- // Closes the storage.Writer.
- func (w *tWriter) close() {
- if w.w != nil {
- w.w.Close()
- w.w = nil
- }
- }
- // Finalizes the table and returns table file.
- func (w *tWriter) finish() (f *tFile, err error) {
- defer w.close()
- err = w.tw.Close()
- if err != nil {
- return
- }
- if !w.t.noSync {
- err = w.w.Sync()
- if err != nil {
- return
- }
- }
- f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
- return
- }
- // Drops the table.
- func (w *tWriter) drop() {
- w.close()
- w.t.s.stor.Remove(w.fd)
- w.t.s.reuseFileNum(w.fd.Num)
- w.tw = nil
- w.first = nil
- w.last = nil
- }