journal.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
  5. // License, authors and contributors information can be found at the URLs below, respectively:
  6. // https://code.google.com/p/leveldb-go/source/browse/LICENSE
  7. // https://code.google.com/p/leveldb-go/source/browse/AUTHORS
  8. // https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
  9. // Package journal reads and writes sequences of journals. Each journal is a stream
  10. // of bytes that completes before the next journal starts.
  11. //
  12. // When reading, call Next to obtain an io.Reader for the next journal. Next will
  13. // return io.EOF when there are no more journals. It is valid to call Next
  14. // without reading the current journal to exhaustion.
  15. //
  16. // When writing, call Next to obtain an io.Writer for the next journal. Calling
  17. // Next finishes the current journal. Call Close to finish the final journal.
  18. //
  19. // Optionally, call Flush to finish the current journal and flush the underlying
  20. // writer without starting a new journal. To start a new journal after flushing,
  21. // call Next.
  22. //
  23. // Neither Readers nor Writers are safe to use concurrently.
  24. //
  25. // Example code:
  26. // func read(r io.Reader) ([]string, error) {
  27. // var ss []string
  28. // journals := journal.NewReader(r, nil, true, true)
  29. // for {
  30. // j, err := journals.Next()
  31. // if err == io.EOF {
  32. // break
  33. // }
  34. // if err != nil {
  35. // return nil, err
  36. // }
  37. // s, err := ioutil.ReadAll(j)
  38. // if err != nil {
  39. // return nil, err
  40. // }
  41. // ss = append(ss, string(s))
  42. // }
  43. // return ss, nil
  44. // }
  45. //
  46. // func write(w io.Writer, ss []string) error {
  47. // journals := journal.NewWriter(w)
  48. // for _, s := range ss {
  49. // j, err := journals.Next()
  50. // if err != nil {
  51. // return err
  52. // }
  53. // if _, err := j.Write([]byte(s)); err != nil {
  54. // return err
  55. // }
  56. // }
  57. // return journals.Close()
  58. // }
  59. //
  60. // The wire format is that the stream is divided into 32KiB blocks, and each
  61. // block contains a number of tightly packed chunks. Chunks cannot cross block
  62. // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
  63. // block must be zero.
  64. //
  65. // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
  66. // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
  67. // followed by a payload. The checksum is over the chunk type and the payload.
  68. //
  69. // There are four chunk types: whether the chunk is the full journal, or the
  70. // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
  71. // has one first chunk, zero or more middle chunks, and one last chunk.
  72. //
  73. // The wire format allows for limited recovery in the face of data corruption:
  74. // on a format error (such as a checksum mismatch), the reader moves to the
  75. // next block and looks for the next full or first chunk.
  76. package journal
  77. import (
  78. "encoding/binary"
  79. "fmt"
  80. "io"
  81. "github.com/syndtr/goleveldb/leveldb/errors"
  82. "github.com/syndtr/goleveldb/leveldb/storage"
  83. "github.com/syndtr/goleveldb/leveldb/util"
  84. )
  85. // These constants are part of the wire format and should not be changed.
  86. const (
  87. fullChunkType = 1
  88. firstChunkType = 2
  89. middleChunkType = 3
  90. lastChunkType = 4
  91. )
  92. const (
  93. blockSize = 32 * 1024
  94. headerSize = 7
  95. )
// flusher is the optional interface of an underlying writer that can flush
// buffered data. Writer.Flush calls Flush on the underlying writer when it
// implements this interface.
type flusher interface {
	Flush() error
}
  99. // ErrCorrupted is the error type that generated by corrupted block or chunk.
  100. type ErrCorrupted struct {
  101. Size int
  102. Reason string
  103. }
  104. func (e *ErrCorrupted) Error() string {
  105. return fmt.Sprintf("leveldb/journal: block/chunk corrupted: %s (%d bytes)", e.Reason, e.Size)
  106. }
// Dropper is the interface that wraps the Drop method. The journal reader
// calls Drop with an *ErrCorrupted describing the problem whenever it drops
// a block or chunk.
type Dropper interface {
	Drop(err error)
}
// Reader reads journals from an underlying io.Reader.
type Reader struct {
	// r is the underlying reader.
	r io.Reader
	// dropper, when non-nil, is notified of every dropped block or chunk.
	dropper Dropper
	// strict makes a corruption that is not explicitly skippable become a
	// sticky error on the reader instead of being skipped.
	strict bool
	// checksum enables verification of each chunk's checksum.
	checksum bool
	// seq is the sequence number of the current journal. It is incremented by
	// Next and Reset, which is how stale singleReaders are detected.
	seq int
	// buf[i:j] is the unread portion of the current chunk's payload.
	// The low bound, i, excludes the chunk header.
	i, j int
	// n is the number of bytes of buf that are valid. Once reading has started,
	// only the final block can have n < blockSize.
	n int
	// last is whether the current chunk is the last chunk of the journal.
	last bool
	// err is any accumulated error.
	err error
	// buf is the buffer holding the current block.
	buf [blockSize]byte
}
  137. // NewReader returns a new reader. The dropper may be nil, and if
  138. // strict is true then corrupted or invalid chunk will halt the journal
  139. // reader entirely.
  140. func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
  141. return &Reader{
  142. r: r,
  143. dropper: dropper,
  144. strict: strict,
  145. checksum: checksum,
  146. last: true,
  147. }
  148. }
  149. var errSkip = errors.New("leveldb/journal: skipped")
  150. func (r *Reader) corrupt(n int, reason string, skip bool) error {
  151. if r.dropper != nil {
  152. r.dropper.Drop(&ErrCorrupted{n, reason})
  153. }
  154. if r.strict && !skip {
  155. r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
  156. return r.err
  157. }
  158. return errSkip
  159. }
  160. // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
  161. // next block into the buffer if necessary.
  162. func (r *Reader) nextChunk(first bool) error {
  163. for {
  164. if r.j+headerSize <= r.n {
  165. checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
  166. length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
  167. chunkType := r.buf[r.j+6]
  168. unprocBlock := r.n - r.j
  169. if checksum == 0 && length == 0 && chunkType == 0 {
  170. // Drop entire block.
  171. r.i = r.n
  172. r.j = r.n
  173. return r.corrupt(unprocBlock, "zero header", false)
  174. }
  175. if chunkType < fullChunkType || chunkType > lastChunkType {
  176. // Drop entire block.
  177. r.i = r.n
  178. r.j = r.n
  179. return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
  180. }
  181. r.i = r.j + headerSize
  182. r.j = r.j + headerSize + int(length)
  183. if r.j > r.n {
  184. // Drop entire block.
  185. r.i = r.n
  186. r.j = r.n
  187. return r.corrupt(unprocBlock, "chunk length overflows block", false)
  188. } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
  189. // Drop entire block.
  190. r.i = r.n
  191. r.j = r.n
  192. return r.corrupt(unprocBlock, "checksum mismatch", false)
  193. }
  194. if first && chunkType != fullChunkType && chunkType != firstChunkType {
  195. chunkLength := (r.j - r.i) + headerSize
  196. r.i = r.j
  197. // Report the error, but skip it.
  198. return r.corrupt(chunkLength, "orphan chunk", true)
  199. }
  200. r.last = chunkType == fullChunkType || chunkType == lastChunkType
  201. return nil
  202. }
  203. // The last block.
  204. if r.n < blockSize && r.n > 0 {
  205. if !first {
  206. return r.corrupt(0, "missing chunk part", false)
  207. }
  208. r.err = io.EOF
  209. return r.err
  210. }
  211. // Read block.
  212. n, err := io.ReadFull(r.r, r.buf[:])
  213. if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
  214. return err
  215. }
  216. if n == 0 {
  217. if !first {
  218. return r.corrupt(0, "missing chunk part", false)
  219. }
  220. r.err = io.EOF
  221. return r.err
  222. }
  223. r.i, r.j, r.n = 0, 0, n
  224. }
  225. }
  226. // Next returns a reader for the next journal. It returns io.EOF if there are no
  227. // more journals. The reader returned becomes stale after the next Next call,
  228. // and should no longer be used. If strict is false, the reader will returns
  229. // io.ErrUnexpectedEOF error when found corrupted journal.
  230. func (r *Reader) Next() (io.Reader, error) {
  231. r.seq++
  232. if r.err != nil {
  233. return nil, r.err
  234. }
  235. r.i = r.j
  236. for {
  237. if err := r.nextChunk(true); err == nil {
  238. break
  239. } else if err != errSkip {
  240. return nil, err
  241. }
  242. }
  243. return &singleReader{r, r.seq, nil}, nil
  244. }
  245. // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
  246. // last accumulated error.
  247. func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
  248. r.seq++
  249. err := r.err
  250. r.r = reader
  251. r.dropper = dropper
  252. r.strict = strict
  253. r.checksum = checksum
  254. r.i = 0
  255. r.j = 0
  256. r.n = 0
  257. r.last = true
  258. r.err = nil
  259. return err
  260. }
  261. type singleReader struct {
  262. r *Reader
  263. seq int
  264. err error
  265. }
  266. func (x *singleReader) Read(p []byte) (int, error) {
  267. r := x.r
  268. if r.seq != x.seq {
  269. return 0, errors.New("leveldb/journal: stale reader")
  270. }
  271. if x.err != nil {
  272. return 0, x.err
  273. }
  274. if r.err != nil {
  275. return 0, r.err
  276. }
  277. for r.i == r.j {
  278. if r.last {
  279. return 0, io.EOF
  280. }
  281. x.err = r.nextChunk(false)
  282. if x.err != nil {
  283. if x.err == errSkip {
  284. x.err = io.ErrUnexpectedEOF
  285. }
  286. return 0, x.err
  287. }
  288. }
  289. n := copy(p, r.buf[r.i:r.j])
  290. r.i += n
  291. return n, nil
  292. }
  293. func (x *singleReader) ReadByte() (byte, error) {
  294. r := x.r
  295. if r.seq != x.seq {
  296. return 0, errors.New("leveldb/journal: stale reader")
  297. }
  298. if x.err != nil {
  299. return 0, x.err
  300. }
  301. if r.err != nil {
  302. return 0, r.err
  303. }
  304. for r.i == r.j {
  305. if r.last {
  306. return 0, io.EOF
  307. }
  308. x.err = r.nextChunk(false)
  309. if x.err != nil {
  310. if x.err == errSkip {
  311. x.err = io.ErrUnexpectedEOF
  312. }
  313. return 0, x.err
  314. }
  315. }
  316. c := r.buf[r.i]
  317. r.i++
  318. return c, nil
  319. }
// Writer writes journals to an underlying io.Writer.
type Writer struct {
	// w is the underlying writer.
	w io.Writer
	// seq is the sequence number of the current journal. It is incremented by
	// Next, Flush, Close and Reset, which is how stale singleWriters are
	// detected.
	seq int
	// f is w as a flusher, or nil if w does not implement Flush.
	f flusher
	// buf[i:j] is the bytes that will become the current chunk.
	// The low bound, i, includes the chunk header.
	i, j int
	// buf[:written] has already been written to w.
	// written is advanced by writePending and cleared again by writeBlock.
	written int
	// first is whether the current chunk is the first chunk of the journal.
	first bool
	// pending is whether a chunk is buffered but not yet written.
	pending bool
	// err is any accumulated error.
	err error
	// buf is the buffer holding the current block.
	buf [blockSize]byte
}
  343. // NewWriter returns a new Writer.
  344. func NewWriter(w io.Writer) *Writer {
  345. f, _ := w.(flusher)
  346. return &Writer{
  347. w: w,
  348. f: f,
  349. }
  350. }
  351. // fillHeader fills in the header for the pending chunk.
  352. func (w *Writer) fillHeader(last bool) {
  353. if w.i+headerSize > w.j || w.j > blockSize {
  354. panic("leveldb/journal: bad writer state")
  355. }
  356. if last {
  357. if w.first {
  358. w.buf[w.i+6] = fullChunkType
  359. } else {
  360. w.buf[w.i+6] = lastChunkType
  361. }
  362. } else {
  363. if w.first {
  364. w.buf[w.i+6] = firstChunkType
  365. } else {
  366. w.buf[w.i+6] = middleChunkType
  367. }
  368. }
  369. binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
  370. binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
  371. }
  372. // writeBlock writes the buffered block to the underlying writer, and reserves
  373. // space for the next chunk's header.
  374. func (w *Writer) writeBlock() {
  375. _, w.err = w.w.Write(w.buf[w.written:])
  376. w.i = 0
  377. w.j = headerSize
  378. w.written = 0
  379. }
  380. // writePending finishes the current journal and writes the buffer to the
  381. // underlying writer.
  382. func (w *Writer) writePending() {
  383. if w.err != nil {
  384. return
  385. }
  386. if w.pending {
  387. w.fillHeader(true)
  388. w.pending = false
  389. }
  390. _, w.err = w.w.Write(w.buf[w.written:w.j])
  391. w.written = w.j
  392. }
  393. // Close finishes the current journal and closes the writer.
  394. func (w *Writer) Close() error {
  395. w.seq++
  396. w.writePending()
  397. if w.err != nil {
  398. return w.err
  399. }
  400. w.err = errors.New("leveldb/journal: closed Writer")
  401. return nil
  402. }
  403. // Flush finishes the current journal, writes to the underlying writer, and
  404. // flushes it if that writer implements interface{ Flush() error }.
  405. func (w *Writer) Flush() error {
  406. w.seq++
  407. w.writePending()
  408. if w.err != nil {
  409. return w.err
  410. }
  411. if w.f != nil {
  412. w.err = w.f.Flush()
  413. return w.err
  414. }
  415. return nil
  416. }
  417. // Reset resets the journal writer, allows reuse of the journal writer. Reset
  418. // will also closes the journal writer if not already.
  419. func (w *Writer) Reset(writer io.Writer) (err error) {
  420. w.seq++
  421. if w.err == nil {
  422. w.writePending()
  423. err = w.err
  424. }
  425. w.w = writer
  426. w.f, _ = writer.(flusher)
  427. w.i = 0
  428. w.j = 0
  429. w.written = 0
  430. w.first = false
  431. w.pending = false
  432. w.err = nil
  433. return
  434. }
  435. // Next returns a writer for the next journal. The writer returned becomes stale
  436. // after the next Close, Flush or Next call, and should no longer be used.
  437. func (w *Writer) Next() (io.Writer, error) {
  438. w.seq++
  439. if w.err != nil {
  440. return nil, w.err
  441. }
  442. if w.pending {
  443. w.fillHeader(true)
  444. }
  445. w.i = w.j
  446. w.j = w.j + headerSize
  447. // Check if there is room in the block for the header.
  448. if w.j > blockSize {
  449. // Fill in the rest of the block with zeroes.
  450. for k := w.i; k < blockSize; k++ {
  451. w.buf[k] = 0
  452. }
  453. w.writeBlock()
  454. if w.err != nil {
  455. return nil, w.err
  456. }
  457. }
  458. w.first = true
  459. w.pending = true
  460. return singleWriter{w, w.seq}, nil
  461. }
  462. type singleWriter struct {
  463. w *Writer
  464. seq int
  465. }
  466. func (x singleWriter) Write(p []byte) (int, error) {
  467. w := x.w
  468. if w.seq != x.seq {
  469. return 0, errors.New("leveldb/journal: stale writer")
  470. }
  471. if w.err != nil {
  472. return 0, w.err
  473. }
  474. n0 := len(p)
  475. for len(p) > 0 {
  476. // Write a block, if it is full.
  477. if w.j == blockSize {
  478. w.fillHeader(false)
  479. w.writeBlock()
  480. if w.err != nil {
  481. return 0, w.err
  482. }
  483. w.first = false
  484. }
  485. // Copy bytes into the buffer.
  486. n := copy(w.buf[w.j:], p)
  487. w.j += n
  488. p = p[n:]
  489. }
  490. return n0, nil
  491. }