session.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "fmt"
  9. "io"
  10. "os"
  11. "sync"
  12. "github.com/syndtr/goleveldb/leveldb/errors"
  13. "github.com/syndtr/goleveldb/leveldb/journal"
  14. "github.com/syndtr/goleveldb/leveldb/opt"
  15. "github.com/syndtr/goleveldb/leveldb/storage"
  16. )
  17. // ErrManifestCorrupted records manifest corruption. This error will be
  18. // wrapped with errors.ErrCorrupted.
  19. type ErrManifestCorrupted struct {
  20. Field string
  21. Reason string
  22. }
  23. func (e *ErrManifestCorrupted) Error() string {
  24. return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
  25. }
  26. func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
  27. return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
  28. }
  29. // session represent a persistent database session.
  30. type session struct {
  31. // Need 64-bit alignment.
  32. stNextFileNum int64 // current unused file number
  33. stJournalNum int64 // current journal file number; need external synchronization
  34. stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
  35. stTempFileNum int64
  36. stSeqNum uint64 // last mem compacted seq; need external synchronization
  37. stor *iStorage
  38. storLock storage.Locker
  39. o *cachedOptions
  40. icmp *iComparer
  41. tops *tOps
  42. fileRef map[int64]int
  43. manifest *journal.Writer
  44. manifestWriter storage.Writer
  45. manifestFd storage.FileDesc
  46. stCompPtrs []internalKey // compaction pointers; need external synchronization
  47. stVersion *version // current version
  48. vmu sync.Mutex
  49. }
  50. // Creates new initialized session instance.
  51. func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
  52. if stor == nil {
  53. return nil, os.ErrInvalid
  54. }
  55. storLock, err := stor.Lock()
  56. if err != nil {
  57. return
  58. }
  59. s = &session{
  60. stor: newIStorage(stor),
  61. storLock: storLock,
  62. fileRef: make(map[int64]int),
  63. }
  64. s.setOptions(o)
  65. s.tops = newTableOps(s)
  66. s.setVersion(newVersion(s))
  67. s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
  68. return
  69. }
  70. // Close session.
  71. func (s *session) close() {
  72. s.tops.close()
  73. if s.manifest != nil {
  74. s.manifest.Close()
  75. }
  76. if s.manifestWriter != nil {
  77. s.manifestWriter.Close()
  78. }
  79. s.manifest = nil
  80. s.manifestWriter = nil
  81. s.setVersion(&version{s: s, closing: true})
  82. }
  83. // Release session lock.
  84. func (s *session) release() {
  85. s.storLock.Unlock()
  86. }
  87. // Create a new database session; need external synchronization.
  88. func (s *session) create() error {
  89. // create manifest
  90. return s.newManifest(nil, nil)
  91. }
  92. // Recover a database session; need external synchronization.
  93. func (s *session) recover() (err error) {
  94. defer func() {
  95. if os.IsNotExist(err) {
  96. // Don't return os.ErrNotExist if the underlying storage contains
  97. // other files that belong to LevelDB. So the DB won't get trashed.
  98. if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
  99. err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
  100. }
  101. }
  102. }()
  103. fd, err := s.stor.GetMeta()
  104. if err != nil {
  105. return
  106. }
  107. reader, err := s.stor.Open(fd)
  108. if err != nil {
  109. return
  110. }
  111. defer reader.Close()
  112. var (
  113. // Options.
  114. strict = s.o.GetStrict(opt.StrictManifest)
  115. jr = journal.NewReader(reader, dropper{s, fd}, strict, true)
  116. rec = &sessionRecord{}
  117. staging = s.stVersion.newStaging()
  118. )
  119. for {
  120. var r io.Reader
  121. r, err = jr.Next()
  122. if err != nil {
  123. if err == io.EOF {
  124. err = nil
  125. break
  126. }
  127. return errors.SetFd(err, fd)
  128. }
  129. err = rec.decode(r)
  130. if err == nil {
  131. // save compact pointers
  132. for _, r := range rec.compPtrs {
  133. s.setCompPtr(r.level, internalKey(r.ikey))
  134. }
  135. // commit record to version staging
  136. staging.commit(rec)
  137. } else {
  138. err = errors.SetFd(err, fd)
  139. if strict || !errors.IsCorrupted(err) {
  140. return
  141. }
  142. s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
  143. }
  144. rec.resetCompPtrs()
  145. rec.resetAddedTables()
  146. rec.resetDeletedTables()
  147. }
  148. switch {
  149. case !rec.has(recComparer):
  150. return newErrManifestCorrupted(fd, "comparer", "missing")
  151. case rec.comparer != s.icmp.uName():
  152. return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
  153. case !rec.has(recNextFileNum):
  154. return newErrManifestCorrupted(fd, "next-file-num", "missing")
  155. case !rec.has(recJournalNum):
  156. return newErrManifestCorrupted(fd, "journal-file-num", "missing")
  157. case !rec.has(recSeqNum):
  158. return newErrManifestCorrupted(fd, "seq-num", "missing")
  159. }
  160. s.manifestFd = fd
  161. s.setVersion(staging.finish())
  162. s.setNextFileNum(rec.nextFileNum)
  163. s.recordCommited(rec)
  164. return nil
  165. }
  166. // Commit session; need external synchronization.
  167. func (s *session) commit(r *sessionRecord) (err error) {
  168. v := s.version()
  169. defer v.release()
  170. // spawn new version based on current version
  171. nv := v.spawn(r)
  172. if s.manifest == nil {
  173. // manifest journal writer not yet created, create one
  174. err = s.newManifest(r, nv)
  175. } else {
  176. err = s.flushManifest(r)
  177. }
  178. // finally, apply new version if no error rise
  179. if err == nil {
  180. s.setVersion(nv)
  181. }
  182. return
  183. }