changeset 23:eaa9fd7c72ba

Implement LogReader
author Lewin Bormann <lbo@spheniscida.de>
date Sun, 12 Jun 2016 10:08:44 +0200
parents b6cd846558b7
children cc51ccd8970c
files src/log.rs
diffstat 1 files changed, 133 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/src/log.rs	Sat Jun 11 20:41:44 2016 +0200
+++ b/src/log.rs	Sun Jun 12 10:08:44 2016 +0200
@@ -5,7 +5,7 @@
 //! A record is a bytestring: [checksum: uint32, length: uint16, type: uint8, data: [u8]]
 //! checksum is the crc32 sum of type and data; type is one of RecordType::{Full/First/Middle/Last}
 
-use std::io::{Result, Write};
+use std::io::{Error, ErrorKind, Read, Result, Write};
 
 use crc::crc32;
 use crc::Hasher32;
@@ -25,13 +25,18 @@
 pub struct LogWriter<W: Write> {
     dst: W,
     current_block_offset: usize,
+    block_size: usize,
+    digest: crc32::Digest,
 }
 
 impl<W: Write> LogWriter<W> {
     pub fn new(writer: W) -> LogWriter<W> {
+        let digest = crc32::Digest::new(crc32::CASTAGNOLI);
         LogWriter {
             dst: writer,
             current_block_offset: 0,
+            block_size: BLOCK_SIZE,
+            digest: digest,
         }
     }
 
@@ -40,7 +45,9 @@
         let mut first_frag = true;
         let mut result = Ok(0);
         while result.is_ok() && record.len() > 0 {
-            let space_left = BLOCK_SIZE - self.current_block_offset;
+            assert!(self.block_size > HEADER_SIZE);
+
+            let space_left = self.block_size - self.current_block_offset;
 
             // Fill up block; go to next block.
             if space_left < HEADER_SIZE {
@@ -48,7 +55,7 @@
                 self.current_block_offset = 0;
             }
 
-            let avail_for_data = BLOCK_SIZE - self.current_block_offset - HEADER_SIZE;
+            let avail_for_data = self.block_size - self.current_block_offset - HEADER_SIZE;
 
             let data_frag_len = if record.len() < avail_for_data {
                 record.len()
@@ -78,23 +85,107 @@
     fn emit_record(&mut self, t: RecordType, data: &[u8], len: usize) -> Result<usize> {
         assert!(len < 256 * 256);
 
-        let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
-        digest.write(&[t as u8]);
-        digest.write(data);
+        self.digest.reset();
+        self.digest.write(&[t as u8]);
+        self.digest.write(&data[0..len]);
 
-        let chksum = digest.sum32();
+        let chksum = self.digest.sum32();
 
         let mut s = 0;
         s += try!(self.dst.write(&chksum.encode_fixed_vec()));
         s += try!(self.dst.write(&(len as u16).encode_fixed_vec()));
         s += try!(self.dst.write(&[t as u8]));
-        s += try!(self.dst.write(data));
+        s += try!(self.dst.write(&data[0..len]));
 
         self.current_block_offset += s;
         Ok(s)
     }
 }
 
+
+pub struct LogReader<R: Read> {
+    src: R,
+    blk_off: usize,
+    blocksize: usize,
+    checksums: bool,
+
+    digest: crc32::Digest,
+    head_scratch: [u8; 7],
+}
+
+impl<R: Read> LogReader<R> {
+    pub fn new(src: R, chksum: bool, offset: usize) -> LogReader<R> {
+        LogReader {
+            src: src,
+            blk_off: offset,
+            blocksize: BLOCK_SIZE,
+            checksums: chksum,
+            head_scratch: [0; 7],
+            digest: crc32::Digest::new(crc32::CASTAGNOLI),
+        }
+    }
+
+    /// EOF is signalled by Ok(0)
+    pub fn read(&mut self, dst: &mut Vec<u8>) -> Result<usize> {
+        let mut checksum: u32;
+        let mut length: u16;
+        let mut typ: u8;
+        let mut dst_offset: usize = 0;
+
+        dst.clear();
+
+        loop {
+            if self.blocksize - self.blk_off < HEADER_SIZE {
+                // skip to next block
+                try!(self.src.read(&mut self.head_scratch[0..self.blocksize - self.blk_off]));
+                self.blk_off = 0;
+            }
+
+            let mut bytes_read = try!(self.src.read(&mut self.head_scratch));
+
+            // EOF
+            if bytes_read == 0 {
+                return Ok(0);
+            }
+
+            self.blk_off += bytes_read;
+
+            checksum = u32::decode_fixed(&self.head_scratch[0..4]);
+            length = u16::decode_fixed(&self.head_scratch[4..6]);
+            typ = self.head_scratch[6];
+
+            dst.resize(dst_offset + length as usize, 0);
+            bytes_read = try!(self.src
+                .read(&mut dst[dst_offset..dst_offset + length as usize]));
+            self.blk_off += bytes_read;
+
+            if self.checksums &&
+               !self.check_integrity(typ, &dst[dst_offset..dst_offset + bytes_read], checksum) {
+                return Err(Error::new(ErrorKind::InvalidData, "Invalid Checksum".to_string()));
+            }
+
+            dst_offset += length as usize;
+
+            if typ == RecordType::Full as u8 {
+                return Ok(dst_offset);
+            } else if typ == RecordType::First as u8 {
+                continue;
+            } else if typ == RecordType::Middle as u8 {
+                continue;
+            } else if typ == RecordType::Last as u8 {
+                return Ok(dst_offset);
+            }
+        }
+    }
+
+    fn check_integrity(&mut self, typ: u8, data: &[u8], expected: u32) -> bool {
+        self.digest.reset();
+        self.digest.write(&[typ]);
+        self.digest.write(data);
+        expected == self.digest.sum32()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -109,4 +200,38 @@
         assert_eq!(lw.current_block_offset, data.len() + super::HEADER_SIZE);
         assert_eq!(&lw.dst[super::HEADER_SIZE..], data.as_slice());
     }
+
+    #[test]
+    fn test_reader() {
+        let data = vec!["abcdefghi".as_bytes().to_vec(), // fits one block of 17
+                        "123456789012".as_bytes().to_vec(), // spans two blocks of 17
+                        "0101010101010101010101".as_bytes().to_vec()]; // spans three blocks of 17
+        let mut lw = LogWriter::new(Vec::new());
+        lw.block_size = super::HEADER_SIZE + 10;
+
+        for e in data.iter() {
+            assert!(lw.add_record(e).is_ok());
+        }
+
+        assert_eq!(lw.dst.len(), 93);
+
+        let mut lr = LogReader::new(lw.dst.as_slice(), true, 0);
+        lr.blocksize = super::HEADER_SIZE + 10;
+        let mut dst = Vec::with_capacity(128);
+        let mut i = 0;
+
+        loop {
+            let r = lr.read(&mut dst);
+
+            if !r.is_ok() {
+                panic!("{}", r.unwrap_err());
+            } else if r.unwrap() == 0 {
+                break;
+            }
+
+            assert_eq!(dst, data[i]);
+            i += 1;
+        }
+        assert_eq!(i, data.len());
+    }
 }