changeset 614:e41530366262

Merge pull request #33 from KAIYOHUGO/master Add Compressor
author Lewin Bormann <lbo@spheniscida.de>
date Sat, 15 Jul 2023 22:09:44 +0200
parents 2abfa394ba2d (current diff) 3cfa3633c012 (diff)
children f48ec74f26b0
files src/db_impl.rs
diffstat 15 files changed, 291 insertions(+), 71 deletions(-) [+]
line wrap: on
line diff
--- a/Cargo.toml	Sat Jul 15 22:06:55 2023 +0200
+++ b/Cargo.toml	Sat Jul 15 22:09:44 2023 +0200
@@ -36,5 +36,11 @@
 path = "src/benches/maps_bench.rs"
 
 [workspace]
-members = ["examples/write-a-lot", "examples/leveldb-tool", "examples/word-analyze",  "examples/stresstest", "examples/asyncdb"]
-
+members = [
+    "examples/write-a-lot",
+    "examples/leveldb-tool",
+    "examples/word-analyze",
+    "examples/stresstest",
+    "examples/asyncdb",
+    "examples/mcpe",
+]
--- a/examples/leveldb-tool/src/main.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/examples/leveldb-tool/src/main.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -1,6 +1,6 @@
 extern crate rusty_leveldb;
 
-use rusty_leveldb::{LdbIterator, Options, DB};
+use rusty_leveldb::{compressor, CompressorId, LdbIterator, Options, DB};
 
 use std::env::args;
 use std::io::{self, Write};
@@ -59,7 +59,7 @@
     let mut opt = Options::default();
     opt.reuse_logs = false;
     opt.reuse_manifest = false;
-    opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy;
+    opt.compressor = compressor::SnappyCompressor::ID;
     let mut db = DB::open("tooldb", opt).unwrap();
 
     match args[1].as_str() {
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/mcpe/Cargo.toml	Sat Jul 15 22:09:44 2023 +0200
@@ -0,0 +1,8 @@
+[package]
+name = "mcpe"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+miniz_oxide = "0.7.1"
+rusty-leveldb = { path = "../../" }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/mcpe/README.md	Sat Jul 15 22:09:44 2023 +0200
@@ -0,0 +1,5 @@
+# MCPE
+
+This example show how to customize compression method.
+
+This setup is compatible to [Mojang's leveldb](https://github.com/Mojang/leveldb-mcpe).
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/mcpe/src/main.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -0,0 +1,70 @@
+use miniz_oxide::deflate::{compress_to_vec, compress_to_vec_zlib};
+use miniz_oxide::inflate::{decompress_to_vec, decompress_to_vec_zlib};
+use rusty_leveldb::{Compressor, CompressorList, Options, DB};
+use std::rc::Rc;
+
+struct ZlibCompressor(u8);
+
+impl ZlibCompressor {
+    /// level 0-10
+    pub fn new(level: u8) -> Self {
+        assert!(level <= 10);
+        Self(level)
+    }
+}
+
+impl Compressor for ZlibCompressor {
+    fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+        Ok(compress_to_vec_zlib(&block, self.0))
+    }
+
+    fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+        decompress_to_vec_zlib(&block).map_err(|e| rusty_leveldb::Status {
+            code: rusty_leveldb::StatusCode::CompressionError,
+            err: e.to_string(),
+        })
+    }
+}
+
+struct RawZlibCompressor(u8);
+
+impl RawZlibCompressor {
+    /// level 0-10
+    pub fn new(level: u8) -> Self {
+        assert!(level <= 10);
+        Self(level)
+    }
+}
+
+impl Compressor for RawZlibCompressor {
+    fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+        Ok(compress_to_vec(&block, self.0))
+    }
+
+    fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+        decompress_to_vec(&block).map_err(|e| rusty_leveldb::Status {
+            code: rusty_leveldb::StatusCode::CompressionError,
+            err: e.to_string(),
+        })
+    }
+}
+
+pub fn mcpe_options(compression_level: u8) -> Options {
+    let mut opt = Options::default();
+    opt.compressor = 0;
+    let mut list = CompressorList::new();
+    list.set_with_id(0, RawZlibCompressor::new(compression_level));
+    list.set_with_id(1, ZlibCompressor::new(compression_level));
+    opt.compressor_list = Rc::new(list);
+    opt
+}
+
+fn main() {
+    let path = "mcpe_db";
+    let compression_level = 10;
+    let opt = mcpe_options(compression_level);
+    let mut db = DB::open(path, opt).unwrap();
+    db.put(b"~local_player", b"NBT data goes here").unwrap();
+    let value = db.get(b"~local_player").unwrap();
+    assert_eq!(&value, b"NBT data goes here")
+}
--- a/examples/stresstest/src/main.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/examples/stresstest/src/main.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -1,5 +1,5 @@
 use rand::distributions::{Alphanumeric, DistString};
-use rusty_leveldb::{Options, DB};
+use rusty_leveldb::{compressor, CompressorId, Options, DB};
 
 const KEY_LEN: usize = 4;
 const VAL_LEN: usize = 8;
@@ -12,7 +12,7 @@
 
 fn write(db: &mut DB, n: usize) {
     time_test::time_test!("write");
-    for i in 0..n {
+    for _ in 0..n {
         let (k, v) = (gen_string(KEY_LEN), gen_string(VAL_LEN));
 
         db.put(k.as_bytes(), v.as_bytes()).unwrap();
@@ -27,10 +27,10 @@
 fn read(db: &mut DB, n: usize) -> usize {
     let mut succ = 0;
     time_test::time_test!("read");
-    for i in 0..n {
+    for _ in 0..n {
         let k = gen_string(KEY_LEN);
 
-        if let Some(v) = db.get(k.as_bytes()) {
+        if let Some(_) = db.get(k.as_bytes()) {
             succ += 1;
         }
     }
@@ -38,20 +38,20 @@
 }
 
 fn main() {
-    let N = 100_000;
+    let n = 100_000;
     let m = 10;
     let path = "stresstestdb";
     let mut entries = 0;
 
     for i in 0..m {
         let mut opt = Options::default();
-        opt.compression_type = rusty_leveldb::CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
         let mut db = DB::open(path, opt).unwrap();
-        write(&mut db, N);
-        entries += N;
+        write(&mut db, n);
+        entries += n;
         println!("Wrote {} entries ({}/{})", entries, i + 1, m);
 
-        let s = read(&mut db, N);
-        println!("Read back {} entries (found {}) ({}/{})", N, s, i + 1, m);
+        let s = read(&mut db, n);
+        println!("Read back {} entries (found {}) ({}/{})", n, s, i + 1, m);
     }
 }
--- a/examples/word-analyze/src/main.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/examples/word-analyze/src/main.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -1,3 +1,4 @@
+use leveldb::CompressorId;
 use rusty_leveldb as leveldb;
 
 use std::fs::OpenOptions;
@@ -35,7 +36,7 @@
 
 fn main() {
     let mut opts = leveldb::Options::default();
-    opts.compression_type = leveldb::CompressionType::CompressionNone;
+    opts.compressor = leveldb::compressor::NoneCompressor::ID;
     let db = leveldb::DB::open("wordsdb", opts).unwrap();
 
     run(db).unwrap();
--- a/examples/write-a-lot/src/main.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/examples/write-a-lot/src/main.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -2,9 +2,7 @@
 extern crate rusty_leveldb;
 
 use rand::Rng;
-use rusty_leveldb::CompressionType;
-use rusty_leveldb::Options;
-use rusty_leveldb::DB;
+use rusty_leveldb::{compressor, CompressorId, Options, DB};
 
 use std::error::Error;
 use std::iter::FromIterator;
@@ -31,7 +29,7 @@
 
 fn main() {
     let mut opt = Options::default();
-    opt.compression_type = CompressionType::CompressionSnappy;
+    opt.compressor = compressor::SnappyCompressor::ID;
     let mut db = DB::open("test1", opt).unwrap();
 
     fill_db(&mut db, 32768).unwrap();
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/compressor.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -0,0 +1,77 @@
+/// Custom compression method
+///
+/// ```
+/// # use rusty_leveldb::{Compressor, CompressorId};
+///
+/// #[derive(Debug, Clone, Copy, Default)]
+/// pub struct CustomCompressor;
+///
+/// impl CompressorId for CustomCompressor {
+///     // a unique id to identify what compressor should DB use
+///     const ID: u8 = 42;
+/// }
+///
+/// impl Compressor for CustomCompressor {
+///     fn encode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+///         // Do something
+///         Ok(block)
+///     }
+///
+///     fn decode(&self, block: Vec<u8>) -> rusty_leveldb::Result<Vec<u8>> {
+///         // Do something
+///         Ok(block)
+///     }
+/// }
+/// ```
+///
+/// See [crate::CompressorList] for usage
+pub trait Compressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>>;
+}
+
+/// Set default compressor id
+pub trait CompressorId {
+    const ID: u8;
+}
+
+/// A compressor that do **Nothing**
+///
+/// It default id is `0`
+#[derive(Debug, Clone, Copy, Default)]
+pub struct NoneCompressor;
+
+impl CompressorId for NoneCompressor {
+    const ID: u8 = 0;
+}
+
+impl Compressor for NoneCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(block)
+    }
+}
+
+/// A compressor that compress data with Google's Snappy
+///
+/// It default id is `1`
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SnappyCompressor;
+
+impl CompressorId for SnappyCompressor {
+    const ID: u8 = 1;
+}
+
+impl Compressor for SnappyCompressor {
+    fn encode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Encoder::new().compress_vec(&block)?)
+    }
+
+    fn decode(&self, block: Vec<u8>) -> crate::Result<Vec<u8>> {
+        Ok(snap::raw::Decoder::new().decompress_vec(&block)?)
+    }
+}
--- a/src/db_impl.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/db_impl.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -890,10 +890,7 @@
         Ok(())
     }
 
-    fn finish_compaction_output(
-        &mut self,
-        cs: &mut CompactionState,
-    ) -> Result<()> {
+    fn finish_compaction_output(&mut self, cs: &mut CompactionState) -> Result<()> {
         assert!(cs.builder.is_some());
         let output_num = cs.current_output().num;
         assert!(output_num > 0);
--- a/src/lib.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/lib.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -73,10 +73,13 @@
 mod db_impl;
 mod db_iter;
 
+pub mod compressor;
+
 #[cfg(feature = "async")]
 pub use asyncdb::AsyncDB;
 
 pub use cmp::{Cmp, DefaultCmp};
+pub use compressor::{Compressor, CompressorId};
 pub use db_impl::DB;
 pub use db_iter::DBIterator;
 pub use disk_env::PosixDiskEnv;
@@ -84,7 +87,7 @@
 pub use error::{Result, Status, StatusCode};
 pub use filter::{BloomPolicy, FilterPolicy};
 pub use mem_env::MemEnv;
-pub use options::{in_memory, CompressionType, Options};
+pub use options::{in_memory, CompressorList, Options};
 pub use skipmap::SkipMap;
 pub use types::LdbIterator;
 pub use write_batch::WriteBatch;
--- a/src/options.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/options.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -1,12 +1,13 @@
 use crate::block::Block;
 use crate::cache::Cache;
 use crate::cmp::{Cmp, DefaultCmp};
-use crate::disk_env;
+use crate::compressor::{self, Compressor, CompressorId};
 use crate::env::Env;
-use crate::filter;
 use crate::infolog::{self, Logger};
 use crate::mem_env::MemEnv;
 use crate::types::{share, Shared};
+use crate::{disk_env, Result};
+use crate::{filter, Status, StatusCode};
 
 use std::default::Default;
 use std::rc::Rc;
@@ -19,20 +20,6 @@
 const WRITE_BUFFER_SIZE: usize = 4 * MB;
 const DEFAULT_BITS_PER_KEY: u32 = 10; // NOTE: This may need to be optimized.
 
-#[derive(Clone, Copy, PartialEq, Debug)]
-pub enum CompressionType {
-    CompressionNone = 0,
-    CompressionSnappy = 1,
-}
-
-pub fn int_to_compressiontype(i: u32) -> Option<CompressionType> {
-    match i {
-        0 => Some(CompressionType::CompressionNone),
-        1 => Some(CompressionType::CompressionSnappy),
-        _ => None,
-    }
-}
-
 /// Options contains general parameters for a LevelDB instance. Most of the names are
 /// self-explanatory; the defaults are defined in the `Default` implementation.
 #[derive(Clone)]
@@ -49,9 +36,13 @@
     pub block_cache: Shared<Cache<Block>>,
     pub block_size: usize,
     pub block_restart_interval: usize,
+    /// Compressor id in compressor list
+    ///
     /// Note: you have to open a database with the same compression type as it was written to, in
     /// order to not lose data! (this is a bug and will be fixed)
-    pub compression_type: CompressionType,
+    pub compressor: u8,
+
+    pub compressor_list: Rc<CompressorList>,
     pub reuse_logs: bool,
     pub reuse_manifest: bool,
     pub filter_policy: filter::BoxedFilterPolicy,
@@ -75,12 +66,65 @@
             block_restart_interval: 16,
             reuse_logs: true,
             reuse_manifest: true,
-            compression_type: CompressionType::CompressionNone,
+            compressor: 0,
+            compressor_list: Rc::new(CompressorList::default()),
             filter_policy: Rc::new(Box::new(filter::BloomPolicy::new(DEFAULT_BITS_PER_KEY))),
         }
     }
 }
 
+/// Customize compressor method for leveldb
+///
+/// `Default` value is like the code below
+/// ```
+/// # use rusty_leveldb::{compressor, CompressorList};
+/// let mut list = CompressorList::new();
+/// list.set(compressor::NoneCompressor);
+/// list.set(compressor::SnappyCompressor);
+/// ```
+pub struct CompressorList([Option<Box<dyn Compressor>>; 256]);
+
+impl CompressorList {
+    /// Create a **Empty** compressor list
+    pub fn new() -> Self {
+        const INIT: Option<Box<dyn Compressor>> = None;
+        Self([INIT; 256])
+    }
+
+    /// Set compressor with the id in `CompressorId` trait
+    pub fn set<T>(&mut self, compressor: T)
+    where
+        T: Compressor + CompressorId + 'static,
+    {
+        self.set_with_id(T::ID, compressor)
+    }
+
+    /// Set compressor with id
+    pub fn set_with_id(&mut self, id: u8, compressor: impl Compressor + 'static) {
+        self.0[id as usize] = Some(Box::new(compressor));
+    }
+
+    pub fn is_set(&self, id: u8) -> bool {
+        self.0[id as usize].is_some()
+    }
+
+    pub fn get(&self, id: u8) -> Result<&Box<dyn Compressor + 'static>> {
+        self.0[id as usize].as_ref().ok_or_else(|| Status {
+            code: StatusCode::NotSupported,
+            err: format!("invalid compression id `{}`", id),
+        })
+    }
+}
+
+impl Default for CompressorList {
+    fn default() -> Self {
+        let mut list = Self::new();
+        list.set(compressor::NoneCompressor);
+        list.set(compressor::SnappyCompressor);
+        list
+    }
+}
+
 /// Returns Options that will cause a database to exist purely in-memory instead of being stored on
 /// disk. This is useful for testing or ephemeral databases.
 pub fn in_memory() -> Options {
--- a/src/table_block.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/table_block.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -5,7 +5,7 @@
 use crate::filter;
 use crate::filter_block::FilterBlockReader;
 use crate::log::unmask_crc;
-use crate::options::{self, CompressionType, Options};
+use crate::options::Options;
 use crate::table_builder;
 
 use crc::crc32::{self, Hasher32};
@@ -69,18 +69,12 @@
             ),
         );
     }
+    let compressor_list = opt.compressor_list.clone();
 
-    if let Some(ctype) = options::int_to_compressiontype(compress[0] as u32) {
-        match ctype {
-            CompressionType::CompressionNone => Ok(Block::new(opt, buf)),
-            CompressionType::CompressionSnappy => {
-                let decoded = snap::raw::Decoder::new().decompress_vec(buf.as_slice())?;
-                Ok(Block::new(opt, decoded))
-            }
-        }
-    } else {
-        err(StatusCode::InvalidData, "invalid compression type")
-    }
+    Ok(Block::new(
+        opt,
+        compressor_list.get(compress[0])?.decode(buf)?,
+    ))
 }
 
 /// Verify checksum of block
--- a/src/table_builder.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/table_builder.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -2,12 +2,13 @@
 use crate::block_builder::BlockBuilder;
 use crate::blockhandle::BlockHandle;
 use crate::cmp::InternalKeyCmp;
+use crate::compressor::{self, Compressor, CompressorId};
 use crate::error::Result;
 use crate::filter::{InternalFilterPolicy, NoFilterPolicy};
 use crate::filter_block::FilterBlockBuilder;
 use crate::key_types::InternalKey;
 use crate::log::mask_crc;
-use crate::options::{CompressionType, Options};
+use crate::options::Options;
 
 use std::cmp::Ordering;
 use std::io::Write;
@@ -178,8 +179,10 @@
         self.prev_block_last_key = Vec::from(block.last_key());
         let contents = block.finish();
 
-        let ctype = self.opt.compression_type;
-        let handle = self.write_block(contents, ctype)?;
+        let compressor_list = self.opt.compressor_list.clone();
+        let compressor = compressor_list.get(self.opt.compressor)?;
+
+        let handle = self.write_block(contents, (self.opt.compressor, compressor))?;
 
         let mut handle_enc = [0 as u8; 16];
         let enc_len = handle.encode_to(&mut handle_enc);
@@ -198,19 +201,21 @@
     }
 
     /// Calculates the checksum, writes the block to disk and updates the offset.
-    fn write_block(&mut self, block: BlockContents, ctype: CompressionType) -> Result<BlockHandle> {
-        let mut data = block;
-        if ctype == CompressionType::CompressionSnappy {
-            data = snap::raw::Encoder::new().compress_vec(&data)?;
-        }
+    fn write_block(
+        &mut self,
+        block: BlockContents,
+        compressor_id_pair: (u8, &Box<dyn Compressor>),
+    ) -> Result<BlockHandle> {
+        let (ctype, compressor) = compressor_id_pair;
+        let data = compressor.encode(block)?;
 
         let mut digest = crc32::Digest::new(crc32::CASTAGNOLI);
 
         digest.write(&data);
-        digest.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN]);
+        digest.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN]);
 
         self.dst.write(&data)?;
-        self.dst.write(&[ctype as u8; TABLE_BLOCK_COMPRESS_LEN])?;
+        self.dst.write(&[ctype; TABLE_BLOCK_COMPRESS_LEN])?;
         self.dst.write_fixedint(mask_crc(digest.sum32()))?;
 
         let handle = BlockHandle::new(self.offset, data.len());
@@ -221,7 +226,12 @@
 
     pub fn finish(mut self) -> Result<usize> {
         assert!(self.data_block.is_some());
-        let ctype = self.opt.compression_type;
+
+        let compressor_list = self.opt.compressor_list.clone();
+        let compressor_id_pair = (
+            self.opt.compressor,
+            compressor_list.get(self.opt.compressor)?,
+        );
 
         // If there's a pending data block, write it
         if self.data_block.as_ref().unwrap().entries() > 0 {
@@ -241,7 +251,13 @@
             let fblock = self.filter_block.take().unwrap();
             let filter_key = format!("filter.{}", fblock.filter_name());
             let fblock_data = fblock.finish();
-            let fblock_handle = self.write_block(fblock_data, CompressionType::CompressionNone)?;
+            let fblock_handle = self.write_block(
+                fblock_data,
+                (
+                    compressor::NoneCompressor::ID,
+                    &(Box::new(compressor::NoneCompressor) as Box<dyn Compressor>),
+                ),
+            )?;
 
             let mut handle_enc = [0 as u8; 16];
             let enc_len = fblock_handle.encode_to(&mut handle_enc);
@@ -251,11 +267,11 @@
 
         // write metaindex block
         let meta_ix = meta_ix_block.finish();
-        let meta_ix_handle = self.write_block(meta_ix, ctype)?;
+        let meta_ix_handle = self.write_block(meta_ix, compressor_id_pair)?;
 
         // write index block
         let index_cont = self.index_block.take().unwrap().finish();
-        let ix_handle = self.write_block(index_cont, ctype)?;
+        let ix_handle = self.write_block(index_cont, compressor_id_pair)?;
 
         // write footer.
         let footer = Footer::new(meta_ix_handle, ix_handle);
@@ -292,7 +308,7 @@
         let mut d = Vec::with_capacity(512);
         let mut opt = options::for_test();
         opt.block_restart_interval = 3;
-        opt.compression_type = CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
         let mut b = TableBuilder::new_raw(opt, &mut d);
 
         let data = vec![
--- a/src/table_reader.rs	Sat Jul 15 22:06:55 2023 +0200
+++ b/src/table_reader.rs	Sat Jul 15 22:09:44 2023 +0200
@@ -374,12 +374,13 @@
 
 #[cfg(test)]
 mod tests {
+    use crate::compressor::CompressorId;
     use crate::filter::BloomPolicy;
     use crate::key_types::LookupKey;
-    use crate::options::{self, CompressionType};
     use crate::table_builder::TableBuilder;
     use crate::test_util::{test_iterator_properties, LdbIteratorIter};
     use crate::types::{current_key_val, LdbIterator};
+    use crate::{compressor, options};
 
     use super::*;
 
@@ -405,7 +406,7 @@
         let mut opt = options::for_test();
         opt.block_restart_interval = 2;
         opt.block_size = 32;
-        opt.compression_type = CompressionType::CompressionSnappy;
+        opt.compressor = compressor::SnappyCompressor::ID;
 
         {
             // Uses the standard comparator in opt.