db_compaction.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "sync"
  9. "time"
  10. "github.com/syndtr/goleveldb/leveldb/errors"
  11. "github.com/syndtr/goleveldb/leveldb/opt"
  12. "github.com/syndtr/goleveldb/leveldb/storage"
  13. )
  14. var (
  15. errCompactionTransactExiting = errors.New("leveldb: compaction transact exiting")
  16. )
  17. type cStat struct {
  18. duration time.Duration
  19. read int64
  20. write int64
  21. }
  22. func (p *cStat) add(n *cStatStaging) {
  23. p.duration += n.duration
  24. p.read += n.read
  25. p.write += n.write
  26. }
  27. func (p *cStat) get() (duration time.Duration, read, write int64) {
  28. return p.duration, p.read, p.write
  29. }
  30. type cStatStaging struct {
  31. start time.Time
  32. duration time.Duration
  33. on bool
  34. read int64
  35. write int64
  36. }
  37. func (p *cStatStaging) startTimer() {
  38. if !p.on {
  39. p.start = time.Now()
  40. p.on = true
  41. }
  42. }
  43. func (p *cStatStaging) stopTimer() {
  44. if p.on {
  45. p.duration += time.Since(p.start)
  46. p.on = false
  47. }
  48. }
  49. type cStats struct {
  50. lk sync.Mutex
  51. stats []cStat
  52. }
  53. func (p *cStats) addStat(level int, n *cStatStaging) {
  54. p.lk.Lock()
  55. if level >= len(p.stats) {
  56. newStats := make([]cStat, level+1)
  57. copy(newStats, p.stats)
  58. p.stats = newStats
  59. }
  60. p.stats[level].add(n)
  61. p.lk.Unlock()
  62. }
  63. func (p *cStats) getStat(level int) (duration time.Duration, read, write int64) {
  64. p.lk.Lock()
  65. defer p.lk.Unlock()
  66. if level < len(p.stats) {
  67. return p.stats[level].get()
  68. }
  69. return
  70. }
  71. func (db *DB) compactionError() {
  72. var err error
  73. noerr:
  74. // No error.
  75. for {
  76. select {
  77. case err = <-db.compErrSetC:
  78. switch {
  79. case err == nil:
  80. case err == ErrReadOnly, errors.IsCorrupted(err):
  81. goto hasperr
  82. default:
  83. goto haserr
  84. }
  85. case <-db.closeC:
  86. return
  87. }
  88. }
  89. haserr:
  90. // Transient error.
  91. for {
  92. select {
  93. case db.compErrC <- err:
  94. case err = <-db.compErrSetC:
  95. switch {
  96. case err == nil:
  97. goto noerr
  98. case err == ErrReadOnly, errors.IsCorrupted(err):
  99. goto hasperr
  100. default:
  101. }
  102. case <-db.closeC:
  103. return
  104. }
  105. }
  106. hasperr:
  107. // Persistent error.
  108. for {
  109. select {
  110. case db.compErrC <- err:
  111. case db.compPerErrC <- err:
  112. case db.writeLockC <- struct{}{}:
  113. // Hold write lock, so that write won't pass-through.
  114. db.compWriteLocking = true
  115. case <-db.closeC:
  116. if db.compWriteLocking {
  117. // We should release the lock or Close will hang.
  118. <-db.writeLockC
  119. }
  120. return
  121. }
  122. }
  123. }
  124. type compactionTransactCounter int
  125. func (cnt *compactionTransactCounter) incr() {
  126. *cnt++
  127. }
  128. type compactionTransactInterface interface {
  129. run(cnt *compactionTransactCounter) error
  130. revert() error
  131. }
  132. func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
  133. defer func() {
  134. if x := recover(); x != nil {
  135. if x == errCompactionTransactExiting {
  136. if err := t.revert(); err != nil {
  137. db.logf("%s revert error %q", name, err)
  138. }
  139. }
  140. panic(x)
  141. }
  142. }()
  143. const (
  144. backoffMin = 1 * time.Second
  145. backoffMax = 8 * time.Second
  146. backoffMul = 2 * time.Second
  147. )
  148. var (
  149. backoff = backoffMin
  150. backoffT = time.NewTimer(backoff)
  151. lastCnt = compactionTransactCounter(0)
  152. disableBackoff = db.s.o.GetDisableCompactionBackoff()
  153. )
  154. for n := 0; ; n++ {
  155. // Check whether the DB is closed.
  156. if db.isClosed() {
  157. db.logf("%s exiting", name)
  158. db.compactionExitTransact()
  159. } else if n > 0 {
  160. db.logf("%s retrying N·%d", name, n)
  161. }
  162. // Execute.
  163. cnt := compactionTransactCounter(0)
  164. err := t.run(&cnt)
  165. if err != nil {
  166. db.logf("%s error I·%d %q", name, cnt, err)
  167. }
  168. // Set compaction error status.
  169. select {
  170. case db.compErrSetC <- err:
  171. case perr := <-db.compPerErrC:
  172. if err != nil {
  173. db.logf("%s exiting (persistent error %q)", name, perr)
  174. db.compactionExitTransact()
  175. }
  176. case <-db.closeC:
  177. db.logf("%s exiting", name)
  178. db.compactionExitTransact()
  179. }
  180. if err == nil {
  181. return
  182. }
  183. if errors.IsCorrupted(err) {
  184. db.logf("%s exiting (corruption detected)", name)
  185. db.compactionExitTransact()
  186. }
  187. if !disableBackoff {
  188. // Reset backoff duration if counter is advancing.
  189. if cnt > lastCnt {
  190. backoff = backoffMin
  191. lastCnt = cnt
  192. }
  193. // Backoff.
  194. backoffT.Reset(backoff)
  195. if backoff < backoffMax {
  196. backoff *= backoffMul
  197. if backoff > backoffMax {
  198. backoff = backoffMax
  199. }
  200. }
  201. select {
  202. case <-backoffT.C:
  203. case <-db.closeC:
  204. db.logf("%s exiting", name)
  205. db.compactionExitTransact()
  206. }
  207. }
  208. }
  209. }
  210. type compactionTransactFunc struct {
  211. runFunc func(cnt *compactionTransactCounter) error
  212. revertFunc func() error
  213. }
  214. func (t *compactionTransactFunc) run(cnt *compactionTransactCounter) error {
  215. return t.runFunc(cnt)
  216. }
  217. func (t *compactionTransactFunc) revert() error {
  218. if t.revertFunc != nil {
  219. return t.revertFunc()
  220. }
  221. return nil
  222. }
  223. func (db *DB) compactionTransactFunc(name string, run func(cnt *compactionTransactCounter) error, revert func() error) {
  224. db.compactionTransact(name, &compactionTransactFunc{run, revert})
  225. }
  226. func (db *DB) compactionExitTransact() {
  227. panic(errCompactionTransactExiting)
  228. }
  229. func (db *DB) compactionCommit(name string, rec *sessionRecord) {
  230. db.compCommitLk.Lock()
  231. defer db.compCommitLk.Unlock() // Defer is necessary.
  232. db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
  233. return db.s.commit(rec)
  234. }, nil)
  235. }
  236. func (db *DB) memCompaction() {
  237. mdb := db.getFrozenMem()
  238. if mdb == nil {
  239. return
  240. }
  241. defer mdb.decref()
  242. db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
  243. // Don't compact empty memdb.
  244. if mdb.Len() == 0 {
  245. db.logf("memdb@flush skipping")
  246. // drop frozen memdb
  247. db.dropFrozenMem()
  248. return
  249. }
  250. // Pause table compaction.
  251. resumeC := make(chan struct{})
  252. select {
  253. case db.tcompPauseC <- (chan<- struct{})(resumeC):
  254. case <-db.compPerErrC:
  255. close(resumeC)
  256. resumeC = nil
  257. case <-db.closeC:
  258. db.compactionExitTransact()
  259. }
  260. var (
  261. rec = &sessionRecord{}
  262. stats = &cStatStaging{}
  263. flushLevel int
  264. )
  265. // Generate tables.
  266. db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
  267. stats.startTimer()
  268. flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel)
  269. stats.stopTimer()
  270. return
  271. }, func() error {
  272. for _, r := range rec.addedTables {
  273. db.logf("memdb@flush revert @%d", r.num)
  274. if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil {
  275. return err
  276. }
  277. }
  278. return nil
  279. })
  280. rec.setJournalNum(db.journalFd.Num)
  281. rec.setSeqNum(db.frozenSeq)
  282. // Commit.
  283. stats.startTimer()
  284. db.compactionCommit("memdb", rec)
  285. stats.stopTimer()
  286. db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
  287. for _, r := range rec.addedTables {
  288. stats.write += r.size
  289. }
  290. db.compStats.addStat(flushLevel, stats)
  291. // Drop frozen memdb.
  292. db.dropFrozenMem()
  293. // Resume table compaction.
  294. if resumeC != nil {
  295. select {
  296. case <-resumeC:
  297. close(resumeC)
  298. case <-db.closeC:
  299. db.compactionExitTransact()
  300. }
  301. }
  302. // Trigger table compaction.
  303. db.compTrigger(db.tcompCmdC)
  304. }
  305. type tableCompactionBuilder struct {
  306. db *DB
  307. s *session
  308. c *compaction
  309. rec *sessionRecord
  310. stat0, stat1 *cStatStaging
  311. snapHasLastUkey bool
  312. snapLastUkey []byte
  313. snapLastSeq uint64
  314. snapIter int
  315. snapKerrCnt int
  316. snapDropCnt int
  317. kerrCnt int
  318. dropCnt int
  319. minSeq uint64
  320. strict bool
  321. tableSize int
  322. tw *tWriter
  323. }
  324. func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
  325. // Create new table if not already.
  326. if b.tw == nil {
  327. // Check for pause event.
  328. if b.db != nil {
  329. select {
  330. case ch := <-b.db.tcompPauseC:
  331. b.db.pauseCompaction(ch)
  332. case <-b.db.closeC:
  333. b.db.compactionExitTransact()
  334. default:
  335. }
  336. }
  337. // Create new table.
  338. var err error
  339. b.tw, err = b.s.tops.create()
  340. if err != nil {
  341. return err
  342. }
  343. }
  344. // Write key/value into table.
  345. return b.tw.append(key, value)
  346. }
  347. func (b *tableCompactionBuilder) needFlush() bool {
  348. return b.tw.tw.BytesLen() >= b.tableSize
  349. }
  350. func (b *tableCompactionBuilder) flush() error {
  351. t, err := b.tw.finish()
  352. if err != nil {
  353. return err
  354. }
  355. b.rec.addTableFile(b.c.sourceLevel+1, t)
  356. b.stat1.write += t.size
  357. b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
  358. b.tw = nil
  359. return nil
  360. }
  361. func (b *tableCompactionBuilder) cleanup() {
  362. if b.tw != nil {
  363. b.tw.drop()
  364. b.tw = nil
  365. }
  366. }
  367. func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error {
  368. snapResumed := b.snapIter > 0
  369. hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary.
  370. lastUkey := append([]byte{}, b.snapLastUkey...)
  371. lastSeq := b.snapLastSeq
  372. b.kerrCnt = b.snapKerrCnt
  373. b.dropCnt = b.snapDropCnt
  374. // Restore compaction state.
  375. b.c.restore()
  376. defer b.cleanup()
  377. b.stat1.startTimer()
  378. defer b.stat1.stopTimer()
  379. iter := b.c.newIterator()
  380. defer iter.Release()
  381. for i := 0; iter.Next(); i++ {
  382. // Incr transact counter.
  383. cnt.incr()
  384. // Skip until last state.
  385. if i < b.snapIter {
  386. continue
  387. }
  388. resumed := false
  389. if snapResumed {
  390. resumed = true
  391. snapResumed = false
  392. }
  393. ikey := iter.Key()
  394. ukey, seq, kt, kerr := parseInternalKey(ikey)
  395. if kerr == nil {
  396. shouldStop := !resumed && b.c.shouldStopBefore(ikey)
  397. if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 {
  398. // First occurrence of this user key.
  399. // Only rotate tables if ukey doesn't hop across.
  400. if b.tw != nil && (shouldStop || b.needFlush()) {
  401. if err := b.flush(); err != nil {
  402. return err
  403. }
  404. // Creates snapshot of the state.
  405. b.c.save()
  406. b.snapHasLastUkey = hasLastUkey
  407. b.snapLastUkey = append(b.snapLastUkey[:0], lastUkey...)
  408. b.snapLastSeq = lastSeq
  409. b.snapIter = i
  410. b.snapKerrCnt = b.kerrCnt
  411. b.snapDropCnt = b.dropCnt
  412. }
  413. hasLastUkey = true
  414. lastUkey = append(lastUkey[:0], ukey...)
  415. lastSeq = keyMaxSeq
  416. }
  417. switch {
  418. case lastSeq <= b.minSeq:
  419. // Dropped because newer entry for same user key exist
  420. fallthrough // (A)
  421. case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey):
  422. // For this user key:
  423. // (1) there is no data in higher levels
  424. // (2) data in lower levels will have larger seq numbers
  425. // (3) data in layers that are being compacted here and have
  426. // smaller seq numbers will be dropped in the next
  427. // few iterations of this loop (by rule (A) above).
  428. // Therefore this deletion marker is obsolete and can be dropped.
  429. lastSeq = seq
  430. b.dropCnt++
  431. continue
  432. default:
  433. lastSeq = seq
  434. }
  435. } else {
  436. if b.strict {
  437. return kerr
  438. }
  439. // Don't drop corrupted keys.
  440. hasLastUkey = false
  441. lastUkey = lastUkey[:0]
  442. lastSeq = keyMaxSeq
  443. b.kerrCnt++
  444. }
  445. if err := b.appendKV(ikey, iter.Value()); err != nil {
  446. return err
  447. }
  448. }
  449. if err := iter.Error(); err != nil {
  450. return err
  451. }
  452. // Finish last table.
  453. if b.tw != nil && !b.tw.empty() {
  454. return b.flush()
  455. }
  456. return nil
  457. }
  458. func (b *tableCompactionBuilder) revert() error {
  459. for _, at := range b.rec.addedTables {
  460. b.s.logf("table@build revert @%d", at.num)
  461. if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
  462. return err
  463. }
  464. }
  465. return nil
  466. }
  467. func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
  468. defer c.release()
  469. rec := &sessionRecord{}
  470. rec.addCompPtr(c.sourceLevel, c.imax)
  471. if !noTrivial && c.trivial() {
  472. t := c.levels[0][0]
  473. db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1)
  474. rec.delTable(c.sourceLevel, t.fd.Num)
  475. rec.addTableFile(c.sourceLevel+1, t)
  476. db.compactionCommit("table-move", rec)
  477. return
  478. }
  479. var stats [2]cStatStaging
  480. for i, tables := range c.levels {
  481. for _, t := range tables {
  482. stats[i].read += t.size
  483. // Insert deleted tables into record
  484. rec.delTable(c.sourceLevel+i, t.fd.Num)
  485. }
  486. }
  487. sourceSize := int(stats[0].read + stats[1].read)
  488. minSeq := db.minSeq()
  489. db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq)
  490. b := &tableCompactionBuilder{
  491. db: db,
  492. s: db.s,
  493. c: c,
  494. rec: rec,
  495. stat1: &stats[1],
  496. minSeq: minSeq,
  497. strict: db.s.o.GetStrict(opt.StrictCompaction),
  498. tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1),
  499. }
  500. db.compactionTransact("table@build", b)
  501. // Commit.
  502. stats[1].startTimer()
  503. db.compactionCommit("table", rec)
  504. stats[1].stopTimer()
  505. resultSize := int(stats[1].write)
  506. db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration)
  507. // Save compaction stats
  508. for i := range stats {
  509. db.compStats.addStat(c.sourceLevel+1, &stats[i])
  510. }
  511. }
  512. func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error {
  513. db.logf("table@compaction range L%d %q:%q", level, umin, umax)
  514. if level >= 0 {
  515. if c := db.s.getCompactionRange(level, umin, umax, true); c != nil {
  516. db.tableCompaction(c, true)
  517. }
  518. } else {
  519. // Retry until nothing to compact.
  520. for {
  521. compacted := false
  522. // Scan for maximum level with overlapped tables.
  523. v := db.s.version()
  524. m := 1
  525. for i := m; i < len(v.levels); i++ {
  526. tables := v.levels[i]
  527. if tables.overlaps(db.s.icmp, umin, umax, false) {
  528. m = i
  529. }
  530. }
  531. v.release()
  532. for level := 0; level < m; level++ {
  533. if c := db.s.getCompactionRange(level, umin, umax, false); c != nil {
  534. db.tableCompaction(c, true)
  535. compacted = true
  536. }
  537. }
  538. if !compacted {
  539. break
  540. }
  541. }
  542. }
  543. return nil
  544. }
  545. func (db *DB) tableAutoCompaction() {
  546. if c := db.s.pickCompaction(); c != nil {
  547. db.tableCompaction(c, false)
  548. }
  549. }
  550. func (db *DB) tableNeedCompaction() bool {
  551. v := db.s.version()
  552. defer v.release()
  553. return v.needCompaction()
  554. }
  555. // resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
  556. func (db *DB) resumeWrite() bool {
  557. v := db.s.version()
  558. defer v.release()
  559. if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
  560. return true
  561. }
  562. return false
  563. }
  564. func (db *DB) pauseCompaction(ch chan<- struct{}) {
  565. select {
  566. case ch <- struct{}{}:
  567. case <-db.closeC:
  568. db.compactionExitTransact()
  569. }
  570. }
  571. type cCmd interface {
  572. ack(err error)
  573. }
  574. type cAuto struct {
  575. // Note for table compaction, an non-empty ackC represents it's a compaction waiting command.
  576. ackC chan<- error
  577. }
  578. func (r cAuto) ack(err error) {
  579. if r.ackC != nil {
  580. defer func() {
  581. recover()
  582. }()
  583. r.ackC <- err
  584. }
  585. }
  586. type cRange struct {
  587. level int
  588. min, max []byte
  589. ackC chan<- error
  590. }
  591. func (r cRange) ack(err error) {
  592. if r.ackC != nil {
  593. defer func() {
  594. recover()
  595. }()
  596. r.ackC <- err
  597. }
  598. }
  599. // This will trigger auto compaction but will not wait for it.
  600. func (db *DB) compTrigger(compC chan<- cCmd) {
  601. select {
  602. case compC <- cAuto{}:
  603. default:
  604. }
  605. }
  606. // This will trigger auto compaction and/or wait for all compaction to be done.
  607. func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
  608. ch := make(chan error)
  609. defer close(ch)
  610. // Send cmd.
  611. select {
  612. case compC <- cAuto{ch}:
  613. case err = <-db.compErrC:
  614. return
  615. case <-db.closeC:
  616. return ErrClosed
  617. }
  618. // Wait cmd.
  619. select {
  620. case err = <-ch:
  621. case err = <-db.compErrC:
  622. case <-db.closeC:
  623. return ErrClosed
  624. }
  625. return err
  626. }
  627. // Send range compaction request.
  628. func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
  629. ch := make(chan error)
  630. defer close(ch)
  631. // Send cmd.
  632. select {
  633. case compC <- cRange{level, min, max, ch}:
  634. case err := <-db.compErrC:
  635. return err
  636. case <-db.closeC:
  637. return ErrClosed
  638. }
  639. // Wait cmd.
  640. select {
  641. case err = <-ch:
  642. case err = <-db.compErrC:
  643. case <-db.closeC:
  644. return ErrClosed
  645. }
  646. return err
  647. }
  648. func (db *DB) mCompaction() {
  649. var x cCmd
  650. defer func() {
  651. if x := recover(); x != nil {
  652. if x != errCompactionTransactExiting {
  653. panic(x)
  654. }
  655. }
  656. if x != nil {
  657. x.ack(ErrClosed)
  658. }
  659. db.closeW.Done()
  660. }()
  661. for {
  662. select {
  663. case x = <-db.mcompCmdC:
  664. switch x.(type) {
  665. case cAuto:
  666. db.memCompaction()
  667. x.ack(nil)
  668. x = nil
  669. default:
  670. panic("leveldb: unknown command")
  671. }
  672. case <-db.closeC:
  673. return
  674. }
  675. }
  676. }
  677. func (db *DB) tCompaction() {
  678. var (
  679. x cCmd
  680. waitQ []cCmd
  681. )
  682. defer func() {
  683. if x := recover(); x != nil {
  684. if x != errCompactionTransactExiting {
  685. panic(x)
  686. }
  687. }
  688. for i := range waitQ {
  689. waitQ[i].ack(ErrClosed)
  690. waitQ[i] = nil
  691. }
  692. if x != nil {
  693. x.ack(ErrClosed)
  694. }
  695. db.closeW.Done()
  696. }()
  697. for {
  698. if db.tableNeedCompaction() {
  699. select {
  700. case x = <-db.tcompCmdC:
  701. case ch := <-db.tcompPauseC:
  702. db.pauseCompaction(ch)
  703. continue
  704. case <-db.closeC:
  705. return
  706. default:
  707. }
  708. // Resume write operation as soon as possible.
  709. if len(waitQ) > 0 && db.resumeWrite() {
  710. for i := range waitQ {
  711. waitQ[i].ack(nil)
  712. waitQ[i] = nil
  713. }
  714. waitQ = waitQ[:0]
  715. }
  716. } else {
  717. for i := range waitQ {
  718. waitQ[i].ack(nil)
  719. waitQ[i] = nil
  720. }
  721. waitQ = waitQ[:0]
  722. select {
  723. case x = <-db.tcompCmdC:
  724. case ch := <-db.tcompPauseC:
  725. db.pauseCompaction(ch)
  726. continue
  727. case <-db.closeC:
  728. return
  729. }
  730. }
  731. if x != nil {
  732. switch cmd := x.(type) {
  733. case cAuto:
  734. if cmd.ackC != nil {
  735. // Check the write pause state before caching it.
  736. if db.resumeWrite() {
  737. x.ack(nil)
  738. } else {
  739. waitQ = append(waitQ, x)
  740. }
  741. }
  742. case cRange:
  743. x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
  744. default:
  745. panic("leveldb: unknown command")
  746. }
  747. x = nil
  748. }
  749. db.tableAutoCompaction()
  750. }
  751. }