// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

  package leveldb

  import (
  	"encoding/binary"
  	"fmt"
  	"io"

  	"github.com/syndtr/goleveldb/leveldb/errors"
  	"github.com/syndtr/goleveldb/leveldb/memdb"
  	"github.com/syndtr/goleveldb/leveldb/storage"
  )
  // ErrBatchCorrupted records the reason a batch was found to be corrupted.
  // This error will be wrapped with errors.ErrCorrupted.
  type ErrBatchCorrupted struct {
  	// Reason is a human-readable description of what was wrong.
  	Reason string
  }

  // Error implements the error interface. The message is the fixed prefix
  // "leveldb: batch corrupted: " followed by Reason.
  func (e *ErrBatchCorrupted) Error() string {
  	return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
  }
  23. func newErrBatchCorrupted(reason string) error {
  24. return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
  25. }
  const (
  	// batchHeaderLen is the size in bytes of an encoded batch header: an
  	// 8-byte sequence number followed by a 4-byte record count.
  	batchHeaderLen = 8 + 4
  	// batchGrowRec is the record count above which Batch.grow starts to
  	// shrink the amount of extra capacity it reserves.
  	batchGrowRec = 3000
  	// batchBufioSize is not referenced in this part of the file.
  	// NOTE(review): presumably a buffered-I/O size used elsewhere - confirm.
  	batchBufioSize = 16
  )
  // BatchReplay wraps basic batch operations. Batch.Replay calls Put for every
  // put record and Delete for every delete record, in the order they were
  // added to the batch.
  type BatchReplay interface {
  	// Put receives the key and value of a put record.
  	Put(key, value []byte)
  	// Delete receives the key of a delete record.
  	Delete(key []byte)
  }
  36. type batchIndex struct {
  37. keyType keyType
  38. keyPos, keyLen int
  39. valuePos, valueLen int
  40. }
  41. func (index batchIndex) k(data []byte) []byte {
  42. return data[index.keyPos : index.keyPos+index.keyLen]
  43. }
  44. func (index batchIndex) v(data []byte) []byte {
  45. if index.valueLen != 0 {
  46. return data[index.valuePos : index.valuePos+index.valueLen]
  47. }
  48. return nil
  49. }
  50. func (index batchIndex) kv(data []byte) (key, value []byte) {
  51. return index.k(data), index.v(data)
  52. }
  53. // Batch is a write batch.
  54. type Batch struct {
  55. data []byte
  56. index []batchIndex
  57. // internalLen is sums of key/value pair length plus 8-bytes internal key.
  58. internalLen int
  59. }
  60. func (b *Batch) grow(n int) {
  61. o := len(b.data)
  62. if cap(b.data)-o < n {
  63. div := 1
  64. if len(b.index) > batchGrowRec {
  65. div = len(b.index) / batchGrowRec
  66. }
  67. ndata := make([]byte, o, o+n+o/div)
  68. copy(ndata, b.data)
  69. b.data = ndata
  70. }
  71. }
  72. func (b *Batch) appendRec(kt keyType, key, value []byte) {
  73. n := 1 + binary.MaxVarintLen32 + len(key)
  74. if kt == keyTypeVal {
  75. n += binary.MaxVarintLen32 + len(value)
  76. }
  77. b.grow(n)
  78. index := batchIndex{keyType: kt}
  79. o := len(b.data)
  80. data := b.data[:o+n]
  81. data[o] = byte(kt)
  82. o++
  83. o += binary.PutUvarint(data[o:], uint64(len(key)))
  84. index.keyPos = o
  85. index.keyLen = len(key)
  86. o += copy(data[o:], key)
  87. if kt == keyTypeVal {
  88. o += binary.PutUvarint(data[o:], uint64(len(value)))
  89. index.valuePos = o
  90. index.valueLen = len(value)
  91. o += copy(data[o:], value)
  92. }
  93. b.data = data[:o]
  94. b.index = append(b.index, index)
  95. b.internalLen += index.keyLen + index.valueLen + 8
  96. }
  97. // Put appends 'put operation' of the given key/value pair to the batch.
  98. // It is safe to modify the contents of the argument after Put returns but not
  99. // before.
  100. func (b *Batch) Put(key, value []byte) {
  101. b.appendRec(keyTypeVal, key, value)
  102. }
  103. // Delete appends 'delete operation' of the given key to the batch.
  104. // It is safe to modify the contents of the argument after Delete returns but
  105. // not before.
  106. func (b *Batch) Delete(key []byte) {
  107. b.appendRec(keyTypeDel, key, nil)
  108. }
  109. // Dump dumps batch contents. The returned slice can be loaded into the
  110. // batch using Load method.
  111. // The returned slice is not its own copy, so the contents should not be
  112. // modified.
  113. func (b *Batch) Dump() []byte {
  114. return b.data
  115. }
  116. // Load loads given slice into the batch. Previous contents of the batch
  117. // will be discarded.
  118. // The given slice will not be copied and will be used as batch buffer, so
  119. // it is not safe to modify the contents of the slice.
  120. func (b *Batch) Load(data []byte) error {
  121. return b.decode(data, -1)
  122. }
  123. // Replay replays batch contents.
  124. func (b *Batch) Replay(r BatchReplay) error {
  125. for _, index := range b.index {
  126. switch index.keyType {
  127. case keyTypeVal:
  128. r.Put(index.k(b.data), index.v(b.data))
  129. case keyTypeDel:
  130. r.Delete(index.k(b.data))
  131. }
  132. }
  133. return nil
  134. }
  135. // Len returns number of records in the batch.
  136. func (b *Batch) Len() int {
  137. return len(b.index)
  138. }
  139. // Reset resets the batch.
  140. func (b *Batch) Reset() {
  141. b.data = b.data[:0]
  142. b.index = b.index[:0]
  143. b.internalLen = 0
  144. }
  145. func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
  146. for i, index := range b.index {
  147. if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
  148. return err
  149. }
  150. }
  151. return nil
  152. }
  153. func (b *Batch) append(p *Batch) {
  154. ob := len(b.data)
  155. oi := len(b.index)
  156. b.data = append(b.data, p.data...)
  157. b.index = append(b.index, p.index...)
  158. b.internalLen += p.internalLen
  159. // Updating index offset.
  160. if ob != 0 {
  161. for ; oi < len(b.index); oi++ {
  162. index := &b.index[oi]
  163. index.keyPos += ob
  164. if index.valueLen != 0 {
  165. index.valuePos += ob
  166. }
  167. }
  168. }
  169. }
  170. func (b *Batch) decode(data []byte, expectedLen int) error {
  171. b.data = data
  172. b.index = b.index[:0]
  173. b.internalLen = 0
  174. err := decodeBatch(data, func(i int, index batchIndex) error {
  175. b.index = append(b.index, index)
  176. b.internalLen += index.keyLen + index.valueLen + 8
  177. return nil
  178. })
  179. if err != nil {
  180. return err
  181. }
  182. if expectedLen >= 0 && len(b.index) != expectedLen {
  183. return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
  184. }
  185. return nil
  186. }
  187. func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
  188. var ik []byte
  189. for i, index := range b.index {
  190. ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
  191. if err := mdb.Put(ik, index.v(b.data)); err != nil {
  192. return err
  193. }
  194. }
  195. return nil
  196. }
  197. func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
  198. var ik []byte
  199. for i, index := range b.index {
  200. ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
  201. if err := mdb.Delete(ik); err != nil {
  202. return err
  203. }
  204. }
  205. return nil
  206. }
  207. func newBatch() interface{} {
  208. return &Batch{}
  209. }
  210. func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
  211. var index batchIndex
  212. for i, o := 0, 0; o < len(data); i++ {
  213. // Key type.
  214. index.keyType = keyType(data[o])
  215. if index.keyType > keyTypeVal {
  216. return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
  217. }
  218. o++
  219. // Key.
  220. x, n := binary.Uvarint(data[o:])
  221. o += n
  222. if n <= 0 || o+int(x) > len(data) {
  223. return newErrBatchCorrupted("bad record: invalid key length")
  224. }
  225. index.keyPos = o
  226. index.keyLen = int(x)
  227. o += index.keyLen
  228. // Value.
  229. if index.keyType == keyTypeVal {
  230. x, n = binary.Uvarint(data[o:])
  231. o += n
  232. if n <= 0 || o+int(x) > len(data) {
  233. return newErrBatchCorrupted("bad record: invalid value length")
  234. }
  235. index.valuePos = o
  236. index.valueLen = int(x)
  237. o += index.valueLen
  238. } else {
  239. index.valuePos = 0
  240. index.valueLen = 0
  241. }
  242. if err := fn(i, index); err != nil {
  243. return err
  244. }
  245. }
  246. return nil
  247. }
  248. func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
  249. seq, batchLen, err = decodeBatchHeader(data)
  250. if err != nil {
  251. return 0, 0, err
  252. }
  253. if seq < expectSeq {
  254. return 0, 0, newErrBatchCorrupted("invalid sequence number")
  255. }
  256. data = data[batchHeaderLen:]
  257. var ik []byte
  258. var decodedLen int
  259. err = decodeBatch(data, func(i int, index batchIndex) error {
  260. if i >= batchLen {
  261. return newErrBatchCorrupted("invalid records length")
  262. }
  263. ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
  264. if err := mdb.Put(ik, index.v(data)); err != nil {
  265. return err
  266. }
  267. decodedLen++
  268. return nil
  269. })
  270. if err == nil && decodedLen != batchLen {
  271. err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
  272. }
  273. return
  274. }
  275. func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
  276. dst = ensureBuffer(dst, batchHeaderLen)
  277. binary.LittleEndian.PutUint64(dst, seq)
  278. binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
  279. return dst
  280. }
  281. func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
  282. if len(data) < batchHeaderLen {
  283. return 0, 0, newErrBatchCorrupted("too short")
  284. }
  285. seq = binary.LittleEndian.Uint64(data)
  286. batchLen = int(binary.LittleEndian.Uint32(data[8:]))
  287. if batchLen < 0 {
  288. return 0, 0, newErrBatchCorrupted("invalid records length")
  289. }
  290. return
  291. }
  292. func batchesLen(batches []*Batch) int {
  293. batchLen := 0
  294. for _, batch := range batches {
  295. batchLen += batch.Len()
  296. }
  297. return batchLen
  298. }
  299. func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
  300. if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
  301. return err
  302. }
  303. for _, batch := range batches {
  304. if _, err := wr.Write(batch.data); err != nil {
  305. return err
  306. }
  307. }
  308. return nil
  309. }