changeset 609:68887f0a3e83

Add Compressor
author kaiyohugo <41114603+KAIYOHUGO@users.noreply.github.com>
date Tue, 04 Jul 2023 21:50:32 +0800
parents 08549e2435ff
children 9a561cd122c3
files examples/leveldb-tool/src/main.rs examples/stresstest/src/main.rs examples/word-analyze/src/main.rs examples/write-a-lot/src/main.rs src/compressor.rs src/db_impl.rs src/error.rs src/lib.rs src/options.rs src/table_block.rs src/table_builder.rs src/table_reader.rs
diffstat 12 files changed, 125 insertions(+), 57 deletions(-) [+]
line wrap: on
line diff
--- a/examples/leveldb-tool/src/main.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/examples/leveldb-tool/src/main.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -1,6 +1,6 @@
 extern crate rusty_leveldb;
 
-use rusty_leveldb::{LdbIterator, Options, DB};
+use rusty_leveldb::{CompressorId, LdbIterator, Options, DB};
 
 use std::env::args;
 use std::io::{self, Write};
@@ -59,7 +59,7 @@
     let mut opt = Options::default();
     opt.reuse_logs = false;
     opt.reuse_manifest = false;
-    opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy;
+    opt.compressor = rusty_leveldb::compressor::SnappyCompressor::ID;
     let mut db = DB::open("tooldb", opt).unwrap();
 
     match args[1].as_str() {
--- a/examples/stresstest/src/main.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/examples/stresstest/src/main.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -1,5 +1,5 @@
 use rand::distributions::{Alphanumeric, DistString};
-use rusty_leveldb::{Options, DB};
+use rusty_leveldb::{compressor, CompressorId, Options, DB};
 
 const KEY_LEN: usize = 4;
 const VAL_LEN: usize = 8;
@@ -45,7 +45,7 @@
 
     for i in 0..m {
         let mut opt = Options::default();
-        opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
         let mut db = DB::open(path, opt).unwrap();
         write(&mut db, N);
         entries += N;
--- a/examples/word-analyze/src/main.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/examples/word-analyze/src/main.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -1,3 +1,4 @@
+use leveldb::CompressorId;
 use rusty_leveldb as leveldb;
 
 use std::fs::OpenOptions;
@@ -35,7 +36,7 @@
 
 fn main() {
     let mut opts = leveldb::Options::default();
-    opts.compression_type = leveldb::CompressionType::CompressionNone;
+    opts.compressor = leveldb::compressor::NoneCompressor::ID;
     let db = leveldb::DB::open("wordsdb", opts).unwrap();
 
     run(db).unwrap();
--- a/examples/write-a-lot/src/main.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/examples/write-a-lot/src/main.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -2,9 +2,7 @@
 extern crate rusty_leveldb;
 
 use rand::Rng;
-use rusty_leveldb::CompressionType;
-use rusty_leveldb::Options;
-use rusty_leveldb::DB;
+use rusty_leveldb::{compressor, CompressorId, Options, DB};
 
 use std::error::Error;
 use std::iter::FromIterator;
@@ -31,7 +29,7 @@
 
 fn main() {
     let mut opt = Options::default();
-    opt.compression_type = CompressionType::CompressionSnappy;
+    opt.compressor = compressor::SnappyCompressor::ID;
     let mut db = DB::open("test1", opt).unwrap();
 
     fill_db(&mut db, 32768).unwrap();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/compressor.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -0,0 +1,55 @@
+pub trait Compressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+}
+
+pub trait CompressorId {
+    const ID: u8;
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct NoneCompressor;
+
+impl NoneCompressor {
+    pub fn new() -> Box<dyn Compressor> {
+        Box::new(Self)
+    }
+}
+
+impl CompressorId for NoneCompressor {
+    const ID: u8 = 0;
+}
+
+impl Compressor for NoneCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+}
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SnappyCompressor;
+
+impl SnappyCompressor {
+    pub fn new() -> Box<dyn Compressor> {
+        Box::new(Self)
+    }
+}
+
+impl CompressorId for SnappyCompressor {
+    const ID: u8 = 1;
+}
+
+impl Compressor for SnappyCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Encoder::new().compress_vec(&block)?)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Decoder::new().decompress_vec(&block)?)
+    }
+}
--- a/src/db_impl.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/db_impl.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -110,6 +110,9 @@
     /// depends on the options set (`create_if_missing`, `error_if_exists`).
     pub fn open<P: AsRef<Path>>(name: P, opt: Options) -> Result<DB> {
         let name = name.as_ref();
+        if opt.compressor_list[opt.compressor as usize].is_none() {
+            err(StatusCode::InvalidOption, "need set compressor")?;
+        }
         let mut db = DB::new(name, opt);
         let mut ve = VersionEdit::new();
         let save_manifest = db.recover(&mut ve)?;
@@ -889,10 +892,7 @@
         Ok(())
     }
 
-    fn finish_compaction_output(
-        &mut self,
-        cs: &mut CompactionState,
-    ) -> Result<()> {
+    fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
         assert!(cs.builder.is_some());
         let output_num = cs.current_output().num;
         assert!(output_num > 0);
--- a/src/error.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/error.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -20,6 +20,7 @@
     IOError,
     InvalidArgument,
     InvalidData,
+    InvalidOption,
     LockError,
     NotFound,
     NotSupported,
--- a/src/lib.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/lib.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -73,10 +73,13 @@
 mod db_impl;
 mod db_iter;
 
+pub mod compressor;
+
 #[cfg(feature = "async")]
 pub use asyncdb::AsyncDB;
 
 pub use cmp::{Cmp, DefaultCmp};
+pub use compressor::{Compressor, CompressorId};
 pub use db_impl::DB;
 pub use db_iter::DBIterator;
 pub use disk_env::PosixDiskEnv;
@@ -84,7 +87,7 @@
 pub use error::{Result, Status, StatusCode};
 pub use filter::{BloomPolicy, FilterPolicy};
 pub use mem_env::MemEnv;
-pub use options::{in_memory, CompressionType, Options};
+pub use options::{in_memory, Options};
 pub use skipmap::SkipMap;
 pub use types::LdbIterator;
 pub use write_batch::WriteBatch;
--- a/src/options.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/options.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -1,6 +1,7 @@
 use crate::block::Block;
 use crate::cache::Cache;
 use crate::cmp::{Cmp, DefaultCmp};
+use crate::compressor::Compressor;
 use crate::disk_env;
 use crate::env::Env;
 use crate::filter;
@@ -19,20 +20,6 @@
 const WRITE_BUFFER_SIZE: usize = 4 * MB;
 const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized.
 
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub enum CompressionType {
-    CompressionNone = 0,
-    CompressionSnappy = 1,
-}
-
-pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> {
-    match i {
-        0 => Some(CompressionType::CompressionNone),
-        1 => Some(CompressionType::CompressionSnappy),
-        _ => None,
-    }
-}
-
 /// Options contains general parameters for a LevelDB instance. Most of the names are
 /// self-explanatory; the defaults are defined in the `Default` implementation.
 #[derive(Clone)]
@@ -51,7 +38,9 @@
     pub block_restart_interval: usize,
     /// Note: you have to open a database with the same compression type as it was written to, in
     /// order to not lose data! (this is a bug and will be fixed)
-    pub compression_type: CompressionType,
+    pub compressor: u8,
+
+    pub compressor_list: Rc<[Option<Box<dyn Compressor>>; 256]>,
     pub reuse_logs: bool,
     pub reuse_manifest: bool,
     pub filter_policy: filter::BoxedFilterPolicy,
@@ -59,6 +48,11 @@
 
 impl Default for Options {
     fn default() -> Options {
+        const INIT: Option<Box<dyn Compressor>> = None;
+        let mut compressor_list = [INIT; 256];
+        compressor_list[0] = Some(crate::compressor::NoneCompressor::new());
+        compressor_list[1] = Some(crate::compressor::SnappyCompressor::new());
+
         Options {
             cmp: Rc::new(Box::new(DefaultCmp)),
             env: Rc::new(Box::new(disk_env::PosixDiskEnv::new())),
@@ -75,7 +69,8 @@
             block_restart_interval: 16,
             reuse_logs: true,
             reuse_manifest: true,
-            compression_type: CompressionType::CompressionNone,
+            compressor: 0,
+            compressor_list: Rc::new(compressor_list),
             filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))),
         }
     }
--- a/src/table_block.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/table_block.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -5,7 +5,7 @@
 use crate::filter;
 use crate::filter_block::FilterBlockReader;
 use crate::log::unmask_crc;
-use crate::options::{self, CompressionType, Options};
+use crate::options::Options;
 use crate::table_builder;
 
 use crc::crc32::{self, Hasher32};
@@ -69,15 +69,9 @@
             ),
         );
     }
-
-    if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) {
-        match ctype {
-            CompressionType::CompressionNone => Ok(Block::new(opt, buf)),
-            CompressionType::CompressionSnappy => {
-                let decoded = snap::raw::Decoder::new().decompress_vec(buf.as_slice())?;
-                Ok(Block::new(opt, decoded))
-            }
-        }
+    let compressor_list = opt.compressor_list.clone();
+    if let Some(compressor) = compressor_list[compress[0] as usize].as_ref() {
+        Ok(Block::new(opt, compressor.decode(buf)?))
     } else {
         err(StatusCode::InvalidData, "invalid compression type")
     }
--- a/src/table_builder.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/table_builder.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -2,12 +2,13 @@
 use crate::block_builder::BlockBuilder;
 use crate::blockhandle::BlockHandle;
 use crate::cmp::InternalKeyCmp;
+use crate::compressor::{self, Compressor, CompressorId};
 use crate::error::Result;
 use crate::filter::{InternalFilterPolicy, NoFilterPolicy};
 use crate::filter_block::FilterBlockBuilder;
 use crate::key_types::InternalKey;
 use crate::log::mask_crc;
-use crate::options::{CompressionType, Options};
+use crate::options::Options;
 
 use std::cmp::Ordering;
 use std::io::Write;
@@ -178,8 +179,12 @@
         self.prev_block_last_key = Vec::from(block.last_key());
         let contents = block.finish();
 
-        let ctype = self.opt.compression_type;
-        let handle = self.write_block(contents, ctype)?;
+        let compressor_list = self.opt.compressor_list.clone();
+        let compressor = compressor_list[self.opt.compressor as usize]
+            .as_ref()
+            .unwrap();
+
+        let handle = self.write_block(contents, (self.opt.compressor, compressor))?;
 
         let mut handle_enc = [0 as u8; 16];
         let enc_len = handle.encode_to(&mut handle_enc);
@@ -198,19 +203,21 @@
     }
 
     /// Calculates the checksum, writes the block to disk and updates the offset.
-    fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> {
-        let mut data = block;
-        if ctype == CompressionType::CompressionSnappy {
-            data = snap::raw::Encoder::new().compress_vec(&data)?;
-        }
+    fn write_block(
+        &mut self,
+        block: BlockContents,
+        compressor_id_pair: (u8, &Box<dyn Compressor>),
+    ) -> Result<BlockHandle> {
+        let (ctype, compressor) = compressor_id_pair;
+        let data = compressor.encode(block)?;
 
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
 
         digest.write(&data);
-        digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]);
+        digest.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN]);
 
         self.dst.write(&data)?;
-        self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?;
+        self.dst.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?;
         self.dst.write_fixedint(mask_crc(digest.sum32()))?;
 
         let handle = BlockHandle::new(self.offset, data.len());
@@ -221,7 +228,14 @@
 
     pub fn finish(mut self) -> Result<usize> {
         assert!(self.data_block.is_some());
-        let ctype = self.opt.compression_type;
+
+        let compressor_list = self.opt.compressor_list.clone();
+        let compressor_id_pair = (
+            self.opt.compressor,
+            compressor_list[self.opt.compressor as usize]
+                .as_ref()
+                .unwrap(),
+        );
 
         // If there's a pending data block, write it
         if self.data_block.as_ref().unwrap().entries() > 0 {
@@ -241,7 +255,13 @@
             let fblock = self.filter_block.take().unwrap();
             let filter_key = format!("filter.{}", fblock.filter_name());
             let fblock_data = fblock.finish();
-            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?;
+            let fblock_handle = self.write_block(
+                fblock_data,
+                (
+                    compressor::NoneCompressor::ID,
+                    &compressor::NoneCompressor::new(),
+                ),
+            )?;
 
             let mut handle_enc = [0 as u8; 16];
             let enc_len = fblock_handle.encode_to(&mut handle_enc);
@@ -251,11 +271,11 @@
 
         // write metaindex block
         let meta_ix = meta_ix_block.finish();
-        let meta_ix_handle = self.write_block(meta_ix, ctype)?;
+        let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?;
 
         // write index block
         let index_cont = self.index_block.take().unwrap().finish();
-        let ix_handle = self.write_block(index_cont, ctype)?;
+        let ix_handle = self.write_block(index_cont, compressor_id_pair)?;
 
         // write footer.
         let footer = Footer::new(meta_ix_handle, ix_handle);
@@ -292,7 +312,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = options::for_test();
         opt.block_restart_interval = 3;
-        opt.compression_type = CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
         let mut b = TableBuilder::new_raw(opt, &mut d);
 
         let data = vec![
--- a/src/table_reader.rs	Tue Jun 20 17:52:43 2023 +0200
+++ b/src/table_reader.rs	Tue Jul 04 21:50:32 2023 +0800
@@ -374,12 +374,13 @@
 
 #[cfg(test)]
 mod tests {
+    use crate::compressor::CompressorId;
     use crate::filter::BloomPolicy;
     use crate::key_types::LookupKey;
-    use crate::options::{self, CompressionType};
     use crate::table_builder::TableBuilder;
     use crate::test_util::{test_iterator_properties, LdbIteratorIter};
     use crate::types::{current_key_val, LdbIterator};
+    use crate::{compressor, options};
 
     use super::*;
 
@@ -405,7 +406,7 @@
         let mut opt = options::for_test();
         opt.block_restart_interval = 2;
         opt.block_size = 32;
-        opt.compression_type = CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
 
         {
             // Uses the standard comparator in opt.