db_write.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "sync/atomic"
  9. "time"
  10. "github.com/syndtr/goleveldb/leveldb/memdb"
  11. "github.com/syndtr/goleveldb/leveldb/opt"
  12. "github.com/syndtr/goleveldb/leveldb/util"
  13. )
  14. func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
  15. wr, err := db.journal.Next()
  16. if err != nil {
  17. return err
  18. }
  19. if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
  20. return err
  21. }
  22. if err := db.journal.Flush(); err != nil {
  23. return err
  24. }
  25. if sync {
  26. return db.journalWriter.Sync()
  27. }
  28. return nil
  29. }
  30. func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
  31. retryLimit := 3
  32. retry:
  33. // Wait for pending memdb compaction.
  34. err = db.compTriggerWait(db.mcompCmdC)
  35. if err != nil {
  36. return
  37. }
  38. retryLimit--
  39. // Create new memdb and journal.
  40. mem, err = db.newMem(n)
  41. if err != nil {
  42. if err == errHasFrozenMem {
  43. if retryLimit <= 0 {
  44. panic("BUG: still has frozen memdb")
  45. }
  46. goto retry
  47. }
  48. return
  49. }
  50. // Schedule memdb compaction.
  51. if wait {
  52. err = db.compTriggerWait(db.mcompCmdC)
  53. } else {
  54. db.compTrigger(db.mcompCmdC)
  55. }
  56. return
  57. }
// flush ensures the effective memdb has room for an n-byte write, throttling
// or pausing the writer when level-0 compaction falls behind. It returns the
// effective memdb (caller must decref), the free bytes available in it, and
// any error (e.g. ErrClosed). On success mdbFree >= n unless the memdb was
// empty (then it is allowed to grow beyond its capacity).
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	// flush performs one throttling step; it returns true when the caller
	// should retry (memdb still not ready), false when done or on error.
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		// On retry the ref acquired above is dropped so the next iteration
		// re-fetches a fresh effective memdb.
		defer func() {
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Too many level-0 tables: slow the writer down once by 1ms.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Enough room; done.
			return false
		case tLen >= pauseTrigger:
			// Hard pause: block until table compaction makes progress.
			delayed = true
			// Set the write paused flag explicitly.
			atomic.StoreInt32(&db.inWritePaused, 1)
			err = db.compTriggerWait(db.tcompCmdC)
			// Unset the write paused flag.
			atomic.StoreInt32(&db.inWritePaused, 0)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entry.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				// Memdb is full: rotate to a fresh one sized for n bytes.
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	// Account write-delay statistics; totals are flushed to the atomic
	// counters once a non-delayed write follows a delayed streak.
	if delayed {
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}
// writeMerge is a write request passed over writeMergeC so the current
// lock holder can fold it into its own journal write. Either batch is
// non-nil (a whole-batch merge) or keyType/key/value describe a single
// put/delete record.
type writeMerge struct {
	sync  bool  // requester wants the journal synced
	batch *Batch // non-nil for a batch merge; nil for a single record
	// Single-record merge fields; only meaningful when batch is nil.
	keyType    keyType
	key, value []byte
}
  130. func (db *DB) unlockWrite(overflow bool, merged int, err error) {
  131. for i := 0; i < merged; i++ {
  132. db.writeAckC <- err
  133. }
  134. if overflow {
  135. // Pass lock to the next write (that failed to merge).
  136. db.writeMergedC <- false
  137. } else {
  138. // Release lock.
  139. <-db.writeLockC
  140. }
  141. }
// writeLocked performs the actual write while holding the write lock:
// it flushes/rotates the memdb as needed, opportunistically merges other
// pending writes (when merge is true), journals all batches, applies them
// to the memdb, and advances the sequence number.
// ourBatch is batch that we can modify (it is owned by us, possibly from
// the batch pool); batch itself is caller-owned and must not be mutated.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush memdb. This method would also trying to throttle writes
	// if it is too fast and compaction cannot catch-up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool     // a pending write did not fit; hand it the lock
		merged   int      // number of writers merged (to be acked)
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit: cap how many bytes of other writers we absorb so a
		// single journal record stays bounded (128KiB, or up to 1MiB total
		// when the primary batch itself is large).
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		// Never merge more than fits in the memdb alongside our batch.
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use same batch since concurrent write doesn't
					// guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				sync = sync || incoming.sync
				merged++
				// Tell the merged writer to wait for our ack.
				db.writeMergedC <- true

			default:
				// No more pending writers.
				break merge
			}
		}
	}

	// Release ourBatch if any.
	if ourBatch != nil {
		defer db.batchPool.Put(ourBatch)
	}

	// Seq number.
	seq := db.seq + 1

	// Write journal.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// Journal already committed these records; failing to apply
			// them to the memdb is unrecoverable.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Incr seq number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate memdb if it's reach the threshold.
	if batch.internalLen >= mdbFree {
		db.rotateMem(0, false)
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}
// Write apply the given batch to the DB. The batch records will be applied
// sequentially. Write might be used concurrently, when used concurrently and
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
// option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than write buffer, it may justified to write
	// using transaction instead. Using transaction the batch will be written
	// into tables directly, skipping the journaling.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		// Race between offering ourselves for merging and taking the lock:
		// whichever the current lock holder (or the channel) accepts first wins.
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}
// putRec writes a single put/delete record (kt selects which) for key/value,
// honoring the same write-merge and lock-acquisition protocol as Write.
// It backs both Put and Delete.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		// Offer the single record for merging into the current lock
		// holder's write, or take the lock ourselves — whichever wins.
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	// Build the batch from the pool; it is passed as ourBatch too so
	// writeLocked may append merged records to it and return it to the pool.
	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	return db.writeLocked(batch, batch, merge, sync)
}
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}
// Delete deletes the value for the given key. Delete will not return an error
// if the key doesn't exist. Write merge also applies for Delete, see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}
  350. func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
  351. iter := mem.NewIterator(nil)
  352. defer iter.Release()
  353. return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
  354. (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
  355. }
  356. // CompactRange compacts the underlying DB for the given key range.
  357. // In particular, deleted and overwritten versions are discarded,
  358. // and the data is rearranged to reduce the cost of operations
  359. // needed to access the data. This operation should typically only
  360. // be invoked by users who understand the underlying implementation.
  361. //
  362. // A nil Range.Start is treated as a key before all keys in the DB.
  363. // And a nil Range.Limit is treated as a key after all keys in the DB.
  364. // Therefore if both is nil then it will compact entire DB.
  365. func (db *DB) CompactRange(r util.Range) error {
  366. if err := db.ok(); err != nil {
  367. return err
  368. }
  369. // Lock writer.
  370. select {
  371. case db.writeLockC <- struct{}{}:
  372. case err := <-db.compPerErrC:
  373. return err
  374. case <-db.closeC:
  375. return ErrClosed
  376. }
  377. // Check for overlaps in memdb.
  378. mdb := db.getEffectiveMem()
  379. if mdb == nil {
  380. return ErrClosed
  381. }
  382. defer mdb.decref()
  383. if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
  384. // Memdb compaction.
  385. if _, err := db.rotateMem(0, false); err != nil {
  386. <-db.writeLockC
  387. return err
  388. }
  389. <-db.writeLockC
  390. if err := db.compTriggerWait(db.mcompCmdC); err != nil {
  391. return err
  392. }
  393. } else {
  394. <-db.writeLockC
  395. }
  396. // Table compaction.
  397. return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
  398. }
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer. The lock is intentionally NOT released here:
	// compWriteLocking marks it as held by the compaction subsystem,
	// permanently blocking further writes until the DB is reopened.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only: inject ErrReadOnly as the persistent
	// compaction error so compactions stop accepting work.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}