Mercurial > lbo > hg > leveldb-rs
changeset 23:eaa9fd7c72ba
Implement LogReader
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Sun, 12 Jun 2016 10:08:44 +0200 |
parents | b6cd846558b7 |
children | cc51ccd8970c |
files | src/log.rs |
diffstat | 1 files changed, 133 insertions(+), 8 deletions(-) [+] |
line wrap: on
line diff
```diff
--- a/src/log.rs Sat Jun 11 20:41:44 2016 +0200
+++ b/src/log.rs Sun Jun 12 10:08:44 2016 +0200
@@ -5,7 +5,7 @@
 //! A record is a bytestring: [checksum: uint32, length: uint16, type: uint8, data: [u8]]
 //! checksum is the crc32 sum of type and data; type is one of RecordType::{Full/First/Middle/Last}
 
-use std::io::{Result, Write};
+use std::io::{Error, ErrorKind, Read, Result, Write};
 
 use crc::crc32;
 use crc::Hasher32;
@@ -25,13 +25,18 @@
 pub struct LogWriter<W: Write> {
     dst: W,
     current_block_offset: usize,
+    block_size: usize,
+    digest: crc32::Digest,
 }
 
 impl<W: Write> LogWriter<W> {
     pub fn new(writer: W) -> LogWriter<W> {
+        let digest = crc32::Digest::new(crc32::CASTAGNOLI);
         LogWriter {
             dst: writer,
             current_block_offset: 0,
+            block_size: BLOCK_SIZE,
+            digest: digest,
         }
     }
 
@@ -40,7 +45,9 @@
         let mut first_frag = true;
         let mut result = Ok(0);
         while result.is_ok() && record.len() > 0 {
-            let space_left = BLOCK_SIZE - self.current_block_offset;
+            assert!(self.block_size > HEADER_SIZE);
+
+            let space_left = self.block_size - self.current_block_offset;
 
             // Fill up block; go to next block.
             if space_left < HEADER_SIZE {
@@ -48,7 +55,7 @@
                 self.current_block_offset = 0;
             }
 
-            let avail_for_data = BLOCK_SIZE - self.current_block_offset - HEADER_SIZE;
+            let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
 
             let data_frag_len = if record.len() < avail_for_data {
                 record.len()
@@ -78,23 +85,107 @@
     fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result<usize> {
         assert!(len < 256 * 256);
 
-        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
-        digest.write(&[t as u8]);
-        digest.write(data);
+        self.digest.reset();
+        self.digest.write(&[t as u8]);
+        self.digest.write(&data[0..len]);
 
-        let chksum = digest.sum32();
+        let chksum = self.digest.sum32();
 
         let mut s = 0;
         s += try!(self.dst.write(&chksum.encode_fixed_vec()));
         s += try!(self.dst.write(&(len as u16).encode_fixed_vec()));
         s += try!(self.dst.write(&[t as u8]));
-        s += try!(self.dst.write(data));
+        s += try!(self.dst.write(&data[0..len]));
 
         self.current_block_offset += s;
         Ok(s)
     }
 }
 
+
+pub struct LogReader<R: Read> {
+    src: R,
+    blk_off: usize,
+    blocksize: usize,
+    checksums: bool,
+
+    digest: crc32::Digest,
+    head_scratch: [u8; 7],
+}
+
+impl<R: Read> LogReader<R> {
+    pub fn new(src: R, chksum: bool, offset: usize) -> LogReader<R> {
+        LogReader {
+            src: src,
+            blk_off: offset,
+            blocksize: BLOCK_SIZE,
+            checksums: chksum,
+            head_scratch: [0; 7],
+            digest: crc32::Digest::new(crc32::CASTAGNOLI),
+        }
+    }
+
+    /// EOF is signalled by Ok(0)
+    pub fn read(&mut self, dst: &mut Vec<u8>) -> Result<usize> {
+        let mut checksum: u32;
+        let mut length: u16;
+        let mut typ: u8;
+        let mut dst_offset: usize = 0;
+
+        dst.clear();
+
+        loop {
+            if self.blocksize - self.blk_off < HEADER_SIZE {
+                // skip to next block
+                try!(self.src.read(&mut self.head_scratch[0..self.blocksize - self.blk_off]));
+                self.blk_off = 0;
+            }
+
+            let mut bytes_read = try!(self.src.read(&mut self.head_scratch));
+
+            // EOF
+            if bytes_read == 0 {
+                return Ok(0);
+            }
+
+            self.blk_off += bytes_read;
+
+            checksum = u32::decode_fixed(&self.head_scratch[0..4]);
+            length = u16::decode_fixed(&self.head_scratch[4..6]);
+            typ = self.head_scratch[6];
+
+            dst.resize(dst_offset + length as usize, 0);
+            bytes_read = try!(self.src
+                .read(&mut dst[dst_offset..dst_offset + length as usize]));
+            self.blk_off += bytes_read;
+
+            if self.checksums &&
+               !self.check_integrity(typ, &dst[dst_offset..dst_offset + bytes_read], checksum) {
+                return Err(Error::new(ErrorKind::InvalidData, "Invalid Checksum".to_string()));
+            }
+
+            dst_offset += length as usize;
+
+            if typ == RecordType::Full as u8 {
+                return Ok(dst_offset);
+            } else if typ == RecordType::First as u8 {
+                continue;
+            } else if typ == RecordType::Middle as u8 {
+                continue;
+            } else if typ == RecordType::Last as u8 {
+                return Ok(dst_offset);
+            }
+        }
+    }
+
+    fn check_integrity(&mut self, typ: u8, data: &[u8], expected: u32) -> bool {
+        self.digest.reset();
+        self.digest.write(&[typ]);
+        self.digest.write(data);
+        expected == self.digest.sum32()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -109,4 +200,38 @@
         assert_eq!(lw.current_block_offset, data.len() + super::HEADER_SIZE);
         assert_eq!(&lw.dst[super::HEADER_SIZE..], data.as_slice());
     }
+
+    #[test]
+    fn test_reader() {
+        let data = vec!["abcdefghi".as_bytes().to_vec(), // fits one block of 17
+                        "123456789012".as_bytes().to_vec(), // spans two blocks of 17
+                        "0101010101010101010101".as_bytes().to_vec()]; // spans three blocks of 17
+        let mut lw = LogWriter::new(Vec::new());
+        lw.block_size = super::HEADER_SIZE + 10;
+
+        for e in data.iter() {
+            assert!(lw.add_record(e).is_ok());
+        }
+
+        assert_eq!(lw.dst.len(), 93);
+
+        let mut lr = LogReader::new(lw.dst.as_slice(), true, 0);
+        lr.blocksize = super::HEADER_SIZE + 10;
+        let mut dst = Vec::with_capacity(128);
+        let mut i = 0;
+
+        loop {
+            let r = lr.read(&mut dst);
+
+            if !r.is_ok() {
+                panic!("{}", r.unwrap_err());
+            } else if r.unwrap() == 0 {
+                break;
+            }
+
+            assert_eq!(dst, data[i]);
+            i += 1;
+        }
+        assert_eq!(i, data.len());
+    }
 }
```