writer.go

// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package table

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"

	"github.com/golang/snappy"

	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/filter"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)
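
// sharedPrefixLen returns the length of the longest common prefix of a and b.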
func sharedPrefixLen(a, b []byte) int {
	i, n := 0, len(a)
	if n > len(b) {
		n = len(b)
	}
	for i < n && a[i] == b[i] {
		i++
	}
	return i
}
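
// blockWriter builds a single block: a sequence of prefix-compressed
// key/value entries followed by a table of restart points.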
type blockWriter struct {
	restartInterval int
	buf             util.Buffer
	nEntries        int
	prevKey         []byte
	restarts        []uint32
	scratch         []byte
}
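
// append adds a key/value entry to the block. Every restartInterval entries
// a restart point is recorded and the key is stored in full; otherwise only
// the suffix that differs from the previous key is stored. Each entry is
// encoded as three uvarints (shared key length, unshared key length, value
// length) followed by the unshared key bytes and the value.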
func (w *blockWriter) append(key, value []byte) {
	nShared := 0
	if w.nEntries%w.restartInterval == 0 {
		w.restarts = append(w.restarts, uint32(w.buf.Len()))
	} else {
		nShared = sharedPrefixLen(w.prevKey, key)
	}
	n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
	n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
	n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
	w.buf.Write(w.scratch[:n])
	w.buf.Write(key[nShared:])
	w.buf.Write(value)
	w.prevKey = append(w.prevKey[:0], key...)
	w.nEntries++
}
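
// finish completes the block by appending the restart point offsets and,
// last, the number of restart points, each as a little-endian uint32.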
func (w *blockWriter) finish() {
	// Write restarts entry.
	if w.nEntries == 0 {
		// Must have at least one restart entry.
		w.restarts = append(w.restarts, 0)
	}
	w.restarts = append(w.restarts, uint32(len(w.restarts)))
	for _, x := range w.restarts {
		buf4 := w.buf.Alloc(4)
		binary.LittleEndian.PutUint32(buf4, x)
	}
}

func (w *blockWriter) reset() {
	w.buf.Reset()
	w.nEntries = 0
	w.restarts = w.restarts[:0]
}
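
// bytesLen returns the size the block would have if finished now: the entry
// data plus 4 bytes per restart point (at least one) and a 4-byte restart count.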
func (w *blockWriter) bytesLen() int {
	restartsLen := len(w.restarts)
	if restartsLen == 0 {
		restartsLen = 1
	}
	return w.buf.Len() + 4*restartsLen + 4
}
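
// filterWriter builds the filter block, emitting one filter for every
// filterBase bytes of table data.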
type filterWriter struct {
	generator filter.FilterGenerator
	buf       util.Buffer
	nKeys     int
	offsets   []uint32
}

func (w *filterWriter) add(key []byte) {
	if w.generator == nil {
		return
	}
	w.generator.Add(key)
	w.nKeys++
}
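
// flush generates filters so that one exists for every filterBase bytes of
// data up to the given offset.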
func (w *filterWriter) flush(offset uint64) {
	if w.generator == nil {
		return
	}
	for x := int(offset / filterBase); x > len(w.offsets); {
		w.generate()
	}
}

func (w *filterWriter) finish() {
	if w.generator == nil {
		return
	}
	// Generate last keys.
	if w.nKeys > 0 {
		w.generate()
	}
	w.offsets = append(w.offsets, uint32(w.buf.Len()))
	for _, x := range w.offsets {
		buf4 := w.buf.Alloc(4)
		binary.LittleEndian.PutUint32(buf4, x)
	}
	w.buf.WriteByte(filterBaseLg)
}
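
// generate records the current offset of the filter data and, if any keys
// are pending, emits a filter for them.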
func (w *filterWriter) generate() {
	// Record offset.
	w.offsets = append(w.offsets, uint32(w.buf.Len()))
	// Generate filters.
	if w.nKeys > 0 {
		w.generator.Generate(&w.buf)
		w.nKeys = 0
	}
}

// Writer is a table writer.
type Writer struct {
	writer io.Writer
	err    error
	// Options
	cmp         comparer.Comparer
	filter      filter.Filter
	compression opt.Compression
	blockSize   int

	dataBlock   blockWriter
	indexBlock  blockWriter
	filterBlock filterWriter
	pendingBH   blockHandle
	offset      uint64
	nEntries    int

	// Scratch is allocated with enough room for 5 uvarints. The block writers
	// must not use the first 20 bytes: they are reserved for encoding a block
	// handle, which is then passed to a block writer itself.
	scratch            [50]byte
	comparerScratch    []byte
	compressionScratch []byte
}
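
// writeBlock optionally snappy-compresses buf, appends the block trailer (a
// one-byte compression type and a four-byte checksum of the data plus that
// byte), writes the result to the underlying writer and returns the block handle.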
func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
	// Compress the buffer if necessary.
	var b []byte
	if compression == opt.SnappyCompression {
		// Allocate scratch enough for compression and block trailer.
		if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
			w.compressionScratch = make([]byte, n)
		}
		compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
		n := len(compressed)
		b = compressed[:n+blockTrailerLen]
		b[n] = blockTypeSnappyCompression
	} else {
		tmp := buf.Alloc(blockTrailerLen)
		tmp[0] = blockTypeNoCompression
		b = buf.Bytes()
	}

	// Calculate the checksum.
	n := len(b) - 4
	checksum := util.NewCRC(b[:n]).Value()
	binary.LittleEndian.PutUint32(b[n:], checksum)

	// Write the buffer to the file.
	_, err = w.writer.Write(b)
	if err != nil {
		return
	}
	bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
	w.offset += uint64(len(b))
	return
}
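
// flushPendingBH adds the pending block handle to the index block, keyed by a
// short separator that sorts at or after the last key of the finished block
// and before key, or by a successor of the last key when key is empty.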
func (w *Writer) flushPendingBH(key []byte) {
	if w.pendingBH.length == 0 {
		return
	}
	var separator []byte
	if len(key) == 0 {
		separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
	} else {
		separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
	}
	if separator == nil {
		separator = w.dataBlock.prevKey
	} else {
		w.comparerScratch = separator
	}
	n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
	// Append the block handle to the index block.
	w.indexBlock.append(separator, w.scratch[:n])
	// Reset prev key of the data block.
	w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
	// Clear pending block handle.
	w.pendingBH = blockHandle{}
}
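
// finishBlock finalizes the current data block, writes it out, records its
// handle so it can be added to the index block, and lets the filter block
// catch up to the new offset.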
func (w *Writer) finishBlock() error {
	w.dataBlock.finish()
	bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
	if err != nil {
		return err
	}
	w.pendingBH = bh
	// Reset the data block.
	w.dataBlock.reset()
	// Flush the filter block.
	w.filterBlock.flush(w.offset)
	return nil
}

// Append appends a key/value pair to the table. Keys must be passed in
// increasing order.
//
// It is safe to modify the contents of the arguments after Append returns.
func (w *Writer) Append(key, value []byte) error {
	if w.err != nil {
		return w.err
	}
	if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
		w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
		return w.err
	}
	w.flushPendingBH(key)
	// Append the key/value pair to the data block.
	w.dataBlock.append(key, value)
	// Add the key to the filter block.
	w.filterBlock.add(key)
	// Finish the data block if the block size target has been reached.
	if w.dataBlock.bytesLen() >= w.blockSize {
		if err := w.finishBlock(); err != nil {
			w.err = err
			return w.err
		}
	}
	w.nEntries++
	return nil
}

// BlocksLen returns the number of blocks written so far.
func (w *Writer) BlocksLen() int {
	n := w.indexBlock.nEntries
	if w.pendingBH.length > 0 {
		// Includes the pending block.
		n++
	}
	return n
}

// EntriesLen returns the number of entries added so far.
func (w *Writer) EntriesLen() int {
	return w.nEntries
}

// BytesLen returns the number of bytes written so far.
func (w *Writer) BytesLen() int {
	return int(w.offset)
}

// Close finalizes the table. Calling Append is not possible after Close, but
// calling BlocksLen, EntriesLen and BytesLen is still possible.
func (w *Writer) Close() error {
	if w.err != nil {
		return w.err
	}

	// Write the last data block, or an empty data block if no data block has
	// been written at all.
	if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
		if err := w.finishBlock(); err != nil {
			w.err = err
			return w.err
		}
	}
	w.flushPendingBH(nil)

	// Write the filter block.
	var filterBH blockHandle
	w.filterBlock.finish()
	if buf := &w.filterBlock.buf; buf.Len() > 0 {
		filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
		if w.err != nil {
			return w.err
		}
	}

	// Write the metaindex block.
	if filterBH.length > 0 {
		key := []byte("filter." + w.filter.Name())
		n := encodeBlockHandle(w.scratch[:20], filterBH)
		w.dataBlock.append(key, w.scratch[:n])
	}
	w.dataBlock.finish()
	metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the index block.
	w.indexBlock.finish()
	indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
	if err != nil {
		w.err = err
		return w.err
	}

	// Write the table footer.
	footer := w.scratch[:footerLen]
	for i := range footer {
		footer[i] = 0
	}
	n := encodeBlockHandle(footer, metaindexBH)
	encodeBlockHandle(footer[n:], indexBH)
	copy(footer[footerLen-len(magic):], magic)
	if _, err := w.writer.Write(footer); err != nil {
		w.err = err
		return w.err
	}
	w.offset += footerLen

	w.err = errors.New("leveldb/table: writer is closed")
	return nil
}

// NewWriter creates a new initialized table writer for the file.
//
// The table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer {
	w := &Writer{
		writer:          f,
		cmp:             o.GetComparer(),
		filter:          o.GetFilter(),
		compression:     o.GetCompression(),
		blockSize:       o.GetBlockSize(),
		comparerScratch: make([]byte, 0),
	}
	// data block
	w.dataBlock.restartInterval = o.GetBlockRestartInterval()
	// The first 20 bytes are used for encoding block handles.
	w.dataBlock.scratch = w.scratch[20:]
	// index block
	w.indexBlock.restartInterval = 1
	w.indexBlock.scratch = w.scratch[20:]
	// filter block
	if w.filter != nil {
		w.filterBlock.generator = w.filter.NewGenerator()
		w.filterBlock.flush(0)
	}
	return w
}
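
// buildExampleTable is an illustrative usage sketch, not part of the original
// file: it shows how the exported Writer API fits together, assuming the
// caller supplies the destination writer (for example an *os.File). The
// function name and the sample keys are hypothetical.
func buildExampleTable(f io.Writer) error {
	// Zero-valued options fall back to the defaults returned by the opt getters.
	tw := NewWriter(f, &opt.Options{})
	// Keys must be appended in strictly increasing order.
	pairs := [][2]string{{"apple", "red"}, {"banana", "yellow"}, {"cherry", "dark"}}
	for _, kv := range pairs {
		if err := tw.Append([]byte(kv[0]), []byte(kv[1])); err != nil {
			return err
		}
	}
	// Close flushes the last data block and writes the filter, metaindex and
	// index blocks followed by the table footer.
	return tw.Close()
}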