session_compaction.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "sync/atomic"
  9. "github.com/syndtr/goleveldb/leveldb/iterator"
  10. "github.com/syndtr/goleveldb/leveldb/memdb"
  11. "github.com/syndtr/goleveldb/leveldb/opt"
  12. )
  13. func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
  14. v := s.version()
  15. defer v.release()
  16. return v.pickMemdbLevel(umin, umax, maxLevel)
  17. }
  18. func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
  19. // Create sorted table.
  20. iter := mdb.NewIterator(nil)
  21. defer iter.Release()
  22. t, n, err := s.tops.createFrom(iter)
  23. if err != nil {
  24. return 0, err
  25. }
  26. // Pick level other than zero can cause compaction issue with large
  27. // bulk insert and delete on strictly incrementing key-space. The
  28. // problem is that the small deletion markers trapped at lower level,
  29. // while key/value entries keep growing at higher level. Since the
  30. // key-space is strictly incrementing it will not overlaps with
  31. // higher level, thus maximum possible level is always picked, while
  32. // overlapping deletion marker pushed into lower level.
  33. // See: https://github.com/syndtr/goleveldb/issues/127.
  34. flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
  35. rec.addTableFile(flushLevel, t)
  36. s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
  37. return flushLevel, nil
  38. }
  39. // Pick a compaction based on current state; need external synchronization.
  40. func (s *session) pickCompaction() *compaction {
  41. v := s.version()
  42. var sourceLevel int
  43. var t0 tFiles
  44. if v.cScore >= 1 {
  45. sourceLevel = v.cLevel
  46. cptr := s.getCompPtr(sourceLevel)
  47. tables := v.levels[sourceLevel]
  48. for _, t := range tables {
  49. if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
  50. t0 = append(t0, t)
  51. break
  52. }
  53. }
  54. if len(t0) == 0 {
  55. t0 = append(t0, tables[0])
  56. }
  57. } else {
  58. if p := atomic.LoadPointer(&v.cSeek); p != nil {
  59. ts := (*tSet)(p)
  60. sourceLevel = ts.level
  61. t0 = append(t0, ts.table)
  62. } else {
  63. v.release()
  64. return nil
  65. }
  66. }
  67. return newCompaction(s, v, sourceLevel, t0)
  68. }
  69. // Create compaction from given level and range; need external synchronization.
  70. func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
  71. v := s.version()
  72. if sourceLevel >= len(v.levels) {
  73. v.release()
  74. return nil
  75. }
  76. t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
  77. if len(t0) == 0 {
  78. v.release()
  79. return nil
  80. }
  81. // Avoid compacting too much in one shot in case the range is large.
  82. // But we cannot do this for level-0 since level-0 files can overlap
  83. // and we must not pick one file and drop another older file if the
  84. // two files overlap.
  85. if !noLimit && sourceLevel > 0 {
  86. limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
  87. total := int64(0)
  88. for i, t := range t0 {
  89. total += t.size
  90. if total >= limit {
  91. s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
  92. t0 = t0[:i+1]
  93. break
  94. }
  95. }
  96. }
  97. return newCompaction(s, v, sourceLevel, t0)
  98. }
  99. func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction {
  100. c := &compaction{
  101. s: s,
  102. v: v,
  103. sourceLevel: sourceLevel,
  104. levels: [2]tFiles{t0, nil},
  105. maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
  106. tPtrs: make([]int, len(v.levels)),
  107. }
  108. c.expand()
  109. c.save()
  110. return c
  111. }
  112. // compaction represent a compaction state.
  113. type compaction struct {
  114. s *session
  115. v *version
  116. sourceLevel int
  117. levels [2]tFiles
  118. maxGPOverlaps int64
  119. gp tFiles
  120. gpi int
  121. seenKey bool
  122. gpOverlappedBytes int64
  123. imin, imax internalKey
  124. tPtrs []int
  125. released bool
  126. snapGPI int
  127. snapSeenKey bool
  128. snapGPOverlappedBytes int64
  129. snapTPtrs []int
  130. }
  131. func (c *compaction) save() {
  132. c.snapGPI = c.gpi
  133. c.snapSeenKey = c.seenKey
  134. c.snapGPOverlappedBytes = c.gpOverlappedBytes
  135. c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
  136. }
  137. func (c *compaction) restore() {
  138. c.gpi = c.snapGPI
  139. c.seenKey = c.snapSeenKey
  140. c.gpOverlappedBytes = c.snapGPOverlappedBytes
  141. c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
  142. }
  143. func (c *compaction) release() {
  144. if !c.released {
  145. c.released = true
  146. c.v.release()
  147. }
  148. }
  149. // Expand compacted tables; need external synchronization.
  150. func (c *compaction) expand() {
  151. limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
  152. vt0 := c.v.levels[c.sourceLevel]
  153. vt1 := tFiles{}
  154. if level := c.sourceLevel + 1; level < len(c.v.levels) {
  155. vt1 = c.v.levels[level]
  156. }
  157. t0, t1 := c.levels[0], c.levels[1]
  158. imin, imax := t0.getRange(c.s.icmp)
  159. // We expand t0 here just incase ukey hop across tables.
  160. t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
  161. if len(t0) != len(c.levels[0]) {
  162. imin, imax = t0.getRange(c.s.icmp)
  163. }
  164. t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
  165. // Get entire range covered by compaction.
  166. amin, amax := append(t0, t1...).getRange(c.s.icmp)
  167. // See if we can grow the number of inputs in "sourceLevel" without
  168. // changing the number of "sourceLevel+1" files we pick up.
  169. if len(t1) > 0 {
  170. exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
  171. if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
  172. xmin, xmax := exp0.getRange(c.s.icmp)
  173. exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
  174. if len(exp1) == len(t1) {
  175. c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
  176. c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
  177. len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
  178. imin, imax = xmin, xmax
  179. t0, t1 = exp0, exp1
  180. amin, amax = append(t0, t1...).getRange(c.s.icmp)
  181. }
  182. }
  183. }
  184. // Compute the set of grandparent files that overlap this compaction
  185. // (parent == sourceLevel+1; grandparent == sourceLevel+2)
  186. if level := c.sourceLevel + 2; level < len(c.v.levels) {
  187. c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
  188. }
  189. c.levels[0], c.levels[1] = t0, t1
  190. c.imin, c.imax = imin, imax
  191. }
  192. // Check whether compaction is trivial.
  193. func (c *compaction) trivial() bool {
  194. return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
  195. }
  196. func (c *compaction) baseLevelForKey(ukey []byte) bool {
  197. for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
  198. tables := c.v.levels[level]
  199. for c.tPtrs[level] < len(tables) {
  200. t := tables[c.tPtrs[level]]
  201. if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
  202. // We've advanced far enough.
  203. if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
  204. // Key falls in this file's range, so definitely not base level.
  205. return false
  206. }
  207. break
  208. }
  209. c.tPtrs[level]++
  210. }
  211. }
  212. return true
  213. }
  214. func (c *compaction) shouldStopBefore(ikey internalKey) bool {
  215. for ; c.gpi < len(c.gp); c.gpi++ {
  216. gp := c.gp[c.gpi]
  217. if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
  218. break
  219. }
  220. if c.seenKey {
  221. c.gpOverlappedBytes += gp.size
  222. }
  223. }
  224. c.seenKey = true
  225. if c.gpOverlappedBytes > c.maxGPOverlaps {
  226. // Too much overlap for current output; start new output.
  227. c.gpOverlappedBytes = 0
  228. return true
  229. }
  230. return false
  231. }
  232. // Creates an iterator.
  233. func (c *compaction) newIterator() iterator.Iterator {
  234. // Creates iterator slice.
  235. icap := len(c.levels)
  236. if c.sourceLevel == 0 {
  237. // Special case for level-0.
  238. icap = len(c.levels[0]) + 1
  239. }
  240. its := make([]iterator.Iterator, 0, icap)
  241. // Options.
  242. ro := &opt.ReadOptions{
  243. DontFillCache: true,
  244. Strict: opt.StrictOverride,
  245. }
  246. strict := c.s.o.GetStrict(opt.StrictCompaction)
  247. if strict {
  248. ro.Strict |= opt.StrictReader
  249. }
  250. for i, tables := range c.levels {
  251. if len(tables) == 0 {
  252. continue
  253. }
  254. // Level-0 is not sorted and may overlaps each other.
  255. if c.sourceLevel+i == 0 {
  256. for _, t := range tables {
  257. its = append(its, c.s.tops.newIterator(t, nil, ro))
  258. }
  259. } else {
  260. it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
  261. its = append(its, it)
  262. }
  263. }
  264. return iterator.NewMergedIterator(its, c.s.icmp, strict)
  265. }