// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package table

import (
	"encoding/binary"
	"fmt"
	"io"
	"sort"
	"strings"
	"sync"

	"github.com/golang/snappy"

	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// Reader errors.
var (
	ErrNotFound       = errors.ErrNotFound
	ErrReaderReleased = errors.New("leveldb/table: reader released")
	ErrIterReleased   = errors.New("leveldb/table: iterator released")
)

// ErrCorrupted describes an error due to corruption. This error will be
// wrapped with errors.ErrCorrupted.
type ErrCorrupted struct {
	Pos    int64
	Size   int64
	Kind   string
	Reason string
}

func (e *ErrCorrupted) Error() string {
	return fmt.Sprintf("leveldb/table: corruption on %s (pos=%d): %s", e.Kind, e.Pos, e.Reason)
}

func max(x, y int) int {
	if x > y {
		return x
	}
	return y
}

type block struct {
	bpool          *util.BufferPool
	bh             blockHandle
	data           []byte
	restartsLen    int
	restartsOffset int
}

func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
	index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
		offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
		offset++                                    // shared always zero, since this is a restart point
		v1, n1 := binary.Uvarint(b.data[offset:])   // key length
		_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
		m := offset + n1 + n2
		return cmp.Compare(b.data[m:m+int(v1)], key) > 0
	}) + rstart - 1
	if index < rstart {
		// The smallest key is greater than the key sought.
		index = rstart
	}
	offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
	return
}

func (b *block) restartIndex(rstart, rlimit, offset int) int {
	return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
		return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
	}) + rstart - 1
}

func (b *block) restartOffset(index int) int {
	return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
}

func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) {
	if offset >= b.restartsOffset {
		if offset != b.restartsOffset {
			err = &ErrCorrupted{Reason: "entries offset not aligned"}
		}
		return
	}
	v0, n0 := binary.Uvarint(b.data[offset:])       // Shared prefix length
	v1, n1 := binary.Uvarint(b.data[offset+n0:])    // Key length
	v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length
	m := n0 + n1 + n2
	n = m + int(v1) + int(v2)
	if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset {
		err = &ErrCorrupted{Reason: "entries corrupted"}
		return
	}
	key = b.data[offset+m : offset+m+int(v1)]
	value = b.data[offset+m+int(v1) : offset+n]
	nShared = int(v0)
	return
}

func (b *block) Release() {
	b.bpool.Put(b.data)
	b.bpool = nil
	b.data = nil
}

type dir int

const (
	dirReleased dir = iota - 1
	dirSOI
	dirEOI
	dirBackward
	dirForward
)

type blockIter struct {
	tr            *Reader
	block         *block
	blockReleaser util.Releaser
	releaser      util.Releaser
	key, value    []byte
	offset        int
	// Previous offset, only filled by Next.
	prevOffset   int
	prevNode     []int
	prevKeys     []byte
	restartIndex int
	// Iterator direction.
	dir dir
	// Restart index slice range.
	riStart int
	riLimit int
	// Offset slice range.
	offsetStart     int
	offsetRealStart int
	offsetLimit     int
	// Error.
	err error
}

func (i *blockIter) sErr(err error) {
	i.err = err
	i.key = nil
	i.value = nil
	i.prevNode = nil
	i.prevKeys = nil
}

func (i *blockIter) reset() {
	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	i.restartIndex = i.riStart
	i.offset = i.offsetStart
	i.dir = dirSOI
	i.key = i.key[:0]
	i.value = nil
}

func (i *blockIter) isFirst() bool {
	switch i.dir {
	case dirForward:
		return i.prevOffset == i.offsetRealStart
	case dirBackward:
		return len(i.prevNode) == 1 && i.restartIndex == i.riStart
	}
	return false
}

func (i *blockIter) isLast() bool {
	switch i.dir {
	case dirForward, dirBackward:
		return i.offset == i.offsetLimit
	}
	return false
}

func (i *blockIter) First() bool {
	if i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	i.dir = dirSOI
	return i.Next()
}

func (i *blockIter) Last() bool {
	if i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	i.dir = dirEOI
	return i.Prev()
}

func (i *blockIter) Seek(key []byte) bool {
	if i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key)
	if err != nil {
		i.sErr(err)
		return false
	}
	i.restartIndex = ri
	i.offset = max(i.offsetStart, offset)
	if i.dir == dirSOI || i.dir == dirEOI {
		i.dir = dirForward
	}
	for i.Next() {
		if i.tr.cmp.Compare(i.key, key) >= 0 {
			return true
		}
	}
	return false
}

func (i *blockIter) Next() bool {
	if i.dir == dirEOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	if i.dir == dirSOI {
		i.restartIndex = i.riStart
		i.offset = i.offsetStart
	} else if i.dir == dirBackward {
		i.prevNode = i.prevNode[:0]
		i.prevKeys = i.prevKeys[:0]
	}
	for i.offset < i.offsetRealStart {
		key, value, nShared, n, err := i.block.entry(i.offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if n == 0 {
			i.dir = dirEOI
			return false
		}
		i.key = append(i.key[:nShared], key...)
		i.value = value
		i.offset += n
	}
	if i.offset >= i.offsetLimit {
		i.dir = dirEOI
		if i.offset != i.offsetLimit {
			i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
		}
		return false
	}
	key, value, nShared, n, err := i.block.entry(i.offset)
	if err != nil {
		i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
		return false
	}
	if n == 0 {
		i.dir = dirEOI
		return false
	}
	i.key = append(i.key[:nShared], key...)
	i.value = value
	i.prevOffset = i.offset
	i.offset += n
	i.dir = dirForward
	return true
}

func (i *blockIter) Prev() bool {
	if i.dir == dirSOI || i.err != nil {
		return false
	} else if i.dir == dirReleased {
		i.err = ErrIterReleased
		return false
	}

	var ri int
	if i.dir == dirForward {
		// Change direction.
		i.offset = i.prevOffset
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.block.restartIndex(i.restartIndex, i.riLimit, i.offset)
		i.dir = dirBackward
	} else if i.dir == dirEOI {
		// At the end of iterator.
		i.restartIndex = i.riLimit
		i.offset = i.offsetLimit
		if i.offset == i.offsetRealStart {
			i.dir = dirSOI
			return false
		}
		ri = i.riLimit - 1
		i.dir = dirBackward
	} else if len(i.prevNode) == 1 {
		// This is the end of a restart range.
		i.offset = i.prevNode[0]
		i.prevNode = i.prevNode[:0]
		if i.restartIndex == i.riStart {
			i.dir = dirSOI
			return false
		}
		i.restartIndex--
		ri = i.restartIndex
	} else {
		// In the middle of restart range, get from cache.
		n := len(i.prevNode) - 3
		node := i.prevNode[n:]
		i.prevNode = i.prevNode[:n]

		// Get the key.
		ko := node[0]
		i.key = append(i.key[:0], i.prevKeys[ko:]...)
		i.prevKeys = i.prevKeys[:ko]

		// Get the value.
		vo := node[1]
		vl := vo + node[2]
		i.value = i.block.data[vo:vl]

		i.offset = vl
		return true
	}

	// Build entries cache.
	i.key = i.key[:0]
	i.value = nil
	offset := i.block.restartOffset(ri)
	if offset == i.offset {
		ri--
		if ri < 0 {
			i.dir = dirSOI
			return false
		}
		offset = i.block.restartOffset(ri)
	}
	i.prevNode = append(i.prevNode, offset)
	for {
		key, value, nShared, n, err := i.block.entry(offset)
		if err != nil {
			i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err))
			return false
		}
		if offset >= i.offsetRealStart {
			if i.value != nil {
				// Appends 3 variables:
				// 1. Previous keys offset
				// 2. Value offset in the data block
				// 3. Value length
				i.prevNode = append(i.prevNode, len(i.prevKeys), offset-len(i.value), len(i.value))
				i.prevKeys = append(i.prevKeys, i.key...)
			}
			i.value = value
		}
		i.key = append(i.key[:nShared], key...)
		offset += n

		// Stop if target offset reached.
		if offset >= i.offset {
			if offset != i.offset {
				i.sErr(i.tr.newErrCorruptedBH(i.block.bh, "entries offset not aligned"))
				return false
			}
			break
		}
	}
	i.restartIndex = ri
	i.offset = offset
	return true
}

func (i *blockIter) Key() []byte {
	if i.err != nil || i.dir <= dirEOI {
		return nil
	}
	return i.key
}

func (i *blockIter) Value() []byte {
	if i.err != nil || i.dir <= dirEOI {
		return nil
	}
	return i.value
}

func (i *blockIter) Release() {
	if i.dir != dirReleased {
		i.tr = nil
		i.block = nil
		i.prevNode = nil
		i.prevKeys = nil
		i.key = nil
		i.value = nil
		i.dir = dirReleased
		if i.blockReleaser != nil {
			i.blockReleaser.Release()
			i.blockReleaser = nil
		}
		if i.releaser != nil {
			i.releaser.Release()
			i.releaser = nil
		}
	}
}

func (i *blockIter) SetReleaser(releaser util.Releaser) {
	if i.dir == dirReleased {
		panic(util.ErrReleased)
	}
	if i.releaser != nil && releaser != nil {
		panic(util.ErrHasReleaser)
	}
	i.releaser = releaser
}

func (i *blockIter) Valid() bool {
	return i.err == nil && (i.dir == dirBackward || i.dir == dirForward)
}

func (i *blockIter) Error() error {
	return i.err
}

type filterBlock struct {
	bpool      *util.BufferPool
	data       []byte
	oOffset    int
	baseLg     uint
	filtersNum int
}

func (b *filterBlock) contains(filter filter.Filter, offset uint64, key []byte) bool {
	i := int(offset >> b.baseLg)
	if i < b.filtersNum {
		o := b.data[b.oOffset+i*4:]
		n := int(binary.LittleEndian.Uint32(o))
		m := int(binary.LittleEndian.Uint32(o[4:]))
		if n < m && m <= b.oOffset {
			return filter.Contains(b.data[n:m], key)
		} else if n == m {
			return false
		}
	}
	return true
}

func (b *filterBlock) Release() {
	b.bpool.Put(b.data)
	b.bpool = nil
	b.data = nil
}

type indexIter struct {
	*blockIter
	tr    *Reader
	slice *util.Range
	// Options
	fillCache bool
}

func (i *indexIter) Get() iterator.Iterator {
	value := i.Value()
	if value == nil {
		return nil
	}
	dataBH, n := decodeBlockHandle(value)
	if n == 0 {
		return iterator.NewEmptyIterator(i.tr.newErrCorruptedBH(i.tr.indexBH, "bad data block handle"))
	}

	var slice *util.Range
	if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
		slice = i.slice
	}
	return i.tr.getDataIterErr(dataBH, slice, i.tr.verifyChecksum, i.fillCache)
}

// Reader is a table reader.
type Reader struct {
	mu     sync.RWMutex
	fd     storage.FileDesc
	reader io.ReaderAt
	cache  *cache.NamespaceGetter
	err    error
	bpool  *util.BufferPool
	// Options
	o              *opt.Options
	cmp            comparer.Comparer
	filter         filter.Filter
	verifyChecksum bool

	dataEnd                   int64
	metaBH, indexBH, filterBH blockHandle
	indexBlock                *block
	filterBlock               *filterBlock
}

func (r *Reader) blockKind(bh blockHandle) string {
	switch bh.offset {
	case r.metaBH.offset:
		return "meta-block"
	case r.indexBH.offset:
		return "index-block"
	case r.filterBH.offset:
		if r.filterBH.length > 0 {
			return "filter-block"
		}
	}
	return "data-block"
}

func (r *Reader) newErrCorrupted(pos, size int64, kind, reason string) error {
	return &errors.ErrCorrupted{Fd: r.fd, Err: &ErrCorrupted{Pos: pos, Size: size, Kind: kind, Reason: reason}}
}

func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error {
	return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason)
}

func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error {
	if cerr, ok := err.(*ErrCorrupted); ok {
		cerr.Pos = int64(bh.offset)
		cerr.Size = int64(bh.length)
		cerr.Kind = r.blockKind(bh)
		return &errors.ErrCorrupted{Fd: r.fd, Err: cerr}
	}
	return err
}

func (r *Reader) readRawBlock(bh blockHandle, verifyChecksum bool) ([]byte, error) {
	data := r.bpool.Get(int(bh.length + blockTrailerLen))
	if _, err := r.reader.ReadAt(data, int64(bh.offset)); err != nil && err != io.EOF {
		return nil, err
	}

	if verifyChecksum {
		n := bh.length + 1
		checksum0 := binary.LittleEndian.Uint32(data[n:])
		checksum1 := util.NewCRC(data[:n]).Value()
		if checksum0 != checksum1 {
			r.bpool.Put(data)
			return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("checksum mismatch, want=%#x got=%#x", checksum0, checksum1))
		}
	}

	switch data[bh.length] {
	case blockTypeNoCompression:
		data = data[:bh.length]
	case blockTypeSnappyCompression:
		decLen, err := snappy.DecodedLen(data[:bh.length])
		if err != nil {
			r.bpool.Put(data)
			return nil, r.newErrCorruptedBH(bh, err.Error())
		}
		decData := r.bpool.Get(decLen)
		decData, err = snappy.Decode(decData, data[:bh.length])
		r.bpool.Put(data)
		if err != nil {
			r.bpool.Put(decData)
			return nil, r.newErrCorruptedBH(bh, err.Error())
		}
		data = decData
	default:
		r.bpool.Put(data)
		return nil, r.newErrCorruptedBH(bh, fmt.Sprintf("unknown compression type %#x", data[bh.length]))
	}
	return data, nil
}

func (r *Reader) readBlock(bh blockHandle, verifyChecksum bool) (*block, error) {
	data, err := r.readRawBlock(bh, verifyChecksum)
	if err != nil {
		return nil, err
	}
	restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
	b := &block{
		bpool:          r.bpool,
		bh:             bh,
		data:           data,
		restartsLen:    restartsLen,
		restartsOffset: len(data) - (restartsLen+1)*4,
	}
	return b, nil
}

func (r *Reader) readBlockCached(bh blockHandle, verifyChecksum, fillCache bool) (*block, util.Releaser, error) {
	if r.cache != nil {
		var (
			err error
			ch  *cache.Handle
		)
		if fillCache {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				var b *block
				b, err = r.readBlock(bh, verifyChecksum)
				if err != nil {
					return 0, nil
				}
				return cap(b.data), b
			})
		} else {
			ch = r.cache.Get(bh.offset, nil)
		}
		if ch != nil {
			b, ok := ch.Value().(*block)
			if !ok {
				ch.Release()
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
			}
			return b, ch, err
		} else if err != nil {
			return nil, nil, err
		}
	}

	b, err := r.readBlock(bh, verifyChecksum)
	return b, b, err
}

func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
	data, err := r.readRawBlock(bh, true)
	if err != nil {
		return nil, err
	}
	n := len(data)
	if n < 5 {
		return nil, r.newErrCorruptedBH(bh, "too short")
	}
	m := n - 5
	oOffset := int(binary.LittleEndian.Uint32(data[m:]))
	if oOffset > m {
		return nil, r.newErrCorruptedBH(bh, "invalid data-offsets offset")
	}
	b := &filterBlock{
		bpool:      r.bpool,
		data:       data,
		oOffset:    oOffset,
		baseLg:     uint(data[n-1]),
		filtersNum: (m - oOffset) / 4,
	}
	return b, nil
}

func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
	if r.cache != nil {
		var (
			err error
			ch  *cache.Handle
		)
		if fillCache {
			ch = r.cache.Get(bh.offset, func() (size int, value cache.Value) {
				var b *filterBlock
				b, err = r.readFilterBlock(bh)
				if err != nil {
					return 0, nil
				}
				return cap(b.data), b
			})
		} else {
			ch = r.cache.Get(bh.offset, nil)
		}
		if ch != nil {
			b, ok := ch.Value().(*filterBlock)
			if !ok {
				ch.Release()
				return nil, nil, errors.New("leveldb/table: inconsistent block type")
			}
			return b, ch, err
		} else if err != nil {
			return nil, nil, err
		}
	}

	b, err := r.readFilterBlock(bh)
	return b, b, err
}

func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
	if r.indexBlock == nil {
		return r.readBlockCached(r.indexBH, true, fillCache)
	}
	return r.indexBlock, util.NoopReleaser{}, nil
}

func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
	if r.filterBlock == nil {
		return r.readFilterBlockCached(r.filterBH, fillCache)
	}
	return r.filterBlock, util.NoopReleaser{}, nil
}

func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Range, inclLimit bool) *blockIter {
	bi := &blockIter{
		tr:            r,
		block:         b,
		blockReleaser: bReleaser,
		// Valid key should never be nil.
		key:             make([]byte, 0),
		dir:             dirSOI,
		riStart:         0,
		riLimit:         b.restartsLen,
		offsetStart:     0,
		offsetRealStart: 0,
		offsetLimit:     b.restartsOffset,
	}
	if slice != nil {
		if slice.Start != nil {
			if bi.Seek(slice.Start) {
				bi.riStart = b.restartIndex(bi.restartIndex, b.restartsLen, bi.prevOffset)
				bi.offsetStart = b.restartOffset(bi.riStart)
				bi.offsetRealStart = bi.prevOffset
			} else {
				bi.riStart = b.restartsLen
				bi.offsetStart = b.restartsOffset
				bi.offsetRealStart = b.restartsOffset
			}
		}
		if slice.Limit != nil {
			if bi.Seek(slice.Limit) && (!inclLimit || bi.Next()) {
				bi.offsetLimit = bi.prevOffset
				bi.riLimit = bi.restartIndex + 1
			}
		}
		bi.reset()
		if bi.offsetStart > bi.offsetLimit {
			bi.sErr(errors.New("leveldb/table: invalid slice range"))
		}
	}
	return bi
}

func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
	b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache)
	if err != nil {
		return iterator.NewEmptyIterator(err)
	}
	return r.newBlockIter(b, rel, slice, false)
}

func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		return iterator.NewEmptyIterator(r.err)
	}

	return r.getDataIter(dataBH, slice, verifyChecksum, fillCache)
}

// NewIterator creates an iterator from the table.
//
// Slice allows slicing the iterator to only contain keys in the given
// range. A nil Range.Start is treated as a key before all keys in the
// table. A nil Range.Limit is treated as a key after all keys in
// the table.
//
// WARNING: Any slice returned by the iterator (e.g. a slice returned by
// calling the Iterator.Key() or Iterator.Value() methods) should not be
// modified unless noted otherwise.
//
// The returned iterator is not safe for concurrent use and should be released
// after use.
//
// Also read the Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		return iterator.NewEmptyIterator(r.err)
	}

	fillCache := !ro.GetDontFillCache()
	indexBlock, rel, err := r.getIndexBlock(fillCache)
	if err != nil {
		return iterator.NewEmptyIterator(err)
	}
	index := &indexIter{
		blockIter: r.newBlockIter(indexBlock, rel, slice, true),
		tr:        r,
		slice:     slice,
		fillCache: !ro.GetDontFillCache(),
	}
	return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader))
}
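
// exampleScanRange is an illustrative sketch, not part of the original
// package, showing how a caller might use NewIterator with a util.Range to
// scan a bounded key range and copy out the keys. The function name and its
// return shape are assumptions made for this example only.
func exampleScanRange(r *Reader, start, limit []byte) (keys [][]byte, err error) {
	// A nil Range.Start/Limit means "before all keys" / "after all keys".
	it := r.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
	defer it.Release()
	for it.Next() {
		// Key() returns a slice that must not be modified and whose backing
		// buffer is reused by the iterator, so copy it out.
		keys = append(keys, append([]byte(nil), it.Key()...))
	}
	return keys, it.Error()
}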

func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	indexBlock, rel, err := r.getIndexBlock(true)
	if err != nil {
		return
	}
	defer rel.Release()

	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()

	if !index.Seek(key) {
		if err = index.Error(); err == nil {
			err = ErrNotFound
		}
		return
	}

	dataBH, n := decodeBlockHandle(index.Value())
	if n == 0 {
		r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
		return nil, nil, r.err
	}

	// The filter should only be used for exact match.
	if filtered && r.filter != nil {
		filterBlock, frel, ferr := r.getFilterBlock(true)
		if ferr == nil {
			if !filterBlock.contains(r.filter, dataBH.offset, key) {
				frel.Release()
				return nil, nil, ErrNotFound
			}
			frel.Release()
		} else if !errors.IsCorrupted(ferr) {
			return nil, nil, ferr
		}
	}

	data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
	if !data.Seek(key) {
		data.Release()
		if err = data.Error(); err != nil {
			return
		}

		// The nearest greater-than key is the first key of the next block.
		if !index.Next() {
			if err = index.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}

		dataBH, n = decodeBlockHandle(index.Value())
		if n == 0 {
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return nil, nil, r.err
		}

		data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
		if !data.Next() {
			data.Release()
			if err = data.Error(); err == nil {
				err = ErrNotFound
			}
			return
		}
	}

	// The key doesn't use the block buffer, so there's no need to copy it.
	rkey = data.Key()
	if !noValue {
		if r.bpool == nil {
			value = data.Value()
		} else {
			// The value does use the block buffer, and since the buffer will
			// be recycled, it needs to be copied.
			value = append([]byte{}, data.Value()...)
		}
	}
	data.Release()
	return
}

// Find finds the key/value pair whose key is greater than or equal to the
// given key. It returns ErrNotFound if the table doesn't contain
// such a pair.
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and Find will immediately return ErrNotFound if
// 'filter data' indicates that such a pair doesn't exist.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Find returns.
func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, value []byte, err error) {
	return r.find(key, filtered, ro, false)
}
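
// exampleSeekGE is an illustrative sketch, not part of the original package,
// of a seek-style lookup: it uses Find to locate the first entry whose key is
// greater than or equal to the given key, distinguishing "no such entry" from
// a real error. The helper name is an assumption for this example only.
func exampleSeekGE(r *Reader, key []byte) (rkey, value []byte, ok bool, err error) {
	// filtered=false: the filter block only helps exact-match lookups, so it
	// is left disabled for a "greater than or equal" seek.
	rkey, value, err = r.Find(key, false, nil)
	if err == ErrNotFound {
		// No entry with a key >= the given key exists in this table.
		return nil, nil, false, nil
	}
	if err != nil {
		return nil, nil, false, err
	}
	return rkey, value, true, nil
}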

// FindKey finds the key that is greater than or equal to the given key.
// It returns ErrNotFound if the table doesn't contain such a key.
// If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and FindKey will immediately return ErrNotFound
// if 'filter data' indicates that such a key doesn't exist.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after FindKey returns.
func (r *Reader) FindKey(key []byte, filtered bool, ro *opt.ReadOptions) (rkey []byte, err error) {
	rkey, _, err = r.find(key, filtered, ro, true)
	return
}
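
// examplePrefixExists is an illustrative sketch, not part of the original
// package, that uses FindKey to test whether the table holds any key starting
// with the given prefix, without paying the cost of reading values. It
// assumes the table was built with the default bytewise comparer; the helper
// name is an assumption for this example only.
func examplePrefixExists(r *Reader, prefix []byte) (bool, error) {
	rkey, err := r.FindKey(prefix, false, nil)
	if err == ErrNotFound {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	// FindKey returned the smallest key >= prefix; it matches the prefix
	// exactly when it starts with it.
	return len(rkey) >= len(prefix) && string(rkey[:len(prefix)]) == string(prefix), nil
}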

// Get gets the value for the given key. It returns errors.ErrNotFound
// if the table does not contain the key.
//
// The caller may modify the contents of the returned slice as it is its
// own copy.
// It is safe to modify the contents of the argument after Get returns.
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	rkey, value, err := r.find(key, false, ro, false)
	if err == nil && r.cmp.Compare(rkey, key) != 0 {
		value = nil
		err = ErrNotFound
	}
	return
}
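
// exampleExactGet is an illustrative sketch, not part of the original
// package, contrasting Get with Find: Get only succeeds on an exact key match
// and reports absence via ErrNotFound. The helper name is an assumption for
// this example only.
func exampleExactGet(r *Reader, key []byte) (value []byte, found bool, err error) {
	value, err = r.Get(key, nil)
	switch {
	case err == nil:
		return value, true, nil
	case err == ErrNotFound:
		// The key is simply absent; this is not a failure.
		return nil, false, nil
	default:
		return nil, false, err
	}
}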

// OffsetOf returns the approximate offset for the given key.
//
// It is safe to modify the contents of the argument after OffsetOf returns.
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	if r.err != nil {
		err = r.err
		return
	}

	indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
	if err != nil {
		return
	}
	defer rel.Release()

	index := r.newBlockIter(indexBlock, nil, nil, true)
	defer index.Release()
	if index.Seek(key) {
		dataBH, n := decodeBlockHandle(index.Value())
		if n == 0 {
			r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
			return
		}
		offset = int64(dataBH.offset)
		return
	}
	err = index.Error()
	if err == nil {
		offset = r.dataEnd
	}
	return
}
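
// exampleApproximateSpan is an illustrative sketch, not part of the original
// package, showing how OffsetOf can be used to estimate how many bytes of the
// table lie between two keys, similar to an "approximate size" query. The
// helper name is an assumption for this example only.
func exampleApproximateSpan(r *Reader, start, limit []byte) (int64, error) {
	so, err := r.OffsetOf(start)
	if err != nil {
		return 0, err
	}
	lo, err := r.OffsetOf(limit)
	if err != nil {
		return 0, err
	}
	if lo < so {
		// Offsets are approximate; clamp to zero rather than return a
		// negative span.
		return 0, nil
	}
	return lo - so, nil
}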

// Release implements util.Releaser.
// It also closes the underlying reader if it implements io.Closer.
func (r *Reader) Release() {
	r.mu.Lock()
	defer r.mu.Unlock()

	if closer, ok := r.reader.(io.Closer); ok {
		closer.Close()
	}
	if r.indexBlock != nil {
		r.indexBlock.Release()
		r.indexBlock = nil
	}
	if r.filterBlock != nil {
		r.filterBlock.Release()
		r.filterBlock = nil
	}
	r.reader = nil
	r.cache = nil
	r.bpool = nil
	r.err = ErrReaderReleased
}

// NewReader creates a new initialized table reader for the file.
// The cache and bpool are optional and can be nil.
//
// The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
	if f == nil {
		return nil, errors.New("leveldb/table: nil file")
	}

	r := &Reader{
		fd:             fd,
		reader:         f,
		cache:          cache,
		bpool:          bpool,
		o:              o,
		cmp:            o.GetComparer(),
		verifyChecksum: o.GetStrict(opt.StrictBlockChecksum),
	}

	if size < footerLen {
		r.err = r.newErrCorrupted(0, size, "table", "too small")
		return r, nil
	}

	footerPos := size - footerLen
	var footer [footerLen]byte
	if _, err := r.reader.ReadAt(footer[:], footerPos); err != nil && err != io.EOF {
		return nil, err
	}
	if string(footer[footerLen-len(magic):footerLen]) != magic {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad magic number")
		return r, nil
	}

	var n int
	// Decode the metaindex block handle.
	r.metaBH, n = decodeBlockHandle(footer[:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad metaindex block handle")
		return r, nil
	}

	// Decode the index block handle.
	r.indexBH, n = decodeBlockHandle(footer[n:])
	if n == 0 {
		r.err = r.newErrCorrupted(footerPos, footerLen, "table-footer", "bad index block handle")
		return r, nil
	}

	// Read metaindex block.
	metaBlock, err := r.readBlock(r.metaBH, true)
	if err != nil {
		if errors.IsCorrupted(err) {
			r.err = err
			return r, nil
		}
		return nil, err
	}

	// Set data end.
	r.dataEnd = int64(r.metaBH.offset)

	// Read metaindex.
	metaIter := r.newBlockIter(metaBlock, nil, nil, true)
	for metaIter.Next() {
		key := string(metaIter.Key())
		if !strings.HasPrefix(key, "filter.") {
			continue
		}
		fn := key[7:]
		if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
			r.filter = f0
		} else {
			for _, f0 := range o.GetAltFilters() {
				if f0.Name() == fn {
					r.filter = f0
					break
				}
			}
		}
		if r.filter != nil {
			filterBH, n := decodeBlockHandle(metaIter.Value())
			if n == 0 {
				continue
			}
			r.filterBH = filterBH
			// Update data end.
			r.dataEnd = int64(filterBH.offset)
			break
		}
	}
	metaIter.Release()
	metaBlock.Release()

	// Cache the index and filter block locally, since we don't have a global cache.
	if cache == nil {
		r.indexBlock, err = r.readBlock(r.indexBH, true)
		if err != nil {
			if errors.IsCorrupted(err) {
				r.err = err
				return r, nil
			}
			return nil, err
		}
		if r.filter != nil {
			r.filterBlock, err = r.readFilterBlock(r.filterBH)
			if err != nil {
				if !errors.IsCorrupted(err) {
					return nil, err
				}

				// Don't use filter then.
				r.filter = nil
			}
		}
	}

	return r, nil
}
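
// exampleOpenTable is an illustrative sketch, not part of the original
// package, of the typical open/read/release sequence: create a Reader over an
// io.ReaderAt, look up one key, then release the reader. The caller supplies
// the file descriptor and options; cache and bpool are left nil, which
// NewReader permits. The helper name is an assumption for this example only.
func exampleOpenTable(f io.ReaderAt, size int64, fd storage.FileDesc, o *opt.Options, key []byte) ([]byte, error) {
	r, err := NewReader(f, size, fd, nil, nil, o)
	if err != nil {
		return nil, err
	}
	// Release closes f if it implements io.Closer and marks the reader as
	// released; per the Get documentation the returned slice is the caller's
	// own copy, so it stays usable afterwards.
	defer r.Release()

	return r.Get(key, nil)
}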