123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
- // License, authors and contributors information can be found at the URLs below, respectively:
- // https://code.google.com/p/leveldb-go/source/browse/LICENSE
- // https://code.google.com/p/leveldb-go/source/browse/AUTHORS
- // https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
- // Package journal reads and writes sequences of journals. Each journal is a stream
- // of bytes that completes before the next journal starts.
- //
- // When reading, call Next to obtain an io.Reader for the next journal. Next will
- // return io.EOF when there are no more journals. It is valid to call Next
- // without reading the current journal to exhaustion.
- //
- // When writing, call Next to obtain an io.Writer for the next journal. Calling
- // Next finishes the current journal. Call Close to finish the final journal.
- //
- // Optionally, call Flush to finish the current journal and flush the underlying
- // writer without starting a new journal. To start a new journal after flushing,
- // call Next.
- //
- // Neither Readers nor Writers are safe to use concurrently.
- //
- // Example code:
- // func read(r io.Reader) ([]string, error) {
- // var ss []string
- // journals := journal.NewReader(r, nil, true, true)
- // for {
- // j, err := journals.Next()
- // if err == io.EOF {
- // break
- // }
- // if err != nil {
- // return nil, err
- // }
- // s, err := ioutil.ReadAll(j)
- // if err != nil {
- // return nil, err
- // }
- // ss = append(ss, string(s))
- // }
- // return ss, nil
- // }
- //
- // func write(w io.Writer, ss []string) error {
- // journals := journal.NewWriter(w)
- // for _, s := range ss {
- // j, err := journals.Next()
- // if err != nil {
- // return err
- // }
- // if _, err := j.Write([]byte(s)); err != nil {
- // return err
- // }
- // }
- // return journals.Close()
- // }
- //
- // The wire format is that the stream is divided into 32KiB blocks, and each
- // block contains a number of tightly packed chunks. Chunks cannot cross block
- // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
- // block must be zero.
- //
- // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
- // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
- // followed by a payload. The checksum is over the chunk type and the payload.
- //
- // There are four chunk types: whether the chunk is the full journal, or the
- // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
- // has one first chunk, zero or more middle chunks, and one last chunk.
- //
- // The wire format allows for limited recovery in the face of data corruption:
- // on a format error (such as a checksum mismatch), the reader moves to the
- // next block and looks for the next full or first chunk.
- package journal
- import (
- "encoding/binary"
- "fmt"
- "io"
- "github.com/syndtr/goleveldb/leveldb/errors"
- "github.com/syndtr/goleveldb/leveldb/storage"
- "github.com/syndtr/goleveldb/leveldb/util"
- )
// Chunk type tags stored in the last byte of every chunk header. The values
// are part of the wire format and must never be changed or renumbered.
const (
	// fullChunkType marks a chunk that holds an entire journal.
	fullChunkType = 1
	// firstChunkType marks the first chunk of a multi-chunk journal.
	firstChunkType = 2
	// middleChunkType marks an interior chunk of a multi-chunk journal.
	middleChunkType = 3
	// lastChunkType marks the final chunk of a multi-chunk journal.
	lastChunkType = 4
)
// Sizes fixed by the wire format.
const (
	// blockSize is the size of one block of the stream: 32 KiB.
	blockSize = 32 << 10
	// headerSize is the size of a chunk header: a 4 byte checksum, a 2 byte
	// length and a 1 byte chunk type.
	headerSize = 4 + 2 + 1
)
// flusher is implemented by underlying writers that buffer their output and
// can be asked to push it out.
type flusher interface {
	// Flush reports any error met while flushing.
	Flush() error
}
// ErrCorrupted is the error produced for a corrupted block or chunk.
type ErrCorrupted struct {
	// Size is the number of bytes affected by the corruption.
	Size int
	// Reason is a short description of what was wrong.
	Reason string
}

// Error implements the error interface, reporting the reason and the byte
// count of the corruption.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/journal: block/chunk corrupted: %s (%d bytes)"
	return fmt.Sprintf(format, e.Reason, e.Size)
}
// Dropper is the interface that wraps the Drop method. The journal reader
// calls Drop whenever it drops a block or a chunk.
type Dropper interface {
	// Drop receives the error describing what was dropped.
	Drop(err error)
}
- // Reader reads journals from an underlying io.Reader.
- type Reader struct {
- // r is the underlying reader.
- r io.Reader
- // the dropper.
- dropper Dropper
- // strict flag.
- strict bool
- // checksum flag.
- checksum bool
- // seq is the sequence number of the current journal.
- seq int
- // buf[i:j] is the unread portion of the current chunk's payload.
- // The low bound, i, excludes the chunk header.
- i, j int
- // n is the number of bytes of buf that are valid. Once reading has started,
- // only the final block can have n < blockSize.
- n int
- // last is whether the current chunk is the last chunk of the journal.
- last bool
- // err is any accumulated error.
- err error
- // buf is the buffer.
- buf [blockSize]byte
- }
- // NewReader returns a new reader. The dropper may be nil, and if
- // strict is true then corrupted or invalid chunk will halt the journal
- // reader entirely.
- func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
- return &Reader{
- r: r,
- dropper: dropper,
- strict: strict,
- checksum: checksum,
- last: true,
- }
- }
- var errSkip = errors.New("leveldb/journal: skipped")
- func (r *Reader) corrupt(n int, reason string, skip bool) error {
- if r.dropper != nil {
- r.dropper.Drop(&ErrCorrupted{n, reason})
- }
- if r.strict && !skip {
- r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
- return r.err
- }
- return errSkip
- }
- // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
- // next block into the buffer if necessary.
- func (r *Reader) nextChunk(first bool) error {
- for {
- if r.j+headerSize <= r.n {
- checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
- length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
- chunkType := r.buf[r.j+6]
- unprocBlock := r.n - r.j
- if checksum == 0 && length == 0 && chunkType == 0 {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(unprocBlock, "zero header", false)
- }
- if chunkType < fullChunkType || chunkType > lastChunkType {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
- }
- r.i = r.j + headerSize
- r.j = r.j + headerSize + int(length)
- if r.j > r.n {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(unprocBlock, "chunk length overflows block", false)
- } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
- // Drop entire block.
- r.i = r.n
- r.j = r.n
- return r.corrupt(unprocBlock, "checksum mismatch", false)
- }
- if first && chunkType != fullChunkType && chunkType != firstChunkType {
- chunkLength := (r.j - r.i) + headerSize
- r.i = r.j
- // Report the error, but skip it.
- return r.corrupt(chunkLength, "orphan chunk", true)
- }
- r.last = chunkType == fullChunkType || chunkType == lastChunkType
- return nil
- }
- // The last block.
- if r.n < blockSize && r.n > 0 {
- if !first {
- return r.corrupt(0, "missing chunk part", false)
- }
- r.err = io.EOF
- return r.err
- }
- // Read block.
- n, err := io.ReadFull(r.r, r.buf[:])
- if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
- return err
- }
- if n == 0 {
- if !first {
- return r.corrupt(0, "missing chunk part", false)
- }
- r.err = io.EOF
- return r.err
- }
- r.i, r.j, r.n = 0, 0, n
- }
- }
- // Next returns a reader for the next journal. It returns io.EOF if there are no
- // more journals. The reader returned becomes stale after the next Next call,
- // and should no longer be used. If strict is false, the reader will returns
- // io.ErrUnexpectedEOF error when found corrupted journal.
- func (r *Reader) Next() (io.Reader, error) {
- r.seq++
- if r.err != nil {
- return nil, r.err
- }
- r.i = r.j
- for {
- if err := r.nextChunk(true); err == nil {
- break
- } else if err != errSkip {
- return nil, err
- }
- }
- return &singleReader{r, r.seq, nil}, nil
- }
- // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
- // last accumulated error.
- func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
- r.seq++
- err := r.err
- r.r = reader
- r.dropper = dropper
- r.strict = strict
- r.checksum = checksum
- r.i = 0
- r.j = 0
- r.n = 0
- r.last = true
- r.err = nil
- return err
- }
- type singleReader struct {
- r *Reader
- seq int
- err error
- }
- func (x *singleReader) Read(p []byte) (int, error) {
- r := x.r
- if r.seq != x.seq {
- return 0, errors.New("leveldb/journal: stale reader")
- }
- if x.err != nil {
- return 0, x.err
- }
- if r.err != nil {
- return 0, r.err
- }
- for r.i == r.j {
- if r.last {
- return 0, io.EOF
- }
- x.err = r.nextChunk(false)
- if x.err != nil {
- if x.err == errSkip {
- x.err = io.ErrUnexpectedEOF
- }
- return 0, x.err
- }
- }
- n := copy(p, r.buf[r.i:r.j])
- r.i += n
- return n, nil
- }
- func (x *singleReader) ReadByte() (byte, error) {
- r := x.r
- if r.seq != x.seq {
- return 0, errors.New("leveldb/journal: stale reader")
- }
- if x.err != nil {
- return 0, x.err
- }
- if r.err != nil {
- return 0, r.err
- }
- for r.i == r.j {
- if r.last {
- return 0, io.EOF
- }
- x.err = r.nextChunk(false)
- if x.err != nil {
- if x.err == errSkip {
- x.err = io.ErrUnexpectedEOF
- }
- return 0, x.err
- }
- }
- c := r.buf[r.i]
- r.i++
- return c, nil
- }
- // Writer writes journals to an underlying io.Writer.
- type Writer struct {
- // w is the underlying writer.
- w io.Writer
- // seq is the sequence number of the current journal.
- seq int
- // f is w as a flusher.
- f flusher
- // buf[i:j] is the bytes that will become the current chunk.
- // The low bound, i, includes the chunk header.
- i, j int
- // buf[:written] has already been written to w.
- // written is zero unless Flush has been called.
- written int
- // first is whether the current chunk is the first chunk of the journal.
- first bool
- // pending is whether a chunk is buffered but not yet written.
- pending bool
- // err is any accumulated error.
- err error
- // buf is the buffer.
- buf [blockSize]byte
- }
- // NewWriter returns a new Writer.
- func NewWriter(w io.Writer) *Writer {
- f, _ := w.(flusher)
- return &Writer{
- w: w,
- f: f,
- }
- }
- // fillHeader fills in the header for the pending chunk.
- func (w *Writer) fillHeader(last bool) {
- if w.i+headerSize > w.j || w.j > blockSize {
- panic("leveldb/journal: bad writer state")
- }
- if last {
- if w.first {
- w.buf[w.i+6] = fullChunkType
- } else {
- w.buf[w.i+6] = lastChunkType
- }
- } else {
- if w.first {
- w.buf[w.i+6] = firstChunkType
- } else {
- w.buf[w.i+6] = middleChunkType
- }
- }
- binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
- binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
- }
- // writeBlock writes the buffered block to the underlying writer, and reserves
- // space for the next chunk's header.
- func (w *Writer) writeBlock() {
- _, w.err = w.w.Write(w.buf[w.written:])
- w.i = 0
- w.j = headerSize
- w.written = 0
- }
- // writePending finishes the current journal and writes the buffer to the
- // underlying writer.
- func (w *Writer) writePending() {
- if w.err != nil {
- return
- }
- if w.pending {
- w.fillHeader(true)
- w.pending = false
- }
- _, w.err = w.w.Write(w.buf[w.written:w.j])
- w.written = w.j
- }
- // Close finishes the current journal and closes the writer.
- func (w *Writer) Close() error {
- w.seq++
- w.writePending()
- if w.err != nil {
- return w.err
- }
- w.err = errors.New("leveldb/journal: closed Writer")
- return nil
- }
- // Flush finishes the current journal, writes to the underlying writer, and
- // flushes it if that writer implements interface{ Flush() error }.
- func (w *Writer) Flush() error {
- w.seq++
- w.writePending()
- if w.err != nil {
- return w.err
- }
- if w.f != nil {
- w.err = w.f.Flush()
- return w.err
- }
- return nil
- }
- // Reset resets the journal writer, allows reuse of the journal writer. Reset
- // will also closes the journal writer if not already.
- func (w *Writer) Reset(writer io.Writer) (err error) {
- w.seq++
- if w.err == nil {
- w.writePending()
- err = w.err
- }
- w.w = writer
- w.f, _ = writer.(flusher)
- w.i = 0
- w.j = 0
- w.written = 0
- w.first = false
- w.pending = false
- w.err = nil
- return
- }
- // Next returns a writer for the next journal. The writer returned becomes stale
- // after the next Close, Flush or Next call, and should no longer be used.
- func (w *Writer) Next() (io.Writer, error) {
- w.seq++
- if w.err != nil {
- return nil, w.err
- }
- if w.pending {
- w.fillHeader(true)
- }
- w.i = w.j
- w.j = w.j + headerSize
- // Check if there is room in the block for the header.
- if w.j > blockSize {
- // Fill in the rest of the block with zeroes.
- for k := w.i; k < blockSize; k++ {
- w.buf[k] = 0
- }
- w.writeBlock()
- if w.err != nil {
- return nil, w.err
- }
- }
- w.first = true
- w.pending = true
- return singleWriter{w, w.seq}, nil
- }
- type singleWriter struct {
- w *Writer
- seq int
- }
- func (x singleWriter) Write(p []byte) (int, error) {
- w := x.w
- if w.seq != x.seq {
- return 0, errors.New("leveldb/journal: stale writer")
- }
- if w.err != nil {
- return 0, w.err
- }
- n0 := len(p)
- for len(p) > 0 {
- // Write a block, if it is full.
- if w.j == blockSize {
- w.fillHeader(false)
- w.writeBlock()
- if w.err != nil {
- return 0, w.err
- }
- w.first = false
- }
- // Copy bytes into the buffer.
- n := copy(w.buf[w.j:], p)
- w.j += n
- p = p[n:]
- }
- return n0, nil
- }
|