// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package table

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"

	"github.com/golang/snappy"

	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)
// sharedPrefixLen returns the length of the common prefix of a and b.
func sharedPrefixLen(a, b []byte) int {
	i, n := 0, len(a)
	if n > len(b) {
		n = len(b)
	}
	for i < n && a[i] == b[i] {
		i++
	}
	return i
}
// blockWriter builds a single table block: a sequence of prefix-compressed
// key/value entries followed by an array of restart offsets.
type blockWriter struct {
	restartInterval int
	buf             util.Buffer
	nEntries        int
	prevKey         []byte
	restarts        []uint32
	scratch         []byte
}
// append encodes a single key/value entry. Within a restart interval each key
// shares a prefix with the previous key; the entry stores the shared length,
// the unshared key length and the value length as uvarints.
func (w *blockWriter) append(key, value []byte) {
	nShared := 0
	if w.nEntries%w.restartInterval == 0 {
		w.restarts = append(w.restarts, uint32(w.buf.Len()))
	} else {
		nShared = sharedPrefixLen(w.prevKey, key)
	}
	n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
	n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
	n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
	w.buf.Write(w.scratch[:n])
	w.buf.Write(key[nShared:])
	w.buf.Write(value)
	w.prevKey = append(w.prevKey[:0], key...)
	w.nEntries++
}
// finish appends the restart offsets and the restart count, completing the block.
func (w *blockWriter) finish() {
	// Write restarts entry.
	if w.nEntries == 0 {
		// Must have at least one restart entry.
		w.restarts = append(w.restarts, 0)
	}
	w.restarts = append(w.restarts, uint32(len(w.restarts)))
	for _, x := range w.restarts {
		buf4 := w.buf.Alloc(4)
		binary.LittleEndian.PutUint32(buf4, x)
	}
}

func (w *blockWriter) reset() {
	w.buf.Reset()
	w.nEntries = 0
	w.restarts = w.restarts[:0]
}

// bytesLen returns the size the block will have once finish has been called.
func (w *blockWriter) bytesLen() int {
	restartsLen := len(w.restarts)
	if restartsLen == 0 {
		restartsLen = 1
	}
	return w.buf.Len() + 4*restartsLen + 4
}
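
// The resulting block layout, as built by blockWriter above (a reading of this
// code, not an authoritative format description), is:
//
//	entry 0 | entry 1 | ... | restart offsets (4 bytes each) | restart count (4 bytes)
//
// where each entry is:
//
//	shared key len (uvarint) | unshared key len (uvarint) | value len (uvarint) | unshared key suffix | value
//
// For example, appending "apple"->"1" and then "apply"->"2" with a restart
// interval larger than 1 stores the second entry as shared=4, unshared=1,
// value len=1, suffix "y", value "2".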
// filterWriter builds the optional filter block, generating one filter for
// every filterBase bytes of data-block offset.
type filterWriter struct {
	generator filter.FilterGenerator
	buf       util.Buffer
	nKeys     int
	offsets   []uint32
}

func (w *filterWriter) add(key []byte) {
	if w.generator == nil {
		return
	}
	w.generator.Add(key)
	w.nKeys++
}

func (w *filterWriter) flush(offset uint64) {
	if w.generator == nil {
		return
	}
	// Generate filters until there is one for every filterBase bytes up to offset.
	for x := int(offset / filterBase); x > len(w.offsets); {
		w.generate()
	}
}
func (w *filterWriter) finish() {
	if w.generator == nil {
		return
	}
	// Generate last keys.
	if w.nKeys > 0 {
		w.generate()
	}
	w.offsets = append(w.offsets, uint32(w.buf.Len()))
	for _, x := range w.offsets {
		buf4 := w.buf.Alloc(4)
		binary.LittleEndian.PutUint32(buf4, x)
	}
	w.buf.WriteByte(filterBaseLg)
}

func (w *filterWriter) generate() {
	// Record offset.
	w.offsets = append(w.offsets, uint32(w.buf.Len()))
	// Generate filters.
	if w.nKeys > 0 {
		w.generator.Generate(&w.buf)
		w.nKeys = 0
	}
}
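
// Read together, finish and generate above produce a filter block of the form
// (again inferred from the code rather than from a separate spec):
//
//	filter 0 | filter 1 | ... | offset of filter 0 (4 bytes) | ... | offset of end of filter data (4 bytes) | filterBaseLg (1 byte)
//
// flush is called with the current table offset whenever a data block is
// finished, so one filter covers every filterBase (1 << filterBaseLg) bytes
// of data-block offsets.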
// Writer is a table writer.
type Writer struct {
	writer io.Writer
	err    error
	// Options
	cmp         comparer.Comparer
	filter      filter.Filter
	compression opt.Compression
	blockSize   int

	dataBlock   blockWriter
	indexBlock  blockWriter
	filterBlock filterWriter
	pendingBH   blockHandle
	offset      uint64
	nEntries    int
	// Scratch is allocated large enough for 5 uvarints. Block writers must not
	// use the first 20 bytes, which are reserved for encoding the block handle
	// that is then passed to the block writer itself.
	scratch            [50]byte
	comparerScratch    []byte
	compressionScratch []byte
}
func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
	// Compress the buffer if necessary.
	var b []byte
	if compression == opt.SnappyCompression {
		// Allocate scratch enough for compression and block trailer.
		if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
			w.compressionScratch = make([]byte, n)
		}
		compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
		n := len(compressed)
		b = compressed[:n+blockTrailerLen]
		b[n] = blockTypeSnappyCompression
	} else {
		tmp := buf.Alloc(blockTrailerLen)
		tmp[0] = blockTypeNoCompression
		b = buf.Bytes()
	}

	// Calculate the checksum.
	n := len(b) - 4
	checksum := util.NewCRC(b[:n]).Value()
	binary.LittleEndian.PutUint32(b[n:], checksum)

	// Write the buffer to the file.
	_, err = w.writer.Write(b)
	if err != nil {
		return
	}
	bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
	w.offset += uint64(len(b))
	return
}
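
// As the code above shows, every block written by writeBlock is followed by a
// trailer of blockTrailerLen bytes:
//
//	block contents | compression type (1 byte) | checksum of contents+type (4 bytes, little endian)
//
// The returned block handle records the offset and the length without the
// trailer, which is why bh uses len(b) - blockTrailerLen.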
// flushPendingBH adds an index entry for the most recently finished data
// block. The index key is chosen by the comparer: a separator between that
// block's last key and the next key, or a successor of the last key when the
// table is being closed.
func (w *Writer) flushPendingBH(key []byte) {
	if w.pendingBH.length == 0 {
		return
	}
	var separator []byte
	if len(key) == 0 {
		separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
	} else {
		separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
	}
	if separator == nil {
		separator = w.dataBlock.prevKey
	} else {
		w.comparerScratch = separator
	}
	n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
	// Append the block handle to the index block.
	w.indexBlock.append(separator, w.scratch[:n])
	// Reset prev key of the data block.
	w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
	// Clear pending block handle.
	w.pendingBH = blockHandle{}
}

// finishBlock writes out the current data block and keeps its handle pending
// until the next key (or Close) determines the index separator.
func (w *Writer) finishBlock() error {
	w.dataBlock.finish()
	bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
	if err != nil {
		return err
	}
	w.pendingBH = bh
	// Reset the data block.
	w.dataBlock.reset()
	// Flush the filter block.
	w.filterBlock.flush(w.offset)
	return nil
}
// Append appends a key/value pair to the table. The keys passed must
// be in increasing order.
//
// It is safe to modify the contents of the arguments after Append returns.
func (w *Writer) Append(key, value []byte) error {
	if w.err != nil {
		return w.err
	}
	if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
		w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
		return w.err
	}
	w.flushPendingBH(key)
	// Append key/value pair to the data block.
	w.dataBlock.append(key, value)
	// Add key to the filter block.
	w.filterBlock.add(key)
	// Finish the data block if block size target reached.
	if w.dataBlock.bytesLen() >= w.blockSize {
		if err := w.finishBlock(); err != nil {
			w.err = err
			return w.err
		}
	}
	w.nEntries++
	return nil
}
// BlocksLen returns the number of blocks written so far.
func (w *Writer) BlocksLen() int {
	n := w.indexBlock.nEntries
	if w.pendingBH.length > 0 {
		// Includes the pending block.
		n++
	}
	return n
}

// EntriesLen returns the number of entries added so far.
func (w *Writer) EntriesLen() int {
	return w.nEntries
}

// BytesLen returns the number of bytes written so far.
func (w *Writer) BytesLen() int {
	return int(w.offset)
}
// Close will finalize the table. Calling Append is not possible
// after Close, but calling BlocksLen, EntriesLen and BytesLen
// is still possible.
func (w *Writer) Close() error {
	if w.err != nil {
		return w.err
	}

	// Write the last data block, or an empty data block if there
	// are no data blocks at all.
	if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
		if err := w.finishBlock(); err != nil {
			w.err = err
			return w.err
		}
	}
	w.flushPendingBH(nil)

	// Write the filter block.
	var filterBH blockHandle
	w.filterBlock.finish()
	if buf := &w.filterBlock.buf; buf.Len() > 0 {
		filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
		if w.err != nil {
			return w.err
		}
	}

	// Write the metaindex block.
	if filterBH.length > 0 {
		key := []byte("filter." + w.filter.Name())
		n := encodeBlockHandle(w.scratch[:20], filterBH)
		w.dataBlock.append(key, w.scratch[:n])
	}
	w.dataBlock.finish()
	metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the index block.
	w.indexBlock.finish()
	indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the table footer.
	footer := w.scratch[:footerLen]
	for i := range footer {
		footer[i] = 0
	}
	n := encodeBlockHandle(footer, metaindexBH)
	encodeBlockHandle(footer[n:], indexBH)
	copy(footer[footerLen-len(magic):], magic)
	if _, err := w.writer.Write(footer); err != nil {
		w.err = err
		return w.err
	}
	w.offset += footerLen

	w.err = errors.New("leveldb/table: writer is closed")
	return nil
}
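
// The overall table layout produced by Close, as far as this writer goes, is:
//
//	data block 0 .. data block N | [filter block] | metaindex block | index block | footer
//
// The metaindex block is built by reusing dataBlock and maps
// "filter.<Name>" to the filter block handle; the footer holds the metaindex
// and index block handles, zero padding, and the table magic at the end.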
// NewWriter creates a new initialized table writer for the file.
//
// Table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
	w := &Writer{
		writer:          f,
		cmp:             o.GetComparer(),
		filter:          o.GetFilter(),
		compression:     o.GetCompression(),
		blockSize:       o.GetBlockSize(),
		comparerScratch: make([]byte, 0),
	}
	// data block
	w.dataBlock.restartInterval = o.GetBlockRestartInterval()
	// The first 20 bytes are reserved for encoding the block handle.
	w.dataBlock.scratch = w.scratch[20:]
	// index block
	w.indexBlock.restartInterval = 1
	w.indexBlock.scratch = w.scratch[20:]
	// filter block
	if w.filter != nil {
		w.filterBlock.generator = w.filter.NewGenerator()
		w.filterBlock.flush(0)
	}
	return w
}
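
// A minimal usage sketch, seen from a client package (the file name and the
// zero-valued opt.Options are illustrative assumptions, and error handling is
// elided):
//
//	f, _ := os.Create("table.ldb")
//	w := table.NewWriter(f, &opt.Options{})
//	_ = w.Append([]byte("k1"), []byte("v1"))
//	_ = w.Append([]byte("k2"), []byte("v2")) // keys must be strictly increasing
//	_ = w.Close()                            // writes filter, metaindex, index blocks and the footer
//	f.Close()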